Re: [Question] Best practices about passing binary files between transforms

2021-07-08 Thread Ignacio Taranto
I first passed the binary files as bytes, which is not ideal due to their size.
Then I constructed some ReadableFile instances from paths in a local
directory, which only works by chance (I know), probably because during
my tests Beam is executing on a single worker.
I tried both the Direct runner and Flink.

I guess I need a separate storage service, like you mentioned, to make
this more efficient.
I thought that Beam had a way to copy files, store them in the workers'
local filesystems, and propagate them through the whole pipeline.
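
The closest thing I can think of is doing that copy myself: write each
extracted file to shared storage with Beam's FileSystems API and pass only
the resulting path downstream. A rough, untested sketch (the gs://my-bucket
prefix is just a placeholder for whatever storage we end up using):

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

OUTPUT_PREFIX = 'gs://my-bucket/extracted'  # placeholder shared location


class CopyToSharedStorage(beam.DoFn):
    """Copies one extracted file to shared storage and emits its new path."""

    def process(self, element):
        member_name, data = element  # (path inside the TAR, raw bytes)
        out_path = FileSystems.join(OUTPUT_PREFIX, member_name)
        with FileSystems.create(out_path) as f:
            f.write(data)
        yield out_path  # downstream transforms only ever see the path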

On Thu, Jul 8, 2021 at 3:55 PM Luke Cwik wrote:
>
> How do you find and read the files from the workers now?
>
> You could use a network share such as NFS, set up an HDFS cluster, or use
> cloud storage to ensure that the files are accessible from all the machines.
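>
> Whichever backend you pick, Beam's FileSystems abstraction dispatches on the
> path scheme, so the same reading code works from any worker as long as the
> matching filesystem dependency is installed there. A minimal sketch (the
> gs:// path is made up):
>
> from apache_beam.io.filesystems import FileSystems
>
> def read_shared_file(path):
>     # Works for gs://, hdfs://, s3://, local paths, etc.
>     with FileSystems.open(path) as f:
>         return f.read()
>
> # e.g. read_shared_file('gs://my-bucket/extracted/some-image.bin')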
>
>
> On Thu, Jul 8, 2021 at 9:33 AM Ignacio Taranto wrote:
>>
>> That was my intention at some point, but I don't know how to pass
>> around "file paths" that work across the whole cluster, even on different
>> workers.
>> Is there any Beam abstraction for that?
>>
>> On Thu, Jul 8, 2021 at 12:45 PM Luke Cwik wrote:
>> >
>> > The file sizes you describe don't seem too large, so it may be a Flink
>> > configuration issue. But even if you fix that, it seems inefficient to pass
>> > around the binary blob itself instead of metadata about it.
>> >
>> > For example, your records could look like:
>> > {PathToTarFile, PathOfFileWithinTar, ... additional data you need ...}
>> > This way you can always load the data on demand as needed.
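>> >
>> > Concretely, loading on demand could look something like this (just a sketch;
>> > the record type and field names are made up, and it assumes the archives sit
>> > at a path every worker can open):
>> >
>> > import io
>> > import tarfile
>> > import typing
>> >
>> > import apache_beam as beam
>> > from apache_beam.io.filesystem import CompressionTypes
>> > from apache_beam.io.filesystems import FileSystems
>> >
>> >
>> > class TarMemberRef(typing.NamedTuple):
>> >     # Metadata only: where the archive lives and which member we want.
>> >     tar_path: str     # e.g. a gs:// or hdfs:// path to the .tar.gz
>> >     member_name: str  # path of the file inside the archive
>> >
>> >
>> > class LoadMemberOnDemand(beam.DoFn):
>> >     """Reads the archive and extracts a single member only when needed."""
>> >
>> >     def process(self, ref):
>> >         # Disable Beam's automatic gzip handling so tarfile sees raw bytes.
>> >         with FileSystems.open(
>> >                 ref.tar_path,
>> >                 compression_type=CompressionTypes.UNCOMPRESSED) as f:
>> >             data = io.BytesIO(f.read())  # archives are ~10 MiB, so this fits
>> >         with tarfile.open(fileobj=data, mode='r:gz') as tar:
>> >             member = tar.extractfile(ref.member_name)
>> >             if member is not None:
>> >                 yield ref, member.read()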
>> >
>> > You could also have two types of records: one that always carries the data
>> > inline when the file is small (to avoid having to uncompress the archive
>> > again), and another that loads the data on demand as needed.
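>> >
>> > One way to model both cases in a single record type (again just a sketch;
>> > the 1 MiB threshold is arbitrary):
>> >
>> > import typing
>> >
>> > SMALL_FILE_LIMIT = 1 << 20  # 1 MiB; tune to whatever "small" means for you
>> >
>> >
>> > class FileRecord(typing.NamedTuple):
>> >     tar_path: str
>> >     member_name: str
>> >     # When building records, set inline_data only if the member's size is
>> >     # at most SMALL_FILE_LIMIT; otherwise leave it as None.
>> >     inline_data: typing.Optional[bytes] = None
>> >
>> >
>> > def contents(record, load_fn):
>> >     # Return the file's bytes, touching the archive only when we must.
>> >     if record.inline_data is not None:
>> >         return record.inline_data
>> >     return load_fn(record.tar_path, record.member_name)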
>> >
>> > On Wed, Jul 7, 2021 at 1:01 PM Ignacio Taranto wrote:
>> >>
>> >> Hello everyone,
>> >>
>> >> At my company we are considering using Apache Beam, with the Python SDK,
>> >> as part of our analytics system.
>> >>
>> >> Our dataset consists of an unbounded collection of TAR (gzipped)
>> >> archives which contain several JSON and binary files.
>> >> These TAR files need to be split into sub-categories, essentially producing
>> >> a new collection composed of smaller parts.
>> >> Our transforms will operate over this second collection.
>> >>
>> >> The size of the compressed TAR archives is around 10 MiB, and the largest
>> >> binary files we have are around 16 MiB.
>> >> We only have a couple of those; the rest of the binary files are smaller.
>> >>
>> >> Also, in some cases, we may want some transformations to generate new
>> >> binary files from this collection.
>> >>
>> >> The first problem I encountered is that there's no native way to
>> >> extract TAR archives, so my first approach was to unpack the TAR in
>> >> place (in a temporary directory) and then return the JSON files as
>> >> objects and the binary files as bytes.
>> >> But this crashes the Flink runner due to the large memory consumption.
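>> >>
>> >> Roughly, that first attempt looks like this (simplified; error handling and
>> >> the actual ingest of archive paths are omitted):
>> >>
>> >> import json
>> >> import os
>> >> import tarfile
>> >> import tempfile
>> >>
>> >> import apache_beam as beam
>> >>
>> >>
>> >> class ExplodeTar(beam.DoFn):
>> >>     """Unpacks one archive and emits every member's contents directly."""
>> >>
>> >>     def process(self, tar_path):
>> >>         with tempfile.TemporaryDirectory() as tmp:
>> >>             with tarfile.open(tar_path, mode='r:gz') as tar:
>> >>                 tar.extractall(tmp)
>> >>                 for member in tar.getmembers():
>> >>                     if not member.isfile():
>> >>                         continue
>> >>                     with open(os.path.join(tmp, member.name), 'rb') as f:
>> >>                         data = f.read()
>> >>                     if member.name.endswith('.json'):
>> >>                         yield member.name, json.loads(data)
>> >>                     else:
>> >>                         yield member.name, data  # raw bytes on the PCollection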
>> >>
>> >> Is there a way to pass large binary files between transforms in the
>> >> pipeline?
>> >>
>> >> I'm aware of fileio.py; I tried using WriteToFiles to write the unpacked
>> >> binary files, with no success.
>> >> Apparently WriteToFiles groups all the files' data into the same output file.
>> >>
>> >> I'm also aware that I can implement my own IO transforms using
>> >> FileBasedSource and FileBasedSink, but these classes seem to be
>> >> "record oriented", which is not very useful for us.
>> >>
>> >> Is Apache Beam the right framework for us?
>> >> Can we implement our system using Beam?
>> >>
>> >> Thanks,
>> >> Ignacio.
>> >>
