Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-03 Thread Jozef Vilcek
Regarding splitting, I think SDF is being split on the Spark runner, but I
agree with Jan's comments about split's contract. A specific SDF is also free
to make decisions about how big the minimal split will be, and the runner
should be able to process that with reasonable resources. E.g. ParquetIO
splits on the format's row groups. If a row group is large and the format
contains a lot of well-compressed column data, it will challenge memory
resources.

Jan, as for the suggested options to implement it, I have an MR with approach
1), translating all SDFs to two-threaded executions. I did consider
something like option 3), but I was not sure if it makes sense in general
for other runners as well as for Spark. It begs the question for me whether it
would ever make sense to create an SDF and want it, on Spark, not to use the
two-thread execution, and instead possibly apply memory pressure.
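
To make the two-threaded idea concrete, here is a rough, runner-agnostic sketch
using plain java.util.concurrent; the class and method names are illustrative
only and this is not the actual code from the MR:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Illustrative only: decouple DoFn/SDF output production from downstream consumption. */
public class TwoThreadedOutputSketch {

  private static final Object DONE = new Object();

  /** Runs the producer on its own thread and forwards queued outputs downstream. */
  public static <T> void run(
      Runnable producer, BlockingQueue<Object> queue, Consumer<T> downstream)
      throws InterruptedException {
    Thread producerThread =
        new Thread(
            () -> {
              try {
                producer.run(); // the DoFn/SDF invocation puts its outputs on the queue
              } finally {
                putUninterruptibly(queue, DONE); // signal end of output
              }
            });
    producerThread.start();
    while (true) {
      Object next = queue.take(); // bounded queue => bounded buffering
      if (next == DONE) {
        break;
      }
      @SuppressWarnings("unchecked")
      T element = (T) next;
      downstream.accept(element); // hand over to the next operator (e.g. Spark's Iterator consumer)
    }
    producerThread.join();
  }

  private static void putUninterruptibly(BlockingQueue<Object> queue, Object value) {
    try {
      queue.put(value);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
    Runnable producer =
        () -> {
          for (int i = 0; i < 1_000_000; i++) {
            try {
              queue.put("element-" + i); // stand-in for OutputReceiver.output(...)
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
          }
        };
    run(producer, queue, (String element) -> {}); // forward downstream
  }
}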


On Mon, Jan 2, 2023 at 4:49 PM Jan Lukavský  wrote:

> There are different translations of streaming and batch Pipelines in
> SparkRunner, this thread was focused on the batch part, if I understand it
> correctly. Unbounded PCollections are not supported in batch Spark (by
> definition). I agree that fixing the splitting is a valid option, though it
> still requires unnecessarily big heap for buffering and/or might induce
> some overhead with splitting the restriction. Not to mention, that the
> splitting is somewhat optional in the contract of SDF (the DoFn might not
> support it, if it is bounded), so it might not solve the issue for all
> SDFs. The source might not even be splittable at all (e.g. a completely
> compressed blob, without any blocks).
>
>  Jan
> On 1/2/23 16:22, Daniel Collins via dev wrote:
>
> If spark's SDF solution doesn't support splitting, fixing that seems like
> the best solution to me. Splitting is the mechanism exposed by the model to
> actually limit the amount of data produced in a bundle. If unsupported,
> then unbounded-per-element SDFs wouldn't be supported at all.
>
> -Daniel
>
> On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský  wrote:
>
>> Hi Jozef,
>>
>> I agree that this issue is most likely related to Spark because of the way
>> Spark uses a functional style for doing flatMap().
>>
>> It could be fixed with the following two options:
>>
>>  a) SparkRunner's SDF implementation does not use splitting - it could be
>> fixed so that the SDF is stopped after N elements buffered via trySplit,
>> buffer gets flushed and the restriction is resumed
>>
>>  b) alternatively use two threads and a BlockingQueue between them, which
>> is what you propose
>>
>> The number of output elements per input element is bounded (we are
>> talking about batch case anyway), but bounded does not mean it has to fit
>> in memory. Furthermore, unnecessary buffering of a large number of elements
>> is memory-inefficient, which is why I think that the two-thread approach
>> (b) should be the most efficient. The option (a) seems orthogonal and might
>> be implemented as well.
>>
>> It raises the question of how to determine if the runner should do some
>> special translation of SDF in this case. There are probably only these
>> options:
>>
>>  1) translate all SDFs to two-thread execution
>>
>>  2) add runtime flag, that will turn the translation on (once turned on,
>> it will translate all SDFs) - this is the current proposal
>>
>>  3) extend @DoFn.BoundedPerElement annotation with some kind of
>> (optional) hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), the
>> default would be Bounded.FITS_IN_MEMORY (which is the current approach)
>>
>> The approach (3) seems to give more information to all runners and might
>> result in the ability to apply various optimizations for multiple runners,
>> so I'd say that this might be the ideal variant.
>>
>>   Jan
>> On 12/29/22 13:07, Jozef Vilcek wrote:
>>
>> I am surprised to hear that the Dataflow runner (which I have never used) would
>> have this kind of limitation. I see that the `OutputManager` interface is
>> implemented to write to `Receiver` [1], which follows the push model. Do you
>> have a reference I can take a look at to review the must-fit-in-memory
>> limitation?
>>
>> In Spark, the problem is that the leaf operator pulls data from previous
>> ones by consuming an `Iterator` of values. As per your suggestion, this is
>> not a problem with `sources` because they hold e.g. a source file and can
>> pull data as they are being requested. This gets problematic exactly with
>> SDFs and flatMaps, not sources. It could be one of the reasons why SDF
>> performed badly on Spark, where the community reported performance degradation
>> [2] and increas

Re: SparkRunner - ensure SDF output does not need to fit in memory

2022-12-29 Thread Jozef Vilcek
I am surprised to hear that the Dataflow runner (which I have never used) would
have this kind of limitation. I see that the `OutputManager` interface is
implemented to write to `Receiver` [1], which follows the push model. Do you
have a reference I can take a look at to review the must-fit-in-memory limitation?

In Spark, the problem is that the leaf operator pulls data from previous
ones by consuming an `Iterator` of values. As per your suggestion, this is
not a problem with `sources` because they hold e.g. a source file and can
pull data as they are being requested. This gets problematic exactly with
SDFs and flatMaps, not sources. It could be one of the reasons why SDF
performed badly on Spark, where the community reported performance degradation
[2] and increased memory use [3].

My proposed solution is to, similarly to Dataflow, use a `Receiver`-like
implementation for DoFns which can output a large number of elements. For
now, this WIP targets SDFs only.

[1]
https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
[2] https://github.com/apache/beam/pull/14755
[3]
https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005
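
To make the push vs pull difference concrete, here is a rough sketch; the two
methods are illustrative stand-ins, not the actual Beam or Spark classes:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative stand-ins only. */
public class PushVsPullSketch {

  /**
   * Pull model (Spark-style flatMap): the results of processing one input element are
   * collected first, and the downstream operator later pulls them via an Iterator, so
   * all of them sit in memory at once.
   */
  static Iterator<String> pullStyle(String input) {
    List<String> buffered = new ArrayList<>();
    for (int i = 0; i < 3; i++) { // imagine millions of outputs per input element
      buffered.add(input + "-" + i);
    }
    return buffered.iterator();
  }

  /**
   * Push model (Dataflow-style Receiver): each output is handed downstream as soon as it
   * is produced, so at most one element needs to be held at a time.
   */
  static void pushStyle(String input, Consumer<String> receiver) {
    for (int i = 0; i < 3; i++) {
      receiver.accept(input + "-" + i);
    }
  }

  public static void main(String[] args) {
    pullStyle("split").forEachRemaining(System.out::println);
    pushStyle("split", System.out::println);
  }
}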

On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev 
wrote:

> I believe that for dataflow runner, the result of processElement must also
> fit in memory, so this is not just a constraint for the spark runner.
>
> The best approach at present might be to convert the source from a flatMap
> to an SDF that reads out chunks of the file at a time, and supports runner
> checkpointing (i.e. with a file seek point to resume from) to chunk your
> data in a way that doesn't require the runner to support unbounded outputs
> from any individual @ProcessElements downcall.
>
> -Daniel
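
A rough sketch of the pattern Daniel describes, an SDF that reads a file split chunk
by chunk over a restriction the runner can split or checkpoint; the FileSplit type and
its methods are made up:

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

/** Sketch only: reads one chunk per claimed position so the runner can resume between chunks. */
@DoFn.BoundedPerElement
public class ReadSplitAsSdfFn extends DoFn<FileSplit, String> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element FileSplit split) {
    return new OffsetRange(0, split.numChunks());
  }

  @ProcessElement
  public void processElement(
      @Element FileSplit split,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    for (long chunk = tracker.currentRestriction().getFrom(); tracker.tryClaim(chunk); chunk++) {
      for (String record : split.readChunk(chunk)) {
        out.output(record); // after a split, the runner resumes from the first unclaimed chunk
      }
    }
  }
}
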
>
> On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek 
> wrote:
>
>> Hello,
>>
>> I am working on an issue which currently limits the Spark runner by requiring
>> the result of processElement to fit in memory [1]. This is problematic e.g.
>> for flatMap, where the input element is a file split and generates possibly
>> large output.
>>
>> The intended fix is to add an option to have the DoFn processing of input
>> in one thread, and the consumption of outputs and forwarding them to downstream
>> operators in another thread. One challenge for me is to identify which DoFn
>> should be using this async approach.
>>
>> Here [2] is a WIP commit which uses async processing only for the SDF
>> naive expansion. I would like to get feedback on:
>>
>> 1) does the approach make sense overall
>>
>> 2) to target DoFns which need async processing, i.e. generate possibly
>> large output, I am currently just checking if it is a DoFn of the SDF naive
>> expansion type [3]. I failed to find a better / more systematic approach
>> for identifying which DoFn should benefit from that. I would appreciate any
>> thoughts on how to make this better.
>>
>> 3) Config option and validatesRunner tests - do we want to make it
>> possible to turn the async DoFn off? If yes, do we want to run validatesRunner
>> tests for both options? How do I make sure of that?
>>
>> Looking forward to the feedback.
>> Best,
>> Jozef
>>
>> [1] https://github.com/apache/beam/issues/23852
>> [2]
>> https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
>> [3]
>> https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362
>>
>>


SparkRunner - ensure SDF output does not need to fit in memory

2022-12-28 Thread Jozef Vilcek
Hello,

I am working on an issue which currently limits the Spark runner by requiring
the result of processElement to fit in memory [1]. This is problematic e.g.
for flatMap, where the input element is a file split and generates possibly
large output.

The intended fix is to add an option to have the DoFn processing of input in
one thread, and the consumption of outputs and forwarding them to downstream
operators in another thread. One challenge for me is to identify which DoFn
should be using this async approach.

Here [2] is a WIP commit which uses async processing only for the SDF
naive expansion. I would like to get feedback on:

1) does the approach make sense overall

2) to target DoFns which need async processing, i.e. generate possibly
large output, I am currently just checking if it is a DoFn of the SDF naive
expansion type [3]. I failed to find a better / more systematic approach
for identifying which DoFn should benefit from that. I would appreciate any
thoughts on how to make this better.

3) Config option and validatesRunner tests - do we want to make it possible
to turn the async DoFn off? If yes, do we want to run validatesRunner tests for
both options? How do I make sure of that?

Looking forward to the feedback.
Best,
Jozef

[1] https://github.com/apache/beam/issues/23852
[2]
https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
[3]
https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362
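
Regarding 3), one possible shape for the config option is a custom PipelineOptions
interface, sketched below; the flag name is made up and does not exist in the Spark
runner today:

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Hypothetical option, for illustration only. */
public interface AsyncDoFnOptions extends PipelineOptions {

  @Description(
      "If true, DoFns identified as producing possibly large output (currently the SDF naive "
          + "expansion) forward their output to downstream operators on a separate thread.")
  @Default.Boolean(true)
  boolean isUseAsyncDoFnOutput();

  void setUseAsyncDoFnOutput(boolean value);
}

With something like this, the validatesRunner suite could in principle be run once per
value of the flag.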


Beam job details not available on Spark History Server

2022-02-23 Thread Jozef Vilcek
I would like to discuss a problem I am facing when upgrading Beam 2.24.0 ->
2.33.0.

Running Beam batch jobs on SparkRunner with Spark 2.4.4 stopped showing me
job details on the Spark History Server. The problem is that there are 2 event
logging listeners running and they step on each other. More details in [1].
One is run by Spark itself, the other is started by Beam, which was added
by MR [2].

My first question is towards understanding why Spark's event logging
listener is started manually within Beam next to the one started by the
Spark context internally.


[1] https://issues.apache.org/jira/browse/BEAM-13981
[2] https://github.com/apache/beam/pull/14409


Re: Convert Row with nullable fields to Protobuf fails

2021-12-18 Thread Jozef Vilcek
Any feedback on this one please?

On Mon, Dec 13, 2021 at 11:02 AM Jozef Vilcek  wrote:

> Yes, field is marked as nullable. Here is a test case change to
> illustrate it:
>
> https://github.com/JozoVilcek/beam/commit/5e1c6324868c2fd6145dd2348c7358fdc787ac38
>
> On Sun, Dec 12, 2021 at 7:28 PM Reuven Lax  wrote:
>
>> Is the schema field marked as nullable?
>>
>> On Sun, Dec 12, 2021 at 4:21 AM Jozef Vilcek 
>> wrote:
>>
>>> I did notice that the protobuf schema translator supports nullable for proto
>>> fields [1]. E.g. if I want a nullable string, then in proto I can use
>>> `google.protobuf.StringValue` and the schema will look fine.
>>>
>>> However, the fromRow creator does not support this and throws an exception if it
>>> is presented with a row instance with a null value. It tries to pass this null
>>> to a `StringValue`, while the proto way would probably be to skip setting the
>>> field in that case.
>>>
>>> I can submit a failing test case and a JIRA, but want to check first if
>>> this is supposed to be supported - having a row with nulls handled and
>>> creating an instance of a compatible protobuf.
>>>
>>> Thanks,
>>> Jozo
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java#L106
>>>
>>


Re: Convert Row with nullable fields to Protobuf fails

2021-12-13 Thread Jozef Vilcek
Yes, field is marked as nullable. Here is a test case change to
illustrate it:
https://github.com/JozoVilcek/beam/commit/5e1c6324868c2fd6145dd2348c7358fdc787ac38

On Sun, Dec 12, 2021 at 7:28 PM Reuven Lax  wrote:

> Is the schema field marked as nullable?
>
> On Sun, Dec 12, 2021 at 4:21 AM Jozef Vilcek 
> wrote:
>
>> I did notice that the protobuf schema translator supports nullable for proto
>> fields [1]. E.g. if I want a nullable string, then in proto I can use
>> `google.protobuf.StringValue` and the schema will look fine.
>>
>> However, the fromRow creator does not support this and throws an exception if it
>> is presented with a row instance with a null value. It tries to pass this null
>> to a `StringValue`, while the proto way would probably be to skip setting the
>> field in that case.
>>
>> I can submit a failing test case and a JIRA, but want to check first if this
>> is supposed to be supported - having a row with nulls handled and
>> creating an instance of a compatible protobuf.
>>
>> Thanks,
>> Jozo
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java#L106
>>
>


Convert Row with nullable fields to Protobuf fails

2021-12-12 Thread Jozef Vilcek
I did notice that the protobuf schema translator supports nullable for proto
fields [1]. E.g. if I want a nullable string, then in proto I can use
`google.protobuf.StringValue` and the schema will look fine.

However, the fromRow creator does not support this and throws an exception if it is
presented with a row instance with a null value. It tries to pass this null to
a `StringValue`, while the proto way would probably be to skip setting the field
in that case.

I can submit a failing test case and a JIRA, but want to check first if this
is supposed to be supported - having a row with nulls handled and
creating an instance of a compatible protobuf.

Thanks,
Jozo


[1]
https://github.com/apache/beam/blob/master/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoSchemaTranslator.java#L106
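
To make the shape of the problem concrete, here is a minimal Beam-side sketch of the
schema and row in question (the generated proto message and the fromRow call are left
out):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class NullableRowSketch {
  public static void main(String[] args) {
    // A nullable string field, which is what the schema translator produces for a
    // google.protobuf.StringValue proto field.
    Schema schema =
        Schema.builder().addNullableField("name", Schema.FieldType.STRING).build();

    // The Row itself accepts the null value just fine...
    Row row = Row.withSchema(schema).addValue(null).build();
    System.out.println(row);

    // ...the question above is whether the proto fromRow creator should map this null
    // to an unset StringValue wrapper instead of throwing.
  }
}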


Re: FileIO with custom sharding function

2021-07-13 Thread Jozef Vilcek
I would like to bump this thread. So far we have not gotten very far.
Allowing the user to specify a sharding function, and not only the desired
number of shards, seems to be a very unsettling change. So far I do not have
a good understanding of why this is so unacceptable. From the thread I
sense some fear of users not using it correctly and complaining about
performance. While true, this could be the same for any GBK operation with
a bad key, and can be mitigated to some extent by documentation.

What else do you see, feature-wise, as a blocker for this change? Right now I
do not have much data based on which I can evolve my proposal.


On Wed, Jul 7, 2021 at 10:01 AM Jozef Vilcek  wrote:

>
>
> On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax  wrote:
>
>>
>>
>> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek 
>> wrote:
>>
>>> I don't think this has anything to do with external shuffle services.
>>>
>>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>>> since Beam does not restrict transforms to being deterministic. The Spark
>>> runner works (at least it did last I checked) by checkpointing the RDD.
>>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>>> input to a transform. This adds some cost to the Spark runner, but makes
>>> things correct. You should not have sharding problems due to replay unless
>>> there is a bug in the current Spark runner.
>>>
>>> Beam does not restrict non-determinism, true. If users do add it, then
>>> they can work around it should they need to address any side effects. But
>>> should non-determinism be deliberately added by Beam core? Probably can if
>>> runners can 100% deal with that effectively. To the point of RDD
>>> `checkpoint`, afaik SparkRunner does use `cache`, not checkpoint. Also, I
>>> thought cache is invoked on fork points in DAG. I have just a join and map
>>> and in such cases I thought data is always served out by shuffle service.
>>> Am I mistaken?
>>>
>>
>> Non-determinism is already there in core Beam features. If any sort of
>> trigger is used anywhere in the pipeline, the result is non-deterministic.
>> We've also found that users tend not to know when their ParDos are
>> non-deterministic, so telling users to work around it tends not to work.
>>
>> The Spark runner definitely used to use checkpoint in this case.
>>
>
> It is beyond my knowledge level of Beam how triggers introduce
> non-determinism into the code processing in batch mode. So I leave that
> out. Reuven, do you mind pointing me to the Spark runner code which is
> supposed to handle this by using checkpoint? I can not find it myself.
>
>
>
>>
>>>
>>> On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax  wrote:
>>>
>>>>
>>>>
>>>> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek 
>>>> wrote:
>>>>
>>>>>
>>>>> How will @RequiresStableInput prevent this situation when running
>>>>>> batch use case?
>>>>>>
>>>>>
>>>>> So this is handled in combination of @RequiresStableInput and output
>>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>>>> makes sure that the input provided for the write stage does not get
>>>>> recomputed in the presence of failures. Output finalization makes sure 
>>>>> that
>>>>> we only finalize one run of each bundle and discard the rest.
>>>>>
>>>>> So this I believe relies on having robust external service to hold
>>>>> shuffle data and serve it out when needed so pipeline does not need to
>>>>> recompute it via non-deterministic function. In Spark however, shuffle
>>>>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>>>>> replicated shuffle data). Therefore, if instance of shuffle service 
>>>>> holding
>>>>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>>>>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>>>>> can do here to prevent it or make it stable? Will @RequiresStableInput 
>>>>> work
>>>>> as expected? ... Note that this, I believe, concerns only the batch.
>>>>>
>>>>
>>>> I don't think this has anything to do with external shuffle services.
>>>>
>>>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>>>&g

Re: FileIO with custom sharding function

2021-07-07 Thread Jozef Vilcek
On Sat, Jul 3, 2021 at 7:30 PM Reuven Lax  wrote:

>
>
> On Sat, Jul 3, 2021 at 1:02 AM Jozef Vilcek  wrote:
>
>> I don't think this has anything to do with external shuffle services.
>>
>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>> since Beam does not restrict transforms to being deterministic. The Spark
>> runner works (at least it did last I checked) by checkpointing the RDD.
>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>> input to a transform. This adds some cost to the Spark runner, but makes
>> things correct. You should not have sharding problems due to replay unless
>> there is a bug in the current Spark runner.
>>
>> Beam does not restrict non-determinism, true. If users do add it, then
>> they can work around it should they need to address any side effects. But
>> should non-determinism be deliberately added by Beam core? Probably can if
>> runners can 100% deal with that effectively. To the point of RDD
>> `checkpoint`, afaik SparkRunner does use `cache`, not checkpoint. Also, I
>> thought cache is invoked on fork points in DAG. I have just a join and map
>> and in such cases I thought data is always served out by shuffle service.
>> Am I mistaken?
>>
>
> Non-determinism is already there in core Beam features. If any sort of
> trigger is used anywhere in the pipeline, the result is non-deterministic.
> We've also found that users tend not to know when their ParDos are
> non-deterministic, so telling users to work around it tends not to work.
>
> The Spark runner definitely used to use checkpoint in this case.
>

It is beyond my knowledge of Beam how triggers introduce
non-determinism into processing in batch mode, so I leave that
out. Reuven, do you mind pointing me to the Spark runner code which is
supposed to handle this by using checkpoint? I cannot find it myself.



>
>>
>> On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek 
>>> wrote:
>>>
>>>>
>>>> How will @RequiresStableInput prevent this situation when running batch
>>>>> use case?
>>>>>
>>>>
>>>> So this is handled in combination of @RequiresStableInput and output
>>>> file finalization. @RequiresStableInput (or Reshuffle for most runners)
>>>> makes sure that the input provided for the write stage does not get
>>>> recomputed in the presence of failures. Output finalization makes sure that
>>>> we only finalize one run of each bundle and discard the rest.
>>>>
>>>> So this I believe relies on having robust external service to hold
>>>> shuffle data and serve it out when needed so pipeline does not need to
>>>> recompute it via non-deterministic function. In Spark however, shuffle
>>>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>>>> replicated shuffle data). Therefore, if instance of shuffle service holding
>>>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>>>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>>>> can do here to prevent it or make it stable? Will @RequiresStableInput work
>>>> as expected? ... Note that this, I believe, concerns only the batch.
>>>>
>>>
>>> I don't think this has anything to do with external shuffle services.
>>>
>>> Arbitrarily recomputing data is fundamentally incompatible with Beam,
>>> since Beam does not restrict transforms to being deterministic. The Spark
>>> runner works (at least it did last I checked) by checkpointing the RDD.
>>> Spark will not recompute the DAG past a checkpoint, so this creates stable
>>> input to a transform. This adds some cost to the Spark runner, but makes
>>> things correct. You should not have sharding problems due to replay unless
>>> there is a bug in the current Spark runner.
>>>
>>>
>>>>
>>>> Also, when withNumShards() truly have to be used, round robin
>>>> assignment of elements to shards sounds like the optimal solution (at least
>>>> for the vast majority of pipelines)
>>>>
>>>> I agree. But random seed is my problem here with respect to the
>>>> situation mentioned above.
>>>>
>>>> Right, I would like to know if there are more true use-cases before
>>>> adding this. This essentially allows users to map elements to exact output
>>>&g

Re: FileIO with custom sharding function

2021-07-07 Thread Jozef Vilcek
On Sat, Jul 3, 2021 at 12:55 PM Jan Lukavský  wrote:

>
> I don't think this has anything to do with external shuffle services.
>
>
> Sorry for stepping into this discussion again, but I don't think this
> statement is 100% correct. What Spark's checkpoint does is save
> intermediate data (prior to shuffle) to external storage so that it can be
> made available in case it is needed. Now, what is the purpose of an
> external shuffle service? Persist data externally to make it available when
> it is needed. In that sense, enforcing a checkpoint prior to each shuffle
> effectively means we force Spark to shuffle data twice (ok,
> once-and-a-half times, the external checkpoint will likely not be read) -
> once using its own internal shuffle and second time the external
> checkpoint. That is not going to be efficient. And if spark could switch
> off its internal shuffle and use the external checkpoint for the same
> purpose, then the checkpoint would play role of external shuffle service,
> which is why this whole discussion has to do something with external
> shuffle services.
>
> To move this discussion forward I think that the solution could be in that
> SparkRunner can override the default FileIO transform and use a
> deterministic sharding function. @Jozef, would that work for you? It would
> mean, that the same should probably have to be done (sooner or later) for
> Flink batch and probably for other runners. Maybe there could be a
> deterministic override ready-made in runners-core. Or, actually, maybe the
> more easy way would be the other way around, that Dataflow would use the
> non-deterministic version, while other runners could use the (more
> conservative, yet not that performant) version.
>
> WDYT?
>
Yes, that could work for the problem of Beam on Spark producing
inconsistent file output when shuffle data is lost. I would still need to
address the "bucketing problem", e.g. wanting data from the same user_id and
hour to end up in the same file. It feels ok to tackle those separately.


>  Jan
> On 7/2/21 8:23 PM, Reuven Lax wrote:
>
>
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek 
> wrote:
>
>>
>> How will @RequiresStableInput prevent this situation when running batch
>>> use case?
>>>
>>
>> So this is handled in combination of @RequiresStableInput and output file
>> finalization. @RequiresStableInput (or Reshuffle for most runners) makes
>> sure that the input provided for the write stage does not get recomputed in
>> the presence of failures. Output finalization makes sure that we only
>> finalize one run of each bundle and discard the rest.
>>
>> So this I believe relies on having robust external service to hold
>> shuffle data and serve it out when needed so pipeline does not need to
>> recompute it via non-deterministic function. In Spark however, shuffle
>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>> replicated shuffle data). Therefore, if instance of shuffle service holding
>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>> can do here to prevent it or make it stable? Will @RequiresStableInput work
>> as expected? ... Note that this, I believe, concerns only the batch.
>>
>
> I don't think this has anything to do with external shuffle services.
>
> Arbitrarily recomputing data is fundamentally incompatible with Beam,
> since Beam does not restrict transforms to being deterministic. The Spark
> runner works (at least it did last I checked) by checkpointing the RDD.
> Spark will not recompute the DAG past a checkpoint, so this creates stable
> input to a transform. This adds some cost to the Spark runner, but makes
> things correct. You should not have sharding problems due to replay unless
> there is a bug in the current Spark runner.
>
>
>>
>> Also, when withNumShards() truly have to be used, round robin assignment
>> of elements to shards sounds like the optimal solution (at least for
>> the vast majority of pipelines)
>>
>> I agree. But random seed is my problem here with respect to the situation
>> mentioned above.
>>
>> Right, I would like to know if there are more true use-cases before
>> adding this. This essentially allows users to map elements to exact output
>> shards which could change the characteristics of pipelines in very
>> significant ways without users being aware of it. For example, this could
>> result in extremely imbalanced workloads for any downstream processors. If
>> it's just Hive I would rather work around it (even with a perf penalty for
>&

Re: FileIO with custom sharding function

2021-07-03 Thread Jozef Vilcek
I don't think this has anything to do with external shuffle services.

Arbitrarily recomputing data is fundamentally incompatible with Beam, since
Beam does not restrict transforms to being deterministic. The Spark runner
works (at least it did last I checked) by checkpointing the RDD. Spark will
not recompute the DAG past a checkpoint, so this creates stable input to a
transform. This adds some cost to the Spark runner, but makes things
correct. You should not have sharding problems due to replay unless there
is a bug in the current Spark runner.

Beam does not restrict non-determinism, true. If users do add it, then they
can work around it should they need to address any side effects. But should
non-determinism be deliberately added by Beam core? It probably can be, if runners
can 100% deal with it effectively. To the point of RDD `checkpoint`,
afaik SparkRunner does use `cache`, not checkpoint. Also, I thought cache
is invoked on fork points in the DAG. I have just a join and a map, and in such
cases I thought data is always served out by the shuffle service. Am I mistaken?
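
For readers less familiar with the distinction being debated, here is a small
standalone Spark sketch of cache() vs checkpoint(); the data and the checkpoint path
are made up:

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class CacheVsCheckpointSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "cache-vs-checkpoint");
    sc.setCheckpointDir("/tmp/spark-checkpoints"); // any reliable storage; made-up path

    JavaRDD<Integer> mapped = sc.parallelize(Arrays.asList(1, 2, 3)).map(x -> x * 2);

    // cache()/persist(): keeps computed blocks on executors; if those blocks (or shuffle
    // files) are lost, Spark recomputes the lineage, re-running any non-deterministic code.
    mapped.persist(StorageLevel.MEMORY_AND_DISK());

    // checkpoint(): writes the RDD to the checkpoint directory and truncates the lineage,
    // so lost partitions are re-read from storage instead of being recomputed.
    mapped.checkpoint();

    mapped.count(); // materializes the RDD; the checkpoint is written as part of this action
    sc.close();
  }
}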


On Fri, Jul 2, 2021 at 8:23 PM Reuven Lax  wrote:

>
>
> On Tue, Jun 29, 2021 at 1:03 AM Jozef Vilcek 
> wrote:
>
>>
>> How will @RequiresStableInput prevent this situation when running batch
>>> use case?
>>>
>>
>> So this is handled in combination of @RequiresStableInput and output file
>> finalization. @RequiresStableInput (or Reshuffle for most runners) makes
>> sure that the input provided for the write stage does not get recomputed in
>> the presence of failures. Output finalization makes sure that we only
>> finalize one run of each bundle and discard the rest.
>>
>> So this I believe relies on having robust external service to hold
>> shuffle data and serve it out when needed so pipeline does not need to
>> recompute it via non-deterministic function. In Spark however, shuffle
>> service can not be (if I am not mistaking) deployed in this fashion (HA +
>> replicated shuffle data). Therefore, if instance of shuffle service holding
>> a portion of shuffle data fails, spark recovers it by recomputing parts of
>> a DAG from source to recover lost shuffle results. I am not sure what Beam
>> can do here to prevent it or make it stable? Will @RequiresStableInput work
>> as expected? ... Note that this, I believe, concerns only the batch.
>>
>
> I don't think this has anything to do with external shuffle services.
>
> Arbitrarily recomputing data is fundamentally incompatible with Beam,
> since Beam does not restrict transforms to being deterministic. The Spark
> runner works (at least it did last I checked) by checkpointing the RDD.
> Spark will not recompute the DAG past a checkpoint, so this creates stable
> input to a transform. This adds some cost to the Spark runner, but makes
> things correct. You should not have sharding problems due to replay unless
> there is a bug in the current Spark runner.
>
>
>>
>> Also, when withNumShards() truly have to be used, round robin assignment
>> of elements to shards sounds like the optimal solution (at least for
>> the vast majority of pipelines)
>>
>> I agree. But random seed is my problem here with respect to the situation
>> mentioned above.
>>
>> Right, I would like to know if there are more true use-cases before
>> adding this. This essentially allows users to map elements to exact output
>> shards which could change the characteristics of pipelines in very
>> significant ways without users being aware of it. For example, this could
>> result in extremely imbalanced workloads for any downstream processors. If
>> it's just Hive I would rather work around it (even with a perf penalty for
>> that case).
>>
>> User would have to explicitly ask FileIO to use specific sharding.
>> Documentation can educate about the tradeoffs. But I am open to workaround
>> alternatives.
>>
>>
>> On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath 
>> wrote:
>>
>>>
>>>
>>> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> Hi Cham, thanks for the feedback
>>>>
>>>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow
>>>> runners to optimize better. I think one concern might be that the addition
>>>> of this option might be going against this.
>>>>
>>>> I agree that less knobs is more. But if assignment of a key is specific
>>>> per user request via API (optional), than runner should not optimize that
>>>> but need to respect how data is requested to be laid down. It could however
>>>> optimize number 

Re: FileIO with custom sharding function

2021-06-29 Thread Jozef Vilcek
> How will @RequiresStableInput prevent this situation when running batch
> use case?
>

So this is handled in combination of @RequiresStableInput and output file
finalization. @RequiresStableInput (or Reshuffle for most runners) makes
sure that the input provided for the write stage does not get recomputed in
the presence of failures. Output finalization makes sure that we only
finalize one run of each bundle and discard the rest.

So this, I believe, relies on having a robust external service to hold shuffle
data and serve it out when needed, so the pipeline does not need to recompute it
via a non-deterministic function. In Spark, however, the shuffle service cannot
(if I am not mistaken) be deployed in this fashion (HA + replicated
shuffle data). Therefore, if an instance of the shuffle service holding a portion
of shuffle data fails, Spark recovers it by recomputing parts of the DAG from
the source to recover the lost shuffle results. I am not sure what Beam can do here
to prevent it or make it stable. Will @RequiresStableInput work as
expected? ... Note that this, I believe, concerns only the batch case.

Also, when withNumShards() truly have to be used, round robin assignment of
elements to shards sounds like the optimal solution (at least for the vast
majority of pipelines)

I agree. But random seed is my problem here with respect to the situation
mentioned above.

Right, I would like to know if there are more true use-cases before adding
this. This essentially allows users to map elements to exact output shards
which could change the characteristics of pipelines in very significant
ways without users being aware of it. For example, this could result in
extremely imbalanced workloads for any downstream processors. If it's just
Hive I would rather work around it (even with a perf penalty for that case).

User would have to explicitly ask FileIO to use specific sharding.
Documentation can educate about the tradeoffs. But I am open to workaround
alternatives.
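
For reference, the annotation under discussion is applied to the @ProcessElement
method; a minimal sketch (the DoFn itself is a made-up example):

import org.apache.beam.sdk.transforms.DoFn;

/** Made-up example: a side-effecting DoFn that asks the runner for stable, replay-identical input. */
public class WriteToExternalSystemFn extends DoFn<String, Void> {

  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element String element) {
    // The runner must guarantee that on retry this method sees exactly the same element,
    // e.g. by checkpointing or materializing the input before this stage.
  }
}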


On Mon, Jun 28, 2021 at 5:50 PM Chamikara Jayalath 
wrote:

>
>
> On Mon, Jun 28, 2021 at 2:47 AM Jozef Vilcek 
> wrote:
>
>> Hi Cham, thanks for the feedback
>>
>> > Beam has a policy of no knobs (or keeping knobs minimum) to allow
>> runners to optimize better. I think one concern might be that the addition
>> of this option might be going against this.
>>
>> I agree that less knobs is more. But if assignment of a key is specific
>> per user request via API (optional), than runner should not optimize that
>> but need to respect how data is requested to be laid down. It could however
>> optimize number of shards as it is doing right now if numShards is not set
>> explicitly.
>> Anyway FileIO feels fluid here. You can leave shards empty and let runner
>> decide or optimize,  but only in batch and not in streaming.  Then it
>> allows you to set number of shards but never let you change the logic how
>> it is assigned and defaults to round robin with random seed. It begs
>> question for me why I can manipulate number of shards but not the logic of
>> assignment. Are there plans to (or already case implemented) optimize
>> around and have runner changing that under different circumstances?
>>
>
> I don't know the exact history behind withNumShards() feature for batch. I
> would guess it was originally introduced due to limitations for streaming
> but was made available to batch as well with a strong performance warning.
>
> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L159
>
> Also, when withNumShards() truly have to be used, round robin assignment
> of elements to shards sounds like the optimal solution (at least for
> the vast majority of pipelines)
>
>
>>
>> > Also looking at your original reasoning for adding this.
>>
>> > > I need to generate shards which are compatible with Hive bucketing
>> and therefore need to decide shard assignment based on data fields of input
>> element
>>
>> > This sounds very specific to Hive. Again I think the downside here is
>> that we will be giving up the flexibility for the runner to optimize the
>> pipeline and decide sharding. Do you think it's possible to somehow relax
>> this requirement for > > > Hive or use a secondary process to format the
>> output to a form that is suitable for Hive after being written to an
>> intermediate storage.
>>
>> It is possible. But it is quite suboptimal because of extra IO penalty
>> and I would have to use completely custom file writing as FileIO would not
>> support this. Then naturally the question would be, should I use FileIO at
>> all? I am quite not sure if I am doing something so rare that it is n

Re: FileIO with custom sharding function

2021-06-28 Thread Jozef Vilcek
Hi Cham, thanks for the feedback

> Beam has a policy of no knobs (or keeping knobs minimum) to allow runners
to optimize better. I think one concern might be that the addition of this
option might be going against this.

I agree that less knobs is more. But if the assignment of a key is specified per
user request via the API (optionally), then the runner should not optimize that but
needs to respect how the data is requested to be laid down. It could however
optimize the number of shards, as it is doing right now when numShards is not set
explicitly.
Anyway, FileIO feels fluid here. You can leave shards empty and let the runner
decide or optimize, but only in batch and not in streaming. Then it
allows you to set the number of shards but never lets you change the logic of how
they are assigned, which defaults to round robin with a random seed. It begs the
question for me why I can manipulate the number of shards but not the logic of
assignment. Are there plans (or cases already implemented) to optimize
around this and have the runner change that under different circumstances?

> Also looking at your original reasoning for adding this.

> > I need to generate shards which are compatible with Hive bucketing and
therefore need to decide shard assignment based on data fields of input
element

> This sounds very specific to Hive. Again I think the downside here is
that we will be giving up the flexibility for the runner to optimize the
pipeline and decide sharding. Do you think it's possible to somehow relax
this requirement for > > > Hive or use a secondary process to format the
output to a form that is suitable for Hive after being written to an
intermediate storage.

It is possible. But it is quite suboptimal because of the extra IO penalty, and
I would have to use completely custom file writing, as FileIO would not
support this. Then naturally the question would be: should I use FileIO at
all? I am not quite sure if I am doing something so rare that it is not and
cannot be supported. Maybe I am. I remember an old discussion from the Scio
guys when they wanted to introduce Sorted Merge Buckets to FileIO, where
sharding also needed to be manipulated (among other things).

> > When running e.g. on Spark and job encounters kind of failure which
cause a loss of some data from previous stages, Spark does issue recompute
of necessary task in necessary stages to recover data. Because the shard
assignment function is random as default, some data will end up in
different shards and cause duplicates in the final output.

> I think this was already addressed. The correct solution here is to
implement RequiresStableInput for runners that do not already support that
and update FileIO to use that.

How will @RequiresStableInput prevent this situation when running the batch
use case?
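
For the Hive-bucketing requirement mentioned above, the kind of sharding function
being proposed would look roughly like the sketch below; the Event type and its
accessors are made up, and the exact ShardingFunction signature should be checked
against the Beam version at hand:

import org.apache.beam.sdk.io.ShardingFunction;
import org.apache.beam.sdk.values.ShardedKey;

/** Rough sketch: deterministic, data-driven shard assignment (e.g. Hive-style bucketing). */
public class UserBucketShardingFn implements ShardingFunction<Event, String> {

  @Override
  public ShardedKey<Integer> assignShardKey(String destination, Event element, int shardCount) {
    // The same user always lands in the same bucket/shard, independent of retries.
    int bucket = Math.floorMod(element.getUserId().hashCode(), shardCount);
    return ShardedKey.of(bucket, bucket);
  }
}

WriteFiles already accepts such a function via withShardingFunction(); the MR under
discussion is about exposing it through FileIO.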


On Mon, Jun 28, 2021 at 10:29 AM Chamikara Jayalath 
wrote:

>
>
> On Sun, Jun 27, 2021 at 10:48 PM Jozef Vilcek 
> wrote:
>
>> Hi,
>>
>> how do we proceed with reviewing MR proposed for this change?
>>
>> I sense there is a concern exposing existing sharding function to the
>> API. But from the discussion here I do not have a clear picture
>> about arguments not doing so.
>> Only one argument was that dynamic destinations should be able to do the
>> same. While this is true, as it is illustrated in previous commnet, it is
>> not simple nor convenient to use and requires more customization than
>> exposing sharding which is already there.
>> Are there more negatives to exposing sharding function?
>>
>
> Seems like currently FlinkStreamingPipelineTranslator is the only real
> usage of WriteFiles.withShardingFunction() :
> https://github.com/apache/beam/blob/90c854e97787c19cd5b94034d37c5319317567a8/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java#L281
> WriteFiles is not expected to be used by pipeline authors but exposing
> this through the FileIO will make this available to pipeline authors (and
> some may choose to do so for various reasons).
> This will result in sharding being strict and runners being inflexible
> when it comes to optimizing execution of pipelines that use FileIO.
>
> Beam has a policy of no knobs (or keeping knobs minimum) to allow runners
> to optimize better. I think one concern might be that the addition of this
> option might be going against this.
>
> Also looking at your original reasoning for adding this.
>
> > I need to generate shards which are compatible with Hive bucketing and
> therefore need to decide shard assignment based on data fields of input
> element
>
> This sounds very specific to Hive. Again I think the downside here is that
> we will be giving up the flexibility for the runner to optimize the
> pipeline and decide sharding. Do you think it's possible to somehow relax
> this requirement for Hive or use a secondary proc

Re: FileIO with custom sharding function

2021-06-27 Thread Jozef Vilcek
Hi,

how do we proceed with reviewing the MR proposed for this change?

I sense there is a concern about exposing the existing sharding function to the
API, but from the discussion here I do not have a clear picture of the arguments
against doing so.
Only one argument was given, that dynamic destinations should be able to do the
same. While this is true, as illustrated in the previous comment, it is
neither simple nor convenient to use and requires more customization than
exposing the sharding which is already there.
Are there more negatives to exposing the sharding function?

On Wed, Jun 23, 2021 at 9:36 AM Jozef Vilcek  wrote:

> The difference in my opinion is in distinguishing between - as written in
> this thread - physical vs logical properties of the pipeline. I proposed to
> keep dynamic destination (logical) and sharding (physical) separate on API
> level as it is at implementation level.
>
> When I reason about using `by()` for my case ... I am using dynamic
> destination to partition data into hourly folders. So my destination is
> e.g. `2021-06-23-07`. To add a shard there I assume I will need
> * encode shard to the destination .. e.g. in form of file prefix
> `2021-06-23-07/shard-1`
> * dynamic destination now does not span a group of files but must be
> exactly one file
> * to respect the above, I have to "disable" sharding in WriteFiles and make
> sure to use `.withNumShards(1)` ... I am not sure what `runner determined`
> sharding would do, if it would not split destination further into more
> files
> * make sure that FileNaming will respect my intent and name files as I
> expect based on that destination
> * post process `WriteFilesResult` to turn my destination which targets
> physical single file (date-with-hour + shard-num) back into the destination
> which targets a logical group of files (date-with-hour) so I can hook it up to
> downstream post-process as usual
>
> Am I roughly correct? Or do I miss something more straight forward?
> If the above is correct then it feels more fragile and less intuitive to
> me than the option in my MR.
>
>
> On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax  wrote:
>
>> I'm not sure I understand your PR. How is this PR different than the by()
>> method in FileIO?
>>
>> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek 
>> wrote:
>>
>>> MR for review for this change is here:
>>> https://github.com/apache/beam/pull/15051
>>>
>>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> I would like this thread to stay focused on sharding FileIO only.
>>>> Possible change to the model is an interesting topic but of a much
>>>> different scope.
>>>>
>>>> Yes, I agree that sharding is mostly a physical rather than logical
>>>> property of the pipeline. That is why it feels more natural to distinguish
>>>> between those two on the API level.
>>>> As for handling sharding requirements by adding more sugar to dynamic
>>>> destinations + file naming one has to keep in mind that results of dynamic
>>>> writes can be observed in the form of KV, so written
>>>> files per dynamic destination. Often we do GBK to post-process files per
>>>> destination / logical group. If sharding would be encoded there, then it
>>>> might complicate things either downstream or inside the sugar part to put
>>>> shard in and then take it out later.
>>>> From the user perspective I do not see much difference. We would still
>>>> need to allow API to define both behaviors and it would only be executed
>>>> differently by implementation.
>>>> I do not see a value in changing FileIO (WriteFiles) logic to stop
>>>> using sharding and use dynamic destination for both given that sharding
>>>> function is already there and in use.
>>>>
>>>> To the point of external shuffle and non-deterministic user input.
>>>> Yes users can create non-deterministic behaviors but they are in
>>>> control. Here, Beam internally adds non-deterministic behavior and users
>>>> can not opt-out.
>>>> All works fine as long as external shuffle service (depends on Runner)
>>>> holds to the data and hands it out on retries. However if data in shuffle
>>>> service is lost for some reason - e.g. disk failure, node breaks down -
>>>> then pipeline have to recover the data by recomputing necessary paths.
>>>>
>>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Sharding is typically a physical rather than logical property of the
>>>>>

Re: FileIO with custom sharding function

2021-06-23 Thread Jozef Vilcek
The difference in my opinion is in distinguishing between - as written in
this thread - physical vs logical properties of the pipeline. I proposed to
keep dynamic destinations (logical) and sharding (physical) separate at the API
level, as they are at the implementation level.

When I reason about using `by()` for my case ... I am using dynamic
destinations to partition data into hourly folders. So my destination is
e.g. `2021-06-23-07`. To add a shard there, I assume I will need to:
* encode the shard into the destination, e.g. in the form of a file prefix
`2021-06-23-07/shard-1`
* a dynamic destination now does not span a group of files but must be
exactly one file
* to respect the above, I have to "disable" sharding in WriteFiles and make
sure to use `.withNumShards(1)` ... I am not sure what `runner determined`
sharding would do, if it would not split the destination further into more
files
* make sure that the FileNaming will respect my intent and name files as I
expect based on that destination
* post-process `WriteFilesResult` to turn my destination, which targets a
single physical file (date-with-hour + shard-num), back into the destination
which targets a logical group of files (date-with-hour), so I can hook it up to
downstream post-processing as usual

Am I roughly correct? Or am I missing something more straightforward?
If the above is correct then it feels more fragile and less intuitive to me
than the option in my MR.
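
For comparison, the workaround described above would look roughly like the untested
sketch below; the Event type and its accessors, toCsv() and bucketFor() are made up,
and the exact writeDynamic()/withNaming() overloads may need adjusting:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.values.PCollection;

public class ShardInDestinationSketch {

  /** Untested sketch: the shard becomes part of the dynamic destination, one file per destination. */
  static void write(PCollection<Event> events, String outputDir, int numBuckets) {
    events.apply(
        FileIO.<String, Event>writeDynamic()
            // destination = hour + shard, e.g. "2021-06-23-07/shard-1"
            .by(event -> event.getHourPath() + "/shard-" + bucketFor(event, numBuckets))
            .via(Contextful.fn(Event::toCsv), TextIO.sink())
            .to(outputDir)
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(dest -> FileIO.Write.defaultNaming(dest, ".csv"))
            // exactly one file per (hour, shard) destination
            .withNumShards(1));
  }

  static int bucketFor(Event event, int numBuckets) {
    return Math.floorMod(event.getUserId().hashCode(), numBuckets);
  }
}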


On Tue, Jun 22, 2021 at 4:28 PM Reuven Lax  wrote:

> I'm not sure I understand your PR. How is this PR different than the by()
> method in FileIO?
>
> On Tue, Jun 22, 2021 at 1:22 AM Jozef Vilcek 
> wrote:
>
>> MR for review for this change is here:
>> https://github.com/apache/beam/pull/15051
>>
>> On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek 
>> wrote:
>>
>>> I would like this thread to stay focused on sharding FileIO only.
>>> Possible change to the model is an interesting topic but of a much
>>> different scope.
>>>
>>> Yes, I agree that sharding is mostly a physical rather than logical
>>> property of the pipeline. That is why it feels more natural to distinguish
>>> between those two on the API level.
>>> As for handling sharding requirements by adding more sugar to dynamic
>>> destinations + file naming one has to keep in mind that results of dynamic
>>> writes can be observed in the form of KV, so written
>>> files per dynamic destination. Often we do GBK to post-process files per
>>> destination / logical group. If sharding would be encoded there, then it
>>> might complicate things either downstream or inside the sugar part to put
>>> shard in and then take it out later.
>>> From the user perspective I do not see much difference. We would still
>>> need to allow API to define both behaviors and it would only be executed
>>> differently by implementation.
>>> I do not see a value in changing FileIO (WriteFiles) logic to stop using
>>> sharding and use dynamic destination for both given that sharding function
>>> is already there and in use.
>>>
>>> To the point of external shuffle and non-deterministic user input.
>>> Yes users can create non-deterministic behaviors but they are in
>>> control. Here, Beam internally adds non-deterministic behavior and users
>>> can not opt-out.
>>> All works fine as long as external shuffle service (depends on Runner)
>>> holds to the data and hands it out on retries. However if data in shuffle
>>> service is lost for some reason - e.g. disk failure, node breaks down -
>>> then pipeline have to recover the data by recomputing necessary paths.
>>>
>>> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> Sharding is typically a physical rather than logical property of the
>>>> pipeline, and I'm not convinced it makes sense to add it to Beam in
>>>> general. One can already use keys and GBK/Stateful DoFns if some kind
>>>> of logical grouping is needed, and adding constraints like this can
>>>> prevent opportunities for optimizations (like dynamic sharding and
>>>> fusion).
>>>>
>>>> That being said, file output are one area where it could make sense. I
>>>> would expect that dynamic destinations could cover this usecase, and a
>>>> general FileNaming subclass could be provided to make this pattern
>>>> easier (and possibly some syntactic sugar for auto-setting num shards
>>>> to 0). (One downside of this approach is that one couldn't do dynamic
>>>> destinations, and have each sharded with a distinct sharing function
>>>> as well.)
>>>>

Re: FileIO with custom sharding function

2021-06-22 Thread Jozef Vilcek
MR for review for this change is here:
https://github.com/apache/beam/pull/15051

On Fri, Jun 18, 2021 at 8:47 AM Jozef Vilcek  wrote:

> I would like this thread to stay focused on sharding FileIO only. Possible
> change to the model is an interesting topic but of a much different scope.
>
> Yes, I agree that sharding is mostly a physical rather than logical
> property of the pipeline. That is why it feels more natural to distinguish
> between those two on the API level.
> As for handling sharding requirements by adding more sugar to dynamic
> destinations + file naming one has to keep in mind that results of dynamic
> writes can be observed in the form of KV, so written
> files per dynamic destination. Often we do GBK to post-process files per
> destination / logical group. If sharding would be encoded there, then it
> might complicate things either downstream or inside the sugar part to put
> shard in and then take it out later.
> From the user perspective I do not see much difference. We would still
> need to allow API to define both behaviors and it would only be executed
> differently by implementation.
> I do not see a value in changing FileIO (WriteFiles) logic to stop using
> sharding and use dynamic destination for both given that sharding function
> is already there and in use.
>
> To the point of external shuffle and non-deterministic user input.
> Yes users can create non-deterministic behaviors but they are in control.
> Here, Beam internally adds non-deterministic behavior and users can not
> opt-out.
> All works fine as long as external shuffle service (depends on Runner)
> holds to the data and hands it out on retries. However if data in shuffle
> service is lost for some reason - e.g. disk failure, node breaks down -
> then the pipeline has to recover the data by recomputing the necessary paths.
>
> On Thu, Jun 17, 2021 at 7:36 PM Robert Bradshaw 
> wrote:
>
>> Sharding is typically a physical rather than logical property of the
>> pipeline, and I'm not convinced it makes sense to add it to Beam in
>> general. One can already use keys and GBK/Stateful DoFns if some kind
>> of logical grouping is needed, and adding constraints like this can
>> prevent opportunities for optimizations (like dynamic sharding and
>> fusion).
>>
>> That being said, file output are one area where it could make sense. I
>> would expect that dynamic destinations could cover this usecase, and a
>> general FileNaming subclass could be provided to make this pattern
>> easier (and possibly some syntactic sugar for auto-setting num shards
>> to 0). (One downside of this approach is that one couldn't do dynamic
>> destinations, and have each sharded with a distinct sharing function
>> as well.)
>>
>> If this doesn't work, we could look into adding ShardingFunction as a
>> publicly exposed parameter to FileIO. (I'm actually surprised it
>> already exists.)
>>
>> On Thu, Jun 17, 2021 at 9:39 AM  wrote:
>> >
>> > Alright, but what is worth emphasizing is that we talk about batch
>> workloads. The typical scenario is that the output is committed once the
>> job finishes (e.g., by atomic rename of directory).
>> >  Jan
>> >
>> > Dne 17. 6. 2021 17:59 napsal uživatel Reuven Lax :
>> >
>> > Yes - the problem is that Beam makes no guarantees of determinism
>> anywhere in the system. User DoFns might be non deterministic, and have no
>> way to know (we've discussed proving users with an @IsDeterministic
>> annotation, however empirically users often think their functions are
>> deterministic when they are in fact not). _Any_ sort of triggered
>> aggregation (including watermark based) can always be non deterministic.
>> >
>> > Even if everything was deterministic, replaying everything is tricky.
>> The output files already exist - should the system delete them and recreate
>> them, or leave them as is and delete the temp files? Either decision could
>> be problematic.
>> >
>> > On Wed, Jun 16, 2021 at 11:40 PM Jan Lukavský  wrote:
>> >
>> > Correct, by the external shuffle service I pretty much meant
>> "offloading the contents of a shuffle phase out of the system". Looks like
>> that is what the Spark's checkpoint does as well. On the other hand (if I
>> understand the concept correctly), that implies some performance penalty -
>> the data has to be moved to external distributed filesystem. Which then
>> feels weird if we optimize code to avoid computing random numbers, but are
>> okay with moving complete datasets back and forth. I think in this
>> particular case making the Pipeline determinis

Re: FileIO with custom sharding function

2021-06-18 Thread Jozef Vilcek
" sharding - e.g. hashCode based (though Java makes no
> guarantee that hashCode is stable across JVM instances, so this technique
> ends up not being stable) doesn't really help matters in Beam. Unlike other
> systems, Beam makes no assumptions that transforms are idempotent or
> deterministic. What's more, _any_ transform that has any sort of triggered
> grouping (whether the trigger used is watermark based or otherwise) is non
> deterministic.
> >
> > Forcing a hash of every element imposed quite a CPU cost; even
> generating a random number per-element slowed things down too much, which
> is why the current code generates a random number only in startBundle.
> >
> > Any runner that does not implement RequiresStableInput will not properly
> execute FileIO. Dataflow and Flink both support this. I believe that the
> Spark runner implicitly supports it by manually calling checkpoint as Ken
> mentioned (unless someone removed that from the Spark runner, but if so
> that would be a correctness regression). Implementing this has nothing to
> do with external shuffle services - neither Flink, Spark, nor Dataflow
> appliance (classic shuffle) have any problem correctly implementing
> RequiresStableInput.
> >
> > On Wed, Jun 16, 2021 at 11:18 AM Jan Lukavský  wrote:
> >
> > I think that the support for @RequiresStableInput is rather limited.
> AFAIK it is supported by streaming Flink (where it is not needed in this
> situation) and by Dataflow. Batch runners without external shuffle service
> that works as in the case of Dataflow have IMO no way to implement it
> correctly. In the case of FileIO (which does not use @RequiresStableInput as
> it would not be supported on Spark) the randomness is easily avoidable
> (hashCode of key?) and it seems to me it should be preferred.
> >
> >  Jan
> >
> > On 6/16/21 6:23 PM, Kenneth Knowles wrote:
> >
> >
> > On Wed, Jun 16, 2021 at 5:18 AM Jan Lukavský  wrote:
> >
> > Hi,
> >
> > maybe a little unrelated, but I think we definitely should not use
> random assignment of shard keys (RandomShardingFunction), at least for
> bounded workloads (seems to be fine for streaming workloads). Many batch
> runners simply recompute the path in the computation DAG from the failed node
> (transform) to the root (source). If there is any non-determinism
> involved in the logic, it can result in duplicates (as the 'previous'
> attempt might have ended in a DAG path that was not affected by the failure).
> That addresses option 2) of what Jozef has mentioned.
> >
> > This is the reason we introduced "@RequiresStableInput".
> >
> > When things were only Dataflow, we knew that each shuffle was a
> checkpoint, so we inserted a Reshuffle after the random numbers were
> generated, freezing them so it was safe for replay. Since other engines do
> not checkpoint at every shuffle, we needed a way to communicate that this
> checkpointing was required for correctness. I think we still have many IOs
> that are written using Reshuffle instead of @RequiresStableInput, and I
> don't know which runners process @RequiresStableInput properly.
> >
> > By the way, I believe the SparkRunner explicitly calls materialize()
> after a GBK specifically so that it gets correct results for IOs that rely
> on Reshuffle. Has that changed?
> >
> > I agree that we should minimize use of RequiresStableInput. It has a
> significant cost, and the cost varies across runners. If we can use a
> deterministic function, we should.
> >
> > Kenn
> >
> >
> >  Jan
> >
> > On 6/16/21 1:43 PM, Jozef Vilcek wrote:
> >
> >
> >
> > On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles  wrote:
> >
> > In general, Beam only deals with keys and grouping by key. I think
> expanding this idea to some more abstract notion of a sharding function
> could make sense.
> >
> > For FileIO specifically, I wonder if you can use writeDynamic() to get
> the behavior you are seeking.
> >
> >
> > The change in mind looks like this:
> >
> https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18
> >
> > Dynamic Destinations in my mind is more towards the need for
> "partitioning" data (destination as directory level) or if one needs to
> handle groups of events differently, e.g. write some events in FormatA and
> others in FormatB.
> > Shards are now used for distributing writes or bucketing of events
> within a particular destination group. More specifically, currently, each
> element is assigned `ShardedKey` [1] before GBK operation. Sharded
> key is a compound of destination and assigned

Re: FileIO with custom sharding function

2021-06-16 Thread Jozef Vilcek
On Wed, Jun 16, 2021 at 1:38 AM Kenneth Knowles  wrote:

> In general, Beam only deals with keys and grouping by key. I think
> expanding this idea to some more abstract notion of a sharding function
> could make sense.
>
> For FileIO specifically, I wonder if you can use writeDynamic() to get the
> behavior you are seeking.
>

The change in mind looks like this:
https://github.com/JozoVilcek/beam/commit/9c5a7fe35388f06f72972ec4c1846f1dbe85eb18

Dynamic destinations, in my mind, are more about the need for "partitioning"
data (destination as a directory level) or about handling groups of events
differently, e.g. writing some events in FormatA and others in FormatB.
Shards are now used for distributing writes or bucketing events within a
particular destination group. More specifically, currently, each element is
assigned a `ShardedKey` [1] before the GBK operation. The sharded key is a
compound of the destination and the assigned shard.

Having said that, I might be able to use dynamic destinations for this,
possibly with a custom FileNaming, and set shards to always be 1.
But it feels less natural than allowing the user to swap the already present
`RandomShardingFunction` [2] for something of their own choosing; a rough
sketch of what I have in mind follows after the references.


[1]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java

[2]
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
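
To make the swap concrete, below is a rough sketch of a data-driven sharding
function written against the interface that `RandomShardingFunction` [2]
implements (signatures and package are from memory; `MyEvent` and
`getBucketColumn()` are illustrative). The point is only that the shard is
derived from element data, so a recomputation assigns the same element to the
same shard.

import org.apache.beam.sdk.io.ShardingFunction;
import org.apache.beam.sdk.values.ShardedKey;

class DataDrivenShardingFunction implements ShardingFunction<MyEvent, Void> {
  private final int numBuckets;

  DataDrivenShardingFunction(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Override
  public ShardedKey<Integer> assignShardKey(Void destination, MyEvent element, int shardCount) {
    // Deterministic: derived from a data field, not from a random number.
    int bucket = (element.getBucketColumn().hashCode() & Integer.MAX_VALUE) % numBuckets;
    return ShardedKey.of(bucket, bucket);
  }
}

The linked commit is then mostly about exposing a setter on FileIO so that
something like this can replace the random default.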

Kenn
>
> On Tue, Jun 15, 2021 at 3:49 PM Tyson Hamilton  wrote:
>
>> Adding sharding to the model may require a wider discussion than FileIO
>> alone. I'm not entirely sure how wide, or if this has been proposed before,
>> but IMO it warrants a design doc or proposal.
>>
>> A couple high level questions I can think of are,
>>   - What runners support sharding?
>>   * There will be some work in Dataflow required to support this but
>> I'm not sure how much.
>>   - What does sharding mean for streaming pipelines?
>>
>> A more nitty-detail question:
>>   - How can this be achieved performantly? For example, if a shuffle is
>> required to achieve a particular sharding constraint, should we
>> allow transforms to declare they don't modify the sharding property (e.g.
>> key preserving) which may allow a runner to avoid an additional shuffle if
>> a preceding shuffle can guarantee the sharding requirements?
>>
>> Where X is the shuffle that could be avoided: input -> shuffle (key
>> sharding fn A) -> transform1 (key preserving) -> transform 2 (key
>> preserving) -> X -> fileio (key sharding fn A)
>>
>> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek 
>> wrote:
>>
>>> I would like to extend FileIO with possibility to specify a custom
>>> sharding function:
>>> https://issues.apache.org/jira/browse/BEAM-12493
>>>
>>> I have 2 use-cases for this:
>>>
>>>1. I need to generate shards which are compatible with Hive
>>>bucketing and therefore need to decide shard assignment based on data
>>>fields of input element
>>>2. When running e.g. on Spark and job encounters kind of failure
>>>which cause a loss of some data from previous stages, Spark does issue
>>>recompute of necessary task in necessary stages to recover data. Because
>>>the shard assignment function is random as default, some data will end up
>>>in different shards and cause duplicates in the final output.
>>>
>>> Please let me know your thoughts in case you see a reason not to add
>>> such an improvement.
>>>
>>> Thanks,
>>> Jozef
>>>
>>


Re: FileIO with custom sharding function

2021-06-16 Thread Jozef Vilcek
On Wed, Jun 16, 2021 at 12:49 AM Tyson Hamilton  wrote:

> Adding sharding to the model may require a wider discussion than FileIO
> alone. I'm not entirely sure how wide, or if this has been proposed before,
> but IMO it warrants a design doc or proposal.
>

I should have been clearer about the intent. I am not trying to add this
to the general model, only to add the possibility to control the sharding logic
in FileIO (Java), since sharding is already there.
You can call `.withNumShards(...)` on FileIO, in which case this will be
used:
https://github.com/apache/beam/blob/release-2.29.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L856
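
For reference, that existing knob looks roughly like this (the element type and
output path are illustrative):

PCollection<String> lines = ...;
lines.apply(
    FileIO.<String>write()
        .via(TextIO.sink())
        .to("/output/path")
        .withNumShards(4));  // shard assignment itself still comes from RandomShardingFunction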


> A couple high level questions I can think of are,
>   - What runners support sharding?
>   * There will be some work in Dataflow required to support this but
> I'm not sure how much.
>   - What does sharding mean for streaming pipelines?
>
> A more nitty-detail question:
>   - How can this be achieved performantly? For example, if a shuffle is
> required to achieve a particular sharding constraint, should we
> allow transforms to declare they don't modify the sharding property (e.g.
> key preserving) which may allow a runner to avoid an additional shuffle if
> a preceding shuffle can guarantee the sharding requirements?
>
> Where X is the shuffle that could be avoided: input -> shuffle (key
> sharding fn A) -> transform1 (key preserving) -> transform 2 (key
> preserving) -> X -> fileio (key sharding fn A)
>
> On Tue, Jun 15, 2021 at 1:02 AM Jozef Vilcek 
> wrote:
>
>> I would like to extend FileIO with possibility to specify a custom
>> sharding function:
>> https://issues.apache.org/jira/browse/BEAM-12493
>>
>> I have 2 use-cases for this:
>>
>>1. I need to generate shards which are compatible with Hive bucketing
>>and therefore need to decide shard assignment based on data fields of 
>> input
>>element
>>2. When running e.g. on Spark and job encounters kind of failure
>>which cause a loss of some data from previous stages, Spark does issue
>>recompute of necessary task in necessary stages to recover data. Because
>>the shard assignment function is random as default, some data will end up
>>in different shards and cause duplicates in the final output.
>>
>> Please let me know your thoughts in case you see a reason not to add
>> such an improvement.
>>
>> Thanks,
>> Jozef
>>
>


FileIO with custom sharding function

2021-06-15 Thread Jozef Vilcek
I would like to extend FileIO with possibility to specify a custom sharding
function:
https://issues.apache.org/jira/browse/BEAM-12493

I have 2 use-cases for this:

   1. I need to generate shards which are compatible with Hive bucketing and
   therefore need to decide shard assignment based on data fields of the input
   element (see the sketch after this list).
   2. When running e.g. on Spark and the job encounters a failure which causes
   a loss of some data from previous stages, Spark recomputes the necessary
   tasks in the necessary stages to recover the data. Because the shard
   assignment function is random by default, some data will end up in
   different shards and cause duplicates in the final output.
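
A rough sketch of the deterministic shard assignment I have in mind for use
case 1, following Hive's bucket formula
(bucketId = (hash & Integer.MAX_VALUE) % numBuckets). `MyRecord` and
`getBucketColumn()` are illustrative, and the per-type hashing would have to
follow Hive's rules exactly:

// Deterministic, Hive-compatible shard assignment.
static int hiveCompatibleShard(MyRecord record, int numBuckets) {
  int hash = java.util.Objects.hashCode(record.getBucketColumn());
  return (hash & Integer.MAX_VALUE) % numBuckets;
}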

Please let me know your thoughts in case you see a reason not to add
such an improvement.

Thanks,
Jozef


Re: RequiresStableInput on Spark runner

2020-07-08 Thread Jozef Vilcek
Would it then be safe to enable the same behavior for Spark batch? I can
create a JIRA and a patch for this, if there is no other reason not to
do so.

On Wed, Jul 8, 2020 at 11:51 AM Maximilian Michels  wrote:

> Correct, for batch we rely on re-running the entire job which will
> produce stable input within each run.
>
> For streaming, the Flink Runner buffers all input to a
> @RequiresStableInput DoFn until a checkpoint is complete, only then it
> processes the buffered data. Dataflow effectively does the same by going
> through the Shuffle service which produces a consistent result.
>
> -Max
>
> On 08.07.20 11:08, Jozef Vilcek wrote:
> > My last question was more towards the graph translation for batch mode.
> >
> > Should DoFn with @RequiresStableInput be translated/expanded in some
> > specific way (e.g. DoFn -> Reshuffle + DoFn) or is it not needed for
> batch?
> > Most runners fail in the presence of @RequiresStableInput for both batch
> > and streaming. I can not find a fail for Flink and Dataflow, but at the
> > same time, I can not find what those runners do with such DoFn.
> >
> > On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles  > <mailto:k...@apache.org>> wrote:
> >
> > I hope someone who knows better than me can respond.
> >
> > A long time ago, the SparkRunner added a call to materialize() at
> > every GroupByKey. This was to mimic Dataflow, since so many of the
> > initial IO transforms relied on using shuffle to create stable
> inputs.
> >
> > The overall goal is to be able to remove these extra calls to
> > materialize() and only include them when @RequiresStableInput.
> >
> > The intermediate state is to analyze whether input is already stable
> > from materialize() and add another materialize() only if it is not
> >     stable.
> >
> > I don't know the current state of the SparkRunner. This may already
> > have changed.
> >
> > Kenn
> >
> > On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek  > <mailto:jozo.vil...@gmail.com>> wrote:
> >
> > I was trying to look for references on how other runners handle
> > @RequiresStableInput for batch cases, however I was not able to
> > find any.
> > In Flink I can see added support for streaming case and in
> > Dataflow I see that support for the feature was turned off
> > https://github.com/apache/beam/pull/8065
> >
> > It seems to me that @RequiresStableInput is ignored for the
> > batch case and the runner relies on being able to recompute the
> > whole job in the worst case scenario.
> > Is this assumption correct?
> > Could I just change SparkRunner to crash on @RequiresStableInput
> > annotation for streaming mode and ignore it in batch?
> >
> >
> >
> > On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek
> > mailto:jozo.vil...@gmail.com>> wrote:
> >
> > We have a component which we use in streaming and batch
> > jobs. Streaming we run on FlinkRunner and batch on
> > SparkRunner. Recently we needed to add @RequiresStableInput
> > to taht component because of streaming use-case. But now
> > batch case crash on SparkRunner with
> >
> > Caused by: java.lang.UnsupportedOperationException: Spark
> runner currently doesn't support @RequiresStableInput annotation.
> >   at
> org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
> >   at
> org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
> >   at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
> >   at
> org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
> >   at
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
> >   at
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
> >   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
> >   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
> >   at
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
> >   at
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
> >   at scala.util.Try$.apply(Try.scala:192)
> >   at
> com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
> >
> >
> > We are using Beam 2.19.0. Is the @RequiresStableInput
> > problematic to support for both streaming and batch
> > use-case? What are the options here?
> > https://issues.apache.org/jira/browse/BEAM-5358
> >
>


Re: RequiresStableInput on Spark runner

2020-07-08 Thread Jozef Vilcek
My last question was more towards the graph translation for batch mode.

Should a DoFn with @RequiresStableInput be translated/expanded in some
specific way (e.g. DoFn -> Reshuffle + DoFn), or is it not needed for batch?
Most runners fail in the presence of @RequiresStableInput for both batch
and streaming. I cannot find a failure for Flink and Dataflow, but at the
same time, I cannot find what those runners do with such a DoFn.
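
To make the question concrete, here is the kind of expansion I have in mind.
This is only a sketch: the side-effecting DoFn and its external write are
illustrative, not real code from our component.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.RequiresStableInput;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class SideEffectFn extends DoFn<KV<String, String>, Void> {
  @RequiresStableInput  // the input must be identical on every replay
  @ProcessElement
  public void process(ProcessContext c) {
    // illustrative non-idempotent side effect
    externalWrite(c.element().getKey(), c.element().getValue());
  }

  private void externalWrite(String key, String value) { /* illustrative */ }
}

// Is this (or an equivalent checkpoint) what a batch runner should insert, or
// is relying on whole-job recomputation enough?
PCollection<KV<String, String>> input = ...;
input
    .apply(Reshuffle.<String, String>of())  // stabilize the input first
    .apply(ParDo.of(new SideEffectFn()));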

On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles  wrote:

> I hope someone who knows better than me can respond.
>
> A long time ago, the SparkRunner added a call to materialize() at every
> GroupByKey. This was to mimic Dataflow, since so many of the initial IO
> transforms relied on using shuffle to create stable inputs.
>
> The overall goal is to be able to remove these extra calls to
> materialize() and only include them when @RequiresStableInput.
>
> The intermediate state is to analyze whether input is already stable from
> materialize() and add another materialize() only if it is not stable.
>
> I don't know the current state of the SparkRunner. This may already have
> changed.
>
> Kenn
>
> On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek 
> wrote:
>
>> I was trying to look for references on how other runners handle
>> @RequiresStableInput for batch cases, however I was not able to find any.
>> In Flink I can see added support for streaming case and in Dataflow I see
>> that support for the feature was turned off
>> https://github.com/apache/beam/pull/8065
>>
>> It seems to me that @RequiresStableInput is ignored for the batch case
>> and the runner relies on being able to recompute the whole job in the worst
>> case scenario.
>> Is this assumption correct?
>> Could I just change SparkRunner to crash on @RequiresStableInput
>> annotation for streaming mode and ignore it in batch?
>>
>>
>>
>> On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek 
>> wrote:
>>
>>> We have a component which we use in streaming and batch jobs.
>>> Streaming we run on FlinkRunner and batch on SparkRunner. Recently we
>>> needed to add @RequiresStableInput to that component because of a streaming
>>> use-case. But now the batch case crashes on SparkRunner with
>>>
>>> Caused by: java.lang.UnsupportedOperationException: Spark runner currently 
>>> doesn't support @RequiresStableInput annotation.
>>> at 
>>> org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
>>> at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
>>> at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
>>> at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
>>> at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
>>> at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>> at 
>>> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
>>> at 
>>> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
>>> at scala.util.Try$.apply(Try.scala:192)
>>> at 
>>> com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
>>>
>>>
>>> We are using Beam 2.19.0. Is the @RequiresStableInput problematic to
>>> support for both streaming and batch use-case? What are the options here?
>>> https://issues.apache.org/jira/browse/BEAM-5358
>>>
>>>


Re: RequiresStableInput on Spark runner

2020-07-02 Thread Jozef Vilcek
I was trying to look for references on how other runners handle
@RequiresStableInput for batch cases, however I was not able to find any.
In Flink I can see added support for the streaming case, and in Dataflow I see
that support for the feature was turned off:
https://github.com/apache/beam/pull/8065

It seems to me that @RequiresStableInput is ignored for the batch case and
the runner relies on being able to recompute the whole job in the worst
case scenario.
Is this assumption correct?
Could I just change SparkRunner to crash on @RequiresStableInput annotation
for streaming mode and ignore it in batch?



On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek  wrote:

> We have a component which we use in streaming and batch jobs. Streaming we
> run on FlinkRunner and batch on SparkRunner. Recently we needed to
> add @RequiresStableInput to that component because of a streaming use-case.
> But now the batch case crashes on SparkRunner with
>
> Caused by: java.lang.UnsupportedOperationException: Spark runner currently 
> doesn't support @RequiresStableInput annotation.
>   at 
> org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
>   at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
>   at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
>   at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
>   at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>   at 
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
>   at 
> com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
>   at scala.util.Try$.apply(Try.scala:192)
>   at 
> com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
>
>
> We are using Beam 2.19.0. Is the @RequiresStableInput problematic to
> support for both streaming and batch use-case? What are the options here?
> https://issues.apache.org/jira/browse/BEAM-5358
>
>


RequiresStableInput on Spark runner

2020-07-01 Thread Jozef Vilcek
We have a component which we use in streaming and batch jobs. Streaming we
run on FlinkRunner and batch on SparkRunner. Recently we needed to
add @RequiresStableInput to that component because of a streaming use-case.
But now the batch case crashes on SparkRunner with

Caused by: java.lang.UnsupportedOperationException: Spark runner
currently doesn't support @RequiresStableInput annotation.
at 
org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at 
com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
at 
com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
at scala.util.Try$.apply(Try.scala:192)
at 
com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)


We are using Beam 2.19.0. Is @RequiresStableInput problematic to
support for both the streaming and batch use-cases? What are the options here?
https://issues.apache.org/jira/browse/BEAM-5358


Re: TextIO. Writing late files

2020-05-15 Thread Jozef Vilcek
-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>> window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
>> behind inputWatermark:2020-05-12T14:05:19.799Z;
>> outputWatermark:2020-05-12T14:05:19.799Z`
>>
>> What happens here? I think that messages are generated per second and a
>> window of 5 seconds group them. Then a delay is added and finally data are
>> written in a file.
>> The pipeline reads more data, increasing the watermark.
>> Then, file names are emitted without pane information (see "Emitted File"
>> in logs). Window in second distinct compares file names' timestamp and the
>> pipeline watermark and then it discards file names as late.
>>
>>
>> Bonus
>> -
>> You can add a lateness to the pipeline. See
>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>>
>> If a minute is added a lateness for window the file names are processed
>> as late. As result the traces of LateDataFilter disappear.
>>
>> Moreover, in order to illustrate better that file names are emitted as
>> late for the second discarded I added a second TextIO to write file names
>> in other files.
>> Same FileNamePolicy than before was used (window + timing + shards).
>> Then, you can find files that contains the original filenames in
>> windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
>> interesting part, because you will find several files with LATE in their
>> names.
>>
>> Please, let me know if you need more information or if the example is not
>> enough to check the expected scenarios.
>>
>> Kby.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sun, May 10, 2020 at 17:04, Reuven Lax ()
>> wrote:
>>
>>> Pane info is supposed to be preserved across transforms. If the Flink
>>> runner does not, then I believe that is a bug.
>>>
>>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek 
>>> wrote:
>>>
>>>> I am using FileIO and I do observe the drop of pane info information on
>>>> Flink runner too. It was mentioned in this thread:
>>>> https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
>>>>
>>>> It is a result of different reshuffle expansion for optimisation
>>>> reasons. However, I did not observe a data loss in my case. Windowing and
>>>> watermark info should be preserved. Pane info is not, which brings a
>>>> question how reliable pane info should be in terms of SDK and runner.
>>>>
>>>> If you do observe a data loss, it would be great to share a test case
>>>> which replicates the problem.
>>>>
>>>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax  wrote:
>>>>
>>>>> Ah, I think I see the problem.
>>>>>
>>>>> It appears that for some reason, the Flink runner loses windowing
>>>>> information when a Reshuffle is applied. I'm not entirely sure why, 
>>>>> because
>>>>> windowing information should be maintained across a Reshuffle.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel 
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have added some logs to the pipeline as following (you can find the
>>>>>> log function in the Appendix):
>>>>>>
>>>>>> //STREAM + processing time.
>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>.apply(...) //mappers, a window and a combine
>>>>>>.apply(logBeforeWrite())
>>>>>>
>>>>>>.apply("WriteFiles",
>>>>>> TextIO.writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>.getPerDestinationOutputFilenames()
>>>>>>
>>>>>>.apply(logAfterWrite())
>>>>>>.apply("CombineFileNames", Combine.perKey(...))
>>>>>>
>>>>>> I have run the pipeline using DirectRunner (local), SparkRunner and
>>>>>> FlinkRunner, both of them using a cluster.
>>>>>> Below you can see the timing and pane information before/after (you
>>>>>> can see traces in detail with window and timestamp information in the
>>>>>> Appendix).

Re: TextIO. Writing late files

2020-05-10 Thread Jozef Vilcek
I am using FileIO and I do observe the drop of pane info on the Flink runner
too. It was mentioned in this thread:
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html

It is a result of a different reshuffle expansion, for optimisation reasons.
However, I did not observe data loss in my case. Windowing and watermark
info should be preserved. Pane info is not, which raises the question of how
reliable pane info should be in terms of the SDK and runners.

If you do observe a data loss, it would be great to share a test case
which replicates the problem.

On Sun, May 10, 2020 at 8:03 AM Reuven Lax  wrote:

> Ah, I think I see the problem.
>
> It appears that for some reason, the Flink runner loses windowing
> information when a Reshuffle is applied. I'm not entirely sure why, because
> windowing information should be maintained across a Reshuffle.
>
> Reuven
>
> On Sat, May 9, 2020 at 9:50 AM Jose Manuel  wrote:
>
>>
>> Hi,
>>
>> I have added some logs to the pipeline as following (you can find the log
>> function in the Appendix):
>>
>> //STREAM + processing time.
>> pipeline.apply(KafkaIO.read())
>>.apply(...) //mappers, a window and a combine
>>.apply(logBeforeWrite())
>>
>>.apply("WriteFiles",
>> TextIO.writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>.getPerDestinationOutputFilenames()
>>
>>.apply(logAfterWrite())
>>.apply("CombineFileNames", Combine.perKey(...))
>>
>> I have run the pipeline using DirectRunner (local), SparkRunner and
>> FlinkRunner, both of them using a cluster.
>> Below you can see the timing and pane information before/after (you can
>> see traces in detail with window and timestamp information in the Appendix).
>>
>> DirectRunner:
>> [Before Write] timing=ON_TIME,  pane=PaneInfo{isFirst=true, isLast=true,
>> timing=ON_TIME, index=0, onTimeIndex=0}
>> [AfterWrite] timing=EARLY,   pane=PaneInfo{isFirst=true,
>> timing=EARLY, index=0}
>>
>> FlinkRunner:
>> [Before Write] timing=ON_TIME,pane=PaneInfo{isFirst=true,
>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [AfterWrite] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>
>> SparkRunner:
>> [Before Write] timing=ON_TIME,pane=PaneInfo{isFirst=true,
>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [AfterWrite] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>
>> It seems DirectRunner propagates the windowing information as expected.
>> I am not sure if TextIO really propagates or it just emits a window pane,
>> because the timing before TextIO is ON_TIME and after TextIO is EARLY.
>> In any case using FlinkRunner and SparkRunner the timing and the pane are
>> not set.
>>
>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>> seeing that the DirectRunner filled windowing data... I am not sure.
>>
>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>
>>
>> Appendix
>> ---
>> Here you can see the log function and traces for different runners in
>> detail.
>>
>> private SingleOutput logBefore() {
>> return ParDo.of(new DoFn() {
>> @ProcessElement
>> public void processElement(ProcessContext context, BoundedWindow
>> boundedWindow) {
>> String value = context.element();
>> log.info("[Before Write] Element=data window={},
>> timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>> boundedWindow,
>> context.timestamp(),
>> context.pane().getTiming(),
>> context.pane().getIndex(),
>> context.pane().isFirst(),
>> context.pane().isLast(),
>> context.pane()
>> );
>> context.output(context.element());
>> }
>> });
>> }
>>
>> logAfter function shows the same information.
>>
>> Traces in details.
>>
>> DirectRunner (local):
>> [Before Write] Element=data
>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst
>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>> timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] Element=file
>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,   index =0, isFirst
>> =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>
>>
>> FlinkRunner (cluster):
>> [Before Write] Element=data
>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst
>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>> timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] Element=file
>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, 

Re: Flink: Lost pane timing at some steps of pipeline

2020-05-05 Thread Jozef Vilcek
Then maybe the better question is, what is the behaviour / guarantee of
propagating PaneInfo between steps of the pipeline.

If I do write files like this:

PCollection<KV<DestinationT, String>> destFileNames =
windowedData.apply(FileIO.write() ...).getPerDestinationOutputFilenames()

Then even if the data written to files is windowed and the materialised files
are associated with certain triggered panes, the `destFileNames` PCollection
does not necessarily carry such information. It is runner-dependent behaviour.
In older versions of Beam, pane info was propagated. The reason is that,
internally, WriteFiles uses Reshuffle (and many other parts of Beam do too).
Now, is this expected with respect to the model and API? How does PaneInfo
actually "get lost" in the case of a Flink rebalance?


On Tue, May 5, 2020 at 7:39 AM David Morávek 
wrote:

> Hi Jozef, I think this is expected behavior as Flink does not use the default
> expansion for Reshuffle (uses round-robin rebalance ship strategy instead).
> There is no aggregation that needs buffering (and triggering). All of the
> elements are immediately emmited to downstream operations after the
> Reshuffle.
>
> In case of direct runner, this is just a side-effect of Reshuffle
> expansion. See
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L69
> for more details.
>
> I don't think we should expect Reshuffle to have the same semantics as
> GBK, because it's only a performance optimization step that should not
> have any effect on the pipeline's overall result. Some runners may also
> completely ignore this step as part of execution plan optimization process
> (eg. two reshuffles in a row are idempotent). (
> https://issues.apache.org/jira/browse/BEAM-9824)
>
> D.
>
> On Mon, May 4, 2020 at 2:48 PM Jozef Vilcek  wrote:
>
>> I have a pipeline which
>>
>> 1. Read from KafkaIO
>> 2. Does stuff with events and writes windowed file via FileIO
>> 3. Apply statefull DoFn on written files info
>>
>> The statefull DoFn does some logic which depends on PaneInfo.Timing, if
>> it is EARLY or something else. When testing in DirectRunner, all is good.
>> But with FlinkRunner, panes are always NO_FIRING.
>>
>> To demonstrate this, here is a dummy test pipeline:
>>
>> val testStream = sc.testStream(testStreamOf[String]
>>   .advanceWatermarkTo(new Instant(1))
>>   .addElements(goodMessage, goodMessage)
>>   .advanceWatermarkTo(new Instant(2))
>>   .addElements(goodMessage, goodMessage)
>>   .advanceWatermarkTo(new Instant(200))
>>   .addElements(goodMessage, goodMessage)
>>   .advanceWatermarkToInfinity())
>>
>> testStream
>>   .withFixedWindows(
>> duration = Duration.standardSeconds(1),
>> options = WindowOptions(
>>   trigger = AfterWatermark.pastEndOfWindow()
>> .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>> .withLateFirings(AfterPane.elementCountAtLeast(1)),
>>   accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
>>   allowedLateness = Duration.standardDays(1)
>> ))
>>   .keyBy(_ => "static_key")
>>   .withPaneInfo
>>   .map { case (element, paneInfo) =>
>> println(s"#1 - $paneInfo")
>> element
>>   }
>>   //.groupByKey // <- need to uncomment this for Flink to work
>>   .applyTransform(Reshuffle.viaRandomKey())
>>   .withPaneInfo
>>   .map { case (element, paneInfo) =>
>> println(s"#2 - $paneInfo")
>> element
>>   }
>>
>> When executed with DirectRunner, #1 prints pane with UNKNOWN timing and
>> #2 with EARLY, which is what I expect. When run with Flink runner, both #1
>> and #2 writes UNKNOWN timing from PaneInfo.NO_FIRING. Only if I add extra
>> GBK, then #2 writes panes with EARLY timing.
>>
>> This is run on Beam 2.19. I was trying to analyze where could be a
>> problem but got lost. I will be happy for any suggestions or pointers. Does
>> it sounds like bug or am I doing something wrong?
>>
>


Flink: Lost pane timing at some steps of pipeline

2020-05-04 Thread Jozef Vilcek
I have a pipeline which:

1. Reads from KafkaIO
2. Does stuff with events and writes windowed files via FileIO
3. Applies a stateful DoFn on the written file info

The stateful DoFn does some logic which depends on PaneInfo.Timing, whether it
is EARLY or something else. When testing with DirectRunner, all is good. But
with FlinkRunner, panes are always NO_FIRING.

To demonstrate this, here is a dummy test pipeline:

val testStream = sc.testStream(testStreamOf[String]
  .advanceWatermarkTo(new Instant(1))
  .addElements(goodMessage, goodMessage)
  .advanceWatermarkTo(new Instant(2))
  .addElements(goodMessage, goodMessage)
  .advanceWatermarkTo(new Instant(200))
  .addElements(goodMessage, goodMessage)
  .advanceWatermarkToInfinity())

testStream
  .withFixedWindows(
duration = Duration.standardSeconds(1),
options = WindowOptions(
  trigger = AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(1))
.withLateFirings(AfterPane.elementCountAtLeast(1)),
  accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
  allowedLateness = Duration.standardDays(1)
))
  .keyBy(_ => "static_key")
  .withPaneInfo
  .map { case (element, paneInfo) =>
println(s"#1 - $paneInfo")
element
  }
  //.groupByKey // <- need to uncomment this for Flink to work
  .applyTransform(Reshuffle.viaRandomKey())
  .withPaneInfo
  .map { case (element, paneInfo) =>
println(s"#2 - $paneInfo")
element
  }

When executed with DirectRunner, #1 prints a pane with UNKNOWN timing and #2
with EARLY, which is what I expect. When run with the Flink runner, both #1 and
#2 write UNKNOWN timing from PaneInfo.NO_FIRING. Only if I add an extra GBK
does #2 write panes with EARLY timing.

This is run on Beam 2.19. I was trying to analyze where the problem could be,
but got lost. I will be happy for any suggestions or pointers. Does it
sound like a bug, or am I doing something wrong?


Re: KafkaIO: Configurable timeout for setupInitialOffset()

2020-03-02 Thread Jozef Vilcek
Thanks Ismael!

On Mon, Mar 2, 2020 at 2:15 PM Ismaël Mejía  wrote:

> Done, also assigned the issue you mentioned in the previous email to you.
>
> On Mon, Mar 2, 2020 at 12:56 PM Jozef Vilcek 
> wrote:
>
>> Recently I had a problem with Beam pipeline unable to start due to
>> unhealthy broker in the list of configured bootstrap servers. I have
>> created a JIRA for it and plan to work on the fix.
>>
>> https://issues.apache.org/jira/browse/BEAM-9420
>>
>> Please let me know in case it does not make sense of should be addressed
>> somehow else.
>>
>> Thanks,
>> Jozef
>>
>


Permission to self-assign JIRAs

2020-03-02 Thread Jozef Vilcek
Can I please get permission in JIRA for the `jvilcek` user to self-assign
JIRAs?


KafkaIO: Configurable timeout for setupInitialOffset()

2020-03-02 Thread Jozef Vilcek
Recently I had a problem with a Beam pipeline unable to start due to an
unhealthy broker in the list of configured bootstrap servers. I have
created a JIRA for it and plan to work on the fix.

https://issues.apache.org/jira/browse/BEAM-9420

Please let me know in case it does not make sense or should be addressed
differently.

Thanks,
Jozef


Re: Schema Convert transform fails on type metadata

2020-02-22 Thread Jozef Vilcek
 Will do! Thanks Alex.

On Sat, Feb 22, 2020 at 9:23 AM Alex Van Boxel  wrote:

> I've assigned it too you. If you create a PR you can add me as reviewer.
>
>  _/
> _/ Alex Van Boxel
>
>
> On Sat, Feb 22, 2020 at 9:14 AM Jozef Vilcek 
> wrote:
>
>> I have filed https://issues.apache.org/jira/browse/BEAM-9360 for this.
>>
>> Seems like I am not able to self assign this, can I get rights to do that
>> or get someone assign JIRA to me please?
>>
>> On Sat, Feb 22, 2020 at 2:41 AM Reuven Lax  wrote:
>>
>>> I think that it's incorrect - we shouldn't enforce that the metadata
>>> match.
>>>
>>> On Fri, Feb 21, 2020 at 10:38 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am playing with Schemas in Beam and conversion between types. I am
>>>> experimenting with Convert transform to convert PCollection of POJOs to
>>>> Protobufs, but get a failure about schemas not being compatible.
>>>>
>>>> The root cause is that FieldType are not passing `equivalent()` check
>>>> because of difference in metadata as protobuf add there a proto_number
>>>>
>>>> https://github.com/apache/beam/blob/v2.19.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L818
>>>>
>>>> Is it correct that metadata should be strictly equal to pass field type
>>>> equivalence check, or not?
>>>>
>>>> Thanks,
>>>> Jozef
>>>>
>>>


Re: Schema Convert transform fails on type metadata

2020-02-22 Thread Jozef Vilcek
I have filed https://issues.apache.org/jira/browse/BEAM-9360 for this.

It seems I am not able to self-assign this. Can I get rights to do that,
or have someone assign the JIRA to me, please?

On Sat, Feb 22, 2020 at 2:41 AM Reuven Lax  wrote:

> I think that it's incorrect - we shouldn't enforce that the metadata match.
>
> On Fri, Feb 21, 2020 at 10:38 AM Jozef Vilcek 
> wrote:
>
>> Hi,
>>
>> I am playing with Schemas in Beam and conversion between types. I am
>> experimenting with Convert transform to convert PCollection of POJOs to
>> Protobufs, but get a failure about schemas not being compatible.
>>
>> The root cause is that FieldType are not passing `equivalent()` check
>> because of difference in metadata as protobuf add there a proto_number
>>
>> https://github.com/apache/beam/blob/v2.19.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L818
>>
>> Is it correct that metadata should be strictly equal to pass field type
>> equivalence check, or not?
>>
>> Thanks,
>> Jozef
>>
>


Schema Convert transform fails on type metadata

2020-02-21 Thread Jozef Vilcek
Hi,

I am playing with Schemas in Beam and conversion between types. I am
experimenting with the Convert transform to convert a PCollection of POJOs to
Protobufs, but I get a failure about the schemas not being compatible.
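
For reference, the shape of the experiment (MyPojo and MyProto are illustrative
types whose schemas are structurally the same):

PCollection<MyPojo> pojos = ...;
PCollection<MyProto> protos = pojos.apply(Convert.to(MyProto.class));
// ^ this is the step that fails the schema compatibility check described below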

The root cause is that the FieldTypes do not pass the `equivalent()` check
because of a difference in metadata, as protobuf adds a proto_number there:
https://github.com/apache/beam/blob/v2.19.0/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java#L818

Is it correct that metadata should be strictly equal in order to pass the
field type equivalence check, or not?

Thanks,
Jozef


Re: [VOTE] Beam's Mascot will be the Firefly (Lampyridae)

2019-12-12 Thread Jozef Vilcek
+1

On Fri, Dec 13, 2019 at 5:58 AM Kenneth Knowles  wrote:

> Please vote on the proposal for Beam's mascot to be the Firefly. This
> encompasses the Lampyridae family of insects, without specifying a genus or
> species.
>
> [ ] +1, Approve Firefly being the mascot
> [ ] -1, Disapprove Firefly being the mascot
>
> The vote will be open for at least 72 hours excluding weekends. It is
> adopted by at least 3 PMC +1 approval votes, with no PMC -1 disapproval
> votes*. Non-PMC votes are still encouraged.
>
> PMC voters, please help by indicating your vote as "(binding)"
>
> Kenn
>
> *I have chosen this format for this vote, even though Beam uses simple
> majority as a rule, because I want any PMC member to be able to veto based
> on concerns about overlap or trademark.
>


Re: [VOTE] Beam Mascot animal choice: vote for as many as you want

2019-11-20 Thread Jozef Vilcek
On Wed, Nov 20, 2019 at 3:43 AM Kenneth Knowles  wrote:

> Please cast your votes of approval [1] for animals you would support as
> Beam mascot. The animal with the most approval will be identified as the
> favorite.
>
> *** Vote for as many as you like, using this checklist as a template 
>
> [ ] Beaver
> [ ] Hedgehog
> [X] Lemur
> [X] Owl
> [ ] Salmon
> [ ] Trout
> [ ] Robot dinosaur
> [X] Firefly
> [ ] Cuttlefish
> [ ] Dumbo Octopus
> [ ] Angler fish
>
> This vote will remain open for at least 72 hours.
>
> Kenn
>
> [1] See https://en.wikipedia.org/wiki/Approval_voting#Description and
> https://www.electionscience.org/library/approval-voting/
>


Re: [Discuss] Beam mascot

2019-11-13 Thread Jozef Vilcek
Interesting topic :) I also kind of liked Alex's firefly and the impression it
made on me. To drive it further, hands on hips make a strong / serious pose,
hovering in the air above all.
I would put the logo on him, to become his torso / body or a dress. A logo
with a big B on it almost looks like a superhero dress / cape (which is a
popular tune these "days" :) ).

To summarize, I would love the mascot to not only look nice, but to be deadly
serious about doing cool stuff and pulling superhero features out of its
sleeve :D

On Wed, Nov 13, 2019 at 3:17 PM Maximilian Michels  wrote:

> > Same. What about 37 with the eyes from 52?
>
> +1 That would combine two ideas: (1) "Beam" eyes and (2) sea animal.
>
> We could set this as the working idea and build a logo based off that.
>
> On 12.11.19 22:41, Robert Bradshaw wrote:
> > On Tue, Nov 12, 2019 at 1:29 PM Aizhamal Nurmamat kyzy
> >  wrote:
> >> 52 and 37 for me. I don't know what 53 is, but I like it too.
> >
> > Same. What about 37 with the eyes from 52?
> >
> >> On Tue, Nov 12, 2019 at 9:19 AM Maximilian Michels 
> wrote:
> >>>
> >>> More logos :D
> >>>
> >>> (35) - (37), (51), (48), (53) go into the direction of cuttlefish.
> >>>
> >>>   From the new ones I like (52) because of the eyes. (53) If we want to
> >>> move into the direction of a water animal, the small ones are quite
> >>> recognizable. Also, (23) and (36) are kinda cute.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 12.11.19 02:09, Robert Bradshaw wrote:
>  Cuttlefish are cool, but I don't know how recognizable they are, and
>  they don't scream "fast" or "stream-y" or "parallel processing" to me
>  (not that that's a requirement...) I like that firefly, nice working
>  the logo into the trailing beam of light.
> 
>  On Mon, Nov 11, 2019 at 5:03 PM Udi Meiri  wrote:
> >
> > Dumbo octopus anyone? https://youtu.be/DmqikqvLLLw?t=263
> >
> >
> > On Mon, Nov 11, 2019 at 2:06 PM Luke Cwik  wrote:
> >>
> >> The real answer, what cool schwag can we get based upon the mascot.
> >>
> >> On Mon, Nov 11, 2019 at 2:04 PM Kenneth Knowles 
> wrote:
> >>>
> >>> I'm with Luke on cuttlefish. We can have color changing schwag...
> >>>
> >>> On Mon, Nov 11, 2019 at 9:57 AM David Cavazos 
> wrote:
> 
>  I like 9 as well. Not related to anything, but chinchillas are
> also cute.
> 
>  On Mon, Nov 11, 2019 at 8:25 AM Luke Cwik 
> wrote:
> >
> > 9 and 7 for me (in that order)
> >
> > On Mon, Nov 11, 2019 at 7:18 AM Maximilian Michels <
> m...@apache.org> wrote:
> >>
> >> Here are some sketches from the designer. I've put them all in
> one image
> >> and added labels to make it easier to refer to them. My
> favorites are
> >> (2) and (9).
> >>
> >> Cheers,
> >> Max
> >>
> >> On 09.11.19 19:43, Maximilian Michels wrote:
> >>> I like that sketch! The designer has also sent me some rough
> sketches,
> >>> I'll share these here when I get consent from the designer.
> >>>
> >>> -Max
> >>>
> >>> On 09.11.19 19:22, Alex Van Boxel wrote:
>  +1 for a FireFly. Ok, I can't draw, but it's to make a point
> ;-)
> 
>  Fire2.jpg
> 
> 
> 
>  _/
>  _/ Alex Van Boxel
> 
> 
>  On Sat, Nov 9, 2019 at 12:26 AM Kyle Weaver <
> kcwea...@google.com
>  > wrote:
> 
>    Re fish: The authors of the Streaming Systems went with
> trout, but
>    the book mentioned a missed opportunity to make their
> cover a "robot
>    dinosaur with a Scottish accent." Perhaps that idea is
> worth
>  revisiting?
> 
>    On Fri, Nov 8, 2019 at 3:20 PM Luke Cwik <
> lc...@google.com
>    > wrote:
> 
>    My top suggestion is a cuttlefish.
> 
>    On Thu, Nov 7, 2019 at 10:28 PM Reza Rokni <
> r...@google.com
>    > wrote:
> 
>    Salmon... they love streams? :-)
> 
>    On Fri, 8 Nov 2019 at 12:00, Kenneth Knowles
>    mailto:k...@apache.org>>
> wrote:
> 
>    Agree with Aizhamal that it doesn't matter
> if they are
>    taken if they are not too close in space to
> Beam: Apache
>    projects, big data, log processing, stream
> processing.
>    Not a legal opinion, but an aesthetic
> opinion. So I
>    would keep Lemur as a possibility.
> Definitely nginx is
> 

Re: JdbcIO read needs to fit in memory

2019-10-29 Thread Jozef Vilcek
On Tue, Oct 29, 2019 at 10:04 AM Ryan Skraba  wrote:

> I didn't get a chance to try this out -- it sounds like a bug with the
> SparkRunner, if you've tested it with FlinkRunner and it succeeded.
>
> From your description, it should be reproducible by reading any large
> database table with the SparkRunner where the entire dataset is
> greater than the memory available to a single executor?  Do you have
> any other tips to reproduce?
>

Yes, that is what I do.


> Expecially worrisome is "as past JDBC load job runs fine with 4GB
> heap" -- did this happen with the same volumes of data and a different
> version of Beam?  Or the same version and a pipeline with different
> characteristics? This does sound like a regression, so details would
> help to confirm and track it down!
>

Eh, my English, sorry :) What I meant to say is that if I provide this
data e.g. via a file dump, then the whole job runs OK with a 4GB executor heap.
The run is about 400 cores for 1 hour, so tripling the heap size for all
executors just because of one initial load on one executor is inefficient.
I am not aware of any regression.


>
> All my best, Ryan
>
>
>
>
> On Tue, Oct 29, 2019 at 9:48 AM Jozef Vilcek 
> wrote:
> >
> > I can not find anything in docs about expected behavior of DoFn emitting
> arbitrary large number elements on one processElement().
> >
> > I wonder if Spark Runner behavior is a bug or just a difference (and
> disadvantage in this case) in execution more towards runner capability
> matrix differences.
> >
> > Also, in such cases, what is an opinion about BoundedSource vs DoFn as a
> source. What is a recommendation to IO developer if one want's to achieve
> equivalent execution scalability across runners?
> >
> >
> > On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek 
> wrote:
> >>
> >> typo in my previous message. I meant to say => JDBC is `not` the main
> data set, just metadata
> >>
> >> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek 
> wrote:
> >>>
> >>> Result of my query can fit the memory if I use 12GB heap per spark
> executor. This makes the job quite inefficient as past JDBC load job runs
> fine with 4GB heap to do the main heavy lifting - JDBC is the main data
> set, just metadata.
> >>>
> >>> I just did run the same JdbcIO read code on Spark and Flink runner.
> Flink did not blow up on memory. So it seems like this is a limitation of
> SparkRunner.
> >>>
> >>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:
> >>>>
> >>>> One more thing to try -- depending on your pipeline, you can disable
> >>>> the "auto-reshuffle" of JdbcIO.Read by setting
> >>>> withOutputParallelization(false)
> >>>>
> >>>> This is particularly useful if (1) you do aggressive and cheap
> >>>> filtering immediately after the read or (2) you do your own
> >>>> repartitioning action like GroupByKey after the read.
> >>>>
> >>>> Given your investigation into the heap, I doubt this will help!  I'll
> >>>> take a closer look at the DoFnOutputManager.  In the meantime, is
> >>>> there anything particularly about your job that might help
> >>>> investigate?
> >>>>
> >>>> All my best, Ryan
> >>>>
> >>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek 
> wrote:
> >>>> >
> >>>> > I agree I might be too quick to call DoFn output need to fit in
> memory. Actually I am not sure what Beam model say on this matter and what
> output managers of particular runners do about it.
> >>>> >
> >>>> > But SparkRunner definitely has an issue here. I did try set small
> `fetchSize` for JdbcIO as well as change `storageLevel` to MEMORY_AND_DISK.
> All fails on OOM.
> >>>> > When looking at the heap, most of it is used by linked list
> multi-map of DoFnOutputManager here:
> >>>> >
> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
> >>>> >
> >>>> >
>


Re: JdbcIO read needs to fit in memory

2019-10-29 Thread Jozef Vilcek
I cannot find anything in the docs about the expected behavior of a DoFn
emitting an arbitrarily large number of elements in one processElement().

I wonder if the Spark runner behavior is a bug or just a difference in
execution (and a disadvantage in this case), more along the lines of runner
capability matrix differences.

Also, in such cases, what is the opinion on BoundedSource vs DoFn as a
source? What is the recommendation to an IO developer who wants to achieve
equivalent execution scalability across runners?


On Sun, Oct 27, 2019 at 6:02 PM Jozef Vilcek  wrote:

> typo in my previous message. I meant to say => JDBC is `not` the main data
> set, just metadata
>
> On Sun, Oct 27, 2019 at 6:00 PM Jozef Vilcek 
> wrote:
>
>> Result of my query can fit the memory if I use 12GB heap per spark
>> executor. This makes the job quite inefficient as past JDBC load job runs
>> fine with 4GB heap to do the main heavy lifting - JDBC is the main data
>> set, just metadata.
>>
>> I just did run the same JdbcIO read code on Spark and Flink runner. Flink
>> did not blow up on memory. So it seems like this is a limitation of
>> SparkRunner.
>>
>> On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:
>>
>>> One more thing to try -- depending on your pipeline, you can disable
>>> the "auto-reshuffle" of JdbcIO.Read by setting
>>> withOutputParallelization(false)
>>>
>>> This is particularly useful if (1) you do aggressive and cheap
>>> filtering immediately after the read or (2) you do your own
>>> repartitioning action like GroupByKey after the read.
>>>
>>> Given your investigation into the heap, I doubt this will help!  I'll
>>> take a closer look at the DoFnOutputManager.  In the meantime, is
>>> there anything particularly about your job that might help
>>> investigate?
>>>
>>> All my best, Ryan
>>>
>>> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek 
>>> wrote:
>>> >
>>> > I agree I might be too quick to call DoFn output need to fit in
>>> memory. Actually I am not sure what Beam model say on this matter and what
>>> output managers of particular runners do about it.
>>> >
>>> > But SparkRunner definitely has an issue here. I did try set small
>>> `fetchSize` for JdbcIO as well as change `storageLevel` to MEMORY_AND_DISK.
>>> All fails on OOM.
>>> > When looking at the heap, most of it is used by linked list multi-map
>>> of DoFnOutputManager here:
>>> >
>>> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
>>> >
>>> >
>>>
>>


Re: JdbcIO read needs to fit in memory

2019-10-27 Thread Jozef Vilcek
Result of my query can fit the memory if I use 12GB heap per spark
executor. This makes the job quite inefficient as past JDBC load job runs
fine with 4GB heap to do the main heavy lifting - JDBC is the main data
set, just metadata.

I just ran the same JdbcIO read code on the Spark and Flink runners. Flink
did not blow up on memory. So it seems like this is a limitation of
SparkRunner.

On Fri, Oct 25, 2019 at 5:28 PM Ryan Skraba  wrote:

> One more thing to try -- depending on your pipeline, you can disable
> the "auto-reshuffle" of JdbcIO.Read by setting
> withOutputParallelization(false)
>
> This is particularly useful if (1) you do aggressive and cheap
> filtering immediately after the read or (2) you do your own
> repartitioning action like GroupByKey after the read.
>
> Given your investigation into the heap, I doubt this will help!  I'll
> take a closer look at the DoFnOutputManager.  In the meantime, is
> there anything particularly about your job that might help
> investigate?
>
> All my best, Ryan
>
> On Fri, Oct 25, 2019 at 2:47 PM Jozef Vilcek 
> wrote:
> >
> > I agree I might be too quick to call DoFn output need to fit in memory.
> Actually I am not sure what Beam model say on this matter and what output
> managers of particular runners do about it.
> >
> > But SparkRunner definitely has an issue here. I did try set small
> `fetchSize` for JdbcIO as well as change `storageLevel` to MEMORY_AND_DISK.
> All fails on OOM.
> > When looking at the heap, most of it is used by linked list multi-map of
> DoFnOutputManager here:
> >
> https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
> >
> >
>


Re: JdbcIO read needs to fit in memory

2019-10-25 Thread Jozef Vilcek
I agree I might have been too quick to say that DoFn output needs to fit in
memory. Actually, I am not sure what the Beam model says on this matter and
what the output managers of particular runners do about it.

But SparkRunner definitely has an issue here. I did try setting a small
`fetchSize` for JdbcIO as well as changing `storageLevel` to MEMORY_AND_DISK.
All of it fails with OOM.
When looking at the heap, most of it is used by the linked-list multimap of the
DoFnOutputManager here:
https://github.com/apache/beam/blob/v2.15.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L234
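
For completeness, the knobs I tried look roughly like this (connection details,
the row mapping, and the `p` / `options` objects are illustrative):

// Small fetch size on the JDBC read side...
PCollection<String> rows = p.apply(
    JdbcIO.<String>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://host/db"))
        .withQuery("SELECT payload FROM big_table")
        .withFetchSize(1000)  // stream rows from the database in small chunks
        .withRowMapper(rs -> rs.getString(1))
        .withCoder(StringUtf8Coder.of()));

// ...and a more forgiving storage level on the Spark runner side.
SparkPipelineOptions sparkOpts = options.as(SparkPipelineOptions.class);
sparkOpts.setStorageLevel("MEMORY_AND_DISK");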


JdbcIO read needs to fit in memory

2019-10-24 Thread Jozef Vilcek
Hi,

I need to read a big-ish data set via JdbcIO. This forced me to bump up
the memory for my executor (right now using SparkRunner). It seems that
JdbcIO requires all data to fit in memory, as it uses a DoFn to unfold the
query into a list of elements.

A BoundedSource would not need to fit the result in memory, but JdbcIO
is using a DoFn. Also, in a recent discussion [1] it was suggested that
BoundedSource should not be used as it is obsolete.

Has anyone faced this issue? What would be the best way to solve it? If
the DoFn should be kept, then I can only think of splitting the query into
ranges and trying to find the best-fitting number of rows to read at once;
a rough sketch of that idea follows below.
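
A rough sketch of the range-splitting idea, using JdbcIO.readAll() so that each
range becomes a separate input element (table, columns and the hard-coded
ranges are illustrative; in practice the ranges would be derived from the
min/max id):

PCollection<KV<Long, Long>> ranges = p.apply(Create.of(
    KV.of(0L, 1_000_000L),
    KV.of(1_000_000L, 2_000_000L)));

PCollection<String> rows = ranges.apply(
    JdbcIO.<KV<Long, Long>, String>readAll()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "org.postgresql.Driver", "jdbc:postgresql://host/db"))
        .withQuery("SELECT payload FROM big_table WHERE id >= ? AND id < ?")
        .withParameterSetter((range, statement) -> {
          statement.setLong(1, range.getKey());
          statement.setLong(2, range.getValue());
        })
        .withRowMapper(rs -> rs.getString(1))
        .withCoder(StringUtf8Coder.of()));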

I appreciate any thoughts.

[1]
https://lists.apache.org/list.html?dev@beam.apache.org:lte=1M:Reading%20from%20RDB%2C%20ParDo%20or%20BoundedSource


Re: Beam 2.15.0 SparkRunner issues

2019-10-03 Thread Jozef Vilcek
We do have 2.15.0 Beam batch jobs running on the Spark runner. I did have a
bit of a tricky time with spark.default.parallelism, but in the end it works
fine for us (custom parallelism on source stages and spark.default.parallelism
on all other stages after shuffles).

The tricky part in my case was the interaction between `spark.default.parallelism`
and `beam.bundleSize`. I had a problem where the default parallelism was
enforced on inputs too, therefore splitting them too much or too little.
Configuring bundleSize and a custom config on the inputs (e.g. the Hadoop input
format max/min split size) did the trick; roughly how we wire it up is sketched
below. TransformTranslator does make a decision on the partitioner based on
bundleSize, however I am not sure how it is used later on:
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java#L571
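For reference, this is roughly how we wire it up. A sketch only: I am assuming
the bundleSize option on SparkPipelineOptions (the exact name and type of the
setter may differ between Beam versions), while spark.default.parallelism itself
is passed to Spark, e.g. via spark-submit --conf:

SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
// Assumption: bundleSize (in bytes) influences how bounded sources are split
// into bundles/partitions; input-format min/max split sizes are still
// configured on the source itself.
options.setBundleSize(64 * 1024 * 1024L);
Pipeline pipeline = Pipeline.create(options);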

On Thu, Oct 3, 2019 at 9:25 AM Tim Robertson 
wrote:

> Hi all,
>
> We haven't dug enough into this to know where to log issues, but I'll
> start by sharing here.
>
> After upgrading from Beam 2.10.0 to 2.15.0 we see issues on SparkRunner -
> we suspect all of this related.
>
> 1. spark.default.parallelism is not respected
>
> 2. File writing (Avro) with dynamic destinations (grouped into folders by
> a field name) consistently fail with
> org.apache.beam.sdk.util.UserCodeException:
> java.nio.file.FileAlreadyExistsException: Unable to rename resource
> hdfs://ha-nn/pipelines/export-20190930-0854/.temp-beam-d4fd89ed-fc7a-4b1e-aceb-68f9d72d50f0/6e086f60-8bda-4d0e-b29d-1b47fdfc88c0
> to
> hdfs://ha-nn/pipelines/export-20190930-0854/7c9d2aec-f762-11e1-a439-00145eb45e9a/verbatimHBaseExport-0-of-1.avro
> as destination already exists and couldn't be deleted.
>
> 3. GBK operations that run over 500M small records consistently fail with
> OOM. We tried different configs with 48GB, 60GB, 80GB executor memory
>
> Our pipelines run are batch, simple transformations with either an
> HBaseSnapshot to Avro files or a merge of records in Avro (the GBK issue)
> pushed to ElasticSearch (it fails upstream of the ElasticsearchIO in the
> GBK stage).
>
> We notice operations that were mapToPair in 2.10.0 become repartition
> operations (mapToPair at GroupCombineFunctions.java:68 becomes
> repartition at GroupCombineFunctions.java:202), which might be related to
> this and looks surprising.
>
> I'll report more as we learn. If anyone has any immediate ideas based on
> their commits or reviews, or if you wish any tests run on other Beam versions,
> please say.
>
> Thanks,
> Tim
>
>
>
>


Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jozef Vilcek
Interesting discussion. I think it is very important information that when a
user uses a stateful ParDo, they can run into a situation where it will not
behave correctly in "batch operating mode".
But some transforms known to Beam, like fixed windows, will work fine? Is there
a sort applied to keyed elements before evaluating a window's key group? If the
answer is yes, then why not also do the same in the case of a stateful ParDo?
It would feel consistent to me.

Part of the SDK or not, I see the Dataflow runner is doing this optimisation,
probably precisely to make stateful ParDo operations stable in batch mode:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64
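To make the concern concrete, a small sketch of the kind of order-sensitive
stateful ParDo I have in mind; its result depends on elements of the same key
arriving in timestamp order, which is exactly what differs between runners that
sort before a stateful ParDo in batch and runners that do not:

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Keeps the largest timestamp seen per key and only emits increases. If a
// runner feeds the key's elements in arbitrary order, some elements that
// "should" be emitted are silently dropped.
class EmitIfNewerFn extends DoFn<KV<String, Long>, KV<String, Long>> {

  @StateId("latest")
  private final StateSpec<ValueState<Long>> latestSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(ProcessContext c, @StateId("latest") ValueState<Long> latest) {
    Long seen = latest.read();
    long ts = c.element().getValue();
    if (seen == null || ts > seen) {
      latest.write(ts);
      c.output(c.element());
    }
  }
}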


On Thu, May 16, 2019 at 5:09 PM Jan Lukavský  wrote:

> Hi Max,
> answers inline.
> -- Původní e-mail --
> Od: Maximilian Michels 
> Komu: dev@beam.apache.org
> Datum: 16. 5. 2019 15:59:59
> Předmět: Re: Definition of Unified model (WAS: Semantics of
> PCollection.isBounded)
>
> Hi Jan,
>
> Thanks for the discussion. Aljoscha already gave great answers. Just a
> couple of remarks:
>
> > a) streaming semantics (i.e. what I can express using Transforms) are
> subset of batch semantics
>
> I think you mean streaming is a superset of batch, or batch is a subset
> of streaming. This is the ideal. In practice, the two execution modes
> are sometimes accomplished by two different execution engines. Even in
> Flink, we have independent APIs for batch and streaming and the
> execution semantics are slightly different. For example, there are no
> watermarks in the batch API. Thus, batch rarely is simply an execution
> mode of streaming. However, I still think the unified Beam model works
> in both cases.
>
> > batch semantics and streaming semantics differs only in that I can have
> GlobalWindow with default trigger on batch and cannot on stream
>
> Actually I really thought, that regarding semantics, streaming should be
> subset of batch. That is because in batch, you can be sure that the
> watermark will eventually approach infinity. That gives you one additional
> feature, that streaming generally doesn't have (if you don't manually
> forward watermark to infinity as you suggest).
>
>
>
> You can have a GlobalWindow in streaming with a default trigger. You
> could define additional triggers that do early firings. And you could
> even trigger the global window by advancing the watermark to +inf.
>
> Yes, but then you actually changed streaming to batch, you just execute
> batch pipeline in streaming way.
>
>
>
> > On batch engines, this is generally not an issue, because the buffering
> is eliminated by sorting - when a Group by operation occurs, batch runners
> sort elements with the same key to be together and therefore eliminate the
> need for potentially infinite cache.
>
> The batch engines you normally use might do that. However, I do not see
> how sorting is an inherent property of the streaming model. We do not
> guarantee a deterministic order of events in streaming with respect to
> event time. In that regard, batch is a true subset of streaming because
> we make no guarantees on the order. Actually, because we only advance
> the watermark from -inf to +inf once we have read all data, this nicely
> aligns with the streaming model.
>
>
> Sure, streaming doesn't have the time-ordering guarantees. Having them
> would be impractical. But there are no issues in having these guarantees
> in batch mode; moreover, it gives the pipelines that need to have "bounded
> out of orderness" the chance to ever finish.
>
>
> I think that there are some issues in how we think about the properties of
> batch vs. stream. If we define streaming as the superset, then we cannot
> define some properties for batch that streaming doesn't have. But if we
> just split it into the part about semantics and the part about runtime
> properties and guarantees, then it is possible to define properties of
> batch that streaming doesn't have.
>
>
> Jan
>
>
>
>
> -Max
>
> On 16.05.19 15:20, Aljoscha Krettek wrote:
> > Hi,
> >
> > I think it’s helpful to consider that events never truly arrive in order
> in the real world (you mentioned as much yourself). For streaming use
> cases, there might be some out-of-orderness (or a lot of it, depending on
> the use case) so your implementation has to be able to deal with that. On
> the other end of the spectrum we have batch use cases, where
> out-of-orderness is potentially even bigger because it allows for more
> efficient parallel execution. If your implementation can deal with
> out-of-orderness that also shouldn’t be a problem.
> >
> > Another angle is completeness vs. latency: you usually cannot have both
> in a streaming world. If you want 100 % completeness, i.e. you want to
> ensure that you process all events and never drop anything, you can never
> advance the watermark from its initial -Inf if you want to also never 

Re: Custom shardingFn for FileIO

2019-05-09 Thread Jozef Vilcek
Yes, I was able to use it in Flink and I do see a performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers.

On Wed, May 8, 2019 at 7:19 AM Reuven Lax  wrote:

> So you were able to use this in Flink? Did you see performance gains?
>
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek  wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
>> PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu, May 2, 2019 at 3:22 PM Reuven Lax  wrote:
>>
>>> Great, let me know when to take another look at the PR!
>>>
>>> Reuven
>>>
>>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> That coder is added extra as a re-map stage from "original" key to new
>>>> ShardAwareKey ... But pipeline might get broken I guess.
>>>> Very fair point. I am having a second thought pass over this and will
>>>> try to simplify it much more
>>>>
>>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax  wrote:
>>>>
>>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>>> running a pipeline today who then tries to update post this PR will have
>>>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>>>> custom function. Is that correct, or am I misreading things?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek 
>>>>> wrote:
>>>>>
>>>>>> Hm, what would be the scenario? Have version A running with original
>>>>>> random sharding and then start version B where I change sharding to some
>>>>>> custom function?
>>>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>>>> state and also work with new keys produced to GBK going forward?
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>>>> these types of transforms to better support runners that allow in-place
>>>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>>>> function.
>>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>>
>>>>>>>> If this sort of change looks good, then next step would be to use
>>>>>>>> in in Flink runner transform override. Let me know what do you think
>>>>>>>>
>>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>>>> PTransformOverride by runner.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, a hook would have to be added to allow specifying a
>>>>>>>>>> different function for choosing the shard number (I assume the 
>>>>>>>>>> problem is
>>>>>>>>>> that there are cases where the current random assignment is not 
>>>>>>>>>> good?).
>>>>>>>>>> However this can be set using PTransformOverride, we ideally 
>>>>>>>>>> shouldn't
>>>>>>>>>> force the user to know details of the runner when writing their code.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <
>>>>>>>>>> m...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-05 Thread Jozef Vilcek
Well, I did not do a proper perf test. What I am saying is that my
observations are:

* A native Flink job does copy inputs, but looking at stack-trace perf
snapshots, the CPU is mostly busy inflating bytes read from Kafka.
* Running the Beam pipeline on Flink, the coder copy trace pops up among the
top CPU usages.

I am just speculating here. Flink's "coders" have separate
serialize/deserialize and copy operations. Plus they have an isImmutableType()
hint, so they have more potential to be effective.
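For what it's worth, a minimal sketch (hypothetical MyEvent type and coder) of
the two copy strategies being compared here:

static void compareCopies(Coder<MyEvent> eventCoder, MyEvent event)
    throws CoderException {
  // Beam-style defensive copy: object -> bytes -> object through the coder
  // (org.apache.beam.sdk.util.CoderUtils).
  MyEvent viaCoder = CoderUtils.clone(eventCoder, event);

  // Kryo-style copy: walks the object graph directly, with no byte round trip.
  // Flink can additionally skip the copy entirely for serializers that report
  // isImmutableType() == true.
  MyEvent viaKryo = new Kryo().copy(event);
}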


On Fri, May 3, 2019 at 2:01 PM Maximilian Michels  wrote:

> Misread your post. You're saying that Kryo is more efficient that a
> roundtrip obj->bytes->obj_copy. Still, most types use Flink's
> serializers which also do the above roundtrip. So I'm not sure this
> performance advantage holds true for other Flink jobs.
>
> On 02.05.19 20:01, Maximilian Michels wrote:
> >> I am not sure what are you referring to here. What do you mean Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >
> > Flink uses Kryo as a fallback serializer when its own type serialization
> > system can't analyze the type. I'm just guessing here that this could be
> > slower.
> >
> > On 02.05.19 16:51, Jozef Vilcek wrote:
> >>
> >>
> >> On Thu, May 2, 2019 at 3:41 PM Maximilian Michels  >> <mailto:m...@apache.org>> wrote:
> >>
> >> Thanks for the JIRA issues Jozef!
> >>
> >>  > So the feature in Flink is operator chaining and Flink per
> >> default initiate copy of input elements. In case of Beam coders copy
> >> seems to be more noticable than native Flink.
> >>
> >> Copying between chained operators can be turned off in the
> >> FlinkPipelineOptions (if you know what you're doing).
> >>
> >>
> >> Yes, I know that it can be instructed to reuse objects (if you are
> >> referring to this). I am just not sure I want to open this door in
> >> general :)
> >> But it is interesting to learn, that with portability, this will be
> >> turned On per default. Quite important finding imho.
> >>
> >> Beam coders should
> >> not be slower than Flink's. They are simple wrapped. It seems Kryo
> is
> >> simply slower which we could fix by providing more type hints to
> >> Flink.
> >>
> >>
> >> I am not sure what are you referring to here. What do you mean Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >>
> >> -Max
> >>
> >> On 02.05.19 13:15, Robert Bradshaw wrote:
> >>  > Thanks for filing those.
> >>  >
> >>  > As for how not doing a copy is "safe," it's not really. Beam
> >> simply
> >>  > asserts that you MUST NOT mutate your inputs (and direct runners,
> >>  > which are used during testing, do perform extra copies and
> >> checks to
> >>  > catch violations of this requirement).
> >>  >
> >>  > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
> >> mailto:jozo.vil...@gmail.com>> wrote:
> >>  >>
> >>  >> I have created
> >>  >> https://issues.apache.org/jira/browse/BEAM-7204
> >>  >> https://issues.apache.org/jira/browse/BEAM-7206
> >>  >>
> >>  >> to track these topics further
> >>  >>
> >>  >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
> >> mailto:jozo.vil...@gmail.com>> wrote:
> >>  >>>
> >>  >>>
> >>  >>>
> >>  >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
> >> mailto:k...@apache.org>> wrote:
> >>  >>>>
> >>  >>>>
> >>  >>>>
> >>  >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax  >> <mailto:re...@google.com>> wrote:
> >>  >>>>>
> >>  >>>>> In that case, Robert's point is quite valid. The old Flink
> >> runner I believe had no knowledge of fusion, which was known to make
> >> it extremely slow. A lot of work went into making the portable
> >> runner fusion aware, so we don't need to round trip through coders
> >> on every ParDo.
> >>      >>>>
> >>  >>>>
> >>  >>>> The old Flink runner got fusion for free, since Flink does it.
> >> The new fusion 

Re: Custom shardingFn for FileIO

2019-05-05 Thread Jozef Vilcek
Sorry, it took a while. I wanted to actually use this extension for
WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
PR is at https://github.com/apache/beam/pull/8499

On Thu, May 2, 2019 at 3:22 PM Reuven Lax  wrote:

> Great, let me know when to take another look at the PR!
>
> Reuven
>
> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek  wrote:
>
>> That coder is added extra as a re-map stage from "original" key to new
>> ShardAwareKey ... But pipeline might get broken I guess.
>> Very fair point. I am having a second thought pass over this and will try
>> to simplify it much more
>>
>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax  wrote:
>>
>>> I haven't looked at the PR in depth yet, but it appears that someone
>>> running a pipeline today who then tries to update post this PR will have
>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>> custom function. Is that correct, or am I misreading things?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> Hm, what would be the scenario? Have version A running with original
>>>> random sharding and then start version B where I change sharding to some
>>>> custom function?
>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>> state and also work with new keys produced to GBK going forward?
>>>>
>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax  wrote:
>>>>
>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>> these types of transforms to better support runners that allow in-place
>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek 
>>>>> wrote:
>>>>>
>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>> function.
>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>
>>>>>> If this sort of change looks good, then next step would be to use in
>>>>>> in Flink runner transform override. Let me know what do you think
>>>>>>
>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek 
>>>>>> wrote:
>>>>>>
>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>> PTransformOverride by runner.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>>> function for choosing the shard number (I assume the problem is that 
>>>>>>>> there
>>>>>>>> are cases where the current random assignment is not good?). However 
>>>>>>>> this
>>>>>>>> can be set using PTransformOverride, we ideally shouldn't force the 
>>>>>>>> user to
>>>>>>>> know details of the runner when writing their code.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>> number of
>>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>>
>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>> number
>>>>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>>>>> that
>>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>>
>>>>>>>>> -Max
>>>>>>>>>
>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>>> sharding key

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-02 Thread Jozef Vilcek
On Thu, May 2, 2019 at 3:41 PM Maximilian Michels  wrote:

> Thanks for the JIRA issues Jozef!
>
> > So the feature in Flink is operator chaining and Flink per default
> initiate copy of input elements. In case of Beam coders copy seems to be
> more noticable than native Flink.
>
> Copying between chained operators can be turned off in the
> FlinkPipelineOptions (if you know what you're doing).


Yes, I know that it can be instructed to reuse objects (if you are
referring to this). I am just not sure I want to open this door in general
:)
But it is interesting to learn that with portability this will be turned
on by default. Quite an important finding, imho.
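For reference, the switch I mean looks roughly like this (a sketch; I am
assuming the objectReuse option on FlinkPipelineOptions, which should map to
Flink's ExecutionConfig#enableObjectReuse):

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// Only safe if no user DoFn mutates its inputs or previously emitted elements.
options.setObjectReuse(true);
Pipeline pipeline = Pipeline.create(options);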


> Beam coders should
> not be slower than Flink's. They are simple wrapped. It seems Kryo is
> simply slower which we could fix by providing more type hints to Flink.
>

I am not sure what you are referring to here. What do you mean by "Kryo is
simply slower" ... Beam's Kryo or Flink's Kryo?


> -Max
>
> On 02.05.19 13:15, Robert Bradshaw wrote:
> > Thanks for filing those.
> >
> > As for how not doing a copy is "safe," it's not really. Beam simply
> > asserts that you MUST NOT mutate your inputs (and direct runners,
> > which are used during testing, do perform extra copies and checks to
> > catch violations of this requirement).
> >
> > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek 
> wrote:
> >>
> >> I have created
> >> https://issues.apache.org/jira/browse/BEAM-7204
> >> https://issues.apache.org/jira/browse/BEAM-7206
> >>
> >> to track these topics further
> >>
> >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek 
> wrote:
> >>>
> >>>
> >>>
> >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles 
> wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax  wrote:
> >>>>>
> >>>>> In that case, Robert's point is quite valid. The old Flink runner I
> believe had no knowledge of fusion, which was known to make it extremely
> slow. A lot of work went into making the portable runner fusion aware, so
> we don't need to round trip through coders on every ParDo.
> >>>>
> >>>>
> >>>> The old Flink runner got fusion for free, since Flink does it. The
> new fusion in portability is because fusing the runner side of portability
> steps does not achieve real fusion
> >>>
> >>>
> >>> Aha, I see. So the feature in Flink is operator chaining and Flink per
> default initiate copy of input elements. In case of Beam coders copy seems
> to be more noticable than native Flink.
> >>> So do I get it right that in portable runner scenario, you do similar
> chaining via this "fusion of stages"? Curious here... how is it different
> from chaining so runner can be sure that not doing copy is "safe" with
> respect to user defined functions and their behaviour over inputs?
> >>>
> >>>>>
> >>>>>
> >>>>> Reuven
> >>>>>
> >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek 
> wrote:
> >>>>>>
> >>>>>> It was not a portable Flink runner.
> >>>>>>
> >>>>>> Thanks all for the thoughts, I will create JIRAs, as suggested,
> with my findings and send them out
> >>>>>>
> >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax 
> wrote:
> >>>>>>>
> >>>>>>> Jozef did you use the portable Flink runner or the old one?
> >>>>>>>
> >>>>>>> Reuven
> >>>>>>>
> >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for starting this investigation. As mentioned, most of the
> work
> >>>>>>>> to date has been on feature parity, not performance parity, but
> we're
> >>>>>>>> at the point that the latter should be tackled as well. Even if
> there
> >>>>>>>> is a slight overhead (and there's talk about integrating more
> deeply
> >>>>>>>> with the Flume DAG that could elide even that) I'd expect it
> should be
> >>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer
> issue,
> >>>>>>>> sounds like the cloning via coders is a huge drag that needs

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-02 Thread Jozef Vilcek
I have created
https://issues.apache.org/jira/browse/BEAM-7204
https://issues.apache.org/jira/browse/BEAM-7206

to track these topics further

On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek  wrote:

>
>
> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Tue, Apr 30, 2019, 07:05 Reuven Lax  wrote:
>>
>>> In that case, Robert's point is quite valid. The old Flink runner I
>>> believe had no knowledge of fusion, which was known to make it extremely
>>> slow. A lot of work went into making the portable runner fusion aware, so
>>> we don't need to round trip through coders on every ParDo.
>>>
>>
>> The old Flink runner got fusion for free, since Flink does it. The new
>> fusion in portability is because fusing the runner side of portability
>> steps does not achieve real fusion
>>
>
> Aha, I see. So the feature in Flink is operator chaining and Flink per
> default initiate copy of input elements. In case of Beam coders copy seems
> to be more noticable than native Flink.
> So do I get it right that in portable runner scenario, you do similar
> chaining via this "fusion of stages"? Curious here... how is it different
> from chaining so runner can be sure that not doing copy is "safe" with
> respect to user defined functions and their behaviour over inputs?
>
>
>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> It was not a portable Flink runner.
>>>>
>>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>>>> findings and send them out
>>>>
>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax  wrote:
>>>>
>>>>> Jozef did you use the portable Flink runner or the old one?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>>> to date has been on feature parity, not performance parity, but we're
>>>>>> at the point that the latter should be tackled as well. Even if there
>>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>>>> addressed. I wonder if this is one of those cases where using the
>>>>>> portability framework could be a performance win (specifically, no
>>>>>> cloning would happen between operators of fused stages, and the
>>>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>>>> all, because we know they wouldn't be mutated).
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles 
>>>>>> wrote:
>>>>>> >
>>>>>> > Specifically, a lot of shared code assumes that repeatedly setting
>>>>>> a timer is nearly free / the same cost as determining whether or not to 
>>>>>> set
>>>>>> the timer. ReduceFnRunner has been refactored in a way so it would be 
>>>>>> very
>>>>>> easy to set the GC timer once per window that occurs in a bundle, but
>>>>>> there's probably some underlying inefficiency around why this isn't cheap
>>>>>> that would be a bigger win.
>>>>>> >
>>>>>> > Kenn
>>>>>> >
>>>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax 
>>>>>> wrote:
>>>>>> >>
>>>>>> >> I think the short answer is that folks working on the BeamFlink
>>>>>> runner have mostly been focused on getting everything working, and so 
>>>>>> have
>>>>>> not dug into this performance too deeply. I suspect that there is
>>>>>> low-hanging fruit to optimize as a result.
>>>>>> >>
>>>>>> >> You're right that ReduceFnRunner schedules a timer for each
>>>>>> element. I think this code dates back to before Beam; on Dataflow timers
>>>>>> are identified by tag, so this simply overwrites the existing timer which
>>>>>&

Re: Custom shardingFn for FileIO

2019-05-01 Thread Jozef Vilcek
That coder is added as an extra re-map stage from the "original" key to the new
ShardAwareKey ... but the pipeline might still get broken, I guess.
Very fair point. I am taking a second pass over this and will try
to simplify it much more.

On Wed, May 1, 2019 at 2:12 PM Reuven Lax  wrote:

> I haven't looked at the PR in depth yet, but it appears that someone
> running a pipeline today who then tries to update post this PR will have
> the coder change to DefaultShardKeyCoder, even if they haven't picked any
> custom function. Is that correct, or am I misreading things?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek 
> wrote:
>
>> Hm, what would be the scenario? Have version A running with original
>> random sharding and then start version B where I change sharding to some
>> custom function?
>> So I have to enable the pipeline to digest old keys from GBK restored
>> state and also work with new keys produced to GBK going forward?
>>
>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax  wrote:
>>
>>> Initial thought on PR: we usually try to limit changing coders in these
>>> types of transforms to better support runners that allow in-place updates
>>> of pipelines. Can this be done without changing the coder?
>>>
>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>> function.
>>>> https://github.com/apache/beam/pull/8438
>>>>
>>>> If this sort of change looks good, then next step would be to use in in
>>>> Flink runner transform override. Let me know what do you think
>>>>
>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek 
>>>> wrote:
>>>>
>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>> PTransformOverride by runner.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax  wrote:
>>>>>
>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>> function for choosing the shard number (I assume the problem is that 
>>>>>> there
>>>>>> are cases where the current random assignment is not good?). However this
>>>>>> can be set using PTransformOverride, we ideally shouldn't force the user 
>>>>>> to
>>>>>> know details of the runner when writing their code.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels 
>>>>>> wrote:
>>>>>>
>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>> of
>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>
>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>> number
>>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>>> that
>>>>>>> logic is buried inside WriteFiles.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>> keys.
>>>>>>> >
>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>> jozo.vil...@gmail.com
>>>>>>> > <mailto:jozo.vil...@gmail.com>> wrote:
>>>>>>> >
>>>>>>> > Right now, sharding can be specified only via target
>>>>>>> `shardCount`,
>>>>>>> > be it user or runner. Next to configurable shardCount, I am
>>>>>>> > proposing to be able to pass also a function which will allow
>>>>>>> to the
>>>>>>> > user (or runner) control how is shard determined and what key
>>>>>>> will
>>>>>>> > be used to represent it
>>>>>>> >
>>>>>>> > interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>>>> extends
>>&g

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-05-01 Thread Jozef Vilcek
On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles  wrote:

>
>
> On Tue, Apr 30, 2019, 07:05 Reuven Lax  wrote:
>
>> In that case, Robert's point is quite valid. The old Flink runner I
>> believe had no knowledge of fusion, which was known to make it extremely
>> slow. A lot of work went into making the portable runner fusion aware, so
>> we don't need to round trip through coders on every ParDo.
>>
>
> The old Flink runner got fusion for free, since Flink does it. The new
> fusion in portability is because fusing the runner side of portability
> steps does not achieve real fusion
>

Aha, I see. So the feature in Flink is operator chaining, and Flink by default
initiates a copy of input elements. In the case of Beam, the coder copy seems
to be more noticeable than in native Flink.
So do I get it right that in the portable runner scenario you do similar
chaining via this "fusion of stages"? Curious here... how is it different
from chaining, so that the runner can be sure that not doing the copy is "safe"
with respect to user-defined functions and their behaviour over inputs?


>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek 
>> wrote:
>>
>>> It was not a portable Flink runner.
>>>
>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>>> findings and send them out
>>>
>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax  wrote:
>>>
>>>> Jozef did you use the portable Flink runner or the old one?
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>> to date has been on feature parity, not performance parity, but we're
>>>>> at the point that the latter should be tackled as well. Even if there
>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>>> addressed. I wonder if this is one of those cases where using the
>>>>> portability framework could be a performance win (specifically, no
>>>>> cloning would happen between operators of fused stages, and the
>>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>>> all, because we know they wouldn't be mutated).
>>>>>
>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles 
>>>>> wrote:
>>>>> >
>>>>> > Specifically, a lot of shared code assumes that repeatedly setting a
>>>>> timer is nearly free / the same cost as determining whether or not to set
>>>>> the timer. ReduceFnRunner has been refactored in a way so it would be very
>>>>> easy to set the GC timer once per window that occurs in a bundle, but
>>>>> there's probably some underlying inefficiency around why this isn't cheap
>>>>> that would be a bigger win.
>>>>> >
>>>>> > Kenn
>>>>> >
>>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax 
>>>>> wrote:
>>>>> >>
>>>>> >> I think the short answer is that folks working on the BeamFlink
>>>>> runner have mostly been focused on getting everything working, and so have
>>>>> not dug into this performance too deeply. I suspect that there is
>>>>> low-hanging fruit to optimize as a result.
>>>>> >>
>>>>> >> You're right that ReduceFnRunner schedules a timer for each
>>>>> element. I think this code dates back to before Beam; on Dataflow timers
>>>>> are identified by tag, so this simply overwrites the existing timer which
>>>>> is very cheap in Dataflow. If it is not cheap on Flink, this might be
>>>>> something to optimize.
>>>>> >>
>>>>> >> Reuven
>>>>> >>
>>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek 
>>>>> wrote:
>>>>> >>>
>>>>> >>> Hello,
>>>>> >>>
>>>>> >>> I am interested in any knowledge or thoughts on what should be /
>>>>> is an overhead of running Beam pipelines instead of pipelines written on
>>

Re: Custom shardingFn for FileIO

2019-04-30 Thread Jozef Vilcek
Hm, what would the scenario be? Having version A running with the original
random sharding and then starting version B where I change sharding to some
custom function?
So I would have to enable the pipeline to digest old keys from the restored GBK
state and also work with the new keys produced to the GBK going forward?

On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax  wrote:

> Initial thought on PR: we usually try to limit changing coders in these
> types of transforms to better support runners that allow in-place updates
> of pipelines. Can this be done without changing the coder?
>
> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek 
> wrote:
>
>> I have created a PR for enhancing WriteFiles for custom sharding function.
>> https://github.com/apache/beam/pull/8438
>>
>> If this sort of change looks good, then next step would be to use in in
>> Flink runner transform override. Let me know what do you think
>>
>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek 
>> wrote:
>>
>>> I guess it is fine to enable shardingFn control only on WriteFiles level
>>> rather than FileIO. On WriteFiles it can be manipulated in
>>> PTransformOverride by runner.
>>>
>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax  wrote:
>>>
>>>> Yes, a hook would have to be added to allow specifying a different
>>>> function for choosing the shard number (I assume the problem is that there
>>>> are cases where the current random assignment is not good?). However this
>>>> can be set using PTransformOverride, we ideally shouldn't force the user to
>>>> know details of the runner when writing their code.
>>>>
>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels 
>>>> wrote:
>>>>
>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>> of
>>>>> shards in case of Runner-determined sharding.
>>>>>
>>>>> Not sure if that would work for Jozef's case because setting the
>>>>> number
>>>>> of shards is not enough. We want to set the shard key directly and
>>>>> that
>>>>> logic is buried inside WriteFiles.
>>>>>
>>>>> -Max
>>>>>
>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>> keys.
>>>>> >
>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek >>>> > <mailto:jozo.vil...@gmail.com>> wrote:
>>>>> >
>>>>> > Right now, sharding can be specified only via target
>>>>> `shardCount`,
>>>>> > be it user or runner. Next to configurable shardCount, I am
>>>>> > proposing to be able to pass also a function which will allow to
>>>>> the
>>>>> > user (or runner) control how is shard determined and what key
>>>>> will
>>>>> > be used to represent it
>>>>> >
>>>>> > interface ShardingFunction[UserT, DestinationT, ShardKeyT]
>>>>> extends
>>>>> > Serializable {
>>>>> > ShardKeyT assign(DestinationT destination, UserT element,
>>>>> > shardCount: Integer);
>>>>> > }
>>>>> >
>>>>> > Default implementation can be what is right now =>  random shard
>>>>> > encapsulated as ShardedKey.
>>>>> >
>>>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax >>>> > <mailto:re...@google.com>> wrote:
>>>>> >
>>>>> > If sharding is not specified, then the semantics are
>>>>> > "runner-determined sharding." The DataflowRunner already
>>>>> takes
>>>>> > advantage of this to impose its own sharding if the user
>>>>> hasn't
>>>>> > specified an explicit one. Could the Flink runner do the same
>>>>> > instead of pushing this to the users?
>>>>> >
>>>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>> > mailto:m...@apache.org>> wrote:
>>>>> >
>>>>> > Hi Jozef,
>>>>> >
>>>

Re: Custom shardingFn for FileIO

2019-04-30 Thread Jozef Vilcek
I have created a PR enhancing WriteFiles with a custom sharding function:
https://github.com/apache/beam/pull/8438

If this sort of change looks good, then the next step would be to use it in a
Flink runner transform override. Let me know what you think.

On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek  wrote:

> I guess it is fine to enable shardingFn control only on WriteFiles level
> rather than FileIO. On WriteFiles it can be manipulated in
> PTransformOverride by runner.
>
> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax  wrote:
>
>> Yes, a hook would have to be added to allow specifying a different
>> function for choosing the shard number (I assume the problem is that there
>> are cases where the current random assignment is not good?). However this
>> can be set using PTransformOverride, we ideally shouldn't force the user to
>> know details of the runner when writing their code.
>>
>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels 
>> wrote:
>>
>>> Reuven is talking about PTransformOverride, e.g.
>>> FlinkTransformOverrides. We already use this to determine the number of
>>> shards in case of Runner-determined sharding.
>>>
>>> Not sure if that would work for Jozef's case because setting the number
>>> of shards is not enough. We want to set the shard key directly and that
>>> logic is buried inside WriteFiles.
>>>
>>> -Max
>>>
>>> On 25.04.19 16:30, Reuven Lax wrote:
>>> > Actually the runner is free to perform surgery on the graph. The
>>> > FlinkRunner can insert a custom function to determine the sharding
>>> keys.
>>> >
>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek >> > <mailto:jozo.vil...@gmail.com>> wrote:
>>> >
>>> > Right now, sharding can be specified only via target `shardCount`,
>>> > be it user or runner. Next to configurable shardCount, I am
>>> > proposing to be able to pass also a function which will allow to
>>> the
>>> > user (or runner) control how is shard determined and what key will
>>> > be used to represent it
>>> >
>>> > interface ShardingFunction[UserT, DestinationT, ShardKeyT]  extends
>>> > Serializable {
>>> > ShardKeyT assign(DestinationT destination, UserT element,
>>> > shardCount: Integer);
>>> > }
>>> >
>>> > Default implementation can be what is right now =>  random shard
>>> > encapsulated as ShardedKey.
>>> >
>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax >> > <mailto:re...@google.com>> wrote:
>>> >
>>> > If sharding is not specified, then the semantics are
>>> > "runner-determined sharding." The DataflowRunner already takes
>>> > advantage of this to impose its own sharding if the user hasn't
>>> > specified an explicit one. Could the Flink runner do the same
>>> > instead of pushing this to the users?
>>> >
>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>> > mailto:m...@apache.org>> wrote:
>>> >
>>> > Hi Jozef,
>>> >
>>> > For sharding in FileIO there are basically two options:
>>> >
>>> > (1) num_shards ~= num_workers => bad spread of the load
>>> > across workers
>>> > (2) num_shards >> num_workers => good spread of the load
>>> > across workers,
>>> > but huge number of files
>>> >
>>> > Your approach would give users control over the sharding
>>> > keys such that
>>> > they could be adjusted to spread load more evenly.
>>> >
>>> > I'd like to hear from Beam IO experts if that would make
>>> sense.
>>> >
>>> > Thanks,
>>> > Max
>>> >
>>> > On 25.04.19 08:52, Jozef Vilcek wrote:
>>> >  > Hello,
>>> >  >
>>> >  > Right now, if someone needs sharded files via FileIO,
>>> > there is only one
>>> >  > option which is random (round robin) shard assignment
>>> per
>>> > element and it
>>> >  > always use ShardedKey as a key for the GBK
>>> which
>>> > follows.
>>> >  >
>>> >  > I would like to generalize this and have a possibility
>>> to
>>> > provide some
>>> >  > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>> >  > What I am mainly after is, to have a possibility to
>>> > provide optimisation
>>> >  > for Flink runtime and pass in a special function which
>>> > generates shard
>>> >  > keys in a way that they are evenly spread among workers
>>> > (BEAM-5865).
>>> >  >
>>> >  > Would such extension for FileIO make sense? If yes, I
>>> > would create a
>>> >  > ticket for it and try to draft a PR.
>>> >  >
>>> >  > Best,
>>> >  > Jozef
>>> >
>>>
>>


Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-04-30 Thread Jozef Vilcek
All right, I can try to test it out. How do I deploy a pipeline on the Flink
portable runner? Should I follow this to be able to do it?
https://beam.apache.org/documentation/runners/flink/

On Tue, Apr 30, 2019 at 4:05 PM Reuven Lax  wrote:

> In that case, Robert's point is quite valid. The old Flink runner I
> believe had no knowledge of fusion, which was known to make it extremely
> slow. A lot of work went into making the portable runner fusion aware, so
> we don't need to round trip through coders on every ParDo.
>
> Reuven
>
> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek 
> wrote:
>
>> It was not a portable Flink runner.
>>
>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>> findings and send them out
>>
>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax  wrote:
>>
>>> Jozef did you use the portable Flink runner or the old one?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw 
>>> wrote:
>>>
>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>> to date has been on feature parity, not performance parity, but we're
>>>> at the point that the latter should be tackled as well. Even if there
>>>> is a slight overhead (and there's talk about integrating more deeply
>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>> addressed. I wonder if this is one of those cases where using the
>>>> portability framework could be a performance win (specifically, no
>>>> cloning would happen between operators of fused stages, and the
>>>> cloning between operators could be on the raw bytes[] (if needed at
>>>> all, because we know they wouldn't be mutated).
>>>>
>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles 
>>>> wrote:
>>>> >
>>>> > Specifically, a lot of shared code assumes that repeatedly setting a
>>>> timer is nearly free / the same cost as determining whether or not to set
>>>> the timer. ReduceFnRunner has been refactored in a way so it would be very
>>>> easy to set the GC timer once per window that occurs in a bundle, but
>>>> there's probably some underlying inefficiency around why this isn't cheap
>>>> that would be a bigger win.
>>>> >
>>>> > Kenn
>>>> >
>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax  wrote:
>>>> >>
>>>> >> I think the short answer is that folks working on the BeamFlink
>>>> runner have mostly been focused on getting everything working, and so have
>>>> not dug into this performance too deeply. I suspect that there is
>>>> low-hanging fruit to optimize as a result.
>>>> >>
>>>> >> You're right that ReduceFnRunner schedules a timer for each element.
>>>> I think this code dates back to before Beam; on Dataflow timers are
>>>> identified by tag, so this simply overwrites the existing timer which is
>>>> very cheap in Dataflow. If it is not cheap on Flink, this might be
>>>> something to optimize.
>>>> >>
>>>> >> Reuven
>>>> >>
>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek 
>>>> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> I am interested in any knowledge or thoughts on what should be / is
>>>> an overhead of running Beam pipelines instead of pipelines written on "bare
>>>> runner". Is this something which is being tested or investigated by
>>>> community? Is there a consensus in what bounds should the overhead
>>>> typically be? I realise this is very runner specific, but certain things
>>>> are imposed also by SDK model itself.
>>>> >>>
>>>> >>> I tested simple streaming pipeline on Flink vs Beam-Flink and found
>>>> very noticeable differences. I want to stress out, it was not a performance
>>>> test. Job does following:
>>>> >>>
>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>>>> -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>> >>>
>>>> >>> Both jobs had same config

Re: [DISCUSS] Performance of Beam compare to "Bare Runner"

2019-04-30 Thread Jozef Vilcek
It was not the portable Flink runner.

Thanks all for the thoughts. I will create JIRAs, as suggested, with my
findings and send them out.

On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax  wrote:

> Jozef did you use the portable Flink runner or the old one?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw 
> wrote:
>
>> Thanks for starting this investigation. As mentioned, most of the work
>> to date has been on feature parity, not performance parity, but we're
>> at the point that the latter should be tackled as well. Even if there
>> is a slight overhead (and there's talk about integrating more deeply
>> with the Flume DAG that could elide even that) I'd expect it should be
>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>> sounds like the cloning via coders is a huge drag that needs to be
>> addressed. I wonder if this is one of those cases where using the
>> portability framework could be a performance win (specifically, no
>> cloning would happen between operators of fused stages, and the
>> cloning between operators could be on the raw bytes[] (if needed at
>> all, because we know they wouldn't be mutated).
>>
>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles  wrote:
>> >
>> > Specifically, a lot of shared code assumes that repeatedly setting a
>> timer is nearly free / the same cost as determining whether or not to set
>> the timer. ReduceFnRunner has been refactored in a way so it would be very
>> easy to set the GC timer once per window that occurs in a bundle, but
>> there's probably some underlying inefficiency around why this isn't cheap
>> that would be a bigger win.
>> >
>> > Kenn
>> >
>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax  wrote:
>> >>
>> >> I think the short answer is that folks working on the BeamFlink runner
>> have mostly been focused on getting everything working, and so have not dug
>> into this performance too deeply. I suspect that there is low-hanging fruit
>> to optimize as a result.
>> >>
>> >> You're right that ReduceFnRunner schedules a timer for each element. I
>> think this code dates back to before Beam; on Dataflow timers are
>> identified by tag, so this simply overwrites the existing timer which is
>> very cheap in Dataflow. If it is not cheap on Flink, this might be
>> something to optimize.
>> >>
>> >> Reuven
>> >>
>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek 
>> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I am interested in any knowledge or thoughts on what should be / is
>> an overhead of running Beam pipelines instead of pipelines written on "bare
>> runner". Is this something which is being tested or investigated by
>> community? Is there a consensus in what bounds should the overhead
>> typically be? I realise this is very runner specific, but certain things
>> are imposed also by SDK model itself.
>> >>>
>> >>> I tested simple streaming pipeline on Flink vs Beam-Flink and found
>> very noticeable differences. I want to stress out, it was not a performance
>> test. Job does following:
>> >>>
>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>> -> Reshuffle -> Report counter.inc() to metrics for throughput
>> >>>
>> >>> Both jobs had same configuration and same state backed with same
>> checkpointing strategy. What I noticed from few simple test runs:
>> >>>
>> >>> * first run on Flink 1.5.0 from CPU profiles on one worker I have
>> found out that ~50% time was spend either on removing timers from
>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>> CoderUtils.clone()
>> >>>
>> >>> * problem with timer delete was addressed by FLINK-9423. I have
>> retested on Flink 1.7.2 and there was not much time is spend in timer
>> delete now, but root cause was not removed. It still remains that timers
>> are frequently registered and removed ( I believe from
>> ReduceFnRunner.scheduleGarbageCollectionTimer() in which case it is called
>> per processed element? )  which is noticeable in GC activity, Heap and
>> State ...
>> >>>
>> >>> * in Flink I use FileSystem state backed which keeps state in memory
>> CopyOnWriteStateTable which after some time is full of PaneInfo objects.
>> Maybe they come from PaneInfoTracker activity
>> >>>
>> >>> * Coder clone is painfull. Pure Flink job does copy between operators
>> too, in my case it is via Kryo.copy() but this is not noticeable in CPU
>> profile. Kryo.copy() does copy on object level not boject -> bytes ->
>> object which is cheaper
>> >>>
>> >>> Overall, my observation is that pure Flink can be roughly 3x faster.
>> >>>
>> >>> I do not know what I am trying to achieve here :) Probably just start
>> a discussion and collect thoughts and other experiences on the cost of
>> running some data processing on Beam and particular runner.
>> >>>
>>
>


[DISCUSS] Performance of Beam compare to "Bare Runner"

2019-04-29 Thread Jozef Vilcek
Hello,

I am interested in any knowledge or thoughts on what the overhead of running
Beam pipelines is (or should be) compared to pipelines written on the "bare
runner". Is this something that is being tested or investigated by the
community? Is there a consensus on what bounds the overhead should typically
stay within? I realise this is very runner specific, but certain things are
also imposed by the SDK model itself.

I tested a simple streaming pipeline on Flink vs. Beam-on-Flink and found very
noticeable differences. I want to stress that it was not a performance test.
The job does the following:

Read Kafka -> Deserialize to Proto -> Filter deserialisation errors ->
Reshuffle -> Report counter.inc() to metrics for throughput
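For reference, the Beam side of the job is shaped roughly like the sketch below
(hypothetical MyEvent protobuf type and assumed topic/broker names; it only
illustrates the shape, not the exact code that was run):

pipeline
    .apply(KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers("kafka:9092")
        .withTopic("events")
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withoutMetadata())
    .apply("DeserializeAndFilter", ParDo.of(new DoFn<KV<byte[], byte[]>, MyEvent>() {
      @ProcessElement
      public void process(ProcessContext c) {
        try {
          c.output(MyEvent.parseFrom(c.element().getValue()));
        } catch (InvalidProtocolBufferException e) {
          // deserialisation errors are simply dropped (the "filter" step)
        }
      }
    }))
    .apply(Reshuffle.viaRandomKey())
    .apply("Throughput", ParDo.of(new DoFn<MyEvent, Void>() {
      private final Counter events = Metrics.counter("perf-test", "events");
      @ProcessElement
      public void process(ProcessContext c) {
        events.inc();
      }
    }));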

Both jobs had the same configuration and the same state backend with the same
checkpointing strategy. What I noticed from a few simple test runs:

* On the first run, on Flink 1.5.0, CPU profiles on one worker showed that ~50%
of the time was spent either on removing timers from HeapInternalTimerService
or in java.io.ByteArrayOutputStream from CoderUtils.clone().

* The problem with timer deletion was addressed by FLINK-9423. I retested on
Flink 1.7.2 and not much time is spent in timer deletion now, but the root
cause was not removed. It still remains that timers are frequently registered
and removed (I believe from ReduceFnRunner.scheduleGarbageCollectionTimer(), in
which case it is called per processed element?), which is noticeable in GC
activity, heap and state ...

* In Flink I use the FileSystem state backend, which keeps state in an
in-memory CopyOnWriteStateTable that after some time is full of PaneInfo
objects. Maybe they come from PaneInfoTracker activity.

* Coder clone is painful. A pure Flink job does copy between operators too, in
my case via Kryo.copy(), but this is not noticeable in the CPU profile.
Kryo.copy() copies at the object level, not object -> bytes -> object, which is
cheaper.

Overall, my observation is that pure Flink can be roughly 3x faster.

I do not know exactly what I am trying to achieve here :) Probably just to
start a discussion and collect thoughts and other experiences on the cost of
running some data processing on Beam and a particular runner.


Re: Custom shardingFn for FileIO

2019-04-25 Thread Jozef Vilcek
Right now, sharding can be specified only via a target `shardCount`, be it by
the user or the runner. Next to the configurable shardCount, I am proposing to
also be able to pass a function which will allow the user (or runner) to
control how the shard is determined and what key will be used to represent it:

interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

The default implementation can be what exists right now => a random shard
encapsulated as ShardedKey.
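For illustration, a possible (purely hypothetical, since the interface above is
only a proposal) implementation that derives a stable shard key from the
element instead of picking a random one:

// Deterministic sharding: the same element always lands on the same shard,
// which a runner could exploit to spread shards evenly across workers.
class HashShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, Integer> {

  @Override
  public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
    return Math.floorMod(element.hashCode(), shardCount);
  }
}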


On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax  wrote:

> If sharding is not specified, then the semantics are "runner-determined
> sharding." The DataflowRunner already takes advantage of this to impose its
> own sharding if the user hasn't specified an explicit one. Could the Flink
> runner do the same instead of pushing this to the users?
>
> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels  wrote:
>
>> Hi Jozef,
>>
>> For sharding in FileIO there are basically two options:
>>
>> (1) num_shards ~= num_workers => bad spread of the load across workers
>> (2) num_shards >> num_workers => good spread of the load across workers,
>> but huge number of files
>>
>> Your approach would give users control over the sharding keys such that
>> they could be adjusted to spread load more evenly.
>>
>> I'd like to hear from Beam IO experts if that would make sense.
>>
>> Thanks,
>> Max
>>
>> On 25.04.19 08:52, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > Right now, if someone needs sharded files via FileIO, there is only one
>> > option which is random (round robin) shard assignment per element and
>> it
>> > always use ShardedKey as a key for the GBK which follows.
>> >
>> > I would like to generalize this and have a possibility to provide some
>> > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>> > What I am mainly after is, to have a possibility to provide
>> optimisation
>> > for Flink runtime and pass in a special function which generates shard
>> > keys in a way that they are evenly spread among workers (BEAM-5865).
>> >
>> > Would such extension for FileIO make sense? If yes, I would create a
>> > ticket for it and try to draft a PR.
>> >
>> > Best,
>> > Jozef
>>
>


Custom shardingFn for FileIO

2019-04-25 Thread Jozef Vilcek
Hello,

Right now, if someone needs sharded files via FileIO, there is only one option,
which is random (round-robin) shard assignment per element, and it always uses
ShardedKey as the key for the GBK which follows.

I would like to generalize this and have the possibility to provide some
ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
What I am mainly after is the possibility to provide an optimisation for the
Flink runtime and pass in a special function which generates shard keys in a
way that they are evenly spread among workers (BEAM-5865).

Would such an extension of FileIO make sense? If yes, I would create a ticket
for it and try to draft a PR.

Best,
Jozef


[BEAM-6077] FlinkRunner: Make UnboundedSource state re-scale friendly

2018-11-20 Thread Jozef Vilcek
I want to reach out for opinions on what would be the best way to proceed
with https://issues.apache.org/jira/browse/BEAM-6077

The problem is, that when FlinkRunner job is being restored from
checkpoint, it needs to resurrect source and it's readers given the
checkpoint state. State element is represented by
`UnboundedSource.CheckpointMark` which does not tell much information.
Within CheckpointMark there might be already stored state per key, e.g. in
case of Kafka it is list of PartitionMarks having each partition_id and
offset.

UnboundedSource can create a reader per single CheckpointMark, and a reader
can produce a single CheckpointMark from its state. Now at rescale, the number
of CheckpointMarks retrieved from state does not correspond to the actual
parallelism. A merge or flatten needs to be invoked over the list of marks read
from state (see the sketch below). The question is where such logic and
knowledge should live.
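
A minimal sketch of that merge-and-redistribute step follows. The generic
redistribute helper and the idea of representing per-key units as plain lists
are hypothetical illustrations; a real implementation would work with the
concrete CheckpointMark type of the source (e.g. KafkaIO's partition marks).

import java.util.ArrayList;
import java.util.List;

class CheckpointRedistribution {
  // Flattens the per-key units (e.g. partition offsets) from all restored
  // marks and re-splits them round-robin across the new number of readers.
  static <T> List<List<T>> redistribute(List<List<T>> restoredMarks, int newParallelism) {
    List<T> units = new ArrayList<>();
    for (List<T> mark : restoredMarks) {
      units.addAll(mark);
    }
    List<List<T>> redistributed = new ArrayList<>();
    for (int i = 0; i < newParallelism; i++) {
      redistributed.add(new ArrayList<>());
    }
    for (int i = 0; i < units.size(); i++) {
      redistributed.get(i % newParallelism).add(units.get(i));
    }
    return redistributed;
  }
}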

It feels similar to UnboundedSource.split(parallelism, pipelineOptions) and
also maybe related somehow to SplittableDoFn logic. Not sure.

My questions are:
1. Is there a way to achieve such splitting / merging of checkpoint marks
with the current SDK?
2. If not, and it makes sense to add it, where would it best go? The Source?
3. Is there some other approach that a Beam rookie like me does not see?

Best,
Jozef


Re: Flink operator max parallelism and rescalable jobs

2018-11-16 Thread Jozef Vilcek
Hey Max, thanks for the pointer to UnboundedSourceWrapper.
I have created BEAM-6077 and will try to come up with a patch.
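
For reference, a rough, illustrative sketch of the key-group scheme described
in the quoted explanation below; the formulas are an approximation of the idea,
not Flink's actual implementation.

public class KeyGroupSketch {
  static int keyGroupFor(Object key, int maxParallelism) {
    // Flink additionally murmur-hashes the key's hashCode before this step.
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  static int operatorFor(int keyGroup, int maxParallelism, int parallelism) {
    // Each operator instance owns a contiguous range of key groups, so state
    // can be re-assigned on rescale without splitting individual entries.
    return keyGroup * parallelism / maxParallelism;
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    for (int parallelism : new int[] {4, 5}) {
      int owner = operatorFor(keyGroupFor("partition-7", maxParallelism),
          maxParallelism, parallelism);
      System.out.printf("parallelism=%d -> key lands on operator instance %d%n",
          parallelism, owner);
    }
  }
}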

On Fri, Nov 16, 2018 at 12:41 PM Maximilian Michels  wrote:

> Hi Jozef,
>
> The main blocker for rescaling Beam pipelines on Flink was the use of
> Key Group state. This splits each operator state additionally into N
> partitions, such that N * P = MAX_PARALLELISM, where P is the
> parallelism of the operator.
>
> This has largely been done. However, it is not complete. If you look at
> the way the UnboundedSourceWrapper snapshots its state, you will see
> that it does not support Key Groups. Thus, if you increase the
> parallelism, one of the new parallel instances of the operator will
> _not_ receive state and thus behave differently.
>
> I think we could migrate UnboundedSourceWrapper to KeyGroups and then
> also leverage spread of the Kafka partitions.
>
> Thanks,
> Max
>
> On 16.11.18 10:57, Jozef Vilcek wrote:
> > Hi,
> >
> > I want to collect some feedback on rescaling streaming Beam pipeline on
> > Flink runner. Flink seems to be able to re-scale jobs, which in Beam
> > terms means changing the parallelism in Beam. However, one has to make
> > sure that state can rescale as well to the predefined MAX parallelism.
> > Max parallelism must be set for job on FlinkRunner.
> >
> > Flink supports fiddling with max parallelism on global, environment and
> > operator level. Changes in operator level are not possible with beam. I
> > found this JIRA which seems to be inconclusive if changes in operator
> > parallelism make sense to adopt somehow in Beam
> > https://issues.apache.org/jira/browse/BEAM-68
> >
> > I did try to set max parallelism to environment via my local patch. My
> > job did launch and not crash like before when I bumped parallelism += 1.
> > But there was one drawback as far as I know. My test job reads from
> > kafka and after launching job from savepoint point, one partition does
> > not continue from offset in savepoint but according to what is defined
> > by auto.offset.reset (my case 'latest') which is not great.
> >
> > My questions:
> >
> > 1. Should re-scale work for beam if runner does support it or there can
> > be some incompatibilities in general depending on how particular runner
> > works
> >
> > 2. Did anyone have a success with Flink and rescale? Honestly, not sure
> > how well it behaves in native Flink. Never tried it
> >
> > 3. Why does kafka not redistribute stored partition offsets after
> > changing parallelism?
> >
> > 4. Is BEAM-68 still relevant?
> >
> > Many thanks,
> > Jozef
>


Flink operator max parallelism and rescalable jobs

2018-11-16 Thread Jozef Vilcek
Hi,

I want to collect some feedback on rescaling a streaming Beam pipeline on the
Flink runner. Flink seems to be able to re-scale jobs, which in Beam terms
means changing the parallelism in Beam. However, one has to make sure that
state can rescale as well to the predefined MAX parallelism. The max
parallelism must be set for the job on FlinkRunner.

Flink supports fiddling with max parallelism on the global, environment and
operator level. Changes on the operator level are not possible with Beam. I
found this JIRA, which seems to be inconclusive about whether changes in
operator parallelism make sense to adopt somehow in Beam:
https://issues.apache.org/jira/browse/BEAM-68

I did try to set max parallelism on the environment via a local patch. My job
did launch and did not crash like before when I bumped parallelism += 1. But
there was one drawback as far as I can tell. My test job reads from Kafka, and
after launching the job from a savepoint, one partition does not continue
from the offset in the savepoint but according to what is defined by
auto.offset.reset (in my case 'latest'), which is not great.

My questions:

1. Should re-scale work for Beam if the runner supports it, or can there be
some incompatibilities in general depending on how a particular runner works?

2. Has anyone had success with Flink and rescale? Honestly, I am not sure how
well it behaves in native Flink. Never tried it.

3. Why does Kafka not redistribute stored partition offsets after changing
parallelism?

4. Is BEAM-68 still relevant?

Many thanks,
Jozef


Re: Unbalanced FileIO writes on Flink

2018-10-26 Thread Jozef Vilcek
Thanks for the JIRA. If I understand it correctly ... so runner-determined
sharding will avoid the extra shuffle? Will it just write the worker-locally
available data to its shard? Something similar to coalesce in Spark?

On Fri, Oct 26, 2018 at 11:26 AM Maximilian Michels  wrote:

> Oh ok, thanks for the pointer. Coming from Flink, the default is that
> the sharding is determined by the runtime distribution. Indeed, we will
> have to add an overwrite to the Flink Runner, similar to this one:
>
>
> https://github.com/apache/beam/commit/cbb922c8a72680c5b8b4299197b515abf650bfdf#diff-a79d5c3c33f6ef1c4894b97ca907d541R347
>
> Jira issue: https://issues.apache.org/jira/browse/BEAM-5865
>
> Thanks,
> Max
>
> On 25.10.18 22:37, Reuven Lax wrote:
> > FYI the Dataflow runner automatically sets the default number of shards
> > (I believe to be 2 * num_workers). Probably we should do something
> > similar for the Flink runner.
> >
> > This needs to be done by the runner, as # of workers is a runner
> > concept; the SDK itself has no concept of workers.
> >
> > On Thu, Oct 25, 2018 at 3:28 AM Jozef Vilcek  > <mailto:jozo.vil...@gmail.com>> wrote:
> >
> > If I do not specify shards for unbounded collection, I get
> >
> > Caused by: java.lang.IllegalArgumentException: When applying
> > WriteFiles to an unbounded PCollection, must specify number of
> > output shards explicitly
> >  at
> > org.apache.beam.repackaged.beam_sdks_java_core.com
> .google.common.base.Preconditions.checkArgument(Preconditions.java:191)
> >  at
> > org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
> >
> > Around same lines in WriteFiles is also a check for windowed writes.
> > I believe FileIO enables it explicitly when windowing is present. In
> > filesystem written files are per window and shard.
> >
> > On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels  > <mailto:m...@apache.org>> wrote:
> >
> > I agree it would be nice to keep the current distribution of
> > elements
> > instead of doing a shuffle based on an artificial shard key.
> >
> > Have you tried `withWindowedWrites()`? Also, why do you say you
> > need to
> > specify the number of shards in streaming mode?
> >
> > -Max
> >
> > On 25.10.18 10:12, Jozef Vilcek wrote:
> >  > Hm, yes, this makes sense now, but what can be done for my
> > case? I do
> >  > not want to end up with too many files on disk.
> >  >
> >  > I think what I am looking for is to instruct IO that do not
> > do again
> >  > random shard and reshuffle but just assume number of shards
> > equal to
> >  > number of workers and shard ID is a worker ID.
> >  > Is this doable in beam model?
> >  >
> >  > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels
> > mailto:m...@apache.org>
> >  > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >  >
> >  > The FlinkRunner uses a hash function (MurmurHash) on each
> > key which
> >  > places keys somewhere in the hash space. The hash space
> > (2^32) is split
> >  > among the partitions (5 in your case). Given enough keys,
> > the chance
> >  > increases they are equally spread.
> >  >
> >  > This should be similar to what the other Runners do.
> >  >
> >  > On 24.10.18 10:58, Jozef Vilcek wrote:
> >  >  >
> >  >  > So if I run 5 workers with 50 shards, I end up with:
> >  >  >
> >  >  > Duration   Bytes received   Records received
> >  >  >   2m 39s   900 MB           465,525
> >  >  >   2m 39s   1.76 GB          930,720
> >  >  >   2m 39s   789 MB           407,315
> >  >  >   2m 39s   1.32 GB          698,262
> >  >  >   2m 39s   788 MB           407,310
> >  >  >
> >  >  > Still not good but better than with 5 shards where
> > some workers
> >  > did not
> >  >  > participate at all.
> >  >  > So, problem is in some layer which distributes keys / shards among workers?

Re: Unbalanced FileIO writes on Flink

2018-10-25 Thread Jozef Vilcek
If I do not specify shards for an unbounded collection, I get:

Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to
an unbounded PCollection, must specify number of output shards explicitly
at
org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)

Around the same lines in WriteFiles there is also a check for windowed writes.
I believe FileIO enables it explicitly when windowing is present. In the
filesystem, written files are per window and shard.

On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels  wrote:

> I agree it would be nice to keep the current distribution of elements
> instead of doing a shuffle based on an artificial shard key.
>
> Have you tried `withWindowedWrites()`? Also, why do you say you need to
> specify the number of shards in streaming mode?
>
> -Max
>
> On 25.10.18 10:12, Jozef Vilcek wrote:
> > Hm, yes, this makes sense now, but what can be done for my case? I do
> > not want to end up with too many files on disk.
> >
> > I think what I am looking for is to instruct IO that do not do again
> > random shard and reshuffle but just assume number of shards equal to
> > number of workers and shard ID is a worker ID.
> > Is this doable in beam model?
> >
> > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels  > <mailto:m...@apache.org>> wrote:
> >
> > The FlinkRunner uses a hash function (MurmurHash) on each key which
> > places keys somewhere in the hash space. The hash space (2^32) is
> split
> > among the partitions (5 in your case). Given enough keys, the chance
> > increases they are equally spread.
> >
> > This should be similar to what the other Runners do.
> >
> > On 24.10.18 10:58, Jozef Vilcek wrote:
> >  >
> >  > So if I run 5 workers with 50 shards, I end up with:
> >  >
> >  > Duration   Bytes received   Records received
> >  >   2m 39s   900 MB           465,525
> >  >   2m 39s   1.76 GB          930,720
> >  >   2m 39s   789 MB           407,315
> >  >   2m 39s   1.32 GB          698,262
> >  >   2m 39s   788 MB           407,310
> >  >
> >  > Still not good but better than with 5 shards where some workers
> > did not
> >  > participate at all.
> >  > So, problem is in some layer which distributes keys / shards
> > among workers?
> >  >
> >  > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax  > <mailto:re...@google.com>
> >  > <mailto:re...@google.com <mailto:re...@google.com>>> wrote:
> >  >
> >  > withNumShards(5) generates 5 random shards. It turns out that
> >  > statistically when you generate 5 random shards and you have 5
> >  > works, the probability is reasonably high that some workers
> > will get
> >  > more than one shard (and as a result not all workers will
> >  > participate). Are you able to set the number of shards larger
> > than 5?
> >  >
> >  > On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek
> > mailto:jozo.vil...@gmail.com>
> >  > <mailto:jozo.vil...@gmail.com
> > <mailto:jozo.vil...@gmail.com>>> wrote:
> >  >
> >  > cc (dev)
> >  >
> >  > I tried to run the example with FlinkRunner in batch mode
> and
> >  > received again bad data spread among the workers.
> >  >
> >  > When I tried to remove number of shards for batch mode in
> > above
> >  > example, pipeline crashed before launch
> >  >
> >  > Caused by: java.lang.IllegalStateException: Inputs to
> Flatten
> >  > had incompatible triggers:
> >  >
> >
>  
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> >      > entCountAtLeast(1)),
> >  >
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> >  > hour,
> >  >
> >
>  
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> >  > rever(AfterPane.elementCountAtLeast(1)),
> >  >
> >

Re: Unbalanced FileIO writes on Flink

2018-10-25 Thread Jozef Vilcek
Hm, yes, this makes sense now, but what can be done for my case? I do not
want to end up with too many files on disk.

I think what I am looking for is to instruct the IO not to do the random
sharding and reshuffle again, but to just assume the number of shards equals
the number of workers and the shard ID is a worker ID.
Is this doable in the Beam model?

On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels  wrote:

> The FlinkRunner uses a hash function (MurmurHash) on each key which
> places keys somewhere in the hash space. The hash space (2^32) is split
> among the partitions (5 in your case). Given enough keys, the chance
> increases they are equally spread.
>
> This should be similar to what the other Runners do.
>
> On 24.10.18 10:58, Jozef Vilcek wrote:
> >
> > So if I run 5 workers with 50 shards, I end up with:
> >
> > Duration   Bytes received   Records received
> >   2m 39s   900 MB           465,525
> >   2m 39s   1.76 GB          930,720
> >   2m 39s   789 MB           407,315
> >   2m 39s   1.32 GB          698,262
> >   2m 39s   788 MB           407,310
> >
> > Still not good but better than with 5 shards where some workers did not
> > participate at all.
> > So, problem is in some layer which distributes keys / shards among
> workers?
> >
> > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax  > <mailto:re...@google.com>> wrote:
> >
> > withNumShards(5) generates 5 random shards. It turns out that
> > statistically when you generate 5 random shards and you have 5
> > works, the probability is reasonably high that some workers will get
> > more than one shard (and as a result not all workers will
> > participate). Are you able to set the number of shards larger than 5?
> >
> > On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek  > <mailto:jozo.vil...@gmail.com>> wrote:
> >
> > cc (dev)
> >
> > I tried to run the example with FlinkRunner in batch mode and
> > received again bad data spread among the workers.
> >
> > When I tried to remove number of shards for batch mode in above
> > example, pipeline crashed before launch
> >
> > Caused by: java.lang.IllegalStateException: Inputs to Flatten
> > had incompatible triggers:
> >
>  
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
> > entCountAtLeast(1)),
> >
>  Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
> > hour,
> >
>  
> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
> > rever(AfterPane.elementCountAtLeast(1)),
> >
>  
> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(
> >
> >
> >
> >
> >
> > On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> > mailto:jozo.vil...@gmail.com>> wrote:
> >
> > Hi Max,
> >
> > I forgot to mention that example is run in streaming mode,
> > therefore I can not do writes without specifying shards.
> > FileIO explicitly asks for them.
> >
> > I am not sure where the problem is. FlinkRunner is only one
> > I used.
> >
> > On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
> > mailto:m...@apache.org>> wrote:
> >
> > Hi Jozef,
> >
> > This does not look like a FlinkRunner related problem,
> > but is caused by
> > the `WriteFiles` sharding logic. It assigns keys and
> > does a Reshuffle
> > which apparently does not lead to good data spread in
> > your case.
> >
> > Do you see the same behavior without `withNumShards(5)`?
> >
> > Thanks,
> > Max
> >
> > On 22.10.18 11:57, Jozef Vilcek wrote:
> >  > Hello,
> >  >
> >  > I am having some trouble to get a balanced write via
> > FileIO. Workers at
> >  > the shuffle side where data per window fire are
> > written to the
> >  > filesystem receive unbalanced number of events.
> >  >
> >  > Here

Re: KafkaIO - Deadletter output

2018-10-25 Thread Jozef Vilcek
What I ended up doing, when I could not for any reason rely on Kafka
timestamps but needed to parse them from the message, is (see the sketch
below):

* have a custom Kafka deserializer which never throws but returns a message
which is either a success with the parsed data structure plus timestamp, or a
failure with the original Kafka bytes payload
* a timestamp policy that can extract the timestamp in case of a successful
deserialize result; in case of a failure result, I return the timestamp of the
last successful message (in my case messages are not terribly out of order and
failures are rather rare)
* a following ParDo then side-outputs failures to dead letters
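
A minimal sketch of the never-throwing deserializer, assuming a hypothetical
ParseResult wrapper; MyEvent and its parse()/timestamp() methods are
placeholders, not real API.

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Placeholder domain type for illustration only.
class MyEvent {
  static MyEvent parse(byte[] data) { return new MyEvent(); }
  org.joda.time.Instant timestamp() { return org.joda.time.Instant.now(); }
}

// Hypothetical result type: either a parsed event plus timestamp, or raw bytes.
class ParseResult implements java.io.Serializable {
  final MyEvent event;                    // null on failure
  final org.joda.time.Instant timestamp;  // null on failure
  final byte[] rawPayload;                // kept so a ParDo can dead-letter it

  ParseResult(MyEvent event, org.joda.time.Instant timestamp, byte[] rawPayload) {
    this.event = event;
    this.timestamp = timestamp;
    this.rawPayload = rawPayload;
  }

  boolean isSuccess() {
    return event != null;
  }
}

class NeverFailingDeserializer implements Deserializer<ParseResult> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public ParseResult deserialize(String topic, byte[] data) {
    try {
      MyEvent event = MyEvent.parse(data);  // placeholder parsing call
      return new ParseResult(event, event.timestamp(), data);
    } catch (Exception e) {
      // Never throw: hand the raw payload downstream for the dead-letter output.
      return new ParseResult(null, null, data);
    }
  }

  @Override
  public void close() {}
}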


On Thu, Oct 25, 2018 at 8:54 AM Reuven Lax  wrote:

>
>
> On Wed, Oct 24, 2018, 10:26 PM Raghu Angadi  wrote:
>
>> Well, if every input record's timestamp is X, watermark staying at X is
>> the right answer, no? But I am not sure where the disagreement is,
>> actually. I might be mistaken.
>>
>> KafkaIO has a few in-built policies for watermark and timestamp that
>> cover most use cases (including server time, which has a benefit of
>> providing perfect watermark). It also gives fairly complete control on
>> these to users if they chose to. I think it looks like reasonable for a
>> policy to base its watermark only only on parsable records, and ignore
>> unparsable records w.r.t watermark calculation.
>>
>
> But then doesn't that force the user to set max allowed lateness to
> infinity, otherwise these records will be dropped?
>
> It could even assign a timestamp that makes more logical sense in a
>> particular application.
>>
>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles  wrote:
>>
>>> Forgive me if this is naive or missing something, but here are my
>>> thoughts on these alternatives:
>>>
>>> (0) Timestamp has to be pulled out in the source to control the
>>> watermark. Luke's point is imortant.
>>>
>>> (1) If bad records get min_timestamp, and they occur infrequently
>>> enough, then watermark will advance and they will all be dropped. That will
>>> not allow output to a dead-letter queue.
>>>
>>> (2) If you have always min_timestamp records, or if bad records are
>>> frequent, the watermark will never advance. So windows/aggregations would
>>> never be considered complete. Triggers could be used to get output anyhow,
>>> but it would never be a final answer. I think it is not in the spirit of
>>> Beam to work this way. Pragmatically, no state could ever be freed by a
>>> runner.
>>>
>>> In SQL there is an actual "dead letter" option when creating a table
>>> that parses from a bytes source. If, for example, a JSON record cannot be
>>> parsed to the expected schema - like maybe an avro record got in the
>>> stream, or the JSON doesn't match the expected schema - it is output as-is
>>> to a user-specified dead letter queue. I think this same level of support
>>> is also required for records that cannot have timestamps extracted in an
>>> unbounded source.
>>>
>>> In an SDF I think the function has enough control to do it all in
>>> "userland", so Cham is right on here.
>>>
>>> Kenn
>>>
>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik  wrote:
>>>
 That depends on the users pipeline and how watermark advancement of the
 source may impact elements becoming droppably late if they are emitted with
 the minimum timestamp.

 On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi 
 wrote:

> I see.
>
> What I meant was to return min_timestamp for bad records in the
> timestamp handler passed to KafkaIO itself, and correct timestamp for
> parsable records. That should work too, right?
>
> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik  wrote:
>
>> Yes, that would be fine.
>>
>> The user could then use a ParDo which outputs to a DLQ for things it
>> can't parse the timestamp for and use outputWithTimestamp[1] for 
>> everything
>> else.
>>
>> 1:
>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>
>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi 
>> wrote:
>>
>>> Thanks. So returning  min timestamp is OK, right (assuming
>>> application fine is with what it means)?
>>>
>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik 
>>> wrote:
>>>
 All records in Apache Beam have a timestamp. The default timestamp
 is the min timestamp defined here:
 https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48


 On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi 
 wrote:

>
>
> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik 
> wrote:
>
>> You would have to return min timestamp for all records otherwise
>> the watermark may have advanced and 

Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
So if I run 5 workers with 50 shards, I end up with:

Duration Bytes received Records received
 2m 39s 900 MB 465,525
 2m 39s1.76 GB 930,720
 2m 39s 789 MB 407,315
 2m 39s1.32 GB 698,262
 2m 39s 788 MB 407,310

Still not good, but better than with 5 shards, where some workers did not
participate at all.
So the problem is in some layer which distributes keys / shards among workers?
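
As a quick sanity check of the explanation Reuven gives in the quoted reply
below, here is a small standalone simulation (not Beam code) of hashing S shard
keys onto W workers. With S = W = 5 a specific worker stays idle with
probability (1 - 1/5)^5 ~= 33%, i.e. about 1.6 of the 5 workers receive no
shard on average.

public class ShardSpreadSimulation {
  public static void main(String[] args) {
    java.util.Random random = new java.util.Random(42);
    int workers = 5;
    int shards = 5;
    int trials = 100_000;
    long idleWorkerCount = 0;
    for (int t = 0; t < trials; t++) {
      boolean[] hit = new boolean[workers];
      for (int s = 0; s < shards; s++) {
        hit[random.nextInt(workers)] = true;  // shard key hashes to a random worker
      }
      for (boolean h : hit) {
        if (!h) {
          idleWorkerCount++;
        }
      }
    }
    System.out.printf("average idle workers per run: %.2f of %d%n",
        (double) idleWorkerCount / trials, workers);
  }
}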

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax  wrote:

> withNumShards(5) generates 5 random shards. It turns out that
> statistically when you generate 5 random shards and you have 5 works, the
> probability is reasonably high that some workers will get more than one
> shard (and as a result not all workers will participate). Are you able to
> set the number of shards larger than 5?
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek 
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with FlinkRunner in batch mode and received
>> again bad data spread among the workers.
>>
>> When I tried to remove number of shards for batch mode in above example,
>> pipeline crashed before launch
>>
>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>> incompatible triggers:
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
>> entCountAtLeast(1)),
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
>> hour,
>> AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
>> rever(AfterPane.elementCountAtLeast(1)),
>> Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(
>>
>>
>>
>>
>>
>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek 
>> wrote:
>>
>>> Hi Max,
>>>
>>> I forgot to mention that example is run in streaming mode, therefore I
>>> can not do writes without specifying shards. FileIO explicitly asks for
>>> them.
>>>
>>> I am not sure where the problem is. FlinkRunner is only one I used.
>>>
>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
>>> wrote:
>>>
>>>> Hi Jozef,
>>>>
>>>> This does not look like a FlinkRunner related problem, but is caused by
>>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>>>> which apparently does not lead to good data spread in your case.
>>>>
>>>> Do you see the same behavior without `withNumShards(5)`?
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>>> > Hello,
>>>> >
>>>> > I am having some trouble to get a balanced write via FileIO. Workers
>>>> at
>>>> > the shuffle side where data per window fire are written to the
>>>> > filesystem receive unbalanced number of events.
>>>> >
>>>> > Here is a naive code example:
>>>> >
>>>> >  val read = KafkaIO.read()
>>>> >  .withTopic("topic")
>>>> >  .withBootstrapServers("kafka1:9092")
>>>> >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>>> >  .withValueDeserializer(classOf[ByteArrayDeserializer])
>>>> >  .withProcessingTime()
>>>> >
>>>> >  pipeline
>>>> >  .apply(read)
>>>> >  .apply(MapElements.via(new
>>>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>>> >override def apply(input: KafkaRecord[Array[Byte],
>>>> > Array[Byte]]): String = {
>>>> >  new String(input.getKV.getValue, "UTF-8")
>>>> >}
>>>> >  }))
>>>> >
>>>> >
>>>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>>> >  .triggering(AfterWatermark.pastEndOfWindow()
>>>> >
>>>> .withEarlyFirings(AfterPane.elementCountAtLeast(4))
>>>> >
>>>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>>> >
>>>> Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>>>> >
>>>> >
>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
>>>> >  .discardingFiredPanes()
>>>> >  .withAllowedLateness(Duration.standardDays(7)))
>>>> >
>>>> >  .apply(FileIO.write()
>>>> >  .via(TextIO.sink())
>>>> >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>>> >  .withTempDirectory(tempLocation)
>>>> >  .withNumShards(5))
>>>> >
>>>> >
>>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>>>> > number of shards), I would expect that each worker will participate
>>>> on
>>>> > persisting shards and equally, since code uses fixed number of shards
>>>> > (and random shard assign?). But reality is different (see 2
>>>> attachements
>>>> > - statistiscs from flink task reading from kafka and task writing to
>>>> files)
>>>> >
>>>> > What am I missing? How to achieve balanced writes?
>>>> >
>>>> > Thanks,
>>>> > Jozef
>>>> >
>>>> >
>>>>
>>>


Re: Unbalanced FileIO writes on Flink

2018-10-24 Thread Jozef Vilcek
cc (dev)

I tried to run the example with FlinkRunner in batch mode and again received a
bad data spread among the workers.

When I tried to remove the number of shards for batch mode in the above
example, the pipeline crashed before launch:

Caused by: java.lang.IllegalStateException: Inputs to Flatten had
incompatible triggers:
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(4)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elem
entCountAtLeast(1)),
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1
hour,
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.fo
rever(AfterPane.elementCountAtLeast(1)),
Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane(





On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek  wrote:

> Hi Max,
>
> I forgot to mention that example is run in streaming mode, therefore I can
> not do writes without specifying shards. FileIO explicitly asks for them.
>
> I am not sure where the problem is. FlinkRunner is only one I used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels 
> wrote:
>
>> Hi Jozef,
>>
>> This does not look like a FlinkRunner related problem, but is caused by
>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>> which apparently does not lead to good data spread in your case.
>>
>> Do you see the same behavior without `withNumShards(5)`?
>>
>> Thanks,
>> Max
>>
>> On 22.10.18 11:57, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > I am having some trouble to get a balanced write via FileIO. Workers at
>> > the shuffle side where data per window fire are written to the
>> > filesystem receive unbalanced number of events.
>> >
>> > Here is a naive code example:
>> >
>> >  val read = KafkaIO.read()
>> >  .withTopic("topic")
>> >  .withBootstrapServers("kafka1:9092")
>> >  .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >  .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >  .withProcessingTime()
>> >
>> >  pipeline
>> >  .apply(read)
>> >  .apply(MapElements.via(new
>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >override def apply(input: KafkaRecord[Array[Byte],
>> > Array[Byte]]): String = {
>> >  new String(input.getKV.getValue, "UTF-8")
>> >}
>> >  }))
>> >
>> >
>> > .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >  .triggering(AfterWatermark.pastEndOfWindow()
>> >  .withEarlyFirings(AfterPane.elementCountAtLeast(4))
>> >
>> .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >
>> Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>> >
>> >
>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))
>> >  .discardingFiredPanes()
>> >  .withAllowedLateness(Duration.standardDays(7)))
>> >
>> >  .apply(FileIO.write()
>> >  .via(TextIO.sink())
>> >  .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >  .withTempDirectory(tempLocation)
>> >  .withNumShards(5))
>> >
>> >
>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>> > number of shards), I would expect that each worker will participate on
>> > persisting shards and equally, since code uses fixed number of shards
>> > (and random shard assign?). But reality is different (see 2
>> attachements
>> > - statistiscs from flink task reading from kafka and task writing to
>> files)
>> >
>> > What am I missing? How to achieve balanced writes?
>> >
>> > Thanks,
>> > Jozef
>> >
>> >
>>
>


Re: Metrics Pusher support on Dataflow

2018-10-04 Thread Jozef Vilcek
Just curious here: what happens when this local JVM context dies for any
reason? How does it work with Dataflow?
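
For context, here is a minimal sketch of the driver-side polling idea Scott
describes in the quoted thread below, using the generic PipelineResult metrics
API; the pushMetrics() step is a placeholder for pushing to a configured sink.

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;

class DriverSideMetricsPoller {
  // Polls attempted metric values from the driver while the job is running.
  static void pollUntilDone(PipelineResult result, long intervalMillis)
      throws InterruptedException {
    while (!result.getState().isTerminal()) {
      MetricQueryResults metrics =
          result.metrics().queryMetrics(MetricsFilter.builder().build());
      pushMetrics(metrics);  // placeholder: push to the configured metrics backend
      Thread.sleep(intervalMillis);
    }
  }

  static void pushMetrics(MetricQueryResults metrics) {
    metrics.getCounters().forEach(counter -> System.out.println(counter));
  }
}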

On Thu, Oct 4, 2018 at 1:50 AM Scott Wegner  wrote:

> Another point that we discussed at ApacheCon is that a difference between
> Dataflow and other runners is Dataflow is service-based and doesn't need a
> locally executing "driver" program. A local driver context is a good place
> to implement MetricsPusher because it is a singleton process.
>
> In fact, DataflowRunner supports PipelineResult.waitUntilFinish() [1],
> where we do maintain the local JVM context. Currently in this mode the
> runner polls the Dataflow service API for log messages [2]. It would be
> very easy to also poll for metric updates and push them out via
> MetricsPusher.
>
> [1]
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java#L169
> [2]
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java#L291
>
> On Wed, Oct 3, 2018 at 4:44 AM Etienne Chauchot 
> wrote:
>
>> Hi Scott,
>> Thanks for the update.
>> Both solutions look good to me. Though, they both have plus and minus. I
>> let the googlers chose which is more appropriate:
>>
>> - DAG modifcation: less intrusive in Dataflow but the DAG executed and
>> shown in the DAG UI in dataflow will contain an extra step that the user
>> might wonder about.
>> - polling thread: it is exactly what I did for the other runners, it is
>> more transparent to the user but requires more infra work (adds a container
>> that needs to be resilient)
>>
>> Best
>> Etienne
>>
>> Le vendredi 21 septembre 2018 à 12:46 -0700, Scott Wegner a écrit :
>>
>> Hi Etienne, sorry for the delay on this. I just got back from leave and
>> found this discussion.
>>
>> We haven't started implementing MetricsPusher in the Dataflow runner,
>> mostly because the Dataflow service has it's own rich Metrics REST API and
>> we haven't heard a need from Dataflow customers to push metrics to an
>> external backend. However, it would be nice to have this implemented across
>> all runners for feature parity.
>>
>> I read through the discussion in JIRA [1], and the simplest
>> implementation for Dataflow may be to have a single thread periodically
>> poll the Dataflow REST API [2] for latest metric values, and push them to a
>> configured sink. This polling thread could be hosted in a separate docker
>> container, within the worker process, or perhaps a ParDo with timers that
>> gets injected into the pipeline during graph translation.
>>
>> At any rate, I'm not aware of anybody currently working on this. But with
>> the Dataflow worker code being donated to Beam [3], soon it will be
>> possible for anybody to contribute.
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-3926
>> [2]
>> https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/getMetrics
>> [3]
>> https://lists.apache.org/thread.html/2bdc645659e2fbd7e29f3a2758941faefedb01148a2a11558dfe60f8@%3Cdev.beam.apache.org%3E
>>
>> On Fri, Aug 17, 2018 at 4:26 PM Lukasz Cwik  wrote:
>>
>> I forwarded your request to a few people who work on the internal parts
>> of Dataflow to see if they could help in some way.
>>
>> On Thu, Aug 16, 2018 at 6:22 AM Etienne Chauchot 
>> wrote:
>>
>> Hi all
>>
>> As we already discussed, it would be good to support Metrics Pusher [1]
>> in Dataflow (in other runners also, of course). Today, only Spark and Flink
>> support it. It requires a modification in C++ Dataflow code, so only Google
>> friends can do it.
>>
>> Is someone interested in doing it ?
>>
>> Here is the ticket https://issues.apache.org/jira/browse/BEAM-3926
>>
>> Besides, I wonder if this feature should be added to the capability
>> matrix.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/BEAM/Metrics+architecture+inside+the+runners
>>
>> Thanks
>> Etienne
>>
>>
>>
>>
>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-23 Thread Jozef Vilcek
Just for reference, there is a JIRA open for
FileBasedSink.moveToOutputFiles()  and filesystem move behavior

https://issues.apache.org/jira/browse/BEAM-5036


On Wed, Aug 22, 2018 at 9:15 PM Tim Robertson 
wrote:

> Reuven, I think you might be on to something
>
> The Beam HadoopFileSystem copy() does indeed stream through the driver
> [1], and the FileBasedSink.moveToOutputFiles() seemingly uses that method
> [2].
> I'll cobble together a patched version to test using a rename() rather
> than a copy() and report back findings before we consider the implications.
>
> Thanks
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
> [2]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288
>
> On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson 
> wrote:
>
>> > Does HDFS support a fast rename operation?
>>
>> Yes. From the shell it is “mv” and in the Java API it is “rename(Path
>> src, Path dst)”.
>> I am not aware of a fast copy though. I think an HDFS copy streams the
>> bytes through the driver (unless a distcp is issued which is a MR job).
>>
>> (Thanks for engaging in this discussion folks)
>>
>>
>> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:
>>
>>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>>> temporary files to the final destination and then delete the temp files.
>>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>>> instead of paying the cost of copying the files.
>>>
>>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>>>
 Ismael, that should already be true. If not using dynamic destinations
 there might be some edges in the graph that are never used (i.e. no records
 are ever published on them), but that should not affect performance. If
 this is not the case we should fix it.

 Reuven

 On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:

> Spark runner uses the Spark broadcast mechanism to materialize the
> side input PCollections in the workers, not sure exactly if this is
> efficient assigned in an optimal way but seems logical at least.
>
> Just wondering if we shouldn't better first tackle the fact that if
> the pipeline does not have dynamic destinations (this case) WriteFiles
> should not be doing so much extra magic?
>
> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
> >
> > Often only the metadata (i.e. temp file names) are shuffled, except
> in the "spilling" case (which should only happen when using dynamic
> destinations).
> >
> > WriteFiles depends heavily on side inputs. How are side inputs
> implemented in the Spark runner?
> >
> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
> wrote:
> >>
> >> Yes, I stand corrected, dynamic writes is now much more than the
> >> primitive window-based naming we used to have.
> >>
> >> It would be interesting to visualize how much of this codepath is
> >> metatada vs. the actual data.
> >>
> >> In the case of file writing, it seems one could (maybe?) avoid
> >> requiring a stable input, as shards are accepted as a whole (unlike,
> >> say, sinks where a deterministic uid is needed for deduplication on
> >> retry).
> >>
> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax 
> wrote:
> >> >
> >> > Robert - much of the complexity isn't due to streaming, but
> rather because WriteFiles supports "dynamic" output (where the user can
> choose a destination file based on the input record). In practice if a
> pipeline is not using dynamic destinations the full graph is still
> generated, but much of that graph is never used (empty PCollections).
> >> >
> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>
> >> >> I agree that this is concerning. Some of the complexity may have
> also
> >> >> been introduced to accommodate writing files in Streaming mode,
> but it
> >> >> seems we should be able to execute this as a single Map
> operation.
> >> >>
> >> >> Have you profiled to see which stages and/or operations are
> taking up the time?
> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> >>  wrote:
> >> >> >
> >> >> > Hi folks,
> >> >> >
> >> >> > I've recently been involved in projects rewriting Avro files
> and have discovered a concerning performance trait in Beam.
> >> >> >
> >> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >> >
> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >> >  - Rewriting 1.5TB Avro file (small 

Re: Beam application upgrade on Flink crashes

2018-08-22 Thread Jozef Vilcek
Hm, I am sorry to hear this. I must have missed it in the docs that Beam
version upgrades can break Flink state. It is important information for anyone
wanting to use Beam on Flink in production.

So, I guess there is no guarantee that another bump of the Flink version will
not break things until it reaches 1.7.
Even then, things can maybe break? Is there a plan to make the Flink runner
more robust and catch compatibility issues early via tests?

Just trying to figure out my options with upgrades. Do other runners suffer
the same weak guarantees?


On Tue, Aug 21, 2018 at 9:25 PM Stephan Ewen  wrote:

> Flink 1.7 will change the way the "restore serializer" is handled, which
> should make it much easier to handle such cases.
> Especially breaking java class version format will not be an issue anymore.
>
> That should help to make it easier to give the Beam-on-Flink runner cross
> version compatibility.
>
>
> On Mon, Aug 20, 2018 at 6:46 PM, Maximilian Michels 
> wrote:
>
>> AFAIK the serializer used here is the CoderTypeSerializer which may not
>> be recoverable because of changes to the contained Coder
>> (TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
>> small changes could break serialization backwards-compatibility.
>>
>> As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
>> This should be improved for the next release.
>>
>> Thanks,
>> Max
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>>
>> On 20.08.18 17:46, Stephan Ewen wrote:
>> > Hi Jozef!
>> >
>> > When restoring state, the serializer that created the state must still
>> > be available, so the state can be read.
>> > It looks like some serializer classes were removed between Beam versions
>> > (or changed in an incompatible manner).
>> >
>> > Backwards compatibility of an operator implementation needs cooperation
>> > from the operator. Withing Flink itself, when we change the way an
>> > operator uses state, we keep the old codepath and classes in a
>> > "backwards compatibility restore" that takes the old state and brings it
>> > into the shape of the new state.
>> >
>> > I am not deeply into the of how Beam and the Flink runner implement
>> > their use of state, but it looks this part is not present, which could
>> > mean that savepoints taken from Beam applications are not backwards
>> > compatible.
>> >
>> >
>> > On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek > > <mailto:jozo.vil...@gmail.com>> wrote:
>> >
>> > Hello,
>> >
>> > I am attempting to upgrade  Beam app from 2.5.0 running on Flink
>> > 1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
>> > state migration changes needed for Flink 1.4.0 -> 1.5.0 so I am just
>> > starting a new App with updated libs from Flink save-point captured
>> > by previous version of the app.
>> >
>> > There is not change in topology. Job is accepted without error to
>> > the new cluster which suggests that all operators are matched with
>> > state based on IDs. However, app runs only few seccons and then
>> > crash with:
>> >
>> > java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>> >   at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >   at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>> >   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>> >   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>> >   at java.lang.Thread.run(Thread.java:745)
>> > Caused by: org.apache.flink.util.FlinkException: Could not restore
>> operator state backend for
>> DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1
>> provided restore options.
>> >   at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>> >   at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
>> >   at
>> org.

Beam application upgrade on Flink crashes

2018-08-20 Thread Jozef Vilcek
Hello,

I am attempting to upgrade a Beam app from 2.5.0 running on Flink 1.4.0 to
Beam 2.6.0 running on Flink 1.5.0. I am not aware of any state migration
changes needed for Flink 1.4.0 -> 1.5.0, so I am just starting the new app
with updated libs from a Flink savepoint captured by the previous version of
the app.

There is no change in topology. The job is accepted without error on the new
cluster, which suggests that all operators are matched with state based on
their IDs. However, the app runs only a few seconds and then crashes with:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore
operator state backend for
DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the
1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
... 5 more
Caused by: java.io.IOException: Unable to restore operator state
[bundle-buffer-tag]. The previous serializer of the operator state
must be present; the serializer could have been removed from the
classpath, or its implementation have changed and could not be loaded.
This is a temporary restriction that will be fixed in future versions.
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more


Does this mean anything to anyone? Am I doing anything wrong, or did
FlinkRunner change in some way? The mentioned "bundle-buffer-tag" seems to be
too deep inside the runner internals for my reach.

Any help is much appreciated.

Best,
Jozo


Git export-ignore for gradle

2018-08-20 Thread Jozef Vilcek
Hello,

This commit added export-ignore for some of the Gradle files:
https://github.com/apache/beam/commit/2a0f68b0c743d37c46486b81500043b4b420c825

This means that a downloaded zip archive of the git repository is not buildable
via the 'gradlew' command. I am curious about the rationale behind this change.

Best,
Jozef


Re: [PROPOSAL] Prepare Beam 2.6.0 release

2018-07-29 Thread Jozef Vilcek
Hello, is there a chance this bug makes it into the release?

https://issues.apache.org/jira/browse/BEAM-5028

On Sat, Jul 28, 2018 at 1:38 AM Pablo Estrada  wrote:

> Hello all,
> I will start daily updates of progress on the 2.6.0 release.
> As of today, the main release blockers are issues in Dataflow that are
> preventing us from cutting the Dataflow workers.
>
>- One issue in Java SDK related to FnAPI. Specifically PR 5709
>requires Dataflow worker changes[1].
>- One issue in the Python SDK related to context management. PR 5356
>also requires Dataflow worker changes [2].
>
> Please reach out to me if you have any questions.
> Best
> -P.
>
> [1] https://github.com/apache/beam/pull/5709
> [2] https://github.com/apache/beam/pull/5356
>
>
> On Thu, Jul 26, 2018 at 2:16 PM Pablo Estrada  wrote:
>
>> Hello everyone,
>> I wanted to do an update on the state of the release, as there haven't
>> been news on this for a while.
>> We have found a few issues that broke postcommits a few weeks back, but
>> we hadn't noticed. Some people are tacking these to try to stabilize the
>> release branch[1].
>>
>> In the meantime, the release has been blocked, but Boyuan Zhang has taken
>> advantage of this to code up a few scripts to try and automate release
>> steps. (Thanks Boyuan!). We will try these as soon as the release is
>> unblocked.
>>
>> Best
>> -P.
>>
>> [1] https://github.com/apache/beam/pull/6072
>>
>> On Wed, Jul 18, 2018 at 11:03 AM Pablo Estrada 
>> wrote:
>>
>>> Hello all!
>>> I've cut the release branch (release-2.6.0), with some help from Ahmet
>>> and Boyuan. From now on, please cherry-pick 2.6.0 blockers into the branch.
>>> Now we start stabilizing it.
>>>
>>> Thanks!
>>>
>>> -P.
>>>
>>> On Tue, Jul 17, 2018 at 9:34 PM Jean-Baptiste Onofré 
>>> wrote:
>>>
 Hi Pablo,

 I'm investigating this issue, but it's a little long process.

 So, I propose you start with the release process,  cutting the branch,
 and then, I will create a cherry-pick PR for this one.

 Regards
 JB

 On 17/07/2018 20:19, Pablo Estrada wrote:
 > Checking once more:
 > What does the communitythink we should do
 > about https://issues.apache.org/jira/browse/BEAM-4750 ? Should I
 bump it
 > to 2.7.0?
 > Best
 > -P.
 >
 > On Fri, Jul 13, 2018 at 5:15 PM Ahmet Altay >>> > > wrote:
 >
 > Update:  https://issues.apache.org/jira/browse/BEAM-4784 is not a
 > release blocker, details in the JIRA issue.
 >
 > On Fri, Jul 13, 2018 at 11:12 AM, Thomas Weise >>> > > wrote:
 >
 > Can one of our Python experts please take a look
 > at https://issues.apache.org/jira/browse/BEAM-4784 and
 advise if
 > this should be addressed for the release?
 >
 > Thanks,
 > Thomas
 >
 >
 > On Fri, Jul 13, 2018 at 11:02 AM Ahmet Altay <
 al...@google.com
 > > wrote:
 >
 >
 >
 > On Fri, Jul 13, 2018 at 10:48 AM, Pablo Estrada
 > mailto:pabl...@google.com>> wrote:
 >
 > Hi all,
 > I've triaged most issues marked for 2.6.0 release.
 I've
 > localized two that need a decision / attention:
 >
 > - https://issues.apache.org/jira/browse/BEAM-4417 -
 > Bigquery IO Numeric Datatype Support. Cham is not
 > available to fix this at the moment, but this is a
 > critical issue. Is anyone able to tackle this / should
 > we bump this to next release?
 >
 >
 > I bumped this to the next release. I think Cham will be
 the
 > best person to address it when he is back. And with the
 > regular release cadence, it would not be delayed by much.
 >
 >
 >
 > - https://issues.apache.org/jira/browse/BEAM-4750 -
 > Performance degradation due to some safeguards in
 > beam-sdks-java-core. JB, are you looking to fix this?
 > Should we bump? I had the impression that it was an
 easy
 > fix, but I'm not sure.
 >
 > If you're aware of any other issue that needs to be
 > included as a release blocker, please report it to me.
 > Best
 > -P.
 >
 > On Thu, Jul 12, 2018 at 2:15 AM Etienne Chauchot
 > mailto:echauc...@apache.org>>
 wrote:
 >
 > +1,
 >
 > Thanks for volunteering Pablo, thanks also to have
 > caught tickets that I forgot to close :)
 >
 > Etienne
 >
 

Re: FileBasedSink.WriteOperation copy instead of move?

2018-07-26 Thread Jozef Vilcek
Yes, rename can be tricky across directories. This is related:
https://issues.apache.org/jira/browse/BEAM-4861
I guess I can file a JIRA for this, right?
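
For illustration, a rough sketch of what swapping the copy+delete in
moveToOutputFiles() for a rename might look like, using Beam's FileSystems API
(a sketch only, not the actual WriteOperation code):

import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

class MoveViaRename {
  // Renames temp files to their final destinations in one call; on filesystems
  // with a native rename (e.g. HDFS) this avoids streaming bytes through the
  // copy path.
  static void moveToOutput(List<ResourceId> tempFiles, List<ResourceId> outputFiles)
      throws java.io.IOException {
    FileSystems.rename(tempFiles, outputFiles, StandardMoveOptions.IGNORE_MISSING_FILES);
  }
}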

On Thu, Jul 26, 2018 at 7:31 PM Chamikara Jayalath 
wrote:

> Also, we'll have to use StandardMoveOptions.IGNORE_MISSING_FILES for
> supporting failures of the rename step. I think this is a good change to do
> if the change significantly improves the performance of some of the
> FileSystems (note that some FileSystems, for example GCS, implement rename
> in the form of a copy+delete, so there will be no significant performance
> improvements for such FileSystems).
>
> -Cham
>
> On Thu, Jul 26, 2018 at 10:14 AM Reuven Lax  wrote:
>
>> We might be able to replace this with Filesystem.rename(). One thing to
>> keep in mind - the destination files might be in a different directory, so
>> we would need to make sure that all Filesystems support cross-directory
>> rename.
>>
>> On Thu, Jul 26, 2018 at 9:58 AM Lukasz Cwik  wrote:
>>
>>> +dev
>>>
>>> On Thu, Jul 26, 2018 at 2:40 AM Jozef Vilcek 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> just came across FileBasedSink.WriteOperation class which does have
>>>> moveToOutput() method. Implementation does a Filesystem.copy() instead of
>>>> "move". With large files I find it quote no efficient if underlying FS
>>>> supports more efficient ways, so I wonder what is the story behind it? Must
>>>> it be a copy?
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L761
>>>>
>>>