Hi Pablo,

Thanks a lot for taking the time out to answer my questions. It's been
fantastic to experience this.
For others reading this in the archives in the future, the trick seems to
be in the *FileIO.readMatches()* call, which is documented as:


> Converts each result of match()
> <https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/FileIO.html#match-->
> or matchAll()
> <https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/FileIO.html#matchAll-->
> to a FileIO.ReadableFile
> <https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/FileIO.ReadableFile.html>
> which can be used to read the contents of each file, optionally
> decompressing it.


With FileIO.ReadableFile you can then get hold of the complete content of
the file along with its metadata. The rest of the apply() calls seem to
"trick" Beam into executing exactly once per file.
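
For archive readers who want to see why the extra apply() calls matter,
here is a rough plain-Python model of the WithKeys / GroupByKey / Values /
Flatten chain. It is not Beam code and calls no Beam API; the function names
(with_single_key, group_by_key, process_sequentially) are made up for this
illustration. It only shows that giving every element the same key leaves a
single group, whose values are then handled one at a time.

```python
from collections import defaultdict


def with_single_key(elements):
    """Model of WithKeys.of((Void) null): pair every element with the same key."""
    return [(None, element) for element in elements]


def group_by_key(pairs):
    """Model of GroupByKey: collect all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def process_sequentially(elements, handler):
    """Key, group, drop the key, then handle the values one by one.

    Assumption: this model keeps input order and builds a plain list.
    A real runner makes no ordering promise for grouped values.
    """
    groups = group_by_key(with_single_key(elements))
    results = []
    # Every element has the same key, so there is at most one group here.
    for values in groups.values():      # model of Values.create()
        for value in values:            # model of flattening the iterable
            results.append(handler(value))
    return results


if __name__ == "__main__":
    # Hypothetical file names, purely for illustration.
    print(process_sequentially(["a.json", "b.xml"], str.upper))
```

In a real pipeline the runner hands the grouped values over as a lazy
iterable, as Pablo explains further down, so the in-memory list above is a
simplification.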

This will actually be very useful to us in the kettle-beam project.

I think now it should actually be possible to use the same "trick" to
support other Kettle steps on the input side like generic JDBC / system
information and so on.
This will probably bring another 50 or so Kettle steps into the
functionality palette. Visual programming FTW!

Cheers,

Matt
---
Matt Casters <mattcast...@gmail.com>
Senior Solution Architect, Kettle Project Founder




On Tue, Jan 8, 2019 at 02:36, Pablo Estrada <pabl...@google.com> wrote:

> I've just learned that there are these transforms that should be useful:
>
> p.apply(FileIO.match().filepattern(...))
>   .apply(WithKeys.of((Void) null))
>   .apply(GroupByKey.create())
>   .apply(Values.create())
>   .apply(Flatten.iterables())
>   .apply(FileIO.readMatches())
>   .apply(ParDo.of(new ConsumeFileDescriptors()));
>
> -P.
>
> On Mon, Jan 7, 2019 at 5:23 PM Pablo Estrada <pabl...@google.com> wrote:
>
>> Hi Matt,
>>
>> I am much more familiar with Python, so I usually answer questions using
>> that SDK. Also, it's quicker to type a fully detailed pipeline in an email,
>> and the SDKs are similar enough that it should not be too difficult to
>> translate to Java from an IDE.
>>
>> To your questions:
>> 1. Grouping like that should not create a single in-memory iterable.
>> Runners provide lazy iterables that load elements as you go through them.
>>
>> 2. The Java SDK provides the FileIO.match(), and FileIO.readMatches()
>> transforms, which generate a list of file metadata objects, and convert
>> them into file descriptors that can be read in a ParDo (respectively). I
>> think those should do the trick for you.
>>
>> I guess, in Java you'd do something like so:
>>
>> p.apply(FileIO.match().filepattern(...))
>>   .apply(ParDo.of(new AddSingleKeyDoFn()))
>>   .apply(GroupByKey.create())
>>   .apply(ParDo.of(new DropKeyDoFn()))
>>   .apply(FileIO.readMatches())
>>   .apply(ParDo.of(new ConsumeFileDescriptors()));
>>
>> You'll have to follow the FileIO documentation
>> <https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/FileIO.html>[1]
>> to see what the file descriptors look like, and how to use the match
>> function.
>>
>> Best
>> -P.
>>
>> [1]
>> https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/FileIO.html
>>
>> On Mon, Jan 7, 2019 at 2:41 PM Matt Casters <mattcast...@gmail.com>
>> wrote:
>>
>>> Hi Pablo,
>>>
>>> Apologies, I thought the cases were very simple and clear.  Obviously I
>>> should have also mentioned I'm in Java land, not used to the script kiddy
>>> stuff :-)
>>>
>>> On the output side: thanks for the grouping "trick".  However, doesn't
>>> that mean that all rows will end up in a single in-memory Iterable?
>>>
>>> On the input side, reading a JSON or XML file shouldn't be black magic
>>> but apparently it's completely undocumented as far as I could tell.  Any
>>> tips there?
>>>
>>> Mind you, if it's not possible in the Beam API, feel free to let me know
>>> and I'll dive into the Beam source code to figure something out.
>>>
>>> Cheers,
>>>
>>> Matt
>>>
>>>
>>>
>>> ---
>>> Matt Casters <mattcast...@gmail.com>
>>> Senior Solution Architect, Kettle Project Founder
>>>
>>>
>>>
>>> On Mon, Jan 7, 2019 at 23:09, Pablo Estrada <pabl...@google.com> wrote:
>>>
>>>> Hi Matt,
>>>> Is this computation running as part of a larger pipeline that does run
>>>> some parallel processing? Otherwise, it's odd that it needs to run on Beam.
>>>> Nonetheless, you can certainly do this with a pipeline that has a single
>>>> element. Here's what that looks like in Python:
>>>>
>>>> p | beam.Create(['gs://myfile.json']) | beam.ParDo(LoadEachFile()) |
>>>> WriteToMyDatabase()
>>>>
>>>> If, on the other hand, you may have a PCollection with multiple
>>>> elements (e.g. filenames), and you want to process them one-by-one, you can
>>>> group them all on a single key, like so:
>>>>
>>>> my_filename_pcoll | GroupIntoSingleThread() |
>>>> beam.ParDo(LoadEachFile()) | WriteToMyDatabase()
>>>>
>>>> Where the GroupIntoSingleThread transform looks basically like so:
>>>>
>>>> input | beam.Map(lambda x: ('singlekey', x)) | beam.GroupByKey() |
>>>> beam.FlatMap(lambda x: x[1])
>>>>
>>>> In this example, we are adding a single key to all elements, grouping
>>>> them all together, and then throwing away the key, to get each of the
>>>> elements one-by-one in a single thread. You can do something similar using
>>>> side inputs (with AsIter(my_filename_pcoll)).
>>>>
>>>> Does that help? Or perhaps you could clarify a bit more about your use
>>>> case.
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, Jan 7, 2019 at 1:33 PM Matt Casters <mattcast...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Beam!
>>>>>
>>>>> There's a bunch of stuff that I would like to support and it's
>>>>> probably something silly but I couldn't find it immediately ... or I'm
>>>>> completely dim and making too much of certain things.
>>>>>
>>>>> The thing is, sometimes you just want to do single-threaded
>>>>> operations.
>>>>> For example, we sometimes need to read generic JSON or XML documents
>>>>> or perform single threaded bulk loads into certain databases.
>>>>> There's also simple relational database data you might want to
>>>>> side-load or data from some web service somewhere.
>>>>>
>>>>> So, how can I instruct Beam not to fire up a bunch of readers or
>>>>> writers? What is a good alternative to ParDo?
>>>>>
>>>>> Thanks in advance for any suggestions!
>>>>>
>>>>> Matt
>>>>> ---
>>>>> Matt Casters <mattcast...@gmail.com>
>>>>> Senior Solution Architect, Kettle Project Founder
>>>>>
>>>>>
>>>>>
