Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-18 Thread Robert Bradshaw
On Wed, Jul 17, 2019 at 9:12 PM Gleb Kanterov  wrote:
>>
>> Suppose one assigns a sharding function to a PCollection. Is it lazy,
>> or does it induce a reshuffle right at that point? In either case,
>> once the ShardingFn has been applied, how long does it remain in
>> effect? Does it prohibit the runner (or user) from doing subsequent
>> resharding (including dynamic load balancing)? What happens when one
>> has a DoFn that changes the value? (Including the DoFns in our sinks
>> that assign random keys.)
>
>
> What if we reasoned about sharding the same way we reason about
> timestamps?
>
> Please correct me if I am wrong, but as far as I know, in Beam, timestamps
> exist for each element. You can get the timestamp by using Reify.timestamps.
> If there are timestamped values and they go through a ParDo, timestamps are
> preserved.

That is correct.

> We can think of the same with sharding, where Reify.shards would be a
> PTransform<PCollection<T>, PCollection<ShardedValue<T>>> and ShardedValue
> would contain a shard and a grouping key.

Meaning the shard that the PCollection is currently sharded by, or the
one that it should be sharded by in the future? (Your use case is a
bit strange in that a single key may be spread across multiple shards,
as long as they're part of the same "bucket.")

> ParDo wouldn't change sharding and would propagate ShardingFn.

The ShardingFn may not be applicable to downstream (mutated) elements.

FYI, internally this is handled by having annotations on DoFns as
being key-preserving, and only reasoning about operations separated by
such DoFns.
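
For illustration only, a rough sketch of what such an annotation could look
like in the Java SDK; @KeyPreserving (and any optimizer that would consume it)
is hypothetical, not existing Beam API:

  import java.lang.annotation.ElementType;
  import java.lang.annotation.Retention;
  import java.lang.annotation.RetentionPolicy;
  import java.lang.annotation.Target;
  import java.nio.charset.StandardCharsets;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  // Hypothetical marker: this DoFn may change values but never touches keys,
  // so an optimizer could assume key-based sharding still holds downstream.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface KeyPreserving {}

  @KeyPreserving
  class ParseValueFn extends DoFn<KV<String, byte[]>, KV<String, String>> {
    @ProcessElement
    public void process(ProcessContext c) {
      KV<String, byte[]> e = c.element();
      // The value is transformed; the key is passed through unchanged.
      c.output(KV.of(e.getKey(), new String(e.getValue(), StandardCharsets.UTF_8)));
    }
  }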

> CoGroupByKey on such PTransforms would reify grouping key, and do regular 
> CoGroupByKey, or be rewritten to a regular ParDo if sharding of inputs is 
> compatible.
>
> As you mentioned, it requires dynamic work rebalancing to preserve sharding. 
> What if we do dynamic work rebalancing for each shard independently, as, I 
> guess, it's done today for fixed windows.

Currently, the unit of colocation is the key. Generally, sharding
introduces a notion of colocation where multiple keys (or multiple
elements, I suppose it need not be keyed) are promised to be processed
by the same machine. This is both too constraining (wrt dynamic
resharding) and not needed (with respect to SMB, as your "colocation"
is per bucket, but buckets themselves can be processed in a
distributed manner).

> When we do a split, we would split one shard into two. It should be possible 
> to do consistently if values within buckets are sorted, in this case, we 
> would split ranges of possible values.

I'm not quite following here. Suppose one processes elements a, m, and
z. Then one decides to split the bundle, but there's no "range" we
can pick for the "other" as this bundle already spans the whole range.
But maybe I'm just off in the weeds here.

> On Wed, Jul 17, 2019 at 6:37 PM Robert Bradshaw  wrote:
>>
>> On Wed, Jul 17, 2019 at 4:26 PM Gleb Kanterov  wrote:
>> >
>> > I find there is an interesting point in the comments brought by Ahmed 
>> > Eleryan. Similar to WindowFn, having a concept of ShardingFn, that enables 
>> > users to implement a class for sharding data. Each Beam node can have 
>> > ShardingFn set, similar to WindowFn (or WindowingStrategy). Sinks and 
>> > sources are aware of that and preserve this information. Using that it's 
>> > possible to do optimization on Beam graph, removing redundant 
>> > CoGroupByKey, and it would be transparent to users.
>> >
>> > It feels like a nice addition to the Beam model, or possibly we can 
>> > implement it using existing windowing mechanics. There are people on the 
>> > list with strong experience in the area, I'm wondering what do you think?
>>
>> I've actually thought about this some, though it's been quite a while.
>> At the time it seemed hard to work it into a cohesive part of the
>> model (even ignoring the fact that sharding is primarily an execution,
>> rather than logical, property).
>>
>> Suppose one assigns a sharding function to a PCollection. Is it lazy,
>> or does it induce a reshuffle right at that point? In either case,
>> once the ShardingFn has been applied, how long does it remain in
>> effect? Does it prohibit the runner (or user) from doing subsequent
>> resharding (including dynamic load balancing)? What happens when one
>> has a DoFn that changes the value? (Including the DoFns in our sinks
>> that assign random keys.)
>>
>> Right now one can get most of the semantics of sharding by keying by
>> the shard id and doing a GBK, where the resulting value set (which is
>> allowed to be arbitrarily big) is the (indivisible) shard (e.g. for
>> writing to a single file.)
>>
>> I think sharding (like ordering, the two are quite related) is a
>> property that a PCollection can have, and could be leveraged by the
>> optimizer, but it's difficult to see how it's propagated through
>> transforms. The most sane way to reason about it IMHO is similar to
>> sink triggers, where one specifies that one wants a sharding at some
>> point, and the runner arranges things upstream such that it is so, and
>> some operations can declare that they happen to produce data sharded
>> in some way (though again, PCollection to PCollection one needs a
>> consistent notion of key to have a consistent notion of sharding).

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-17 Thread Gleb Kanterov
>
> Suppose one assigns a sharding function to a PCollection. Is it lazy,
> or does it induce a reshuffle right at that point? In either case,
> once the ShardingFn has been applied, how long does it remain in
> effect? Does it prohibit the runner (or user) from doing subsequent
> resharding (including dynamic load balancing)? What happens when one
> has a DoFn that changes the value? (Including the DoFns in our sinks
> that assign random keys.)


What if we reasoned about sharding the same way we reason about
timestamps?

Please correct me if I am wrong, but as far as I know, in Beam, timestamps
exist for each element. You can get the timestamp by using Reify.timestamps.
If there are timestamped values and they go through a ParDo, timestamps are
preserved. We can think of the same with sharding, where Reify.shards would
be a PTransform<PCollection<T>, PCollection<ShardedValue<T>>> and
ShardedValue would contain a shard and a grouping key. ParDo wouldn't change
sharding and would propagate the ShardingFn. CoGroupByKey on such
PCollections would reify the grouping key and do a regular CoGroupByKey, or
be rewritten to a regular ParDo if the sharding of the inputs is compatible.
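
A rough sketch of what that could look like (ShardedValue and the reify
transform below are hypothetical, named by analogy with Reify.timestamps;
neither exists in Beam, and a real version would also need coder support):

  import java.io.Serializable;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical: the element together with the shard and grouping key it
  // was bucketed by.
  class ShardedValue<T> implements Serializable {
    final T value;
    final int shardId;
    final String groupingKey;

    ShardedValue(T value, int shardId, String groupingKey) {
      this.value = value;
      this.shardId = shardId;
      this.groupingKey = groupingKey;
    }
  }

  class Shards {
    // Hypothetical analogue of Reify.timestamps(): make the sharding explicit,
    // given the key function and shard count the collection is assumed to be
    // sharded by. The output PCollection would still need a coder registered.
    static <T> PTransform<PCollection<T>, PCollection<ShardedValue<T>>> reify(
        final SerializableFunction<T, String> keyFn, final int numShards) {
      return new PTransform<PCollection<T>, PCollection<ShardedValue<T>>>() {
        @Override
        public PCollection<ShardedValue<T>> expand(PCollection<T> input) {
          return input.apply(
              ParDo.of(
                  new DoFn<T, ShardedValue<T>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      String key = keyFn.apply(c.element());
                      int shard = Math.floorMod(key.hashCode(), numShards);
                      c.output(new ShardedValue<>(c.element(), shard, key));
                    }
                  }));
        }
      };
    }
  }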

As you mentioned, it requires dynamic work rebalancing to preserve
sharding. What if we did dynamic work rebalancing for each shard
independently, as, I guess, is done today for fixed windows? When we do a
split, we would split one shard into two. It should be possible to do this
consistently if values within buckets are sorted; in this case, we would
split ranges of possible values.

On Wed, Jul 17, 2019 at 6:37 PM Robert Bradshaw  wrote:

> On Wed, Jul 17, 2019 at 4:26 PM Gleb Kanterov  wrote:
> >
> > I find there is an interesting point in the comments brought by Ahmed
> Eleryan. Similar to WindowFn, having a concept of ShardingFn, that enables
> users to implement a class for sharding data. Each Beam node can have
> ShardingFn set, similar to WindowFn (or WindowingStrategy). Sinks and
> sources are aware of that and preserve this information. Using that it's
> possible to do optimization on Beam graph, removing redundant CoGroupByKey,
> and it would be transparent to users.
> >
> > It feels like a nice addition to the Beam model, or possibly we can
> implement it using existing windowing mechanics. There are people on the
> list with strong experience in the area, I'm wondering what do you think?
>
> I've actually thought about this some, though it's been quite a while.
> At the time it seemed hard to work it into a cohesive part of the
> model (even ignoring the fact that sharding is primarily an execution,
> rather than logical, property).
>
> Suppose one assigns a sharding function to a PCollection. Is it lazy,
> or does it induce a reshuffle right at that point? In either case,
> once the ShardingFn has been applied, how long does it remain in
> effect? Does it prohibit the runner (or user) from doing subsequent
> resharding (including dynamic load balancing)? What happens when one
> has a DoFn that changes the value? (Including the DoFns in our sinks
> that assign random keys.)
>
> Right now one can get most of the semantics of sharding by keying by
> the shard id and doing a GBK, where the resulting value set (which is
> allowed to be arbitrarily big) is the (indivisible) shard (e.g. for
> writing to a single file.)
>
> I think sharding (like ordering, the two are quite related) is a
> property that a PCollection can have, and could be leveraged by the
> optimizer, but it's difficult to see how it's propagated through
> transforms. The most sane way to reason about it IMHO is similar to
> sink triggers, where one specifies that one wants a sharding at some
> point, and the runner arranges things upstream such that it is so, and
> some operations can declare that they happen to produce data sharded
> in some way (though again, PCollection to PCollection one needs a
> consistent notion of key to have a consistent notion of sharding).
>
> > Gleb
> >
> > On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov 
> wrote:
> >>
> >> I'd like to reiterate the request to not build anything on top of
> FileBasedSource/Reader.
> >> If the design requires having some interface for representing a
> function from a filename to a stream of records, better introduce a new
> interface for that.
> >> If it requires interoperability with other IOs that read files, better
> change them to use the new interface.
> >>
> >> On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>
> >>> Thanks this clarifies a lot.
> >>>
> >>> For writer, I think it's great if you can utilize existing FileIO.Sink
> implementations even if you have to reimplement some of the logic (for
> example compression, temp file handling) that is already implemented in
> Beam FileIO/WriteFiles transforms in your SMB sink transform.
> >>>
> >>> For reader, you are right that there's no FileIO.Read. What we have
> are various implementations of FileBasedSource/FileBasedReader classes that
> are currently intentionally hidden since Beam IO transforms are expected
> to be the intended public interface for users.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-17 Thread Robert Bradshaw
On Wed, Jul 17, 2019 at 4:26 PM Gleb Kanterov  wrote:
>
> I find there is an interesting point in the comments brought by Ahmed 
> Eleryan. Similar to WindowFn, having a concept of ShardingFn, that enables 
> users to implement a class for sharding data. Each Beam node can have 
> ShardingFn set, similar to WindowFn (or WindowingStrategy). Sinks and sources 
> are aware of that and preserve this information. Using that it's possible to 
> do optimization on Beam graph, removing redundant CoGroupByKey, and it would 
> be transparent to users.
>
> It feels like a nice addition to the Beam model, or possibly we can implement 
> it using existing windowing mechanics. There are people on the list with 
> strong experience in the area, I'm wondering what do you think?

I've actually thought about this some, though it's been quite a while.
At the time it seemed hard to work it into a cohesive part of the
model (even ignoring the fact that sharding is primarily an execution,
rather than logical, property).

Suppose one assigns a sharding function to a PCollection. Is it lazy,
or does it induce a reshuffle right at that point? In either case,
once the ShardingFn has been applied, how long does it remain in
effect? Does it prohibit the runner (or user) from doing subsequent
resharding (including dynamic load balancing)? What happens when one
has a DoFn that changes the value? (Including the DoFns in our sinks
that assign random keys.)

Right now one can get most of the semantics of sharding by keying by
the shard id and doing a GBK, where the resulting value set (which is
allowed to be arbitrarily big) is the (indivisible) shard (e.g. for
writing to a single file.)
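
A minimal sketch of that workaround (the hashCode-based shard assignment is
just a stand-in for whatever shard-id function one wants; WithKeys and
GroupByKey are the real Beam primitives):

  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.WithKeys;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TypeDescriptors;

  class ShardBySemantics {
    static PCollection<KV<Integer, Iterable<String>>> groupIntoShards(
        PCollection<String> records, final int numShards) {
      return records
          .apply("KeyByShardId",
              WithKeys.<Integer, String>of((String r) -> Math.floorMod(r.hashCode(), numShards))
                  .withKeyType(TypeDescriptors.integers()))
          // Each resulting Iterable is one indivisible "shard", e.g. written
          // to a single file by a downstream DoFn.
          .apply("GroupByShardId", GroupByKey.<Integer, String>create());
    }
  }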

I think sharding (like ordering, the two are quite related) is a
property that a PCollection can have, and could be leveraged by the
optimizer, but it's difficult to see how it's propagated through
transforms. The most sane way to reason about it IMHO is similar to
sink triggers, where one specifies that one wants a sharding at some
point, and the runner arranges things upstream such that it is so, and
some operations can declare that they happen to produce data sharded
in some way (though again, PCollection to PCollection one needs a
consistent notion of key to have a consistent notion of sharding).

> Gleb
>
> On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov  
> wrote:
>>
>> I'd like to reiterate the request to not build anything on top of 
>> FileBasedSource/Reader.
>> If the design requires having some interface for representing a function 
>> from a filename to a stream of records, better introduce a new interface for 
>> that.
>> If it requires interoperability with other IOs that read files, better 
>> change them to use the new interface.
>>
>> On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath  
>> wrote:
>>>
>>> Thanks this clarifies a lot.
>>>
>>> For writer, I think it's great if you can utilize existing FileIO.Sink 
>>> implementations even if you have to reimplement some of the logic (for 
>>> example compression, temp file handling) that is already implemented in 
>>> Beam FileIO/WriteFiles transforms in your SMB sink transform.
>>>
>>> For reader, you are right that there's no FileIO.Read. What we have are 
>>> various implementations of FileBasedSource/FileBasedReader classes that are 
>>> currently intentionally hidden since Beam IO transforms are expected to be 
>>> the intended public interface for users. If you can expose and re-use these 
>>> classes with slight modifications (keeping backwards compatibility) I'm OK 
>>> with it. Otherwise you'll have to write your own reader implementations.
>>>
>>> In general, seems like SMB has very strong requirements related to 
>>> sharding/hot-key management that are not easily achievable by implementing 
>>> SMB source/sink as a composite transform that utilizes existing source/sink 
>>> transforms. This forces you to implement this logic in your own DoFns and 
>>> existing Beam primitives are not easily re-usable in this context.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Jul 16, 2019 at 8:26 AM Neville Li  wrote:

 A little clarification of the IO requirement and my understanding of the 
 current state of IO.

 tl;dr: not sure if there're reusable bits for the reader. It's possible to 
 reuse some for the writer but with heavy refactoring.

 Reader

 For each bucket (containing the same key partition, sorted) across 
 multiple input data sets, we stream records from bucket files and merge 
 sort.
 We open the files in a DoFn, and emit KV where the CGBKR 
 encapsulates Iterable from each input.
 Basically we need a simple API like ResourceId -> Iterator, i.e. 
 sequential read, no block/offset/split requirement.
 FileBasedSource.FileBasedReader seems the closest fit but they're nested & 
 decoupled.
 There's no FileIO.Read, only a ReadMatches[1], which can be used with 
 ReadAllViaFileBasedSource. But that's not 

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-17 Thread Robert Bradshaw
It is not possible to implement SMB on top of the various top-level
SomeFileIO.{write,read} PTransforms. One needs the internal details.

It seems we should re-use (and expose) the existing FileSinks as a
parameter to SMBSink (and also port the old-style sinks to use these).
We also need the complementary FileSource that SMBSource could be
parameterized by (and which would also be useful in the readAll transforms
that take a list of files (e.g. from a match) and actually read them; do
these not exist already?). This particular use does not need
(dynamic or otherwise) range restrictions, but it's easy to say "read
from 0 to infinity" if it has them. It does require maintaining
ordering (within a shard).

Perhaps this could be sketched out more in a doc?
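
A minimal sketch of the writer half, assuming reuse of the existing public
FileIO.Sink interface; the bucket-file naming and the surrounding SMB
transform are hypothetical, while FileIO.Sink, FileSystems and MimeTypes are
existing Beam APIs:

  import java.io.IOException;
  import java.nio.channels.WritableByteChannel;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.ResourceId;
  import org.apache.beam.sdk.util.MimeTypes;

  class SmbFileWriter {
    // Writes one already-sorted bucket/shard to a single file, delegating the
    // record format to any FileIO.Sink, e.g. TextIO.sink() or AvroIO.sink(schema).
    static <T> void writeBucketFile(
        FileIO.Sink<T> formatSink, ResourceId bucketFile, Iterable<T> sortedRecords)
        throws IOException {
      try (WritableByteChannel channel = FileSystems.create(bucketFile, MimeTypes.BINARY)) {
        formatSink.open(channel);
        for (T record : sortedRecords) {
          formatSink.write(record);
        }
        formatSink.flush();
      }
    }
  }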

On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov  wrote:
>
> I'd like to reiterate the request to not build anything on top of 
> FileBasedSource/Reader.
> If the design requires having some interface for representing a function from 
> a filename to a stream of records, better introduce a new interface for that.
> If it requires interoperability with other IOs that read files, better change 
> them to use the new interface.
>
> On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath  
> wrote:
>>
>> Thanks this clarifies a lot.
>>
>> For writer, I think it's great if you can utilize existing FileIO.Sink 
>> implementations even if you have to reimplement some of the logic (for 
>> example compression, temp file handling) that is already implemented in Beam 
>> FileIO/WriteFiles transforms in your SMB sink transform.
>>
>> For reader, you are right that there's no FileIO.Read. What we have are 
>> various implementations of FileBasedSource/FileBasedReader classes that are 
>> currently intentionally hidden since Beam IO transforms are expected to be 
>> the intended public interface for users. If you can expose and re-use these 
>> classes with slight modifications (keeping backwards compatibility) I'm OK 
>> with it. Otherwise you'll have to write your own reader implementations.
>>
>> In general, seems like SMB has very strong requirements related to 
>> sharding/hot-key management that are not easily achievable by implementing 
>> SMB source/sink as a composite transform that utilizes existing source/sink 
>> transforms. This forces you to implement this logic in your own DoFns and 
>> existing Beam primitives are not easily re-usable in this context.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Jul 16, 2019 at 8:26 AM Neville Li  wrote:
>>>
>>> A little clarification of the IO requirement and my understanding of the 
>>> current state of IO.
>>>
>>> tl;dr: not sure if there're reusable bits for the reader. It's possible to 
>>> reuse some for the writer but with heavy refactoring.
>>>
>>> Reader
>>>
>>> For each bucket (containing the same key partition, sorted) across multiple 
>>> input data sets, we stream records from bucket files and merge sort.
>>> We open the files in a DoFn, and emit KV where the CGBKR 
>>> encapsulates Iterable from each input.
>>> Basically we need a simple API like ResourceId -> Iterator, i.e. 
>>> sequential read, no block/offset/split requirement.
>>> FileBasedSource.FileBasedReader seems the closest fit but they're nested & 
>>> decoupled.
>>> There's no FileIO.Read, only a ReadMatches[1], which can be used with 
>>> ReadAllViaFileBasedSource. But that's not the granularity we need, since 
>>> we lose ordering of the input records, and can't merge 2+ sources.
>>>
>>> Writer
>>>
>>> We get a `PCollection>` after bucket and 
>>> sort, where Iterable is the records sorted by key and BucketShardId is 
>>> used to produce filename, e.g. bucket-1-shard-2.avro.
>>> We write each Iterable to a temp file and move to final destination when 
>>> done. Both should ideally reuse existing code.
>>> Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO) supports 
>>> record writing into a WritableByteChannel, but some logic like compression 
>>> is handled in FileIO through ViaFileBasedSink which extends FileBasedSink.
>>> FileIO uses WriteFiles[3] to shard and write of PCollection. Again we 
>>> lose ordering of the output records or custom file naming scheme. However, 
>>> WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in WriteFiles 
>>> seem closest to our need but would have to be split out and generalized.
>>>
>>> Note on reader block/offset/split requirement
>>>
>>> Because of the merge sort, we can't split or offset seek a bucket file. 
>>> Because without persisting the offset index of a key group somewhere, we 
>>> can't efficiently skip to a key group without exhausting the previous ones. 
>>> Furthermore we need to merge sort and align keys from multiple sources, 
>>> which may not have the same key distribution. It might be possible to 
>>> binary search for matching keys but that's extra complication. IMO the 
>>> reader work distribution is better solved by better bucket/shard strategy 
>>> in upstream writer.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-17 Thread Gleb Kanterov
I find there is an interesting point in the comments brought up by Ahmed
Eleryan: similar to WindowFn, having a concept of ShardingFn that enables
users to implement a class for sharding data. Each Beam node can have a
ShardingFn set, similar to a WindowFn (or WindowingStrategy). Sinks and
sources are aware of it and preserve this information. Using that, it's
possible to do optimizations on the Beam graph, removing redundant
CoGroupByKeys, and it would be transparent to users.

It feels like a nice addition to the Beam model, or possibly we can
implement it using existing windowing mechanics. There are people on the
list with strong experience in the area; I'm wondering what you think?
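
A rough sketch of what a ShardingFn could look like (hypothetical only,
named by analogy with WindowFn; nothing like it exists in Beam today):

  import java.io.Serializable;

  // Describes how a PCollection's elements are bucketed: how to extract the
  // key the data is sorted/bucketed by, and how keys map to buckets.
  abstract class ShardingFn<T, KeyT> implements Serializable {
    abstract KeyT extractKey(T element);

    abstract int assignBucket(KeyT key, int numBuckets);

    // Two ShardingFns are compatible if sources written with them can be
    // merge-joined directly, letting an optimizer drop the CoGroupByKey.
    abstract boolean isCompatible(ShardingFn<?, ?> other);
  }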

Gleb

On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov 
wrote:

> I'd like to reiterate the request to not build anything on top of
> FileBasedSource/Reader.
> If the design requires having some interface for representing a function
> from a filename to a stream of records, better introduce a new interface
> for that.
> If it requires interoperability with other IOs that read files, better
> change them to use the new interface.
>
> On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath 
> wrote:
>
>> Thanks this clarifies a lot.
>>
>> For writer, I think it's great if you can utilize existing FileIO.Sink
>> implementations even if you have to reimplement some of the logic (for
>> example compression, temp file handling) that is already implemented in
>> Beam FileIO/WriteFiles transforms in your SMB sink transform.
>>
>> For reader, you are right that there's no FileIO.Read. What we have are
>> various implementations of FileBasedSource/FileBasedReader classes that are
>> currently intentionally hidden since Beam IO transforms are expected to be
>> the intended public interface for users. If you can expose and re-use these
>> classes with slight modifications (keeping backwards compatibility) I'm OK
>> with it. Otherwise you'll have to write your own reader implementations.
>>
>> In general, seems like SMB has very strong requirements related to
>> sharding/hot-key management that are not easily achievable by implementing
>> SMB source/sink as a composite transform that utilizes existing source/sink
>> transforms. This forces you to implement this logic in your own DoFns and
>> existing Beam primitives are not easily re-usable in this context.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Jul 16, 2019 at 8:26 AM Neville Li  wrote:
>>
>>> A little clarification of the IO requirement and my understanding of the
>>> current state of IO.
>>>
>>> tl;dr: not sure if there're reusable bits for the reader. It's possible
>>> to reuse some for the writer but with heavy refactoring.
>>>
>>> *Reader*
>>>
>>>- For each bucket (containing the same key partition, sorted) across
>>>multiple input data sets, we stream records from bucket files and merge
>>>sort.
>>>- We open the files in a DoFn, and emit KV where the
>>>CGBKR encapsulates Iterable from each input.
>>>- Basically we need a simple API like ResourceId -> Iterator,
>>>i.e. sequential read, no block/offset/split requirement.
>>>- FileBasedSource.FileBasedReader seems the closest fit but they're
>>>nested & decoupled.
>>>- There's no FileIO.Read, only a ReadMatches[1], which can be used
>>>with ReadAllViaFileBasedSource. But that's not the granularity we 
>>> need,
>>>since we lose ordering of the input records, and can't merge 2+ sources.
>>>
>>> *Writer*
>>>
>>>- We get a `PCollection>` after bucket
>>>and sort, where Iterable is the records sorted by key and
>>>BucketShardId is used to produce filename, e.g.
>>>bucket-1-shard-2.avro.
>>>- We write each Iterable to a temp file and move to final
>>>destination when done. Both should ideally reuse existing code.
>>>- Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO)
>>>supports record writing into a WritableByteChannel, but some logic like
>>>compression is handled in FileIO through ViaFileBasedSink which extends
>>>FileBasedSink.
>>>- FileIO uses WriteFiles[3] to shard and write of PCollection.
>>>Again we lose ordering of the output records or custom file naming 
>>> scheme.
>>>However, WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in
>>>WriteFiles seem closest to our need but would have to be split out and
>>>generalized.
>>>
>>> *Note on reader block/offset/split requirement*
>>>
>>>- Because of the merge sort, we can't split or offset seek a bucket
>>>file. Because without persisting the offset index of a key group 
>>> somewhere,
>>>we can't efficiently skip to a key group without exhausting the previous
>>>ones. Furthermore we need to merge sort and align keys from multiple
>>>sources, which may not have the same key distribution. It might be 
>>> possible
>>>to binary search for matching keys but that's extra complication. IMO the
   >>>reader work distribution is better solved by better bucket/shard strategy
   >>>in upstream writer.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-16 Thread Eugene Kirpichov
I'd like to reiterate the request to not build anything on top of
FileBasedSource/Reader.
If the design requires having some interface for representing a function
from a filename to a stream of records, better introduce a new interface
for that.
If it requires interoperability with other IOs that read files, better
change them to use the new interface.
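
One possible shape for such an interface (hypothetical, not an existing Beam
API; roughly the read-side counterpart of FileIO.Sink, going from a resolved
file to its records):

  import java.io.IOException;
  import java.io.Serializable;
  import java.util.Iterator;
  import org.apache.beam.sdk.io.fs.ResourceId;

  // Turns one file into its records, in file order; sequential access only,
  // no blocks, offsets, or splitting.
  interface FileRecordSource<T> extends Serializable {
    Iterator<T> open(ResourceId file) throws IOException;
  }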

On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath 
wrote:

> Thanks this clarifies a lot.
>
> For writer, I think it's great if you can utilize existing FileIO.Sink
> implementations even if you have to reimplement some of the logic (for
> example compression, temp file handling) that is already implemented in
> Beam FileIO/WriteFiles transforms in your SMB sink transform.
>
> For reader, you are right that there's no FileIO.Read. What we have are
> various implementations of FileBasedSource/FileBasedReader classes that are
> currently intentionally hidden since Beam IO transforms are expected to be
> the intended public interface for users. If you can expose and re-use these
> classes with slight modifications (keeping backwards compatibility) I'm OK
> with it. Otherwise you'll have to write your own reader implementations.
>
> In general, seems like SMB has very strong requirements related to
> sharding/hot-key management that are not easily achievable by implementing
> SMB source/sink as a composite transform that utilizes existing source/sink
> transforms. This forces you to implement this logic in your own DoFns and
> existing Beam primitives are not easily re-usable in this context.
>
> Thanks,
> Cham
>
> On Tue, Jul 16, 2019 at 8:26 AM Neville Li  wrote:
>
>> A little clarification of the IO requirement and my understanding of the
>> current state of IO.
>>
>> tl;dr: not sure if there're reusable bits for the reader. It's possible
>> to reuse some for the writer but with heavy refactoring.
>>
>> *Reader*
>>
>>- For each bucket (containing the same key partition, sorted) across
>>multiple input data sets, we stream records from bucket files and merge
>>sort.
>>- We open the files in a DoFn, and emit KV where the
>>CGBKR encapsulates Iterable from each input.
>>- Basically we need a simple API like ResourceId -> Iterator, i.e.
>>sequential read, no block/offset/split requirement.
>>- FileBasedSource.FileBasedReader seems the closest fit but they're
>>nested & decoupled.
>>- There's no FileIO.Read, only a ReadMatches[1], which can be used
>>with ReadAllViaFileBasedSource. But that's not the granularity we need,
>>since we lose ordering of the input records, and can't merge 2+ sources.
>>
>> *Writer*
>>
>>- We get a `PCollection>` after bucket and
>>    sort, where Iterable is the records sorted by key and BucketShardId
>>is used to produce filename, e.g. bucket-1-shard-2.avro.
>>- We write each Iterable to a temp file and move to final
>>destination when done. Both should ideally reuse existing code.
>>- Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO)
>>supports record writing into a WritableByteChannel, but some logic like
>>compression is handled in FileIO through ViaFileBasedSink which extends
>>FileBasedSink.
>>- FileIO uses WriteFiles[3] to shard and write of PCollection.
>>Again we lose ordering of the output records or custom file naming scheme.
>>However, WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in
>>WriteFiles seem closest to our need but would have to be split out and
>>generalized.
>>
>> *Note on reader block/offset/split requirement*
>>
>>- Because of the merge sort, we can't split or offset seek a bucket
>>file. Because without persisting the offset index of a key group 
>> somewhere,
>>we can't efficiently skip to a key group without exhausting the previous
>>ones. Furthermore we need to merge sort and align keys from multiple
>>sources, which may not have the same key distribution. It might be 
>> possible
>>to binary search for matching keys but that's extra complication. IMO the
>>reader work distribution is better solved by better bucket/shard strategy
>>in upstream writer.
>>
>> *References*
>>
>>1. ReadMatches extends PTransform,
>>PCollection>
>>2. ReadAllViaFileBasedSource extends
>>PTransform, PCollection>
>>3. WriteFiles extends
>>PTransform, WriteFilesResult>
>>4. WriteShardsIntoTempFilesFn extends DoFn,
>>Iterable>, FileResult>
>>5. FinalizeTempFileBundles extends PTransform<
>>PCollection>>, 
>> WriteFilesResult>
>>
>>
>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw 
>> wrote:
>>
>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov 
>>> wrote:
>>> >
>>> > Quick note: I didn't look through the document, but please do not
>>> build on either FileBasedSink or FileBasedReader. They are both remnants of
>>> the old, non-composable IO world; and in fact much of the composable IO
>>> work emerged from frustration with their limitations and recognizing
>>> that many other IOs were suffering from the same limitations.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-16 Thread Chamikara Jayalath
Thanks this clarifies a lot.

For writer, I think it's great if you can utilize existing FileIO.Sink
implementations even if you have to reimplement some of the logic (for
example compression, temp file handling) that is already implemented in
Beam FileIO/WriteFiles transforms in your SMB sink transform.

For reader, you are right that there's no FileIO.Read. What we have are
various implementations of FileBasedSource/FileBasedReader classes that are
currently intentionally hidden since Beam IO transforms are expected to be
the intended public interface for users. If you can expose and re-use these
classes with slight modifications (keeping backwards compatibility) I'm OK
with it. Otherwise you'll have to write your own reader implementations.

In general, seems like SMB has very strong requirements related to
sharding/hot-key management that are not easily achievable by implementing
SMB source/sink as a composite transform that utilizes existing source/sink
transforms. This forces you to implement this logic in your own DoFns and
existing Beam primitives are not easily re-usable in this context.

Thanks,
Cham

On Tue, Jul 16, 2019 at 8:26 AM Neville Li  wrote:

> A little clarification of the IO requirement and my understanding of the
> current state of IO.
>
> tl;dr: not sure if there're reusable bits for the reader. It's possible to
> reuse some for the writer but with heavy refactoring.
>
> *Reader*
>
>- For each bucket (containing the same key partition, sorted) across
>multiple input data sets, we stream records from bucket files and merge
>sort.
>- We open the files in a DoFn, and emit KV where the
>CGBKR encapsulates Iterable from each input.
>- Basically we need a simple API like ResourceId -> Iterator, i.e.
>sequential read, no block/offset/split requirement.
>- FileBasedSource.FileBasedReader seems the closest fit but they're
>nested & decoupled.
>- There's no FileIO.Read, only a ReadMatches[1], which can be used
>with ReadAllViaFileBasedSource. But that's not the granularity we need,
>since we lose ordering of the input records, and can't merge 2+ sources.
>
> *Writer*
>
>- We get a `PCollection>` after bucket and
>    sort, where Iterable is the records sorted by key and BucketShardId
>is used to produce filename, e.g. bucket-1-shard-2.avro.
>- We write each Iterable to a temp file and move to final
>destination when done. Both should ideally reuse existing code.
>- Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO)
>supports record writing into a WritableByteChannel, but some logic like
>compression is handled in FileIO through ViaFileBasedSink which extends
>FileBasedSink.
>- FileIO uses WriteFiles[3] to shard and write of PCollection.
>Again we lose ordering of the output records or custom file naming scheme.
>However, WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in
>WriteFiles seem closest to our need but would have to be split out and
>generalized.
>
> *Note on reader block/offset/split requirement*
>
>- Because of the merge sort, we can't split or offset seek a bucket
>file. Because without persisting the offset index of a key group somewhere,
>we can't efficiently skip to a key group without exhausting the previous
>ones. Furthermore we need to merge sort and align keys from multiple
>sources, which may not have the same key distribution. It might be possible
>to binary search for matching keys but that's extra complication. IMO the
>reader work distribution is better solved by better bucket/shard strategy
>in upstream writer.
>
> *References*
>
>1. ReadMatches extends PTransform,
>PCollection>
>2. ReadAllViaFileBasedSource extends
>PTransform, PCollection>
>3. WriteFiles extends
>PTransform, WriteFilesResult>
>4. WriteShardsIntoTempFilesFn extends DoFn,
>Iterable>, FileResult>
>5. FinalizeTempFileBundles extends PTransform<
>PCollection>>, 
> WriteFilesResult>
>
>
> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw 
> wrote:
>
>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov 
>> wrote:
>> >
>> > Quick note: I didn't look through the document, but please do not build
>> on either FileBasedSink or FileBasedReader. They are both remnants of the
>> old, non-composable IO world; and in fact much of the composable IO work
>> emerged from frustration with their limitations and recognizing that many
>> other IOs were suffering from the same limitations.
>> > Instead of FileBasedSink, build on FileIO.write; instead of
>> FileBasedReader, build on FileIO.read.
>>
>> +1
>>
>> I think the sink could be written atop FileIO.write, possibly using
>> dynamic destinations. At the very least the FileSink interface, which
>> handles the details of writing a single shard, would be an ideal way
>> to parameterize an SMB sink. It seems that none of our existing IOs
>> (publically?) expose FileSink implementations.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-16 Thread Neville Li
A little clarification of the IO requirement and my understanding of the
current state of IO.

tl;dr: not sure if there're reusable bits for the reader. It's possible to
reuse some for the writer but with heavy refactoring.

*Reader*

   - For each bucket (containing the same key partition, sorted) across
   multiple input data sets, we stream records from bucket files and merge
   sort.
   - We open the files in a DoFn, and emit KV<K, CoGbkResult> where the
   CGBKR encapsulates an Iterable<V> from each input.
   - Basically we need a simple API like ResourceId -> Iterator<T>, i.e.
   sequential read, no block/offset/split requirement (see the sketch after
   this list).
   - FileBasedSource.FileBasedReader seems the closest fit but they're
   nested & decoupled.
   - There's no FileIO.Read, only a ReadMatches[1], which can be used with
   ReadAllViaFileBasedSource. But that's not the granularity we need, since
   we lose ordering of the input records, and can't merge 2+ sources.

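A minimal sketch of the merge step for two inputs, assuming a simple
ResourceId -> Iterator<T> reader as described above and records already
sorted by key within each bucket file (real SMB would emit a CoGbkResult
rather than the KV of lists used here; Iterators is Guava):

  import java.util.ArrayList;
  import java.util.Iterator;
  import java.util.List;
  import java.util.function.BiConsumer;
  import java.util.function.Function;
  import com.google.common.collect.Iterators;
  import com.google.common.collect.PeekingIterator;
  import org.apache.beam.sdk.values.KV;

  class SortedBucketMerge {
    static <K extends Comparable<K>, L, R> void mergeJoin(
        Iterator<L> lhs, Function<L, K> lhsKey,
        Iterator<R> rhs, Function<R, K> rhsKey,
        BiConsumer<K, KV<List<L>, List<R>>> emit) {
      PeekingIterator<L> left = Iterators.peekingIterator(lhs);
      PeekingIterator<R> right = Iterators.peekingIterator(rhs);
      while (left.hasNext() || right.hasNext()) {
        // Pick the smallest key still pending on either side.
        K key;
        if (!right.hasNext()) {
          key = lhsKey.apply(left.peek());
        } else if (!left.hasNext()) {
          key = rhsKey.apply(right.peek());
        } else {
          K lk = lhsKey.apply(left.peek());
          K rk = rhsKey.apply(right.peek());
          key = lk.compareTo(rk) <= 0 ? lk : rk;
        }
        // Drain the whole key group from each side (possibly empty on one side).
        List<L> leftGroup = new ArrayList<>();
        while (left.hasNext() && lhsKey.apply(left.peek()).compareTo(key) == 0) {
          leftGroup.add(left.next());
        }
        List<R> rightGroup = new ArrayList<>();
        while (right.hasNext() && rhsKey.apply(right.peek()).compareTo(key) == 0) {
          rightGroup.add(right.next());
        }
        emit.accept(key, KV.of(leftGroup, rightGroup));
      }
    }
  }
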
*Writer*

   - We get a `PCollection<KV<BucketShardId, Iterable<T>>>` after bucket and
   sort, where Iterable<T> is the records sorted by key and BucketShardId
   is used to produce the filename, e.g. bucket-1-shard-2.avro.
   - We write each Iterable<T> to a temp file and move it to the final
   destination when done. Both should ideally reuse existing code.
   - Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO)
   supports record writing into a WritableByteChannel, but some logic like
   compression is handled in FileIO through ViaFileBasedSink which extends
   FileBasedSink.
   - FileIO uses WriteFiles[3] to shard and write a PCollection<T>. Again
   we lose the ordering of the output records and the custom file naming
   scheme. However, WriteShardsIntoTempFilesFn[4] and
   FinalizeTempFileBundles[5] in WriteFiles seem closest to our need but
   would have to be split out and generalized.

*Note on reader block/offset/split requirement*

   - Because of the merge sort, we can't split or offset-seek a bucket
   file: without persisting the offset index of a key group somewhere,
   we can't efficiently skip to a key group without exhausting the previous
   ones. Furthermore, we need to merge sort and align keys from multiple
   sources, which may not have the same key distribution. It might be possible
   to binary search for matching keys, but that's extra complication. IMO the
   reader work distribution is better solved by a better bucket/shard strategy
   in the upstream writer.

*References*

   1. ReadMatches extends PTransform<PCollection<MatchResult.Metadata>,
   PCollection<ReadableFile>>
   2. ReadAllViaFileBasedSource<T> extends
   PTransform<PCollection<ReadableFile>, PCollection<T>>
   3. WriteFiles<UserT, DestinationT, OutputT> extends
   PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
   4. WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
   Iterable<UserT>>, FileResult<DestinationT>>
   5. FinalizeTempFileBundles extends PTransform<
   PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>


On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw  wrote:

> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov 
> wrote:
> >
> > Quick note: I didn't look through the document, but please do not build
> on either FileBasedSink or FileBasedReader. They are both remnants of the
> old, non-composable IO world; and in fact much of the composable IO work
> emerged from frustration with their limitations and recognizing that many
> other IOs were suffering from the same limitations.
> > Instead of FileBasedSink, build on FileIO.write; instead of
> FileBasedReader, build on FileIO.read.
>
> +1
>
> I think the sink could be written atop FileIO.write, possibly using
> dynamic destinations. At the very least the FileSink interface, which
> handles the details of writing a single shard, would be an ideal way
> to parameterize an SMB sink. It seems that none of our existing IOs
> (publically?) expose FileSink implementations.
>
> FileIO.read is not flexible enough to do the merging. Eugene, is there
> a composable analogue to FileSink, for sources, i.e. something that
> can turn a file handle (possibly with offsets) into a set of records
> other than FileBasedReader?
>
> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov  wrote:
> >>
> >> I share the same concern with Robert regarding re-implementing parts of
> IO. At the same time, in the past, I worked on internal libraries that try
> to re-use code from existing IO, and it's hardly possible because it feels
> like it wasn't designed for re-use. There are a lot of classes that are
> nested (non-static) or non-public. I can understand why they were made
> non-public, it's a hard abstraction to design well and keep compatibility.
> As Neville mentioned, decoupling readers and writers would not only benefit
> for this proposal but for any other use-case that has to deal with
> low-level API such as FileSystem API, that is hardly possible today without
> copy-pasting,
> >>
> >>
> >>
> >>
> >>
> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li 
> wrote:
> >>>
> >>> Re: avoiding mirroring IO functionality, what about:
> >>>
> >>> - Decouple the nested FileBasedSink.Writer and
> FileBasedSource.FileBasedReader, make them top level and remove references
> to parent 

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-16 Thread Robert Bradshaw
On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov  wrote:
>
> Quick note: I didn't look through the document, but please do not build on 
> either FileBasedSink or FileBasedReader. They are both remnants of the old, 
> non-composable IO world; and in fact much of the composable IO work emerged 
> from frustration with their limitations and recognizing that many other IOs 
> were suffering from the same limitations.
> Instead of FileBasedSink, build on FileIO.write; instead of FileBasedReader, 
> build on FileIO.read.

+1

I think the sink could be written atop FileIO.write, possibly using
dynamic destinations. At the very least the FileSink interface, which
handles the details of writing a single shard, would be an ideal way
to parameterize an SMB sink. It seems that none of our existing IOs
(publically?) expose FileSink implementations.

FileIO.read is not flexible enough to do the merging. Eugene, is there
a composable analogue to FileSink, for sources, i.e. something that
can turn a file handle (possibly with offsets) into a set of records
other than FileBasedReader?

> On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov  wrote:
>>
>> I share the same concern with Robert regarding re-implementing parts of IO. 
>> At the same time, in the past, I worked on internal libraries that try to 
>> re-use code from existing IO, and it's hardly possible because it feels like 
>> it wasn't designed for re-use. There are a lot of classes that are nested 
>> (non-static) or non-public. I can understand why they were made non-public, 
>> it's a hard abstraction to design well and keep compatibility. As Neville 
>> mentioned, decoupling readers and writers would not only benefit for this 
>> proposal but for any other use-case that has to deal with low-level API such 
>> as FileSystem API, that is hardly possible today without copy-pasting,
>>
>>
>>
>>
>>
>> On Mon, Jul 15, 2019 at 5:05 PM Neville Li  wrote:
>>>
>>> Re: avoiding mirroring IO functionality, what about:
>>>
>>> - Decouple the nested FileBasedSink.Writer and 
>>> FileBasedSource.FileBasedReader, make them top level and remove references 
>>> to parent classes.
>>> - Simplify the interfaces, while maintaining support for block/offset read 
>>> & sequential write.
>>> - As a bonus, the refactored IO classes can be used standalone in case when 
>>> the user wants to perform custom IO in a DoFn, i.e. a 
>>> PTransform, PCollection>>. Today 
>>> this requires a lot of copy-pasted Avro boilerplate.
>>> - For compatibility, we can delegate to the new classes from the old ones 
>>> and remove them in the next breaking release.
>>>
>>> Re: WriteFiles logic, I'm not sure about generalizing it, but what about 
>>> splitting the part handling writing temp files into a new 
>>> PTransform>>, 
>>> PCollection>>? That splits the bucket-shard 
>>> logic from actual file IO.
>>>
>>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw  
>>> wrote:

 I agree that generalizing the existing FileIO may not be the right
 path forward, and I'd only make their innards public with great care.
 (Would this be used like
 SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
 unique in that the source and sink are much more coupled than other
 sources and sinks (which happen to be completely independent, if
 complementary implementations, whereas SMB attempts to be a kind of
 pipe where one half is instantiated in each pipeline).

 In short, an SMB source/sink that is parameterized by an arbitrary,
 existing IO would be ideal (but possibly not feasible (per existing
 prioritizations)), or an SMB source/sink that works as a pair. What
 I'd like to avoid is a set of parallel SMB IO classes that (partially,
 and incompletely) mirror the existing IO ones (from an API
 perspective--how much implementation it makes sense to share is an
 orthogonal issue that I'm sure can be worked out.)

 On Mon, Jul 15, 2019 at 4:18 PM Neville Li  wrote:
 >
 > Hi Robert,
 >
 > I agree, it'd be nice to reuse FileIO logic of different file types. But 
 > given the current code structure of FileIO & scope of the change, I feel 
 > it's better left for future refactor PRs.
 >
 > Some thoughts:
 > - SMB file operation is simple single file sequential reads/writes, 
 > which already exists as Writer & FileBasedReader but are private inner 
 > classes, and have references to the parent Sink/Source instance.
 > - The readers also have extra offset/split logic but that can be worked 
 > around.
 > - It'll be nice to not duplicate temp->destination file logic but again 
 > WriteFiles is assuming a single integer shard key, so it'll take some 
 > refactoring to reuse it.
 >
 > All of these can be done in backwards compatible way. OTOH generalizing 
 > the existing components too much (esp. WriteFiles, which is already 
 complex) might lead to two logic paths, one specialized for the SMB case.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Eugene Kirpichov
Quick note: I didn't look through the document, but please do not build on
either FileBasedSink or FileBasedReader. They are both remnants of the old,
non-composable IO world; and in fact much of the composable IO work emerged
from frustration with their limitations and recognizing that many other IOs
were suffering from the same limitations.
Instead of FileBasedSink, build on FileIO.write; instead of
FileBasedReader, build on FileIO.read.

On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov  wrote:

> I share the same concern with Robert regarding re-implementing parts of
> IO. At the same time, in the past, I worked on internal libraries that try
> to re-use code from existing IO, and it's hardly possible because it feels
> like it wasn't designed for re-use. There are a lot of classes that are
> nested (non-static) or non-public. I can understand why they were made
> non-public, it's a hard abstraction to design well and keep compatibility.
> As Neville mentioned, decoupling readers and writers would not only benefit
> for this proposal but for any other use-case that has to deal with
> low-level API such as FileSystem API, that is hardly possible today without
> copy-pasting,
>
>
>
>
>
> On Mon, Jul 15, 2019 at 5:05 PM Neville Li  wrote:
>
>> Re: avoiding mirroring IO functionality, what about:
>>
>> - Decouple the nested FileBasedSink.Writer and
>> FileBasedSource.FileBasedReader, make them top level and remove references
>> to parent classes.
>> - Simplify the interfaces, while maintaining support for block/offset
>> read & sequential write.
>> - As a bonus, the refactored IO classes can be used standalone in case
>> when the user wants to perform custom IO in a DoFn, i.e. a
>> PTransform, PCollection>>. Today
>> this requires a lot of copy-pasted Avro boilerplate.
>> - For compatibility, we can delegate to the new classes from the old ones
>> and remove them in the next breaking release.
>>
>> Re: WriteFiles logic, I'm not sure about generalizing it, but what about
>> splitting the part handling writing temp files into a new
>> PTransform>>,
>> PCollection>>? That splits the bucket-shard
>> logic from actual file IO.
>>
>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw 
>> wrote:
>>
>>> I agree that generalizing the existing FileIO may not be the right
>>> path forward, and I'd only make their innards public with great care.
>>> (Would this be used like
>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
>>> unique in that the source and sink are much more coupled than other
>>> sources and sinks (which happen to be completely independent, if
>>> complementary implementations, whereas SMB attempts to be a kind of
>>> pipe where one half is instantiated in each pipeline).
>>>
>>> In short, an SMB source/sink that is parameterized by an arbitrary,
>>> existing IO would be ideal (but possibly not feasible (per existing
>>> prioritizations)), or an SMB source/sink that works as a pair. What
>>> I'd like to avoid is a set of parallel SMB IO classes that (partially,
>>> and incompletely) mirror the existing IO ones (from an API
>>> perspective--how much implementation it makes sense to share is an
>>> orthogonal issue that I'm sure can be worked out.)
>>>
>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li 
>>> wrote:
>>> >
>>> > Hi Robert,
>>> >
>>> > I agree, it'd be nice to reuse FileIO logic of different file types.
>>> But given the current code structure of FileIO & scope of the change, I
>>> feel it's better left for future refactor PRs.
>>> >
>>> > Some thoughts:
>>> > - SMB file operation is simple single file sequential reads/writes,
>>> which already exists as Writer & FileBasedReader but are private inner
>>> classes, and have references to the parent Sink/Source instance.
>>> > - The readers also have extra offset/split logic but that can be
>>> worked around.
>>> > - It'll be nice to not duplicate temp->destination file logic but
>>> again WriteFiles is assuming a single integer shard key, so it'll take some
>>> refactoring to reuse it.
>>> >
>>> > All of these can be done in backwards compatible way. OTOH
>>> generalizing the existing components too much (esp. WriteFiles, which is
>>> already complex) might lead to two logic paths, one specialized for the SMB
>>> case. It might be easier to decouple some of them for better reuse. But
>>> again I feel it's a separate discussion.
>>> >
>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
>>> claire.d.mcgi...@gmail.com> wrote:
>>> >>
>>> >> Thanks Robert!
>>> >>
>>> >> We'd definitely like to be able to re-use existing I/O
>>> components--for example the Writer/FileBasedReader (since they operate on a
>>> WritableByteChannel/ReadableByteChannel, which is the level of granularity
>>> we need) but the Writers, at least, seem to be mostly private-access. Do
>>> you foresee them being made public at any point?
>>> >>
>>> >> - Claire
>>> >>
>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw 
>>> wrote:
>>> >>>
>>> >

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Gleb Kanterov
I share the same concern as Robert regarding re-implementing parts of IO.
At the same time, in the past, I worked on internal libraries that tried to
re-use code from existing IO, and it's hardly possible because it feels
like it wasn't designed for re-use. There are a lot of classes that are
nested (non-static) or non-public. I can understand why they were made
non-public: it's a hard abstraction to design well and keep compatible.
As Neville mentioned, decoupling readers and writers would benefit not only
this proposal but any other use-case that has to deal with a low-level API
such as the FileSystem API, which is hardly possible today without
copy-pasting.

On Mon, Jul 15, 2019 at 5:05 PM Neville Li  wrote:

> Re: avoiding mirroring IO functionality, what about:
>
> - Decouple the nested FileBasedSink.Writer and
> FileBasedSource.FileBasedReader, make them top level and remove references
> to parent classes.
> - Simplify the interfaces, while maintaining support for block/offset read
> & sequential write.
> - As a bonus, the refactored IO classes can be used standalone in case
> when the user wants to perform custom IO in a DoFn, i.e. a
> PTransform, PCollection>>. Today
> this requires a lot of copy-pasted Avro boilerplate.
> - For compatibility, we can delegate to the new classes from the old ones
> and remove them in the next breaking release.
>
> Re: WriteFiles logic, I'm not sure about generalizing it, but what about
> splitting the part handling writing temp files into a new
> PTransform>>,
> PCollection>>? That splits the bucket-shard
> logic from actual file IO.
>
> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw 
> wrote:
>
>> I agree that generalizing the existing FileIO may not be the right
>> path forward, and I'd only make their innards public with great care.
>> (Would this be used like
>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
>> unique in that the source and sink are much more coupled than other
>> sources and sinks (which happen to be completely independent, if
>> complementary implementations, whereas SMB attempts to be a kind of
>> pipe where one half is instantiated in each pipeline).
>>
>> In short, an SMB source/sink that is parameterized by an arbitrary,
>> existing IO would be ideal (but possibly not feasible (per existing
>> prioritizations)), or an SMB source/sink that works as a pair. What
>> I'd like to avoid is a set of parallel SMB IO classes that (partially,
>> and incompletely) mirror the existing IO ones (from an API
>> perspective--how much implementation it makes sense to share is an
>> orthogonal issue that I'm sure can be worked out.)
>>
>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li  wrote:
>> >
>> > Hi Robert,
>> >
>> > I agree, it'd be nice to reuse FileIO logic of different file types.
>> But given the current code structure of FileIO & scope of the change, I
>> feel it's better left for future refactor PRs.
>> >
>> > Some thoughts:
>> > - SMB file operation is simple single file sequential reads/writes,
>> which already exists as Writer & FileBasedReader but are private inner
>> classes, and have references to the parent Sink/Source instance.
>> > - The readers also have extra offset/split logic but that can be worked
>> around.
>> > - It'll be nice to not duplicate temp->destination file logic but again
>> WriteFiles is assuming a single integer shard key, so it'll take some
>> refactoring to reuse it.
>> >
>> > All of these can be done in backwards compatible way. OTOH generalizing
>> the existing components too much (esp. WriteFiles, which is already
>> complex) might lead to two logic paths, one specialized for the SMB case.
>> It might be easier to decouple some of them for better reuse. But again I
>> feel it's a separate discussion.
>> >
>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
>> claire.d.mcgi...@gmail.com> wrote:
>> >>
>> >> Thanks Robert!
>> >>
>> >> We'd definitely like to be able to re-use existing I/O components--for
>> example the Writer/FileBasedReader (since they
>> operate on a WritableByteChannel/ReadableByteChannel, which is the level of
>> granularity we need) but the Writers, at least, seem to be mostly
>> private-access. Do you foresee them being made public at any point?
>> >>
>> >> - Claire
>> >>
>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw 
>> wrote:
>> >>>
>> >>> I left some comments on the doc.
>> >>>
>> >>> I think the general idea is sound, but one thing that worries me is
>> >>> the introduction of a parallel set of IOs that mirrors the (existing)
>> >>> FileIOs. I would suggest either (1) incorporate this functionality
>> >>> into the generic FileIO infrastructure, or let it be parameterized by
>> >>> arbitrary IO (which I'm not sure is possible, especially for the Read
>> >>> side (and better would be the capability of supporting arbitrary
>> >>> sources, aka an optional "as-sharded-source" operation that returns a
>> >>> PTransform<..., KV>> where the iterable is
>> >>> promised to be in key order)) or support a single SMB aka
>> >>> "PreGrouping" source/sink pair that's always used together (and whose
>> >>> underlying format is not necessarily public).

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Neville Li
Re: avoiding mirroring IO functionality, what about:

- Decouple the nested FileBasedSink.Writer and
FileBasedSource.FileBasedReader, make them top level and remove references
to parent classes.
- Simplify the interfaces, while maintaining support for block/offset read
& sequential write.
- As a bonus, the refactored IO classes can be used standalone in case when
the user wants to perform custom IO in a DoFn, i.e. a
PTransform, PCollection>>. Today
this requires a lot of copy-pasted Avro boilerplate.
- For compatibility, we can delegate to the new classes from the old ones
and remove them in the next breaking release.

Re: WriteFiles logic, I'm not sure about generalizing it, but what about
splitting the part handling writing temp files into a new
PTransform>>,
PCollection>>? That splits the bucket-shard
logic from actual file IO.
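
For the temp-file handling specifically, the finalize step such a split-out
transform would need is small; a sketch using only existing Beam filesystem
APIs (the bucket/shard file naming is left to the caller):

  import java.io.IOException;
  import java.util.Collections;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
  import org.apache.beam.sdk.io.fs.ResourceId;

  class BucketFileFinalizer {
    // Promote a finished temp file to its final bucket/shard name; rename
    // semantics depend on the underlying FileSystem.
    static void finalizeBucketFile(ResourceId tempFile, ResourceId finalFile)
        throws IOException {
      FileSystems.rename(
          Collections.singletonList(tempFile),
          Collections.singletonList(finalFile),
          StandardMoveOptions.IGNORE_MISSING_FILES);
    }
  }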

On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw 
wrote:

> I agree that generalizing the existing FileIO may not be the right
> path forward, and I'd only make their innards public with great care.
> (Would this be used like
> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
> unique in that the source and sink are much more coupled than other
> sources and sinks (which happen to be completely independent, if
> complementary implementations, whereas SMB attempts to be a kind of
> pipe where one half is instantiated in each pipeline).
>
> In short, an SMB source/sink that is parameterized by an arbitrary,
> existing IO would be ideal (but possibly not feasible (per existing
> prioritizations)), or an SMB source/sink that works as a pair. What
> I'd like to avoid is a set of parallel SMB IO classes that (partially,
> and incompletely) mirror the existing IO ones (from an API
> perspective--how much implementation it makes sense to share is an
> orthogonal issue that I'm sure can be worked out.)
>
> On Mon, Jul 15, 2019 at 4:18 PM Neville Li  wrote:
> >
> > Hi Robert,
> >
> > I agree, it'd be nice to reuse FileIO logic of different file types. But
> given the current code structure of FileIO & scope of the change, I feel
> it's better left for future refactor PRs.
> >
> > Some thoughts:
> > - SMB file operation is simple single file sequential reads/writes,
> which already exists as Writer & FileBasedReader but are private inner
> classes, and have references to the parent Sink/Source instance.
> > - The readers also have extra offset/split logic but that can be worked
> around.
> > - It'll be nice to not duplicate temp->destination file logic but again
> WriteFiles is assuming a single integer shard key, so it'll take some
> refactoring to reuse it.
> >
> > All of these can be done in backwards compatible way. OTOH generalizing
> the existing components too much (esp. WriteFiles, which is already
> complex) might lead to two logic paths, one specialized for the SMB case.
> It might be easier to decouple some of them for better reuse. But again I
> feel it's a separate discussion.
> >
> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>
> >> Thanks Robert!
> >>
> >> We'd definitely like to be able to re-use existing I/O components--for
> example the Writer/FileBasedReader (since they
> operate on a WritableByteChannel/ReadableByteChannel, which is the level of
> granularity we need) but the Writers, at least, seem to be mostly
> private-access. Do you foresee them being made public at any point?
> >>
> >> - Claire
> >>
> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw 
> wrote:
> >>>
> >>> I left some comments on the doc.
> >>>
> >>> I think the general idea is sound, but one thing that worries me is
> >>> the introduction of a parallel set of IOs that mirrors the (existing)
> >>> FileIOs. I would suggest either (1) incorporate this functionality
> >>> into the generic FileIO infrastructure, or let it be parameterized by
> >>> arbitrary IO (which I'm not sure is possible, especially for the Read
> >>> side (and better would be the capability of supporting arbitrary
> >>> sources, aka an optional "as-sharded-source" operation that returns a
> >>> PTransform<..., KV>> where the iterable is
> >>> promised to be in key order)) or support a single SMB aka
> >>> "PreGrouping" source/sink pair that's aways used together (and whose
> >>> underlying format is not necessarily public).
> >>>
> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li 
> wrote:
> >>> >
> >>> > 4 people have commented but mostly clarifying details and not much
> on the overall design.
> >>> >
> >>> > It'd be great to have thumbs up/down on the design, specifically
> metadata, bucket & shard strategy, etc., since that affects backwards
> compatibility of output files.
> >>> > Some breaking changes, e.g. dynamic # of shards, are out of scope
> for V1 unless someone feels strongly about it. The current scope should
> cover all our use cases and leave room for optimization.
> >>> >
>> >>> > Once green lighted we can start adopting internally, ironing out
>> >>> > rough edges while iterating on the PRs in parallel.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Robert Bradshaw
I agree that generalizing the existing FileIO may not be the right
path forward, and I'd only make their innards public with great care.
(Would this be used like
SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
unique in that the source and sink are much more coupled than other
sources and sinks (which happen to be completely independent, if
complementary, implementations, whereas SMB attempts to be a kind of
pipe where one half is instantiated in each pipeline).

In short, an SMB source/sink that is parameterized by an arbitrary,
existing IO would be ideal (but possibly not feasible (per existing
prioritizations)), or an SMB source/sink that works as a pair. What
I'd like to avoid is a set of parallel SMB IO classes that (partially,
and incompletely) mirror the existing IO ones (from an API
perspective--how much implementation it makes sense to share is an
orthogonal issue that I'm sure can be worked out.)
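
To make the "parameterized by an arbitrary, existing IO" idea concrete, here is a rough
sketch that reuses the existing public FileIO.Sink interface (e.g. TextIO.sink(),
AvroIO.sink(...)) as the pluggable, format-specific piece. SmbBucketWriter itself and the
bucket/sort layering around it are hypothetical.

  // Sketch: an SMB-style writer parameterized by an existing FileIO.Sink.
  import java.io.IOException;
  import java.nio.channels.WritableByteChannel;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.ResourceId;
  import org.apache.beam.sdk.util.MimeTypes;

  /** Writes one bucket/shard file using whatever FileIO.Sink the user supplies. */
  class SmbBucketWriter<T> {
    private final FileIO.Sink<T> sink; // the pluggable, format-specific piece

    SmbBucketWriter(FileIO.Sink<T> sink) {
      this.sink = sink;
    }

    /** Records are expected to arrive already sorted by the SMB key. */
    void writeBucket(ResourceId file, Iterable<T> sortedRecords) throws IOException {
      try (WritableByteChannel channel = FileSystems.create(file, MimeTypes.BINARY)) {
        sink.open(channel);
        for (T record : sortedRecords) {
          sink.write(record);
        }
        sink.flush();
      }
    }
  }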

On Mon, Jul 15, 2019 at 4:18 PM Neville Li  wrote:
>
> Hi Robert,
>
> I agree, it'd be nice to reuse FileIO logic of different file types. But 
> given the current code structure of FileIO & scope of the change, I feel it's 
> better left for future refactor PRs.
>
> Some thoughts:
> - SMB file operation is simple single file sequential reads/writes, which 
> already exists as Writer & FileBasedReader but are private inner classes, and 
> have references to the parent Sink/Source instance.
> - The readers also have extra offset/split logic but that can be worked 
> around.
> - It'll be nice to not duplicate temp->destination file logic but again 
> WriteFiles is assuming a single integer shard key, so it'll take some 
> refactoring to reuse it.
>
> All of these can be done in backwards compatible way. OTOH generalizing the 
> existing components too much (esp. WriteFiles, which is already complex) 
> might lead to two logic paths, one specialized for the SMB case. It might be 
> easier to decouple some of them for better reuse. But again I feel it's a 
> separate discussion.
>
> On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty  
> wrote:
>>
>> Thanks Robert!
>>
>> We'd definitely like to be able to re-use existing I/O components--for 
>> example the Writer/FileBasedReader (since they 
>> operate on a WritableByteChannel/ReadableByteChannel, which is the level of 
>> granularity we need) but the Writers, at least, seem to be mostly 
>> private-access. Do you foresee them being made public at any point?
>>
>> - Claire
>>
>> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw  wrote:
>>>
>>> I left some comments on the doc.
>>>
>>> I think the general idea is sound, but one thing that worries me is
>>> the introduction of a parallel set of IOs that mirrors the (existing)
>>> FileIOs. I would suggest either (1) incorporate this functionality
>>> into the generic FileIO infrastructure, or let it be parameterized by
>>> arbitrary IO (which I'm not sure is possible, especially for the Read
>>> side (and better would be the capability of supporting arbitrary
>>> sources, aka an optional "as-sharded-source" operation that returns a
>>> PTransform<..., KV>> where the iterable is
>>> promised to be in key order)) or support a single SMB aka
>>> "PreGrouping" source/sink pair that's aways used together (and whose
>>> underlying format is not necessarily public).
>>>
>>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li  wrote:
>>> >
>>> > 4 people have commented but mostly clarifying details and not much on the 
>>> > overall design.
>>> >
>>> > It'd be great to have thumbs up/down on the design, specifically 
>>> > metadata, bucket & shard strategy, etc., since that affects backwards 
>>> > compatibility of output files.
>>> > Some breaking changes, e.g. dynamic # of shards, are out of scope for V1 
>>> > unless someone feels strongly about it. The current scope should cover 
>>> > all our use cases and leave room for optimization.
>>> >
>>> > Once green lighted we can start adopting internally, ironing out rough 
>>> > edges while iterating on the PRs in parallel.
>>> >
>>> > Most of the implementation is self-contained in the extensions:smb 
>>> > module, except making a few core classes/methods public for reuse. So 
>>> > despite the amount of work it's still fairly low risk to the code base. 
>>> > There're some proposed optimization & refactoring involving core (see 
>>> > appendix) but IMO they're better left for followup PRs.
>>> >
>>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles  wrote:
>>> >>
>>> >> I've seen some discussion on the doc. I cannot tell whether the 
>>> >> questions are resolved or what the status of review is. Would you mind 
>>> >> looping this thread with a quick summary? This is such a major piece of 
>>> >> work I don't want it to sit with everyone thinking they are waiting on 
>>> >> someone else, or any such thing. (not saying this is happening, just 
>>> >> pinging to be sure)
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li  wrote:
>>> >>>

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Neville Li
Hi Robert,

I agree, it'd be nice to reuse FileIO logic for different file types. But
given the current code structure of FileIO & the scope of the change, I feel
it's better left for future refactor PRs.

Some thoughts:
- SMB file operations are simple sequential single-file reads/writes, which
already exist as Writer & FileBasedReader, but those are private inner
classes and hold references to the parent Sink/Source instance.
- The readers also have extra offset/split logic, but that can be worked
around.
- It'd be nice not to duplicate the temp->destination file logic, but again
WriteFiles assumes a single integer shard key, so it'll take some
refactoring to reuse it.

All of these can be done in a backwards-compatible way. OTOH, generalizing
the existing components too much (esp. WriteFiles, which is already complex)
might lead to two logic paths, one specialized for the SMB case. It might be
easier to decouple some of them for better reuse. But again, I feel it's a
separate discussion.

On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty 
wrote:

> Thanks Robert!
>
> We'd definitely like to be able to re-use existing I/O components--for
> example the Writer
> 
> /FileBasedReader (since they operate on a
> WritableByteChannel/ReadableByteChannel, which is the level of granularity
> we need) but the Writers, at least, seem to be mostly private-access. Do
> you foresee them being made public at any point?
>
> - Claire
>
> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw 
> wrote:
>
>> I left some comments on the doc.
>>
>> I think the general idea is sound, but one thing that worries me is
>> the introduction of a parallel set of IOs that mirrors the (existing)
>> FileIOs. I would suggest either (1) incorporate this functionality
>> into the generic FileIO infrastructure, or let it be parameterized by
>> arbitrary IO (which I'm not sure is possible, especially for the Read
>> side (and better would be the capability of supporting arbitrary
>> sources, aka an optional "as-sharded-source" operation that returns a
>> PTransform<..., KV>> where the iterable is
>> promised to be in key order)) or support a single SMB aka
>> "PreGrouping" source/sink pair that's aways used together (and whose
>> underlying format is not necessarily public).
>>
>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li  wrote:
>> >
>> > 4 people have commented but mostly clarifying details and not much on
>> the overall design.
>> >
>> > It'd be great to have thumbs up/down on the design, specifically
>> metadata, bucket & shard strategy, etc., since that affects backwards
>> compatibility of output files.
>> > Some breaking changes, e.g. dynamic # of shards, are out of scope for
>> V1 unless someone feels strongly about it. The current scope should cover
>> all our use cases and leave room for optimization.
>> >
>> > Once green lighted we can start adopting internally, ironing out rough
>> edges while iterating on the PRs in parallel.
>> >
>> > Most of the implementation is self-contained in the extensions:smb
>> module, except making a few core classes/methods public for reuse. So
>> despite the amount of work it's still fairly low risk to the code base.
>> There're some proposed optimization & refactoring involving core (see
>> appendix) but IMO they're better left for followup PRs.
>> >
>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles 
>> wrote:
>> >>
>> >> I've seen some discussion on the doc. I cannot tell whether the
>> questions are resolved or what the status of review is. Would you mind
>> looping this thread with a quick summary? This is such a major piece of
>> work I don't want it to sit with everyone thinking they are waiting on
>> someone else, or any such thing. (not saying this is happening, just
>> pinging to be sure)
>> >>
>> >> Kenn
>> >>
>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li 
>> wrote:
>> >>>
>> >>> Updated the doc a bit with more future work (appendix). IMO most of
>> them are non-breaking and better done in separate PRs later since some
>> involve pretty big refactoring and are outside the scope of MVP.
>> >>>
>> >>> For now we'd really like to get feedback on some fundamental design
>> decisions and find a way to move forward.
>> >>>
>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li 
>> wrote:
>> 
>>  Thanks. I responded to comments in the doc. More inline.
>> 
>>  On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> >
>> > Thanks added few comments.
>> >
>> > If I understood correctly, you basically assign elements with keys
>> to different buckets which are written to unique files and merge files for
>> the same key while reading ?
>> >
>> > Some of my concerns are.
>> >
>> > (1)  Seems like you rely on an in-memory sorting of buckets. Will
>> this end up limiting the size of a PCollection you can process?

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Claire McGinty
Thanks Robert!

We'd definitely like to be able to re-use existing I/O components--for
example the Writer/FileBasedReader (since they operate on a
WritableByteChannel/ReadableByteChannel, which is the level of granularity
we need), but the Writers, at least, seem to be mostly private-access. Do
you foresee them being made public at any point?
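
(For reference, the channel-level granularity mentioned above is already reachable
through the public FileSystems API; a minimal sketch follows. The per-format
encode/decode that Writer/FileBasedReader layer on top of these channels is the part
that is still private, and the file path shown is purely hypothetical.)

  // Sketch: obtaining the raw channels via the public FileSystems API.
  import java.io.IOException;
  import java.nio.channels.ReadableByteChannel;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.fs.MatchResult;

  class ChannelExample {
    static ReadableByteChannel openFirstMatch(String fileSpec) throws IOException {
      // e.g. fileSpec = "gs://my-bucket/smb/bucket-00000-shard-00000.avro" (hypothetical)
      MatchResult match = FileSystems.match(fileSpec);
      MatchResult.Metadata metadata = match.metadata().get(0);
      return FileSystems.open(metadata.resourceId());
    }
  }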

- Claire

On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw  wrote:

> I left some comments on the doc.
>
> I think the general idea is sound, but one thing that worries me is
> the introduction of a parallel set of IOs that mirrors the (existing)
> FileIOs. I would suggest either (1) incorporate this functionality
> into the generic FileIO infrastructure, or let it be parameterized by
> arbitrary IO (which I'm not sure is possible, especially for the Read
> side (and better would be the capability of supporting arbitrary
> sources, aka an optional "as-sharded-source" operation that returns a
> PTransform<..., KV>> where the iterable is
> promised to be in key order)) or support a single SMB aka
> "PreGrouping" source/sink pair that's aways used together (and whose
> underlying format is not necessarily public).
>
> On Sat, Jul 13, 2019 at 3:19 PM Neville Li  wrote:
> >
> > 4 people have commented but mostly clarifying details and not much on
> the overall design.
> >
> > It'd be great to have thumbs up/down on the design, specifically
> metadata, bucket & shard strategy, etc., since that affects backwards
> compatibility of output files.
> > Some breaking changes, e.g. dynamic # of shards, are out of scope for V1
> unless someone feels strongly about it. The current scope should cover all
> our use cases and leave room for optimization.
> >
> > Once green lighted we can start adopting internally, ironing out rough
> edges while iterating on the PRs in parallel.
> >
> > Most of the implementation is self-contained in the extensions:smb
> module, except making a few core classes/methods public for reuse. So
> despite the amount of work it's still fairly low risk to the code base.
> There're some proposed optimization & refactoring involving core (see
> appendix) but IMO they're better left for followup PRs.
> >
> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles 
> wrote:
> >>
> >> I've seen some discussion on the doc. I cannot tell whether the
> questions are resolved or what the status of review is. Would you mind
> looping this thread with a quick summary? This is such a major piece of
> work I don't want it to sit with everyone thinking they are waiting on
> someone else, or any such thing. (not saying this is happening, just
> pinging to be sure)
> >>
> >> Kenn
> >>
> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li 
> wrote:
> >>>
> >>> Updated the doc a bit with more future work (appendix). IMO most of
> them are non-breaking and better done in separate PRs later since some
> involve pretty big refactoring and are outside the scope of MVP.
> >>>
> >>> For now we'd really like to get feedback on some fundamental design
> decisions and find a way to move forward.
> >>>
> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li 
> wrote:
> 
>  Thanks. I responded to comments in the doc. More inline.
> 
>  On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >
> > Thanks added few comments.
> >
> > If I understood correctly, you basically assign elements with keys
> to different buckets which are written to unique files and merge files for
> the same key while reading ?
> >
> > Some of my concerns are.
> >
> > (1)  Seems like you rely on an in-memory sorting of buckets. Will
> this end up limiting the size of a PCollection you can process ?
> 
>  The sorter transform we're using supports spilling and external sort.
> We can break up large key groups further by sharding, similar to fan out in
> some GBK transforms.
> 
> > (2) Seems like you rely on Reshuffle.viaRandomKey() which is
> actually implemented using a shuffle (which you try to replace with this
> proposal).
> 
>  That's for distributing task metadata, so that each DoFn thread picks
> up a random bucket and sort merge key-values. It's not shuffling actual
> data.
> 
> >
> > (3) I think (at least some of the) shuffle implementations are
> implemented in ways similar to this (writing to files and merging). So I'm
> wondering if the performance benefits you see are for a very specific case
> and may limit the functionality in other ways.
> 
>  This is for the common pattern of few core data producer pipelines
> and many downstream consumer pipelines. It's not intended to replace
> shuffle/join within a single pipeline. On the producer side, by
 pre-grouping/sorting data and writing to bucket/shard output files, the
 consumer can sort/merge matching ones without a CoGBK.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-15 Thread Robert Bradshaw
I left some comments on the doc.

I think the general idea is sound, but one thing that worries me is
the introduction of a parallel set of IOs that mirrors the (existing)
FileIOs. I would suggest either (1) incorporating this functionality
into the generic FileIO infrastructure, (2) letting it be parameterized by
arbitrary IO (which I'm not sure is possible, especially for the Read
side; better would be the capability of supporting arbitrary
sources, aka an optional "as-sharded-source" operation that returns a
PTransform<..., KV>> where the iterable is
promised to be in key order), or (3) supporting a single SMB, aka
"PreGrouping", source/sink pair that's always used together (and whose
underlying format is not necessarily public).
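
A sketch of what such an "as-sharded-source" hook could look like as an interface; the
names and the KV/Iterable parameterization below are guesses at the stripped generics,
not a settled API.

  // Hypothetical "as-sharded-source" capability; all names are illustrative assumptions.
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PBegin;
  import org.apache.beam.sdk.values.PCollection;

  interface ShardedSource<K, V> {
    /**
     * Returns a read transform whose output is grouped per key, where iteration within
     * each group (and across groups in a shard) follows the key order of the underlying files.
     */
    PTransform<PBegin, PCollection<KV<K, Iterable<V>>>> asShardedSource();
  }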

On Sat, Jul 13, 2019 at 3:19 PM Neville Li  wrote:
>
> 4 people have commented but mostly clarifying details and not much on the 
> overall design.
>
> It'd be great to have thumbs up/down on the design, specifically metadata, 
> bucket & shard strategy, etc., since that affects backwards compatibility of 
> output files.
> Some breaking changes, e.g. dynamic # of shards, are out of scope for V1 
> unless someone feels strongly about it. The current scope should cover all 
> our use cases and leave room for optimization.
>
> Once green lighted we can start adopting internally, ironing out rough edges 
> while iterating on the PRs in parallel.
>
> Most of the implementation is self-contained in the extensions:smb module, 
> except making a few core classes/methods public for reuse. So despite the 
> amount of work it's still fairly low risk to the code base. There're some 
> proposed optimization & refactoring involving core (see appendix) but IMO 
> they're better left for followup PRs.
>
> On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles  wrote:
>>
>> I've seen some discussion on the doc. I cannot tell whether the questions 
>> are resolved or what the status of review is. Would you mind looping this 
>> thread with a quick summary? This is such a major piece of work I don't want 
>> it to sit with everyone thinking they are waiting on someone else, or any 
>> such thing. (not saying this is happening, just pinging to be sure)
>>
>> Kenn
>>
>> On Mon, Jul 1, 2019 at 1:09 PM Neville Li  wrote:
>>>
>>> Updated the doc a bit with more future work (appendix). IMO most of them 
>>> are non-breaking and better done in separate PRs later since some involve 
>>> pretty big refactoring and are outside the scope of MVP.
>>>
>>> For now we'd really like to get feedback on some fundamental design 
>>> decisions and find a way to move forward.
>>>
>>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li  wrote:

 Thanks. I responded to comments in the doc. More inline.

 On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath  
 wrote:
>
> Thanks added few comments.
>
> If I understood correctly, you basically assign elements with keys to 
> different buckets which are written to unique files and merge files for 
> the same key while reading ?
>
> Some of my concerns are.
>
> (1)  Seems like you rely on an in-memory sorting of buckets. Will this 
> end up limiting the size of a PCollection you can process ?

 The sorter transform we're using supports spilling and external sort. We 
 can break up large key groups further by sharding, similar to fan out in 
 some GBK transforms.

> (2) Seems like you rely on Reshuffle.viaRandomKey() which is actually 
> implemented using a shuffle (which you try to replace with this proposal).

 That's for distributing task metadata, so that each DoFn thread picks up a 
 random bucket and sort merge key-values. It's not shuffling actual data.

>
> (3) I think (at least some of the) shuffle implementations are 
> implemented in ways similar to this (writing to files and merging). So 
> I'm wondering if the performance benefits you see are for a very specific 
> case and may limit the functionality in other ways.

 This is for the common pattern of few core data producer pipelines and 
 many downstream consumer pipelines. It's not intended to replace 
 shuffle/join within a single pipeline. On the producer side, by 
 pre-grouping/sorting data and writing to bucket/shard output files, the 
 consumer can sort/merge matching ones without a CoGBK. Essentially we're 
 paying the shuffle cost upfront to avoid them repeatedly in each consumer 
 pipeline that wants to join data.

>
> Thanks,
> Cham
>
>
> On Thu, Jun 27, 2019 at 8:12 AM Neville Li  wrote:
>>
>> Ping again. Any chance someone takes a look to get this thing going? 
>> It's just a design doc and basic metadata/IO impl. We're not talking 
>> about actual source/sink code yet (already done but saved for future 
>> PRs).
>>
>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:
>>>
>>> Thank you Claire, this looks promising.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-13 Thread Neville Li
Four people have commented, but mostly on clarifying details and not much on
the overall design.

It'd be great to have thumbs up/down on the design, specifically metadata,
bucket & shard strategy, etc., since that affects backwards compatibility
of output files.
Some breaking changes, e.g. dynamic # of shards, are out of scope for V1
unless someone feels strongly about it. The current scope should cover all
our use cases and leave room for optimization.
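
Purely as an illustration of the kind of thing such metadata would pin down (the field
names, types and semantics below are guesses for illustration, not the proposal's actual
schema):

  // Hypothetical illustration only: per-output metadata whose format needs to stay
  // backwards compatible so that existing readers can still join against new writes.
  class SmbBucketMetadata implements java.io.Serializable {
    int version;      // bumped on breaking layout changes
    int numBuckets;   // buckets per output; must be compatible across join sides
    int numShards;    // shards per bucket, for parallel writes within a bucket
    String keyField;  // record field used as the SMB join key
    String hashType;  // hash used for bucket assignment, e.g. "murmur3_32" (assumed)
  }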

Once green-lighted, we can start adopting internally, ironing out rough
edges while iterating on the PRs in parallel.

Most of the implementation is self-contained in the extensions:smb module,
except for making a few core classes/methods public for reuse. So despite
the amount of work, it's still fairly low-risk to the code base. There are
some proposed optimizations & refactoring involving core (see appendix), but
IMO they're better left for follow-up PRs.

On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles  wrote:

> I've seen some discussion on the doc. I cannot tell whether the questions
> are resolved or what the status of review is. Would you mind looping this
> thread with a quick summary? This is such a major piece of work I don't
> want it to sit with everyone thinking they are waiting on someone else, or
> any such thing. (not saying this is happening, just pinging to be sure)
>
> Kenn
>
> On Mon, Jul 1, 2019 at 1:09 PM Neville Li  wrote:
>
>> Updated the doc a bit with more future work (appendix). IMO most of them
>> are non-breaking and better done in separate PRs later since some involve
>> pretty big refactoring and are outside the scope of MVP.
>>
>> For now we'd really like to get feedback on some fundamental design
>> decisions and find a way to move forward.
>>
>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li  wrote:
>>
>>> Thanks. I responded to comments in the doc. More inline.
>>>
>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath 
>>> wrote:
>>>
 Thanks added few comments.

 If I understood correctly, you basically assign elements with keys to
 different buckets which are written to unique files and merge files for the
 same key while reading ?

 Some of my concerns are.

 (1)  Seems like you rely on an in-memory sorting of buckets. Will this
 end up limiting the size of a PCollection you can process ?

>>> The sorter transform we're using supports spilling and external sort. We
>>> can break up large key groups further by sharding, similar to fan out in
>>> some GBK transforms.
>>>
>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which is actually
 implemented using a shuffle (which you try to replace with this proposal).

>>> That's for distributing task metadata, so that each DoFn thread picks up
>>> a random bucket and sort merge key-values. It's not shuffling actual data.
>>>
>>>
 (3) I think (at least some of the) shuffle implementations are
 implemented in ways similar to this (writing to files and merging). So I'm
 wondering if the performance benefits you see are for a very specific case
 and may limit the functionality in other ways.

>>> This is for the common pattern of few core data producer pipelines and
>>> many downstream consumer pipelines. It's not intended to replace
>>> shuffle/join within a single pipeline. On the producer side, by
>>> pre-grouping/sorting data and writing to bucket/shard output files, the
>>> consumer can sort/merge matching ones without a CoGBK. Essentially we're
>>> paying the shuffle cost upfront to avoid them repeatedly in each consumer
>>> pipeline that wants to join data.
>>>
>>>
 Thanks,
 Cham


 On Thu, Jun 27, 2019 at 8:12 AM Neville Li 
 wrote:

> Ping again. Any chance someone takes a look to get this thing going?
> It's just a design doc and basic metadata/IO impl. We're not talking about
> actual source/sink code yet (already done but saved for future PRs).
>
> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:
>
>> Thank you Claire, this looks promising. Explicitly adding a few folks
>> that might have feedback: +Ismaël Mejía  +Robert
>> Bradshaw  +Lukasz Cwik  +Chamikara
>> Jayalath 
>>
>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
>> claire.d.mcgi...@gmail.com> wrote:
>>
>>> Hey dev@!
>>>
>>> Myself and a few other Spotify data engineers have put together a design
>>> doc for SMB Join support in Beam
>>> ,
>>>  and
>>> have a working Java implementation we've started to put up for PR ([
>>> 0 ], [1
>>> ], [2
>>> ]). There's more detailed
>>> information in the document, but the tl;dr is that SMB is a strategy to
>>> optimize joins for file-based sources by modifying the initial write
>>> operation to write records in sorted buckets based on the desired join key.

Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-12 Thread Kenneth Knowles
I've seen some discussion on the doc. I cannot tell whether the questions
are resolved or what the status of review is. Would you mind looping this
thread with a quick summary? This is such a major piece of work I don't
want it to sit with everyone thinking they are waiting on someone else, or
any such thing. (not saying this is happening, just pinging to be sure)

Kenn

On Mon, Jul 1, 2019 at 1:09 PM Neville Li  wrote:

> Updated the doc a bit with more future work (appendix). IMO most of them
> are non-breaking and better done in separate PRs later since some involve
> pretty big refactoring and are outside the scope of MVP.
>
> For now we'd really like to get feedback on some fundamental design
> decisions and find a way to move forward.
>
> On Thu, Jun 27, 2019 at 4:39 PM Neville Li  wrote:
>
>> Thanks. I responded to comments in the doc. More inline.
>>
>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks added few comments.
>>>
>>> If I understood correctly, you basically assign elements with keys to
>>> different buckets which are written to unique files and merge files for the
>>> same key while reading ?
>>>
>>> Some of my concerns are.
>>>
>>> (1)  Seems like you rely on an in-memory sorting of buckets. Will this
>>> end up limiting the size of a PCollection you can process ?
>>>
>> The sorter transform we're using supports spilling and external sort. We
>> can break up large key groups further by sharding, similar to fan out in
>> some GBK transforms.
>>
>> (2) Seems like you rely on Reshuffle.viaRandomKey() which is actually
>>> implemented using a shuffle (which you try to replace with this proposal).
>>>
>> That's for distributing task metadata, so that each DoFn thread picks up
>> a random bucket and sort merge key-values. It's not shuffling actual data.
>>
>>
>>> (3) I think (at least some of the) shuffle implementations are
>>> implemented in ways similar to this (writing to files and merging). So I'm
>>> wondering if the performance benefits you see are for a very specific case
>>> and may limit the functionality in other ways.
>>>
>> This is for the common pattern of few core data producer pipelines and
>> many downstream consumer pipelines. It's not intended to replace
>> shuffle/join within a single pipeline. On the producer side, by
>> pre-grouping/sorting data and writing to bucket/shard output files, the
>> consumer can sort/merge matching ones without a CoGBK. Essentially we're
>> paying the shuffle cost upfront to avoid them repeatedly in each consumer
>> pipeline that wants to join data.
>>
>>
>>> Thanks,
>>> Cham
>>>
>>>
>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li 
>>> wrote:
>>>
 Ping again. Any chance someone takes a look to get this thing going?
 It's just a design doc and basic metadata/IO impl. We're not talking about
 actual source/sink code yet (already done but saved for future PRs).

 On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:

> Thank you Claire, this looks promising. Explicitly adding a few folks
> that might have feedback: +Ismaël Mejía  +Robert
> Bradshaw  +Lukasz Cwik  +Chamikara
> Jayalath 
>
> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
>
>> Hey dev@!
>>
>> Myself and a few other Spotify data engineers have put together a design
>> doc for SMB Join support in Beam
>> ,
>>  and
>> have a working Java implementation we've started to put up for PR ([0
>> ], [1
>> ], [2
>> ]). There's more detailed
>> information in the document, but the tl;dr is that SMB is a strategy to
>> optimize joins for file-based sources by modifying the initial write
>> operation to write records in sorted buckets based on the desired join 
>> key.
>> This means that subsequent joins of datasets written in this way are only
>> sequential file reads, no shuffling involved. We've seen some pretty
>> substantial performance speedups with our implementation and would love 
>> to
>> get it checked in to Beam's Java SDK.
>>
>> We'd appreciate any suggestions or feedback on our proposal--the
>> design doc should be public to comment on.
>>
>> Thanks!
>> Claire / Neville
>>
>


Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-07-01 Thread Neville Li
Updated the doc a bit with more future work (appendix). IMO most of them
are non-breaking and better done in separate PRs later, since some involve
pretty big refactoring and are outside the scope of the MVP.

For now we'd really like to get feedback on some fundamental design
decisions and find a way to move forward.

On Thu, Jun 27, 2019 at 4:39 PM Neville Li  wrote:

> Thanks. I responded to comments in the doc. More inline.
>
> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath 
> wrote:
>
>> Thanks added few comments.
>>
>> If I understood correctly, you basically assign elements with keys to
>> different buckets which are written to unique files and merge files for the
>> same key while reading ?
>>
>> Some of my concerns are.
>>
>> (1)  Seems like you rely on an in-memory sorting of buckets. Will this
>> end up limiting the size of a PCollection you can process ?
>>
> The sorter transform we're using supports spilling and external sort. We
> can break up large key groups further by sharding, similar to fan out in
> some GBK transforms.
>
> (2) Seems like you rely on Reshuffle.viaRandomKey() which is actually
>> implemented using a shuffle (which you try to replace with this proposal).
>>
> That's for distributing task metadata, so that each DoFn thread picks up a
> random bucket and sort merge key-values. It's not shuffling actual data.
>
>
>> (3) I think (at least some of the) shuffle implementations are
>> implemented in ways similar to this (writing to files and merging). So I'm
>> wondering if the performance benefits you see are for a very specific case
>> and may limit the functionality in other ways.
>>
> This is for the common pattern of few core data producer pipelines and
> many downstream consumer pipelines. It's not intended to replace
> shuffle/join within a single pipeline. On the producer side, by
> pre-grouping/sorting data and writing to bucket/shard output files, the
> consumer can sort/merge matching ones without a CoGBK. Essentially we're
> paying the shuffle cost upfront to avoid them repeatedly in each consumer
> pipeline that wants to join data.
>
>
>> Thanks,
>> Cham
>>
>>
>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li  wrote:
>>
>>> Ping again. Any chance someone takes a look to get this thing going?
>>> It's just a design doc and basic metadata/IO impl. We're not talking about
>>> actual source/sink code yet (already done but saved for future PRs).
>>>
>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:
>>>
 Thank you Claire, this looks promising. Explicitly adding a few folks
 that might have feedback: +Ismaël Mejía  +Robert
 Bradshaw  +Lukasz Cwik  +Chamikara
 Jayalath 

 On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
 claire.d.mcgi...@gmail.com> wrote:

> Hey dev@!
>
> Myself and a few other Spotify data engineers have put together a design
> doc for SMB Join support in Beam
> ,
>  and
> have a working Java implementation we've started to put up for PR ([0
> ], [1
> ], [2
> ]). There's more detailed
> information in the document, but the tl;dr is that SMB is a strategy to
> optimize joins for file-based sources by modifying the initial write
> operation to write records in sorted buckets based on the desired join 
> key.
> This means that subsequent joins of datasets written in this way are only
> sequential file reads, no shuffling involved. We've seen some pretty
> substantial performance speedups with our implementation and would love to
> get it checked in to Beam's Java SDK.
>
> We'd appreciate any suggestions or feedback on our proposal--the
> design doc should be public to comment on.
>
> Thanks!
> Claire / Neville
>



Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-06-27 Thread Neville Li
Thanks. I responded to comments in the doc. More inline.

On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath 
wrote:

> Thanks added few comments.
>
> If I understood correctly, you basically assign elements with keys to
> different buckets which are written to unique files and merge files for the
> same key while reading ?
>
> Some of my concerns are.
>
> (1)  Seems like you rely on an in-memory sorting of buckets. Will this end
> up limiting the size of a PCollection you can process ?
>
The sorter transform we're using supports spilling and external sort. We
can break up large key groups further by sharding, similar to fan out in
some GBK transforms.
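
For context, a minimal sketch of the existing sorter extension being referred to
(SortValues over BufferedExternalSorter, which spills to disk when a group doesn't fit
in memory). The Integer/String/byte[] types and the bucket-id grouping key are
illustrative choices, not the actual SMB code.

  // Sketch: group by bucket id, then sort each group's values by a secondary key using
  // the external (spilling) sorter from Beam's sorter extension.
  import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
  import org.apache.beam.sdk.extensions.sorter.SortValues;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  class SortPerBucket {
    static PCollection<KV<Integer, Iterable<KV<String, byte[]>>>> sortWithinBuckets(
        PCollection<KV<Integer, KV<String, byte[]>>> bucketed) {
      return bucketed
          // One group per bucket id.
          .apply(GroupByKey.create())
          // Sort each bucket's records by the encoded secondary key; the sorter spills to
          // disk, so group size is not bounded by worker memory.
          .apply(SortValues.create(BufferedExternalSorter.options()));
    }
  }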

(2) Seems like you rely on Reshuffle.viaRandomKey() which is actually
> implemented using a shuffle (which you try to replace with this proposal).
>
That's for distributing task metadata, so that each DoFn thread picks up a
random bucket and sort merge key-values. It's not shuffling actual data.
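
A small sketch of that pattern: only tiny per-bucket task descriptors go through
Reshuffle.viaRandomKey(), and the records themselves are read from files in the
downstream DoFn. BucketTask and the merge DoFn mentioned in the comment are placeholder
names, not the actual implementation.

  // Sketch: distribute small task descriptors, not data, via Reshuffle.viaRandomKey().
  import java.io.Serializable;
  import java.util.List;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.coders.SerializableCoder;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  class DistributeBucketTasks {
    static class BucketTask implements Serializable {
      int bucketId;
      List<String> filesToMerge; // one sorted file per input source for this bucket
    }

    static PCollection<BucketTask> distribute(Pipeline p, List<BucketTask> tasks) {
      return p.apply(Create.of(tasks).withCoder(SerializableCoder.of(BucketTask.class)))
          // Spread the (tiny) task descriptors evenly across workers.
          .apply(Reshuffle.viaRandomKey());
      // A subsequent ParDo would open each task's files and do the sorted merge, e.g.
      //   .apply(ParDo.of(new MergeBucketFn()))  // MergeBucketFn is hypothetical.
    }
  }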


> (3) I think (at least some of the) shuffle implementations are implemented
> in ways similar to this (writing to files and merging). So I'm wondering if
> the performance benefits you see are for a very specific case and may limit
> the functionality in other ways.
>
This is for the common pattern of a few core data producer pipelines and many
downstream consumer pipelines. It's not intended to replace shuffle/join
within a single pipeline. On the producer side, by pre-grouping/sorting
data and writing to bucket/shard output files, the consumer can sort/merge
matching ones without a CoGBK. Essentially we're paying the shuffle cost
upfront to avoid them repeatedly in each consumer pipeline that wants to
join data.
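
To make the read side concrete, here is a sketch of the per-bucket sorted merge a
consumer would do instead of a CoGBK. It is simplified to one record per key per side;
the String key type and the functional interfaces are illustrative choices.

  // Sketch: streaming merge-join of two key-sorted record streams from matching buckets.
  import java.util.Iterator;
  import java.util.function.BiConsumer;
  import java.util.function.Function;

  class SortedMergeJoin {
    /** Emits matching pairs from two key-sorted streams of the same bucket. */
    static <L, R> void mergeJoin(
        Iterator<L> left, Iterator<R> right,
        Function<L, String> leftKey, Function<R, String> rightKey,
        BiConsumer<L, R> onMatch) {
      L l = left.hasNext() ? left.next() : null;
      R r = right.hasNext() ? right.next() : null;
      while (l != null && r != null) {
        int cmp = leftKey.apply(l).compareTo(rightKey.apply(r));
        if (cmp < 0) {
          l = left.hasNext() ? left.next() : null;   // left key is smaller: advance left
        } else if (cmp > 0) {
          r = right.hasNext() ? right.next() : null; // right key is smaller: advance right
        } else {
          onMatch.accept(l, r);                      // keys match: emit the pair
          l = left.hasNext() ? left.next() : null;   // 1:1 for simplicity; real SMB handles
          r = right.hasNext() ? right.next() : null; // duplicate keys per side
        }
      }
    }
  }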


> Thanks,
> Cham
>
>
> On Thu, Jun 27, 2019 at 8:12 AM Neville Li  wrote:
>
>> Ping again. Any chance someone takes a look to get this thing going? It's
>> just a design doc and basic metadata/IO impl. We're not talking about
>> actual source/sink code yet (already done but saved for future PRs).
>>
>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:
>>
>>> Thank you Claire, this looks promising. Explicitly adding a few folks
>>> that might have feedback: +Ismaël Mejía  +Robert
>>> Bradshaw  +Lukasz Cwik  +Chamikara
>>> Jayalath 
>>>
>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
>>> claire.d.mcgi...@gmail.com> wrote:
>>>
 Hey dev@!

 Myself and a few other Spotify data engineers have put together a design
 doc for SMB Join support in Beam
 ,
  and
 have a working Java implementation we've started to put up for PR ([0
 ], [1
 ], [2
 ]). There's more detailed
 information in the document, but the tl;dr is that SMB is a strategy to
 optimize joins for file-based sources by modifying the initial write
 operation to write records in sorted buckets based on the desired join key.
 This means that subsequent joins of datasets written in this way are only
 sequential file reads, no shuffling involved. We've seen some pretty
 substantial performance speedups with our implementation and would love to
 get it checked in to Beam's Java SDK.

 We'd appreciate any suggestions or feedback on our proposal--the design
 doc should be public to comment on.

 Thanks!
 Claire / Neville

>>>


Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-06-27 Thread Chamikara Jayalath
Thanks, I added a few comments.

If I understood correctly, you basically assign elements with keys to
different buckets, which are written to unique files, and merge files for
the same key while reading?

Some of my concerns are:

(1) Seems like you rely on an in-memory sorting of buckets. Will this end
up limiting the size of a PCollection you can process?
(2) Seems like you rely on Reshuffle.viaRandomKey() which is actually
implemented using a shuffle (which you try to replace with this proposal).
(3) I think (at least some of the) shuffle implementations are implemented
in ways similar to this (writing to files and merging). So I'm wondering if
the performance benefits you see are for a very specific case and may limit
the functionality in other ways.

Thanks,
Cham


On Thu, Jun 27, 2019 at 8:12 AM Neville Li  wrote:

> Ping again. Any chance someone takes a look to get this thing going? It's
> just a design doc and basic metadata/IO impl. We're not talking about
> actual source/sink code yet (already done but saved for future PRs).
>
> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:
>
>> Thank you Claire, this looks promising. Explicitly adding a few folks
>> that might have feedback: +Ismaël Mejía  +Robert
>> Bradshaw  +Lukasz Cwik  +Chamikara
>> Jayalath 
>>
>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
>> claire.d.mcgi...@gmail.com> wrote:
>>
>>> Hey dev@!
>>>
>>> Myself and a few other Spotify data engineers have put together a design
>>> doc for SMB Join support in Beam
>>> ,
>>>  and
>>> have a working Java implementation we've started to put up for PR ([0
>>> ], [1
>>> ], [2
>>> ]). There's more detailed
>>> information in the document, but the tl;dr is that SMB is a strategy to
>>> optimize joins for file-based sources by modifying the initial write
>>> operation to write records in sorted buckets based on the desired join key.
>>> This means that subsequent joins of datasets written in this way are only
>>> sequential file reads, no shuffling involved. We've seen some pretty
>>> substantial performance speedups with our implementation and would love to
>>> get it checked in to Beam's Java SDK.
>>>
>>> We'd appreciate any suggestions or feedback on our proposal--the design
>>> doc should be public to comment on.
>>>
>>> Thanks!
>>> Claire / Neville
>>>
>>


Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-06-27 Thread Neville Li
Ping again. Any chance someone takes a look to get this thing going? It's
just a design doc and basic metadata/IO impl. We're not talking about
actual source/sink code yet (already done but saved for future PRs).

On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay  wrote:

> Thank you Claire, this looks promising. Explicitly adding a few folks that
> might have feedback: +Ismaël Mejía  +Robert Bradshaw
>  +Lukasz Cwik  +Chamikara Jayalath
> 
>
> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty 
> wrote:
>
>> Hey dev@!
>>
>> Myself and a few other Spotify data engineers have put together a design
>> doc for SMB Join support in Beam
>> ,
>>  and
>> have a working Java implementation we've started to put up for PR ([0
>> ], [1
>> ], [2
>> ]). There's more detailed
>> information in the document, but the tl;dr is that SMB is a strategy to
>> optimize joins for file-based sources by modifying the initial write
>> operation to write records in sorted buckets based on the desired join key.
>> This means that subsequent joins of datasets written in this way are only
>> sequential file reads, no shuffling involved. We've seen some pretty
>> substantial performance speedups with our implementation and would love to
>> get it checked in to Beam's Java SDK.
>>
>> We'd appreciate any suggestions or feedback on our proposal--the design
>> doc should be public to comment on.
>>
>> Thanks!
>> Claire / Neville
>>
>


Re: Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-06-21 Thread Ahmet Altay
Thank you Claire, this looks promising. Explicitly adding a few folks that
might have feedback: +Ismaël Mejía  +Robert Bradshaw
 +Lukasz Cwik  +Chamikara Jayalath


On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty 
wrote:

> Hey dev@!
>
> Myself and a few other Spotify data engineers have put together a design
> doc for SMB Join support in Beam
> ,
>  and
> have a working Java implementation we've started to put up for PR ([0
> ], [1
> ], [2
> ]). There's more detailed
> information in the document, but the tl;dr is that SMB is a strategy to
> optimize joins for file-based sources by modifying the initial write
> operation to write records in sorted buckets based on the desired join key.
> This means that subsequent joins of datasets written in this way are only
> sequential file reads, no shuffling involved. We've seen some pretty
> substantial performance speedups with our implementation and would love to
> get it checked in to Beam's Java SDK.
>
> We'd appreciate any suggestions or feedback on our proposal--the design
> doc should be public to comment on.
>
> Thanks!
> Claire / Neville
>


Discussion/Proposal: support Sort Merge Bucket joins in Beam

2019-06-17 Thread Claire McGinty
Hey dev@!

A few other Spotify data engineers and I have put together a design
doc for SMB Join support in Beam, and
have a working Java implementation we've started to put up for PR
([0], [1], [2]). There's more detailed
information in the document, but the tl;dr is that SMB is a strategy to
optimize joins for file-based sources by modifying the initial write
operation to write records in sorted buckets based on the desired join key.
This means that subsequent joins of datasets written in this way are only
sequential file reads, no shuffling involved. We've seen some pretty
substantial performance speedups with our implementation and would love to
get it checked in to Beam's Java SDK.
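
As a toy illustration of the write-side idea (the hash function, bucket count and
in-memory layout below are illustrative only, not the actual scheme from the doc):

  // Toy sketch: assign each record to a bucket by hashing the join key, then sort within
  // each bucket so readers can merge-join matching buckets with a sequential scan.
  import java.util.ArrayList;
  import java.util.Comparator;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;

  class BucketAndSort {
    static Map<Integer, List<String>> bucketAndSort(List<String> joinKeys, int numBuckets) {
      Map<Integer, List<String>> buckets = new HashMap<>();
      for (String key : joinKeys) {
        int bucketId = Math.floorMod(key.hashCode(), numBuckets); // stable bucket assignment
        buckets.computeIfAbsent(bucketId, b -> new ArrayList<>()).add(key);
      }
      // Sort within each bucket by the join key.
      buckets.values().forEach(list -> list.sort(Comparator.naturalOrder()));
      return buckets;
    }
  }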

We'd appreciate any suggestions or feedback on our proposal--the design doc
should be public to comment on.

Thanks!
Claire / Neville