[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=403492&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403492 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 14/Mar/20 18:34
Start Date: 14/Mar/20 18:34
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-599117706

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 403492)
Time Spent: 11h 40m (was: 11.5h)

> Sort Merge Bucket Join support in Beam
> --------------------------------------
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
> Issue Type: Improvement
> Components: extensions-java-join-library, io-ideas
> Reporter: Claire McGinty
> Assignee: Claire McGinty
> Priority: Major
> Time Spent: 11h 40m
> Remaining Estimate: 0h
>
> Design doc:
> https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit#
>
> Hi! Spotify has been internally prototyping and testing an implementation of
> the sort-merge join using Beam primitives, and we're interested in
> contributing it open source – probably to Beam's extensions package, in its
> own `smb` module or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader
> directly (although this could theoretically be expanded to other
> serialization formats). We'd add two transforms*: an SMB write and an SMB
> join.
> SMB write would take in one PCollection and a number of buckets, and:
> 1) Apply a partitioning function to the input to assign each record to one
> bucket. (The user code would have to statically specify a number of buckets;
> it's hard to see a way to do this dynamically.)
> 2) Group by that bucket ID, and within each bucket perform an in-memory sort
> on the join key. If the grouped records are too large to fit in memory, fall
> back to an external sort (although if this happens, the user should probably
> increase the bucket count so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash
> algorithm and number of buckets.
> SMB join would take in the input paths for two or more sources, all of which
> are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have a compatible bucket count and hash
> algorithm.
> 2) Expand the input paths to enumerate the `ResourceId`s of every file in the
> paths, and group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceId`s. Sequentially
> read the files one record at a time, outputting tuples of all record pairs
> with matching join keys.
> \* These could be implemented directly as `PTransform`s, with the writer
> being a `DoFn`, but semantically I do like the idea of extending
> `FileBasedSource`/`Sink` with abstract classes like
> `SortedBucketSink`/`SortedBucketSource`... If we represent the elements in a
> sink as KV pairs of >>, so that the number of elements in the PCollection ==
> the number of buckets == the number of output files, we could just implement
> something like `SortedBucketSink` extending `FileBasedSink` with a dynamic
> file-naming function. I'd like to be able to take advantage of the existing
> write/read implementation logic in the `io` package as much as possible,
> although I guess some of it is package-private.
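The write and join steps quoted above can be sketched in plain Java. This is a rough, non-authoritative illustration of the core ideas only, not Beam's API and not the proposed implementation; the class and method names (`SmbSketch`, `assignBucket`, `mergeJoin`, etc.) are hypothetical, records stand in for their own join keys, and "compatible" metadata is simplified to exact equality:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the SMB write/join ideas described above.
public class SmbSketch {

  // Write step 1: deterministic bucket assignment by hashing the join key.
  static int assignBucket(String joinKey, int numBuckets) {
    return Math.floorMod(joinKey.hashCode(), numBuckets);
  }

  // Write step 2: group records by bucket ID and sort each bucket by join key
  // (here each record is its own join key, for simplicity).
  static Map<Integer, List<String>> bucketAndSort(List<String> records, int numBuckets) {
    Map<Integer, List<String>> buckets = new TreeMap<>();
    for (String r : records) {
      buckets.computeIfAbsent(assignBucket(r, numBuckets), b -> new ArrayList<>()).add(r);
    }
    for (List<String> bucket : buckets.values()) {
      Collections.sort(bucket);
    }
    return buckets;
  }

  // Join step 1: two sources are joinable only if their metadata agrees
  // (simplified here to mean equal bucket count and hash algorithm).
  static boolean compatibleMetadata(int bucketsA, String hashA, int bucketsB, String hashB) {
    return bucketsA == bucketsB && hashA.equals(hashB);
  }

  // Join step 3: sequentially merge two sorted buckets, emitting pairs with
  // equal join keys. Assumes keys are unique within each side.
  static List<String[]> mergeJoin(List<String> left, List<String> right) {
    List<String[]> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = left.get(i).compareTo(right.get(j));
      if (cmp < 0) {
        i++;
      } else if (cmp > 0) {
        j++;
      } else {
        out.add(new String[] {left.get(i), right.get(j)});
        i++;
        j++;
      }
    }
    return out;
  }
}
```

Because both sides were bucketed with the same hash function and bucket count, records with equal keys land in the same bucket ID, so the join needs no shuffle: each pair of same-ID sorted files can be merge-read sequentially, which is where the performance win described in the proposal comes from.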
> –
> From our internal testing, we've seen some substantial performance
> improvements using the right bucket size: not only by avoiding a shuffle
> during the join step, but also in storage costs, since we get better
> compression in Avro by storing sorted records.
> Please let us know what you think and any concerns we can address! Our
> implementation isn't quite production-ready yet, but we'd like to start a
> discussion about it early.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=403493&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403493 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 14/Mar/20 18:34
Start Date: 14/Mar/20 18:34
Worklog Time Spent: 10m

Work Description: stale[bot] commented on pull request #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823

Issue Time Tracking
---
Worklog Id: (was: 403493)
Time Spent: 11h 50m (was: 11h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=403122&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403122 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 13/Mar/20 19:54
Start Date: 13/Mar/20 19:54
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824#issuecomment-598887535

Issue Time Tracking
---
Worklog Id: (was: 403122)
Time Spent: 11h (was: 10h 50m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=403125&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403125 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 13/Mar/20 19:54
Start Date: 13/Mar/20 19:54
Worklog Time Spent: 10m

Work Description: stale[bot] commented on pull request #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251

Issue Time Tracking
---
Worklog Id: (was: 403125)
Time Spent: 11.5h (was: 11h 20m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=403123&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403123 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 13/Mar/20 19:54
Start Date: 13/Mar/20 19:54
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251#issuecomment-598887538

Issue Time Tracking
---
Worklog Id: (was: 403123)
Time Spent: 11h 10m (was: 11h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=403124&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-403124 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 13/Mar/20 19:54
Start Date: 13/Mar/20 19:54
Worklog Time Spent: 10m

Work Description: stale[bot] commented on pull request #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824

Issue Time Tracking
---
Worklog Id: (was: 403124)
Time Spent: 11h 20m (was: 11h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=399710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-399710 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 07/Mar/20 18:03
Start Date: 07/Mar/20 18:03
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-596121650

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 399710)
Time Spent: 10h 50m (was: 10h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=399315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-399315 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 06/Mar/20 19:24 Start Date: 06/Mar/20 19:24 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations URL: https://github.com/apache/beam/pull/8824#issuecomment-595922176 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 399315) Time Spent: 10h 40m (was: 10.5h) > Sort Merge Bucket Join support in Beam > -- > > Key: BEAM-6766 > URL: https://issues.apache.org/jira/browse/BEAM-6766 > Project: Beam > Issue Type: Improvement > Components: extensions-java-join-library, io-ideas >Reporter: Claire McGinty >Assignee: Claire McGinty >Priority: Major > Time Spent: 10h 40m > Remaining Estimate: 0h > > Design doc: > https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit# > Hi! Spotify has been internally prototyping and testing an implementation of > the sort merge join using Beam primitives and we're interested in > contributing it open-source – probably to Beam's extensions package in its > own `smb` module or as part of the joins module? 
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader > directly (although this could theoretically be expanded to other > serialization formats). We'd add two transforms*, an SMB write and an SMB > join. > SMB write would take in one PCollection and a # of buckets and: > 1) Apply a partitioning function to the input to assign each record to one > bucket. (the user code would have to statically specify a # of buckets... > hard to see a way to do this dynamically.) > 2) Group by that bucket ID and within each bucket perform an in-memory sort > on the join key. If the grouped records are too large to fit in memory, fall back > to an external sort (although if this happens, the user should probably increase > the bucket count so every group fits in memory). > 3) Directly write the contents of each bucket to a sequentially named file. > 4) Write a metadata file to the same output path with info about the hash > algorithm/# of buckets. > SMB join would take in the input paths for 2 or more sources, all of which > are written in a bucketed and partitioned way, and: > 1) Verify that the metadata files have a compatible bucket # and hash algorithm. > 2) Expand the input paths to enumerate the `ResourceIds` of every file in the > paths. Group all inputs with the same bucket ID. > 3) Within each group, open a file reader on all `ResourceIds`. Sequentially > read files one record at a time, outputting tuples of all record pairs with > matching join key. > \* These could be implemented directly as `PTransforms` with the > writer being a `DoFn`, but I semantically do like the idea of extending > `FileBasedSource`/`Sink` with abstract classes like > `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a > sink as KV pairs of >>, so that the # > of elements in the PCollection == # of buckets == # of output files, we could > just implement something like `SortedBucketSink` extending `FileBasedSink` > with a dynamic file naming function. 
I'd like to be able to take advantage of > the existing write/read implementation logic in the `io` package as much as > possible although I guess some of those are package private. > – > From our internal testing, we've seen some substantial performance > improvements using the right bucket size--not only by avoiding a shuffle > during the join step, but also in storage costs, since we're getting better > compression in Avro by storing sorted records. > Please let us know what you think/any concerns we can address! Our > implementation isn't quite production-ready yet, but we'd like to start a > discussion about it early. -- This message was sent by Atlassian Jira (v8.3.4#803005)
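The write path described above (steps 1-2: hash-partition records into a statically fixed number of buckets, then sort each bucket on the join key) can be sketched in a few lines. This is an illustrative Python sketch, not the Beam implementation; `bucket_of` and `smb_write_groups` are invented names, and MD5 stands in for whatever hash algorithm the metadata file would record.

```python
# Hypothetical sketch of SMB write steps 1-2 (not the Beam API):
# hash the join key to a bucket ID, then sort each bucket on that key.
import hashlib
from collections import defaultdict

NUM_BUCKETS = 4  # must be fixed up front; the proposal notes it cannot be chosen dynamically

def bucket_of(key, num_buckets=NUM_BUCKETS):
    """Deterministic bucket assignment; writer and joiner must agree on this function."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_buckets

def smb_write_groups(records, key_fn):
    """Group records by bucket ID and sort each bucket in-memory on the join key."""
    buckets = defaultdict(list)
    for record in records:
        buckets[bucket_of(key_fn(record))].append(record)
    # One sorted list per bucket; each would then be written to a
    # sequentially named file (step 3).
    return {bid: sorted(group, key=key_fn) for bid, group in buckets.items()}
```

Because the assignment is a pure function of the key, all records sharing a join key land in the same bucket file, which is what makes the later shuffle-free merge possible.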
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=399314&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-399314 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 06/Mar/20 19:24 Start Date: 06/Mar/20 19:24 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source URL: https://github.com/apache/beam/pull/9251#issuecomment-595922168 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions. Issue Time Tracking --- Worklog Id: (was: 399314) Time Spent: 10.5h (was: 10h 20m)
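Step 3 of the proposed SMB join (open readers on all bucket files in a group and sequentially emit record pairs with matching join keys) relies on the inputs already being sorted. A hedged Python sketch of that merge, with invented names and shown for two sources rather than N:

```python
# Hypothetical sketch of the SMB join merge step (not the Beam API):
# both inputs are assumed sorted on the join key, so one sequential
# pass suffices and no shuffle is needed.
import itertools

def sorted_merge_join(left, right, key_fn):
    """left/right are iterables sorted by key_fn; yields (left_rec, right_rec) pairs."""
    left_groups = itertools.groupby(left, key=key_fn)
    right_groups = itertools.groupby(right, key=key_fn)
    lk, lg = next(left_groups, (None, None))
    rk, rg = next(right_groups, (None, None))
    while lg is not None and rg is not None:
        if lk == rk:
            # Same join key on both sides: emit the cross product of the groups.
            for pair in itertools.product(list(lg), list(rg)):
                yield pair
            lk, lg = next(left_groups, (None, None))
            rk, rg = next(right_groups, (None, None))
        elif lk < rk:
            lk, lg = next(left_groups, (None, None))  # unmatched left key, skip
        else:
            rk, rg = next(right_groups, (None, None))  # unmatched right key, skip
```

Generalizing to N sources is the same idea with an N-way pointer advance; the per-bucket memory footprint stays bounded by the largest single-key group.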
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=367568&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-367568 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 07/Jan/20 17:18 Start Date: 07/Jan/20 17:18 Worklog Time Spent: 10m Work Description: ClaireMcGinty commented on pull request #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823 This PR is a small, discrete chunk of the Sort-Merge Bucket algorithm implementation ([design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit?usp=sharing)), extracted into a smaller PR for easier reviewing as suggested [in the JIRA ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains a JSON-serializable `BucketMetadata` abstract class and specific implementations for JSON, Avro, and TensorFlow (`org.tensorflow.example.Example`) collections. The intention here is that during an SMB write, a `metadata.json` file will be written alongside the bucketed record files, and read again at join time to ensure compatibility, reconstruct key coders, etc. CC @kennknowles @nevillelyh @reuvenlax @kanterov Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
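The `metadata.json` round trip the PR describes (write the bucketing parameters alongside the data, re-read and verify them at join time) can be sketched as below. The field names (`numBuckets`, `hashAlgorithm`, `keyField`) are invented for illustration and are not taken from the actual `BucketMetadata` schema.

```python
# Hypothetical sketch of the metadata write/verify round trip (not the
# actual BucketMetadata class): the join refuses to merge sources whose
# bucketing parameters disagree.
import json

def write_metadata(num_buckets, hash_algorithm, key_field):
    """Serialize the bucketing parameters the writer used (illustrative fields)."""
    return json.dumps({
        "numBuckets": num_buckets,
        "hashAlgorithm": hash_algorithm,
        "keyField": key_field,
    })

def compatible(meta_a_json, meta_b_json):
    """Sources must agree on hash algorithm and bucket count to be joinable."""
    a, b = json.loads(meta_a_json), json.loads(meta_b_json)
    return a["hashAlgorithm"] == b["hashAlgorithm"] and a["numBuckets"] == b["numBuckets"]
```

Checking this up front is what lets the join assume that bucket `i` of every source holds the same slice of the key space.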
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=367569&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-367569 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 07/Jan/20 17:18 Start Date: 07/Jan/20 17:18 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823#issuecomment-571684057 This pull request is no longer marked as stale. Issue Time Tracking --- Worklog Id: (was: 367569) Time Spent: 10h 20m (was: 10h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=367561&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-367561 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 07/Jan/20 16:57 Start Date: 07/Jan/20 16:57 Worklog Time Spent: 10m Work Description: stale[bot] commented on pull request #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823 Issue Time Tracking --- Worklog Id: (was: 367561) Time Spent: 10h (was: 9h 50m)
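The issue description mentions falling back to an external sort when a bucket's records do not fit in memory (step 2 of the SMB write). A hedged sketch of that fallback, with invented names, using the standard sorted-runs-plus-merge approach:

```python
# Hypothetical sketch of the external-sort fallback (not the Beam
# implementation): sort fixed-size chunks to temporary run files, then
# lazily merge the sorted runs with bounded memory.
import heapq
import pickle
import tempfile

def external_sort(records, key_fn, chunk_size=100_000):
    """Yield records in key order without holding them all in memory at once."""
    run_files = []
    chunk = []

    def flush():
        chunk.sort(key=key_fn)
        f = tempfile.TemporaryFile()  # binary mode, required by pickle
        for r in chunk:
            pickle.dump(r, f)
        f.seek(0)
        run_files.append(f)
        chunk.clear()

    for r in records:
        chunk.append(r)
        if len(chunk) >= chunk_size:
            flush()
    if chunk:
        flush()

    def read_run(f):
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return

    # K-way merge of the sorted runs; only one record per run is resident.
    yield from heapq.merge(*(read_run(f) for f in run_files), key=key_fn)
```

As the proposal notes, hitting this path regularly is a signal to raise the bucket count so groups fit in memory again.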
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=367560&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-367560 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 07/Jan/20 16:57 Start Date: 07/Jan/20 16:57 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823#issuecomment-571675673 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time. Issue Time Tracking --- Worklog Id: (was: 367560) Time Spent: 9h 50m (was: 9h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366849&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366849 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 06/Jan/20 18:52 Start Date: 06/Jan/20 18:52 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations URL: https://github.com/apache/beam/pull/8824#issuecomment-571262878 This pull request is no longer marked as stale. Issue Time Tracking --- Worklog Id: (was: 366849) Time Spent: 9h 20m (was: 9h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366851&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366851 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 06/Jan/20 18:52
Start Date: 06/Jan/20 18:52
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251#issuecomment-571262966
This pull request is no longer marked as stale.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 366851) Time Spent: 9h 40m (was: 9.5h)

> Sort Merge Bucket Join support in Beam
> --
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
> Issue Type: Improvement
> Components: extensions-java-join-library, io-ideas
> Reporter: Claire McGinty
> Assignee: Claire McGinty
> Priority: Major
> Time Spent: 9h 40m
> Remaining Estimate: 0h
>
> Design doc: https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit#
>
> Hi! Spotify has been internally prototyping and testing an implementation of the sort-merge join using Beam primitives, and we're interested in contributing it open source – probably to Beam's extensions package in its own `smb` module, or as part of the joins module?
>
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader directly (although this could theoretically be expanded to other serialization formats). We'd add two transforms*, an SMB write and an SMB join.
>
> SMB write would take in one PCollection and a # of buckets, and:
> 1) Apply a partitioning function to the input to assign each record to one bucket. (The user code would have to statically specify a # of buckets... hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort on the join key. If the grouped records are too large to fit in memory, fall back to an external sort (although if this happens, the user should probably increase the bucket count so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash algorithm/# of buckets.
>
> SMB join would take in the input paths for 2 or more sources, all of which are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have a compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceId`s of every file in the paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceId`s. Sequentially read the files one record at a time, outputting tuples of all record pairs with a matching join key.
>
> \* These could be implemented directly as `PTransform`s with the writer being a `DoFn`, but semantically I do like the idea of extending `FileBasedSource`/`Sink` with abstract classes like `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a sink as KV pairs of >>, so that the # of elements in the PCollection == # of buckets == # of output files, we could just implement something like `SortedBucketSink` extending `FileBasedSink` with a dynamic file naming function. I'd like to be able to take advantage of the existing write/read implementation logic in the `io` package as much as possible, although I guess some of those are package-private.
>
> –
> From our internal testing, we've seen some substantial performance improvements from using the right bucket size – not only by avoiding a shuffle during the join step, but also in storage costs, since we're getting better compression in Avro by storing sorted records.
>
> Please let us know what you think/any concerns we can address! Our implementation isn't quite production-ready yet, but we'd like to start a discussion about it early.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
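The write-side steps in the proposal above (hash each record's join key to a bucket, then sort within each bucket) can be sketched without any Beam dependencies. This is a minimal plain-Java illustration, not the proposed `SortedBucketSink` API; the class and method names are hypothetical, and records stand in for their own join keys.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical sketch of SMB write steps 1-2: assign each record to a
// bucket by hashing its join key, then sort each bucket's records.
public class SmbWriteSketch {

    // Step 1: a deterministic partitioning function. Writer and reader
    // must agree on the hash algorithm and bucket count, which is why
    // step 4 of the proposal persists both in a metadata file.
    static int bucketFor(String joinKey, int numBuckets) {
        return Math.floorMod(joinKey.hashCode(), numBuckets);
    }

    // Steps 1-2: group records by bucket ID, then sort within each bucket.
    static Map<Integer, List<String>> bucketAndSort(List<String> records, int numBuckets) {
        Map<Integer, List<String>> buckets = records.stream()
                .collect(Collectors.groupingBy(r -> bucketFor(r, numBuckets)));
        buckets.values().forEach(Collections::sort); // in-memory sort on join key
        return buckets;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> buckets =
                bucketAndSort(Arrays.asList("carol", "alice", "bob", "dave"), 4);
        // Step 3 of the proposal would write each bucket to a sequentially
        // named file, e.g. something like bucket-00000-of-00004.avro.
        buckets.forEach((id, recs) -> System.out.println(id + " -> " + recs));
    }
}
```

Because the partitioning function is deterministic, two datasets written with the same hash and bucket count place records with equal join keys into equally numbered buckets, which is what lets the join side pair up files by bucket ID.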
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366848&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366848 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 06/Jan/20 18:52
Start Date: 06/Jan/20 18:52
Worklog Time Spent: 10m
Work Description: ClaireMcGinty commented on pull request #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824
This PR is a small, discrete chunk of the Sort-Merge Bucket algorithm implementation ([design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit?usp=sharing)), extracted into a smaller PR for easier reviewing as suggested [in the JIRA ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains read/write logic for Avro, TensorFlow, and JSON records at a per-record granularity. These are needed because, once the PCollection is bucketed and sorted, we can't re-use existing sources/sinks without risking a reshuffling of the data, so we have to manage file reads and writes very explicitly. CC @kennknowles @nevillelyh @reuvenlax @kanterov

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
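The per-record file operations this PR describes can be pictured as a small reader/writer abstraction that a sorted-bucket sink or source streams through one record at a time, bypassing Beam's existing sources and sinks. The sketch below is illustrative only: the interface and class names are hypothetical, not the PR's actual API, and a newline-delimited text codec stands in for the Avro/TensorFlow/JSON codecs.

```java
import java.io.*;
import java.nio.file.*;

// Hypothetical per-record file operations: one writer/reader pair per
// serialization format, streaming a single record at a time so the
// sink can control exactly what lands in each bucket file.
interface RecordWriter<T> extends Closeable {
    void write(T record) throws IOException;
}

interface RecordReader<T> extends Closeable {
    T read() throws IOException; // returns null at end of file
}

// Stand-in codec: newline-delimited text (the real PR covers Avro,
// TensorFlow, and JSON records instead).
class TextFileOperations {
    static RecordWriter<String> createWriter(Path path) throws IOException {
        BufferedWriter out = Files.newBufferedWriter(path);
        return new RecordWriter<String>() {
            public void write(String record) throws IOException {
                out.write(record);
                out.newLine();
            }
            public void close() throws IOException { out.close(); }
        };
    }

    static RecordReader<String> createReader(Path path) throws IOException {
        BufferedReader in = Files.newBufferedReader(path);
        return new RecordReader<String>() {
            public String read() throws IOException { return in.readLine(); }
            public void close() throws IOException { in.close(); }
        };
    }
}
```

A sorted-bucket sink would call `createWriter` once per bucket file and write records in sorted order; the join side would open one `RecordReader` per bucket file and advance them in lockstep.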
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366850 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 06/Jan/20 18:52
Start Date: 06/Jan/20 18:52
Worklog Time Spent: 10m
Work Description: nevillelyh commented on pull request #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251
Depends on #8823 and #8824. Part of the larger Sort Merge Bucket extension [design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit). This PR adds `SortedBucketSource`, a `PTransform` that reads from multiple sorted-bucket sources and merges key-value pairs into `KV`.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
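The merge step `SortedBucketSource` performs within one bucket is the classic sorted merge join: both inputs are already ordered by join key, so one forward pass pairs up matching keys with no shuffle. A minimal sketch of that pass, assuming in-memory sorted lists of key/value pairs instead of file readers (class and method names are hypothetical):

```java
import java.util.*;

// Hypothetical sketch of the per-bucket merge join: both sides are
// sorted by join key (element [0]); emit the cross product of values
// for every key that appears on both sides.
public class SortedBucketMergeSketch {
    static List<String> mergeJoin(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i)[0].compareTo(right.get(j)[0]);
            if (cmp < 0) i++;        // left key is smaller: advance left
            else if (cmp > 0) j++;   // right key is smaller: advance right
            else {
                // Keys match: emit all pairs for this key, then both
                // cursors end up past the key's run on their side.
                String key = left.get(i)[0];
                int jStart = j;
                for (; i < left.size() && left.get(i)[0].equals(key); i++)
                    for (j = jStart; j < right.size() && right.get(j)[0].equals(key); j++)
                        out.add(key + ":" + left.get(i)[1] + "," + right.get(j)[1]);
            }
        }
        return out;
    }
}
```

Because each cursor only moves forward, the join is linear in the combined size of the two buckets (plus the size of the matched output), which is the payoff of paying the sort cost once at write time.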
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366018 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 03/Jan/20 20:47
Start Date: 03/Jan/20 20:47
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824#issuecomment-570693532
This pull request has been closed due to lack of activity.

Issue Time Tracking
---
Worklog Id: (was: 366018) Time Spent: 8h 40m (was: 8.5h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366020&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366020 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 03/Jan/20 20:47
Start Date: 03/Jan/20 20:47
Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824

Issue Time Tracking
---
Worklog Id: (was: 366020) Time Spent: 9h (was: 8h 50m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366017&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366017 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 03/Jan/20 20:47
Start Date: 03/Jan/20 20:47
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251#issuecomment-570693526
This pull request has been closed due to lack of activity.

Issue Time Tracking
---
Worklog Id: (was: 366017) Time Spent: 8.5h (was: 8h 20m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=366019&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-366019 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 03/Jan/20 20:47
Start Date: 03/Jan/20 20:47
Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251

Issue Time Tracking
---
Worklog Id: (was: 366019) Time Spent: 8h 50m (was: 8h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=365027&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-365027 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 31/Dec/19 16:30
Start Date: 31/Dec/19 16:30
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-569954835
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 365027) Time Spent: 8h 20m (was: 8h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=364018=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364018 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 27/Dec/19 18:50 Start Date: 27/Dec/19 18:50 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations URL: https://github.com/apache/beam/pull/8824#issuecomment-569325171 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 364018) Time Spent: 8h (was: 7h 50m) > Sort Merge Bucket Join support in Beam > -- > > Key: BEAM-6766 > URL: https://issues.apache.org/jira/browse/BEAM-6766 > Project: Beam > Issue Type: Improvement > Components: extensions-java-join-library, io-ideas >Reporter: Claire McGinty >Assignee: Claire McGinty >Priority: Major > Time Spent: 8h > Remaining Estimate: 0h > > Design doc: > https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit# > Hi! Spotify has been internally prototyping and testing an implementation of > the sort merge join using Beam primitives and we're interested in > contributing it open-source – probably to Beam's extensions package in its > own `smb` module or as part of the joins module? 
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader directly (although this could theoretically be expanded to other serialization formats). We'd add two transforms*, an SMB write and an SMB join.
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one bucket. (The user code would have to statically specify a # of buckets; it's hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort on the join key. If the grouped records are too large to fit in memory, fall back to an external sort (although if this happens, the user should probably increase the # of buckets so that every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash algorithm and # of buckets.
> SMB join would take in the input paths for 2 or more sources, all of which are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have a compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially read the files one record at a time, outputting tuples of all record pairs with a matching join key.
> \* These could be implemented directly as `PTransforms`, with the writer being a `DoFn`, but semantically I do like the idea of extending `FileBasedSource`/`Sink` with abstract classes like `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a sink as KV pairs of >>, so that the # of elements in the PCollection == # of buckets == # of output files, we could just implement something like `SortedBucketSink` extending `FileBasedSink` with a dynamic file naming function.
I'd like to be able to take advantage of > the existing write/read implementation logic in the `io` package as much as > possible although I guess some of those are package private. > – > From our internal testing, we've seen some substantial performance > improvements using the right bucket size--not only by avoiding a shuffle > during the join step, but also in storage costs, since we're getting better > compression in Avro by storing sorted records. > Please let us know what you think/any concerns we can address! Our > implementation isn't quite production-ready yet, but we'd like to start a > discussion about it early. -- This message was sent by Atlassian Jira (v8.3.4#803005)
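The write and join steps described above can be sketched outside of Beam as plain Java; this is a hypothetical illustration only (class, method, and key names are invented here, not the proposed API), and `hashCode()` stands in for whatever stable hash function the design doc would pin down:

```java
import java.util.*;

// Illustrative sketch of the SMB write/join steps; not the proposed Beam API.
class SmbSketch {

    // Write, step 1: assign a record key to one of numBuckets via a hash.
    static int bucketOf(String key, int numBuckets) {
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // Write, steps 1-2: bucket the record keys, then sort each bucket.
    static Map<Integer, List<String>> writeBuckets(List<String> keys, int numBuckets) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String k : keys) {
            buckets.computeIfAbsent(bucketOf(k, numBuckets), b -> new ArrayList<>()).add(k);
        }
        for (List<String> bucket : buckets.values()) {
            Collections.sort(bucket); // in-memory sort on the join key
        }
        return buckets;
    }

    // Join, step 3: merge two sorted buckets one record at a time, emitting
    // pairs with matching keys (duplicate-key cross products elided for brevity).
    static List<String> joinBucket(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i).compareTo(right.get(j));
            if (cmp < 0) {
                i++;
            } else if (cmp > 0) {
                j++;
            } else {
                out.add(left.get(i) + "|" + right.get(j));
                i++;
                j++;
            }
        }
        return out;
    }
}
```

Because both sides are already hashed into the same number of buckets and sorted on the join key, the merge never needs a shuffle, which is the performance win the thread describes.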
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=364019&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364019 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 27/Dec/19 18:50 Start Date: 27/Dec/19 18:50 Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source URL: https://github.com/apache/beam/pull/9251#issuecomment-569325174 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.
Issue Time Tracking --- Worklog Id: (was: 364019) Time Spent: 8h 10m (was: 8h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=337387&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337387 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 01/Nov/19 15:55 Start Date: 01/Nov/19 15:55 Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823#issuecomment-548841630 This pull request is no longer marked as stale.
Issue Time Tracking --- Worklog Id: (was: 337387) Time Spent: 7h 50m (was: 7h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=337386&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337386 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 01/Nov/19 15:55 Start Date: 01/Nov/19 15:55 Worklog Time Spent: 10m
Work Description: ClaireMcGinty commented on pull request #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823
This PR is a small, discrete chunk of the Sort-Merge Bucket algorithm implementation ([design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit?usp=sharing)), extracted into a smaller PR for easier reviewing as suggested [in the JIRA ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains a JSON-serializable `BucketMetadata` abstract class and specific implementations for JSON, Avro, and TensorFlow (`org.tensorflow.example.Example`) collections. The intention here is that during an SMB write, a `metadata.json` file will be written alongside the bucketed record files, and read again at join time to ensure compatibility, reconstruct key coders, etc.
CC @kennknowles @nevillelyh @reuvenlax @kanterov
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
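The join-time compatibility check this PR's `metadata.json` enables might look roughly like the following. This is a hand-rolled, hypothetical stand-in, not the PR's `BucketMetadata` class; the field names (`numBuckets`, `hashType`, `keyField`) and the sample hash-type string are invented for illustration:

```java
// Hypothetical stand-in for the PR's BucketMetadata: written as metadata.json
// next to the bucket files at write time, and re-read at join time.
class BucketMetadataSketch {
    final int numBuckets;
    final String hashType;   // e.g. "murmur3_32" -- an illustrative value
    final String keyField;

    BucketMetadataSketch(int numBuckets, String hashType, String keyField) {
        this.numBuckets = numBuckets;
        this.hashType = hashType;
        this.keyField = keyField;
    }

    // Join-time check (step 1 of the SMB join): sources are only joinable
    // if they were bucketed with the same count and hash algorithm.
    boolean isCompatibleWith(BucketMetadataSketch other) {
        return numBuckets == other.numBuckets && hashType.equals(other.hashType);
    }

    // Minimal JSON rendering; the real implementation would use a JSON library.
    String toJson() {
        return String.format(
            "{\"numBuckets\":%d,\"hashType\":\"%s\",\"keyField\":\"%s\"}",
            numBuckets, hashType, keyField);
    }
}
```

Failing this check early, before opening any bucket files, is what lets an SMB join reject incompatible sources cheaply.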
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=337372&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337372 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 01/Nov/19 15:01 Start Date: 01/Nov/19 15:01 Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823
Issue Time Tracking --- Worklog Id: (was: 337372) Time Spent: 7.5h (was: 7h 20m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=337371&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-337371 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 01/Nov/19 15:01 Start Date: 01/Nov/19 15:01 Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823#issuecomment-548820713 This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Issue Time Tracking --- Worklog Id: (was: 337371) Time Spent: 7h 20m (was: 7h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=335055&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335055 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 28/Oct/19 17:30 Start Date: 28/Oct/19 17:30 Worklog Time Spent: 10m
Work Description: ClaireMcGinty commented on pull request #8824: [BEAM-6766] Implement SMB file operations URL: https://github.com/apache/beam/pull/8824
This PR is a small, discrete chunk of the Sort-Merge Bucket algorithm implementation ([design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit?usp=sharing)), extracted into a smaller PR for easier reviewing as suggested [in the JIRA ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains read/write logic for Avro, TensorFlow, and JSON records at a per-record granularity. These are needed because once the PCollection is bucketed and sorted, we can't re-use existing sources/sinks and risk a reshuffling of the data, so we have to manage file reads and writes very explicitly.
CC @kennknowles @nevillelyh @reuvenlax @kanterov
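The per-record read/write discipline this PR describes can be illustrated with a toy codec. This is not the PR's API: newline-delimited strings stand in for the Avro/TensorFlow/JSON record formats, and the class name is invented. The point it demonstrates is that records round-trip one at a time, in exactly the order written, so the bucketed, sorted layout is never disturbed:

```java
import java.util.*;

// Illustrative per-record serialization in the spirit of the PR's file
// operations. Newline-delimited strings stand in for the real codecs.
class LineFileOperationsSketch {

    // Encode records one per line, preserving their (sorted) order.
    static String writeAll(List<String> records) {
        StringBuilder sb = new StringBuilder();
        for (String r : records) {
            sb.append(r).append('\n');
        }
        return sb.toString();
    }

    // Decode at per-record granularity, in the same order.
    // (Simplification: empty records are dropped; a real codec would frame
    // records explicitly rather than rely on newlines.)
    static List<String> readAll(String encoded) {
        List<String> out = new ArrayList<>();
        for (String line : encoded.split("\n")) {
            if (!line.isEmpty()) {
                out.add(line);
            }
        }
        return out;
    }
}
```

An existing source/sink abstraction could legally reorder elements between write and read; managing the bytes explicitly, as sketched here, is what guarantees the sort order survives.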
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=335058&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335058 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 28/Oct/19 17:30 Start Date: 28/Oct/19 17:30 Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source URL: https://github.com/apache/beam/pull/9251#issuecomment-547059483 This pull request is no longer marked as stale.
Issue Time Tracking --- Worklog Id: (was: 335058) Time Spent: 7h 10m (was: 7h)
> hard to see a way to do this dynamically.) > 2) Group by that bucket ID and within each bucket perform an in-memory sort > on join key. If the grouped records are too large to fit in memory, fall back > to an external sort (although if this happens, user should probably increase > bucket size so every group fits in memory). > 3) Directly write the contents of bucket to a sequentially named file. > 4) Write a metadata file to the same output path with info about hash > algorithm/# buckets. > SMB join would take in the input paths for 2 or more Sources, all of which > are written in a bucketed and partitioned way, and : > 1) Verify that the metadata files have compatible bucket # and hash algorithm. > 2) Expand the input paths to enumerate the `ResourceIds` of every file in the > paths. Group all inputs with the same bucket ID. > 3) Within each group, open a file reader on all `ResourceIds`. Sequentially > read files one record at a time, outputting tuples of all record pairs with > matching join key. > \* These could be implemented either directly as `PTransforms` with the > writer being a `DoFn` but I semantically do like the idea of extending > `FileBasedSource`/`Sink` with abstract classes like > `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a > sink as KV pairs of >>, so that the # > of elements in the PCollection == # of buckets == # of output files, we could > just implement something like `SortedBucketSink` extending `FileBasedSink` > with a dynamic file naming function. I'd like to be able to take advantage of > the existing write/read implementation logic in the `io` package as much as > possible although I guess some of those are package private. > – > From our internal testing, we've seen some substantial performance > improvements using the right bucket size--not only by avoiding a shuffle > during the join step, but also in storage costs, since we're getting better > compression in Avro by storing sorted records. 
> Please let us know what you think/any concerns we can address! Our > implementation isn't quite production-ready yet, but we'd like to start a > discussion about it early. -- This message was sent by Atlassian Jira (v8.3.4#803005)
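The SMB write steps described in the issue above can be sketched in plain Python. This is an illustrative stand-in for the Beam transform, not the actual implementation; the CRC32 hash, JSON record encoding, and `bucket-NNNNN` file naming are assumptions made for the sketch.

```python
import json
import os
import zlib
from collections import defaultdict


def smb_write(records, key_fn, num_buckets, out_dir):
    """Bucket records by a hash of their join key, sort each bucket by
    key, and write one sequentially named file per bucket plus a
    metadata file describing the bucketing scheme."""
    buckets = defaultdict(list)
    for record in records:
        # Deterministic hash so every writer and reader agrees on
        # which bucket a given key lands in.
        bucket_id = zlib.crc32(key_fn(record).encode()) % num_buckets
        buckets[bucket_id].append(record)

    os.makedirs(out_dir, exist_ok=True)
    for bucket_id in range(num_buckets):
        # In-memory sort on the join key; the proposal falls back to an
        # external sort when a bucket does not fit in memory.
        rows = sorted(buckets[bucket_id], key=key_fn)
        path = os.path.join(out_dir, "bucket-%05d.json" % bucket_id)
        with open(path, "w") as f:
            for row in rows:
                f.write(json.dumps(row) + "\n")

    # The metadata file lets a reader verify compatibility before joining.
    meta = {"numBuckets": num_buckets, "hashAlgorithm": "crc32"}
    with open(os.path.join(out_dir, "metadata.json"), "w") as f:
        json.dump(meta, f)
```

A reader that bucketed its own keys with the same hash and bucket count can then pair up files bucket-by-bucket without any shuffle.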
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=335056&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335056 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 28/Oct/19 17:30
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824#issuecomment-547059465
This pull request is no longer marked as stale.
Worklog Id: (was: 335056) Time Spent: 6h 50m (was: 6h 40m)
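The merge phase of the SMB join described in the issue (sequentially reading sorted inputs one record at a time and emitting all record pairs with matching join keys) can be sketched as a plain-Python merge over two sequences already sorted by key. A real `SortedBucketSource` would stream records from the bucket files rather than hold lists in memory.

```python
def merge_join(left, right, key_fn):
    """Merge two sequences sorted by join key, yielding
    (key, left_record, right_record) for every matching pair."""
    li, ri = 0, 0
    while li < len(left) and ri < len(right):
        lk, rk = key_fn(left[li]), key_fn(right[ri])
        if lk < rk:
            li += 1
        elif lk > rk:
            ri += 1
        else:
            # Collect the full run of equal keys on both sides so
            # duplicate keys produce the cross-product of pairs.
            lj = li
            while lj < len(left) and key_fn(left[lj]) == lk:
                lj += 1
            rj = ri
            while rj < len(right) and key_fn(right[rj]) == rk:
                rj += 1
            for l in left[li:lj]:
                for r in right[ri:rj]:
                    yield (lk, l, r)
            li, ri = lj, rj
```

Because both sides are pre-sorted, each bucket pair is joined in a single linear pass, which is where the shuffle-free speedup comes from.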
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=335057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-335057 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 28/Oct/19 17:30
Worklog Time Spent: 10m
Work Description: nevillelyh commented on pull request #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251
Depends on #8823 and #8824. Part of the larger Sort Merge Bucket extension [design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit).
This PR adds `SortedBucketSource`, a `PTransform` that reads from multiple sorted-bucket sources and merges key-value pairs into `KV`s.
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=334603&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334603 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 27/Oct/19 03:38
Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251
Worklog Id: (was: 334603) Time Spent: 6.5h (was: 6h 20m)
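Step 1 of the SMB join described in the issue, verifying that all sources were written with compatible bucketing, reduces to comparing their metadata files. A minimal sketch, assuming hypothetical `numBuckets`/`hashAlgorithm` metadata fields (the actual on-disk format is whatever the metadata-file implementation in PR #8823 defines, which is not reproduced here):

```python
def check_compatible(metadatas):
    """Raise ValueError if the sources disagree on bucket count or
    hash algorithm; a mismatch would pair up the wrong buckets and
    silently drop matches."""
    if not metadatas:
        raise ValueError("need at least one source")
    first = metadatas[0]
    for m in metadatas[1:]:
        if m["numBuckets"] != first["numBuckets"]:
            raise ValueError("bucket counts differ: %r vs %r"
                             % (first["numBuckets"], m["numBuckets"]))
        if m["hashAlgorithm"] != first["hashAlgorithm"]:
            raise ValueError("hash algorithms differ: %r vs %r"
                             % (first["hashAlgorithm"], m["hashAlgorithm"]))
    return first
```

Failing fast here, before any file is opened, is what makes the rest of the join safe to run without re-checking keys against the hash.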
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=334601&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334601 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 27/Oct/19 03:38
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251#issuecomment-546658848
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Worklog Id: (was: 334601) Time Spent: 6h 10m (was: 6h)
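Step 2 of the SMB join, expanding the input paths and grouping files from all sources by bucket ID, can be sketched as follows. The `bucket-NNNNN` naming convention and the injected `listdir` callable are hypothetical illustrations, not Beam's actual file naming or filesystem API.

```python
import re

BUCKET_PATTERN = re.compile(r"bucket-(\d+)\.")


def group_by_bucket(source_paths, listdir):
    """Group file paths from all sources that share a bucket ID.

    `listdir` is injected so the sketch works against any storage
    backend; it maps a source path to its file names."""
    groups = {}
    for src in source_paths:
        for name in listdir(src):
            m = BUCKET_PATTERN.search(name)
            if m:  # skip non-bucket files such as the metadata file
                bucket_id = int(m.group(1))
                groups.setdefault(bucket_id, []).append(src + "/" + name)
    return groups
```

Each resulting group then gets one merge-join task: every source contributes exactly one file per bucket, so bucket N of each input can be joined independently of all other buckets.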
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=334600&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334600 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 27/Oct/19 03:38
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824#issuecomment-546658846
This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Worklog Id: (was: 334600) Time Spent: 6h (was: 5h 50m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=334602&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334602 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 27/Oct/19 03:38
Worklog Time Spent: 10m
Work Description: stale[bot] commented on pull request #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824
Worklog Id: (was: 334602) Time Spent: 6h 20m (was: 6h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=334174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-334174 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 25/Oct/19 14:17
Worklog Time Spent: 10m
Work Description: stale[bot] commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-546372732
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs.
Worklog Id: (was: 334174) Time Spent: 5h 50m (was: 5h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=331018=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331018 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 20/Oct/19 02:55 Start Date: 20/Oct/19 02:55 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #9251: [BEAM-6766] Implement SMB source URL: https://github.com/apache/beam/pull/9251#issuecomment-544215560 This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 331018) Time Spent: 5.5h (was: 5h 20m) > Sort Merge Bucket Join support in Beam > -- > > Key: BEAM-6766 > URL: https://issues.apache.org/jira/browse/BEAM-6766 > Project: Beam > Issue Type: Improvement > Components: extensions-java-join-library, io-ideas >Reporter: Claire McGinty >Assignee: Claire McGinty >Priority: Major > Time Spent: 5.5h > Remaining Estimate: 0h > > Design doc: > https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit# > Hi! Spotify has been internally prototyping and testing an implementation of > the sort merge join using Beam primitives and we're interested in > contributing it open-source – probably to Beam's extensions package in its > own `smb` module or as part of the joins module? 
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader directly (although this could theoretically be expanded to other serialization formats). We'd add two transforms*, an SMB write and an SMB join.
>
> SMB write would take in one PCollection and a number of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one bucket. (The user code would have to statically specify the number of buckets... hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort on the join key. If the grouped records are too large to fit in memory, fall back to an external sort (although if this happens, the user should probably increase the bucket count so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash algorithm and number of buckets.
>
> SMB join would take in the input paths for 2 or more sources, all of which are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have a compatible bucket count and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially read the files one record at a time, outputting tuples of all record pairs with a matching join key.
>
> \* These could be implemented directly as `PTransforms`, with the writer being a `DoFn`, but I semantically do like the idea of extending `FileBasedSource`/`Sink` with abstract classes like `SortedBucketSink`/`SortedBucketSource`... If we represent the elements in a sink as KV pairs of >>, so that the # of elements in the PCollection == # of buckets == # of output files, we could just implement something like `SortedBucketSink` extending `FileBasedSink` with a dynamic file naming function.
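As a concrete illustration of write steps 1–2 above, here is a minimal plain-Java sketch with no Beam dependencies; using `String` records as their own join keys and Java's `hashCode` as the hash algorithm are assumptions for illustration, not the proposal's actual API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketSketch {
  // Step 1: assign a record to one of numBuckets by hashing its join key.
  // Masking with Integer.MAX_VALUE keeps the hash non-negative before the modulo.
  static int bucketId(String joinKey, int numBuckets) {
    return (joinKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  // Steps 1-2 combined: group records by bucket id, then sort each bucket
  // by join key (here the record stands in for its own key).
  static Map<Integer, List<String>> bucketAndSort(List<String> records, int numBuckets) {
    Map<Integer, List<String>> buckets = new HashMap<>();
    for (String r : records) {
      buckets.computeIfAbsent(bucketId(r, numBuckets), b -> new ArrayList<>()).add(r);
    }
    for (List<String> bucket : buckets.values()) {
      // In-memory sort; per step 2, a real sink would fall back to an external sort.
      bucket.sort(String::compareTo);
    }
    return buckets;
  }
}
```

Each sorted bucket would then be written to its own sequentially named file (step 3), with the bucket count and hash function recorded in the metadata file (step 4).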
> I'd like to be able to take advantage of the existing write/read implementation logic in the `io` package as much as possible, although I guess some of those are package private.
>
> From our internal testing, we've seen some substantial performance improvements using the right bucket size – not only by avoiding a shuffle during the join step, but also in storage costs, since we're getting better compression in Avro by storing sorted records.
>
> Please let us know what you think / any concerns we can address! Our implementation isn't quite production-ready yet, but we'd like to start a discussion about it early.
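The sequential read in join step 3 is an ordinary sorted merge; below is a sketch over two in-memory lists standing in for two key-sorted bucket files. Matching whole `String` records rather than extracting a key field is a simplification for the sketch:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MergeJoinSketch {
  // Walk two key-sorted streams in lockstep, emitting one pair per
  // record combination whose keys match (join step 3).
  static List<Map.Entry<String, String>> mergeJoin(List<String> left, List<String> right) {
    List<Map.Entry<String, String>> out = new ArrayList<>();
    int i = 0, j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = left.get(i).compareTo(right.get(j));
      if (cmp < 0) {
        i++; // left key is smaller: advance left
      } else if (cmp > 0) {
        j++; // right key is smaller: advance right
      } else {
        // Equal keys: emit the cross product of the equal-key runs on both sides.
        int i2 = i;
        while (i2 < left.size() && left.get(i2).equals(right.get(j))) i2++;
        int j2 = j;
        while (j2 < right.size() && right.get(j2).equals(left.get(i))) j2++;
        for (int a = i; a < i2; a++)
          for (int b = j; b < j2; b++)
            out.add(new SimpleEntry<>(left.get(a), right.get(b)));
        i = i2;
        j = j2;
      }
    }
    return out;
  }
}
```

Because both inputs arrive pre-sorted, this runs in a single pass with no shuffle, which is where the performance win described above comes from.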
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=331019&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-331019 ]

ASF GitHub Bot logged work on BEAM-6766:

Author: ASF GitHub Bot
Created on: 20/Oct/19 02:55
Start Date: 20/Oct/19 02:55
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824#issuecomment-544215565

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 331019) Time Spent: 5h 40m (was: 5.5h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=327390&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327390 ]

ASF GitHub Bot logged work on BEAM-6766:

Author: ASF GitHub Bot
Created on: 12/Oct/19 21:04
Start Date: 12/Oct/19 21:04
Worklog Time Spent: 10m

Work Description: stale[bot] commented on pull request #9253: [BEAM-6766] Implement SMB high-level API
URL: https://github.com/apache/beam/pull/9253

Issue Time Tracking
---
Worklog Id: (was: 327390) Time Spent: 5h 10m (was: 5h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=327388&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327388 ]

ASF GitHub Bot logged work on BEAM-6766:

Author: ASF GitHub Bot
Created on: 12/Oct/19 21:04
Start Date: 12/Oct/19 21:04
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #9253: [BEAM-6766] Implement SMB high-level API
URL: https://github.com/apache/beam/pull/9253#issuecomment-541361335

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

Issue Time Tracking
---
Worklog Id: (was: 327388) Time Spent: 4h 50m (was: 4h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=327389&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327389 ]

ASF GitHub Bot logged work on BEAM-6766:

Author: ASF GitHub Bot
Created on: 12/Oct/19 21:04
Start Date: 12/Oct/19 21:04
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #9279: [BEAM-6766] Implement SMB benchmarks
URL: https://github.com/apache/beam/pull/9279#issuecomment-541361336

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

Issue Time Tracking
---
Worklog Id: (was: 327389) Time Spent: 5h (was: 4h 50m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=327391&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-327391 ]

ASF GitHub Bot logged work on BEAM-6766:

Author: ASF GitHub Bot
Created on: 12/Oct/19 21:04
Start Date: 12/Oct/19 21:04
Worklog Time Spent: 10m

Work Description: stale[bot] commented on pull request #9279: [BEAM-6766] Implement SMB benchmarks
URL: https://github.com/apache/beam/pull/9279

Issue Time Tracking
---
Worklog Id: (was: 327391) Time Spent: 5h 20m (was: 5h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=323973&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323973 ]

ASF GitHub Bot logged work on BEAM-6766:

Author: ASF GitHub Bot
Created on: 05/Oct/19 20:13
Start Date: 05/Oct/19 20:13
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #9253: [BEAM-6766] Implement SMB high-level API
URL: https://github.com/apache/beam/pull/9253#issuecomment-538684812

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

Issue Time Tracking
---
Worklog Id: (was: 323973) Time Spent: 4h 40m (was: 4.5h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=323972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-323972 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 05/Oct/19 20:13
Start Date: 05/Oct/19 20:13
Worklog Time Spent: 10m

Work Description: stale[bot] commented on issue #9279: [BEAM-6766] Implement SMB benchmarks
URL: https://github.com/apache/beam/pull/9279#issuecomment-538684811
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the d...@beam.apache.org list. Thank you for your contributions.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 323972)
Time Spent: 4.5h (was: 4h 20m)

> Sort Merge Bucket Join support in Beam
> --
>
> Key: BEAM-6766
> URL: https://issues.apache.org/jira/browse/BEAM-6766
> Project: Beam
> Issue Type: Improvement
> Components: extensions-java-join-library, io-ideas
> Reporter: Claire McGinty
> Assignee: Claire McGinty
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Design doc:
> https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit#
>
> Hi! Spotify has been internally prototyping and testing an implementation of the sort merge join using Beam primitives, and we're interested in contributing it open-source – probably to Beam's extensions package in its own `smb` module, or as part of the joins module?
> We've tested this with Avro files using Avro's GenericDatumWriter/Reader directly (although this could theoretically be expanded to other serialization formats). We'd add two transforms*, an SMB write and an SMB join.
> SMB write would take in one PCollection and a # of buckets and:
> 1) Apply a partitioning function to the input to assign each record to one bucket. (The user code would have to statically specify a # of buckets... hard to see a way to do this dynamically.)
> 2) Group by that bucket ID and, within each bucket, perform an in-memory sort on the join key. If the grouped records are too large to fit in memory, fall back to an external sort (although if this happens, the user should probably increase the bucket count so every group fits in memory).
> 3) Directly write the contents of each bucket to a sequentially named file.
> 4) Write a metadata file to the same output path with info about the hash algorithm and # of buckets.
> SMB join would take in the input paths for 2 or more sources, all of which are written in a bucketed and partitioned way, and:
> 1) Verify that the metadata files have a compatible bucket # and hash algorithm.
> 2) Expand the input paths to enumerate the `ResourceIds` of every file in the paths. Group all inputs with the same bucket ID.
> 3) Within each group, open a file reader on all `ResourceIds`. Sequentially read the files one record at a time, outputting tuples of all record pairs with a matching join key.
> \* These could be implemented directly as `PTransforms` with the writer being a `DoFn`, but I semantically do like the idea of extending `FileBasedSource`/`Sink` with abstract classes like `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a sink as KV pairs of >>, so that the # of elements in the PCollection == # of buckets == # of output files, we could just implement something like `SortedBucketSink` extending `FileBasedSink` with a dynamic file naming function. I'd like to be able to take advantage of the existing write/read implementation logic in the `io` package as much as possible, although I guess some of those are package private.
> –
> From our internal testing, we've seen some substantial performance improvements using the right bucket size: not only by avoiding a shuffle during the join step, but also in storage costs, since we're getting better compression in Avro by storing sorted records.
> Please let us know what you think/any concerns we can address! Our implementation isn't quite production-ready yet, but we'd like to start a discussion about it early.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
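The write-side steps quoted above (hash each record's join key to a bucket, then sort within each bucket) can be sketched in plain Java. This is only an illustration of the technique under discussion; `SmbWriteSketch`, `bucketFor`, and `bucketAndSort` are hypothetical names, not APIs from the Beam pull requests.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the SMB write phase: deterministic bucket
// assignment by hashing the join key, followed by an in-memory sort of each
// bucket on that key. A real implementation would fall back to an external
// sort for buckets that do not fit in memory, as the issue notes.
class SmbWriteSketch {

    // Step 1: assign a record to a bucket from its join key. floorMod keeps
    // the result non-negative even when hashCode() is negative.
    static int bucketFor(String joinKey, int numBuckets) {
        return Math.floorMod(joinKey.hashCode(), numBuckets);
    }

    // Steps 2-3: group join keys by bucket id, then sort each bucket so the
    // bucket's file can be written in key order.
    static Map<Integer, List<String>> bucketAndSort(List<String> joinKeys, int numBuckets) {
        Map<Integer, List<String>> buckets = new TreeMap<>();
        for (String key : joinKeys) {
            buckets.computeIfAbsent(bucketFor(key, numBuckets), b -> new ArrayList<>()).add(key);
        }
        for (List<String> bucket : buckets.values()) {
            Collections.sort(bucket);
        }
        return buckets;
    }
}
```

Because the bucket assignment is deterministic, two datasets written with the same hash function and bucket count place records with equal join keys into files with the same bucket id, which is what the metadata compatibility check in the join step guards.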
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=301253&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-301253 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 26/Aug/19 13:25
Start Date: 26/Aug/19 13:25
Worklog Time Spent: 10m

Work Description: kanterov commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-524859305
@kennknowles we are having discussions (internally) on a few changes to the structure of metadata

Issue Time Tracking
---
Worklog Id: (was: 301253)
Time Spent: 4h 20m (was: 4h 10m)

-- This message was sent by Atlassian Jira (v8.3.2#803003)
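The per-bucket read in the join step described in the issue (open all files for a bucket, stream them in key order, and emit matching pairs) is a standard sorted merge join. A minimal two-input sketch in plain Java follows; the `SmbJoinSketch` name is hypothetical, not the pull request's reader API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the per-bucket SMB join: two key-sorted inputs
// (files sharing a bucket id) are merged in a single pass, emitting one
// output row per matching (left, right) pair. Each element is a
// {joinKey, value} array; both inputs must already be sorted by joinKey.
class SmbJoinSketch {

    static List<String[]> mergeJoin(List<String[]> left, List<String[]> right) {
        List<String[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            int cmp = left.get(i)[0].compareTo(right.get(j)[0]);
            if (cmp < 0) {
                i++; // left key is smaller: advance the left reader
            } else if (cmp > 0) {
                j++; // right key is smaller: advance the right reader
            } else {
                // Keys match: emit the cross product of the two runs that
                // share this key, then advance both readers past the runs.
                String key = left.get(i)[0];
                int iEnd = i, jEnd = j;
                while (iEnd < left.size() && left.get(iEnd)[0].equals(key)) iEnd++;
                while (jEnd < right.size() && right.get(jEnd)[0].equals(key)) jEnd++;
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(new String[] {key, left.get(a)[1], right.get(b)[1]});
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }
}
```

Because each reader only ever moves forward, the merge is linear in the input sizes and needs no shuffle, which is the performance win the issue describes.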
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=300234&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-300234 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 23/Aug/19 10:52
Start Date: 23/Aug/19 10:52
Worklog Time Spent: 10m

Work Description: tvalentyn commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-524269814
I am not sufficiently familiar with these bits of code, so I'd prefer to find another reviewer to merge this faster. My earlier comment was to make sure that this PR receives attention once ready.

Issue Time Tracking
---
Worklog Id: (was: 300234)
Time Spent: 4h 10m (was: 4h)

-- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=298365&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-298365 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 21/Aug/19 02:09
Start Date: 21/Aug/19 02:09
Worklog Time Spent: 10m

Work Description: kennknowles commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-523265462
Ping. Just back from vacation and noticed the email thread. @kanterov or @tvalentyn if you don't have time now can you comment so we know? Or recommend someone?

Issue Time Tracking
---
Worklog Id: (was: 298365)
Time Spent: 4h (was: 3h 50m)

-- This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=293159&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293159 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 12/Aug/19 14:06
Start Date: 12/Aug/19 14:06
Worklog Time Spent: 10m

Work Description: ClaireMcGinty commented on issue #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824#issuecomment-520439482
@kanterov , PTAL?

Issue Time Tracking
---
Worklog Id: (was: 293159)
Time Spent: 3h 50m (was: 3h 40m)

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=293158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-293158 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 12/Aug/19 14:06
Start Date: 12/Aug/19 14:06
Worklog Time Spent: 10m

Work Description: ClaireMcGinty commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-520439338
@tvalentyn , @kanterov , PTAL?

Issue Time Tracking
---
Worklog Id: (was: 293158)
Time Spent: 3h 40m (was: 3.5h)

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=289768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289768 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 06/Aug/19 15:05
Start Date: 06/Aug/19 15:05
Worklog Time Spent: 10m

Work Description: tvalentyn commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-518677187
Please say "PTAL" and/or reach out to reviewers of this change (@kanterov) once this PR is ready for review. Thank you. Feel free to do the same for any other dependent changes that you need to get feedback on.

Issue Time Tracking
---
Worklog Id: (was: 289768)
Time Spent: 3.5h (was: 3h 20m)

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=289666&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289666 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 06/Aug/19 13:48
Start Date: 06/Aug/19 13:48
Worklog Time Spent: 10m

Work Description: tvalentyn commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-518677187
Please say "PTAL" and/or reach out to reviewers of this change (@kanterov) once this PR is ready for review. Thank you.

Issue Time Tracking
---
Worklog Id: (was: 289666)
Time Spent: 3h 20m (was: 3h 10m)

-- This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=289072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289072 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 05/Aug/19 17:33
Start Date: 05/Aug/19 17:33
Worklog Time Spent: 10m

Work Description: nevillelyh commented on pull request #9253: [BEAM-6766] Implement SMB high-level API
URL: https://github.com/apache/beam/pull/9253
Depends on #8823, #8824, #9250, and #9251.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=289070&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289070 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 05/Aug/19 17:32
Start Date: 05/Aug/19 17:32
Worklog Time Spent: 10m
Work Description: nevillelyh commented on pull request #9251: [BEAM-6766] Implement SMB source
URL: https://github.com/apache/beam/pull/9251
Depends on #8823 and #8824.
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=289062&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289062 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 05/Aug/19 17:19
Start Date: 05/Aug/19 17:19
Worklog Time Spent: 10m
Work Description: ClaireMcGinty commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source
URL: https://github.com/apache/beam/pull/8486#issuecomment-518322106
Closing this PR so we can focus on smaller, more granular PRs. We'll keep implementation details updated in the [design doc](https://docs.google.com/document/d/1AQlonN8t4YJrARcWzepyP7mWHTxHAd6WIECwk1s3LQQ/edit?usp=sharing).
Issue Time Tracking
---
Worklog Id: (was: 289062)
Time Spent: 2h 50m (was: 2h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=289061&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-289061 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 05/Aug/19 17:19
Start Date: 05/Aug/19 17:19
Worklog Time Spent: 10m
Work Description: ClaireMcGinty commented on pull request #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source
URL: https://github.com/apache/beam/pull/8486
Issue Time Tracking
---
Worklog Id: (was: 289061)
Time Spent: 2h 40m (was: 2.5h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=264971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-264971 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 21/Jun/19 20:43
Start Date: 21/Jun/19 20:43
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-504566794
That makes sense. I pinged the thread today, hopefully there will be some feedback.
Issue Time Tracking
---
Worklog Id: (was: 264971)
Time Spent: 2.5h (was: 2h 20m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=264967&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-264967 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 21/Jun/19 20:38
Start Date: 21/Jun/19 20:38
Worklog Time Spent: 10m
Work Description: kanterov commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-504565242
@aaltay no, I want to get feedback from dev@ before we proceed.
Issue Time Tracking
---
Worklog Id: (was: 264967)
Time Spent: 2h 20m (was: 2h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=264831&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-264831 ]
ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 21/Jun/19 17:29
Start Date: 21/Jun/19 17:29
Worklog Time Spent: 10m
Work Description: aaltay commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-504506798
@kanterov are you planning to review this change?
Issue Time Tracking
---
Worklog Id: (was: 264831)
Time Spent: 2h 10m (was: 2h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=259395=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-259395 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 13/Jun/19 09:25 Start Date: 13/Jun/19 09:25 Worklog Time Spent: 10m Work Description: kanterov commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink URL: https://github.com/apache/beam/pull/8823#issuecomment-501624445 Something you might find useful is the design proposal for [cost-estimation]. [cost-estimation]: https://lists.apache.org/thread.html/5f1dcd775daee55a8005db9ae87898a8e5efaee140a89417703dc823@%3Cdev.beam.apache.org%3E This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 259395) Time Spent: 2h (was: 1h 50m) > Sort Merge Bucket Join support in Beam > -- > > Key: BEAM-6766 > URL: https://issues.apache.org/jira/browse/BEAM-6766 > Project: Beam > Issue Type: Improvement > Components: extensions-java-join-library, io-ideas >Reporter: Claire McGinty >Assignee: Claire McGinty >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Hi! Spotify has been internally prototyping and testing an implementation of > the sort merge join using Beam primitives and we're interested in > contributing it open-source – probably to Beam's extensions package in its > own `smb` module or as part of the joins module? > We've tested this with Avro files using Avro's GenericDatumWriter/Reader > directly (although this could theoretically be expanded to other > serialization formats). We'd add two transforms*, an SMB write and an SMB > join. 
> SMB write would take in one PCollection and a # of buckets and: > 1) Apply a partitioning function to the input to assign each record to one > bucket. (the user code would have to statically specify a # of buckets... > hard to see a way to do this dynamically.) > 2) Group by that bucket ID and within each bucket perform an in-memory sort > on join key. If the grouped records are too large to fit in memory, fall back > to an external sort (although if this happens, user should probably increase > bucket size so every group fits in memory). > 3) Directly write the contents of bucket to a sequentially named file. > 4) Write a metadata file to the same output path with info about hash > algorithm/# buckets. > SMB join would take in the input paths for 2 or more Sources, all of which > are written in a bucketed and partitioned way, and : > 1) Verify that the metadata files have compatible bucket # and hash algorithm. > 2) Expand the input paths to enumerate the `ResourceIds` of every file in the > paths. Group all inputs with the same bucket ID. > 3) Within each group, open a file reader on all `ResourceIds`. Sequentially > read files one record at a time, outputting tuples of all record pairs with > matching join key. > \* These could be implemented either directly as `PTransforms` with the > writer being a `DoFn` but I semantically do like the idea of extending > `FileBasedSource`/`Sink` with abstract classes like > `SortedBucketSink`/`SortedBucketSource`... if we represent the elements in a > sink as KV pairs of >>, so that the # > of elements in the PCollection == # of buckets == # of output files, we could > just implement something like `SortedBucketSink` extending `FileBasedSink` > with a dynamic file naming function. I'd like to be able to take advantage of > the existing write/read implementation logic in the `io` package as much as > possible although I guess some of those are package private. 
> --
> From our internal testing, we've seen some substantial performance improvements using the right bucket size -- not only by avoiding a shuffle during the join step, but also in storage costs, since we're getting better compression in Avro by storing sorted records.
> Please let us know what you think / any concerns we can address! Our implementation isn't quite production-ready yet, but we'd like to start a discussion about it early.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
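Step 3 of the join described above is a classic sorted-merge pass: because each source's bucket files are already sorted on the join key, matching pairs can be emitted in one linear scan per file. A minimal two-source sketch over in-memory sorted lists (the hypothetical `mergeJoin` stands in for the actual per-bucket file readers):

```java
import java.util.*;

public class SmbJoinSketch {
  // Merge-join two key-sorted lists, emitting "left|right" for every pair of
  // records whose join keys match. Advances whichever side has the smaller key;
  // on a match, emits the cross product of the runs sharing that key.
  static List<String> mergeJoin(List<String> left, List<String> right) {
    List<String> out = new ArrayList<>();
    int i = 0, j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = left.get(i).compareTo(right.get(j));
      if (cmp < 0) {
        i++;
      } else if (cmp > 0) {
        j++;
      } else {
        String key = left.get(i);
        int jStart = j; // remember where the right-side run of this key begins
        while (i < left.size() && left.get(i).equals(key)) {
          for (j = jStart; j < right.size() && right.get(j).equals(key); j++) {
            out.add(left.get(i) + "|" + right.get(j));
          }
          i++;
        }
      }
    }
    return out;
  }
}
```

This is what makes the bucket files cheap to join: no grouping or shuffling, just sequential reads.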
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=259376=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-259376 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 13/Jun/19 09:02
Worklog Time Spent: 10m

Work Description: kanterov commented on issue #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823#issuecomment-501616342

What do you think about having a design doc for the metadata format and discussing it on dev@? Not saying that there is anything wrong, but it would be a good way to collect feedback before getting invested in the approach. You can find inspiration and a template at [design-documents].

[design-documents]: https://beam.apache.org/contribute/design-documents/

Issue Time Tracking: Worklog Id: (was: 259376) Time Spent: 1h 50m (was: 1h 40m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=258090=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-258090 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 11/Jun/19 20:28
Worklog Time Spent: 10m

Work Description: ClaireMcGinty commented on pull request #8824: [BEAM-6766] Implement SMB file operations
URL: https://github.com/apache/beam/pull/8824

This PR is a small, discrete chunk of the larger [SMB branch/PR](https://github.com/apache/beam/pull/8486), extracted into a smaller PR for easier reviewing, as suggested [in the JIRA ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains read/write logic for Avro, TensorFlow, and JSON records at per-record granularity. These are needed because once the PCollection is bucketed and sorted, we can't re-use existing sources/sinks without risking a reshuffling of the data, so we have to manage file reads and writes very explicitly.

CC @kennknowles @nevillelyh @reuvenlax

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
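The per-record read/write layer described in this PR could look roughly like the following sketch. All names here are assumptions rather than the PR's actual API, and the toy newline-delimited implementation merely stands in for the Avro/TensorFlow/JSON implementations:

```java
import java.io.*;
import java.nio.channels.*;

// Hypothetical per-record file-operations abstraction: one implementation per
// serialization format, so the SMB sink/source can write and read records in
// their already-sorted order, without going through a Beam source/sink that
// might reshuffle the carefully bucketed data.
interface FileOperationsSketch<V> {
  RecordReader<V> createReader(ReadableByteChannel channel) throws IOException;
  RecordWriter<V> createWriter(WritableByteChannel channel) throws IOException;

  interface RecordReader<V> extends Closeable {
    V readNext() throws IOException; // returns null once the file is exhausted
  }

  interface RecordWriter<V> extends Closeable {
    void write(V record) throws IOException;
  }
}

// Toy newline-delimited implementation for illustration only.
class LineFileOperations implements FileOperationsSketch<String> {
  public RecordReader<String> createReader(ReadableByteChannel channel) {
    BufferedReader reader = new BufferedReader(Channels.newReader(channel, "UTF-8"));
    return new RecordReader<String>() {
      public String readNext() throws IOException { return reader.readLine(); }
      public void close() throws IOException { reader.close(); }
    };
  }

  public RecordWriter<String> createWriter(WritableByteChannel channel) {
    Writer writer = Channels.newWriter(channel, "UTF-8");
    return new RecordWriter<String>() {
      public void write(String record) throws IOException { writer.write(record + "\n"); }
      public void close() throws IOException { writer.close(); }
    };
  }
}
```

The key design point is the per-record granularity: the merge-join reader pulls one record at a time from each bucket file, so a whole-file or block-level API would not fit.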
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=258085=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-258085 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 11/Jun/19 20:21
Worklog Time Spent: 10m

Work Description: ClaireMcGinty commented on pull request #8823: [BEAM-6766] Metadata file implementation for Sort Merge Bucket source/sink
URL: https://github.com/apache/beam/pull/8823

This PR is a small, discrete chunk of the larger [SMB branch/PR](https://github.com/apache/beam/pull/8486), extracted into a smaller PR for easier reviewing, as suggested [in the JIRA ticket](https://issues.apache.org/jira/browse/BEAM-6766). It contains a JSON-serializable `BucketMetadata` abstract class and specific implementations for JSON, Avro, and TensorFlow (`org.tensorflow.example.Example`) collections. The intention here is that during an SMB write, a `metadata.json` file will be written alongside the bucketed record files, and read again at join time to ensure compatibility, reconstruct key coders, etc.

CC @kennknowles @nevillelyh @reuvenlax
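A `BucketMetadata` along the lines described above might carry fields like these. This is a hypothetical sketch of its shape and compatibility check, not the actual fields or serialization defined in the PR:

```java
// Hypothetical sketch of the bucket metadata written next to the data files
// and re-read at join time to verify that two sources were bucketed compatibly.
public class BucketMetadataSketch {
  final int version;       // monotonic format version, bumped on breaking changes
  final int numBuckets;
  final String hashType;   // e.g. "MURMUR3_32" -- a stand-in name
  final String keyField;   // record field the data is bucketed and sorted on

  BucketMetadataSketch(int version, int numBuckets, String hashType, String keyField) {
    this.version = version;
    this.numBuckets = numBuckets;
    this.hashType = hashType;
    this.keyField = keyField;
  }

  // Two sources can only be merge-joined if they agree on the bucketing scheme.
  boolean isCompatibleWith(BucketMetadataSketch other) {
    return version == other.version
        && numBuckets == other.numBuckets
        && hashType.equals(other.hashType);
  }

  // Minimal hand-rolled JSON, standing in for whatever serializer the PR uses.
  String toJson() {
    return String.format(
        "{\"version\":%d,\"numBuckets\":%d,\"hashType\":\"%s\",\"keyField\":\"%s\"}",
        version, numBuckets, hashType, keyField);
  }
}
```

Note that `keyField` does not participate in the compatibility check here: two sources may bucket on differently named fields as long as the hashed key values are comparable.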
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=249340=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-249340 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 28/May/19 13:25
Worklog Time Spent: 10m

Work Description: ClaireMcGinty commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source
URL: https://github.com/apache/beam/pull/8486#issuecomment-496515226

Cool! I can't envision many truly breaking changes in sharding strategy that would affect reads -- e.g. a dynamic # of shards per bucket shouldn't really impact reads. A sharding strategy based on target byte size rather than a random split is the main breaking change I can think of. It would be nice to support multiple sharding strategies with a reasonable default, and encode that in the metadata, but the implementations are so different that it's hard to envision how we could abstract over them.

Issue Time Tracking: Worklog Id: (was: 249340) Time Spent: 1h 20m (was: 1h 10m)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=249315=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-249315 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 28/May/19 12:49
Worklog Time Spent: 10m

Work Description: nevillelyh commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source
URL: https://github.com/apache/beam/pull/8486#issuecomment-496502572

Agree on monotonic integer versioning. Seems the simplest. If necessary we can branch in the reader code path to handle old data, assuming there shouldn't be many breaking changes?

Issue Time Tracking: Worklog Id: (was: 249315) Time Spent: 1h 10m (was: 1h)
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=249310=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-249310 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 28/May/19 12:40
Worklog Time Spent: 10m

Work Description: ClaireMcGinty commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source
URL: https://github.com/apache/beam/pull/8486#issuecomment-496499340

@nevillelyh Re: versioning the metadata. If we wrote the version as, say, the Beam version, we'd have to add extra logic to specify which Beam versions have compatible SMB implementations, which seems undesirable... we could instead use a monotonic integer or something like a serial version ID and increment it when there's a breaking change. Or store information like the sharding/bucketing strategy as String enums and compare those. However, I'm concerned about the implementation of SMB having truly backwards-incompatible changes across Beam versions -- if the only way to read old SMB data is for the user to downgrade Beam, I think that's a bit scary. Ideally all SMB reads stay backwards-compatible, even if they're a bit less performant with older data. Wdyt?

Issue Time Tracking: Worklog Id: (was: 249310) Time Spent: 1h (was: 50m)
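The monotonic-integer versioning discussed above could gate reads with something as small as this. The constants and reader names are hypothetical; the actual compatibility policy was still under discussion in this thread:

```java
// Hypothetical version gate: the reader supports a range of metadata versions
// and branches on the version found in metadata.json, rather than tying
// compatibility to the Beam release that wrote the data.
public class SmbVersionSketch {
  static final int MIN_SUPPORTED_VERSION = 1;
  static final int CURRENT_VERSION = 2;

  static boolean canRead(int metadataVersion) {
    return metadataVersion >= MIN_SUPPORTED_VERSION
        && metadataVersion <= CURRENT_VERSION;
  }

  static String readerFor(int metadataVersion) {
    if (!canRead(metadataVersion)) {
      throw new IllegalArgumentException(
          "Unsupported SMB format version: " + metadataVersion);
    }
    // Branch in the reader code path to stay backwards-compatible with old
    // data, as suggested above -- possibly at some cost in read performance.
    return metadataVersion < CURRENT_VERSION ? "legacy-reader" : "current-reader";
  }
}
```

This keeps old data readable without downgrading Beam, while still rejecting data written by a genuinely incompatible format version.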
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=247894=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-247894 ]

ASF GitHub Bot logged work on BEAM-6766:
Author: ASF GitHub Bot
Created on: 24/May/19 05:42
Worklog Time Spent: 10m

Work Description: nevillelyh commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source
URL: https://github.com/apache/beam/pull/8486#issuecomment-495479770

Some extra considerations.

Ones that will affect backwards compatibility:
- [ ] version in metadata, so we can reject joining data written by incompatible SMB versions
- [ ] explicit bucket->`numShards` mapping in metadata, in order to have variable `numShards`

Ones that are nice to have but can be added safely:
- [ ] multiple (date/hour) partitions of any input source, e.g. when joining events with a lookup table
- [ ] determine `numBuckets` automatically
- [ ] determine `numShards` (per bucket) automatically

Issue Time Tracking: Worklog Id: (was: 247894) Time Spent: 50m (was: 40m)
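The explicit bucket->`numShards` mapping from the checklist above could be sketched as follows. The class and the shard file naming scheme are assumptions for illustration, not the PR's actual scheme:

```java
import java.util.*;

// Hypothetical bucket -> shard-count mapping stored in metadata, so each
// bucket can be split across a variable number of shard files while readers
// can still enumerate every file of a bucket deterministically.
public class ShardMappingSketch {
  final Map<Integer, Integer> numShardsPerBucket;

  ShardMappingSketch(Map<Integer, Integer> numShardsPerBucket) {
    this.numShardsPerBucket = numShardsPerBucket;
  }

  // Enumerate the file names for one bucket; unlisted buckets default to a
  // single shard. The "bucket-XXXXX-shard-XXXXX" pattern is an assumption.
  List<String> fileNames(int bucketId) {
    int shards = numShardsPerBucket.getOrDefault(bucketId, 1);
    List<String> names = new ArrayList<>();
    for (int s = 0; s < shards; s++) {
      names.add(String.format("bucket-%05d-shard-%05d", bucketId, s));
    }
    return names;
  }
}
```

Recording the mapping explicitly is what makes variable shard counts a non-breaking change: a reader consults the metadata instead of assuming a fixed number of files per bucket.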
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=237049=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-237049 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 03/May/19 20:34 Start Date: 03/May/19 20:34 Worklog Time Spent: 10m Work Description: kanterov commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source URL: https://github.com/apache/beam/pull/8486#issuecomment-489231172 I can take a look on Tuesday. Before the review, please check out the PTransform style guide [1], the dependencies guide [2], and the testing guide [3], if you haven't already. [1]: https://beam.apache.org/contribute/ptransform-style-guide/ [2]: https://beam.apache.org/contribute/dependencies/ [3]: https://cwiki.apache.org/confluence/display/BEAM/Contribution+Testing+Guide Issue Time Tracking --- Worklog Id: (was: 237049) Time Spent: 0.5h (was: 20m)
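Step 3 of the SMB join reads the pre-sorted bucket files in lockstep and emits every pair of records whose join keys match. A hedged Python sketch of that merge follows; records are modeled as `(key, value)` tuples purely for illustration, not as the proposed Beam representation:

```python
def sort_merge_join(left, right):
    """Merge-read two bucket files already sorted by join key (join
    step 3), emitting (key, left_value, right_value) for every pair of
    records with equal keys."""
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Collect the full run of equal keys on each side, then emit
            # their cross product, since join keys may repeat.
            i2 = i
            while i2 < len(left) and left[i2][0] == lk:
                i2 += 1
            j2 = j
            while j2 < len(right) and right[j2][0] == lk:
                j2 += 1
            for a in left[i:i2]:
                for b in right[j:j2]:
                    out.append((lk, a[1], b[1]))
            i, j = i2, j2
    return out
```

Because both inputs arrive sorted, the merge is a single sequential pass over each file, which is what lets the join skip the shuffle entirely.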
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=237050=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-237050 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 03/May/19 20:34 Start Date: 03/May/19 20:34 Worklog Time Spent: 10m Work Description: kanterov commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source URL: https://github.com/apache/beam/pull/8486#issuecomment-489231172 I can take a look on Tuesday. Before the review, please check out the PTransform style guide, the dependencies guide, and the testing guide, if you haven't already. Issue Time Tracking --- Worklog Id: (was: 237050) Time Spent: 40m (was: 0.5h)
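Join step 1 in the proposal verifies that the sources' metadata files are compatible before any merging happens. A small Python sketch of that check follows; the `BucketMetadata` field names are assumptions for illustration (the PR's actual `BucketMetadata` class may differ):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class BucketMetadata:
    """Illustrative stand-in for the metadata file written next to each
    bucketed output (write step 4); field names are assumptions."""
    num_buckets: int
    hash_algorithm: str

def compatible(sources):
    """Join step 1: all sources must agree on the hash algorithm and the
    bucket count. (A real implementation might relax this, e.g. allowing
    counts that are multiples of each other, as the PR's to-do list about
    mixing bucket counts suggests.)"""
    first = sources[0]
    return all(
        m.hash_algorithm == first.hash_algorithm
        and m.num_buckets == first.num_buckets
        for m in sources
    )
```

If the check fails, the join should abort early rather than produce a silently incorrect result, since records with the same key would land in different bucket IDs across sources.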
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=237006=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-237006 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 03/May/19 18:57 Start Date: 03/May/19 18:57 Worklog Time Spent: 10m Work Description: ClaireMcGinty commented on pull request #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source URL: https://github.com/apache/beam/pull/8486 Master ticket for the SMB work. https://en.wikipedia.org/wiki/Sort-merge_join BEAM issue: https://issues.apache.org/jira/browse/BEAM-6766 cc: @nevillelyh This is a first pass that successfully runs on DataflowRunner with test input. There are a few to-do items listed below, as well as much more thorough/modular unit tests to write, but we wanted some feedback on this initial design before continuing.
Todo
- [ ] support compression
- [ ] decouple the intermediate layer (e.g. `Metadata`, `FileOperations`, `BucketedInputIterator`) for easy testing
- [ ] work around the `JsonSubTypes` limit in `BucketMetadata`
- [ ] reuse temp directory logic in `FileBasedSink`
- [ ] reuse file handling logic in `FileBasedSink`
- [ ] support mixing sources with different numbers of buckets
- [ ] provide better hints to auto-scaling so runners can pick a better initial `numWorkers` instead of one
- [ ] clean up temp files after failed writes
[jira] [Work logged] (BEAM-6766) Sort Merge Bucket Join support in Beam
[ https://issues.apache.org/jira/browse/BEAM-6766?focusedWorklogId=237009=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-237009 ] ASF GitHub Bot logged work on BEAM-6766: Author: ASF GitHub Bot Created on: 03/May/19 19:00 Start Date: 03/May/19 19:00 Worklog Time Spent: 10m Work Description: nevillelyh commented on issue #8486: [BEAM-6766] Add Sort Merge Bucket sink and join source URL: https://github.com/apache/beam/pull/8486#issuecomment-489204784 R: @reuvenlax Issue Time Tracking --- Worklog Id: (was: 237009) Time Spent: 20m (was: 10m)