Re: pipeline steps

2019-02-14 Thread Yi Pan
@Chamikara, if adding the metadata interface class is too much of an effort
right now, I would accept a solution with some special PTransform method
that adds the metadata to the output data types. What I wonder is whether,
if this kind of PTransform becomes popular across many different BeamIOs,
we might as well extend the existing BeamIO classes to apply this
PTransform automatically before delivering the raw message types from the
IO (e.g. Samza's SystemDescriptor implementation follows this pattern by
allowing an optional InputTransformer). Maybe we can wait and discuss this
when the use case becomes more common.
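
To make the idea concrete, here is a rough sketch of such a
metadata-attaching PTransform applied after a KafkaIO read. KafkaMetadata
is a hypothetical value class, and the usual org.apache.beam.sdk imports
and coder registration are assumed:

// Illustrative only: pair each Kafka record with its source metadata.
static class AttachKafkaMetadata<K, V>
    extends PTransform<PCollection<KafkaRecord<K, V>>,
                       PCollection<KV<KafkaMetadata, V>>> {
  @Override
  public PCollection<KV<KafkaMetadata, V>> expand(
      PCollection<KafkaRecord<K, V>> input) {
    return input.apply(ParDo.of(
        new DoFn<KafkaRecord<K, V>, KV<KafkaMetadata, V>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            KafkaRecord<K, V> r = c.element();
            // KafkaRecord already carries topic/partition/offset.
            c.output(KV.of(
                new KafkaMetadata(r.getTopic(), r.getPartition(), r.getOffset()),
                r.getKV().getValue()));
          }
        }));
  }
}

An IO could apply a transform like this internally before handing records
to the user, which is essentially what Samza's InputTransformer pattern
does.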

Best,

-Yi


Re: pipeline steps

2019-02-12 Thread Chamikara Jayalath
I think introducing a class hierarchy for extracting metadata from IO
connectors might end up being overkill. What we need to do is add new
transforms to the various IO connectors that return the associated
metadata along with each record. This will be fine performance-wise as
well, since the current transforms will not be affected. Each source will
end up having its own implementation (as needed), but file-based source
transforms will end up sharing a bunch of code here, since they share the
underlying code for handling files. For example, we could add a

public class ReadAllViaFileBasedSource<T> extends
PTransform<PCollection<FileIO.ReadableFile>, PCollection<KV<String, T>>>

where KV<String, T> represents the filename and the original record.
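
A minimal sketch of how that transform could expand (the readFn here is an
assumption standing in for the existing file-reading code, and output coder
setup is elided):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ReadAllViaFileBasedSource<T>
    extends PTransform<PCollection<FileIO.ReadableFile>,
                       PCollection<KV<String, T>>> {

  // Hypothetical: a function that yields the records contained in one file.
  private final SerializableFunction<FileIO.ReadableFile, Iterable<T>> readFn;

  public ReadAllViaFileBasedSource(
      SerializableFunction<FileIO.ReadableFile, Iterable<T>> readFn) {
    this.readFn = readFn;
  }

  @Override
  public PCollection<KV<String, T>> expand(
      PCollection<FileIO.ReadableFile> files) {
    return files.apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, T>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        FileIO.ReadableFile file = c.element();
        String filename = file.getMetadata().resourceId().toString();
        for (T record : readFn.apply(file)) {
          c.output(KV.of(filename, record));  // key every record by its file
        }
      }
    }));
  }
}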

One caveat for file-based sources, though, is that we won't be able to
support Read transforms, since we basically feed in a glob and get back a
bunch of records (splitting happens within the source, so the composite
transform is not aware of the exact files that produce records unless we
implement a new source). ReadFiles/ReadAll transforms should be more
flexible, and we should be able to adapt them to support returning
filenames (and they'll have the full power of Read transforms after SDF).

Thanks,
Cham



Re: pipeline steps

2019-02-12 Thread Yi Pan
The "general way" is the idea I was hoping to convey (apologies if
KafkaIO is not a good example of it). More specifically, if KafkaIO
returns metadata in KafkaRecord and, somehow, FileIO returns a file name
associated with each record, it seems it would make sense to define a
general metadata interface for each record across the different BeamIOs,
as optional envelope information; i.e. an IOContext interface associated
with each record, with KafkaIO implementing KafkaIOContext, FileIO
implementing FileIOContext, etc.
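
Something like the following, purely as an illustration (none of these
types exist in Beam today):

import java.io.Serializable;

// A per-record metadata envelope shared across IOs (hypothetical).
public interface IOContext extends Serializable {}

class KafkaIOContext implements IOContext {
  private final String topic;
  private final int partition;
  private final long offset;

  KafkaIOContext(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }

  String getTopic() { return topic; }
  int getPartition() { return partition; }
  long getOffset() { return offset; }
}

class FileIOContext implements IOContext {
  private final String filename;

  FileIOContext(String filename) { this.filename = filename; }

  String getFilename() { return filename; }
}

// Each IO could then optionally emit KV<IOContext, T> instead of the raw T.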

Best.



Re: pipeline steps

2019-02-11 Thread Reuven Lax
On Mon, Feb 11, 2019 at 8:53 AM Kenneth Knowles wrote:

> In use cases that actually need the filename / topic name / etc., it is
> mandatory information. It isn't overhead or a performance hit.

I think many other systems track records as offsets in a source. So despite
the fact that they provide access to the filename per record, they don't
actually materialize the filename per record. This is definitely how
MapReduce worked.




Re: pipeline steps

2019-02-11 Thread Kenneth Knowles
In use cases that actually need the filename / topic name / etc., it is
mandatory information. It isn't overhead or a performance hit.

Before SDF, FileIO was somewhat of a special case because it read globs and
directories. Most other IOs knew the names of their data sources statically
anyhow, so reifying them in the elements didn't add anything that you
couldn't do another way. It is SDF that makes this a universally relevant
feature.

Kenn



Re: pipeline steps

2019-02-11 Thread Alexey Romanenko
Talking about KafkaIO, it's already possible to have this, since
"apply(KafkaIO.read())" returns "PCollection<KafkaRecord<K, V>>", where
KafkaRecord contains message metadata (topic, partition, etc.).
Though, it works _only_ if "withoutMetadata()" was not used before; in that
case it will return a simple KV<K, V>.
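
For example (broker address, topic, and deserializers below are
placeholders), assuming a Pipeline p:

PCollection<KafkaRecord<Long, String>> records =
    p.apply(KafkaIO.<Long, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));

records.apply(ParDo.of(new DoFn<KafkaRecord<Long, String>, String>() {
  @ProcessElement
  public void process(ProcessContext c) {
    KafkaRecord<Long, String> r = c.element();
    // Topic, partition, and offset are available on every record.
    c.output(r.getTopic() + "/" + r.getPartition()
        + "@" + r.getOffset() + ": " + r.getKV().getValue());
  }
}));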

At the same time, I agree that it would be useful to have some general way
to obtain meta information for records across all Beam IOs.




Re: pipeline steps

2019-02-11 Thread Robert Bradshaw
In terms of performance, it would likely be minimal overhead if (as is
likely) the step consuming the filename gets fused with the read. There's
still overhead constructing the composite object, etc., but that's (again,
likely) smaller than the cost of doing the read itself.



Re: pipeline steps

2019-02-10 Thread Niel Markwick
This would have to flow through to the other IO wrappers as well, perhaps
outputting a KV<String, T>.

I recently wrote an AvroIO.parseAllGenericRecords() equivalent transform,
because I was reading files of various schemas and needed the parseFn to
know both the filename currently being read and to use some side input...

It ended up being quite complex - especially as I wanted to shard the file
read, like AvroIO already does - and I basically re-implemented part of
AvroIO for my use case...

@Chaim, one simpler option could be to use parseGenericRecords and use the
*name* of the Avro schema in the GenericRecord as a way to determine the
table name - this may mean that you have to change the way your Avro files
are being written.
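
A minimal sketch of that option, assuming a Pipeline p (the filepattern is
illustrative, and each record is carried as its JSON string form here just
to keep the coder simple; the schema-name key could then drive per-table
routing via BigQueryIO's dynamic destinations):

// Key each record by the name of its Avro schema.
PCollection<KV<String, String>> keyedBySchema =
    p.apply(AvroIO.parseGenericRecords(
            (GenericRecord r) -> KV.of(r.getSchema().getName(), r.toString()))
        .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .from("gs://my-bucket/incoming/*.avro"));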






Re: pipeline steps

2019-02-09 Thread Reuven Lax
I think we could definitely add an option to FileIO to add the filename to
every record. It would come at a (performance) cost: often the filename is
much larger than the actual record.



Re: pipeline steps

2019-02-07 Thread Yi Pan
Shouldn't this apply to a more generic scenario for any BeamIO? For
example, I am using KafkaIO and want to get the topic and partition from
which each message was received. Some IOContext associated with each data
unit from BeamIO may be useful here?

-Yi



Re: pipeline steps

2019-02-07 Thread Kenneth Knowles
This comes up a lot: wanting file names alongside the data that came from
the file. It is a historical quirk that none of our connectors used to have
the file names. What is the change needed for FileIO + parse Avro to be
really easy to use?

Kenn



Re: pipeline steps

2019-02-07 Thread Jeff Klukas
I haven't needed to do this with Beam before, but I've definitely had
similar needs in the past. Spark, for example, provides an input_file_name
function that can be applied to a dataframe to add the input file as an
additional column. It's not clear to me how that's implemented, though.

Perhaps others have suggestions, but I'm not aware of a way to do this
conveniently in Beam today. To my knowledge, today you would have to use
FileIO.match() and FileIO.readMatches() to get a collection of
ReadableFile. You'd then have to FlatMapElements to pull out the metadata
and the bytes of the file, and you'd be responsible for parsing those bytes
into Avro records. You'd be able to output something like a
KV<String, GenericRecord> that groups the file name together with the
parsed Avro record.
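
A rough sketch of that approach, assuming a Pipeline p and Avro's
DataFileStream over the file's byte channel (a ParDo is used instead of
FlatMapElements so the reader can be closed cleanly; the output coder for
GenericRecord is elided):

PCollection<KV<String, GenericRecord>> records =
    p.apply(FileIO.match().filepattern("gs://my-bucket/input/*.avro"))
     .apply(FileIO.readMatches())
     .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, GenericRecord>>() {
       @ProcessElement
       public void process(ProcessContext c) throws IOException {
         FileIO.ReadableFile file = c.element();
         String filename = file.getMetadata().resourceId().toString();
         // Stream Avro records out of the file, keyed by its name.
         try (DataFileStream<GenericRecord> reader =
                 new DataFileStream<>(
                     Channels.newInputStream(file.open()),
                     new GenericDatumReader<GenericRecord>())) {
           while (reader.hasNext()) {
             c.output(KV.of(filename, reader.next()));
           }
         }
       }
     }));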

Seems like something worth providing better support for in Beam itself if
this indeed doesn't already exist.


pipeline steps

2019-02-07 Thread Chaim Turkel
Hi,
  I am working on a pipeline that listens to a topic on Pub/Sub to get
files that have changed in storage. Then I read the Avro files, and
would like to write them to BigQuery based on the file name (to
different tables).
  My problem is that the transform that reads the Avro does not give
me back the file name (as a tuple or something like that). I seem
to run into this pattern a lot.
Can you think of any solutions?

Chaim
