@Chamikara, if adding the metadata interface class is too much effort right
now, I would accept the solution of a special PTransform that adds the
metadata to the output data types. What I wonder is whether, if this kind of
PTransform becomes common across many different BeamIOs, I may as well
extend the existing BeamIO classes to automatically apply this PTransform
before we deliver the raw message types from the IO (e.g. Samza's
SystemDescriptor implementation follows this pattern by allowing an
optional InputTransformer). Maybe we can wait to discuss this until the use
case becomes more common.
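
To make the optional-transformer idea concrete, here is a minimal plain-Java
sketch. It is not Beam or Samza code: every name in it (MetadataAttachSketch,
RawMessage, WithMetadata, read) is hypothetical and only illustrates the shape
of the pattern, where the IO applies a caller-supplied transformer to each raw
message before handing records downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MetadataAttachSketch {

    /** Hypothetical raw message as an IO sees it internally: payload plus its origin. */
    public static final class RawMessage {
        public final String source;
        public final String payload;

        public RawMessage(String source, String payload) {
            this.source = source;
            this.payload = payload;
        }
    }

    /** Hypothetical output type that keeps the origin next to the value. */
    public static final class WithMetadata<T> {
        public final String source;
        public final T value;

        public WithMetadata(String source, T value) {
            this.source = source;
            this.value = value;
        }
    }

    /**
     * Hypothetical stand-in for an IO read: the transformer decides what the
     * caller receives for each raw message (payload only, or payload plus metadata).
     */
    public static <T> List<T> read(List<RawMessage> raw, Function<RawMessage, T> transformer) {
        List<T> out = new ArrayList<>();
        for (RawMessage m : raw) {
            out.add(transformer.apply(m));
        }
        return out;
    }

    public static void main(String[] args) {
        List<RawMessage> raw = Arrays.asList(
                new RawMessage("topic-a/0", "hello"),
                new RawMessage("topic-a/1", "world"));

        // Payload-only view: the metadata is dropped.
        List<String> payloads = read(raw, m -> m.payload);

        // Opt-in view: the same read, but each value keeps its source.
        List<WithMetadata<String>> enriched =
                read(raw, m -> new WithMetadata<String>(m.source, m.payload));

        System.out.println(payloads);
        for (WithMetadata<String> r : enriched) {
            System.out.println(r.source + " -> " + r.value);
        }
    }
}
```

In this sketch existing callers are unaffected, because the metadata-bearing
output is produced only when a transformer that keeps it is supplied.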

Best,

-Yi

On Tue, Feb 12, 2019 at 4:52 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> I think introducing a class hierarchy for extracting metadata from IO
> connectors might end up being overkill. I think what we need to do is
> add new transforms to various IO connectors that would return associated
> metadata along with each record. This will be fine performance-wise as well,
> since current transforms will not be affected. Each source will end up
> having its own implementation (as needed), but file-based source transforms
> will end up sharing a bunch of code here since they share underlying code
> for handling files. For example, we could add:
>
> *public class ReadAllViaFileBasedSource<T> extends
> PTransform<PCollection<ReadableFile>, PCollection<KV<String, T>>>*
>
> where *KV<String, T>* represents filename and the original record.
>
> One caveat for file-based sources, though, is that we won't be able to
> support Read transforms, since we basically feed in a glob and get back a
> bunch of records (splitting happens within the source, so the composite
> transform is not aware of the exact files that produce records unless we
> implement a new source). ReadFiles/ReadAll transforms should be more
> flexible, and we should be able to adapt them to return file names (and
> they'll have the full power of Read transforms after SDF).
>
> Thanks,
> Cham
>
>
> On Tue, Feb 12, 2019 at 12:42 PM Yi Pan <nickpa...@gmail.com> wrote:
>
>> The "general way" is the idea I was hoping to convey (apologies if
>> KafkaIO is not a good example of that). More specifically, if KafkaIO
>> returns metadata in KafkaRecord and FileIO somehow returns a file name
>> associated with each record, it would make sense to define a general
>> metadata interface for each record across different BeamIOs, as optional
>> envelope information, i.e. IOContext is the interface associated with
>> each record, KafkaIO implements KafkaIOContext, FileIO implements
>> FileIOContext, etc.
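
A minimal plain-Java sketch of the envelope idea described above, under the
assumption that each record carries an optional context object. None of these
types exist in Beam: IOContext, KafkaIOContext, FileIOContext, Enveloped and
the describe() method are hypothetical names, and the topic and file path in
main are made-up sample values.

```java
import java.util.Arrays;
import java.util.List;

public class IOContextSketch {

    /** Hypothetical per-record envelope shared by all IOs. */
    public interface IOContext {
        /** Human-readable description of where the record came from. */
        String describe();
    }

    /** Hypothetical Kafka flavour: carries topic and partition. */
    public static final class KafkaIOContext implements IOContext {
        public final String topic;
        public final int partition;

        public KafkaIOContext(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String describe() {
            return "kafka:" + topic + "/" + partition;
        }
    }

    /** Hypothetical file flavour: carries the file name. */
    public static final class FileIOContext implements IOContext {
        public final String fileName;

        public FileIOContext(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public String describe() {
            return "file:" + fileName;
        }
    }

    /** A record value together with its context. */
    public static final class Enveloped<T> {
        public final T value;
        public final IOContext context;

        public Enveloped(T value, IOContext context) {
            this.value = value;
            this.context = context;
        }
    }

    public static void main(String[] args) {
        // Downstream code handles both sources through the common interface.
        List<Enveloped<String>> records = Arrays.asList(
                new Enveloped<String>("a", new KafkaIOContext("events", 3)),
                new Enveloped<String>("b", new FileIOContext("gs://bucket/part-0.avro")));

        for (Enveloped<String> r : records) {
            System.out.println(r.context.describe() + " -> " + r.value);
        }
    }
}
```

The point of the shared interface in this sketch is that downstream code can
route on the context without knowing which IO produced the record.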
>>
>> Best.
>>
>> On Mon, Feb 11, 2019 at 3:14 AM Alexey Romanenko <
>> aromanenko....@gmail.com> wrote:
>>
>>> Talking about KafkaIO, it's already possible to have this, since
>>> *"apply(KafkaIO.<K, V>read())"* returns
>>> *"PCollection<KafkaRecord<K, V>>"*, where *KafkaRecord* contains
>>> message metadata (topic, partition, etc.).
>>> However, this works _only_ if *"withoutMetadata()"* was not used; if it
>>> was, the transform returns a plain *KV<K, V>*.
>>>
>>> At the same time, I agree that it would be useful to have some general
>>> way to obtain meta information of records across all Beam IOs.
>>>
>>> On 7 Feb 2019, at 18:25, Yi Pan <nickpa...@gmail.com> wrote:
>>>
>>> Shouldn't this apply to a more generic scenario, for any BeamIO? For
>>> example, I am using KafkaIO and want to get the topic and partition from
>>> which the message was received. Some IOContext associated with each data
>>> unit from BeamIO may be useful here?
>>>
>>> -Yi
>>>
>>> On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> This comes up a lot, wanting file names alongside the data that came
>>>> from the file. It is a historical quirk that none of our connectors used to
>>>> have the file names. What is the change needed for FileIO + parse Avro to
>>>> be really easy to use?
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>
>>>>> I haven't needed to do this with Beam before, but I've definitely had
>>>>> similar needs in the past. Spark, for example, provides an input_file_name
>>>>> function that can be applied to a dataframe to add the input file as an
>>>>> additional column. It's not clear to me how that's implemented, though.
>>>>>
>>>>> Perhaps others have suggestions, but I'm not aware of a way to do this
>>>>> conveniently in Beam today. To my knowledge, today you would have to use
>>>>> FileIO.match() and FileIO.readMatches() to get a collection of
>>>>> ReadableFile. You'd then have to FlatMapElements to pull out the metadata
>>>>> and the bytes of the file, and you'd be responsible for parsing those bytes
>>>>> into avro records. You'd be able to output something like a KV<String, T>
>>>>> that groups the file name together with the parsed avro record.
>>>>>
>>>>> Seems like something worth providing better support for in Beam itself
>>>>> if this indeed doesn't already exist.
>>>>>
>>>>> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel <ch...@behalf.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>   I am working on a pipeline that listens to a topic on pubsub to get
>>>>>> files that have changes in the storage. Then I read avro files, and
>>>>>> would like to write them to bigquery based on the file name (to
>>>>>> different tables).
>>>>>>   My problem is that the transformer that reads the avro does not give
>>>>>> me back the file name (like a tuple or something like that). This
>>>>>> pattern seems to come up a lot.
>>>>>> Can you think of any solutions?
>>>>>>
>>>>>> Chaim
>>>>>>
>>>>>
>>>
