Re: Pattern for sharing a resource across all worker threads in Beam Java SDK

2020-04-30 Thread Cameron Bateman
I'm interested in this area too.  One limitation, I guess, is that this
only works if your runner uses a single JVM, assuming you need your
singletons to be globally unique.  I'm mostly using DirectRunner (I'm still
new to all this), for which this holds.  I suppose for more distributed
runners this would be a more challenging problem.

One tip I would give for your code is to protect your singleton return
values, for example the Set that you return from
getOrCreateSingletonAllowedCities.
If you want that to be modifiable, you should wrap it using something like
Collections.synchronizedSet().  If you want it to be immutable, use
Collections.unmodifiableSet().  Note that even if the general problem of making these
singletons globally available is solved by the framework, you will still
need to make your singletons thread-safe.
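
A minimal sketch of the two wrapping options, using only java.util.  The
AllowedCities class, its method names and the city values are hypothetical
and are not taken from the code under discussion:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical holder class; names and values are illustrative only.
final class AllowedCities {
  private static Set<String> readOnly;
  private static Set<String> shared;

  private AllowedCities() {}

  // Immutable variant: callers get a view that rejects every mutation.
  static synchronized Set<String> getOrCreateReadOnly() {
    if (readOnly == null) {
      Set<String> loaded = new HashSet<>();
      loaded.add("Portland");
      loaded.add("Toronto");
      // The backing set never escapes, so nobody can change it later.
      readOnly = Collections.unmodifiableSet(loaded);
    }
    return readOnly;
  }

  // Modifiable variant: each individual call locks on the wrapper.
  // Iterating over the set still requires an explicit synchronized block.
  static synchronized Set<String> getOrCreateShared() {
    if (shared == null) {
      shared = Collections.synchronizedSet(new HashSet<>());
    }
    return shared;
  }

  public static void main(String[] args) {
    Set<String> cities = getOrCreateReadOnly();
    try {
      cities.add("Lima");
    } catch (UnsupportedOperationException e) {
      System.out.println("read-only set rejected the write");
    }
    getOrCreateShared().add("Lima");
    System.out.println("shared set contains Lima: " + getOrCreateShared().contains("Lima"));
  }
}
```

The read-only variant is usually the simpler choice when the data is loaded
once and never changes, since no locking is needed on the read path.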

--Cam

On Thu, Apr 30, 2020 at 12:45 PM Jeff Klukas wrote:

> Beam Java users,
>
> I've run into a few cases where I want to present a single thread-safe
> data structure to all threads on a worker, and I end up writing a good bit
> of custom code each time involving a synchronized method that handles
> creating the resource exactly once, and then each thread has its own
> reference to the singleton. I don't have extensive experience with thread
> safety in Java, so it seems likely I'm going to get this wrong.
>
> Are there any best practices for state that is shared across threads? Any
> prior art I can read up on?
>
> The most concrete case I have in mind is loading a GeoIP database for
> doing city lookups from IP addresses. We're using MaxMind's API which
> allows mapping a portion of memory to a file sitting on disk. We have a
> synchronized method that checks if the reader has been initialized [0] ; if
> not, we copy the database file from GCS to local disk, build the
> DatabaseReader instance, and return it. Other threads will see the
> already-initialized reader and just get a reference to it instead.
>
> This all appears to work, and it saves memory compared to each thread
> maintaining its own DatabaseReader. But is there a safer or more built-in
> way to do this? Am I missing relevant hooks in the Beam API that would make
> this cleaner?
>
> [0]
> https://github.com/mozilla/gcp-ingestion/blob/master/ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoCityLookup.java#L95
>
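
The create-exactly-once behaviour described in the quoted message can be
sketched with a synchronized static accessor.  ExpensiveReader is a
hypothetical stand-in for something like a database reader, and the load
counter exists only to make the once-only behaviour visible.  This is not
the code from the linked repository:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a resource that is expensive to build.
final class ExpensiveReader {
  static final AtomicInteger LOADS = new AtomicInteger();

  ExpensiveReader() {
    LOADS.incrementAndGet(); // counts how many times the resource was built
  }

  String lookup(String ip) {
    return "city-for-" + ip;
  }
}

final class ReaderSingleton {
  private static ExpensiveReader instance;

  private ReaderSingleton() {}

  // The class-level lock guarantees that only one thread runs the
  // initialization; later callers see the already-built instance.
  static synchronized ExpensiveReader getOrCreate() {
    if (instance == null) {
      instance = new ExpensiveReader();
    }
    return instance;
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    for (int i = 0; i < 100; i++) {
      pool.execute(() -> ReaderSingleton.getOrCreate().lookup("203.0.113.7"));
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println("loads = " + ExpensiveReader.LOADS.get());
  }
}
```

In a Beam DoFn, a method annotated with @Setup is one plausible place to call
such an accessor, so that each DoFn instance keeps a reference to the shared
object.  The shared object itself must still be safe for concurrent use.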


Re: Input File Tracking

2020-04-13 Thread Cameron Bateman
Thanks Vincent.  I looked briefly at Kafka.  I might revisit that, but the
learning curve looks steep and it would probably be overkill at the scale
I'm at with this project.  My intake right now is a few files a day that
reduce to a few kilobytes worth of data.  I have future projects that
involve a lot more files in a similar scenario, so I will revisit Kafka
then.

Thanks,

Cameron

On Mon, Apr 13, 2020 at 5:28 PM Vincent Marquez 
wrote:

> On first glance it sounds like a problem for a persistent queue such as
> Kafka or Google Cloud's pubsub.  You could write a path to the queue upon
> download, which would trigger Beam to read the file and then bump the
> offset only upon completion of the read to the queue.  If the read of the
> file fails, the offset won't get committed, so it should be 'at least once'
> semantics.  Just remember, unless you have unlimited memory/disk there's
> not really such a thing as 'exactly once', but it sounds like for your case
> you'd prefer 'at least once' vs. 'at most once'.
>
> On Mon, Apr 13, 2020 at 4:53 PM Cameron Bateman 
> wrote:
>
>> I have a use case where I'm regularly polling for and downloading data
>> files from a public (government) web site.  I then intake these files from
>> a directory and pass them through a Beam pipeline with the data ultimately
>> being deposited into a database.
>>
>> As the files come in, I would like to track them somewhere like a
>> database perhaps with a checksum and some other metadata.  When an intake
>> through the pipeline succeeds, I would like to archive the file and delete
>> it from the main intake directory.  When an intake on the pipeline fails, I
>> would like to keep the file, mark it as an error in that database and
>> either leave it at the intake dir or move it to another location for me to
>> fix the problem etc.
>>
>> Is there a framework that does something like this, ideally one with Beam
>> integration?  This seems like a common scenario (in a prior life, I did
>> this sort of thing for a customer who sent CSV files once a day to a drop
>> location, which we then processed).  Yet I've always ended up writing
>> something custom.  Maybe I'm just using the wrong Google criteria.
>>
>> Thanks,
>>
>> Cameron
>>
>
>
> --
> *~Vincent*
>
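
The commit-after-processing ordering described in the quoted reply can be
sketched without any real queue.  PathQueue below is a hypothetical,
single-threaded, in-memory stand-in.  It is not the Kafka or Pub/Sub API; it
only shows why a failed read leads to redelivery:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a persistent queue with one
// committed offset. Not thread-safe; for illustration only.
final class PathQueue {
  private final List<String> paths = new ArrayList<>();
  private int committedOffset = 0;

  void publish(String path) {
    paths.add(path);
  }

  int committedOffset() {
    return committedOffset;
  }

  // Delivers entries starting at the committed offset. The offset is
  // advanced only after the processor returns normally.
  void poll(Consumer<String> processor) {
    while (committedOffset < paths.size()) {
      String path = paths.get(committedOffset);
      try {
        processor.accept(path);
      } catch (RuntimeException e) {
        // Not committed: the same path is delivered again on the next poll.
        return;
      }
      committedOffset++;
    }
  }

  public static void main(String[] args) {
    PathQueue queue = new PathQueue();
    queue.publish("a.csv");
    queue.publish("b.csv");
    int[] failuresLeft = {1};
    Consumer<String> flaky = path -> {
      if (path.equals("b.csv") && failuresLeft[0]-- > 0) {
        throw new IllegalStateException("read failed: " + path);
      }
      System.out.println("processed " + path);
    };
    queue.poll(flaky); // a.csv succeeds, b.csv fails and stays uncommitted
    queue.poll(flaky); // b.csv is redelivered and succeeds
    System.out.println("committed offset = " + queue.committedOffset());
  }
}
```

Because a file can be delivered more than once, the downstream write should
tolerate duplicates, for example by keying database rows on a file checksum.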


Input File Tracking

2020-04-13 Thread Cameron Bateman
I have a use case where I'm regularly polling for and downloading data
files from a public (government) web site.  I then intake these files from
a directory and pass them through a Beam pipeline with the data ultimately
being deposited into a database.

As the files come in, I would like to track them somewhere like a database
perhaps with a checksum and some other metadata.  When an intake through
the pipeline succeeds, I would like to archive the file and delete it from
the main intake directory.  When an intake on the pipeline fails, I would
like to keep the file, mark it as an error in that database and either
leave it at the intake dir or move it to another location for me to fix the
problem etc.
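
As a rough illustration of the bookkeeping described above, here is a sketch
using only the JDK.  The IntakeTracker class and its method names are
hypothetical, and recording the checksum in a database is left out.  It
computes a SHA-256 checksum and moves a file to an archive or error
directory depending on the outcome:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; not part of Beam or any existing framework.
final class IntakeTracker {
  private IntakeTracker() {}

  // Reads the whole file into memory, which is fine for small inputs.
  static String sha256Hex(Path file) throws IOException, NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    byte[] hash = digest.digest(Files.readAllBytes(file));
    StringBuilder hex = new StringBuilder();
    for (byte b : hash) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  // Moves the file out of the intake directory and returns its new location.
  static Path settle(Path file, boolean succeeded, Path archiveDir, Path errorDir)
      throws IOException {
    Path targetDir = succeeded ? archiveDir : errorDir;
    Files.createDirectories(targetDir);
    return Files.move(file, targetDir.resolve(file.getFileName()),
        StandardCopyOption.REPLACE_EXISTING);
  }

  public static void main(String[] args) throws Exception {
    Path root = Files.createTempDirectory("intake-demo");
    Path intake = Files.createDirectories(root.resolve("intake"));
    Path file = Files.write(intake.resolve("report.csv"),
        "a,b\n1,2\n".getBytes(StandardCharsets.UTF_8));
    System.out.println("sha256 = " + sha256Hex(file));
    Path moved = settle(file, true, root.resolve("archive"), root.resolve("error"));
    System.out.println("moved to " + moved);
  }
}
```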

Is there a framework that does something like this, ideally one with Beam
integration?  This seems like a common scenario (in a prior life, I did
this sort of thing for a customer who sent CSV files once a day to a drop
location, which we then processed).  Yet I've always ended up writing
something custom.  Maybe I'm just using the wrong Google criteria.

Thanks,

Cameron


Re: Handling imperfect data

2020-04-10 Thread Cameron Bateman
Thanks Varun, that worked.  A small note for anyone following this is that
the API seems to have changed slightly since the blog was written.  In
particular, processElement is no longer a method that you override from the
DoFn parent class in recent versions.  Instead, you mark your own method with
the @ProcessElement annotation.  Check the up-to-date API docs for more info.
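
For anyone following along, the annotation style combined with a dead-letter
output might look roughly like the sketch below.  It needs the Beam Java SDK
on the classpath.  The class name, tag names and the looksValid check are
hypothetical placeholders and not the code from the blog post:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Hypothetical sketch; names are illustrative only.
final class DeadLetterSplit {
  // Anonymous subclasses preserve the element type for coder inference.
  static final TupleTag<String> VALID = new TupleTag<String>() {};
  static final TupleTag<String> DEAD_LETTER = new TupleTag<String>() {};

  private DeadLetterSplit() {}

  // Placeholder check; replace with the real test for a bad conversion.
  static boolean looksValid(String xml) {
    return xml != null && !xml.trim().isEmpty();
  }

  static class SplitFn extends DoFn<String, String> {
    // The method is found through the annotation, not by overriding.
    @ProcessElement
    public void processElement(@Element String xml, MultiOutputReceiver out) {
      if (looksValid(xml)) {
        out.get(VALID).output(xml);
      } else {
        out.get(DEAD_LETTER).output(xml);
      }
    }
  }

  // Returns a tuple from which each branch is read with get(tag).
  static PCollectionTuple split(PCollection<String> parsed) {
    return parsed.apply(
        "SplitValidAndDeadLetter",
        ParDo.of(new SplitFn()).withOutputTags(VALID, TupleTagList.of(DEAD_LETTER)));
  }
}
```

The main branch is then available as results.get(DeadLetterSplit.VALID) and
the failures as results.get(DeadLetterSplit.DEAD_LETTER), where results is
the value returned by split.  The failures can be written to a directory for
manual repair.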

On Tue, Apr 7, 2020 at 11:05 PM Varun Dhussa wrote:

> TupleTags is a good way to proceed. You can add a dead letter side output
> for the tag. A sample implementation is here
> <https://cloud.google.com/blog/products/gcp/handling-invalid-inputs-in-dataflow>
> .
>
> Varun
>
>
> On Wed, Apr 8, 2020 at 11:00 AM Cameron Bateman 
> wrote:
>
>> I am trying to create a pipeline that intakes PDF files, parses the data
>> using Tika and processes the data.  A problem I have is that sometimes Tika
>> doesn't convert certain pieces of text correctly.
>>
>> I can detect this and would like to fork the output of my pipeline:
>> for correctly converted PDF files, I want to continue processing the data.
>> For the ones that have errors, I'd like to dump the intermediate XML data
>> to a directory and raise an alert.  For those files, I will go and manually
>> fix the file and effectively restart the pipeline from where it failed as if
>> it had been correct in the first place.
>>
>> Is there any facility to do this sort of handling of imperfect data
>> inputs?  I see that I can try to use MultiOutputReceiver and TupleTags to
>> try to fork the data but I'm a little at a loss as to how to proceed.
>>
>> Thanks,
>> Cameron
>>
>


Handling imperfect data

2020-04-07 Thread Cameron Bateman
I am trying to create a pipeline that intakes PDF files, parses the data
using Tika and processes the data.  A problem I have is that sometimes Tika
doesn't convert certain pieces of text correctly.

I can detect this and would like to fork the output of my pipeline:
for correctly converted PDF files, I want to continue processing the data.
For the ones that have errors, I'd like to dump the intermediate XML data
to a directory and raise an alert.  For those files, I will go and manually
fix the file and effectively restart the pipeline from where it failed as if
it had been correct in the first place.

Is there any facility to do this sort of handling of imperfect data
inputs?  I see that I can try to use MultiOutputReceiver and TupleTags to
try to fork the data but I'm a little at a loss as to how to proceed.

Thanks,
Cameron