Re: BigQueryIO.write and Wait.on

2018-07-25 Thread Carlos Alonso
Just opened this PR: https://github.com/apache/beam/pull/6055 to get
feedback ASAP. Basically, what it does is return the job status in a
PCollection of BigQueryWriteResult objects.
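
For anyone following the design discussion quoted below, here is a minimal
plain-Java sketch of the two points Eugene raises: a result type that says
nothing about which write method was used, and failure handling that is
loud by default and only relaxed when the caller asks for it. This is not
Beam code and not what the PR does. BigQueryWriteResult is the empty type
named in the thread; the write() helpers, the Predicate sink and the
dead-letter Consumer are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class WriteResultSketch {
  /** Empty marker: an instance only signals that a write finished successfully. */
  static final class BigQueryWriteResult {}

  /** Default path: a rejected row fails the whole write, so failures cannot be ignored. */
  static <T> BigQueryWriteResult write(List<T> rows, Predicate<T> sink) {
    return write(rows, sink, row -> {
      throw new IllegalStateException("write failed for row: " + row);
    });
  }

  /** Opt-in path: the caller explicitly supplies a dead-letter handler for rejected rows. */
  static <T> BigQueryWriteResult write(List<T> rows, Predicate<T> sink, Consumer<T> deadLetter) {
    for (T row : rows) {
      if (!sink.test(row)) {
        deadLetter.accept(row);
      }
    }
    return new BigQueryWriteResult();
  }

  public static void main(String[] args) {
    Predicate<String> sink = row -> !row.isEmpty(); // hypothetical sink: rejects empty rows
    List<String> deadLetters = new ArrayList<>();
    write(List.of("a", "", "b"), sink, deadLetters::add);
    System.out.println("dead letters: " + deadLetters.size());
    try {
      write(List.of("a", ""), sink);
    } catch (IllegalStateException e) {
      System.out.println("default path failed loudly");
    }
  }
}
```

Running main prints "dead letters: 1" followed by "default path failed
loudly". The marker type can gain fields later without changing callers
that only wait on it.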

On Fri, Jul 20, 2018 at 11:57 PM Reuven Lax  wrote:

> There already is a org.apache.beam.sdk.io.gcp.bigquery.WriteResult class.
>
> On Tue, Jul 17, 2018 at 9:44 AM Eugene Kirpichov 
> wrote:
>
>> Hmm, I think this approach has some complications:
>> - Using JobStatus makes it tied to using BigQuery batch load jobs, but
>> the return type ought to be the same regardless of which method of writing
>> is used (including potential future BigQuery APIs - they are evolving), or
>> how many BigQuery load jobs are involved in writing a given window (it can
>> be multiple).
>> - Returning a success/failure indicator makes it prone to users ignoring
>> the failure: the default behavior should be that, if the pipeline succeeds,
>> that means all data was successfully written - if users want different
>> error handling, e.g. a deadletter queue, they should have to specify it
>> explicitly.
>>
>> I would recommend returning a PCollection of a type that's invariant to
>> which load method is used (streaming writes, load jobs, multiple load jobs
>> etc.). If it's unclear what type that should be, you could introduce an
>> empty type e.g. "class BigQueryWriteResult {}" just for the sake of
>> signaling success, and later add something to it.
>>
>> On Tue, Jul 17, 2018 at 12:30 AM Carlos Alonso 
>> wrote:
>>
>>> All good so far. I've been a bit sidetracked, but more or less I have
>>> the idea of using the JobStatus as part of the collection so that not only
>>> the completion is signaled, but also the result (success/failure) can be
>>> accessed. How does it sound?
>>>
>>> Regards
>>>
>>> On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov 
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Any updates / roadblocks you hit?
>>>>
>>>>
>>>> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov 
>>>> wrote:
>>>>
>>>>> Awesome!! Thanks for the heads up, very exciting, this is going to
>>>>> make a lot of people happy :)
>>>>>
>>>>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso 
>>>>> wrote:
>>>>>
>>>>>> + d...@beam.apache.org
>>>>>>
>>>>>> Just a quick email to let you know that I'm starting to develop this.
>>>>>>
>>>>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>> Hi Carlos,
>>>>>>>
>>>>>>> Thank you for expressing interest in taking this on! Let me give you
>>>>>>> a few pointers to start, and I'll be happy to help everywhere along the 
>>>>>>> way.
>>>>>>>
>>>>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>>>>> PCollection) that can be used as input to Wait.on().
>>>>>>> Currently it returns a WriteResult, which only contains a
>>>>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>>>>> directly, instead we should add another component to WriteResult that
>>>>>>> represents the result of successfully writing some data.
>>>>>>>
>>>>>>> Given that BQIO supports dynamic destination writes, I think it
>>>>>>> makes sense for that to be a PCollection<KV<DestinationT, ???>> so
>>>>>>> that in theory we could sequence different destinations independently
>>>>>>> (currently Wait.on() does not provide such a feature, but it could);
>>>>>>> and it will require changing WriteResult to be
>>>>>>> WriteResult<DestinationT>. As for what the "???" might be - it is
>>>>>>> something that represents the result of successfully writing a window
>>>>>>> of data. I think it can even be Void, or "?" (wildcard type) for now,
>>>>>>> until we figure out something better.
>>>>>>>
>>>>>>> Implementing this would require roughly the following work:
>>>>>>> - Add this PCollection<KV<DestinationT, ???>> to WriteResult
>>>>>>> - Modify the BatchLoads transform to provide it on both c

Re: BigQueryIO.write and Wait.on

2018-07-17 Thread Carlos Alonso
All good so far. I've been a bit sidetracked, but more or less I have the
idea of using the JobStatus as part of the collection so that not only the
completion is signaled, but also the result (success/failure) can be
accessed. How does it sound?

Regards

On Tue, Jul 17, 2018 at 3:07 AM Eugene Kirpichov 
wrote:

> Hi Carlos,
>
> Any updates / roadblocks you hit?
>
>
> On Tue, Jul 3, 2018 at 7:13 AM Eugene Kirpichov 
> wrote:
>
>> Awesome!! Thanks for the heads up, very exciting, this is going to make a
>> lot of people happy :)
>>
>> On Tue, Jul 3, 2018, 3:40 AM Carlos Alonso  wrote:
>>
>>> + d...@beam.apache.org
>>>
>>> Just a quick email to let you know that I'm starting to develop this.
>>>
>>> On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov 
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Thank you for expressing interest in taking this on! Let me give you a
>>>> few pointers to start, and I'll be happy to help everywhere along the way.
>>>>
>>>> Basically we want BigQueryIO.write() to return something (e.g. a
>>>> PCollection) that can be used as input to Wait.on().
>>>> Currently it returns a WriteResult, which only contains a
>>>> PCollection<TableRow> of failed inserts - that one can not be used
>>>> directly, instead we should add another component to WriteResult that
>>>> represents the result of successfully writing some data.
>>>>
>>>> Given that BQIO supports dynamic destination writes, I think it makes
>>>> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
>>>> we could sequence different destinations independently (currently Wait.on()
>>>> does not provide such a feature, but it could); and it will require
>>>> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
>>>> might be - it is something that represents the result of successfully
>>>> writing a window of data. I think it can even be Void, or "?" (wildcard
>>>> type) for now, until we figure out something better.
>>>>
>>>> Implementing this would require roughly the following work:
>>>> - Add this PCollection<KV<DestinationT, ???>> to WriteResult
>>>> - Modify the BatchLoads transform to provide it on both codepaths:
>>>> expandTriggered() and expandUntriggered()
>>>> ...- expandTriggered() itself writes via 2 codepaths: single-partition
>>>> and multi-partition. Both need to be handled - we need to get a
>>>> PCollection<KV<DestinationT, ???>> from each of them, and Flatten these
>>>> two PCollections together to get the final result. The single-partition
>>>> codepath (writeSinglePartition) under the hood already uses WriteTables
>>>> that returns a KV so it's directly usable. The
>>>> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
>>>> codepath drops DestinationT along the way and will need to be refactored a
>>>> bit to keep it until the end.
>>>> ...- expandUntriggered() should be treated the same way.
>>>> - Modify the StreamingWriteTables transform to provide it
>>>> ...- Here also, the challenge is to propagate the DestinationT type all
>>>> the way until the end of StreamingWriteTables - it will need to be
>>>> refactored. After such a refactoring, returning a KV
>>>> should be easy.
>>>>
>>>> Another challenge with all of this is backwards compatibility in terms
>>>> of API and pipeline update.
>>>> Pipeline update is much less of a concern for the BatchLoads codepath,
>>>> because it's typically used in batch-mode pipelines that don't get updated.
>>>> I would recommend to start with this, perhaps even with only the
>>>> untriggered codepath (it is much more commonly used) - that will pave the
>>>> way for future work.
>>>>
>>>> Hope this helps, please ask more if something is unclear!
>>>>
>>>> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso 
>>>> wrote:
>>>>
>>>>> Hey Eugene!!
>>>>>
>>>>> I’d gladly take a stab at it, although I’m not sure how much available
>>>>> time I might have to put into it but... yeah, let’s try it.
>>>>>
>>>>> Where should I begin? Is there a Jira issue or shall I file one?
>>>>>
>>>>> Thanks!
>>>>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov 
>>>>> wrote:
>>>>>
>>

Re: BigQueryIO.write and Wait.on

2018-07-03 Thread Carlos Alonso
+ d...@beam.apache.org

Just a quick email to let you know that I'm starting to develop this.

On Fri, Apr 20, 2018 at 10:30 PM Eugene Kirpichov 
wrote:

> Hi Carlos,
>
> Thank you for expressing interest in taking this on! Let me give you a few
> pointers to start, and I'll be happy to help everywhere along the way.
>
> Basically we want BigQueryIO.write() to return something (e.g. a
> PCollection) that can be used as input to Wait.on().
> Currently it returns a WriteResult, which only contains a
> PCollection<TableRow> of failed inserts - that one can not be used
> directly, instead we should add another component to WriteResult that
> represents the result of successfully writing some data.
>
> Given that BQIO supports dynamic destination writes, I think it makes
> sense for that to be a PCollection<KV<DestinationT, ???>> so that in theory
> we could sequence different destinations independently (currently Wait.on()
> does not provide such a feature, but it could); and it will require
> changing WriteResult to be WriteResult<DestinationT>. As for what the "???"
> might be - it is something that represents the result of successfully
> writing a window of data. I think it can even be Void, or "?" (wildcard
> type) for now, until we figure out something better.
>
> Implementing this would require roughly the following work:
> - Add this PCollection<KV<DestinationT, ???>> to WriteResult
> - Modify the BatchLoads transform to provide it on both codepaths:
> expandTriggered() and expandUntriggered()
> ...- expandTriggered() itself writes via 2 codepaths: single-partition and
> multi-partition. Both need to be handled - we need to get a
> PCollection<KV<DestinationT, ???>> from each of them, and Flatten these
> two PCollections together to get the final result. The single-partition
> codepath (writeSinglePartition) under the hood already uses WriteTables
> that returns a KV so it's directly usable. The
> multi-partition codepath ends in WriteRenameTriggered - unfortunately, this
> codepath drops DestinationT along the way and will need to be refactored a
> bit to keep it until the end.
> ...- expandUntriggered() should be treated the same way.
> - Modify the StreamingWriteTables transform to provide it
> ...- Here also, the challenge is to propagate the DestinationT type all
> the way until the end of StreamingWriteTables - it will need to be
> refactored. After such a refactoring, returning a KV
> should be easy.
>
> Another challenge with all of this is backwards compatibility in terms of
> API and pipeline update.
> Pipeline update is much less of a concern for the BatchLoads codepath,
> because it's typically used in batch-mode pipelines that don't get updated.
> I would recommend to start with this, perhaps even with only the
> untriggered codepath (it is much more commonly used) - that will pave the
> way for future work.
>
> Hope this helps, please ask more if something is unclear!
>
> On Fri, Apr 20, 2018 at 12:48 AM Carlos Alonso 
> wrote:
>
>> Hey Eugene!!
>>
>> I’d gladly take a stab at it, although I’m not sure how much available
>> time I might have to put into it but... yeah, let’s try it.
>>
>> Where should I begin? Is there a Jira issue or shall I file one?
>>
>> Thanks!
>> On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov 
>> wrote:
>>
>>> Hi,
>>>
>>> Yes, you're both right - BigQueryIO.write() is currently not implemented
>>> in a way that it can be used with Wait.on(). It would certainly be a
>>> welcome contribution to change this - many people expressed interest in
>>> specifically waiting for BigQuery writes. Are any of you interested in
>>> helping out?
>>>
>>> Thanks.
>>>
>>> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso 
>>> wrote:
>>>
>>>> Hi Simon, I think your explanation was very accurate, at least to my
>>>> understanding. I'd also be interested in getting feedback on the batch
>>>> load results within the pipeline... hopefully someone may suggest
>>>> something, otherwise we could propose submitting a Jira, or even better,
>>>> a PR!! :)
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>>>> simon.kitch...@unbelievable-machine.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I need to write some data to BigQuery (batch-mode) and then send a
>>>>> Pubsub message to trigger further processing.
>>>>>
>>>>> I found this thread titled "Callbacks/other functions run after a
>>>>> PDone/output transform" on the user-list which was very relevant:
>>>>>

Re: Multimap PCollectionViews' values updated rather than appended

2018-06-11 Thread Carlos Alonso
Many thanks for your help. Actually, my use case emits the entire map
every time, so I guess I'm good to go with discarding mode.

This test reproduces the issue:
https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala#L19-L53

Hope it helps
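
To make the difference between the two accumulation modes concrete, here
is a minimal plain-Java model of Lukasz's example quoted below: the first
firing sees (A, 1) and (B, 1), the second sees (B, 2). It only models the
intended semantics as described in this thread; it is not Beam code, and
the class and method names are hypothetical. The model keeps arrival
order, which a real runner does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PaneModesSketch {
  /**
   * Returns the multimap visible after the last firing. Each inner list holds
   * the elements that arrived since the previous firing.
   */
  static Map<String, List<Integer>> viewAfter(
      List<List<Map.Entry<String, Integer>>> arrivalsPerFiring, boolean accumulating) {
    Map<String, List<Integer>> view = new TreeMap<>();
    for (List<Map.Entry<String, Integer>> arrivals : arrivalsPerFiring) {
      if (!accumulating) {
        view.clear(); // discarding: the new pane replaces everything emitted before
      }
      for (Map.Entry<String, Integer> e : arrivals) {
        view.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
    return view;
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Integer>>> firings = List.of(
        List.of(Map.entry("A", 1), Map.entry("B", 1)),
        List.of(Map.entry("B", 2)));
    System.out.println("discarding:   " + viewAfter(firings, false));
    System.out.println("accumulating: " + viewAfter(firings, true));
  }
}
```

The discarding line prints {B=[2]} and the accumulating line prints
{A=[1], B=[1, 2]}, which is why discarding mode is only safe when every
firing re-emits the entire map.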

On Mon, Jun 4, 2018 at 9:04 PM Lukasz Cwik  wrote:

> Carlos, can you provide a test/code snippet for the bug that shows the
> issue?
>
> On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik  wrote:
>
>> +d...@beam.apache.org
>> Note that this is likely a bug in the DirectRunner for accumulation mode,
>> filed: https://issues.apache.org/jira/browse/BEAM-4470
>>
>> Discarding mode is meant to always be the latest firing, the issue though
>> is that you need to emit the entire map every time. If you can do this,
>> then it makes sense to use discarding mode. The issue with discarding mode
>> is that if your first trigger firing produces (A, 1), (B, 1) and your
>> second firing produces (B, 2), the multimap will only contain (B, 2) and
>> (A, 1) will have been discarded.
>>
>> To my knowledge, there is no guarantee about the order in which the
>> values are combined. You will need to use some piece of information about
>> the element to figure out which is the latest (or encode some additional
>> information along with each element to make this easy).
>>
>> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso 
>> wrote:
>>
>>> I've improved the example a little and added some tests
>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>>
>>> The behaviour is slightly different, which is possibly because of the
>>> different runners (Dataflow/Direct) implementations, but still not working.
>>>
>>> Now what happens is that although the internal PCollection gets updated,
>>> the view isn't. This is happening regardless of the accumulation mode.
>>>
>>> Regarding the accumulation mode on Dataflow... That was it!! Now the
>>> sets contain all the items. One more question, though: is the ordering
>>> within the set deterministic? (i.e. can I assume that the latest will
>>> always be in the last position of the Iterable object?)
>>>
>>> Also... given that for my particular case I only want the latest
>>> version, would you advise me to go ahead with Discarding mode?
>>>
>>> Regards
>>>
>>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik  wrote:
>>>
>>>> The trigger definition in the sample code you have is using discarding
>>>> firing mode. Try swapping to using accumulating mode.
>>>>
>>>>
>>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso 
>>>> wrote:
>>>>
>>>>> But I think what I'm experiencing is quite different. Basically the
>>>>> side input is updated, but only one element is found on the Iterable that
>>>>> is the value of any key of the multimap.
>>>>>
>>>>> I mean, no concatenation seems to be happening. On the linked thread,
>>>>> Kenn suggests that every firing will add the new value to the set of
>>>>> values for the emitted key, but what I'm experiencing is that the new
>>>>> value is there, but just by itself (i.e. it is the only element in the
>>>>> set).
>>>>>
>>>>> @Robert, I'm using
>>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>>
>>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik  wrote:
>>>>>
>>>>>> An alternative to the thread that Kenn linked (adding support for
>>>>>> retractions) is to add explicit support for combiners into side inputs.
>>>>>> The system currently works by using a hardcoded concatenating combiner,
>>>>>> so maps, lists, iterables, singletons, multimaps all work by
>>>>>> concatenating the set of values emitted and then turning it into a
>>>>>> view, which is why it is an error for a singleton and map view if the
>>>>>> trigger fires multiple times.
>>>>>>
>>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2c

Re: Multimap PCollectionViews' values updated rather than appended

2018-05-31 Thread Carlos Alonso
I've improved the example a little and added some tests
https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala

The behaviour is slightly different, which is possibly because of the
different runners (Dataflow/Direct) implementations, but still not working.

Now what happens is that although the internal PCollection gets updated,
the view isn't. This is happening regardless of the accumulation mode.

Regarding the accumulation mode on Dataflow... That was it!! Now the sets
contain all the items. One more question, though: is the ordering within
the set deterministic? (i.e. can I assume that the latest will always be in
the last position of the Iterable object?)

Also... given that for my particular case I only want the latest version,
would you advise me to go ahead with Discarding mode?

Regards
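
On the ordering question: the reply in the later message above says there
is no guarantee about the order of the values, and suggests encoding extra
information with each element. A minimal plain-Java sketch of that idea,
assuming each value can carry a timestamp; the Versioned wrapper and the
latest() helper are hypothetical names, not part of Beam or scio.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;

public class LatestValueSketch {
  /** Hypothetical wrapper: a value plus the time at which it was produced. */
  static final class Versioned<T> {
    final long timestampMillis;
    final T value;

    Versioned(long timestampMillis, T value) {
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  /** Picks the newest value without relying on iteration order. */
  static <T> Optional<T> latest(Iterable<Versioned<T>> values) {
    return StreamSupport.stream(values.spliterator(), false)
        .max(Comparator.comparingLong((Versioned<T> v) -> v.timestampMillis))
        .map(v -> v.value);
  }

  public static void main(String[] args) {
    // The newest entry is deliberately first, so position alone would mislead.
    List<Versioned<String>> schemas = List.of(
        new Versioned<>(2000L, "schema-v2"),
        new Versioned<>(1000L, "schema-v1"));
    System.out.println(latest(schemas).orElse("none")); // prints "schema-v2"
  }
}
```

With this shape the consumer of the side input never has to assume that
the last element of the Iterable is the most recent one.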

On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik  wrote:

> The trigger definition in the sample code you have is using discarding
> firing mode. Try swapping to using accumulating mode.
>
>
> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso 
> wrote:
>
>> But I think what I'm experiencing is quite different. Basically the side
>> input is updated, but only one element is found on the Iterable that is the
>> value of any key of the multimap.
>>
>> I mean, no concatenation seems to be happening. On the linked thread,
>> Kenn suggests that every firing will add the new value to the set of values
>> for the emitted key, but what I'm experiencing is that the new value is
>> there, but just by itself (i.e. it is the only element in the set).
>>
>> @Robert, I'm using
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>
>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik  wrote:
>>
>>> An alternative to the thread that Kenn linked (adding support for
>>> retractions) is to add explicit support for combiners into side inputs. The
>>> system currently works by using a hardcoded concatenating combiner, so
>>> maps, lists, iterables, singletons, multimaps all work by concatenating the
>>> set of values emitted and then turning it into a view which is why it is an
>>> error for a singleton and map view if the trigger fires multiple times.
>>>
>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles  wrote:
>>>
>>>> Yes, this is a known issue. Here's a prior discussion:
>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>
>>>> It is actually long-standing and the solution is known but hard.
>>>>
>>>>
>>>>
>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso 
>>>> wrote:
>>>>
>>>>> Hi everyone!!
>>>>>
>>>>> Working with multimap based side inputs on the global window I'm
>>>>> experiencing something unexpected (at least to me) that I'd like to share
>>>>> with you to clarify.
>>>>>
>>>>> The way I understand multimaps is that when one emits two values for
>>>>> the same key for the same window (obvious thing here as I'm working on the
>>>>> Global one), the newly emitted values are appended to the Iterable
>>>>> collection that is the value for that particular key on the map.
>>>>>
>>>>> Testing it in this job (it is using scio, but side inputs are
>>>>> implemented with PCollectionViews):
>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>
>>>>> The steps to reproduce are:
>>>>> 1. Create one table on the target BQ
>>>>> 2. Run the job
>>>>> 3. Patch the table on BQ (add one field), this should generate a new
>>>>> TableSchema for the corresponding TableReference
>>>>> 4. An updated value of the field count appears in the logs, but there
>>>>> is only one element within the iterable, as if it had been updated instead
>>>>> of appended!!
>>>>>
>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>
>>>>> Thanks!
>>>>>
>>>>


Re: Multimap PCollectionViews' values updated rather than appended

2018-05-31 Thread Carlos Alonso
But I think what I'm experiencing is quite different. Basically the side
input is updated, but only one element is found on the Iterable that is the
value of any key of the multimap.

I mean, no concatenation seems to be happening. On the linked thread, Kenn
suggests that every firing will add the new value to the set of values for
the emitted key, but what I'm experiencing is that the new value is there,
but just by itself (i.e. it is the only element in the set).

@Robert, I'm using
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())

On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik  wrote:

> An alternative to the thread that Kenn linked (adding support for
> retractions) is to add explicit support for combiners into side inputs. The
> system currently works by using a hardcoded concatenating combiner, so
> maps, lists, iterables, singletons, multimaps all work by concatenating the
> set of values emitted and then turning it into a view which is why it is an
> error for a singleton and map view if the trigger fires multiple times.
>
> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles  wrote:
>
>> Yes, this is a known issue. Here's a prior discussion:
>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>
>> It is actually long-standing and the solution is known but hard.
>>
>>
>>
>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso 
>> wrote:
>>
>>> Hi everyone!!
>>>
>>> Working with multimap based side inputs on the global window I'm
>>> experiencing something unexpected (at least to me) that I'd like to share
>>> with you to clarify.
>>>
>>> The way I understand multimaps is that when one emits two values for the
>>> same key for the same window (obvious thing here as I'm working on the
>>> Global one), the newly emitted values are appended to the Iterable
>>> collection that is the value for that particular key on the map.
>>>
>>> Testing it in this job (it is using scio, but side inputs are
>>> implemented with PCollectionViews):
>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>
>>> The steps to reproduce are:
>>> 1. Create one table on the target BQ
>>> 2. Run the job
>>> 3. Patch the table on BQ (add one field), this should generate a new
>>> TableSchema for the corresponding TableReference
>>> 4. An updated value of the field count appears in the logs, but there
>>> is only one element within the iterable, as if it had been updated instead
>>> of appended!!
>>>
>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>
>>> Thanks!
>>>
>>


Multimap PCollectionViews' values updated rather than appended

2018-05-30 Thread Carlos Alonso
Hi everyone!!

Working with multimap based side inputs on the global window I'm
experiencing something unexpected (at least to me) that I'd like to share
with you to clarify.

The way I understand multimaps is that when one emits two values for the
same key for the same window (obvious thing here as I'm working on the
Global one), the newly emitted values are appended to the Iterable
collection that is the value for that particular key on the map.

Testing it in this job (it is using scio, but side inputs are implemented
with PCollectionViews):
https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala

The steps to reproduce are:
1. Create one table on the target BQ
2. Run the job
3. Patch the table on BQ (add one field), this should generate a new
TableSchema for the corresponding TableReference
4. An updated value of the field count appears in the logs, but there is
only one element within the iterable, as if it had been updated instead of
appended!!

Is that the expected behaviour? Is it a bug? Am I missing something?

Thanks!


Re: Testing an updating side input on global window

2018-05-29 Thread Carlos Alonso
Hi Lukasz, many thanks for your responses.

I'm actually using them, but I don't seem to be able to synchronise the
following steps:
1: The side input gets its first value (v1)
2: The main stream gets that side input applied and finds that v1 value
3: The side one gets updated (v2)
4: The main stream gets the side input applied again and finds the v2 value
(along with v1, as this is a multimap)

Regards

On Tue, May 29, 2018 at 10:57 PM Lukasz Cwik  wrote:

> Your best bet is to use TestStreams[1] as it is used to validate
> window/triggering behavior. Note that the transform requires special runner
> based execution and currently only works with the DirectRunner. All
> examples are marked with the JUnit category "UsesTestStream", for example
> [2].
>
> 1:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestStream.java
> 2:
> https://github.com/apache/beam/blob/0cbcf4ad1db7d820c5476d636f3a3d69062021a5/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java#L69
>
>
> On Tue, May 29, 2018 at 1:05 PM Carlos Alonso 
> wrote:
>
>> Hi all!!
>>
>> Basically that's what I'm trying to do. I'm building a pipeline that has
>> a refreshing, multimap, side input (BQ schemas) that then I apply to the
>> main stream of data (records that are ultimately saved to the corresponding
>> BQ table).
>>
>> My job, although being of streaming nature, runs on the global window,
>> and I want to unit test that the side input refreshes and that the updates
>> are successfully applied.
>>
>> I'm using scio and I can't seem to simulate that refreshing behaviour.
>> These are the relevant bits of the code:
>> https://gist.github.com/calonso/87d392a65079a66db75a78cb8d80ea98
>>
>> The way I understand it, the side collection is refreshed before
>> accessing it so when accessed, it already contains the final (updated)
>> snapshot of the schemas, is that true? In which case, how can I simulate
>> that synchronisation? I'm using processing times as I thought that could be
>> the way to go, but obviously something is wrong there.
>>
>> Many thanks!!
>>
>


Testing an updating side input on global window

2018-05-29 Thread Carlos Alonso
Hi all!!

Basically that's what I'm trying to do. I'm building a pipeline that has a
refreshing, multimap, side input (BQ schemas) that then I apply to the main
stream of data (records that are ultimately saved to the corresponding BQ
table).

My job, although being of streaming nature, runs on the global window, and
I want to unit test that the side input refreshes and that the updates are
successfully applied.

I'm using scio and I can't seem to simulate that refreshing behaviour.
These are the relevant bits of the code:
https://gist.github.com/calonso/87d392a65079a66db75a78cb8d80ea98

The way I understand it, the side collection is refreshed before
accessing it so when accessed, it already contains the final (updated)
snapshot of the schemas, is that true? In which case, how can I simulate
that synchronisation? I'm using processing times as I thought that could be
the way to go, but obviously something is wrong there.

Many thanks!!


Re: Understanding GenerateSequence and SideInputs

2018-05-25 Thread Carlos Alonso
Many thanks for your comments. Really appreciate it!!

I'm not sure I understood the GenerateSequence guarantees explanation. By
"it will generate at most a given number..." you mean that it won't
generate more than the given number per period, right? If that is the case,
maybe we need to review it closely, as that's exactly what I've seen (I
configured 1 element every 5 minutes and it was consistently emitting every
5 minutes plus a few millis, except for one time when it emitted after 4
minutes and some seconds).

About the Square Enix link, many thanks Reza, that's something we are
following for our implementation.

Thanks everyone again!
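
One possible explanation for the 4-minute gap, sketched in plain Java. It
assumes, in line with Raghu's description quoted below, that the reader
compares the wall clock against a fixed schedule measured from its start
time and emits on the first advance() call after an element becomes due.
That is an assumption about the mechanism, not a copy of CountingSource;
emissionTimes() and its parameters are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class RateScheduleSketch {
  /**
   * Returns the poll times at which an element is emitted. Element n becomes due at
   * startMillis + n * periodMillis and is emitted by the first poll at or after that
   * time, at most one element per poll.
   */
  static List<Long> emissionTimes(long startMillis, long periodMillis, long[] pollTimesMillis) {
    List<Long> emitted = new ArrayList<>();
    long next = 0; // index of the next element to emit
    for (long now : pollTimesMillis) {
      long newestDue = (now - startMillis) / periodMillis; // index of the newest due element
      if (newestDue >= next) {
        emitted.add(now);
        next++;
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    long period = 300_000L; // 5 minutes
    // The third poll arrives 20 seconds late; the others are only a few millis late.
    long[] polls = {0L, 300_005L, 620_000L, 900_003L};
    List<Long> times = emissionTimes(0L, period, polls);
    List<Long> gaps = new ArrayList<>();
    for (int i = 1; i < times.size(); i++) {
      gaps.add(times.get(i) - times.get(i - 1));
    }
    System.out.println("gaps (ms): " + gaps);
  }
}
```

This prints gaps (ms): [300005, 319995, 280003]. Under this model a late
emission is followed by a gap shorter than the period, because the
schedule is anchored to the start time rather than to the previous
element.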

On Fri, May 25, 2018 at 1:34 AM Reza Rokni <r...@google.com> wrote:

> Hi,
>
> Not sure if this is useful for your use case but as you are using BQ with
> a changing schema the following may also be an interesting read ...
>
>
> https://cloud.google.com/blog/big-data/2018/02/how-to-handle-mutating-json-schemas-in-a-streaming-pipeline-with-square-enix
>
> Cheers
>
> Reza
>
>
>
> On Fri, May 25, 2018, 5:50 AM Raghu Angadi <rang...@google.com> wrote:
>
>>
>> On Thu, May 24, 2018 at 1:11 PM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> Hi everyone!!
>>>
>>> I'm building a pipeline to store streaming data into BQ and I'm using
>>> the pattern: Slowly changing lookup cache described here:
>>> https://cloud.google.com/blog/big-data/2017/06/guide-to-common-cloud-dataflow-use-case-patterns-part-1
>>> to hold and refresh the table schemas (as they may change from time to
>>> time).
>>>
>>> Now I'd like to understand how that is scheduled on a distributed
>>> system. Who is running that code? One random node? One node but always the
>>> same? All nodes?
>>>
>>
>> GenerateSequence() uses an unbounded source. Like any unbounded
>> source, it can have a set of 'splits' ('desiredNumSplits' [1] is set by
>> the runtime). Each of the splits runs in parallel; a typical runtime
>> distributes these across the workers. Typically they stay on a worker
>> unless there is a reason to redistribute (autoscaling, workers unresponsive
>> etc). W.r.t. the user application there are no guarantees about affinity.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L337
>>
>>
>>>
>>> Also, what are the GenerateSequence guarantees in terms of precision? I
>>> have it configured to generate 1 element every 5 minutes and most of the
>>> time it works exactly, but sometimes it doesn't... Is that expected?
>>>
>>
>> Each of the splits mentioned above essentially runs 'advance() [2]' in a
>> loop. It checks the current wall time to decide if it needs to emit the
>> next element. If the value you see is off by a few seconds, it would imply
>> 'advance()' was not called during that time by the framework. Runtime
>> frameworks usually don't provide any hard or soft deadlines for scheduling
>> work.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L337
>>
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L426
>>
>>
>>> Regards
>>>
>>


Understanding GenerateSequence and SideInputs

2018-05-24 Thread Carlos Alonso
Hi everyone!!

I'm building a pipeline to store streaming data into BQ and I'm using the
pattern: Slowly changing lookup cache described here:
https://cloud.google.com/blog/big-data/2017/06/guide-to-common-cloud-dataflow-use-case-patterns-part-1
to hold and refresh the table schemas (as they may change from time to time).

Now I'd like to understand how that is scheduled on a distributed system.
Who is running that code? One random node? One node but always the same?
All nodes?

Also, what are the GenerateSequence guarantees in terms of precision? I
have it configured to generate 1 element every 5 minutes and most of the
time it works exactly, but sometimes it doesn't... Is that expected?

Regards


Re: BigQuery streaming insert errors

2018-05-09 Thread Carlos Alonso
Filed https://issues.apache.org/jira/browse/BEAM-4257 and currently working
on it

On Sat, Apr 7, 2018 at 1:57 AM Gaurav Thakur <gaurav2...@gmail.com> wrote:

> Carlos,
>
> I see your point.
> I was expecting the InsertRetryPolicy.Context to hold and give a handle
> to that information. Spoke too soon.
>
> Thanks, Gaurav
>
> On Fri, Apr 6, 2018 at 8:01 PM, Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> Hi Carlos,
>>
>> I don't think there's currently a way to collect the errors from BigQuery
>> for failed inserts. I agree that this can be a useful addition. Feel free to
>> create a JIRA. Also, any contributions related to this are welcome.
>>
>> Thanks,
>> Cham
>>
>>
>> On Fri, Apr 6, 2018 at 12:29 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> Hi Gaurav, many thanks for your response. I'm currently using retry
>>> policies, but imagine the following scenario:
>>>
>>> I'm trying to insert an existing field, even if we retry, it will still
>>> fail but I'll never be able to detect that within the pipeline, as
>>> getFailedInserts()
>>> https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html#getFailedInserts--
>>> only contains the TableRows that failed, not the reason.
>>>
>>> Adding the error as well won't be very hard as I understand it because
>>> BigQueryServicesImpl.insertAll() actually knows about it:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L750
>>>
>>> I think I would even volunteer to work on it if the community feels it
>>> makes sense as well.
>>>
>>> Regards
>>>
>>> On Fri, Apr 6, 2018 at 1:28 AM Gaurav Thakur <gaurav2...@gmail.com>
>>> wrote:
>>>
>>>> Hi Carlos,
>>>>
>>>> Would an insert retry policy help you?
>>>> Please see this,
>>>> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.Context.html
>>>>
>>>> Thanks, Gaurav
>>>>
>>>> On Fri, Apr 6, 2018 at 8:13 AM, Pablo Estrada <pabl...@google.com>
>>>> wrote:
>>>>
>>>>> I'm adding Cham as he might be knowledgeable about BQ IO, or he might
>>>>> be able to redirect to someone else.
>>>>> Cham, do you have guidance for Carlos here?
>>>>> Thanks
>>>>> -P.
>>>>>
>>>>>
>>>>> On Mon, Apr 2, 2018 at 11:08 AM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> And... where could I catch that exception?
>>>>>>
>>>>>> Thanks!
>>>>>> On Mon, 2 Apr 2018 at 16:58, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Wouldn't the following code give you information about failed
>>>>>>> insertions (around line 790 in BigQueryServicesImpl) ?
>>>>>>>
>>>>>>>   if (!allErrors.isEmpty()) {
>>>>>>>     throw new IOException("Insert failed: " + allErrors);
>>>>>>>   }
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Mon, Apr 2, 2018 at 7:16 AM, Carlos Alonso <car...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone!!
>>>>>>>>
>>>>>>>> I was wondering if there's any way to get the error why an insert
>>>>>>>> (streaming) failed. Looking at the code I think there's currently no
>>>>>>>> way to do that, as the BigQueryServicesImpl insertAll seems to discard
>>>>>>>> the errors and just add the failed TableRow instances into the
>>>>>>>> failedInserts list.
>>>>>>>>
>>>>>>>> It would be very nice to have an "enriched" TableRow returned
>>>>>>>> instead that contains the error information for further processing
>>>>>>>> (in our use case we're saving the failed ones into a different table
>>>>>>>> for further analysis)
>>>>>>>>
>>>>>>>> Could this be added as an enhancement or similar Issue in GH/Jira?
>>>>>>>> Any other ideas?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>> Got feedback? go/pabloem-feedback
>>>>> <https://goto.google.com/pabloem-feedback>
>>>>>
>>>>
>>>>
>


Re: Chasing "Cannot output with timestamp" errors

2018-05-01 Thread Carlos Alonso
Yes, it is Scio code. That example was only 1ms, true, but there are other
examples where the difference is bigger, around 10 ms or even a bit more...

Thanks!

On Tue, 1 May 2018 at 19:38, Kenneth Knowles <k...@google.com> wrote:

> Hmm. Since they are different by 1ms I wonder if it is rounding /
> truncation combined with very slight skew between Pubsub & DF. Just a
> random guess. Your code does seem reasonable at first glance, from a Beam
> perspective (it is Scio code, yes?)
>
> Kenn
>
> On Tue, May 1, 2018 at 8:00 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> Ok, so after checking logs deeper I've found a line that seems to
>> identify the steps (Adding config for S2:
>> {"instructions":[{"name":"pubsubSubscriptionWithAttributes@{PubSubDataReader.scala:10}/PubsubUnboundedSource","originalName":"s13","outputs":..),
>> so that would mean that the exception is thrown from the reading from
>> PubSub step in which I actually run this code:
>>
>> sc.pubsubSubscriptionWithAttributes[String](s"projects/$projectId/subscriptions/$subscription")
>>   .withName("Set timestamps")
>>   .timestampBy(_ => new Instant)
>>   .withName("Apply windowing")
>>   .withFixedWindows(windowSize)
>>
>>
>> I'm setting the elements in the window when they're read because I'm
>> pairing them with the schemas read from BigQuery using a side transform
>> later on... Is it possible that the elements already have a (somehow
>> future) timestamp and this timestampBy transform is causing the issue?
>>
>> If that would be the case, the elements read from PubSub would need to
>> have a "later" timestamp than "now", as, by the exception message, my
>> transform, that is setting timestamps to "now" would actually be trying to
>> set them backwards... Does it make any sense? (I'll try to dive into the
>> read from PubSub transform to double check...)
>>
>> Thanks!
>>
>> On Tue, May 1, 2018 at 4:44 PM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> Hi everyone!!
>>>
>>> I have a job that reads heterogeneous messages from PubSub and,
>>> depending on their type, writes them to the appropriate BigQuery table and I
>>> keep getting random "java.lang.IllegalArgumentException: Cannot output with
>>> timestamp" errors that I cannot identify, and I can't even figure out which
>>> part of the code is actually throwing the Exception by looking at the
>>> stacktrace...
>>>
>>> You can find the full stacktrace here: https://pastebin.com/1gN4ED2A and
>>> a couple of job ids are this 2018-04-26_08_56_42-10071326408494590980 and
>>> this: 2018-04-27_09_19_13-15798240702327849959
>>>
>>> Trying to, at least, figure out the source transform of the error, the
>>> logs say the trace was at stage S2, but I don't know how to identify which
>>> parts of my pipeline form which stages...
>>>
>>> Thanks!!
>>>
>>


Re: Chasing "Cannot output with timestamp" errors

2018-05-01 Thread Carlos Alonso
Ok, so after checking logs deeper I've found a line that seems to identify
the steps (Adding config for S2:
{"instructions":[{"name":"pubsubSubscriptionWithAttributes@{PubSubDataReader.scala:10}/PubsubUnboundedSource","originalName":"s13","outputs":..),
so that would mean that the exception is thrown from the reading from
PubSub step in which I actually run this code:

sc.pubsubSubscriptionWithAttributes[String](s"projects/$projectId/subscriptions/$subscription")
  .withName("Set timestamps")
  .timestampBy(_ => new Instant)
  .withName("Apply windowing")
  .withFixedWindows(windowSize)


I'm setting the elements in the window when they're read because I'm
pairing them with the schemas read from BigQuery using a side transform
later on... Is it possible that the elements already have a (somehow
future) timestamp and this timestampBy transform is causing the issue?

If that would be the case, the elements read from PubSub would need to have
a "later" timestamp than "now", as, by the exception message, my transform,
that is setting timestamps to "now" would actually be trying to set them
backwards... Does it make any sense? (I'll try to dive into the read from
PubSub transform to double check...)

Thanks!

On Tue, May 1, 2018 at 4:44 PM Carlos Alonso <car...@mrcalonso.com> wrote:

> Hi everyone!!
>
> I have a job that reads heterogeneous messages from PubSub and, depending
> on their type, writes them to the appropriate BigQuery table and I keep
> getting random "java.lang.IllegalArgumentException: Cannot output with
> timestamp" errors that I cannot identify, and I can't even figure out which
> part of the code is actually throwing the Exception by looking at the
> stacktrace...
>
> You can find the full stacktrace here: https://pastebin.com/1gN4ED2A and
> a couple of job ids are this 2018-04-26_08_56_42-10071326408494590980 and
> this: 2018-04-27_09_19_13-15798240702327849959
>
> Trying to, at least, figure out the source transform of the error, the
> logs say the trace was at stage S2, but I don't know how to identify which
> parts of my pipeline form which stages...
>
> Thanks!!
>
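
For readers following the hypothesis above: the Beam Java SDK rejects an
output timestamp that is earlier than the input element's timestamp minus
the DoFn's allowed skew (zero by default), and that is the condition behind
the "Cannot output with timestamp" message. Below is a simplified Java model
of that rule. It is a sketch, not the SDK source, and the class and method
names are hypothetical.

```java
/** Simplified model (not Beam source) of the output-timestamp check. */
final class TimestampSkewSketch {

    /** True when the output timestamp is not too far behind the input one. */
    static boolean isAllowed(long inputTsMillis, long outputTsMillis, long allowedSkewMillis) {
        return outputTsMillis >= inputTsMillis - allowedSkewMillis;
    }

    /** Throws, like the real check does, when the rule is violated. */
    static void checkOutputTimestamp(long inputTsMillis, long outputTsMillis, long allowedSkewMillis) {
        if (!isAllowed(inputTsMillis, outputTsMillis, allowedSkewMillis)) {
            throw new IllegalArgumentException(
                "Cannot output with timestamp " + outputTsMillis
                    + ": earlier than input timestamp " + inputTsMillis
                    + " minus allowed skew " + allowedSkewMillis);
        }
    }

    public static void main(String[] args) {
        long pubsubTs = 1_525_190_400_001L;   // timestamp attached by the source
        long workerNow = 1_525_190_400_000L;  // "new Instant" on a worker, 1 ms behind
        System.out.println(isAllowed(pubsubTs, workerNow, 0L));  // prints false
        System.out.println(isAllowed(pubsubTs, workerNow, 10L)); // prints true
    }
}
```

Under this model a worker clock that is even 1 ms behind the timestamp
assigned by the source is enough to trigger the exception when the allowed
skew is zero, which would be consistent with the small differences reported
in this thread.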


Re: BiqQueryIO.write and Wait.on

2018-04-20 Thread Carlos Alonso
Hey Eugene!!

I’d gladly take a stab at it, although I’m not sure how much time I’ll be
able to put into it, but... yeah, let’s try it.

Where should I begin? Is there a Jira issue or shall I file one?

Thanks!
On Thu, 12 Apr 2018 at 00:41, Eugene Kirpichov <kirpic...@google.com> wrote:

> Hi,
>
> Yes, you're both right - BigQueryIO.write() is currently not implemented
> in a way that it can be used with Wait.on(). It would certainly be a
> welcome contribution to change this - many people expressed interest in
> specifically waiting for BigQuery writes. Is any of you interested in
> helping out?
>
> Thanks.
>
> On Fri, Apr 6, 2018 at 12:36 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hi Simon, I think your explanation was very accurate, at least to my
>> understanding. I'd also be interested in getting the batch load results
>> fed back into the pipeline... hopefully someone may suggest something,
>> otherwise we could propose submitting a Jira, or even better, a PR!! :)
>>
>> Thanks!
>>
>> On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
>> simon.kitch...@unbelievable-machine.com> wrote:
>>
>>> Hi All,
>>>
>>> I need to write some data to BigQuery (batch-mode) and then send a
>>> Pubsub message to trigger further processing.
>>>
>>> I found this thread titled "Callbacks/other functions run after a
>>> PDone/output transform" on the user-list which was very relevant:
>>>
>>> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>>>
>>> Thanks to the author of the Wait transform (Beam 2.4.0)!
>>>
>>> Unfortunately, it appears that the Wait.on transform does not work with
>>> BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
>>> appreciated.
>>>
>>> Here's (most of) the relevant test code:
>>> Pipeline p = Pipeline.create(options);
>>> PCollection<String> lines = p.apply("Read Input",
>>>     Create.of("line1", "line2", "line3", "line4"));
>>>
>>> TableFieldSchema f1 =
>>>     new TableFieldSchema().setName("value").setType("string");
>>> TableSchema s2 =
>>>     new TableSchema().setFields(Collections.singletonList(f1));
>>>
>>> WriteResult writeResult = lines.apply("Write and load data",
>>>     BigQueryIO.<String>write()
>>>         .to(options.getTableSpec())
>>>         .withFormatFunction(new SlowFormatter())
>>>         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>         //.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>>>         .withSchema(s2)
>>>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>> lines.apply(Wait.on(writeResult.getFailedInserts()))
>>>     .apply(ParDo.of(new OnCompletion()));
>>>
>>> where
>>> + format-function "SlowFormatter" prints out each line and has a small
>>> sleep for testing purposes, and
>>> + DoFn OnCompletion just prints out the contents of each line
>>>
>>> In production code, OnCompletion would be fed some collection derived
>>> from lines, eg min/max record id, and the operation would be "send pubsub
>>> message" rather than print..
>>>
>>> My expectation is that the "SlowFormatter" would run for each line, then
>>> the data would be uploaded, then OnCompletion would print each line. And
>>> indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS,
>>> OnCompletion runs before the upload takes place.
>>>
>>> I use WriteResult.getFailedInserts as that is the only "output" that
>>> BigQueryIO.write() generates AFAICT. I don't expect any failed records, but
>>> believe that it can be used as a "signal" for the Wait.on - ie the output
>>> is "complete for window" only after all data has been uploaded, which is
>>> what I need. And that does seem to work for STREAMING_INSERTS.
>>>
>>> I suspect the reason that this does not work for FILE_LOADS is that
>>> method BatchLoads.writeResult returns a WriteResult that wraps an "empty"
>>> failedInserts collection, ie data which is not connected to the
>>> batch-load-job that is triggered:
>>>   private WriteResult writeResult(Pipeline p) {
>>>     PCollection<TableRow> empty =
>>>         p.apply("CreateEmptyFailedInserts",
>>>             Create.empty(TypeDescriptor.of(TableRow.class)));
>>>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>>>   }
>>>
>>> Note that BatchLoads does "synchronously" invoke BigQuery load jobs;
>>> once a job is submitted the code repeatedly polls the job status until it
>>> reaches DONE or FAILED. However that information does not appear to be
>>> exposed anywhere (unlike streaming which effectively exposes
>>> completion-state via the failedInserts stream).
>>>
>>> If I have misunderstood something, corrections welcome! If not,
>>> suggestions for workarounds or alternate solutions are also welcome :-)
>>>
>>> Thanks,
>>> Simon
>>>
>>>


Re: BiqQueryIO.write and Wait.on

2018-04-06 Thread Carlos Alonso
Hi Simon, I think your explanation was very accurate, at least to my
understanding. I'd also be interested in getting the batch load results
fed back into the pipeline... hopefully someone may suggest something,
otherwise we could propose submitting a Jira, or even better, a PR!! :)

Thanks!

On Thu, Apr 5, 2018 at 2:01 PM Simon Kitching <
simon.kitch...@unbelievable-machine.com> wrote:

> Hi All,
>
> I need to write some data to BigQuery (batch-mode) and then send a Pubsub
> message to trigger further processing.
>
> I found this thread titled "Callbacks/other functions run after a
> PDone/output transform" on the user-list which was very relevant:
>
> https://lists.apache.org/thread.html/ddcdf93604396b1cbcacdff49aba60817dc90ee7c8434725ea0d26c0@%3Cuser.beam.apache.org%3E
>
> Thanks to the author of the Wait transform (Beam 2.4.0)!
>
> Unfortunately, it appears that the Wait.on transform does not work with
> BigQueryIO in FILE_LOADS mode - or at least I cannot get it to work. Advice
> appreciated.
>
> Here's (most of) the relevant test code:
> Pipeline p = Pipeline.create(options);
> PCollection<String> lines = p.apply("Read Input",
>     Create.of("line1", "line2", "line3", "line4"));
>
> TableFieldSchema f1 =
>     new TableFieldSchema().setName("value").setType("string");
> TableSchema s2 =
>     new TableSchema().setFields(Collections.singletonList(f1));
>
> WriteResult writeResult = lines.apply("Write and load data",
>     BigQueryIO.<String>write()
>         .to(options.getTableSpec())
>         .withFormatFunction(new SlowFormatter())
>         .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>         //.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
>         .withSchema(s2)
>         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>
> lines.apply(Wait.on(writeResult.getFailedInserts()))
>     .apply(ParDo.of(new OnCompletion()));
>
> where
> + format-function "SlowFormatter" prints out each line and has a small
> sleep for testing purposes, and
> + DoFn OnCompletion just prints out the contents of each line
>
> In production code, OnCompletion would be fed some collection derived from
> lines, eg min/max record id, and the operation would be "send pubsub
> message" rather than print..
>
> My expectation is that the "SlowFormatter" would run for each line, then
> the data would be uploaded, then OnCompletion would print each line. And
> indeed that happens when STREAMING_INSERTS is used. However for FILE_LOADS,
> OnCompletion runs before the upload takes place.
>
> I use WriteResult.getFailedInserts as that is the only "output" that
> BigQueryIO.write() generates AFAICT. I don't expect any failed records, but
> believe that it can be used as a "signal" for the Wait.on - ie the output
> is "complete for window" only after all data has been uploaded, which is
> what I need. And that does seem to work for STREAMING_INSERTS.
>
> I suspect the reason that this does not work for FILE_LOADS is that method
> BatchLoads.writeResult returns a WriteResult that wraps an "empty"
> failedInserts collection, ie data which is not connected to the
> batch-load-job that is triggered:
>   private WriteResult writeResult(Pipeline p) {
>     PCollection<TableRow> empty =
>         p.apply("CreateEmptyFailedInserts",
>             Create.empty(TypeDescriptor.of(TableRow.class)));
>     return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
>   }
>
> Note that BatchLoads does "synchronously" invoke BigQuery load jobs; once
> a job is submitted the code repeatedly polls the job status until it
> reaches DONE or FAILED. However that information does not appear to be
> exposed anywhere (unlike streaming which effectively exposes
> completion-state via the failedInserts stream).
>
> If I have misunderstood something, corrections welcome! If not,
> suggestions for workarounds or alternate solutions are also welcome :-)
>
> Thanks,
> Simon
>
>
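
Simon's diagnosis can be illustrated without Beam at all. The Java sketch
below is only an analogy built on CompletableFuture, not how Wait.on or
BatchLoads are implemented, and every name in it is hypothetical. It
contrasts a completion signal created independently of the load job (like
the empty failedInserts collection) with one derived from the job itself.

```java
import java.util.concurrent.CompletableFuture;

/** Analogy only: a "signal" is useful for waiting only if it depends on the work. */
final class CompletionSignalSketch {

    /** Signal built without looking at the job: it is complete right away. */
    static CompletableFuture<Void> disconnectedSignal(CompletableFuture<String> loadJob) {
        return CompletableFuture.completedFuture(null);
    }

    /** Signal derived from the job: it completes only when the job does. */
    static CompletableFuture<Void> derivedSignal(CompletableFuture<String> loadJob) {
        return loadJob.thenAccept(status -> { /* the job status is ignored here */ });
    }

    public static void main(String[] args) {
        CompletableFuture<String> loadJob = new CompletableFuture<>();
        CompletableFuture<Void> disconnected = disconnectedSignal(loadJob);
        CompletableFuture<Void> derived = derivedSignal(loadJob);

        System.out.println(disconnected.isDone()); // prints true: nothing to wait for
        System.out.println(derived.isDone());      // prints false: job still running

        loadJob.complete("DONE");
        System.out.println(derived.isDone());      // prints true
    }
}
```

Anything that waits on the disconnected signal proceeds immediately, which
is the behaviour observed with FILE_LOADS in this thread. A fix along the
lines discussed would have the write return something that plays the role
of the derived signal.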


Re: BigQuery streaming insert errors

2018-04-06 Thread Carlos Alonso
Hi Gaurav, many thanks for your response. I'm currently using retry
policies, but imagine the following scenario:

I'm trying to insert an existing field, even if we retry, it will still
fail but I'll never be able to detect that within the pipeline, as
getFailedInserts()
https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.html#getFailedInserts--
only contains the TableRows that failed, not the reason.

Adding the error as well won't be very hard as I understand it because
BigQueryServicesImpl.insertAll() actually knows about it:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L750

I think I would even volunteer to work on it if the community feels it
makes sense as well.

Regards

On Fri, Apr 6, 2018 at 1:28 AM Gaurav Thakur <gaurav2...@gmail.com> wrote:

> Hi Carlos,
>
> Would an insert retry policy help you?
> Please see this,
> https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/gcp/bigquery/InsertRetryPolicy.Context.html
>
> Thanks, Gaurav
>
> On Fri, Apr 6, 2018 at 8:13 AM, Pablo Estrada <pabl...@google.com> wrote:
>
>> I'm adding Cham as he might be knowledgeable about BQ IO, or he might be
>> able to redirect to someone else.
>> Cham, do you have guidance for Carlos here?
>> Thanks
>> -P.
>>
>>
>> On Mon, Apr 2, 2018 at 11:08 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> And... where could I catch that exception?
>>>
>>> Thanks!
>>> On Mon, 2 Apr 2018 at 16:58, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Wouldn't the following code give you information about failed
>>>> insertions (around line 790 in BigQueryServicesImpl) ?
>>>>
>>>>   if (!allErrors.isEmpty()) {
>>>>     throw new IOException("Insert failed: " + allErrors);
>>>>   }
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Apr 2, 2018 at 7:16 AM, Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> Hi everyone!!
>>>>>
>>>>> I was wondering if there's any way to get the error why an insert
>>>>> (streaming) failed. Looking at the code I think there's currently no way
>>>>> to do that, as the BigQueryServicesImpl insertAll seems to discard the
>>>>> errors and just add the failed TableRow instances into the failedInserts
>>>>> list.
>>>>>
>>>>> It would be very nice to have an "enriched" TableRow returned instead
>>>>> that contains the error information for further processing (in our use
>>>>> case we're saving the failed ones into a different table for further
>>>>> analysis)
>>>>>
>>>>> Could this be added as an enhancement or similar Issue in GH/Jira? Any
>>>>> other ideas?
>>>>>
>>>>> Thanks!
>>>>>
>>>>
>>>> --
>> Got feedback? go/pabloem-feedback
>>
>
>


Re: BigQuery streaming insert errors

2018-04-02 Thread Carlos Alonso
And... where could I catch that exception?

Thanks!
On Mon, 2 Apr 2018 at 16:58, Ted Yu <yuzhih...@gmail.com> wrote:

> Wouldn't the following code give you information about failed insertions
> (around line 790 in BigQueryServicesImpl) ?
>
>   if (!allErrors.isEmpty()) {
>     throw new IOException("Insert failed: " + allErrors);
>   }
>
> Cheers
>
> On Mon, Apr 2, 2018 at 7:16 AM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hi everyone!!
>>
>> I was wondering if there's any way to get the error why an insert
>> (streaming) failed. Looking at the code I think there's currently no way to
>> do that, as the BigQueryServicesImpl insertAll seems to discard the errors
>> and just add the failed TableRow instances into the failedInserts list.
>>
>> It would be very nice to have an "enriched" TableRow returned instead
>> that contains the error information for further processing (in our use case
>> we're saving the failed ones into a different table for further analysis)
>>
>> Could this be added as an enhancement or similar Issue in GH/Jira? Any
>> other ideas?
>>
>> Thanks!
>>
>
>


BigQuery streaming insert errors

2018-04-02 Thread Carlos Alonso
Hi everyone!!

I was wondering if there's any way to get the error why an insert
(streaming) failed. Looking at the code I think there's currently no way to
do that, as the BigQueryServicesImpl insertAll seems to discard the errors
and just add the failed TableRow instances into the failedInserts list.

It would be very nice to have an "enriched" TableRow returned instead that
contains the error information for further processing (in our use case
we're saving the failed ones into a different table for further analysis)

Could this be added as an enhancement or similar Issue in GH/Jira? Any
other ideas?

Thanks!
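
To show what an "enriched" failed row could look like, here is a small
plain-Java sketch. Everything in it is an assumption made for illustration:
rows are modelled as Map<String, Object> rather than TableRow, the errors
are assumed to arrive keyed by the index of the row in the request, and
FailedRow and collectFailures are hypothetical names, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of pairing each rejected row with the reason it was rejected. */
final class FailedInsertSketch {

    /** Hypothetical "enriched" row: the original row plus the error message. */
    static final class FailedRow {
        final Map<String, Object> row;
        final String reason;

        FailedRow(Map<String, Object> row, String reason) {
            this.row = row;
            this.reason = reason;
        }
    }

    /**
     * @param rows          rows sent in one insert request, in request order
     * @param errorsByIndex error message per failing row, keyed by request index
     * @return one FailedRow per failing row; successful rows are skipped
     */
    static List<FailedRow> collectFailures(
            List<Map<String, Object>> rows, Map<Integer, String> errorsByIndex) {
        List<FailedRow> failures = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            String reason = errorsByIndex.get(i);
            if (reason != null) {
                failures.add(new FailedRow(rows.get(i), reason));
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(Map.of("id", 1));
        rows.add(Map.of("id", 2, "unknown_field", "x"));
        Map<Integer, String> errors = Map.of(1, "no such field: unknown_field");

        for (FailedRow f : collectFailures(rows, errors)) {
            System.out.println(f.row.get("id") + " failed: " + f.reason);
            // prints 2 failed: no such field: unknown_field
        }
    }
}
```

With a structure like this, the failed rows could be written to a separate
table together with the reason, which is the use case described above.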


BigQuery FILE_LOADS recover from errors

2018-03-27 Thread Carlos Alonso
Hi all!!

On my pipeline I want to dump some data into BQ using the FILE_LOADS write
method, and I can't see how I would recover from errors (i.e. detect within
the pipeline which records couldn't be inserted and store them somewhere
else for further inspection), as the WriteTables transform throws an
Exception after BatchLoads.MAX_RETRY_JOBS retries...

How could I approach that?

Thanks!


Best way to repeatedly update a side input

2018-03-26 Thread Carlos Alonso
Hi all!!

I want to create a job that reads data from pubsub and then joins it with
another collection (side input) read from a BigQuery table (actually the
schemas). The thing I'm not 100% sure about is what would be the best way to
update that side input (as those schemas may change at any given point in
time). I thought of using GenerateSequence + FixedTimeWindows but I was
wondering if that would be the best approach as it sounded "hacky" to me.

The idea would be something like this:

val schemas = inputPipeline.context.customInput("Schemas",
  GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1)))
  //.withFixedWindows(Duration.standardMinutes(1))
  .withFixedWindows(Duration.standardMinutes(1))
  .flatMap { _ =>
    log.info("Retrieving schemas...")
    Seq("Table1", "Table2").map(n => (n, getBQSchema(table, n)))
  }.asMapSideInput

In case this makes sense I have another question: how would that behave
with many workers? Will each of the workers actually retrieve the schema, or
will it be run by one and then "broadcast"?

Thanks!


Re: Table with field based partitioning must have a schema

2018-03-25 Thread Carlos Alonso
Shall I file a GH issue for that? Although I guess that’s something on the
BQ side rather than in Beam, right?

Thanks!!
On Sat, 24 Mar 2018 at 22:50, Ted Yu <yuzhih...@gmail.com> wrote:

> It seems an improvement can be made where if CREATE_NEVER is present, table
> with field based partitioning doesn't have to be associated with a schema.
>
> Cheers
>
> On Sat, Mar 24, 2018 at 2:30 PM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Otherwise the BQ load job fails with the above error as well (Table with
>> field based partitioning must have a schema).
>> On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Hmm, glad it worked, but - if your create disposition was CREATE_NEVER,
>>> then why implement getSchema at all?
>>>
>>>
>>> On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> The thing is that the previous log "Returning schema for ..." never
>>>> appears, so I don't think anything will appear on the log if I log what you
>>>> suggest too.
>>>>
>>>> Actually, after a couple more attempts, I changed the createDisposition
>>>> of the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
>>>> worked... So I guess there's something wrong when CREATE_NEVER is set or
>>>> something I don't understand...
>>>>
>>>> FYI my BigQueryIO looks like this
>>>>
>>>> BigQueryIO.write()
>>>>   .to(new JsonRouter(dataset))
>>>>   .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>>>>   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>   .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>>>>   .withMethod(Write.Method.FILE_LOADS)
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> Can you try logging the result of your BigQueryUtil.parseSchema and
>>>>> confirm that it is always non-empty? What does the result look like for
>>>>> the table that's failing to load?
>>>>>
>>>>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> When trying to insert into BigQuery using dynamic destinations I get
>>>>>> this error: "Table with field based partitioning must have a schema",
>>>>>> which suggests that I'm not providing such a schema, and I don't
>>>>>> understand why, as I think I am. Here: https://pastebin.com/Q1jF024B
>>>>>> you can find the full stack trace and below you can see the code of the
>>>>>> DynamicDestinations implementation. Basically I'm dumping a PubSub
>>>>>> stream of heterogeneous Json documents into BQ, routing each type to
>>>>>> its corresponding table.
>>>>>>
>>>>>> The tuples contain the Json document and the schema itself for the
>>>>>> corresponding table (the tuple is composed in a previous transform
>>>>>> from a side input, as the schema is read from BQ using the
>>>>>> BigQueryClient class), and the Destination KV[String, String] is
>>>>>> supposed to hold the table name as key and the schema as value.
>>>>>>
>>>>>> The logs show many entries for "Returning destination for...", a few
>>>>>> "Returning table..." ones and no "Returning schema for..." at all,
>>>>>> which may indicate why BQ complains that no schema is provided. The
>>>>>> question would then be... Why is that method never invoked?
>>>>>>
>>>>>> class JsonRouter(dataset: String)
>>>>>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>>>>
>>>>>>   import JsonRouter._
>>>>>>
>>>>>>   override def getDestination(
>>>>>>       element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>>>>   }
>>>>>>
>>>>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>>>>     BigQueryUtil.parseSchema(element.getValue)
>>>>>>   }
>>>>>>
>>>>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>>>>     log.debug(s"Returning table for ${element.getKey}")
>>>>>>     new TableDestination(s"$dataset.${element.getKey}",
>>>>>>       s"Table to store ${element.getKey}", BQTypesRouter.TimePartitioning)
>>>>>>   }
>>>>>>
>>>>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>
>


Re: Table with field based partitioning must have a schema

2018-03-24 Thread Carlos Alonso
Otherwise the BQ load job fails with the above error as well (Table with
field based partitioning must have a schema).
On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov <kirpic...@google.com> wrote:

> Hmm, glad it worked, but - if your create disposition was CREATE_NEVER,
> then why implement getSchema at all?
>
> On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> The thing is that the previous log "Returning schema for ..." never
>> appears, so I don't think anything will appear on the log if I log what you
>> suggest too.
>>
>> Actually, after a couple more attempts, I changed the createDisposition of
>> the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
>> worked... So I guess there's something wrong when CREATE_NEVER is set or
>> something I don't understand...
>>
>> FYI my BigQueryIO looks like this
>>
>> BigQueryIO.write()
>>   .to(new JsonRouter(dataset))
>>   .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>>   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>>   .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>>   .withMethod(Write.Method.FILE_LOADS)
>>
>>
>> Thanks!
>>
>> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Can you try logging the result of your BigQueryUtil.parseSchema and
>>> confirm that it is always non-empty? What does the result look like for the
>>> table that's failing to load?
>>>
>>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi everyone!!
>>>>
>>>> When trying to insert into BigQuery using dynamic destinations I get
>>>> this error: "Table with field based partitioning must have a schema",
>>>> which suggests that I'm not providing such a schema, and I don't
>>>> understand why, as I think I am. Here: https://pastebin.com/Q1jF024B
>>>> you can find the full stack trace and below you can see the code of the
>>>> DynamicDestinations implementation. Basically I'm dumping a PubSub
>>>> stream of heterogeneous Json documents into BQ, routing each type to
>>>> its corresponding table.
>>>>
>>>> The tuples contain the Json document and the schema itself for the
>>>> corresponding table (the tuple is composed in a previous transform
>>>> from a side input, as the schema is read from BQ using the
>>>> BigQueryClient class), and the Destination KV[String, String] is
>>>> supposed to hold the table name as key and the schema as value.
>>>>
>>>> The logs show many entries for "Returning destination for...", a few
>>>> "Returning table..." ones and no "Returning schema for..." at all,
>>>> which may indicate why BQ complains that no schema is provided. The
>>>> question would then be... Why is that method never invoked?
>>>>
>>>> class JsonRouter(dataset: String)
>>>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>>
>>>>   import JsonRouter._
>>>>
>>>>   override def getDestination(
>>>>       element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>>   }
>>>>
>>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>>     BigQueryUtil.parseSchema(element.getValue)
>>>>   }
>>>>
>>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>>     log.debug(s"Returning table for ${element.getKey}")
>>>>     new TableDestination(s"$dataset.${element.getKey}",
>>>>       s"Table to store ${element.getKey}", BQTypesRouter.TimePartitioning)
>>>>   }
>>>>
>>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>>> }
>>>>
>>>>
>>>> Thanks!!
>>>>
>>>


Re: Table with field based partitioning must have a schema

2018-03-24 Thread Carlos Alonso
The thing is that the previous log "Returning schema for ..." never
appears, so I don't think anything will appear on the log if I log what you
suggest too.

Actually, after a couple more attempts, I changed the createDisposition of
the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
worked... So I guess there's something wrong when CREATE_NEVER is set or
something I don't understand...

FYI my BigQueryIO looks like this

BigQueryIO.write()
  .to(new JsonRouter(dataset))
  .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
  .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
  .withMethod(Write.Method.FILE_LOADS)


Thanks!

On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Can you try logging the result of your BigQueryUtil.parseSchema and
> confirm that it is always non-empty? What does the result look like for the
> table that's failing to load?
>
> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hi everyone!!
>>
>> When trying to insert into BigQuery using dynamic destinations I get this
>> error: "Table with field based partitioning must have a schema" that
>> suggests that I'm not providing such a schema and I don't understand why as
>> I think I am. Here: https://pastebin.com/Q1jF024B you can find the full
>> stack trace and below you can see the code of the DynamicDestinations
>> implementation. Basically I'm dumping a stream of PubSub into BQ being that
>> stream of heterogeneous Json documents and routing each type to its
>> corresponding table.
>>
>> The tuples contain the Json document and the schema itself for the
>> corresponding table (the tuple is composed in a previous transform before
>> from a side input as the schema is read from BQ using BigQueryClient
>> class). and the Destination KV[String, String] is supposed to hold the
>> table name as key and the schema as value.
>>
>> The logs show many entries for "Returning destination for...", a few of
>> "Returning table..." ones and no "Returning schema for..." at all which may
>> indicate why BQ complains that no schema is provided, the question would
>> then be... Why is that method never invoked?
>>
>> class JsonRouter(dataset: String)
>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>
>>   import JsonRouter._
>>
>>   override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>     log.debug(s"Returning destination for ${element.getValue}")
>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>   }
>>
>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>     log.debug(s"Returning schema for ${element.getKey}")
>>     BigQueryUtil.parseSchema(element.getValue)
>>   }
>>
>>   override def getTable(element: KV[String, String]): TableDestination = {
>>     log.debug(s"Returning table for ${element.getKey}")
>>     new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
>>       BQTypesRouter.TimePartitioning)
>>   }
>>
>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>> }
>>
>>
>> Thanks!!
>>
>


Table with field based partitioning must have a schema

2018-03-23 Thread Carlos Alonso
Hi everyone!!

When trying to insert into BigQuery using dynamic destinations I get this
error: "Table with field based partitioning must have a schema". It
suggests that I'm not providing a schema, and I don't understand why, as
I think I am. You can find the full stack trace here:
https://pastebin.com/Q1jF024B and the code of the DynamicDestinations
implementation below. Basically I'm dumping a PubSub stream of
heterogeneous Json documents into BQ, routing each type to its
corresponding table.

The tuples contain the Json document and the schema for the
corresponding table (the tuple is composed in a previous transform from
a side input, as the schema is read from BQ using the BigQueryClient
class), and the destination KV[String, String] is supposed to hold the
table name as key and the schema as value.

The logs show many entries for "Returning destination for...", a few
"Returning table..." ones and no "Returning schema for..." at all, which may
explain why BQ complains that no schema is provided. The question would
then be: why is that method never invoked?

class JsonRouter(dataset: String)
  extends DynamicDestinations[(Json, String), KV[String, String]] {

  import JsonRouter._

  override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
    log.debug(s"Returning destination for ${element.getValue}")
    KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
  }

  override def getSchema(element: KV[String, String]): TableSchema = {
    log.debug(s"Returning schema for ${element.getKey}")
    BigQueryUtil.parseSchema(element.getValue)
  }

  override def getTable(element: KV[String, String]): TableDestination = {
    log.debug(s"Returning table for ${element.getKey}")
    new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
      BQTypesRouter.TimePartitioning)
  }

  override def getDestinationCoder: Coder[KV[String, String]] =
    KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
}


Thanks!!


Re: BigQueryIO streaming inserts - poor performance with multiple tables

2018-03-06 Thread Carlos Alonso
Could you please keep posting your findings here?

I'm very interested in this issue as well.

Thanks!

On Thu, Mar 1, 2018 at 9:45 AM Josh wrote:

> Hi Cham,
>
> Thanks, I have emailed the dataflow-feedback email address with the
> details.
>
> Best regards,
> Josh
>
> On Thu, Mar 1, 2018 at 12:26 AM, Chamikara Jayalath wrote:
>
>> Could be a DataflowRunner specific issue. Would you mind reporting this
>> with corresponding Dataflow job IDs to either Dataflow stackoverflow
>> channel [1] or dataflow-feedb...@google.com ?
>>
>> I suspect Dataflow splits writing to multiple tables across multiple
>> workers, which may be keeping all workers busy, but we have to look at the
>> job to confirm.
>>
>> Thanks,
>> Cham
>>
>> [1] https://stackoverflow.com/questions/tagged/google-cloud-dataflow
>>
>> On Tue, Feb 27, 2018 at 11:56 PM Josh wrote:
>>
>>> Hi all,
>>>
>>> We are using BigQueryIO.write() to stream data into BigQuery, and are
>>> seeing very poor performance in terms of number of writes per second per
>>> worker.
>>>
>>> We are currently using *32* x *n1-standard-4* workers to stream ~15,000
>>> writes/sec to BigQuery. Each worker has ~90% CPU utilisation. Strangely the
>>> number of workers and worker CPU utilisation remains constant at ~90% even
>>> when the rate of input fluctuates down to below 10,000 writes/sec. The job
>>> always keeps up with the stream (no backlog).
>>>
>>> I've seen BigQueryIO benchmarks which show ~20k writes/sec being
>>> achieved with a single node, when streaming data into a *single* BQ
>>> table... So my theory is that writing to multiple tables is somehow causing
>>> the performance issue. Our writes are spread (unevenly) across 200+ tables.
>>> The job itself does very little processing, and looking at the Dataflow
>>> metrics pretty much all of the wall time is spent in the
>>> *StreamingWrite* step of BigQueryIO. The Beam version is 2.2.0.
>>>
>>> Our code looks like this:
>>>
>>> stream.apply(BigQueryIO.write()
>>> .to(new ToDestination())
>>> .withFormatFunction(new FormatForBigQuery())
>>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
>>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
>>>
>>> where ToDestination is a:
>>>
>>> SerializableFunction
>>>
>>> which returns a:
>>>
>>> new TableDestination(tableName, "")
>>>
>>> where tableName looks like "myproject:dataset.tablename$20180228"
>>>
>>> Has anyone else seen this kind of poor performance when streaming writes
>>> to multiple BQ tables? Is there anything here that sounds wrong, or any
>>> optimisations we can make?
>>>
>>> Thanks for any advice!
>>>
>>> Josh
>>>
>>
>
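
For reference, the table names Josh describes ("myproject:dataset.tablename$20180228")
combine a table spec with a day-partition suffix. A minimal sketch of building
such a name follows; the class and method names are hypothetical and not part
of BigQueryIO, and only the string format is taken from the message above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds "project:dataset.table$yyyyMMdd" as in the thread.
final class PartitionedTableNameSketch {

    static String tableSpec(String project, String dataset, String table, LocalDate day) {
        // BASIC_ISO_DATE formats a LocalDate as yyyyMMdd, e.g. 20180228.
        return project + ":" + dataset + "." + table + "$"
                + day.format(DateTimeFormatter.BASIC_ISO_DATE);
    }

    public static void main(String[] args) {
        System.out.println(
                tableSpec("myproject", "dataset", "tablename", LocalDate.of(2018, 2, 28)));
    }
}
```

The resulting string is what the ToDestination function in the thread would
pass to new TableDestination(tableName, "").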


Re: Partitioning a stream randomly and writing to files with TextIO

2018-02-23 Thread Carlos Alonso
Hi Lukasz, could you please elaborate a bit more on the 2nd part?
What's important to know, from the developer's perspective, about Dataflow's
memory management? How big can partitions grow? And what are the
performance considerations? It sounds as if the workers will "swap"
to disk if partitions are very big, right?

Thanks!

On Fri, Feb 23, 2018 at 2:27 AM Lukasz Cwik wrote:

> 1) Creating a PartitionFn is the right way to go. I would suggest using
> something that gives you stable output, so you can replay your pipeline;
> this would be useful for tests as well. Something like taking the object's
> hashcode and dividing the hash space into 80%/10%/10% segments could work;
> just make sure that, if you go with hashcode, the hashcode function
> distributes elements well.
>
> 2) This is runner dependent but most runners don't require storing
> everything in memory. For example if you were using Dataflow, you would
> only need to store a couple of elements in memory not the entire
> PCollection.
>
> On Thu, Feb 22, 2018 at 11:38 AM, Josh wrote:
>
>> Hi all,
>>
>> I want to read a large dataset using BigQueryIO, and then randomly
>> partition the rows into three chunks, where one partition has 80% of the
>> data and there are two other partitions with 10% and 10%. I then want to
>> write the three partitions to three files in GCS.
>>
>> I have a couple of quick questions:
>> (1) What would be the best way to do this random partitioning with Beam?
>> I think I can just use a PartitionFn which uses Math.random to determine
>> which of the three partitions an element should go to, but not sure if
>> there is a better approach.
>>
>> (2) I would then take the resulting PCollectionList and use TextIO to
>> write each partition to a GCS file. For this, would I need all data for the
>> largest partition to fit into the memory of a single worker?
>>
>> Thanks for any advice,
>>
>> Josh
>>
>
>
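
The stable, hash-based 80%/10%/10% split suggested above could look roughly
like the sketch below. It is only an illustration under assumptions: the class
name is hypothetical, rows are assumed to have some stable string key, and
CRC32 is swapped in as the hash because String.hashCode may not spread keys
evenly.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch of a deterministic 80/10/10 split keyed on a row id.
final class StableSplitSketch {

    // Returns 0 for the 80% partition, 1 and 2 for the two 10% partitions.
    static int partitionFor(String rowKey) {
        CRC32 crc = new CRC32();
        crc.update(rowKey.getBytes(StandardCharsets.UTF_8));
        long bucket = crc.getValue() % 100; // CRC32 values are non-negative, so 0..99
        if (bucket < 80) {
            return 0;
        }
        return bucket < 90 ? 1 : 2;
    }

    public static void main(String[] args) {
        int[] counts = new int[3];
        for (int i = 0; i < 100_000; i++) {
            counts[partitionFor("row-" + i)]++;
        }
        System.out.printf("p0=%d p1=%d p2=%d%n", counts[0], counts[1], counts[2]);
    }
}
```

In a Beam pipeline the body of partitionFor would go inside a
Partition.PartitionFn. Unlike Math.random, the same row always lands in the
same partition, so a replayed pipeline produces the same three files.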


Re: Unable to decode tag list after update

2018-02-14 Thread Carlos Alonso
Also, I think this situation leads to a blocked state in which elements
with two different version UIDs are mixed and the job is therefore unable
to progress.

When I realised this was happening, I tried to roll back to the previous
version of the pipeline to drain it and then re-create the job with the new
version, but in the meantime, as new elements were streamed in, the old
version wasn't able to decode the new ones.

Does it make any sense?

Thanks!

On Wed, Feb 14, 2018 at 12:50 PM Carlos Alonso <car...@mrcalonso.com> wrote:

> Another thing I've realised is that the stacktrace suggests it is using
> SerializableCoder instead of the custom MessageWithAttributesCoder I have
> implemented. See it here: https://pastebin.com/86gWddi9
>
> Is it possible that, upon updates, custom coders are not chosen? To give a
> little more context: the MessageWithAttributes object is contained within a
> KV and kept in a stateful/timely processing step. The coder is set before
> that stateful step via .setCoder(KvCoder.of(StringUtf8Coder.of(),
> MessageWithAttributesCoder.of())) and the exception is thrown when the
> buffered items are flushed out.
>
> Thanks!
>
> On Wed, Feb 14, 2018 at 11:33 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> I've added a couple of methods to a case class and updated the job on
>> Dataflow and started getting
>>
>> java.lang.IllegalStateException: Unable to decode tag list using
>> org.apache.beam.sdk.coders.SerializableCoder@4ad81832
>>
>> Caused by java.io.InvalidClassException:
>> my.package.MessageWithAttributes; local class incompatible: stream
>> classdesc serialVersionUID = -5160195612720162441, local class
>> serialVersionUID = -9104690746829156208
>>
>> It seems that, although the data contained in the class has not changed,
>> the class itself has changed, so it cannot be deserialised anymore. How
>> should I proceed to avoid this situation?
>>
>> Thanks!
>>
>


Re: Unable to decode tag list after update

2018-02-14 Thread Carlos Alonso
Another thing I've realised is that the stacktrace suggests it is using
SerializableCoder instead of the custom MessageWithAttributesCoder I have
implemented. See it here: https://pastebin.com/86gWddi9

Is it possible that, upon updates, custom coders are not chosen? To give a
little more context: the MessageWithAttributes object is contained within a
KV and kept in a stateful/timely processing step. The coder is set before
that stateful step via .setCoder(KvCoder.of(StringUtf8Coder.of(),
MessageWithAttributesCoder.of())) and the exception is thrown when the
buffered items are flushed out.

Thanks!

On Wed, Feb 14, 2018 at 11:33 AM Carlos Alonso <car...@mrcalonso.com> wrote:

> I've added a couple of methods to a case class and updated the job on
> Dataflow and started getting
>
> java.lang.IllegalStateException: Unable to decode tag list using
> org.apache.beam.sdk.coders.SerializableCoder@4ad81832
>
> Caused by java.io.InvalidClassException: my.package.MessageWithAttributes;
> local class incompatible: stream classdesc serialVersionUID =
> -5160195612720162441, local class serialVersionUID = -9104690746829156208
>
> It seems that, although the data contained in the class has not changed,
> the class itself has changed, so it cannot be deserialised anymore. How
> should I proceed to avoid this situation?
>
> Thanks!
>


Unable to decode tag list after update

2018-02-14 Thread Carlos Alonso
I've added a couple of methods to a case class and updated the job on
Dataflow and started getting

java.lang.IllegalStateException: Unable to decode tag list using
org.apache.beam.sdk.coders.SerializableCoder@4ad81832

Caused by java.io.InvalidClassException: my.package.MessageWithAttributes;
local class incompatible: stream classdesc serialVersionUID =
-5160195612720162441, local class serialVersionUID = -9104690746829156208

It seems that, although the data contained in the class has not changed,
the class itself has changed, so it cannot be deserialised anymore. How
should I proceed to avoid this situation?

Thanks!
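
For context on the InvalidClassException above: when a Serializable class does
not declare a serialVersionUID, Java derives one from the class structure, so
adding methods can change it. The sketch below shows the usual way to pin it.
The Message class is a hypothetical stand-in for the case class in this thread
(in Scala the equivalent would be the @SerialVersionUID annotation), and this
only illustrates plain Java serialization, not anything specific to Dataflow
updates.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerialVersionSketch {

    // Hypothetical stand-in for MessageWithAttributes.
    static final class Message implements Serializable {
        // Declared explicitly, so adding or removing methods leaves it unchanged.
        private static final long serialVersionUID = 1L;

        final String payload;

        Message(String payload) {
            this.payload = payload;
        }

        // A method added later; it does not affect the declared UID.
        int length() {
            return payload.length();
        }
    }

    // Reads the UID that Java serialization will write into the stream.
    static long uidOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(uidOf(Message.class)); // prints 1
    }
}
```

Note that pinning the UID would only help for future changes; data already
encoded under the old, auto-generated UID still carries that old value.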


Re: How does TextIO decides when to finalise a file?

2018-02-13 Thread Carlos Alonso
Cool thanks!

How does it work internally? Are all the elements routed to the same path
grouped and processed within the same bundle?

Thanks!

On Tue, Feb 13, 2018 at 9:03 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> It will do its best to throw an exception if duplicate names are produced
> within one pane. Otherwise, it will overwrite.
>
> On Tue, Feb 13, 2018 at 11:58 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Cool, thanks.
>>
>> What if the destination is not properly coded and the File naming policy
>> then produces a duplicated path? Will it throw an exception? Overwrite?
>>
>> Thanks!
>>
>> On Tue, Feb 13, 2018 at 6:23 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Dynamic file writes generate 1 set of files (shards) for every pane
>>> firing of every window of every destination. File naming policy is required
>>> to produce different names for every combination of (destination, shard
>>> index, window, pane) so you never have to append or overwrite. A new
>>> element arriving for a destination after something for that destination has
>>> already been written will simply be in the next pane, or in a different
>>> window.
>>>
>>> On Tue, Feb 13, 2018, 6:33 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi everyone!!
>>>>
>>>> I'm wondering how a TextIO with dynamic routing knows/decides when to
>>>> finalise a file and what happens if after it is finalised, another element
>>>> routed for the same file appears.
>>>>
>>>> Thanks!
>>>>
>>>
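
To illustrate the requirement Eugene describes (a distinct name for every
combination of destination, shard index, window and pane), here is a minimal
sketch of such a naming function. It is not Beam's FilenamePolicy API; the
class name, the parameters and the name layout are assumptions made only for
this example.

```java
import java.util.Locale;

// Hypothetical naming function: every component of
// (destination, window, pane, shard) appears in the file name.
final class ShardNameSketch {

    static String fileName(String destination, String windowLabel, long paneIndex,
                           int shardIndex, int numShards) {
        return String.format(Locale.ROOT, "%s/%s-pane-%d-%05d-of-%05d.txt",
                destination, windowLabel, paneIndex, shardIndex, numShards);
    }

    public static void main(String[] args) {
        // Same destination, window and shard, but a later pane: a different file.
        System.out.println(fileName("typeA", "2018-02-13T06", 0, 1, 3));
        System.out.println(fileName("typeA", "2018-02-13T06", 1, 1, 3));
    }
}
```

Because the pane index is part of the name, a late element for an already
written destination ends up in a new file instead of overwriting an old one.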


Re: How does TextIO decides when to finalise a file?

2018-02-13 Thread Carlos Alonso
Cool, thanks.

What if the destination is not properly coded and the File naming policy
then produces a duplicated path? Will it throw an exception? Overwrite?

Thanks!

On Tue, Feb 13, 2018 at 6:23 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Dynamic file writes generate 1 set of files (shards) for every pane firing
> of every window of every destination. File naming policy is required to
> produce different names for every combination of (destination, shard index,
> window, pane) so you never have to append or overwrite. A new element
> arriving for a destination after something for that destination has already
> been written will simply be in the next pane, or in a different window.
>
> On Tue, Feb 13, 2018, 6:33 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> Hi everyone!!
>>
>> I'm wondering how a TextIO with dynamic routing knows/decides when to
>> finalise a file and what happens if after it is finalised, another element
>> routed for the same file appears.
>>
>> Thanks!
>>
>


Re: ParDo requires its input to use KvCoder in order to use state and timers

2018-02-13 Thread Carlos Alonso
I have a question about this. My scenario is:

* PubSub input with a timestampAttribute named doc_timestamp
* Fixed windowing of one hour size.
* Keys are an internal attribute of the messages (the type)
* Messages of one particular type are way more frequent than the others, so
it is likely a hot key

Will it help if I add a string representation of the doc_timestamp (to the
hour) to the key, in order to increase the range of keys and therefore make
it more parallelisable?

I wonder if it will help or not, as the final result will be the same (type
+ window), but not sure if it would help before the point where the
windowing is applied.

Thanks!
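
To make the question concrete, the composite key I have in mind would be built
roughly like this. It is only a sketch: the class name, the "#" separator and
the hour format are my own choices for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: message type plus doc_timestamp truncated to the hour.
final class HourKeySketch {

    private static final DateTimeFormatter HOUR =
            DateTimeFormatter.ofPattern("yyyyMMddHH").withZone(ZoneOffset.UTC);

    static String keyFor(String type, Instant docTimestamp) {
        return type + "#" + HOUR.format(docTimestamp);
    }

    public static void main(String[] args) {
        System.out.println(keyFor("typeA", Instant.parse("2018-02-13T09:45:00Z")));
    }
}
```

With one-hour fixed windows, all elements of one type in a given window still
share a single key, which is exactly the doubt raised above.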


On Mon, Feb 12, 2018 at 6:41 PM Carlos Alonso <car...@mrcalonso.com> wrote:

> Ok, that makes a lot of sense. Thanks Kenneth!
>
> On Mon, Feb 12, 2018 at 5:41 PM Kenneth Knowles <k...@google.com> wrote:
>
>> Hi Carlos,
>>
>> You are quite correct that choosing the keys is important for work to be
>> evenly distributed. The reason you need to have a KvCoder is that state is
>> partitioned per key (to give natural & automatic parallelism) and window
>> (to allow reclaiming expired state so you can process unbounded data with
>> bounded storage, and also more parallelism). To a Beam runner, most data in
>> the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
>> case where a runner knows the binary layout of encoded data so it can pull
>> out the keys in order to shuffle data of the same key to the same place, so
>> that is why it has to be a KvCoder.
>>
>> Kenn
>>
>> On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> I was refactoring my solution a bit and tried to make my stateful
>>> transform work on simple case classes and I got this exception:
>>> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
>>> behind this as I think carefully choosing the keys would be very important
>>> in order for the work to be properly distributed.
>>>
>>> Thanks!
>>>
>>
>>


How does TextIO decides when to finalise a file?

2018-02-13 Thread Carlos Alonso
Hi everyone!!

I'm wondering how a TextIO with dynamic routing knows/decides when to
finalise a file and what happens if after it is finalised, another element
routed for the same file appears.

Thanks!


Re: Triggers based on size

2018-02-12 Thread Carlos Alonso
I've finally managed to understand, write and run my job using stateful and
timely processing. You can see the code here, should someone need inspiration:
https://gist.github.com/calonso/14b06f4f58faf286cc79181cb46a9b85

Thanks a lot for your help, for encouraging me to go that way, for such a
great product and for the amazing community you're building around it.

On Wed, Jan 10, 2018 at 6:11 PM Robert Bradshaw <rober...@google.com> wrote:

> Sounds like you have enough to get started. Feel free to come back
> here with more specifics if you can't get it working.
>
> On Wed, Jan 10, 2018 at 9:09 AM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
> > Thanks Robert!!
> >
> > After reading this and the former post about stateful processing,
> > Kenneth's suggestions sound sensible. I'll probably give them a try!!
> > Is there anything you would like to advise me on before starting?
> >
> > Thanks!
> >
> > On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw <rober...@google.com>
> > wrote:
> >>
> >> Unfortunately, the metadata driven trigger is still just an idea, not
> >> yet implemented.
> >>
> >> A good introduction to state and timers can be found at
> >> https://beam.apache.org/blog/2017/08/28/timely-processing.html
> >>
> >> On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso <car...@mrcalonso.com>
> >> wrote:
> >> > Hi Robert, Kenneth.
> >> >
> >> > Thanks a lot to both of you for your responses!!
> >> >
> >> > Kenneth, unfortunately I'm not sure we're experienced enough with
> >> > Apache Beam to get anywhere close to your suggestion, but thanks
> >> > anyway!!
> >> >
> >> > Robert, your suggestion sounds great to me. Could you please provide
> >> > an example of how to use that 'metadata driven' trigger?
> >> >
> >> > Thanks!
> >> >
> >> > On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles <k...@google.com>
> >> > wrote:
> >> >>
> >> >> Often, when you need or want more control than triggers provide, such
> >> >> as input-type-specific logic like yours, you can use state and timers
> >> >> in ParDo to control when to output. You lose any potential
> >> >> optimizations of Combine based on associativity/commutativity and
> >> >> assume the burden of making sure your output is sensible, but dropping
> >> >> to low-level stateful computation may be your best bet.
> >> >>
> >> >> Kenn
> >> >>
> >> >> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw
> >> >> <rober...@google.com> wrote:
> >> >>>
> >> >>> We've tossed around the idea of "metadata-driven" triggers which
> >> >>> would essentially let you provide a mapping element -> metadata and a
> >> >>> monotonic CombineFn metadata* -> bool that would allow for this (the
> >> >>> AfterCount being a special case of this, with the mapping fn being _
> >> >>> -> 1, and the CombineFn being sum(...) >= N; for size one would
> >> >>> provide a (perhaps approximate) sizing mapping fn).
> >> >>>
> >> >>> Note, however, that there's no guarantee that the trigger fires as
> >> >>> soon as possible; due to runtime characteristics a significant amount
> >> >>> of data may be buffered (or come in at once) before the trigger is
> >> >>> queried. One possibility would be to follow your triggering with a
> >> >>> DoFn that breaks up large value streams into multiple manageable
> >> >>> sized ones as needed.
> >> >>>
> >> >>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso
> >> >>> <car...@mrcalonso.com> wrote:
> >> >>> > Hi everyone!!
> >> >>> >
> >> >>> > I was wondering if there is an option to trigger window panes based
> >> >>> > on the size of the pane itself (rather than the number of elements).
> >> >>> >
> >> >>> > To provide a little bit more context: we're backing up a PubSub
> >> >>> > topic into GCS with the "special" feature that, depending on the
> >> >>> > "type" of the message, the GCS destination is one or another.
> >> >>> >
> >> >>> > The 'shape' of the messages published there is quite random: some
> >> >>> > of them are very frequent and small, some others very big but
> >> >>> > sparse... We have around 150 messages per second (in total) and
> >> >>> > we're firing every 15 minutes and experiencing OOM errors. We've
> >> >>> > considered firing based on the number of items as well, but given
> >> >>> > the randomness of the input, I don't think it will be a final
> >> >>> > solution either.
> >> >>> >
> >> >>> > Having a trigger based on size would be great. Another option would
> >> >>> > be to have a dynamic number of shards for the PTransform that
> >> >>> > actually writes the files.
> >> >>> >
> >> >>> > What is your recommendation for this use case?
> >> >>> >
> >> >>> > Thanks!!
> >> >>
> >> >>
> >> >
>
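
The buffering logic behind the state-and-timers approach discussed in this
thread can be sketched in plain Java as below. This is not the code from the
gist and uses no Beam API; the class name and the byte threshold are
hypothetical. In a stateful DoFn the buffer and byte count would presumably
live in per-key state, and flush() would also be called from a timer at the
end of the window.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: emit a batch once the buffered bytes reach a threshold.
final class SizeBatcherSketch {
    private final long maxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    SizeBatcherSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Buffers one element; returns the full batch if the threshold was
    // reached, otherwise an empty list.
    List<String> add(String element) {
        buffer.add(element);
        bufferedBytes += element.getBytes(StandardCharsets.UTF_8).length;
        return bufferedBytes >= maxBytes ? flush() : Collections.<String>emptyList();
    }

    // Emits whatever is buffered and resets the counters.
    List<String> flush() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        bufferedBytes = 0;
        return out;
    }

    public static void main(String[] args) {
        SizeBatcherSketch batcher = new SizeBatcherSketch(10);
        System.out.println(batcher.add("aaaa")); // 4 bytes buffered, nothing emitted
        System.out.println(batcher.add("bbbb")); // 8 bytes buffered, nothing emitted
        System.out.println(batcher.add("cc"));   // 10 bytes reached, batch emitted
    }
}
```

Small, frequent messages and big, sparse ones are then flushed by the same
rule, which is the behaviour a size-based trigger would have given.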


Re: ParDo requires its input to use KvCoder in order to use state and timers

2018-02-12 Thread Carlos Alonso
Ok, that makes a lot of sense. Thanks Kenneth!

On Mon, Feb 12, 2018 at 5:41 PM Kenneth Knowles <k...@google.com> wrote:

> Hi Carlos,
>
> You are quite correct that choosing the keys is important for work to be
> evenly distributed. The reason you need to have a KvCoder is that state is
> partitioned per key (to give natural & automatic parallelism) and window
> (to allow reclaiming expired state so you can process unbounded data with
> bounded storage, and also more parallelism). To a Beam runner, most data in
> the pipeline is "just bytes" that it cannot interpret. KvCoder is a special
> case where a runner knows the binary layout of encoded data so it can pull
> out the keys in order to shuffle data of the same key to the same place, so
> that is why it has to be a KvCoder.
>
> Kenn
>
> On Mon, Feb 12, 2018 at 5:52 AM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> I was refactoring my solution a bit and tried to make my stateful
>> transform work on simple case classes and I got this exception:
>> https://pastebin.com/x4xADmvL . I'd like to understand the rationale
>> behind this as I think carefully choosing the keys would be very important
>> in order for the work to be properly distributed.
>>
>> Thanks!
>>
>
>


Re: London Apache Beam meetup 2: 11/01

2018-02-12 Thread Carlos Alonso
Thanks Matthias!! Lovely!

On Mon, Feb 12, 2018 at 12:24 PM Matthias Baetens <
matthias.baet...@datatonic.com> wrote:

> As promised (but with some delay): the recording
> <http://bit.ly/SecondApacheBeamMeetupLondonRecording> of our second Beam
> meetup in London! Enjoy :-)
>
> On Tue, Jan 9, 2018 at 12:30 PM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Cool, thanks!!
>>
>>
>> On Mon, Jan 8, 2018 at 1:38 PM Matthias Baetens <
>> matthias.baet...@datatonic.com> wrote:
>>
>>> Yes, we put everything in place to record this time and hope to share
>>> the recordings soon after the meetup. Stay tuned!
>>>
>>> On 8 Jan 2018 10:32, "Carlos Alonso" <car...@mrcalonso.com> wrote:
>>>
>>> Will it be recorded?
>>>
>>> On Fri, Jan 5, 2018 at 5:11 PM Matthias Baetens <
>>> matthias.baet...@datatonic.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Excited to announce the second Beam meetup, taking place at the *Qubit
>>>> offices* <https://goo.gl/maps/sVmFYrVys1S2> next *Thursday 11/01*.
>>>>
>>>> We are very excited to have JB flying in to talk about IO and
>>>> Splittable DoFns and Vadim Sobolevski to share on how FutureFlow uses Beam
>>>> in a finance use case.
>>>>
>>>> More info and RSVP here <http://bit.ly/2zcUy5A>. We are looking
>>>> forward to welcoming you all!
>>>>
>>>> Best regards,
>>>> Matthias
>>>>
>>>
>>>
>
>


Re: Trying to understand Unable to encode element exceptions

2018-02-10 Thread Carlos Alonso
I've added a comment with a link to our working Stateful and timely
processing solution:
https://github.com/spotify/scio/issues/448#issuecomment-364705100

On Fri, Jan 26, 2018 at 1:43 AM Neville Li <neville@gmail.com> wrote:

> Here's a fix to #1020
> https://github.com/spotify/scio/pull/1032
>
> On Sun, Jan 21, 2018 at 4:36 PM Neville Li <neville@gmail.com> wrote:
>
>> Awesome!
>> We haven't wrapped any stateful processing API in Scala, but if you have
>> a working snippet or ideas it'd be great to share them in that ticket.
>>
>> On Sat, Jan 20, 2018 at 4:31 PM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> Thanks Neville!!
>>>
>>> Your recommendation worked great. Thanks for your help!!
>>>
>>> As a side note, I found this issue:
>>> https://github.com/spotify/scio/issues/448
>>>
>>> I can share/help there with our experience, as our job, with scio +
>>> stateful + timely processing is working fine as of today
>>>
>>> Regards!!
>>>
>>> On Fri, Jan 19, 2018 at 6:21 PM Neville Li <neville@gmail.com>
>>> wrote:
>>>
>>>> Welcome.
>>>>
>>>> Added an issue so we may improve this in the future:
>>>> https://github.com/spotify/scio/issues/1020
>>>>
>>>>
>>>> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> To build the beam transform I was following this example:
>>>>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>>>>
>>>>> To be honest I don't know how to apply timely and stateful processing
>>>>> without using a beam transform or how to rewrite it using the scio
>>>>> built-in you suggest. Could you please give me an example?
>>>>>
>>>>> Thanks for your help!
>>>>>
>>>>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <neville@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That happens when you mix beam transforms into scio and defeats the
>>>>>> safety we have in place. Map the values into something beam-serializable
>>>>>> first or rewrite the transform with a scio built-in which takes care of
>>>>>> KvCoder.
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I'm following this example:
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>>>
>>>>>>> because I'm building something very similar to a group into batches
>>>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>>>> https://pastebin.com/xxdDMXSf
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> You shouldn't manually set coder in most cases. It defaults to
>>>>>>>> KryoAtomicCoder for most Scala types.
>>>>>>>> More details:
>>>>>>>>
>>>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>>>
>>>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Could it be because I'm using
>>>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>>>> at some point in the pipeline
>>>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>>>> outputs a SerializableCoder)?
>>>>>>>>>
>>>>>>>>> This is something I've always wondered. How does one specify a
>>>>>>>>> coder for a case class?
>>>>>>>>>
>>>>>>>>> Regard

Re: Stateful processing with session window

2018-02-10 Thread Carlos Alonso
Hi Maurizio, I'm not a very experienced user here, I'm actually just getting
started with all this, but I'm going to give this one a try and see if I
can help.

What I think is happening here is that the third 'a' you see is actually in
a different window from the other three 'a's. Stateful processing being per
key and window means that state is kept for each key-window pair. Therefore,
if your 'a' counter is being restarted, it is probably because it is actually
a different counter, and as the key is the same, the only possibility is that
the window is different. You can debug your pipeline and check whether my
guess is right by also printing the window information of your elements.
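
To make the per-key-and-window point concrete, here is a minimal plain-Java sketch of a counter kept per key and window. It does not use Beam at all: the class name, the `process` method and the window labels `w1`/`w2` are made up for illustration, so take it as a model of the idea rather than of what the runner actually does.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; this does not use or reproduce any Beam API.
class KeyWindowStateSketch {
    // One counter per (window, key) pair: outer map keyed by window, inner map by key.
    private final Map<String, Map<String, Integer>> state = new HashMap<>();

    // Returns the counter value seen by this element, then increments it.
    int process(String key, String window) {
        Map<String, Integer> perWindow = state.computeIfAbsent(window, w -> new HashMap<>());
        int current = perWindow.getOrDefault(key, 0);
        perWindow.put(key, current + 1);
        return current;
    }

    public static void main(String[] args) {
        KeyWindowStateSketch s = new KeyWindowStateSketch();
        System.out.println("a, " + s.process("a", "w1"));
        System.out.println("a, " + s.process("a", "w1"));
        // Same key, different (hypothetical) window: the counter starts again.
        System.out.println("a, " + s.process("a", "w2"));
        System.out.println("a, " + s.process("a", "w1"));
    }
}
```

With the same key landing in two windows, the counter for the second window starts from zero while the first one keeps counting, which is the pattern in your output.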

Hope it helps.

On Fri, Feb 9, 2018 at 4:46 PM Maurizio Sambati 
wrote:

> Hi everyone,
>
> I'm trying to write a simple pipeline to experiment both stateful
> processing and session window.
>
> I have an event stream, each event has a timestamp and a session key, I
> want to group by each session and enrich all events using a common state of
> the session. In this case I'm just replacing the event with an incremental
> counter.
>
> So, let's say I have a source that outputs an event every second and my
> stream is [a, a, b, a, a, c, a, a, b, c, c, c, a, a] (I'm just writing only
> the session key as the value is useless for the purpose of the issue I'm
> experiencing)
>
> I want the following output: [<a, 0>, <a, 1>, <b, 0>, <a, 2>, <a, 3>, ...]
> (actually the order is not important)
>
> Unfortunately, my code does not seem to work as I was expecting and I'm not
> able to understand the reason (to be honest, I haven't found many resources
> on the topic). What I actually get is something like:
>
> a, 0
> a, 1
> b, 0
> a, 0   <-- ???
> a, 2   <-- ???
> c, 0
> ...
>
> that makes me wonder if I have actually understood how the state is
> related to the key-window pair or maybe if I have just misunderstood how
> the window/triggering works.
>
> My pipeline looks something like:
>
> // Note: the key type (String) in the generic parameters below is assumed.
> p.apply(TextIO.read().from("input.json"))
>  .apply(MapElements.via(new ParseTableRowJson()))
>  .apply(new AugmentEvents())
>  .apply(ParDo.of(new DoFn<KV<String, Long>, Void>() {
>    @ProcessElement
>    public void processElement(ProcessContext c) {
>      LOG.info(c.element().getKey() + ": " + c.element().getValue());
>    }
>  }));
>
> ...
>
> static class AugmentEvents
>     extends PTransform<PCollection<TableRow>, PCollection<KV<String, Long>>> {
>   @Override
>   public PCollection<KV<String, Long>> expand(PCollection<TableRow> input) {
>     return input
>       .apply(ParDo.of(new ExtractSessionIdAndTimestamp()))
>       .apply(new ComputeSessions());
>   }
> }
>
> static class ComputeSessions
>     extends PTransform<PCollection<KV<String, TableRow>>, PCollection<KV<String, Long>>> {
>   @Override
>   public PCollection<KV<String, Long>> expand(PCollection<KV<String, TableRow>> events) {
>     return events
>       .apply(Window.<KV<String, TableRow>>into(Sessions.withGapDuration(Duration.standardMinutes(10)))
>         .triggering(AfterPane.elementCountAtLeast(1))
>         .discardingFiredPanes()
>         .withAllowedLateness(Duration.standardMinutes(10)))
>       .apply(ParDo.of(new StatefulCount()));
>   }
> }
>
> static class StatefulCount extends DoFn<KV<String, TableRow>, KV<String, Long>> {
>   // Per key-and-window counter, stored as an Integer.
>   @StateId("storage")
>   private final StateSpec<ValueState<Integer>> storageSpec =
>       StateSpecs.value(VarIntCoder.of());
>
>   @ProcessElement
>   public void processElement(ProcessContext context, BoundedWindow window,
>       @StateId("storage") ValueState<Integer> storage) {
>     Integer val = storage.read();
>     if (val == null) {
>       val = new Integer(0);
>     }
>     int current = val.intValue();
>     context.output(KV.of(context.element().getKey(), new Long(current)));
>     storage.write(current+1);
>   }
> }
>
> Maurizio
>
>


Re: IllegalStateException when changing allowed lateness?

2018-02-09 Thread Carlos Alonso
Cool, let me know if you need anything else to nail down this issue.
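
For anyone following along, the diagnosis confirmed in the quoted message can be illustrated with a small plain-Java sketch. It assumes the cleanup timer for a window is set at window end plus allowed lateness; that rule, the class and method names, and the example durations are assumptions for illustration, not the runner's actual code.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration only; the cleanup rule below is an assumption.
class CleanupTimerSketch {
    // Assumed rule: state for a window is cleaned up at window end + allowed lateness.
    static Instant cleanupTime(Instant windowEnd, Duration allowedLateness) {
        return windowEnd.plus(allowedLateness);
    }

    // True when a timer set under the old lateness fires before the
    // cleanup time computed with the new (larger) lateness.
    static boolean firesTooEarly(Instant windowEnd, Duration oldLateness, Duration newLateness) {
        return cleanupTime(windowEnd, oldLateness).isBefore(cleanupTime(windowEnd, newLateness));
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2018-02-08T10:00:00Z");
        Duration before = Duration.ofDays(2); // hypothetical lateness before the update
        Duration after = Duration.ofDays(5);  // hypothetical lateness after the update
        System.out.println("old timer:   " + cleanupTime(windowEnd, before));
        System.out.println("new cleanup: " + cleanupTime(windowEnd, after));
        System.out.println("too early?   " + firesTooEarly(windowEnd, before, after));
    }
}
```

Under that assumption, any timer that survived the update still carries the old timestamp, so it arrives before the cleanup time the updated job expects, which matches the wording of the exception.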

On Thu, Feb 8, 2018 at 3:45 PM Kenneth Knowles <k...@google.com> wrote:

> Hi Carlos,
>
> You are surely correct. Good diagnosis. Filing a bug.
>
> Kenn
>
> On Thu, Feb 8, 2018 at 6:23 AM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hi everyone!!
>>
>> I've just seen a new IllegalStateException: received state cleanup timer
>> for window... that is before the appropriate cleanup time...
>>
>> The full stack trace is here: https://pastebin.com/J8Vuq9xz
>>
>> And I think it could be because I updated a running job with an increased
>> allowed lateness. Can anyone please confirm it or point me in the right
>> direction?
>>
>> Thanks!
>>
>
>


Re: Lateness droppings debugging

2018-02-08 Thread Carlos Alonso
We’re setting the timestamp to the date of last update of each document.
That’s why I think pushing everything to PubSub before starting the job
could work.

The data is read from CouchDB’s changes feed into PubSub and the idea is to
transfer everything into GCS, from the very beginning. Aside from that we
have another job running “live” with live data.

Any other ideas or suggestions?

Thanks!!
On Thu, 8 Feb 2018 at 22:39, Robert Bradshaw <rober...@google.com> wrote:

> You can set the timestamp attribute of your pubsub messages which will
> hold back the watermark, see
>
>
> https://beam.apache.org/documentation/sdks/javadoc/2.2.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
>
> However, if you're mixing historical and live data, it may make more
> sense to read these as two separate sources (e.g. the static data from
> a set of files, the live data from pubsub) and then flatten them for
> further processing.
>
> On Thu, Feb 8, 2018 at 1:23 PM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
> > Yes, the data is finite (although it comes through PubSub, so I guess is
> > considered unbounded).
> > How could I hold the watermark and prevent it from moving?
> >
> > Thanks!
> >
> > On Thu, Feb 8, 2018 at 10:06 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> Where is the watermark for this old data coming from? Rather than
> >> messing with allowed lateness, would it be possible to hold the
> >> watermark back appropriately during the time you're injecting old data
> >> (assuming there's only a finite amount of it)?
> >>
> >> On Thu, Feb 8, 2018 at 12:56 PM, Carlos Alonso <car...@mrcalonso.com>
> >> wrote:
> >> > Thanks for your responses!!
> >> >
> >> > I have a scenario where I have to reprocess very disordered data for 4
> >> > or 5
> >> > years and I don't want to lose any data. I'm thinking of setting a
> very
> >> > big
> >> > allowed lateness (5 years), but before doing that I'd like to
> understand
> >> > the
> >> > consequences that may have. I guess memory wise will be very consuming
> >> > as no
> >> > window will ever expire, but I guess I could overcome that with brute
> >> > force
> >> > (many machines with many RAM) but, are there more concerns I should be
> >> > aware
> >> > of? This should be a one-off thing.
> >> >
> >> > Thanks!
> >> >
> >> > On Thu, Feb 8, 2018 at 6:59 PM Raghu Angadi <rang...@google.com>
> wrote:
> >> >>
> >> >> On Wed, Feb 7, 2018 at 10:33 PM, Pawel Bartoszek
> >> >> <pawelbartosze...@gmail.com> wrote:
> >> >>>
> >> >>> Hi Raghu,
> >> >>> Can you provide more details about increasing allowed lateness? Even
> >> >>> if I
> >> >>> do that I still need to compare event time of record with processing
> >> >>> time(system current time) in my ParDo?
> >> >>
> >> >>
> >> >> I see. PaneInfo() associated with each element has 'Timing' enum, so
> we
> >> >> can tell if the element is late, but it does not tell how late.
> >> >> How about this : We can have a periodic timer firing every minute and
> >> >> store the scheduled time of the timer in state as the watermark time.
> >> >> We
> >> >> could compare element time to this stored time for good approximation
> >> >> (may
> >> >> require parallel stage with global window, dropping any events that
> >> >> 'clearly
> >> >> within limits' based on current time). There are probably other ways
> to
> >> >> do
> >> >> this with timers within existing stage.
> >> >>
> >> >>>
> >> >>> Pawel
> >> >>>
> >> >>> On 8 February 2018 at 05:40, Raghu Angadi <rang...@google.com>
> wrote:
> >> >>>>
> >> >>>> The watermark is not directly available, you essentially infer from
> >> >>>> fired triggers (e.g. fixed windows). I would consider some of these
> >> >>>> options
> >> >>>> :
> >> >>>>   - Adhoc debugging : if the pipeline is close to realtime, you can
> >> >>>> just
> >> >>>> guess if a element will be dropped based on 

Re: Lateness droppings debugging

2018-02-08 Thread Carlos Alonso
I thought of loading all of it into the PubSub subscription before starting
the job. That should work, right? Any better suggestion?
On Thu, 8 Feb 2018 at 22:23, Carlos Alonso <car...@mrcalonso.com> wrote:

> Yes, the data is finite (although it comes through PubSub, so I guess is
> considered unbounded).
> How could I hold the watermark and prevent it from moving?
>
> Thanks!
>
> On Thu, Feb 8, 2018 at 10:06 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> Where is the watermark for this old data coming from? Rather than
>> messing with allowed lateness, would it be possible to hold the
>> watermark back appropriately during the time you're injecting old data
>> (assuming there's only a finite amount of it)?
>>
>> On Thu, Feb 8, 2018 at 12:56 PM, Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>> > Thanks for your responses!!
>> >
>> > I have a scenario where I have to reprocess very disordered data for 4
>> or 5
>> > years and I don't want to lose any data. I'm thinking of setting a very
>> big
>> > allowed lateness (5 years), but before doing that I'd like to
>> understand the
>> > consequences that may have. I guess memory wise will be very consuming
>> as no
>> > window will ever expire, but I guess I could overcome that with brute
>> force
>> > (many machines with many RAM) but, are there more concerns I should be
>> aware
>> > of? This should be a one-off thing.
>> >
>> > Thanks!
>> >
>> > On Thu, Feb 8, 2018 at 6:59 PM Raghu Angadi <rang...@google.com> wrote:
>> >>
>> >> On Wed, Feb 7, 2018 at 10:33 PM, Pawel Bartoszek
>> >> <pawelbartosze...@gmail.com> wrote:
>> >>>
>> >>> Hi Raghu,
>> >>> Can you provide more details about increasing allowed lateness? Even
>> if I
>> >>> do that I still need to compare event time of record with processing
>> >>> time(system current time) in my ParDo?
>> >>
>> >>
>> >> I see. PaneInfo() associated with each element has 'Timing' enum, so we
>> >> can tell if the element is late, but it does not tell how late.
>> >> How about this : We can have a periodic timer firing every minute and
>> >> store the scheduled time of the timer in state as the watermark time.
>> We
>> >> could compare element time to this stored time for good approximation
>> (may
>> >> require parallel stage with global window, dropping any events that
>> 'clearly
>> >> within limits' based on current time). There are probably other ways
>> to do
>> >> this with timers within existing stage.
>> >>
>> >>>
>> >>> Pawel
>> >>>
>> >>> On 8 February 2018 at 05:40, Raghu Angadi <rang...@google.com> wrote:
>> >>>>
>> >>>> The watermark is not directly available, you essentially infer from
>> >>>> fired triggers (e.g. fixed windows). I would consider some of these
>> options
>> >>>> :
>> >>>>   - Adhoc debugging : if the pipeline is close to realtime, you can
>> just
>> >>>> guess if a element will be dropped based on its timestamp and
>> current time
>> >>>> in the first stage (before first aggregation)
>> >>>>   - Increase allowed lateness (say to 3 days) and drop the elements
>> >>>> yourself you notice are later than 1 day.
>> >>>>   - Place the elements into another window with larger allowed
>> lateness
>> >>>> and log very late elements in another parallel aggregation (see
>> >>>> TriggerExample.java in Beam repo).
>> >>>>
>> >>>> On Wed, Feb 7, 2018 at 2:55 PM, Carlos Alonso <car...@mrcalonso.com>
>> >>>> wrote:
>> >>>>>
>> >>>>> Hi everyone!!
>> >>>>>
>> >>>>> I have a streaming job running with fixed windows of one hour and
>> >>>>> allowed lateness of two days and the number of dropped due to
>> lateness
>> >>>>> elements is slowly, but continuously growing and I'd like to
>> understand
>> >>>>> which elements are those.
>> >>>>>
>> >>>>> I'd like to get the watermark from inside the job to compare it
>> against
>> >>>>> each element and write log messages with the ones that will be
>> potentially
>> >>>>> discarded Does that approach make any sense? I which case...
>> How can I
>> >>>>> get the watermark from inside the job? Any other ideas?
>> >>>>>
>> >>>>> Thanks in advance!!
>> >>>>
>> >>>>
>> >>>
>> >
>>
>


Re: Lateness droppings debugging

2018-02-08 Thread Carlos Alonso
Yes, the data is finite (although it comes through PubSub, so I guess it is
considered unbounded).
How could I hold the watermark and prevent it from moving?

Thanks!

On Thu, Feb 8, 2018 at 10:06 PM Robert Bradshaw <rober...@google.com> wrote:

> Where is the watermark for this old data coming from? Rather than
> messing with allowed lateness, would it be possible to hold the
> watermark back appropriately during the time you're injecting old data
> (assuming there's only a finite amount of it)?
>
> On Thu, Feb 8, 2018 at 12:56 PM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
> > Thanks for your responses!!
> >
> > I have a scenario where I have to reprocess 4 or 5 years of very
> > disordered data and I don't want to lose any of it. I'm thinking of
> > setting a very big allowed lateness (5 years), but before doing that I'd
> > like to understand the consequences that may have. I guess it will be very
> > memory-consuming, as no window will ever expire, but I guess I could
> > overcome that with brute force (many machines with lots of RAM). Are there
> > more concerns I should be aware of? This should be a one-off thing.
> >
> > Thanks!
> >
> > On Thu, Feb 8, 2018 at 6:59 PM Raghu Angadi <rang...@google.com> wrote:
> >>
> >> On Wed, Feb 7, 2018 at 10:33 PM, Pawel Bartoszek
> >> <pawelbartosze...@gmail.com> wrote:
> >>>
> >>> Hi Raghu,
> >>> Can you provide more details about increasing allowed lateness? Even
> if I
> >>> do that I still need to compare event time of record with processing
> >>> time(system current time) in my ParDo?
> >>
> >>
> >> I see. PaneInfo() associated with each element has 'Timing' enum, so we
> >> can tell if the element is late, but it does not tell how late.
> >> How about this : We can have a periodic timer firing every minute and
> >> store the scheduled time of the timer in state as the watermark time. We
> >> could compare element time to this stored time for good approximation
> (may
> >> require parallel stage with global window, dropping any events that
> 'clearly
> >> within limits' based on current time). There are probably other ways to
> do
> >> this with timers within existing stage.
> >>
> >>>
> >>> Pawel
> >>>
> >>> On 8 February 2018 at 05:40, Raghu Angadi <rang...@google.com> wrote:
> >>>>
> >>>> The watermark is not directly available, you essentially infer from
> >>>> fired triggers (e.g. fixed windows). I would consider some of these
> options
> >>>> :
> >>>>   - Adhoc debugging : if the pipeline is close to realtime, you can
> just
> >>>> guess if a element will be dropped based on its timestamp and current
> time
> >>>> in the first stage (before first aggregation)
> >>>>   - Increase allowed lateness (say to 3 days) and drop the elements
> >>>> yourself you notice are later than 1 day.
> >>>>   - Place the elements into another window with larger allowed
> lateness
> >>>> and log very late elements in another parallel aggregation (see
> >>>> TriggerExample.java in Beam repo).
> >>>>
> >>>> On Wed, Feb 7, 2018 at 2:55 PM, Carlos Alonso <car...@mrcalonso.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi everyone!!
> >>>>>
> >>>>> I have a streaming job running with fixed windows of one hour and
> >>>>> allowed lateness of two days and the number of dropped due to
> lateness
> >>>>> elements is slowly, but continuously growing and I'd like to
> understand
> >>>>> which elements are those.
> >>>>>
> >>>>> I'd like to get the watermark from inside the job to compare it
> against
> >>>>> each element and write log messages with the ones that will be
> potentially
> >>>>> discarded Does that approach make any sense? I which case... How
> can I
> >>>>> get the watermark from inside the job? Any other ideas?
> >>>>>
> >>>>> Thanks in advance!!
> >>>>
> >>>>
> >>>
> >
>


IllegalStateException when changing allowed lateness?

2018-02-08 Thread Carlos Alonso
Hi everyone!!

I've just seen a new IllegalStateException: received state cleanup timer
for window... that is before the appropriate cleanup time...

The full stack trace is here: https://pastebin.com/J8Vuq9xz

And I think it could be because I updated a running job with an increased
allowed lateness. Can anyone please confirm it or point me in the right
direction?

Thanks!


Re: Lateness droppings debugging

2018-02-07 Thread Carlos Alonso
Cool, I'll try some of these. Thanks Raghu!
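
As a note for myself on the first two options in the quoted message, here is a rough plain-Java sketch of the check I have in mind. It assumes one-hour fixed windows aligned to the epoch and uses the current time as a stand-in for the watermark, which only holds if the pipeline is close to realtime. The class, enum and method names and the one-day threshold are illustrative, not Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java illustration only; current time is used as a rough watermark stand-in.
class LatenessSketch {
    enum Verdict { ON_TIME, LATE_BUT_KEPT, WOULD_BE_DROPPED }

    // End of the fixed window containing eventTime (windows aligned to the epoch).
    static Instant windowEnd(Instant eventTime, Duration windowSize) {
        long size = windowSize.toMillis();
        long ts = eventTime.toEpochMilli();
        long start = ts - Math.floorMod(ts, size);
        return Instant.ofEpochMilli(start + size);
    }

    // Classifies an element by how far "now" is past the end of its window.
    static Verdict classify(Instant eventTime, Instant now,
                            Duration windowSize, Duration dropThreshold) {
        Instant end = windowEnd(eventTime, windowSize);
        if (!now.isAfter(end)) {
            return Verdict.ON_TIME;
        }
        if (!now.isAfter(end.plus(dropThreshold))) {
            return Verdict.LATE_BUT_KEPT;
        }
        return Verdict.WOULD_BE_DROPPED;
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2018-02-07T10:15:00Z");
        Instant now = Instant.parse("2018-02-08T12:00:00Z");
        // Log the elements that fall past the (hypothetical) one-day threshold.
        System.out.println(classify(event, now, Duration.ofHours(1), Duration.ofDays(1)));
    }
}
```

The idea would be to run the job with a larger allowed lateness so nothing is dropped by the runner, and log the elements this check flags as `WOULD_BE_DROPPED`.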

On Thu, Feb 8, 2018 at 6:40 AM Raghu Angadi <rang...@google.com> wrote:

> The watermark is not directly available; you essentially infer it from fired
> triggers (e.g. fixed windows). I would consider some of these options:
>   - Ad hoc debugging: if the pipeline is close to realtime, you can just
> guess whether an element will be dropped based on its timestamp and the
> current time in the first stage (before the first aggregation).
>   - Increase allowed lateness (say to 3 days) and yourself drop the elements
> that you notice are later than 1 day.
>   - Place the elements into another window with larger allowed lateness
> and log very late elements in another parallel aggregation (see
> TriggerExample.java in the Beam repo).
>
> On Wed, Feb 7, 2018 at 2:55 PM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hi everyone!!
>>
>> I have a streaming job running with fixed windows of one hour and allowed
>> lateness of two days. The number of elements dropped due to lateness is
>> slowly but continuously growing, and I'd like to understand which elements
>> those are.
>>
>> I'd like to get the watermark from inside the job to compare it against
>> each element and write log messages for the ones that will potentially be
>> discarded. Does that approach make any sense? In which case, how can I
>> get the watermark from inside the job? Any other ideas?
>>
>> Thanks in advance!!
>>
>
>


Re: Custom metrics not showing on Stackdriver

2018-01-30 Thread Carlos Alonso
Created this SO question:
https://stackoverflow.com/questions/48530496/google-dataflow-custom-metrics-not-showing-on-stackdriver

Thanks Andrea!

On Tue, Jan 30, 2018 at 9:54 PM Andrea Foegler <foeg...@google.com> wrote:

> I would suggest Platform Support or StackOverflow as the best places to
> request Dataflow-specific support.
>
> This could be an issue coordinating between your Stackdriver Account(s)
> and your Cloud project(s).  We can continue to discuss / investigate
> through one of the above forums.
>
> On Tue, Jan 30, 2018 at 12:30 PM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Thanks Andrea!!
>>
>> Do you mean using the UserVoice forum?
>>
>> Aside from that, there's something that could be helpful that is that
>> when I navigate https://app.google.stackdriver.com/services/dataflow the
>> message I get is this:
>> "You do not have any resources of this type being monitored by
>> Stackdriver." and that's weird as well. As if our Cloud Dataflow wasn't
>> properly connected to Stackdriver, but, on the other hand. Some metrics are
>> displayed and can be monitored such as System Lag, Watermark, etc...
>>
>> Thanks!
>>
>> On Tue, Jan 30, 2018 at 9:20 PM Andrea Foegler <foeg...@google.com>
>> wrote:
>>
>>> Hi Carlos -
>>>
>>> This sounds like something we should investigate further.  Since it
>>> appears to be a Dataflow specific question / issue, would you mind posting
>>> or following up in a Dataflow-specific forum or through Google Cloud
>>> Platform Support: https://cloud.google.com/dataflow/support?  Feel free
>>> to mention my name in your contact.
>>>
>>> Cheers
>>> Andrea
>>>
>>>
>>>
>>> On Tue, Jan 30, 2018 at 10:27 AM, Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi Andrea, thank you very much for your response.
>>>>
>>>> I've followed your directions and only droppedDueToLateness appears.
>>>> The way I'm creating those metrics is:
>>>>
>>>> Metrics.counter("Ingester", "IngestedMessages").inc()
>>>>
>>>> I can see those counters on the Custom Counters section on the Google
>>>> Dataflow UI, but nothing appears on Stackdriver...
>>>>
>>>> Thanks!
>>>>
>>>> On Tue, Jan 30, 2018 at 7:22 PM Andrea Foegler <foeg...@google.com>
>>>> wrote:
>>>>
>>>>> Hi Carlos -
>>>>>
>>>>> Custom metrics can be "listed" by going to the Metric Explorer in
>>>>> Stackdriver and entering "custom.googleapis.com/dataflow" in the
>>>>> filter.
>>>>> If that list contains more than 100 different names, new custom
>>>>> metrics will not be created.  If this is the case, there should be a
>>>>> message in the job log reporting as much.
>>>>> (We are working with Stackdriver to improve this experience.)
>>>>>
>>>>> Also, we do not currently export distribution metrics to Stackdriver
>>>>> because we don't yet have a good mechanism to do so.  Gauge metrics are
>>>>> not implemented yet and would not appear in either the Dataflow UI or
>>>>> Stackdriver.
>>>>>
>>>>> These are the only explanations I can think of for these metrics not
>>>>> showing up.  If neither of these is the case, I'm happy to investigate
>>>>> further on a particular instance.
>>>>>
>>>>> Cheers
>>>>> Andrea
>>>>>
>>>>>
>>>>>
>>>>> On 2018/01/23 19:59:08, Carlos Alonso <c...@mrcalonso.com> wrote:
>>>>> > Hi everyone!!
>>>>> >
>>>>> > I'm trying to get a deeper view on my dataflow jobs by measuring
>>>>> > parts of it using `Metrics.counter|gauge` but I cannot find how to
>>>>> > see them on Stackdriver.
>>>>> >
>>>>> > I have a premium Stackdriver account and I can see those counters
>>>>> > under the Custom Counters section on the Dataflow UI.
>>>>> >
>>>>> > I can see droppedDueToLateness 'custom' counter though on
>>>>> > Stackdriver that seems to be created via 'Metrics.counter' as well...
>>>>> >
>>>>> > What am I missing?
>>>>> >
>>>>> > Regards
>>>>> >
>>>>>
>>>>
>>>
>


Re: Custom metrics not showing on Stackdriver

2018-01-30 Thread Carlos Alonso
Thanks Andrea!!

Do you mean using the UserVoice forum?

Aside from that, something that could be helpful: when I navigate to
https://app.google.stackdriver.com/services/dataflow the message I get is:
"You do not have any resources of this type being monitored by
Stackdriver." That's weird as well, as if our Cloud Dataflow wasn't
properly connected to Stackdriver. On the other hand, some metrics are
displayed and can be monitored, such as System Lag, Watermark, etc.

Thanks!

On Tue, Jan 30, 2018 at 9:20 PM Andrea Foegler <foeg...@google.com> wrote:

> Hi Carlos -
>
> This sounds like something we should investigate further.  Since it
> appears to be a Dataflow specific question / issue, would you mind posting
> or following up in a Dataflow-specific forum or through Google Cloud
> Platform Support: https://cloud.google.com/dataflow/support?  Feel free
> to mention my name in your contact.
>
> Cheers
> Andrea
>
>
>
> On Tue, Jan 30, 2018 at 10:27 AM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> Hi Andrea, thank you very much for your response.
>>
>> I've followed your directions and only droppedDueToLateness appears.
>> The way I'm creating those metrics is:
>>
>> Metrics.counter("Ingester", "IngestedMessages").inc()
>>
>> I can see those counters on the Custom Counters section on the Google
>> Dataflow UI, but nothing appears on Stackdriver...
>>
>> Thanks!
>>
>> On Tue, Jan 30, 2018 at 7:22 PM Andrea Foegler <foeg...@google.com>
>> wrote:
>>
>>> Hi Carlos -
>>>
>>> Custom metrics can be "listed" by going to the Metric Explorer in
>>> Stackdriver and entering "custom.googleapis.com/dataflow" in the filter.
>>> If that list contains more than 100 different names, new custom metrics
>>> will not be created.  If this is the case, there should be a message in
>>> the job log reporting as much.
>>> (We are working with Stackdriver to improve this experience.)
>>>
>>> Also, we do not currently export distribution metrics to Stackdriver
>>> because we don't yet have a good mechanism to do so.  Gauge metrics are not
>>> implemented yet and would not appear in either the Dataflow UI or
>>> Stackdriver.
>>>
>>> These are the only explanations I can think of for these metrics not
>>> showing up.  If neither of these is the case, I'm happy to investigate
>>> further on a particular instance.
>>>
>>> Cheers
>>> Andrea
>>>
>>>
>>>
>>> On 2018/01/23 19:59:08, Carlos Alonso <c...@mrcalonso.com> wrote:
>>> > Hi everyone!!
>>> >
>>> > I'm trying to get a deeper view on my dataflow jobs by measuring parts
>>> > of it using `Metrics.counter|gauge` but I cannot find how to see them on
>>> > Stackdriver.
>>> >
>>> > I have a premium Stackdriver account and I can see those counters
>>> > under the Custom Counters section on the Dataflow UI.
>>> >
>>> > I can see droppedDueToLateness 'custom' counter though on Stackdriver
>>> > that seems to be created via 'Metrics.counter' as well...
>>> >
>>> > What am I missing?
>>> >
>>> > Regards
>>> >
>>>
>>
>


Re: Google Dataflow jobs stuck analysing the graph

2018-01-24 Thread Carlos Alonso
In the end, the jobs failed because I ran another job that took all the
available quota for that region, so the stuck jobs could not acquire the
resources they required.

On Tue, Jan 23, 2018 at 10:20 AM Carlos Alonso <car...@mrcalonso.com> wrote:

> Many thanks for your input.
>
> Unfortunately, neither of the proposals works. When trying from the
> command line (using the job name instead of the id), the same error appears:
> `Failed to cancel job [myjobname]: (8b60d3b239f21d49): Could not cancel
> workflow; user does not have sufficient permissions on project: myproject,
> or the job does not exist in the project.`
>
> And I cannot cancel it on the UI as, when I click on the job details, a
> blank screen with only the message `The graph is still being analysed`
> appears.
>
> Regards
>
> On Tue, Jan 23, 2018 at 12:22 AM Alex Amato <ajam...@google.com> wrote:
>
>> Hi Carlos,
>>
>> Can you try cancelling your job using the job name "*myjobname*"
>> instead  of the job_id. I think that is the proper way to cancel from the
>> command line.
>>
>> Otherwise, you can try to find the job in the dataflow UI and click the
>> cancel button on the webpage to cancel your job, if this is a onetime thing.
>>
>> I believe that this should be the proper way to do this on the command
>> line:
>> gcloud beta dataflow jobs --project=myproject cancel myjobname
>> …
>>
>> On Mon, Jan 22, 2018 at 1:31 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>>
>>> -- Forwarded message --
>>> From: Carlos Alonso <car...@mrcalonso.com>
>>> Date: Mon, Jan 22, 2018 at 1:28 PM
>>> Subject: Google Dataflow jobs stuck analysing the graph
>>> To: "user@beam.apache.org" <user@beam.apache.org>
>>>
>>>
>>> We have submitted a couple of jobs that seem to be stuck on the graph
>>> analysing step.
>>>
>>> A weird 'A job with ID "2018-01-19_03_27_48-15138951989738918594"
>>> doesn't exist' error appears at the top of the Google Dataflow jobs list
>>> page, and listing the jobs with the gcloud tool shows them in an Unknown
>>> state:
>>>
>>> JOB_ID                                    NAME        TYPE       CREATION_TIME        STATE    REGION
>>> 2018-01-19_03_27_48-15138951989738918594  myjobname2  Streaming  2018-01-19 12:27:48  Unknown  europe-west1
>>> 2018-01-19_03_21_05-1065108814086334743   myjobname   Streaming  2018-01-19 12:21:06  Unknown  europe-west1
>>>
>>> Trying to cancel them with the gcloud tool doesn't work either:
>>>
>>> ~ gcloud beta dataflow jobs --project=myproject cancel 2018-01-19_03_21_05-1065108814086334743
>>> Failed to cancel job [2018-01-19_03_21_05-1065108814086334743]:
>>> (9027838c1500ddff): Could not cancel workflow; user does not have
>>> sufficient permissions on project: myproject, or the job does not exist
>>> in the project.
>>>
>>> Any idea?
>>>
>>> Thanks!!
>>>
>>>
>>


Google Dataflow not distributing load across workers

2018-01-24 Thread Carlos Alonso
Hello everyone!!

I'm experiencing a weird issue I'd like to understand. I have a workload
that basically reads data from PubSub and stores it, organised by types and
windows, in GCS.

When I run it on low load it works fine (it actually only needs one worker),
but if I increase the load by adding more data into PubSub, it stresses the
system a little more and makes Dataflow autoscale (THROUGHPUT_BASED
algorithm).

All OK up to this point. The weird issue is that, on the graphs, those
newly added workers are idle!! (3% CPU usage vs 90% on the original one, and
something similar in terms of memory usage.) The pipeline starts struggling
and the system lag starts growing. After a while OOMs start happening and
then the whole pipeline seems to get stuck and I have to cancel it.

Looking at the logs after the autoscaling point, a few lines appear
continuously. I'd like to understand what they mean and whether they are
the reason why the workers are not receiving work.

The log entries below appear continuously and interleaved, which suggests to
me that they are all part of the same issue repeating over and over again.

* Setting node annotation to enable volume controller attach/detach
* GetWork timed out, retrying
* 860.865: [Full GC (Ergonomics) [PSYoungGen: 633856K->143127K(1267200K)]
[ParOldGen: 3800948K->3801001K(3801088K)] 4434804K->3944129K(5068288K),
[Metaspace: 133169K->132269K(1204224K)], 3.0900678 secs] [Times: user=9.14
sys=0.06, real=3.09 secs]
* [GC (Allocation Failure) [PSYoungGen: 243712K->9712K(253440K)]
250764K->31183K(338944K), 0.0321758 secs] [Times: user=0.06 sys=0.03,
real=0.03 secs]
* myjobname-01240255-93ce-harness-rts4 Got error NOT_FOUND: No setup task
returned in work item. while requesting setup task
* myjobname-01240255-93ce-harness-0jwl Check failed:
stats.required_used_bytes < 1LL << 40 S5 Realtime Timers
* Error syncing pod 689facdfb2b53aa2cff2f09d1c63fc5a
("dataflow-myjobname-01240255-93ce-harness-0jwl_default(689facdfb2b53aa2cff2f09d1c63fc5a)"),
skipping: failed to "StartContainer" for "windmill" with CrashLoopBackOff:
"Back-off 40s restarting failed container=windmill
pod=dataflow-myjobname-01240255-93ce-harness-0jwl_default(689facdfb2b53aa2cff2f09d1c63fc5a)"
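
To read the Full GC entry above, I put together a small plain-Java sketch that pulls the `ParOldGen` numbers out of the line and computes how full the old generation is after the collection. The class and method names are mine, and the regular expression only targets this particular log format, so treat it as a quick helper rather than a general GC log parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Quick helper for this log format only; not a general GC log parser.
class GcLogSketch {
    // Matches e.g. "ParOldGen: 3800948K->3801001K(3801088K)".
    private static final Pattern OLD_GEN =
        Pattern.compile("ParOldGen: (\\d+)K->(\\d+)K\\((\\d+)K\\)");

    // Returns {usedBeforeKb, usedAfterKb, capacityKb}, or null if the line does not match.
    static long[] parseOldGen(String line) {
        Matcher m = OLD_GEN.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new long[] {
            Long.parseLong(m.group(1)), Long.parseLong(m.group(2)), Long.parseLong(m.group(3))
        };
    }

    // Fraction of the old generation still in use after the collection.
    static double occupancyAfter(String line) {
        long[] v = parseOldGen(line);
        return (double) v[1] / v[2];
    }

    public static void main(String[] args) {
        String line = "[ParOldGen: 3800948K->3801001K(3801088K)]";
        long[] v = parseOldGen(line);
        System.out.println("free after GC (KB): " + (v[2] - v[1]));
        System.out.println("occupancy after GC: " + occupancyAfter(line));
    }
}
```

If I read the numbers correctly, the old generation is still essentially full after a 3 second Full GC, which would be consistent with the OOMs on the original worker.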

Thanks for your help!!


Custom metrics not showing on Stackdriver

2018-01-23 Thread Carlos Alonso
Hi everyone!!

I'm trying to get a deeper view on my dataflow jobs by measuring parts of
it using `Metrics.counter|gauge` but I cannot find how to see them on
Stackdriver.

I have a premium Stackdriver account and I can see those counters under the
Custom Counters section on the Dataflow UI.

I can see droppedDueToLateness 'custom' counter though on Stackdriver that
seems to be created via 'Metrics.counter' as well...

What am I missing?

Regards


Google Dataflow jobs stuck analysing the graph

2018-01-22 Thread Carlos Alonso
We have submitted a couple of jobs that seem to have stuck on the graph
analysing step.

An weird A job with ID "2018-01-19_03_27_48-15138951989738918594" doesn't
exist error appears on top of the Google Dataflow jobs list page and trying
to list it using gcloud tool shows them as in Unknown state:

JOB_ID                                    NAME        TYPE       CREATION_TIME        STATE    REGION
2018-01-19_03_27_48-15138951989738918594  myjobname2  Streaming  2018-01-19 12:27:48  Unknown  europe-west1
2018-01-19_03_21_05-1065108814086334743   myjobname   Streaming  2018-01-19 12:21:06  Unknown  europe-west1

Trying to cancel them with the gcloud tool doesn't work either:

~ gcloud beta dataflow jobs --project=myproject cancel 2018-01-19_03_21_05-1065108814086334743
Failed to cancel job [2018-01-19_03_21_05-1065108814086334743]:
(9027838c1500ddff): Could not cancel workflow; user does not have sufficient
permissions on project: myproject, or the job does not exist in the project.

Any idea?

Thanks!!


Re: Trying to understand Unable to encode element exceptions

2018-01-20 Thread Carlos Alonso
Thanks Neville!!

Your recommendation worked great. Thanks for your help!!

As a side note, I found this issue:
https://github.com/spotify/scio/issues/448

I can share/help there with our experience, as our job, with scio +
stateful + timely processing is working fine as of today

Regards!!

On Fri, Jan 19, 2018 at 6:21 PM Neville Li <neville@gmail.com> wrote:

> Welcome.
>
> Added an issue so we may improve this in the future:
> https://github.com/spotify/scio/issues/1020
>
>
> On Fri, Jan 19, 2018 at 11:14 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> To build the beam transform I was following this example:
>> https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala
>>
>> To be honest I don't know how to apply timely and stateful processing
>> without using a beam transform or how to rewrite it using the scio built-in
>> you suggest. Could you please give me an example?
>>
>> Thanks for your help!
>>
>> On Fri, Jan 19, 2018 at 5:04 PM Neville Li <neville@gmail.com> wrote:
>>
>>> That happens when you mix beam transforms into scio and defeats the
>>> safety we have in place. Map the values into something beam-serializable
>>> first or rewrite the transform with a scio built-in which takes care of
>>> KvCoder.
>>>
>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> I'm following this example:
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>
>>>> because I'm building something very similar to a group into batches
>>>> functionality. If I don't set the coder manually, this exception arises:
>>>> https://pastebin.com/xxdDMXSf
>>>>
>>>> Thanks!
>>>>
>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville@gmail.com>
>>>> wrote:
>>>>
>>>>> You shouldn't manually set coder in most cases. It defaults to
>>>>> KryoAtomicCoder for most Scala types.
>>>>> More details:
>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>
>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> May it be because I’m using
>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>> at some point in the pipeline
>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>> outputs a SerializableCoder)?
>>>>>>
>>>>>> This is something I've always wondered. How does one specify a coder
>>>>>> for a case class?
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Not sure why it falls back to SerializableCoder. Can you file an GH
>>>>>>> issue with ideally a snippet that can reproduce the problem?
>>>>>>>
>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone!!
>>>>>>>>
>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>>>> stateful and timely processing and after building and testing the
>>>>>>>> project locally I tried to run it on Google Dataflow and I started
>>>>>>>> getting those errors.
>>>>>>>>
>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>
>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>>>
>>>>>>>> The underlying clause is java.io.NotSerializableException:
>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>> Scio as well) which may suggest that the issue is on serializing the
>>>>>>>> Map, but to be honest, I don't know what does it mean and how to fix
>>>>>>>> it.
>>>>>>>>
>>>>>>>> Can anyone help me, please?
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>


Re: Trying to understand Unable to encode element exceptions

2018-01-19 Thread Carlos Alonso
Ok, I’ll try that.

Thanks a lot for your help!!
On Fri, 19 Jan 2018 at 17:37, Neville Li <neville@gmail.com> wrote:

> Didn't realize the map is in a case class which is serializable, but
> `java.util.Map` is not. So this won't work transitively.
> You best bet is to write a custom Coder (you can compose a map coder for
> the map field) for the entire case class and set it as part of the KvCoder.
>
>
> On Fri, Jan 19, 2018 at 11:22 AM Carlos Alonso <car...@mrcalonso.com>
> wrote:
>
>> You mean replacing the Map[String, String] from the case class into a
>> java.util.Map<String, String>? And then, how could I set that
>> MapCoder<String, String> for that bit?
>>
>> Sorry if those questions are too newbie, but this is my first experience
>> with Beam...
>>
>> Thanks!
>>
>> On Fri, Jan 19, 2018 at 5:19 PM Neville Li <neville@gmail.com> wrote:
>>
>>> In this case it's probably easiest to map the scala `Map[K, V]` into a
>>> `java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>` so you don't
>>> have to deal with internal coder inference.
>>>
>>>
>>> On Fri, Jan 19, 2018 at 11:03 AM Neville Li <neville@gmail.com>
>>> wrote:
>>>
>>>> That happens when you mix beam transforms into scio and defeats the
>>>> safety we have in place. Map the values into something beam-serializable
>>>> first or rewrite the transform with a scio built-in which takes care of
>>>> KvCoder.
>>>>
>>>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> I'm following this example:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>>>
>>>>> because I'm building something very similar to a group into batches
>>>>> functionality. If I don't set the coder manually, this exception arises:
>>>>> https://pastebin.com/xxdDMXSf
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> You shouldn't manually set coder in most cases. It defaults to
>>>>>> KryoAtomicCoder for most Scala types.
>>>>>> More details:
>>>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> May it be because I’m using
>>>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>>>> at some point in the pipeline
>>>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>>>> outputs a SerializableCoder)?
>>>>>>>
>>>>>>> This is something I've always wondered. How does one specify a coder
>>>>>>> for a case class?
>>>>>>>
>>>>>>> Regards
>>>>>>>
>>>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Not sure why it falls back to SerializableCoder. Can you file an GH
>>>>>>>> issue with ideally a snippet that can reproduce the problem?
>>>>>>>>
>>>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone!!
>>>>>>>>>
>>>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>>>>> stateful and timely processing and after building and testing the
>>>>>>>>> project locally I tried to run it on Google Dataflow and I started
>>>>>>>>> getting those errors.
>>>>>>>>>
>>>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>>>
>>>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>>>>
>>>>>>>>> The underlying clause is java.io.NotSerializableException:
>>>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>>>> Scio as well) which may suggest that the issue is on serializing the
>>>>>>>>> Map, but to be honest, I don't know what does it mean and how to fix
>>>>>>>>> it.
>>>>>>>>>
>>>>>>>>> Can anyone help me, please?
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
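
The custom-coder suggestion quoted above (one coder for the whole case
class, with the map field encoded entry by entry) can be sketched in plain
Java as follows. This is only an illustration of the encode/decode logic
under stated assumptions: MessageCodecSketch and its nested Message class
are hypothetical names standing in for MessageWithAttributes, and nothing
here is Beam or Scio API. In a real pipeline the same logic would presumably
live in the encode and decode methods of a Beam Coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not a Beam Coder: shows only the byte layout.
final class MessageCodecSketch {

    // Plain-Java stand-in for the (content, attrs) case class.
    static final class Message {
        final String content;
        final Map<String, String> attrs;

        Message(String content, Map<String, String> attrs) {
            this.content = content;
            this.attrs = attrs;
        }
    }

    // Layout: content, entry count, then each key and value in turn.
    static byte[] encode(Message msg) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            writeString(out, msg.content);
            out.writeInt(msg.attrs.size());
            for (Map.Entry<String, String> entry : msg.attrs.entrySet()) {
                writeString(out, entry.getKey());
                writeString(out, entry.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in exactly the order encode wrote them.
    static Message decode(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            String content = readString(in);
            int size = in.readInt();
            Map<String, String> attrs = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                String key = readString(in);
                attrs.put(key, readString(in));
            }
            return new Message(content, attrs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Strings are written as a length prefix followed by UTF-8 bytes.
    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    private static String readString(DataInputStream in) throws IOException {
        byte[] utf8 = new byte[in.readInt()];
        in.readFully(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }
}
```

Because every field is written explicitly, nothing depends on the map
implementation being java.io.Serializable, which is the property the
NotSerializableException in this thread was complaining about.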


Re: Trying to understand Unable to encode element exceptions

2018-01-19 Thread Carlos Alonso
You mean replacing the Map[String, String] in the case class with a
java.util.Map<String, String>? And then, how could I set that
MapCoder<String, String> for that field?

Sorry if these questions are too basic, but this is my first experience
with Beam...

Thanks!

On Fri, Jan 19, 2018 at 5:19 PM Neville Li <neville@gmail.com> wrote:

> In this case it's probably easiest to map the scala `Map[K, V]` into a
> `java.util.Map<K, V>` and explicitly set a `MapCoder<K, V>` so you don't
> have to deal with internal coder inference.
>
>
> On Fri, Jan 19, 2018 at 11:03 AM Neville Li <neville@gmail.com> wrote:
>
>> That happens when you mix beam transforms into scio and defeats the
>> safety we have in place. Map the values into something beam-serializable
>> first or rewrite the transform with a scio built-in which takes care of
>> KvCoder.
>>
>> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>>
>>> I'm following this example:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>>
>>> because I'm building something very similar to a group into batches
>>> functionality. If I don't set the coder manually, this exception arises:
>>> https://pastebin.com/xxdDMXSf
>>>
>>> Thanks!
>>>
>>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville@gmail.com>
>>> wrote:
>>>
>>>> You shouldn't manually set coder in most cases. It defaults to
>>>> KryoAtomicCoder for most Scala types.
>>>> More details:
>>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>>
>>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com>
>>>> wrote:
>>>>
>>>>> May it be because I’m using
>>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
>>>>> at some point in the pipeline
>>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>>> outputs a SerializableCoder)?
>>>>>
>>>>> This is something I've always wondered. How does one specify a coder
>>>>> for a case class?
>>>>>
>>>>> Regards
>>>>>
>>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Not sure why it falls back to SerializableCoder. Can you file an GH
>>>>>> issue with ideally a snippet that can reproduce the problem?
>>>>>>
>>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone!!
>>>>>>>
>>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>>> stateful and timely processing and after building and testing the
>>>>>>> project locally I tried to run it on Google Dataflow and I started
>>>>>>> getting those errors.
>>>>>>>
>>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>>
>>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>>
>>>>>>> The underlying clause is java.io.NotSerializableException:
>>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>>> Scio as well) which may suggest that the issue is on serializing the
>>>>>>> Map, but to be honest, I don't know what does it mean and how to fix
>>>>>>> it.
>>>>>>>
>>>>>>> Can anyone help me, please?
>>>>>>> Thanks!
>>>>>>>
>>>>>>


Re: Trying to understand Unable to encode element exceptions

2018-01-19 Thread Carlos Alonso
To build the beam transform I was following this example:
https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/DoFnExample.scala

To be honest I don't know how to apply timely and stateful processing
without using a beam transform or how to rewrite it using the scio built-in
you suggest. Could you please give me an example?

Thanks for your help!

On Fri, Jan 19, 2018 at 5:04 PM Neville Li <neville@gmail.com> wrote:

> That happens when you mix beam transforms into scio and defeats the safety
> we have in place. Map the values into something beam-serializable first or
> rewrite the transform with a scio built-in which takes care of KvCoder.
>
> On Fri, Jan 19, 2018, 10:56 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> I'm following this example:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60
>>
>> because I'm building something very similar to a group into batches
>> functionality. If I don't set the coder manually, this exception arises:
>> https://pastebin.com/xxdDMXSf
>>
>> Thanks!
>>
>> On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville@gmail.com> wrote:
>>
>>> You shouldn't manually set coder in most cases. It defaults to
>>> KryoAtomicCoder for most Scala types.
>>> More details:
>>> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>>>
>>> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> May it be because I’m using
>>>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>>>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
>>>> some point in the pipeline
>>>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>>>> outputs a SerializableCoder)?
>>>>
>>>> This is something I've always wondered. How does one specify a coder
>>>> for a case class?
>>>>
>>>> Regards
>>>>
>>>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville@gmail.com> wrote:
>>>>
>>>>> Not sure why it falls back to SerializableCoder. Can you file an GH
>>>>> issue with ideally a snippet that can reproduce the problem?
>>>>>
>>>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> I'm building a pipeline to store items from a Google PubSub
>>>>>> subscription into GCS buckets. In order to do it I'm using both
>>>>>> stateful and timely processing and after building and testing the
>>>>>> project locally I tried to run it on Google Dataflow and I started
>>>>>> getting those errors.
>>>>>>
>>>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>>>
>>>>>> The item I'm trying to serialize is a KV[String,
>>>>>> MessageWithAttributes] and MessageWithAttributes is a case class
>>>>>> defined as (content: String, attrs: Map[String, String])
>>>>>>
>>>>>> The underlying clause is java.io.NotSerializableException:
>>>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's
>>>>>> Scio as well) which may suggest that the issue is on serializing the
>>>>>> Map, but to be honest, I don't know what does it mean and how to fix
>>>>>> it.
>>>>>>
>>>>>> Can anyone help me, please?
>>>>>> Thanks!
>>>>>>
>>>>>


Re: Trying to understand Unable to encode element exceptions

2018-01-19 Thread Carlos Alonso
I'm following this example:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L60

because I'm building something very similar to a group into batches
functionality. If I don't set the coder manually, this exception arises:
https://pastebin.com/xxdDMXSf

Thanks!

On Fri, Jan 19, 2018 at 4:35 PM Neville Li <neville@gmail.com> wrote:

> You shouldn't manually set coder in most cases. It defaults to
> KryoAtomicCoder for most Scala types.
> More details:
> https://github.com/spotify/scio/wiki/Scio%2C-Beam-and-Dataflow#coders
>
> On Fri, Jan 19, 2018, 10:27 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> May it be because I’m using
>> .setCoder(KvCoder.of(StringUtf8Coder.of(),
>> CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes]))) at
>> some point in the pipeline
>> (CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
>> outputs a SerializableCoder)?
>>
>> This is something I've always wondered. How does one specify a coder for
>> a case class?
>>
>> Regards
>>
>> On Fri, 19 Jan 2018 at 15:51, Neville Li <neville@gmail.com> wrote:
>>
>>> Not sure why it falls back to SerializableCoder. Can you file an GH
>>> issue with ideally a snippet that can reproduce the problem?
>>>
>>> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com>
>>> wrote:
>>>
>>>> Hi everyone!!
>>>>
>>>> I'm building a pipeline to store items from a Google PubSub
>>>> subscription into GCS buckets. In order to do it I'm using both stateful
>>>> and timely processing and after building and testing the project locally I
>>>> tried to run it on Google Dataflow and I started getting those errors.
>>>>
>>>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>>>
>>>> The item I'm trying to serialize is a KV[String, MessageWithAttributes]
>>>> and MessageWithAttributes is a case class defined as (content: String,
>>>> attrs: Map[String, String])
>>>>
>>>> The underlying clause is java.io.NotSerializableException:
>>>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
>>>> well) which may suggest that the issue is on serializing the Map, but to be
>>>> honest, I don't know what does it mean and how to fix it.
>>>>
>>>> Can anyone help me, please?
>>>> Thanks!
>>>>
>>>


Re: Trying to understand Unable to encode element exceptions

2018-01-19 Thread Carlos Alonso
Could it be because I'm using
.setCoder(KvCoder.of(StringUtf8Coder.of(),
CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])))
at some point in the pipeline
(CoderRegistry.createDefault().getCoder(classOf[MessageWithAttributes])
outputs a SerializableCoder)?

This is something I've always wondered. How does one specify a coder for a
case class?

Regards

On Fri, 19 Jan 2018 at 15:51, Neville Li <neville@gmail.com> wrote:

> Not sure why it falls back to SerializableCoder. Can you file an GH issue
> with ideally a snippet that can reproduce the problem?
>
> On Fri, Jan 19, 2018, 7:43 AM Carlos Alonso <car...@mrcalonso.com> wrote:
>
>> Hi everyone!!
>>
>> I'm building a pipeline to store items from a Google PubSub subscription
>> into GCS buckets. In order to do it I'm using both stateful and timely
>> processing and after building and testing the project locally I tried to
>> run it on Google Dataflow and I started getting those errors.
>>
>> The full stack trace is here: https://pastebin.com/LqecPhsq
>>
>> The item I'm trying to serialize is a KV[String, MessageWithAttributes]
>> and MessageWithAttributes is a case class defined as (content: String,
>> attrs: Map[String, String])
>>
>> The underlying clause is java.io.NotSerializableException:
>> com.spotify.scio.util.JMapWrapper$$anon$2 (yes, I'm using Spotify's Scio as
>> well) which may suggest that the issue is on serializing the Map, but to be
>> honest, I don't know what does it mean and how to fix it.
>>
>> Can anyone help me, please?
>> Thanks!
>>
>


Re: Triggers based on size

2018-01-10 Thread Carlos Alonso
Thanks Robert!!

After reading this and the earlier post about stateful processing, Kenneth's
suggestion sounds sensible. I'll probably give it a try!! Is there
anything you would advise before I start?

Thanks!

On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw <rober...@google.com>
wrote:

> Unfortunately, the metadata driven trigger is still just an idea, not
> yet implemented.
>
> A good introduction to state and timers can be found at
> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>
> On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso <car...@mrcalonso.com>
> wrote:
> > Hi Robert, Kenneth.
> >
> > Thanks a lot to both of you for your responses!!
> >
> > Kenneth, unfortunately I'm not sure we're experienced enough with Apache
> > Beam to get anywhere close to your suggestion, but thanks anyway!!
> >
> > Robert, your suggestion sounds great to me, could you please provide any
> > example on how to use that 'metadata driven' trigger?
> >
> > Thanks!
> >
> > On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles <k...@google.com> wrote:
> >>
> >> Often, when you need or want more control than triggers provide, such as
> >> input-type-specific logic like yours, you can use state and timers in
> >> ParDo to control when to output. You lose any potential optimizations of
> >> Combine based on associativity/commutativity and assume the burden of
> >> making sure your output is sensible, but dropping to low-level stateful
> >> computation may be your best bet.
> >>
> >> Kenn
> >>
> >> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw <rober...@google.com>
> >> wrote:
> >>>
> >>> We've tossed around the idea of "metadata-driven" triggers which would
> >>> essentially let you provide a mapping element -> metadata and a
> >>> monotonic CombineFn metadata* -> bool that would allow for this (the
> >>> AfterCount being a special case of this, with the mapping fn being _
> >>> -> 1, and the CombineFn being sum(...) >= N, for size one would
> >>> provide a (perhaps approximate) sizing mapping fn).
> >>>
> >>> Note, however, that there's no guarantee that the trigger fire as soon
> >>> as possible; due to runtime characteristics a significant amount of
> >>> data may be buffered (or come in at once) before the trigger is
> >>> queried. One possibility would be to follow your triggering with a
> >>> DoFn that breaks up large value streams into multiple manageable sized
> >>> ones as needed.
> >>>
> >>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso <car...@mrcalonso.com>
> >>> wrote:
> >>> > Hi everyone!!
> >>> >
> >>> > I was wondering if there is an option to trigger window panes based
> >>> > on the size of the pane itself (rather than the number of elements).
> >>> >
> >>> > To provide a little bit more of context we're backing up a PubSub
> >>> > topic into GCS with the "special" feature that, depending on the
> >>> > "type" of the message, the GCS destination is one or another.
> >>> >
> >>> > Messages' 'shape' published there is quite random, some of them are
> >>> > very frequent and small, some others very big but sparse... We have
> >>> > around 150 messages per second (in total) and we're firing every 15
> >>> > minutes and experiencing OOM errors, we've considered firing based on
> >>> > the number of items as well, but given the randomness of the input, I
> >>> > don't think it will be a final solution either.
> >>> >
> >>> > Having a trigger based on size would be great, another option would
> >>> > be to have a dynamic shards number for the PTransform that actually
> >>> > writes the files.
> >>> > What is your recommendation for this use case?
> >>> >
> >>> > Thanks!!
> >>
> >>
> >
>
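
Since the "metadata-driven" trigger quoted above is only an idea and not a
Beam feature, the following is just a conceptual sketch of it in plain
Java: a mapping from element to a numeric piece of metadata, summed, with
the trigger firing once the sum reaches N. MetadataTriggerSketch, afterCount
and afterBytes are hypothetical names, and the sketch assumes the metadata
is never negative, so the condition stays true once it becomes true (the
monotonic property mentioned in the quote).

```java
import java.util.function.ToLongFunction;

// Hypothetical illustration of the idea only; not a Beam API.
final class MetadataTriggerSketch<T> {
    private final ToLongFunction<T> toMetadata; // mapping: element -> metadata
    private final long threshold;               // the N in sum(...) >= N
    private long sum;                           // running sum of the metadata

    MetadataTriggerSketch(ToLongFunction<T> toMetadata, long threshold) {
        this.toMetadata = toMetadata;
        this.threshold = threshold;
    }

    // Accounts for one element and reports whether the trigger should fire.
    boolean add(T element) {
        sum += toMetadata.applyAsLong(element);
        return sum >= threshold;
    }

    // Called after firing, so the next pane starts from zero.
    void reset() {
        sum = 0;
    }

    // Count-based special case: every element maps to 1.
    static <T> MetadataTriggerSketch<T> afterCount(long n) {
        return new MetadataTriggerSketch<>(element -> 1L, n);
    }

    // Size-based case: every payload maps to its length in bytes.
    static MetadataTriggerSketch<byte[]> afterBytes(long n) {
        return new MetadataTriggerSketch<>(payload -> payload.length, n);
    }
}
```

As the quoted caveat says, a real runner may buffer a lot of data before it
consults a trigger, so a condition like this bounds the pane size only
approximately.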


Re: Triggers based on size

2018-01-10 Thread Carlos Alonso
Hi Robert, Kenneth.

Thanks a lot to both of you for your responses!!

Kenneth, unfortunately I'm not sure we're experienced enough with Apache
Beam to get anywhere close to your suggestion, but thanks anyway!!

Robert, your suggestion sounds great to me. Could you please provide an
example of how to use that 'metadata driven' trigger?

Thanks!

On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles <k...@google.com> wrote:

> Often, when you need or want more control than triggers provide, such as
> input-type-specific logic like yours, you can use state and timers in ParDo
> to control when to output. You lose any potential optimizations of Combine
> based on associativity/commutativity and assume the burden of making sure
> your output is sensible, but dropping to low-level stateful computation may
> be your best bet.
>
> Kenn
>
> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw <rober...@google.com>
> wrote:
>
>> We've tossed around the idea of "metadata-driven" triggers which would
>> essentially let you provide a mapping element -> metadata and a
>> monotonic CombineFn metadata* -> bool that would allow for this (the
>> AfterCount being a special case of this, with the mapping fn being _
>> -> 1, and the CombineFn being sum(...) >= N, for size one would
>> provide a (perhaps approximate) sizing mapping fn).
>>
>> Note, however, that there's no guarantee that the trigger fire as soon
>> as possible; due to runtime characteristics a significant amount of
>> data may be buffered (or come in at once) before the trigger is
>> queried. One possibility would be to follow your triggering with a
>> DoFn that breaks up large value streams into multiple manageable sized
>> ones as needed.
>>
>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso <car...@mrcalonso.com>
>> wrote:
>> > Hi everyone!!
>> >
>> > I was wondering if there is an option to trigger window panes based on
>> > the size of the pane itself (rather than the number of elements).
>> >
>> > To provide a little bit more of context we're backing up a PubSub topic
>> > into GCS with the "special" feature that, depending on the "type" of the
>> > message, the GCS destination is one or another.
>> >
>> > Messages' 'shape' published there is quite random, some of them are very
>> > frequent and small, some others very big but sparse... We have around
>> > 150 messages per second (in total) and we're firing every 15 minutes and
>> > experiencing OOM errors, we've considered firing based on the number of
>> > items as well, but given the randomness of the input, I don't think it
>> > will be a final solution either.
>> >
>> > Having a trigger based on size would be great, another option would be
>> > to have a dynamic shards number for the PTransform that actually writes
>> > the files.
>> > What is your recommendation for this use case?
>> >
>> > Thanks!!
>>
>
>
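
The state-and-timers approach quoted above amounts to keeping a per-key
buffer plus a running byte count, and emitting the buffer either when the
count crosses a limit or when a timer fires. A minimal plain-Java sketch of
that bookkeeping follows. SizeBoundedBatcherSketch is a hypothetical name
and the in-memory maps are an assumption made for illustration: in a
stateful ParDo they would be replaced by Beam state cells, and flush would
be invoked from a timer callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the buffering logic only; not Beam code.
final class SizeBoundedBatcherSketch {
    private final long maxBytes;
    private final Map<String, List<byte[]>> buffers = new HashMap<>();
    private final Map<String, Long> bufferedBytes = new HashMap<>();

    SizeBoundedBatcherSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Buffers the payload under its key (for example the message "type").
    // Returns the batch to write once the key holds maxBytes or more,
    // otherwise an empty list.
    List<byte[]> add(String key, byte[] payload) {
        buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(payload);
        long total = bufferedBytes.merge(key, (long) payload.length, Long::sum);
        return total >= maxBytes ? flush(key) : new ArrayList<>();
    }

    // What a timer callback would do: emit whatever is buffered for the key
    // and clear its state.
    List<byte[]> flush(String key) {
        bufferedBytes.remove(key);
        List<byte[]> batch = buffers.remove(key);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

Keying by message type keeps frequent small messages and sparse large ones
in separate buffers, so each destination is flushed on its own schedule.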


Triggers based on size

2018-01-09 Thread Carlos Alonso
Hi everyone!!

I was wondering if there is an option to trigger window panes based on the
size of the pane itself (rather than the number of elements).

To give a bit more context, we're backing up a PubSub topic into GCS with
the "special" feature that the GCS destination depends on the "type" of the
message.

The 'shape' of the messages published there is quite random: some are very
frequent and small, others very big but sparse. We have around 150 messages
per second (in total), we're firing every 15 minutes, and we're experiencing
OOM errors. We've considered firing based on the number of items as well,
but given the randomness of the input, I don't think it will be a final
solution either.

Having a trigger based on size would be great. Another option would be to
have a dynamic number of shards for the PTransform that actually writes the
files.

What is your recommendation for this use case?

Thanks!!
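
One of the replies to this message suggests following the trigger with a
step that breaks large groups of values into several manageable ones. A
minimal sketch of such a splitting step in plain Java is shown here.
ChunkSplitterSketch and splitBySize are hypothetical names, the greedy
strategy is an assumption rather than anything the thread prescribes, and in
a pipeline this logic would sit inside a DoFn.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the splitting logic only; not Beam code.
final class ChunkSplitterSketch {

    // Greedily packs values into chunks of at most maxBytes each. A single
    // value larger than maxBytes still gets a chunk of its own.
    static List<List<byte[]>> splitBySize(List<byte[]> values, long maxBytes) {
        List<List<byte[]>> chunks = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        long currentBytes = 0;
        for (byte[] value : values) {
            // Close the current chunk if this value would push it over the limit.
            if (!current.isEmpty() && currentBytes + value.length > maxBytes) {
                chunks.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(value);
            currentBytes += value.length;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }
}
```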


Re: London Apache Beam meetup 2: 11/01

2018-01-09 Thread Carlos Alonso
Cool, thanks!!

On Mon, Jan 8, 2018 at 1:38 PM Matthias Baetens <
matthias.baet...@datatonic.com> wrote:

> Yes, we put everything in place to record this time and hope to share the
> recordings soon after the meetup. Stay tuned!
>
> On 8 Jan 2018 10:32, "Carlos Alonso" <car...@mrcalonso.com> wrote:
>
> Will it be recorded?
>
> On Fri, Jan 5, 2018 at 5:11 PM Matthias Baetens <
> matthias.baet...@datatonic.com> wrote:
>
>> Hi all,
>>
>> Excited to announce the second Beam meet up located in the *Qubit
>> offices <https://goo.gl/maps/sVmFYrVys1S2> *next *Thursday 11/01.*
>>
>> We are very excited to have JB flying in to talk about IO and Splittable
>> DoFns and Vadim Sobolevski to share on how FutureFlow uses Beam in a
>> finance use case.
>>
>> More info and RSVP here <http://bit.ly/2zcUy5A>. We are looking forward
>> to welcome you all!
>>
>> Best regards,
>> Matthias
>>
>
>


Re: London Apache Beam meetup 2: 11/01

2018-01-08 Thread Carlos Alonso
Will it be recorded?

On Fri, Jan 5, 2018 at 5:11 PM Matthias Baetens <
matthias.baet...@datatonic.com> wrote:

> Hi all,
>
> Excited to announce the second Beam meet up located in the *Qubit offices
> <https://goo.gl/maps/sVmFYrVys1S2> *next *Thursday 11/01.*
>
> We are very excited to have JB flying in to talk about IO and Splittable
> DoFns and Vadim Sobolevski to share on how FutureFlow uses Beam in a
> finance use case.
>
> More info and RSVP here <http://bit.ly/2zcUy5A>. We are looking forward
> to welcome you all!
>
> Best regards,
> Matthias
>