Pipeline error handling

2018-07-26 Thread Kelsey RIDER
I'm trying to figure out how to handle errors in my Pipeline.

Right now, my main transform is a DoFn<CSVRecord, CSVRecord>. I have 
a few different TupleTag<CSVRecord> that I use depending on the data contained 
in the records.
In the event there's a problem with a line (due to one of several possible 
causes), I created a TupleTag<CSVRecord> ERROR. However, just doing this 
doesn't carry with it any information about the error.
I would like the ERROR tag to have a type other than CSVRecord, e.g. some 
sort of ErrorInfo class containing the row number, filename, a message about 
what went wrong, etc.

I can't use multiple TupleTag types with ParDo, because the withOutputTags() 
method forces them to all have the same generic parameter.

I saw the example here: 
https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
But I don't see how this can work, since they use multiple generic types in 
withOutputTags(). (And is this good practice? Seems like they "cheat" by not 
calling apply(), instead directly transforming the PCollection (and why even 
bother extending DoFn in this case?).)

Finally, if I write my own PTransform<PCollection<CSVRecord>, PCollectionTuple> 
class, and start manually creating PCollections and whatnot... then this would 
effectively become a bottleneck where everything has to be read at once, and 
there's no longer any sequential handling of the records as they're read, right?
[Footer, translated from French:] Following changes to working-time 
regulations, if you receive this email before 7:00 a.m., in the evening, over 
the weekend, or during your holidays, please do not process or reply to it 
immediately, except in cases of exceptional urgency.


Re: Pipeline error handling

2018-07-26 Thread Tim Robertson
Hi Kelsey

Does the example [1] in the docs demonstrate differing generic types when
using withOutputTags()?

Could something like the following work for you?

  final TupleTag<CSVRecord> type1Records = new TupleTag<CSVRecord>(){};
  final TupleTag<CSVRecord> type2Records = new TupleTag<CSVRecord>(){};
  final TupleTag<CSVInvalidLine> invalidRecords = new TupleTag<CSVInvalidLine>(){};  // CSVInvalidLine holds
e.g. an ID and a cause

HTH,
Tim


[1]
https://beam.apache.org/documentation/programming-guide/#additional-outputs
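Spelled out a bit more, a minimal sketch of the wiring (CSVRecord, CSVInvalidLine, and ParseFn are stand-ins for your own classes):

```java
// Tags for the main output and a differently-typed error output. The
// anonymous subclass ({}) lets Beam recover the element type at runtime.
final TupleTag<CSVRecord> type1Records = new TupleTag<CSVRecord>(){};
final TupleTag<CSVInvalidLine> invalidRecords = new TupleTag<CSVInvalidLine>(){};

PCollectionTuple results = input.apply("Parse",
    ParDo.of(new ParseFn())
        // Only the FIRST tag (the main output) must match the DoFn's
        // output type; the tags in the TupleTagList may have any type.
        .withOutputTags(type1Records, TupleTagList.of(invalidRecords)));

PCollection<CSVRecord> good = results.get(type1Records);
PCollection<CSVInvalidLine> bad = results.get(invalidRecords);
```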



RE: Pipeline error handling

2018-07-26 Thread Kelsey RIDER
OK, never mind, I figured it out.
The issue was that the FIRST argument to withOutputTags() has to have the same 
generic type as the DoFn's output.
So by changing my transform to a DoFn whose output type matches the main tag, 
I can successfully mix tag types.
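For reference, a sketch of the pattern that ends up working (ErrorInfo is the hypothetical error-detail class mentioned above; isValid() is a placeholder for the real validation logic):

```java
// The DoFn's output type (CSVRecord) only needs to match the main output
// tag; side outputs such as errorTag can carry a different type entirely.
static final TupleTag<CSVRecord> mainTag = new TupleTag<CSVRecord>(){};
static final TupleTag<ErrorInfo> errorTag = new TupleTag<ErrorInfo>(){};

static class ParseFn extends DoFn<CSVRecord, CSVRecord> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    CSVRecord rec = c.element();
    if (isValid(rec)) {
      c.output(rec);                                   // goes to mainTag
    } else {
      c.output(errorTag, new ErrorInfo(rec));          // goes to errorTag
    }
  }
}
```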



FileBasedSink.WriteOperation copy instead of move?

2018-07-26 Thread Jozef Vilcek
Hello,

just came across the FileBasedSink.WriteOperation class, which has a
moveToOutput() method. The implementation does a Filesystem.copy() instead of
a "move". With large files I find this quite inefficient when the underlying FS
supports more efficient ways, so I wonder what the story behind it is. Must
it be a copy?

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L761


Re: FileBasedSink.WriteOperation copy instead of move?

2018-07-26 Thread Lukasz Cwik
+dev



Re: FileBasedSink.WriteOperation copy instead of move?

2018-07-26 Thread Chamikara Jayalath
Also, we'll have to use StandardMoveOptions.IGNORE_MISSING_FILES to support
failures/retries of the rename step. I think this is a good change to make if
it significantly improves the performance of some of the FileSystems (note
that some FileSystems, for example GCS, implement rename in the form of a
copy+delete, so there will be no significant performance improvement for such
FileSystems).

-Cham
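For illustration, the rename path could look roughly like this (a sketch only, not the actual FileBasedSink code; paths are made up):

```java
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import java.util.Collections;

// Rename a temp file to its final location. IGNORE_MISSING_FILES makes a
// retried rename a no-op (rather than an error) if an earlier attempt
// already moved the source.
ResourceId src = FileSystems.matchNewResource("gs://bucket/tmp/part-0", false);
ResourceId dst = FileSystems.matchNewResource("gs://bucket/out/part-0", false);
FileSystems.rename(
    Collections.singletonList(src),
    Collections.singletonList(dst),
    StandardMoveOptions.IGNORE_MISSING_FILES);
```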

On Thu, Jul 26, 2018 at 10:14 AM Reuven Lax  wrote:

> We might be able to replace this with Filesystem.rename(). One thing to
> keep in mind - the destination files might be in a different directory, so
> we would need to make sure that all Filesystems support cross-directory
> rename.
>


Re: FileBasedSink.WriteOperation copy instead of move?

2018-07-26 Thread Jozef Vilcek
Yes, rename can be tricky with cross-directory. This is related
https://issues.apache.org/jira/browse/BEAM-4861
I guess I can file a JIRA for this, right?



Re: FileBasedSink.WriteOperation copy instead of move?

2018-07-26 Thread Chamikara Jayalath
Yeah, please file a JIRA.

- Cham
