Re: Input File Tracking

2020-04-13 Thread Vincent Marquez
At first glance it sounds like a problem for a persistent queue such as
Kafka or Google Cloud Pub/Sub.  You could write the file's path to the queue
upon download, which would trigger Beam to read the file, and then advance
the queue offset only once the read has completed.  If the read of the
file fails, the offset won't get committed, so you get 'at least once'
semantics.  Just remember, unless you have unlimited memory/disk there's
not really such a thing as 'exactly once', but it sounds like for your case
you'd prefer 'at least once' over 'at most once'.
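
A minimal sketch of that shape, assuming the downloaded file paths are published
to a Kafka topic (the topic name, servers, and downstream parsing are illustrative):

Pipeline p = Pipeline.create(options);   // options defined elsewhere

PCollection<String> lines =
    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")              // illustrative
            .withTopic("downloaded-file-paths")              // illustrative
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .commitOffsetsInFinalize()                       // commit only after the bundle succeeds
            .withoutMetadata())
        .apply(Values.create())                              // keep just the file path
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(TextIO.readFiles());                          // the file contents, line by line
// ...parse `lines` and write to the database downstream.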

On Mon, Apr 13, 2020 at 4:53 PM Cameron Bateman 
wrote:

> I have a use case where I'm regularly polling for and downloading data
> files from a public (government) web site.  I then intake these files from
> a directory and pass them through a Beam pipeline with the data ultimately
> being deposited into a database.
>
> As the files come in, I would like to track them somewhere like a database
> perhaps with a checksum and some other metadata.  When an intake through
> the pipeline succeeds, I would like to archive the file and delete it from
> the main intake directory.  When an intake on the pipeline fails, I would
> like to keep the file, mark it as an error in that database, and either
> leave it in the intake dir or move it to another location for me to fix the
> problem etc.
>
> Is there a framework that does something like this, ideally one with Beam
> integration?  This seems like a common scenario (in a prior life, I did
> this sort of thing for a customer who sent CSV files once a day to a drop
> location, which we then processed).  Yet I've always ended up writing
> something custom.  Maybe I'm just using the wrong Google criteria.
>
> Thanks,
>
> Cameron
>


-- 
*~Vincent*


Re: Global window with Bounded Source

2020-04-16 Thread Vincent Marquez
I actually ran into the same issue, and would love some guidance!

I had a list of Avro files within folders in GCS, each folder
representing a single day, and I needed to de-dupe events per day (by a
key).  I didn't want a GroupByKey to hold billions of events when it didn't
matter, so I added a timestamp to each folder, then tried windowing.  I
thought perhaps the windowed elements passed into the ReadAll would mean
windows could proceed down the pipeline without having to load the entire
batch of files into memory.

I was wrong, and saw the same behavior as Neha.  Is there a better way of
doing this?  I don't see a *technical* reason why the Dataflow runner
couldn't be implemented to have this behavior, but I could be mistaken.
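
For concreteness, the attempt described above looked roughly like this (the paths,
timestamp helper, and Avro schema are illustrative):

PCollection<String> dailyFolders =
    p.apply(Create.of(
        "gs://bucket/events/2020-04-01/*.avro",
        "gs://bucket/events/2020-04-02/*.avro"));

PCollection<GenericRecord> records =
    dailyFolders
        .apply(WithTimestamps.of((String path) -> timestampForFolder(path)))  // hypothetical helper
        .apply(Window.<String>into(FixedWindows.of(Duration.standardDays(1))))
        .apply(FileIO.matchAll())
        .apply(FileIO.readMatches())
        .apply(AvroIO.readFilesGenericRecords(schema));       // schema defined elsewhere

// The hope was that each daily window could then be de-duped per key and flow
// downstream independently, rather than the whole batch being materialized at once.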


*~Vincent*


On Thu, Apr 16, 2020 at 10:13 AM Neha Sharma 
wrote:

> Hi Luke,
>
> It is the order the record appears in the source file.
>
> Basically each record corresponding to a key depends on the previous
> occurrence of the same key and hence parallel processing does not seem to
> be a good idea.
>
> Is there a possibility where a bounded source + a fixed window based on the
> timestamp in the record can be used to somehow batch the whole data into
> smaller chunks for processing, while at the same time maintaining the
> ordering, provided the records are sorted by timestamp?
>
> Something like this:
>
> Read from Bounded Source ->
> Fixed window to make smaller batches ->
> Sorting based on timestamp ->
> Processing
>
>
> Regards,
> Neha
>
>
> On Thu, Apr 16, 2020, 6:57 PM Luke Cwik  wrote:
>
>> What do you mean by in sequential order, order across files, keys, ...?
>> Is this an ordering that is based on data such as a timestamp of the
>> record or the order in which the records appear in the source files?
>> Do you have a lot of keys or very few?
>>
>> If you want to process all the data across all the files in sequential
>> order with no parallelism then Apache Beam may not provide much value since
>> its basis is all about parallel data processing.
>>
>> On Wed, Apr 15, 2020 at 10:30 PM Neha Sharma 
>> wrote:
>>
>>> Hello,
>>>
>>> I have a use case where I have a bounded source and I am reading Avro
>>> files from Google Cloud Storage. I am also using a group-by transform. The
>>> amount of data is huge and I need to process the data in sequential order.
>>>
>>> But as the bounded source reads everything, it seemed like a good idea to
>>> apply a fixed window on top of the global window. But it does not seem to be
>>> working as expected.
>>>
>>> Can you please tell me how to handle such scenarios, where a bounded
>>> source with a large dataset can be broken down into smaller chunks for
>>> processing using windows, such that the windows for a key are always
>>> processed in order?
>>>
>>>
>>> Regards,
>>> Neha
>>>
>>


Re: Doubts on Looping inside a beam transform. Processing sequentially using Apache Beam

2020-12-15 Thread Vincent Marquez
Hi Feba,  I can't say for sure *where* your pipeline is running out of
memory, but I'm going to guess that it's due to the fact that CassandraIO
currently only has the ability to read an entire table, or a single
attached query.  So if you are calling CassandraIO.read() to grab all
the "user data", it's going to load as much of it as possible.

I have a pull request to add a readAll() method to CassandraIO that should
allow you to do what you want.  Ismaël and I have been working on it on and
off for quite some time, but we're hoping to get it merged in this month.  The
way readAll works is that it receives as INPUT what query/data needs to be
retrieved from Cassandra, so it can then be used beyond just the first part
of the pipeline.  We are using this quite a lot (from my branch) at my
current gig for when we have pipelines similar to yours.  Here is what it
would look like:

CassandraIO.read() --->  MapElements into a query for readAll()  --->
CassandraIO.readAll()  ---> aggregation of user data ---> output


This way if you only have one thread doing the aggregation of user data,
you'll in effect only be doing one user at a time.  I'm not sure when
exactly readAll will be merged in, but you could also write your own
connector that does something similar by copying my code (or taking
inspiration from it, etc).
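
A rough sketch of that shape, assuming a readAll() that takes a PCollection of read
specs as in the pending pull request (the entity, keyspace, table and query are
illustrative, and the exact builder methods may differ once merged):

PCollection<String> employeeIds = ...;   // one element per employee

PCollection<CassandraIO.Read<UserData>> perEmployeeReads =
    employeeIds.apply(MapElements
        .into(new TypeDescriptor<CassandraIO.Read<UserData>>() {})
        .via(id -> CassandraIO.<UserData>read()
            .withHosts(Collections.singletonList("cassandra-host"))
            .withPort(9042)
            .withKeyspace("ks")
            .withTable("user_data")
            .withEntity(UserData.class)
            .withCoder(SerializableCoder.of(UserData.class))
            .withQuery("SELECT * FROM ks.user_data WHERE employee_id = '" + id + "'")));

PCollection<UserData> userData =
    perEmployeeReads.apply(CassandraIO.<UserData>readAll()
        .withCoder(SerializableCoder.of(UserData.class)));

// ...then aggregate/stitch userData per employee and write it back out.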

*~Vincent*


On Tue, Dec 15, 2020 at 1:11 AM Feba Fathima 
wrote:

>
> Hi,
>
>We are creating a beam pipeline to do batch processing of data
> bundles. The pipeline reads records using CassandraIO. We want to process
> the data in batches of 30 min, then group/stitch the 30-min data and write it
> to another table. I have 300 bundles for each employee and we need to process
> at least 50 employees using the limited resources (~2Gi). But
> currently the heap usage is very high, so we are only able to process 1
> employee (with ~4Gi). If we give it more data we get out-of-memory/heap
> errors.
>
> Is there a way to process 1 employee at a time? Like a loop, so that we can
> process all employees sequentially with our ~2Gi.
>
> We have also posted the same question on Stack Overflow and have not gotten
> help there so far either.
>
>
> https://stackoverflow.com/questions/65274909/looping-inside-a-beam-transform-process-sequentially-using-apache-beam
>
> Kindly guide us through this if someone is familiar with the scenario.
>
> --
> Thanks & Regards,
> Feba Fathima
>
>
>


KafkaIO.read without dataloss?

2021-02-22 Thread Vincent Marquez
Forgive me for the long email; I figured more detail was better. I also asked
this on SO if you prefer to answer there:

https://stackoverflow.com/questions/66325929/dataflow-reading-from-kafka-without-data-loss




We're currently big users of Beam/Dataflow batch jobs and want to start
using streaming if it can be done reliably.

Here is a common scenario: we have a very large Kafka topic that we need to
do some basic ETL or aggregation on, plus a non-idempotent upstream queue.
Here is an example of our Kafka data:

ID   | msg | timestamp (mm:ss)
-----|-----|------------------
1    | A   | 01:00
2    | B   | 01:01
3    | D   | 06:00
4    | E   | 06:01
4.3  | F   | 06:01
...  | ..  | ..    (millions more)
4.5  | ZZ  | 19:58

Oops, the data changes from integers to decimals at some point, which will
eventually cause some elements to fail, requiring us to kill the
pipeline, possibly modify the downstream service, and possibly make minor
code changes to the Dataflow pipeline.

In Spark Structured Streaming, because of the ability to use external
checkpoints, we would be able to restart a streaming job and resume
processing the queue where the previous job successfully left off, giving
exactly-once processing. In a vanilla or Spring Boot Java
application we could loop through with a Kafka consumer, and only after
writing results to our 'sink', commit offsets.
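
Conceptually something like this (the topic, group and sink call are illustrative):

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
  consumer.subscribe(Collections.singletonList("events"));
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
      sink.write(transform(record.value()));   // hypothetical ETL + write to the sink
    }
    consumer.commitSync();   // commit only after every record in the batch was written
  }
}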

My overall question is *can we achieve similar functionality in
Dataflow/Beam?* I'll list some of my assumptions and concerns:

   1. It seems here in KafkaIO there is no relationship between the offset
   commit PCollection and the user's one; does that mean they can drift apart?
   2. It seems here in KafkaOffsetCommit this is taking a window of five
   minutes and emitting the highest offset, but this is *not wall time*, this
   is Kafka record time. Going back to our sample data, to me it looks like
   the entire queue's offset would be committed (in chunks of five minutes) as
   fast as possible!  *This means that if we have only finished processing up
   to record F in the first five minutes, we may have committed almost the
   entire queue's offsets?*

Now, in our scenario where our pipeline started failing around F, it seems our
only choice is to start from the beginning or lose data? I believe this
might be overcome with a lot of custom code (a custom DoFn to ensure the
Kafka consumer never commits) and some custom code for our upstream sink
that would eventually commit offsets. Is there a better way to do this,
and/or are some of my assumptions wrong about how offset management is
handled in Beam/Dataflow?
Thanks in advance for any help/pointers/ideas!

*~Vincent*


Write to multiple IOs in linear fashion

2021-03-23 Thread Vincent Marquez
I have a common use case where my pipeline looks like this:
CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write

I do NOT want my pipeline to look like the following:

CassandraIO.readAll -> Aggregate -> CassandraIO.write
                               |
                               +-> PubsubIO.write

Because I need to ensure that only items that have successfully finished a
(quorum) write are written to Pubsub.

Since CassandraIO.write is a PTransform that ends in a PDone, I can't actually
use it here, so I often roll my own 'writer', but maybe there is a recommended
way of doing this?

Thanks in advance for any help.

*~Vincent*


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Vincent Marquez
No, it only needs to ensure that a record seen on Pubsub has successfully
been written to the database.  So "record by record" is fine, or even "bundle".

*~Vincent*


On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko 
wrote:

> Do you want to wait for ALL records are written for Cassandra and then
> write all successfully written records to PubSub or it should be performed
> "record by record"?
>
> On 24 Mar 2021, at 04:58, Vincent Marquez 
> wrote:
>
> I have a common use case where my pipeline looks like this:
> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>
> I do NOT want my pipeline to look like the following:
>
> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>  |
>   -> PubsubIO.write
>
> Because I need to ensure that only items written to Pubsub have
> successfully finished a (quorum) write.
>
> Since CassandraIO.write is a PTransform I can't actually use it
> here so I often roll my own 'writer', but maybe there is a recommended way
> of doing this?
>
> Thanks in advance for any help.
>
> *~Vincent*
>
>
>


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Vincent Marquez
*~Vincent*


On Wed, Mar 24, 2021 at 10:01 AM Reuven Lax  wrote:

> Does that work if cassandra returns a PDone?
>

No, it doesn't work.  I wrote my own CassandraIO.Write that is a
PTransform returning a PCollection instead of a PDone.

I'm just asking if there's a better way of doing this because I'm having to
do this with multiple types of writers, and don't want to have to hand-roll
my own Write for each IO type I need this pattern for.
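
For reference, a minimal sketch of wiring a PCollection-returning write together
with the Wait transform Cham mentions below (the write transform, the message
conversion, and the topic are illustrative):

PCollection<KV<String, Long>> aggregated = ...;

// Hypothetical write that returns what it wrote instead of PDone.
PCollection<KV<String, Long>> written =
    aggregated.apply("WriteToCassandra", new CassandraWriteReturningResults());

aggregated
    .apply(Wait.on(written))   // hold elements until the corresponding window's writes finish
    .apply(MapElements
        .into(TypeDescriptor.of(PubsubMessage.class))
        .via(kv -> toPubsubMessage(kv)))                      // hypothetical conversion
    .apply(PubsubIO.writeMessages().to("projects/my-project/topics/my-topic"));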


>
> On Wed, Mar 24, 2021 at 10:00 AM Chamikara Jayalath 
> wrote:
>
>> If you want to wait for all records are written (per window) to Cassandra
>> before writing that window to PubSub, you should be able to use the Wait
>> transform:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Wait.java
>>
>> Thanks,
>> Cham
>>
>> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> Do you want to wait for ALL records are written for Cassandra and then
>>> write all successfully written records to PubSub or it should be performed
>>> "record by record"?
>>>
>>> On 24 Mar 2021, at 04:58, Vincent Marquez 
>>> wrote:
>>>
>>> I have a common use case where my pipeline looks like this:
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>>>
>>> I do NOT want my pipeline to look like the following:
>>>
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>  |
>>>   ->
>>> PubsubIO.write
>>>
>>> Because I need to ensure that only items written to Pubsub have
>>> successfully finished a (quorum) write.
>>>
>>> Since CassandraIO.write is a PTransform I can't actually use
>>> it here so I often roll my own 'writer', but maybe there is a recommended
>>> way of doing this?
>>>
>>> Thanks in advance for any help.
>>>
>>> *~Vincent*
>>>
>>>
>>>


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Vincent Marquez
t now would be backwards incompatible. PRs to add non-PDone
>>>> returning variants (probably as another option to the builders) that
>>>> compose well with Wait, etc. would be welcome.
>>>> >
>>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>> >>
>>>> >> In this way, I think “Wait” PTransform should work for you but, as
>>>> it was mentioned before, it doesn’t work with PDone, only with PCollection
>>>> as a signal.
>>>> >>
>>>> >> Since you already adjusted your own writer for that, it would be
>>>> great to contribute it back to Beam in the way as it was done for other IOs
>>>> (for example, JdbcIO [1] or BigtableIO [2])
>>>> >>
>>>> >> In general, I think we need to have it for all IOs, at least to use
>>>> with “Wait” because this pattern it's quite often required.
>>>> >>
>>>> >> [1]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>>> >> [2]
>>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>>> >>
>>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez 
>>>> wrote:
>>>> >>
>>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>>> successfully written to a database.  So "record by record" is fine, or even
>>>> "bundle".
>>>> >>
>>>> >> ~Vincent
>>>> >>
>>>> >>
>>>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>> >>>
>>>> >>> Do you want to wait for ALL records are written for Cassandra and
>>>> then write all successfully written records to PubSub or it should be
>>>> performed "record by record"?
>>>> >>>
>>>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez <
>>>> vincent.marq...@gmail.com> wrote:
>>>> >>>
>>>> >>> I have a common use case where my pipeline looks like this:
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>>>> PubSubIO.write
>>>> >>>
>>>> >>> I do NOT want my pipeline to look like the following:
>>>> >>>
>>>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>> >>>  |
>>>> >>>   ->
>>>> PubsubIO.write
>>>> >>>
>>>> >>> Because I need to ensure that only items written to Pubsub have
>>>> successfully finished a (quorum) write.
>>>> >>>
>>>> >>> Since CassandraIO.write is a PTransform I can't actually
>>>> use it here so I often roll my own 'writer', but maybe there is a
>>>> recommended way of doing this?
>>>> >>>
>>>> >>> Thanks in advance for any help.
>>>> >>>
>>>> >>> ~Vincent
>>>> >>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>


Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Vincent Marquez
Looks like this is a common source of confusion; I had similar questions
about checkpointing in the Beam Slack.

In Spark Structured Streaming, checkpoints are saved to an *external* HDFS
location and persist *beyond* each run, so in the event of a stream
crashing, you can just point your next execution of the stream to the
checkpoint location.  Kafka (or Kinesis/Redis Streams, etc.) offsets are
persisted in the checkpoint, so the stream would resume off of the last
committed checkpoint location.

It doesn't seem Beam has an external checkpoint that persists beyond a
single stream execution, so in Beam with Kinesis I believe you'll have to
manage your own offsets deliberately with an external source if you want to
achieve 'exactly once' semantics in the event of shutting down a stream and
 resuming it at a later point.

In Kafka you don't need this: as long as we ensure our offsets are
committed when a bundle is finalized, the offsets for a particular group
id are stored on the server.


On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:

> This sounds similar to the "Kafka Commit" in
> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>  and also to how PubsubIO ACKs messages in the
> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
> just asking to clarify, in case other folks know more, like +Alexey
> Romanenko  and +Ismaël Mejía  
> have
> modified KinesisIO. If the feature does not exist today, perhaps we can
> identify the best practices around this pattern.
>
> Kenn
>
> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey  wrote:
>
>> Hi Kenn,
>>
>> yes, resuming reading at the proper timestamp is exactly the issue we are
>> currently struggling with. E.g. with Kinesis Client Lib we could store the
>> last read within some dynamo table. This mechanism is not used with beam,
>> as we understand, the runner is responsible to track that checkpoint mark.
>>
>> Now, obviously on restarting the pipeline, e.g. on non compatible
>> upgrade, that is, an pipeline update is just not feasible, there must be
>> some mechanism in place on how Dataflow will know where to continue. Is
>> that simply the pipeline name? Or is there more involved? So how does
>> checkpointing actually work here?
>>
>> Based on 'name', wouldn't that imply that something like (example taken
>> from
>> https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
>> )
>>
>>   export REGION="us-central1"
>>
>>   gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
>>     --template-file-gcs-location "$TEMPLATE_PATH" \
>>     --parameters inputSubscription="$SUBSCRIPTION" \
>>     --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
>>     --region "$REGION"
>>
>> will not resume on last read on rerun, because the name obviously changes
>> here?
>>
>> best,
>>
>> michel
>>
>>
>>
>> On Tue, Apr 6, 2021 at 10:38 PM Kenneth Knowles  wrote:
>>
>>> I would assume the main issue is resuming reading from the Kinesis
>>> stream from the last read? In the case for Pubsub (just as another example
>>> of the idea) this is part of the internal state of a pre-created
>>> subscription.
>>>
>>> Kenn
>>>
>>> On Tue, Apr 6, 2021 at 1:26 PM Michael Luckey 
>>> wrote:
>>>
 Hi list,

 with our current project we are implementing our streaming pipeline
 based on Google Dataflow.

 Essentially we receive input via Kinesis, doing some filtering,
 enrichment and sessionizing and output to PubSub and/or google storage.

 After short investigations it is not clear to us, how checkpointing
 will work running on Dataflow in connection with KinesisIO. Is there any
 documentation/discussions to get a better understanding on how that will be
 working? Especially if we are forced to restart our pipelines, how could we
 ensure not to loose any events?

 As far as I understand currently, it should work 'auto-magically' but
 it is not yet clear to us, how it will actually behave. Before we try to
 start testing our expectations or even try to implement some
 watermark-tracking by ourself we hoped to get some insights from other
 users here.

 Any help appreciated.

 Best,

 michel

>>>

~Vincent


Re: Checkpointing Dataflow Pipeline

2021-04-07 Thread Vincent Marquez
On Wed, Apr 7, 2021 at 11:55 AM Kenneth Knowles  wrote:

> [I think this has graduated to a +dev  thread]
>
> Yea, in Beam it is left up to the IOs primarily, hence the bundle
> finalization step, or allowed runners to have their own features of course.
> Dataflow also does have in-place pipeline update that restores the
> persisted checkpoints from one pipeline to another - same basic
> mechanism/idea as Spark Structured Streaming but different overall
> workflow. +Reuven Lax  has put a lot of thought into
> updating, checkpointing, resuming, etc. Runners differ a lot in these
> areas. Is there something that should graduate from runner-specific to the
> Beam model?
>
> Kenn
>

In some ways, having an external checkpoint mechanism makes programming IOs
simpler.  Let's use Redis Streams as an example, as our company recently
implemented a RedisStreamIO internally so the details are fresh.

One requirement was the need to be able to shut down a streaming Beam
pipeline and then restart it from a later point in time without losing data
and without starting from the beginning of time.

This meant that I needed to ensure only elements that have finished processing
in a bundle are committed as 'processed' back to the Redis server. I
accomplished this by keeping track of all the elements that are output, and
then, on finalizeCheckpoint (which I *assume* happens at the end of a bundle,
but I'm fuzzy on the details), sending those element IDs back to the server as
consumed (with Redis XACK).

If instead Beam let you persist checkpoints externally and allowed a
pipeline to bootstrap off an already existing checkpoint, I would simply have
to keep track *in the checkpoint* of the last element ID read, and could use
that as the starting offset.  I would then be able to 'eager ack' read
messages and not worry about delaying commits until elements are output
further down the pipeline, since if an element is read into a
checkpoint, we know it is recoverable.

This also makes life a lot easier for anything regarding Kinesis, since the
Kinesis servers don't have a way of managing offsets/the last element read
(at least when I last used it; that may have changed), unlike Kafka, Pubsub,
Redis Streams, etc.

Hopefully this makes sense, and if I have some misunderstandings I'd love
to learn more.  This general subject has come up a few times in the Beam
Slack, so I think at the very least some extra documentation on these types
of use cases might be welcome.
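
A stripped-down sketch of that finalizeCheckpoint idea (the Jedis client, stream
and group names are illustrative; the real connector has more bookkeeping):

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;

class RedisStreamCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
  private final String host;                       // illustrative connection details
  private final int port;
  private final String stream;
  private final String group;
  private final List<String> deliveredEntryIds;    // entry ids output during this bundle

  RedisStreamCheckpointMark(String host, int port, String stream, String group,
      List<String> deliveredEntryIds) {
    this.host = host;
    this.port = port;
    this.stream = stream;
    this.group = group;
    this.deliveredEntryIds = deliveredEntryIds;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Invoked once the runner has durably committed the bundle's output, so it is
    // now safe to acknowledge these entries back to the Redis server.
    try (Jedis jedis = new Jedis(host, port)) {
      for (String id : deliveredEntryIds) {
        jedis.xack(stream, group, new StreamEntryID(id));
      }
    }
  }
}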




>
> On Wed, Apr 7, 2021 at 11:28 AM Vincent Marquez 
> wrote:
>
>> Looks like this is a common source of confusion, I had similar questions
>> about checkpointing in the beam slack.
>>
>> In Spark Structured Streaming, checkpoints are saved to an *external*
>> HDFS location and persist *beyond* each run, so in the event of a stream
>> crashing, you can just point your next execution of the stream to the
>> checkpoint location.  Kafka  (or Kinesis/Redis Stream etc) offsets are
>> persisted in the checkpoint, so the stream would resume off of the last
>> committed checkpoint location.
>>
>> It doesn't seem Beam has an external checkpoint that persists beyond a
>> single stream execution, so in Beam with Kinesis I believe you'll have to
>> manage your own offsets deliberately with an external source if you want to
>> achieve 'exactly once' semantics in the event of shutting down a stream and
>>  resuming it at a later point.
>>
>> In Kafka you don't need this since as long as we ensure our offsets are
>> committed in finalization of a bundle, the offsets for a particular group
>> id are stored on the server.
>>
>>
>> On Tue, Apr 6, 2021 at 3:13 PM Kenneth Knowles  wrote:
>>
>>> This sounds similar to the "Kafka Commit" in
>>> https://github.com/apache/beam/pull/12572 by +Boyuan Zhang
>>>  and also to how PubsubIO ACKs messages in the
>>> finalizer. I don't know much about KinesisIO or how Kinesis works. I was
>>> just asking to clarify, in case other folks know more, like +Alexey
>>> Romanenko  and +Ismaël Mejía
>>>  have modified KinesisIO. If the feature does not
>>> exist today, perhaps we can identify the best practices around this pattern.
>>>
>>> Kenn
>>>
>>> On Tue, Apr 6, 2021 at 1:59 PM Michael Luckey 
>>> wrote:
>>>
>>>> Hi Kenn,
>>>>
>>>> yes, resuming reading at the proper timestamp is exactly the issue we
>>>> are currently struggling with. E.g. with Kinesis Client Lib we could store
>>>> the last read within some dynamo table. This mechanism is not used with
>>>> beam, as we understand, the runner is responsible to track that

Re: File processing triggered from external source

2021-05-25 Thread Vincent Marquez
On Tue, May 25, 2021 at 11:14 AM Sozonoff Serge  wrote:

> Hi,
>
> Thanks for the clarification.
>
> What is an issue with applying windowing/triggering strategy for your case?
>
>
> The problem was actually not the trigger but the whole approach I took.
>
>
> I guess fundamentally the whole issue for me boils down to the fact that
> with bounded pipelines we have quite a few approaches which can be taken to
> enrich data, and with unbounded pipelines we have very few. Obviously in a
> bounded pipeline you don't really need to worry about refreshing your
> enrichment data either, since it's all built when the pipeline launches.
>
> So, I had this perfectly working batch pipeline and everything fell
> apart when it became unbounded. In an ideal world we could mix an unbounded
> pipeline with a bounded pipeline. The requirement was fairly simple: process
> a bunch of CSV lines when a new file arrives. The only unbounded element in
> this pipeline is when the file arrives and what its path and name are. From
> the moment the file becomes available everything else is batch processing.
>
> This left me looking at options for streaming pipelines.
>
> The Side input approach seemed to fit the best but since I needed a
> refreshing side input I quickly stumbled over the fact that you can't use
> the Beam connectors for this and you need to write your own code to fetch
> the data!! To me it made no sense: Beam has all these IO connectors but I
> can't use them for my side input!
>
> It could be that with IO connectors which implement ReadAll my statement
> is no longer true (I did not find any examples), I have not tested it but
> in any case I would have needed DynamoDBIO which does not implement ReadAll.
>

I have a long-standing PR that implements readAll for the Cassandra API,
and our company is using it in production for both batch and streaming
jobs.  We use it in streaming mode to enrich data being read off of a Redis
stream (similar to a Kafka stream).

I'm hoping it gets merged soon.  It shouldn't be hard to add similar
functionality to the DynamoDB connector.
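
For completeness, the lookup-in-a-DoFn route Serge mentions further down might look
roughly like this with the AWS SDK v2 (table, key and attribute names are illustrative):

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;

class EnrichFromDynamoFn extends DoFn<KV<String, String>, KV<String, String>> {
  private transient DynamoDbClient client;

  @Setup
  public void setup() {
    client = DynamoDbClient.create();   // one client per DoFn instance
  }

  @ProcessElement
  public void process(ProcessContext c) {
    String lookupKey = c.element().getKey();
    Map<String, AttributeValue> item =
        client.getItem(GetItemRequest.builder()
                .tableName("enrichment-table")                       // illustrative
                .key(Collections.singletonMap(
                    "id", AttributeValue.builder().s(lookupKey).build()))
                .build())
            .item();
    String enrichment = item.isEmpty() ? "" : item.get("payload").s();  // illustrative attribute
    c.output(KV.of(c.element().getValue(), enrichment));
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
    }
  }
}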


So after having spent the weekend playing around with various thoughts and
> ideas I did end up coding a lookup against DynamoDB in a DoFn, using the
> AWS SDK :-)
>
> Kind Thanks,
> Serge
>
>
>
>
> On 25 May 2021 at 18:18:31, Alexey Romanenko (aromanenko@gmail.com)
> wrote:
>
> You don’t need to use windowing strategy or aggregation triggers for a
> pipeline with bounded source to perform GbK-like transforms, but since you
> started to use unbounded source then your pcollections became unbounded and
> you need to do that. Otherwise, it’s unknown at which point of time your
> GbK transforms will have all data arrived to process it (in theory, it will
> never happened because of “unbounded” definition).
>
> What is an issue with applying windowing/triggering strategy for your case?
>
> —
> Alexey
>
> On 24 May 2021, at 10:25, Sozonoff Serge  wrote:
>
> Hi,
>
> Referring to the explanation found at the following link under (Stream
> processing triggered from an external source)
>
> https://beam.apache.org/documentation/patterns/file-processing/
>
>
> While implementing this solution I am trying to figure out how to deal
> with the fact that my pipeline, which was bounded, has now become unbounded.
> It exposes me to windowing/triggering concerns which I did not have to deal
> with before and which in essence are unnecessary, since I am still
> fundamentally dealing with bounded data. The only reason I have an unbounded
> source involved is as a trigger and provider of the file to be processed.
>
> Since my pipeline uses GroupByKey transforms I get the following error.
>
> Exception in thread "main" java.lang.IllegalStateException: GroupByKey
> cannot be applied to non-bounded PCollection in the GlobalWindow without a
> trigger. Use a Window.into or Window.triggering transform prior to
> GroupByKey.
>
> Do I really need to add windowing/triggering semantics to the PCollections
> which are built from bounded data?
>
> Thanks for any pointers.
>
> Serge
>
>
> ~Vincent


Re: Is there a way (seetings) to limit the number of element per worker machine

2021-06-02 Thread Vincent Marquez
On Wed, Jun 2, 2021 at 11:11 AM Robert Bradshaw  wrote:

> If you want to control the total number of elements being processed
> across all workers at a time, you can do this by assigning random keys
> of the form RandomInteger() % TotalDesiredConcurrency followed by a
> GroupByKey.
>
> If you want to control the number of elements being processed in
> parallel per VM, you can use the fact that Dataflow assigns one work
> item per core, so an n1-standard-4 would process 4 elements in
> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
>
> You could also control this explicitly by using a global (per worker)
> semaphore in your code. If you do this you may want to proceed your
> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
> distribution. This should be much easier than trying to coordinate
> multiple parallel pipelines.
>
>
Is there a risk here of having an OOM error due to a build-up of in-memory
elements from a streaming input?  Or do the runners have some concept of
throttling bundles based on the progress of stages further down the pipeline?
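
For reference, the random-key approach Robert describes above looks roughly like
this (the element type, concurrency value and downstream DoFn are illustrative):

int totalDesiredConcurrency = 20;

PCollection<String> elements = ...;   // whatever needs rate-limited processing

elements
    .apply(WithKeys.of((String e) ->
            ThreadLocalRandom.current().nextInt(totalDesiredConcurrency))
        .withKeyType(TypeDescriptors.integers()))
    .apply(GroupByKey.create())                   // at most totalDesiredConcurrency groups in flight
    .apply(ParDo.of(new ProcessGroupFn()));       // hypothetical DoFn that walks each group's values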




> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
>  wrote:
> >
> > Thanks Robert.
> > I found the following explanation for the number of threads for 4 cores:
> > You have 4 CPU sockets, each CPU can have, up to, 12 cores and each core
> can have two threads. Your max thread count is, 4 CPU x 12 cores x 2
> threads per core, so 12 x 4 x 2 is 96
> > Can I limit the threads using the pipeline options in some way? 10-20
> elements per worker will work for me.
> >
> > My current practice to work around that issue is to limit the number of
> elements in each dataflow pipeline (providing ~10 elements for each
> pipeline)
> > Once I have completed around 200 elements processing = 20 pipelines
> (google does not allow more than 25 dataflow pipelines per region) with 10
> elements each, I am launching the next 20 pipelines.
> >
> > This is ofcourse missing the benefit of serverless.
> >
> > Any idea, how to work around this?
> >
> > Best,
> > Eila
> >
> >
> > On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw 
> wrote:
> >>
> >> Note that workers generally process one element per thread at a time.
> The number of threads defaults to the number of cores of the VM that you're
> using.
> >>
> >> On Mon, May 17, 2021 at 10:18 AM Brian Hulette 
> wrote:
> >>>
> >>> What type of files are you reading? If they can be split and read by
> multiple workers this might be a good candidate for a Splittable DoFn (SDF).
> >>>
> >>> Brian
> >>>
> >>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research <
> e...@orielresearch.org> wrote:
> 
>  Hi,
>  I am running out of resources on the workers machines.
>  The reasons are:
>  1. Every pcollection is a reference to a LARGE file that is copied
> into the worker
>  2. The worker makes calculations on the copied file using a software
> library that consumes memory / storage / compute resources
> 
>  I have changed the workers' CPUs and memory size. At some point, I am
> running out of resources with this method as well
>  I am looking to limit the number of pCollection / elements that are
> being processed in parallel on each worker at a time.
> 
>  Many thank for any advice,
>  Best wishes,
>  --
>  Eila
> 
>  Meetup
> >
> >
> >
> > --
> > Eila
> >
> > Meetup
>


~Vincent


Re: Is there a way (seetings) to limit the number of element per worker machine

2021-06-02 Thread Vincent Marquez
On Wed, Jun 2, 2021 at 11:27 AM Robert Bradshaw  wrote:

> On Wed, Jun 2, 2021 at 11:18 AM Vincent Marquez
>  wrote:
> >
> > On Wed, Jun 2, 2021 at 11:11 AM Robert Bradshaw 
> wrote:
> >>
> >> If you want to control the total number of elements being processed
> >> across all workers at a time, you can do this by assigning random keys
> >> of the form RandomInteger() % TotalDesiredConcurrency followed by a
> >> GroupByKey.
> >>
> >> If you want to control the number of elements being processed in
> >> parallel per VM, you can use the fact that Dataflow assigns one work
> >> item per core, so an n1-standard-4 would process 4 elements in
> >> parallel, an n1-highmem-2 would process 2 elements in parallel, etc.
> >>
> >> You could also control this explicitly by using a global (per worker)
> >> semaphore in your code. If you do this you may want to proceed your
> >> rate-limited DoFn with a Reshuffle to ensure fair (and dynamic) work
> >> distribution. This should be much easier than trying to coordinate
> >> multiple parallel pipelines.
> >>
> >
> > Is there a risk here of having an OOM error due to 'build up' of in
> memory elements from a streaming input?  Or do the runners have some
> concept of throttling bundles based on progress of stages further down the
> pipeline?
>
> For streaming pipelines, hundreds of threads (aka work items) are
> allocated for each worker, so limiting the number of concurrent items
> per worker is harder there.
>
>
Hmm, I did notice today that far more DoFns are instantiated in a streaming
job than I expected.   This seems like it would
cause all sorts of problems.  For instance, if one were to use readAll
for, say, JDBC or Redis or any number of connectors, each of which sets up
connections to some endpoint, a single worker could have hundreds or
thousands of JDBC connections?  I would think this would definitely make
some of the readAll transforms less usable in a streaming pipeline if
scaling out the number of workers would overload the source machines.

Is this behavior documented somewhere?  Is this true for all runners?
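
One common workaround is to share a single client or pool per worker JVM through a
static holder, so the connection count stays bounded no matter how many DoFn
instances the runner creates; a rough sketch (host, port and pool size are illustrative):

import org.apache.beam.sdk.transforms.DoFn;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

class RedisLookupFn extends DoFn<String, String> {
  // Shared across all DoFn instances in this worker JVM.
  private static volatile JedisPool pool;

  private static JedisPool pool() {
    if (pool == null) {
      synchronized (RedisLookupFn.class) {
        if (pool == null) {
          JedisPoolConfig config = new JedisPoolConfig();
          config.setMaxTotal(8);                               // cap connections per worker
          pool = new JedisPool(config, "redis-host", 6379);    // illustrative host/port
        }
      }
    }
    return pool;
  }

  @ProcessElement
  public void process(ProcessContext c) {
    try (Jedis jedis = pool().getResource()) {
      String value = jedis.get(c.element());
      if (value != null) {
        c.output(value);
      }
    }
  }
}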

--Vincent




> >> On Fri, May 28, 2021 at 5:16 AM Eila Oriel Research
> >>  wrote:
> >> >
> >> > Thanks Robert.
> >> > I found the following explanation for the number of threads for 4
> cores:
> >> > You have 4 CPU sockets, each CPU can have, up to, 12 cores and each
> core can have two threads. Your max thread count is, 4 CPU x 12 cores x 2
> threads per core, so 12 x 4 x 2 is 96
> >> > Can I limit the threads using the pipeline options in some way? 10-20
> elements per worker will work for me.
> >> >
> >> > My current practice to work around that issue is to limit the number
> of elements in each dataflow pipeline (providing ~10 elements for each
> pipeline)
> >> > Once I have completed around 200 elements processing = 20 pipelines
> (google does not allow more than 25 dataflow pipelines per region) with 10
> elements each, I am launching the next 20 pipelines.
> >> >
> >> > This is ofcourse missing the benefit of serverless.
> >> >
> >> > Any idea, how to work around this?
> >> >
> >> > Best,
> >> > Eila
> >> >
> >> >
> >> > On Mon, May 17, 2021 at 1:27 PM Robert Bradshaw 
> wrote:
> >> >>
> >> >> Note that workers generally process one element per thread at a
> time. The number of threads defaults to the number of cores of the VM that
> you're using.
> >> >>
> >> >> On Mon, May 17, 2021 at 10:18 AM Brian Hulette 
> wrote:
> >> >>>
> >> >>> What type of files are you reading? If they can be split and read
> by multiple workers this might be a good candidate for a Splittable DoFn
> (SDF).
> >> >>>
> >> >>> Brian
> >> >>>
> >> >>> On Wed, May 12, 2021 at 6:18 AM Eila Oriel Research <
> e...@orielresearch.org> wrote:
> >> >>>>
> >> >>>> Hi,
> >> >>>> I am running out of resources on the workers machines.
> >> >>>> The reasons are:
> >> >>>> 1. Every pcollection is a reference to a LARGE file that is copied
> into the worker
> >> >>>> 2. The worker makes calculations on the copied file using a
> software library that consumes memory / storage / compute resources
> >> >>>>
> >> >>>> I have changed the workers' CPUs and memory size. At some point, I
> am running out of resources with this method as well
> >> >>>> I am looking to limit the number of pCollection / elements that
> are being processed in parallel on each worker at a time.
> >> >>>>
> >> >>>> Many thank for any advice,
> >> >>>> Best wishes,
> >> >>>> --
> >> >>>> Eila
> >> >>>>
> >> >>>> Meetup
> >> >
> >> >
> >> >
> >> > --
> >> > Eila
> >> >
> >> > Meetup
> >
> >
> >
> > ~Vincent
>


Re: Rate Limiting in Beam

2021-06-17 Thread Vincent Marquez
Do individual stages of a Beam job exhibit backpressure to the consumer
though?  I would think buffering elements with Beam's BagState might lead
to OOM errors on the workers if the consumer IO continues to feed in data.
Or does something else happen?

--Vincent
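
For reference, a rough sketch of the buffer-and-retry pattern Luke describes below
(Request, Response, ThrottledException, RequestCoder, callService and the retry
delay are all assumed/hypothetical):

class ThrottledCallFn extends DoFn<KV<String, Request>, Response> {

  @StateId("buffer")
  private final StateSpec<BagState<Request>> bufferSpec = StateSpecs.bag(RequestCoder.of());

  @TimerId("retry")
  private final TimerSpec retrySpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(@Element KV<String, Request> element,
                      @StateId("buffer") BagState<Request> buffer,
                      @TimerId("retry") Timer retry,
                      OutputReceiver<Response> out) {
    try {
      out.output(callService(element.getValue()));
    } catch (ThrottledException e) {
      buffer.add(element.getValue());                           // park it instead of failing the bundle
      retry.offset(Duration.standardSeconds(30)).setRelative(); // and try again later
    }
  }

  @OnTimer("retry")
  public void onRetry(@StateId("buffer") BagState<Request> buffer,
                      @TimerId("retry") Timer retry,
                      OutputReceiver<Response> out) {
    List<Request> pending = new ArrayList<>();
    buffer.read().forEach(pending::add);
    buffer.clear();
    boolean stillThrottled = false;
    for (Request r : pending) {
      try {
        out.output(callService(r));
      } catch (ThrottledException e) {
        buffer.add(r);                                          // keep it for the next firing
        stillThrottled = true;
      }
    }
    if (stillThrottled) {
      retry.offset(Duration.standardSeconds(30)).setRelative();
    }
  }
}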


On Thu, Jun 17, 2021 at 11:42 AM Luke Cwik  wrote:

> If the service returns sensible throttling errors you could use a
> StatefulDoFn and buffer elements that error out due to throttling from the
> service instead of failing the bundle and schedule a timer to replay them.
> This will effectively max out the service as long as there is more data
> than the service can handle, which doesn't work too well if the service.
>
>
> On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen 
> wrote:
>
>> Thanks for the quick response.
>> Querying the Dataflow API seems like something that could break easily,
>> but I can go with that if it turns out to be easier.
>>
>> The Splittable DoFn way sounds interesting, but I'm not very familiar
>> with that so I have some questions around it:
>> Splits seem to operate on offsets within a single element. Does that mean
>> that I'd set a fixed shard number x, and then I'd need to first group my
>> PCollection of single elements into a PCollection of lists, each size x?
>> And are the subsequent writes also limited to x workers, meaning that
>> splits have the same issue as with a GroupByKey?
>> I see the UnboundedCountingSource gets a `desiredNumSplits` parameter.
>> I'm assuming there is nothing similar that would allow a Splittable DoFn to
>> simply figure out the number of workers even if it changes? That's probably
>> very hacky anyway.
>> If the above solution with fixed-size lists makes sense and will
>> redistribute the writes I'm already happy, I don't necessarily need to have
>> the throttling step dynamically match autoscaling.
>>
>> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada  wrote:
>>
>>> You could implement a Splittable DoFn that generates a limited number of
>>> splits. We do something like this for
>>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]. It
>>> keeps track of its local EPS, and generates new splits if more EPSs are
>>> wanted. This should help you scale up to the maximum of EPS that you want,
>>> and autoscaling will only produce the appropriate number of workers for
>>> that number of splits.
>>>
>>> - The only issue may be that you can't "scale down" if you find that
>>> some of your splits have a very low throughput, because two splits can't be
>>> merged back together (does that make sense?) - but Dataflow should be able
>>> to scale down and schedule multiple splits in a single worker if that's the
>>> case.
>>>
>>> The UnboundedCountingSource is a Source, so it can't have an input (and
>>> it's deprecated), but you could write a SplittableDoFn that has the same
>>> behavior. Do you think this could work?
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>>
>>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin 
>>> wrote:
>>>
 Could you possibly use a side input with fixed interval triggering[1]
 to query the Dataflow API to get the most recent log statement of scaling
 as suggested here[2]?

 [1]
 https://beam.apache.org/documentation/patterns/side-inputs/
 [2]
 https://stackoverflow.com/a/54406878/6432284

 On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen 
 wrote:

> Hi folks,
>
> I've been working on a custom PTransform that makes requests to
> another service, and would like to add a rate limiting feature there. The
> fundamental issue that I'm running into here is that I need a decent
> heuristic to estimate the worker count, so that each worker can
> independently set a limit which globally comes out to the right value. All
> of this is easy if I know how many machines I have, but I'd like to use
> Dataflow's autoscaling, which would easily break any pre-configured value.
> I have seen two main approaches for rate limiting, both for a
> configurable variable x:
>
>- Simply assume worker count is x, then divide by x to figure out
>the "local" limit. The issue I have here is that if we assume x is 
> 500, but
>it is actually 50, I'm now paying for 50 nodes to throttle 10 times as 
> much
>as necessary. I know the pipeline options have a reference to the 
> runner,
>is it possible to get an approximate current worker count from that at
>bundle start (*if* runner is DataflowRunner)?
>- Add another PTransform in front of the API requests, which
>groups by x number of keys, throttles, and keeps forwarding elements 
> with
>an instant trigger. I initially really liked this solution because 
> even if
>x is misconfigured, I will have at most x workers runni

Re: Rate Limiting in Beam

2021-06-18 Thread Vincent Marquez
Thanks, good to know.  Ultimately I still want to lobby for a way to
throttle bundles based on progress made further down the pipeline but I
realize that might involve major architectural changes.

Sometimes I'm forced to cancel a streaming pipeline and I'm unable to drain
it, so that can present a problem for consumers like Kafka or Redis
Streams.


On Fri, Jun 18, 2021 at 10:11 AM Robert Bradshaw 
wrote:

> For a Streaming pipeline, BagState can be unlimited in size, but for Batch
> it is held in memory.
>
> You can also use keys (Followed by a GBK or Stateful DoFn) to put an upper
> bound on the total concurrency of a step.
>
> On Thu, Jun 17, 2021 at 4:54 PM Vincent Marquez 
> wrote:
>
>>
>>
>> Do individual stages of a beam job exhibit backpressure to the consumer
>> though?  I would think buffering elements with Beam's BagState might lead
>> to OOM errors on the workers if the consumerIO continues to feed in data.
>> Or does something else happen?
>>
>> --Vincent
>>
>>
>> On Thu, Jun 17, 2021 at 11:42 AM Luke Cwik  wrote:
>>
>>> If the service returns sensible throttling errors you could use a
>>> StatefulDoFn and buffer elements that error out due to throttling from the
>>> service instead of failing the bundle and schedule a timer to replay them.
>>> This will effectively max out the service as long as there is more data
>>> then the service can handle which doesn't work too well if the service.
>>>
>>>
>>> On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen 
>>> wrote:
>>>
>>>> Thanks for the quick response.
>>>> Querying the Dataflow API seems like something that could break easily,
>>>> but I can go with that if it turns out to be easier.
>>>>
>>>> The Splittable DoFn way sounds interesting, but I'm not very familiar
>>>> with that so I have some questions around it:
>>>> Splits seem to operate on offsets within a single element. Does that
>>>> mean that I'd set a fixed shard number x, and then I'd need to first group
>>>> my PCollection of single elements into a PCollection of lists, each size x?
>>>> And are the subsequent writes also limited to x workers, meaning that
>>>> splits have the same issue as with a GroupByKey?
>>>> I see the UnboundedCountingSource gets a `desiredNumSplits` parameter.
>>>> I'm assuming there is nothing similar that would allow a Splittable DoFn to
>>>> simply figure out the number of workers even if it changes? That's probably
>>>> very hacky anyway.
>>>> If the above solution with fixed-size lists makes sense and will
>>>> redistribute the writes I'm already happy, I don't necessarily need to have
>>>> the throttling step dynamically match autoscaling.
>>>>
>>>> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada 
>>>> wrote:
>>>>
>>>>> You could implement a Splittable DoFn that generates a limited number
>>>>> of splits. We do something like this for
>>>>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]. It
>>>>> keeps track of its local EPS, and generates new splits if more EPSs are
>>>>> wanted. This should help you scale up to the maximum of EPS that you want,
>>>>> and autoscaling will only produce the appropriate number of workers for
>>>>> that number of splits.
>>>>>
>>>>> - The only issue may be that you can't "scale down" if you find that
>>>>> some of your splits have a very low throughput, because two splits can't 
>>>>> be
>>>>> merged back together (does that make sense?) - but Dataflow should be able
>>>>> to scale down and schedule multiple splits in a single worker if that's 
>>>>> the
>>>>> case.
>>>>>
>>>>> The UnboundedCountingSource is a Source, so it can't have an input
>>>>> (and it's deprecated), but you could write a SplittableDoFn that has the
>>>>> same behavior. Do you think this could work?
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>>>>
>>>>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Could you possibly 

Mapping *part* of a PCollection possible? (Lens optics for PCollection?)

2021-07-21 Thread Vincent Marquez
Let's say I have a PCollection of A and I want to use the 'readAll' pattern to
enhance some data from an additional source such as Redis (which has a
readKeys PTransform).  However, I don't want to 'lose'
the original A.  There *are* a few ways to do this currently (side inputs,
joining two streams with CoGroupByKey, using State), all of which have some
problems.

If I could map the PCollection of A into some type that focuses on just the
part of each element I want to look up, run that part through the Redis
readKeys, and get back a PCollection of A with the looked-up values set, that
would solve all my problems. This is more or less a get/set lens optic if
anyone is familiar with functional programming.

Is something like this possible?  Could it be added?  I've run into wanting
this pattern numerous times over the last year.


*~Vincent*


Re: [2.28.0] [Java] [Dataflow] ParquetIO writeDynamic stuck in Garbage Collection when writing ~125K files to dynamic destinations

2021-07-21 Thread Vincent Marquez
Windowing doesn't work with batch jobs.  You could dump your BQ data to
Pub/Sub and then use a streaming job to window.
*~Vincent*


On Wed, Jul 21, 2021 at 10:13 AM Andrew Kettmann 
wrote:

> Worker machines are n1-standard-2s (2 cpus and 7.5GB of RAM)
>
> Pipeline is simple, but large amounts of end files, ~125K temp files
> written in one case at least
>
>1. Scan Bigtable (NoSQL DB)
>2. Transform with business logic
>3. Convert to GenericRecord
>4. WriteDynamic to a google bucket as Parquet files partitioned by 15
>minute intervals.
>
> (gs://bucket/root_dir/CATEGORY/YEAR/MONTH/DAY/HOUR/MINUTE_FLOOR_15/FILENAME.parquet)
>
>
> Everything does fine until I get to the writeDynamic. When it does the
> groupByKey (
> FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)
> the Stackdriver logs show a ton of allocation-failure-triggered GC that
> frees up essentially zero space and never progresses; it ends up with a
> "The worker lost contact with the service." error four times and then fails.
> Also worth noting that Dataflow sizes down to a single worker during this
> time, so it is trying to do it all at once.
>
> Likely I am not hitting GC alerts because I am using a snippet I pulled
> from a GCP Dataflow template that queries Bigtable that looks to disable
> the GCThrashing monitoring, due to Bigtable creating at least 5 objects per
> row scanned.
>
> DataflowPipelineDebugOptions debugOptions =
>     options.as(DataflowPipelineDebugOptions.class);
> debugOptions.setGCThrashingPercentagePerPeriod(100.00);
>
> What are my options for splitting this up so that it can process this in
> smaller chunks? I tried adding windowing but it didn't seem to help, or I
> needed to do something else other than just the windowing, but I don't
> really have a key to group it by here.
>
>  *Andrew Kettmann*
> DevOps Engineer
> P: 1.314.596.2836
>


Re: Mapping *part* of a PCollection possible? (Lens optics for PCollection?)

2021-07-21 Thread Vincent Marquez
On Wed, Jul 21, 2021 at 10:37 AM Andrew Kettmann 
wrote:

> Worth noting that you never "lose" a PCollection. You can use the same
> PCollection in as many transforms as you like and every time you reference
> that PCollection it will be in the same state it was when you first read
> it in.
>
> So if you have:
>
> PCollection<A> colA = ...;
> PCollection<B> colB = colA.apply(ParDo.of(new ReadRedisDataDoFn()));
>
> You have not consumed the colA PCollection and can reference/use it as
> many times as you want in further steps.
>
> My instinct for this is:
>
>
>1. Read Source to get PCollection
>    2. Pull the key to look up in Redis from the PCollection into another
>    PCollection
>3. Look up with a custom DoFn if the normal IO one doesn't meet your
>needs
>4. CoGroupByKey transform to group them together
>
>
I have done that; however, this doesn't really work for my use case in a
streaming pipeline.  Both of the PCollections need to have the same
windowing, and under high load, if I don't want to buffer a ton of data, I
might get outputs with one side being empty.


>
>1. Do Whatever else you need to do with the combined data.
>
>
> --
> *From:* Vincent Marquez 
> *Sent:* Wednesday, July 21, 2021 12:14 PM
> *To:* user 
> *Subject:* Mapping *part* of a PCollection possible? (Lens optics for
> PCollection?)
>
> Let's say I have PCollection and I want to use the 'readAll' pattern to
> enhance some data from an additional source such as redis (which has a
> readKeys PTransform).  However I don't want to 'lose'
> the original A.  There *are* a few ways to do this currently (side inputs,
> joining two streams with CoGroupByKey, using State) all of which have some
> problems.
>
> If I could map PCollection into some type that has  for
> instance PCollection>, then use the redis readKeys to map to
> PCollection> this solves all my problems. This is more or
> less a get/set lens optic if anyone is familiar with functional
> programming.
>
> Is something like this possible?  Could it be added?  I've run
> into wanting this pattern numerous times over the last year.
>
>
> *~Vincent*
>
>


Re: Kafka manually commit offsets

2021-12-10 Thread Vincent Marquez
If you want to ensure you have at-least-once processing, I think the
*maximum* amount of parallelization you can have would be the number of
partitions you have, so you'd want to group by partition, process a bundle
of that partition, then commit the last offset for that partition.
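
A rough sketch of that commit step with a plain Kafka consumer (servers, topic and
group id are illustrative; this assumes an upstream step that emits, per partition,
the last offset whose processing has completed), along the lines of what Alexey's
answer quoted below also suggests:

class CommitOffsetsFn extends DoFn<KV<Integer, Long>, Void> {   // partition -> last processed offset
  private transient KafkaConsumer<byte[], byte[]> consumer;
  private final String topic = "my-topic";

  @Setup
  public void setup() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumer = new KafkaConsumer<>(props);
  }

  @ProcessElement
  public void process(@Element KV<Integer, Long> partitionAndOffset) {
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(topic, partitionAndOffset.getKey()),
        new OffsetAndMetadata(partitionAndOffset.getValue() + 1)));  // commit the *next* offset to read
  }

  @Teardown
  public void teardown() {
    if (consumer != null) {
      consumer.close();
    }
  }
}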

*~Vincent*


On Fri, Dec 10, 2021 at 9:28 AM Luke Cwik  wrote:

> Yes, you will need to deal with records being out of order because the
> system will process many things in parallel.
>
> You can read the last committed offset from Kafka and compare it against
> the offset you have right now. If the offset you have right now is not the
> next offset, you store it in state; if it is, then you find the contiguous
> range of offsets that you have stored in state starting from this offset,
> remove them from state, and commit the last one in that contiguous range.
>
> On Fri, Dec 10, 2021 at 8:18 AM Juan Calvo Ferrándiz <
> juancalvoferran...@gmail.com> wrote:
>
>>
>>
>> Thanks Alexey! I understand. Continuing to think about possible solutions for
>> committing records, I was wondering what happens in this scenario:
>>
>> When processing windows of data, do they get processed in sequential
>> order or is it possible for them to be processed out of order? For example
>> Window 1 contains 1 elements of data whereas window 2 contains 10
>> elements. Assuming Window 1 takes a while to process all of that data, is
>> it possible window 2 will finish before window 1?
>>
>> Thanks again!
>>
>> On Fri, 10 Dec 2021 at 14:39, Alexey Romanenko 
>> wrote:
>>
>>> I answered the similar questions on SO a while ago [1], and I hope it
>>> will help.
>>>
>>> “By default, pipeline.apply(KafkaIO.read()...) will return
>>> a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can
>>> get an offset from KafkaRecord metadata and commit it manually in a way
>>> that you need (just don't forget to disable AUTO_COMMIT in KafkaIO.read()).
>>>
>>> By manual way, I mean that you should instantiate your own Kafka client
>>> in your DoFn, process input element (as KafkaRecord), that was read
>>> before, fetch an offset from KafkaRecord and commit it with your own
>>> client.
>>>
>>> Though, you need to make sure that a call to external API and offset
>>> commit will be atomic to prevent potential data loss (if it's critical)."
>>>
>>> [1]
>>> https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
>>>
>>> —
>>> Alexey
>>>
>>> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <
>>> juancalvoferran...@gmail.com> wrote:
>>>
>>> Thanks Luke for your quick response. I see, that makes sense. Now I have
>>> two new questions if I may:
>>> a) How can I get the offsets I want to commit? My investigation is now
>>> going through getCheckpointMark(); is this correct?
>>> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>>>
>>> b) With these offsets, I will create a client at the of the pipeline,
>>> with Kafka library, and methods such as commitSync() and commitAsync(). Is
>>> this correct?
>>> https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=log%20an%20error.-,Asynchronous%20Commit,-One%20drawback%20of
>>>
>>> Thanks!!!
>>>
>>> *Juan *
>>>
>>>
>>> On Fri, 10 Dec 2021 at 01:07, Luke Cwik  wrote:
>>>
 commitOffsetsInFinalize is about committing the offset after the output
 has been durably persisted for the bundle containing the Kafka Read. The
 bundle represents a unit of work over a subgraph of the pipeline. You will
 want to ensure that commitOffsetsInFinalize is disabled and that the Kafka
 consumer config doesn't auto-commit. This will ensure that KafkaIO.Read
 doesn't commit the offsets. Then it is up to your PTransform to perform the
 committing.
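
A hedged sketch of that read configuration (the broker, topic, deserializers,
and method name below are placeholders, not anything prescribed in this
thread):

  import java.util.Collections;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.kafka.clients.consumer.ConsumerConfig;
  import org.apache.kafka.common.serialization.StringDeserializer;

  static PCollection<KafkaRecord<String, String>> readWithoutCommitting(Pipeline pipeline) {
    return pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")   // placeholder
            .withTopic("my-topic")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // commitOffsetsInFinalize() is deliberately NOT called, and the consumer
            // is told not to auto-commit, so committing is left to a later transform.
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false)));
  }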

 On Thu, Dec 9, 2021 at 3:36 PM Juan Calvo Ferrándiz <
 juancalvoferran...@gmail.com> wrote:

> Morning!
>
> First of all, thanks for all the incredible work you do; it is amazing.
> Secondly, I am reaching out for some help or guidance on manually committing
> records. I want to do this so I can commit the record at the end of the
> pipeline, and not in the read() of the KafkaIO.
>
> Bearing in mind what I have read in this post:
> https://lists.apache.org/list?user@beam.apache.org:2021-9:user@beam.apache.org%20kafka%20commit
> , and thinking of a pipeline similar to the one described, I understand we
> can use commitOffsetsInFinalize() to commit offsets in the read().
> What I don't understand is how this helps to commit the offset if we want
> to do this at the end, not in the reading. Thanks. All comments and
> suggestions are more than welcome. :)
>
>
> *Juan *
>
>
>
>>>


Re: Kafka manually commit offsets

2021-12-13 Thread Vincent Marquez
What I mean is, if you want to commit offsets only *after* a
KafkaRecord is processed, then you need to keep parallelism at the
number of partitions, as offsets are monotonically increasing *per
partition*.  So if you have only one partition and then split into two
'threads', and T1 handling offsets A-C fails while T2 handling D-G succeeds,
committing T2's offsets would indicate that everything processed on T1 also
succeeded.
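
A rough sketch of keeping the commit step keyed by partition, as described
above (the helper below is my own illustration, not code from this thread; in
a streaming pipeline you would still need a window or trigger before the
combine):

  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.transforms.Max;
  import org.apache.beam.sdk.transforms.SimpleFunction;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Keys each already-processed record by its partition and keeps only the
  // highest offset per partition, ready to be handed to a commit step.
  static PCollection<KV<Integer, Long>> maxOffsetPerPartition(
      PCollection<KafkaRecord<String, String>> processed) {
    return processed
        .apply("KeyByPartition",
            MapElements.via(
                new SimpleFunction<KafkaRecord<String, String>, KV<Integer, Long>>() {
                  @Override
                  public KV<Integer, Long> apply(KafkaRecord<String, String> record) {
                    return KV.of(record.getPartition(), record.getOffset());
                  }
                }))
        .apply("MaxOffsetPerPartition", Max.<Integer>longsPerKey());
  }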


*~Vincent*


On Mon, Dec 13, 2021 at 11:12 AM Luke Cwik  wrote:

> I believe you would be able to have parallelism greater than the number of
> partitions for most of the pipeline. The checkpoint advancement code is
> likely limited to the number of partitions but can be a very small portion
> of the pipeline.
>
> On Fri, Dec 10, 2021 at 10:20 AM Vincent Marquez <
> vincent.marq...@gmail.com> wrote:
>
>> If you want to ensure you have at least once processing I think the
>> *maximum* amount of parallelization you can have would be the number of
>> partitions you have, so you'd want to group by partition, process a bundle
>> of that partition, then commit the last offset for a given partition.
>>
>> *~Vincent*
>>
>>
>> On Fri, Dec 10, 2021 at 9:28 AM Luke Cwik  wrote:
>>
>>> Yes, you will need to deal with records being out of order because the
>>> system will process many things in parallel.
>>>
>>> You can read the last committed offset from Kafka and compare it against
>>> the offset you have right now. If the offset you have right now is not the
>>> next offset, you store it in state; if it is, then you find the contiguous
>>> range of offsets that you have stored in state starting from this offset,
>>> remove them from state, and commit the last one in that contiguous range.
>>>
>>> On Fri, Dec 10, 2021 at 8:18 AM Juan Calvo Ferrándiz <
>>> juancalvoferran...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> Thanks Alexey! I understand. Continuing to think about possible ways
>>>> of committing records, I was wondering what happens in this scenario:
>>>>
>>>> When processing windows of data, do they get processed in sequential
>>>> order or is it possible for them to be processed out of order? For example
>>>> Window 1 contains 1 elements of data whereas window 2 contains 10
>>>> elements. Assuming Window 1 takes a while to process all of that data, is
>>>> it possible window 2 will finish before window 1?
>>>>
>>>> Thanks again!
>>>>
>>>> On Fri, 10 Dec 2021 at 14:39, Alexey Romanenko <
>>>> aromanenko@gmail.com> wrote:
>>>>
>>>>> I answered a similar question on SO a while ago [1], and I hope it
>>>>> will help.
>>>>>
>>>>> “By default, pipeline.apply(KafkaIO.read()...) will return
>>>>> a PCollection<KafkaRecord<K, V>>. So, downstream in your pipeline you can
>>>>> get an offset from KafkaRecord metadata and commit it manually in a way
>>>>> that you need (just don't forget to disable AUTO_COMMIT in
>>>>> KafkaIO.read()).
>>>>>
>>>>> By a manual way, I mean that you should instantiate your own Kafka
>>>>> client in your DoFn, process the input element (as a KafkaRecord) that
>>>>> was read before, fetch the offset from the KafkaRecord, and commit it
>>>>> with your own client.
>>>>>
>>>>> Though, you need to make sure that the call to the external API and the
>>>>> offset commit are atomic to prevent potential data loss (if it's critical)."
>>>>>
>>>>> [1]
>>>>> https://stackoverflow.com/questions/69272461/how-to-manually-commit-kafka-offset-in-apache-beam-at-the-end-of-specific-dofun/69272880#69272880
>>>>>
>>>>> —
>>>>> Alexey
>>>>>
>>>>> On 10 Dec 2021, at 10:40, Juan Calvo Ferrándiz <
>>>>> juancalvoferran...@gmail.com> wrote:
>>>>>
>>>>> Thanks Luke for your quick response. I see, that makes sense. Now I
>>>>> have two new questions if I may:
>>>>> a) How can I get the offsets I want to commit? My investigation is now
>>>>> going through getCheckpointMark(); is this correct?
>>>>> https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/UnboundedSource.UnboundedReader.html#:~:text=has%20been%20called.-,getCheckpointMark,-public%20abstract%C2%A0UnboundedSource
>>>

Re: Beam CassandraIO

2023-02-02 Thread Vincent Marquez
*~Vincent*


On Thu, Feb 2, 2023 at 3:01 AM Alexey Romanenko 
wrote:

> - d...@beam.apache.org
> + user@beam.apache.org
>
> Hi Enzo,
>
> Can you make sure that all your workers were properly added and listed in
> Spark WebUI?
>
> Did you specify a “ --master spark://HOST:PORT” option while running your
> Beam job with a SparkRunner?
>
> PS: Please, use user@beam.apache.org mailing list for such type of
> questions.
>
> —
> Alexey
>
> > On 2 Feb 2023, at 03:18, Enzo Bonggio  wrote:
> >
> > I have Spark standalone installed on two machines, but once I run
> spark-submit, it only executes on one executor. Is that the way it is
> supposed to work?
> > I thought that I could read from Cassandra with multiple machines.
>
>
Hi Enzo, I've used the CassandraIO connector quite a bit on Dataflow and
found it scaled out well to multiple machines.  I've not used it on Spark,
so I can't say if this is a Spark-specific issue.

Could you provide more details on how your pipeline works, and how you can
tell it's only executing on 'one executor'?
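
As an aside on the --master question above, a sketch of setting the Spark
master programmatically through the runner's pipeline options (the cluster URL
is a placeholder, and this assumes the standard Beam Spark runner classes):

  import org.apache.beam.runners.spark.SparkPipelineOptions;
  import org.apache.beam.runners.spark.SparkRunner;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
  options.setRunner(SparkRunner.class);
  options.setSparkMaster("spark://HOST:PORT");  // placeholder; defaults to local[4]
  Pipeline pipeline = Pipeline.create(options);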


Re: [Question] [CassandraIO]Using a query

2023-02-03 Thread Vincent Marquez
There are some examples in the test code that should be easy enough to
follow.

Here is an example of just querying the entire table:

https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L460

Here's an example of using readAll to only pull certain keys:

https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L381
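
For quick reference, a condensed sketch along the lines of those tests (the
hosts, keyspace, table, query, and Person entity are placeholders, and an
existing Pipeline named pipeline is assumed):

  import java.util.Arrays;
  import org.apache.beam.sdk.coders.SerializableCoder;
  import org.apache.beam.sdk.io.cassandra.CassandraIO;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.values.PCollection;

  // Read a whole table into a PCollection of mapped entities.
  PCollection<Person> all =
      pipeline.apply(
          CassandraIO.<Person>read()
              .withHosts(Arrays.asList("localhost"))
              .withPort(9042)
              .withKeyspace("my_keyspace")
              .withTable("person")
              .withEntity(Person.class)
              .withCoder(SerializableCoder.of(Person.class)));

  // readAll: build one CassandraIO.Read per query and read them all.
  PCollection<Person> some =
      pipeline
          .apply(Create.of(
              CassandraIO.<Person>read()
                  .withHosts(Arrays.asList("localhost"))
                  .withPort(9042)
                  .withKeyspace("my_keyspace")
                  .withTable("person")
                  .withQuery("SELECT * FROM my_keyspace.person WHERE person_id = 42")
                  .withEntity(Person.class)
                  .withCoder(SerializableCoder.of(Person.class))))
          .apply(CassandraIO.<Person>readAll()
              .withCoder(SerializableCoder.of(Person.class)));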




*~Vincent*


On Fri, Feb 3, 2023 at 12:59 PM Adam Scott  wrote:

> HI All,
>
> does anyone have an example of using CassandraIO to query a table?
>
> The following mentions "Alternatively, one may use 
> CassandraIO.<Person>readAll()
> .withCoder(SerializableCoder.of(Person.class)) to query a subset of the
> Cassandra database by creating a PCollection of CassandraIO.Read<Person>
> each with their own query or RingRange."
>
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/cassandra/CassandraIO.html
>
> Hoping there would be an example of this.
>
> TIA,
> Adam
>
>
>
>


Re: [Question] [CassandraIO]Using a query

2023-02-07 Thread Vincent Marquez
I've not used the Bigtable writer, but it should just be a matter of creating a
function that takes a MyClass and converts it to an Iterable<Mutation>,
then calling that from a DoFn you create and outputting the resulting
Iterable to the OutputReceiver.  Then your pipeline would be something like

pipeline.apply(CassandraIO.<MyClass>read().withTable("MyClass").withBlahblah())
  .apply(ParDo.of(new CustomDoFnToConvert()))
  .apply(BigtableIO.write())
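
Fleshing that out a little, a hedged sketch of the conversion DoFn
(BigtableIO.write() takes a PCollection of KV<ByteString, Iterable<Mutation>>;
the MyClass getters, column family, and row-key choice below are pure
assumptions):

  import com.google.bigtable.v2.Mutation;
  import com.google.protobuf.ByteString;
  import java.util.Collections;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.values.KV;

  class CustomDoFnToConvert extends DoFn<MyClass, KV<ByteString, Iterable<Mutation>>> {
    @ProcessElement
    public void processElement(
        @Element MyClass element,
        OutputReceiver<KV<ByteString, Iterable<Mutation>>> out) {
      // Assumed: MyClass has getId() and getValue(); "cf" is a hypothetical column family.
      ByteString rowKey = ByteString.copyFromUtf8(element.getId());
      Mutation setCell =
          Mutation.newBuilder()
              .setSetCell(
                  Mutation.SetCell.newBuilder()
                      .setFamilyName("cf")
                      .setColumnQualifier(ByteString.copyFromUtf8("value"))
                      .setValue(ByteString.copyFromUtf8(element.getValue()))
                      .setTimestampMicros(System.currentTimeMillis() * 1000))
              .build();
      Iterable<Mutation> mutations = Collections.singletonList(setCell);
      out.output(KV.of(rowKey, mutations));
    }
  }

The write step would then be BigtableIO.write() configured with
withProjectId(...), withInstanceId(...), and withTableId(...).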




*~Vincent*


On Tue, Feb 7, 2023 at 11:11 AM Adam Scott  wrote:

> Thank you Vincent!  Thanks for the quick response!
>
> The goal is to read from a Cassandra query and write to a Bigtable table.
>
> I've tried combining this technique, below, with the test examples you
> pointed out. But I am afraid my efforts were wholly naive.
>
> https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/src/main/java/com/google/cloud/teleport/bigtable/CassandraToBigtable.java
>
> It seems I would want, if I were dreaming, instead of a
> BeamRowToBigtableFn (
> https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/cc799861a458eef329aa5a15189a8045109e43e0/v1/src/main/java/com/google/cloud/teleport/bigtable/CassandraToBigtable.java#L220)
> , a BeamtoBigtableFn.
>
> Any hints on how to add a write/output to Bigtable?
>
> TIA,
> Adam
>
>
>
> On Fri, Feb 3, 2023 at 1:25 PM Vincent Marquez 
> wrote:
>
>> There are some examples in the test code that should be easy enough to
>> follow.
>>
>> Here is an example of just querying the entire table:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L460
>>
>> Here's an example of using readAll to only pull certain keys:
>>
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java#L381
>>
>>
>>
>>
>> *~Vincent*
>>
>>
>> On Fri, Feb 3, 2023 at 12:59 PM Adam Scott 
>> wrote:
>>
>>> HI All,
>>>
>>> does anyone have an example of using CassandraIO to query a table?
>>>
>>> The following mentions "Alternatively, one may use 
>>> CassandraIO.<Person>readAll()
>>> .withCoder(SerializableCoder.of(Person.class)) to query a subset of the
>>> Cassandra database by creating a PCollection of CassandraIO.Read<Person>
>>> each with their own query or RingRange."
>>>
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/cassandra/CassandraIO.html
>>>
>>> Hoping there would be an example of this.
>>>
>>> TIA,
>>> Adam
>>>
>>>
>>>
>>>