Re: End to end unit tests for stateful pipeline

2021-06-14 Thread gaurav mishra
Hi Luke,
I tried going down the path you suggested but am hitting some
roadblocks. Maybe I am doing something wrong. As you said, I created a unit
test specific class for PipelineOptions and a TestRedisFactory which
is set up to return a mock instance of RedisClient. In my test code I have
 ```
TestPipelineOptions options = PipelineOptionsFactory.as(TestPipelineOptions.class);
// get instance of TestRedisClient, which is serializable
RedisClient client = options.getRedisClient();
// some code to set up mocked interactions
pipeline.run(options);
```

In my DoFn I have
```
ProductionPipelineOptions pipelineOptions =
context.getPipelineOptions().as(ProductionPipelineOptions.class);

// get instance of RealRedisClient
RedisClient redisClient = pipelineOptions.getRedisClient();
redisClient.get(key);
```
In the unit test my options object is getting serialized along with the
TestRedisClient inside it. But when my DoFn is called, the framework
tries to deserialize the string representation of `TestRedisClient` back into
something that implements RedisClient, and this is where I am getting
stuck. I am not able to wrap my head around how to tell the framework to
deserialize that string to a TestRedisClient and return it in my DoFn.
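
(For anyone hitting the same wall in the archive: below is a rough, unverified sketch of one Jackson-level direction, assuming you control the RedisClient interface. The annotation embeds the concrete class name in the serialized options JSON so Jackson can rebuild a TestRedisClient or RealRedisClient instead of failing on the abstract type; the implementations would still need to be Jackson-(de)serializable themselves.)

```
import java.io.Serializable;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

// Sketch only: record the concrete implementation class in the JSON produced
// when the PipelineOptions are serialized, so deserialization can reconstruct
// TestRedisClient (in tests) or RealRedisClient (in production).
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "@class")
public interface RedisClient extends Serializable {
  String get(String key);
}
```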

On Mon, Jun 14, 2021 at 2:07 PM Luke Cwik  wrote:

> You can create a PipelineOption which represents your Redis client object.
> For tests you would set the PipelineOption to a serializable fake/mock that
> can replay the results you want. The default for the PipelineOption object
> would instantiate your production client. You can see an example usage of
> the DefaultValueFactory here[1].
>
> 1:
> https://github.com/apache/beam/blob/5cebe0fd82ade3f957fe70e25aa3e399d2e91b32/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L71
>
> On Mon, Jun 14, 2021 at 10:54 AM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>
>> Hi,
>> I have a streaming pipeline which reads from pubsub, enriches data using
>> redis and finally writes to pubsub. The code has some stateful DoFns with
>> timers. I wanted to write unit tests for the whole pipeline, that reads
>> from TestStream<> , enriches data using a mocked redis client, and writes
>> data to a PCollection on which I can do PAsserts. The trouble I am having
>> here is how to set up the mocked redis client. Are there any examples that
>> I can take a look at? I am using java with junit4 as a testing framework.
>> More details about my code are here -
>> https://stackoverflow.com/questions/67963189/unit-tests-apache-beam-stateful-pipeline-with-external-dependencies
>>
>


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
Awesome, thanks!

On Mon, Jun 14, 2021 at 5:36 PM Evan Galpin  wrote:
>
> I’ll try to create something as small as possible from the pipeline I 
> mentioned  I should have time this week to do so.
>
> Thanks,
> Evan
>
> On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw  wrote:
>>
>> Is it possible to post the code? (Or the code of a similar, but
>> minimal, pipeline that exhibits the same issues?)
>>
>> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin  wrote:
>> >
>> > @robert I have a pipeline which consistently shows a major slowdown (10 
>> > seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be 
>> > boiled down to:
>> >
>> > - Read GCS file patterns from PubSub
>> > - Window into Fixed windows (repeating every 15 seconds)
>> > - Deduplicate/distinct (have tried both)
>> > - Read GCS blobs via patterns from the first step
>> > - Write file contents to sink
>> >
>> > It doesn't seem to matter if there are 0 messages in a subscription or 50k 
>> > messages at startup. The rate of new messages however is very low. Not 
>> > sure if those are helpful details, let me know if there's anything else 
>> > specific which would help.
>> >
>> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw  
>> > wrote:
>> >>
>> >> +1, we'd really like to get to the bottom of this, so clear
>> >> instructions on a pipeline/conditions that can reproduce it would be
>> >> great.
>> >>
>> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > you are probably hitting a not-yet discovered bug in SDF implementation 
>> >> > in FlinkRunner that (under some currently unknown conditions) seems to 
>> >> > stop advancing the watermark. This has been observed in one other 
>> >> > instance (that I'm aware of). I think we don't yet have a tracking JIRA 
>> >> > for that, would you mind filling it? It would be awesome if you could 
>> >> > include estimations of messages per sec throughput that causes the 
>> >> > issue in your case.
>> >> >
>> >> > +Tobias Kaymak
>> >> >
>> >> > Tobias, could you please confirm that the case you had with Flink 
>> >> > stopping progressing watermark resembled this one?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >  Jan
>> >> >
>> >> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >> >
>> >> > Hi Jan,
>> >> >
>> >> > I've added --experiments=use_deprecated_read and it seems to work 
>> >> > flawlessly (with my current Window and the one proposed by Evan).
>> >> >
>> >> > Why is this? Do Splittable DoFn now break current implementations? Are 
>> >> > there any posts of possible breaking changes?
>> >> >
>> >> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > answers inline.
>> >> >
>> >> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >> >
>> >> > Hi Jan,
>> >> >
>> >> > Thanks for replying so fast!
>> >> >
>> >> > Regarding your questions,
>> >> >
>> >> > - "Does your data get buffered in a state?"
>> >> > Yes, I do have a state within a stage prior ParquetIO writing together 
>> >> > with a Timer with PROCESSING_TIME.
>> >> >
>> >> > The stage which contains the state does send bytes to the next one 
>> >> > which is the ParquetIO writing. Seems the @OnTimer doesn't get 
>> >> > triggered and it's not clearing the state. This however does work under 
>> >> > normal circumstances without having too much data queued waiting to be 
>> >> > processed.
>> >> >
>> >> > OK, this suggests, that the watermark is for some reason "stuck". If you
>> >> > checkpoints enabled, you should see the size of the checkpoint to grow
>> >> > over time.
>> >> >
>> >> > - "Do you see watermark being updated in your Flink WebUI?"
>> >> > The stages that do have a watermark don't get updated. The same 
>> >> > watermark value has been constant since the pipeline started.
>> >> >
>> >> > If no lateness is set, any late data should be admitted right?
>> >> >
>> >> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
>> >> > means that data that arrive after end-of-window will be dropped.
>> >> >
>> >> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
>> >> > neither in Flink UI or Prometheus. I've seen it in Dataflow but seems 
>> >> > to be a Dataflow specific metric right?
>> >> >
>> >> > Should not be Dataflow specific. But if you don't see it, it means it
>> >> > could be zero. So, we can rule this out.
>> >> >
>> >> > We're using KinesisIO for reading messages.
>> >> >
>> >> > Kinesis uses UnboundedSource, which is expended to SDF starting from
>> >> > Beam 2.25.0. The flag should change that as well. Can you try the
>> >> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
>> >> > (should not contain Impulse transform at the beginning) and if it solves
>> >> > your issues?
>> >> >
>> >> > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
>> >> >
>> >> > Hi Eddy,
>> >> >
>> >> > does your data get buffered in a state - e.g. does the size of the state
>> >> > grow over time? Do you see watermark 

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
I’ll try to create something as small as possible from the pipeline I
mentioned. I should have time this week to do so.

Thanks,
Evan

On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw  wrote:

> Is it possible to post the code? (Or the code of a similar, but
> minimal, pipeline that exhibits the same issues?)
>
> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin  wrote:
> >
> > @robert I have a pipeline which consistently shows a major slowdown (10
> seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be
> boiled down to:
> >
> > - Read GCS file patterns from PubSub
> > - Window into Fixed windows (repeating every 15 seconds)
> > - Deduplicate/distinct (have tried both)
> > - Read GCS blobs via patterns from the first step
> > - Write file contents to sink
> >
> > It doesn't seem to matter if there are 0 messages in a subscription or
> 50k messages at startup. The rate of new messages however is very low. Not
> sure if those are helpful details, let me know if there's anything else
> specific which would help.
> >
> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw 
> wrote:
> >>
> >> +1, we'd really like to get to the bottom of this, so clear
> >> instructions on a pipeline/conditions that can reproduce it would be
> >> great.
> >>
> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > you are probably hitting a not-yet discovered bug in SDF
> implementation in FlinkRunner that (under some currently unknown
> conditions) seems to stop advancing the watermark. This has been observed
> in one other instance (that I'm aware of). I think we don't yet have a
> tracking JIRA for that, would you mind filling it? It would be awesome if
> you could include estimations of messages per sec throughput that causes
> the issue in your case.
> >> >
> >> > +Tobias Kaymak
> >> >
> >> > Tobias, could you please confirm that the case you had with Flink
> stopping progressing watermark resembled this one?
> >> >
> >> > Thanks.
> >> >
> >> >  Jan
> >> >
> >> > On 6/14/21 4:11 PM, Eddy G wrote:
> >> >
> >> > Hi Jan,
> >> >
> >> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >> >
> >> > Why is this? Do Splittable DoFn now break current implementations?
> Are there any posts of possible breaking changes?
> >> >
> >> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > answers inline.
> >> >
> >> > On 6/14/21 3:05 PM, Eddy G wrote:
> >> >
> >> > Hi Jan,
> >> >
> >> > Thanks for replying so fast!
> >> >
> >> > Regarding your questions,
> >> >
> >> > - "Does your data get buffered in a state?"
> >> > Yes, I do have a state within a stage prior ParquetIO writing
> together with a Timer with PROCESSING_TIME.
> >> >
> >> > The stage which contains the state does send bytes to the next one
> which is the ParquetIO writing. Seems the @OnTimer doesn't get triggered
> and it's not clearing the state. This however does work under normal
> circumstances without having too much data queued waiting to be processed.
> >> >
> >> > OK, this suggests, that the watermark is for some reason "stuck". If
> you
> >> > checkpoints enabled, you should see the size of the checkpoint to grow
> >> > over time.
> >> >
> >> > - "Do you see watermark being updated in your Flink WebUI?"
> >> > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> >> >
> >> > If no lateness is set, any late data should be admitted right?
> >> >
> >> > If no lateness is set, it means allowed lateness of Duration.ZERO,
> which
> >> > means that data that arrive after end-of-window will be dropped.
> >> >
> >> > Regarding 'droppedDueToLateness' metric, can't see it exposed
> anywhere, neither in Flink UI or Prometheus. I've seen it in Dataflow but
> seems to be a Dataflow specific metric right?
> >> >
> >> > Should not be Dataflow specific. But if you don't see it, it means it
> >> > could be zero. So, we can rule this out.
> >> >
> >> > We're using KinesisIO for reading messages.
> >> >
> >> > Kinesis uses UnboundedSource, which is expended to SDF starting from
> >> > Beam 2.25.0. The flag should change that as well. Can you try the
> >> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
> >> > (should not contain Impulse transform at the beginning) and if it
> solves
> >> > your issues?
> >> >
> >> > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > does your data get buffered in a state - e.g. does the size of the
> state
> >> > grow over time? Do you see watermark being updated in your Flink
> WebUI?
> >> > When a stateful operation (and GroupByKey is a stateful operation)
> does
> >> > not output any data, the first place to look at is if watermark
> >> > correctly progresses. If it does not progress, then the input data
> must
> >> > be buffered in state and the size of the 

CANCELLED: call already cancelled

2021-06-14 Thread Trevor Kramer
Hello,

I am trying to run a Beam pipeline on Flink using EMR. I am consistently
getting the errors below. I found a reference to a bug report that said this
issue was fixed in Flink 1.11; I am using Flink 1.12.1.

Caused by: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
  at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Status.asRuntimeException(Status.java:526)
  at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:351)
  at org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:98)
  at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.flush(BeamFnDataSizeBasedBufferingOutboundObserver.java:103)
  at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:115)
  at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$CountingFnDataReceiver.accept(SdkHarnessClient.java:718)
  at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.processElements(FlinkExecutableStageFunction.java:362)
  at org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.mapPartition(FlinkExecutableStageFunction.java:267)
  at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:113)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
  at java.lang.Thread.run(Thread.java:748)

Is there a more solid runner for running Beam jobs in an AWS environment?

Thanks,

Trevor


Re: How avoid blocking when decompressing large GZIP files.

2021-06-14 Thread Luke Cwik
Try adding a Reshuffle transform to the pipeline after the ParDo that assigns
the sequence number. This will cause the data to be materialized, and the
subsequent steps can then happen in parallel.

Depending on which IO transform you are using, if splitting support is ever
added for compressed files and for CSV files, your pipeline might break,
since each of the "multiple" file segments would start from 1 and count up.

There is some advanced support for having line numbers along with the data
within ContextualTextIO which might be of interest to you as a replacement
for your implementation.
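
A rough sketch of that shape, in case it helps (element types, transform names, and `rows` — assumed to be a PCollection<String> read from the file — are made up). The sequence is assigned under a single key so it stays global, and the Reshuffle materializes the numbered rows so the downstream work can scale out:

```
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Stateful DoFn that numbers elements 1..n per key (one key here, so the
// numbering is global but this step itself is not parallel).
static class AssignSequence extends DoFn<KV<Integer, String>, KV<Long, String>> {
  @StateId("seq")
  private final StateSpec<ValueState<Long>> seqSpec = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(ProcessContext c, @StateId("seq") ValueState<Long> seq) {
    Long current = seq.read();
    long next = (current == null ? 0L : current) + 1;
    seq.write(next);
    c.output(KV.of(next, c.element().getValue()));
  }
}

PCollection<KV<Long, String>> numbered =
    rows
        .apply("SingleKey", WithKeys.of((String row) -> 0)
            .withKeyType(TypeDescriptors.integers()))
        .apply("AssignSequence", ParDo.of(new AssignSequence()))
        // Materialize the numbered rows so downstream steps are no longer fused
        // to the sequential numbering step and can run in parallel.
        .apply("BreakFusion", Reshuffle.viaRandomKey());
```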


On Fri, Apr 23, 2021 at 5:10 AM Evan Galpin  wrote:

> Hmm in my somewhat limited experience, I was not able to combine state and
> Splittable DoFn. Definitely could be user error on my part though.
>
> RE sequence numbers, could it work to embed those numbers in the CSV
> itself?
>
> Thanks,
> Evan
>
> On Fri, Apr 23, 2021 at 07:55 Simon Gauld  wrote:
>
>> Thank you and I will have a look however some concerns I have
>>
>> - the gzip itself is not splittable as such
>> - I need to apply a sequence number 1..n so I believe the read *must* be
>> sequential
>>
>> However, what I am looking to achieve is handing off the newly decorated
>> row as soon as the sequence is applied to it. The issue is that the
>> entire step of applying the sequence number appears to be blocking. Also of
>> note, I am using a @DoFn.StateId.
>>
>> I'll look at SplittableDoFns, thanks.
>>
>>
>> On Fri, Apr 23, 2021 at 12:50 PM Evan Galpin 
>> wrote:
>>
>>> I could be wrong but I believe that if your large file is being read by
>>> a DoFn, it’s likely that the file is being processed atomically inside that
>>> DoFn, which cannot be parallelized further by the runner.
>>>
>>> One purpose-built way around that constraint is by using Splittable
>>> DoFn[1][2] which could be used to allow each split to read a portion of the
>>> file. I don’t know, however, how this might (or might not) work with
>>> compression.
>>>
>>> [1]
>>> https://beam.apache.org/blog/splittable-do-fn-is-available/
>>> [2]
>>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Fri, Apr 23, 2021 at 07:34 Simon Gauld  wrote:
>>>
 Hello,

 I am trying to apply a transformation to each row in a reasonably large
 (1b row) gzip compressed CSV.

 The first operation is to assign a sequence number, in this case 1,2,3..

 The second operation is the actual transformation.

 I would like to apply the sequence number *as* each row is read from
 the compressed source and then hand off the 'real' transformation work in
 parallel, using DataFlow to autoscale the workers for the transformation.

 I don't seem to be able to scale *until* all rows have been read; this
 appears to be blocking the pipeline until decompression of the entire file
 is completed.   At this point DataFlow autoscaling works as expected, it
 scales upwards and throughput is then high. The issue is the decompression
 appears to block.

 My question: in beam, is it possible to stream records from a
 compressed source? without blocking the pipeline?

 thank you

 .s




Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
Is it possible to post the code? (Or the code of a similar, but
minimal, pipeline that exhibits the same issues?)

On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin  wrote:
>
> @robert I have a pipeline which consistently shows a major slowdown (10 
> seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be 
> boiled down to:
>
> - Read GCS file patterns from PubSub
> - Window into Fixed windows (repeating every 15 seconds)
> - Deduplicate/distinct (have tried both)
> - Read GCS blobs via patterns from the first step
> - Write file contents to sink
>
> It doesn't seem to matter if there are 0 messages in a subscription or 50k 
> messages at startup. The rate of new messages however is very low. Not sure 
> if those are helpful details, let me know if there's anything else specific 
> which would help.
>
> On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw  wrote:
>>
>> +1, we'd really like to get to the bottom of this, so clear
>> instructions on a pipeline/conditions that can reproduce it would be
>> great.
>>
>> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
>> >
>> > Hi Eddy,
>> >
>> > you are probably hitting a not-yet discovered bug in SDF implementation in 
>> > FlinkRunner that (under some currently unknown conditions) seems to stop 
>> > advancing the watermark. This has been observed in one other instance 
>> > (that I'm aware of). I think we don't yet have a tracking JIRA for that, 
>> > would you mind filling it? It would be awesome if you could include 
>> > estimations of messages per sec throughput that causes the issue in your 
>> > case.
>> >
>> > +Tobias Kaymak
>> >
>> > Tobias, could you please confirm that the case you had with Flink stopping 
>> > progressing watermark resembled this one?
>> >
>> > Thanks.
>> >
>> >  Jan
>> >
>> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > I've added --experiments=use_deprecated_read and it seems to work 
>> > flawlessly (with my current Window and the one proposed by Evan).
>> >
>> > Why is this? Do Splittable DoFn now break current implementations? Are 
>> > there any posts of possible breaking changes?
>> >
>> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
>> >
>> > Hi Eddy,
>> >
>> > answers inline.
>> >
>> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > Thanks for replying so fast!
>> >
>> > Regarding your questions,
>> >
>> > - "Does your data get buffered in a state?"
>> > Yes, I do have a state within a stage prior ParquetIO writing together 
>> > with a Timer with PROCESSING_TIME.
>> >
>> > The stage which contains the state does send bytes to the next one which 
>> > is the ParquetIO writing. Seems the @OnTimer doesn't get triggered and 
>> > it's not clearing the state. This however does work under normal 
>> > circumstances without having too much data queued waiting to be processed.
>> >
>> > OK, this suggests, that the watermark is for some reason "stuck". If you
>> > checkpoints enabled, you should see the size of the checkpoint to grow
>> > over time.
>> >
>> > - "Do you see watermark being updated in your Flink WebUI?"
>> > The stages that do have a watermark don't get updated. The same watermark 
>> > value has been constant since the pipeline started.
>> >
>> > If no lateness is set, any late data should be admitted right?
>> >
>> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
>> > means that data that arrive after end-of-window will be dropped.
>> >
>> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
>> > neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to 
>> > be a Dataflow specific metric right?
>> >
>> > Should not be Dataflow specific. But if you don't see it, it means it
>> > could be zero. So, we can rule this out.
>> >
>> > We're using KinesisIO for reading messages.
>> >
>> > Kinesis uses UnboundedSource, which is expended to SDF starting from
>> > Beam 2.25.0. The flag should change that as well. Can you try the
>> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
>> > (should not contain Impulse transform at the beginning) and if it solves
>> > your issues?
>> >
>> > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
>> >
>> > Hi Eddy,
>> >
>> > does your data get buffered in a state - e.g. does the size of the state
>> > grow over time? Do you see watermark being updated in your Flink WebUI?
>> > When a stateful operation (and GroupByKey is a stateful operation) does
>> > not output any data, the first place to look at is if watermark
>> > correctly progresses. If it does not progress, then the input data must
>> > be buffered in state and the size of the state should grow over time. If
>> > it progresses, then it might be the case, that the data is too late
>> > after the watermark (the watermark estimator might need tuning) and the
>> > data gets dropped (note you don't set any allowed lateness, which
>> > _might_ cause issues). You could see if your pipeline drops data in
>> > 

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
@robert I have a pipeline which consistently shows a major slowdown (10
seconds vs 10 minutes) between versions <=2.23.0 and >=2.25.0 that can be
boiled down to:

- Read GCS file patterns from PubSub
- Window into Fixed windows (repeating every 15 seconds)
- Deduplicate/distinct (have tried both)
- Read GCS blobs via patterns from the first step
- Write file contents to sink

It doesn't seem to matter if there are 0 messages in a subscription or 50k
messages at startup. The rate of new messages, however, is very low. Not sure
if those are helpful details; let me know if there's anything else specific
which would help.
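
For reference, a rough sketch of a pipeline with that shape (subscription, bucket, window size, and shard count are placeholders, not taken from the actual job, and `options` is assumed to be already built):

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

Pipeline p = Pipeline.create(options);
p.apply("ReadPatterns", PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-sub"))
 .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardSeconds(15))))
 .apply("Dedup", Distinct.<String>create())
 .apply("MatchFiles", FileIO.matchAll())
 .apply("ReadMatches", FileIO.readMatches())
 .apply("ReadContents", TextIO.readFiles())
 .apply("WriteToSink", TextIO.write()
        .to("gs://my-bucket/output/part")
        .withWindowedWrites()
        .withNumShards(1));
p.run();
```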

On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw 
wrote:

> +1, we'd really like to get to the bottom of this, so clear
> instructions on a pipeline/conditions that can reproduce it would be
> great.
>
> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
> >
> > Hi Eddy,
> >
> > you are probably hitting a not-yet discovered bug in SDF implementation
> in FlinkRunner that (under some currently unknown conditions) seems to stop
> advancing the watermark. This has been observed in one other instance (that
> I'm aware of). I think we don't yet have a tracking JIRA for that, would
> you mind filling it? It would be awesome if you could include estimations
> of messages per sec throughput that causes the issue in your case.
> >
> > +Tobias Kaymak
> >
> > Tobias, could you please confirm that the case you had with Flink
> stopping progressing watermark resembled this one?
> >
> > Thanks.
> >
> >  Jan
> >
> > On 6/14/21 4:11 PM, Eddy G wrote:
> >
> > Hi Jan,
> >
> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >
> > Why is this? Do Splittable DoFn now break current implementations? Are
> there any posts of possible breaking changes?
> >
> > On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> >
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> >
> > Hi Jan,
> >
> > Thanks for replying so fast!
> >
> > Regarding your questions,
> >
> > - "Does your data get buffered in a state?"
> > Yes, I do have a state within a stage prior ParquetIO writing together
> with a Timer with PROCESSING_TIME.
> >
> > The stage which contains the state does send bytes to the next one which
> is the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's
> not clearing the state. This however does work under normal circumstances
> without having too much data queued waiting to be processed.
> >
> > OK, this suggests, that the watermark is for some reason "stuck". If you
> > checkpoints enabled, you should see the size of the checkpoint to grow
> > over time.
> >
> > - "Do you see watermark being updated in your Flink WebUI?"
> > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> >
> > If no lateness is set, any late data should be admitted right?
> >
> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
> > means that data that arrive after end-of-window will be dropped.
> >
> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere,
> neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be
> a Dataflow specific metric right?
> >
> > Should not be Dataflow specific. But if you don't see it, it means it
> > could be zero. So, we can rule this out.
> >
> > We're using KinesisIO for reading messages.
> >
> > Kinesis uses UnboundedSource, which is expended to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try the
> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
> > (should not contain Impulse transform at the beginning) and if it solves
> > your issues?
> >
> > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
> >
> > Hi Eddy,
> >
> > does your data get buffered in a state - e.g. does the size of the state
> > grow over time? Do you see watermark being updated in your Flink WebUI?
> > When a stateful operation (and GroupByKey is a stateful operation) does
> > not output any data, the first place to look at is if watermark
> > correctly progresses. If it does not progress, then the input data must
> > be buffered in state and the size of the state should grow over time. If
> > it progresses, then it might be the case, that the data is too late
> > after the watermark (the watermark estimator might need tuning) and the
> > data gets dropped (note you don't set any allowed lateness, which
> > _might_ cause issues). You could see if your pipeline drops data in
> > "droppedDueToLateness" metric. The size of you state would not grow much
> > in that situation.
> >
> > Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> > using "--experiments=use_deprecated_read" on command line (which you
> > then must pass to PipelineOptionsFactory). There is some suspicion that
> > SDF wrapper for Kafka might not 

Re: End to end unit tests for stateful pipeline

2021-06-14 Thread Luke Cwik
You can create a PipelineOption which represents your Redis client object.
For tests you would set the PipelineOption to a serializable fake/mock that
can replay the results you want. The default for the PipelineOption object
would instantiate your production client. You can see an example usage of
the DefaultValueFactory here[1].

1:
https://github.com/apache/beam/blob/5cebe0fd82ade3f957fe70e25aa3e399d2e91b32/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java#L71
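
A rough sketch of that pattern (interface and class names such as RedisPipelineOptions, RealRedisClient, and FakeRedisClient are made up for illustration):

```
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.PipelineOptions;

public interface RedisPipelineOptions extends PipelineOptions {
  @Default.InstanceFactory(RedisClientFactory.class)
  RedisClient getRedisClient();
  void setRedisClient(RedisClient client);

  // Only invoked when nothing was set explicitly, i.e. the production path.
  class RedisClientFactory implements DefaultValueFactory<RedisClient> {
    @Override
    public RedisClient create(PipelineOptions options) {
      return new RealRedisClient();
    }
  }
}

// In a test, the option would instead be set to a serializable fake:
// options.as(RedisPipelineOptions.class).setRedisClient(new FakeRedisClient());
```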

On Mon, Jun 14, 2021 at 10:54 AM gaurav mishra 
wrote:

> Hi,
> I have a streaming pipeline which reads from pubsub, enriches data using
> redis and finally writes to pubsub. The code has some stateful DoFns with
> timers. I wanted to write unit tests for the whole pipeline, that reads
> from TestStream<> , enriches data using a mocked redis client, and writes
> data to a PCollection on which I can do PAsserts. The trouble I am having
> here is how to set up the mocked redis client. Are there any examples that
> I can take a look at? I am using java with junit4 as a testing framework.
> More details about my code are here -
> https://stackoverflow.com/questions/67963189/unit-tests-apache-beam-stateful-pipeline-with-external-dependencies
>


Re: Allyship workshops for open source contributors

2021-06-14 Thread deepak kumar
+1

On Tue, Jun 15, 2021 at 12:31 AM Aizhamal Nurmamat kyzy 
wrote:

> Thank you all! Based on the feedback, I will set up a session for a couple
> open source groups. Will share more details soon. Stay tuned.
>
> On Mon, Jun 7, 2021 at 4:42 PM Kenneth Knowles  wrote:
>
>> Yes please!
>>
>> On Thu, Jun 3, 2021, 18:32 Ratnakar Malla  wrote:
>>
>>> +1
>>>
>>>
>>> --
>>> *From:* Austin Bennett 
>>> *Sent:* Thursday, June 3, 2021 6:20:25 PM
>>> *To:* user@beam.apache.org 
>>> *Cc:* dev 
>>> *Subject:* Re: Allyship workshops for open source contributors
>>>
>>> +1, assuming timing can work.
>>>
>>> On Wed, Jun 2, 2021 at 2:07 PM Aizhamal Nurmamat kyzy <
>>> aizha...@apache.org> wrote:
>>>
>>> If we have a good number of people who express interest in this thread,
>>> I will set up training for the Airflow community.
>>>
>>>
>>> I meant Beam ^^' I am organizing it for the Airflow community as well.
>>>
>>>


Re: Allyship workshops for open source contributors

2021-06-14 Thread Aizhamal Nurmamat kyzy
Thank you all! Based on the feedback, I will set up a session for a couple
open source groups. Will share more details soon. Stay tuned.

On Mon, Jun 7, 2021 at 4:42 PM Kenneth Knowles  wrote:

> Yes please!
>
> On Thu, Jun 3, 2021, 18:32 Ratnakar Malla  wrote:
>
>> +1
>>
>>
>> --
>> *From:* Austin Bennett 
>> *Sent:* Thursday, June 3, 2021 6:20:25 PM
>> *To:* user@beam.apache.org 
>> *Cc:* dev 
>> *Subject:* Re: Allyship workshops for open source contributors
>>
>> +1, assuming timing can work.
>>
>> On Wed, Jun 2, 2021 at 2:07 PM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>> If we have a good number of people who express interest in this thread, I
>> will set up training for the Airflow community.
>>
>>
>> I meant Beam ^^' I am organizing it for the Airflow community as well.
>>
>>


End to end unit tests for stateful pipeline

2021-06-14 Thread gaurav mishra
Hi,
I have a streaming pipeline which reads from pubsub, enriches data using
redis and finally writes to pubsub. The code has some stateful DoFns with
timers. I wanted to write unit tests for the whole pipeline: a test that reads
from TestStream<>, enriches data using a mocked redis client, and writes
data to a PCollection on which I can do PAsserts. The trouble I am having
here is how to set up the mocked redis client. Are there any examples that
I can take a look at? I am using Java with JUnit4 as the testing framework.
More details about my code are here -
https://stackoverflow.com/questions/67963189/unit-tests-apache-beam-stateful-pipeline-with-external-dependencies
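
For context, the overall test shape being aimed for looks roughly like the sketch below (MyEnrichTransform and the expected output are placeholders; the open question above is how the mocked redis client gets into the transform):

```
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;

public class EnrichmentPipelineTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void enrichesElements() {
    TestStream<String> input =
        TestStream.create(StringUtf8Coder.of())
            .addElements("event-1")
            // Let any processing-time timers in the stateful DoFns fire.
            .advanceProcessingTime(Duration.standardMinutes(5))
            .advanceWatermarkToInfinity();

    PCollection<String> output = p.apply(input).apply(new MyEnrichTransform());

    PAssert.that(output).containsInAnyOrder("event-1-enriched");
    p.run().waitUntilFinish();
  }
}
```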


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Robert Bradshaw
+1, we'd really like to get to the bottom of this, so clear
instructions on a pipeline/conditions that can reproduce it would be
great.

On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský  wrote:
>
> Hi Eddy,
>
> you are probably hitting a not-yet discovered bug in SDF implementation in 
> FlinkRunner that (under some currently unknown conditions) seems to stop 
> advancing the watermark. This has been observed in one other instance (that 
> I'm aware of). I think we don't yet have a tracking JIRA for that, would you 
> mind filling it? It would be awesome if you could include estimations of 
> messages per sec throughput that causes the issue in your case.
>
> +Tobias Kaymak
>
> Tobias, could you please confirm that the case you had with Flink stopping 
> progressing watermark resembled this one?
>
> Thanks.
>
>  Jan
>
> On 6/14/21 4:11 PM, Eddy G wrote:
>
> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work flawlessly 
> (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFn now break current implementations? Are there 
> any posts of possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský  wrote:
>
> Hi Eddy,
>
> answers inline.
>
> On 6/14/21 3:05 PM, Eddy G wrote:
>
> Hi Jan,
>
> Thanks for replying so fast!
>
> Regarding your questions,
>
> - "Does your data get buffered in a state?"
> Yes, I do have a state within a stage prior ParquetIO writing together with a 
> Timer with PROCESSING_TIME.
>
> The stage which contains the state does send bytes to the next one which is 
> the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
> clearing the state. This however does work under normal circumstances without 
> having too much data queued waiting to be processed.
>
> OK, this suggests, that the watermark is for some reason "stuck". If you
> checkpoints enabled, you should see the size of the checkpoint to grow
> over time.
>
> - "Do you see watermark being updated in your Flink WebUI?"
> The stages that do have a watermark don't get updated. The same watermark 
> value has been constant since the pipeline started.
>
> If no lateness is set, any late data should be admitted right?
>
> If no lateness is set, it means allowed lateness of Duration.ZERO, which
> means that data that arrive after end-of-window will be dropped.
>
> Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
> neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a 
> Dataflow specific metric right?
>
> Should not be Dataflow specific. But if you don't see it, it means it
> could be zero. So, we can rule this out.
>
> We're using KinesisIO for reading messages.
>
> Kinesis uses UnboundedSource, which is expended to SDF starting from
> Beam 2.25.0. The flag should change that as well. Can you try the
> --experiments=use_deprecated_read and see if you Pipeline DAG changes
> (should not contain Impulse transform at the beginning) and if it solves
> your issues?
>
> On 2021/06/14 12:48:58, Jan Lukavský  wrote:
>
> Hi Eddy,
>
> does your data get buffered in a state - e.g. does the size of the state
> grow over time? Do you see watermark being updated in your Flink WebUI?
> When a stateful operation (and GroupByKey is a stateful operation) does
> not output any data, the first place to look at is if watermark
> correctly progresses. If it does not progress, then the input data must
> be buffered in state and the size of the state should grow over time. If
> it progresses, then it might be the case, that the data is too late
> after the watermark (the watermark estimator might need tuning) and the
> data gets dropped (note you don't set any allowed lateness, which
> _might_ cause issues). You could see if your pipeline drops data in
> "droppedDueToLateness" metric. The size of you state would not grow much
> in that situation.
>
> Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> using "--experiments=use_deprecated_read" on command line (which you
> then must pass to PipelineOptionsFactory). There is some suspicion that
> SDF wrapper for Kafka might not work as expected in certain situations
> with Flink.
>
> Please feel free to share any results,
>
> Jan
>
> On 6/14/21 1:39 PM, Eddy G wrote:
>
> As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with 
> late data (intentionally stopped my consumer so data has been accumulating 
> for several days now). Now, with the following Window... I'm using Beam 2.27 
> and Flink 1.12.
>
>   
> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>
> And several parsing stages after, once it's time to write within the 
> ParquetIO stage...
>
>   FileIO
>   .writeDynamic()
>   .by(...)
>   .via(...)
>   .to(...)
>   

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský

Hi Eddy,

you are probably hitting a not-yet discovered bug in the SDF implementation
in FlinkRunner that (under some currently unknown conditions) seems to
stop advancing the watermark. This has been observed in one other
instance (that I'm aware of). I think we don't yet have a tracking JIRA
for that; would you mind filing it? It would be awesome if you could
include an estimate of the messages-per-second throughput that causes the issue
in your case.


+Tobias Kaymak 

Tobias, could you please confirm that the case you had with Flink 
stopping progressing watermark resembled this one?


Thanks.

 Jan

On 6/14/21 4:11 PM, Eddy G wrote:

Hi Jan,

I've added --experiments=use_deprecated_read and it seems to work flawlessly 
(with my current Window and the one proposed by Evan).

Why is this? Do Splittable DoFn now break current implementations? Are there 
any posts of possible breaking changes?

On 2021/06/14 13:19:39, Jan Lukavský  wrote:

Hi Eddy,

answers inline.

On 6/14/21 3:05 PM, Eddy G wrote:

Hi Jan,

Thanks for replying so fast!

Regarding your questions,

- "Does your data get buffered in a state?"
Yes, I do have a state within a stage prior ParquetIO writing together with a 
Timer with PROCESSING_TIME.

The stage which contains the state does send bytes to the next one which is the 
ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
clearing the state. This however does work under normal circumstances without 
having too much data queued waiting to be processed.

OK, this suggests, that the watermark is for some reason "stuck". If you
checkpoints enabled, you should see the size of the checkpoint to grow
over time.

- "Do you see watermark being updated in your Flink WebUI?"
The stages that do have a watermark don't get updated. The same watermark value 
has been constant since the pipeline started.

If no lateness is set, any late data should be admitted right?

If no lateness is set, it means allowed lateness of Duration.ZERO, which
means that data that arrive after end-of-window will be dropped.

Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, neither 
in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a Dataflow 
specific metric right?

Should not be Dataflow specific. But if you don't see it, it means it
could be zero. So, we can rule this out.

We're using KinesisIO for reading messages.

Kinesis uses UnboundedSource, which is expended to SDF starting from
Beam 2.25.0. The flag should change that as well. Can you try the
--experiments=use_deprecated_read and see if you Pipeline DAG changes
(should not contain Impulse transform at the beginning) and if it solves
your issues?

On 2021/06/14 12:48:58, Jan Lukavský  wrote:

Hi Eddy,

does your data get buffered in a state - e.g. does the size of the state
grow over time? Do you see watermark being updated in your Flink WebUI?
When a stateful operation (and GroupByKey is a stateful operation) does
not output any data, the first place to look at is if watermark
correctly progresses. If it does not progress, then the input data must
be buffered in state and the size of the state should grow over time. If
it progresses, then it might be the case, that the data is too late
after the watermark (the watermark estimator might need tuning) and the
data gets dropped (note you don't set any allowed lateness, which
_might_ cause issues). You could see if your pipeline drops data in
"droppedDueToLateness" metric. The size of you state would not grow much
in that situation.

Another hint - If you use KafkaIO, try to disable SDF wrapper for it
using "--experiments=use_deprecated_read" on command line (which you
then must pass to PipelineOptionsFactory). There is some suspicion that
SDF wrapper for Kafka might not work as expected in certain situations
with Flink.

Please feel free to share any results,

     Jan

On 6/14/21 1:39 PM, Eddy G wrote:

As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late 
data (intentionally stopped my consumer so data has been accumulating for 
several days now). Now, with the following Window... I'm using Beam 2.27 and 
Flink 1.12.

   
Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

   FileIO
   .writeDynamic()
   .by(...)
   .via(...)
   .to(...)
   .withNaming(...)
   .withDestinationCoder(StringUtf8Coder.of())
   .withNumShards(options.getNumShards())

it won't send bytes across all stages so no data is being written, still it 
accumulates in the first stage seen in the image and won't go further than that.

Any reason why this 

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
There have been varied reports of slowness loosely attributed to SDF
default wrapper change from 2.25.0.  Ex

https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E

https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-10670/comment/17316858

http://mail-archives.apache.org/mod_mbox/beam-dev/202105.mbox/%3ccae7uba_v0vfl9ck7n06n2zf6e+xcebdircez7yftlfwuvch...@mail.gmail.com%3e



On Mon, Jun 14, 2021 at 10:11 Eddy G  wrote:

> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFn now break current implementations? Are
> there any posts of possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský  wrote:
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> > > Hi Jan,
> > >
> > > Thanks for replying so fast!
> > >
> > > Regarding your questions,
> > >
> > > - "Does your data get buffered in a state?"
> > > Yes, I do have a state within a stage prior ParquetIO writing together
> with a Timer with PROCESSING_TIME.
> > >
> > > The stage which contains the state does send bytes to the next one
> which is the ParquetIO writing. Seems the @OnTimer doesn't get triggered
> and it's not clearing the state. This however does work under normal
> circumstances without having too much data queued waiting to be processed.
> > OK, this suggests, that the watermark is for some reason "stuck". If you
> > checkpoints enabled, you should see the size of the checkpoint to grow
> > over time.
> > >
> > > - "Do you see watermark being updated in your Flink WebUI?"
> > > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> > >
> > > If no lateness is set, any late data should be admitted right?
> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
> > means that data that arrive after end-of-window will be dropped.
> > >
> > > Regarding 'droppedDueToLateness' metric, can't see it exposed
> anywhere, neither in Flink UI or Prometheus. I've seen it in Dataflow but
> seems to be a Dataflow specific metric right?
> > Should not be Dataflow specific. But if you don't see it, it means it
> > could be zero. So, we can rule this out.
> > >
> > > We're using KinesisIO for reading messages.
> > Kinesis uses UnboundedSource, which is expended to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try the
> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
> > (should not contain Impulse transform at the beginning) and if it solves
> > your issues?
> > >
> > > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
> > >> Hi Eddy,
> > >>
> > >> does your data get buffered in a state - e.g. does the size of the
> state
> > >> grow over time? Do you see watermark being updated in your Flink
> WebUI?
> > >> When a stateful operation (and GroupByKey is a stateful operation)
> does
> > >> not output any data, the first place to look at is if watermark
> > >> correctly progresses. If it does not progress, then the input data
> must
> > >> be buffered in state and the size of the state should grow over time.
> If
> > >> it progresses, then it might be the case, that the data is too late
> > >> after the watermark (the watermark estimator might need tuning) and
> the
> > >> data gets dropped (note you don't set any allowed lateness, which
> > >> _might_ cause issues). You could see if your pipeline drops data in
> > >> "droppedDueToLateness" metric. The size of you state would not grow
> much
> > >> in that situation.
> > >>
> > >> Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> > >> using "--experiments=use_deprecated_read" on command line (which you
> > >> then must pass to PipelineOptionsFactory). There is some suspicion
> that
> > >> SDF wrapper for Kafka might not work as expected in certain situations
> > >> with Flink.
> > >>
> > >> Please feel free to share any results,
> > >>
> > >> Jan
> > >>
> > >> On 6/14/21 1:39 PM, Eddy G wrote:
> > >>> As seen in this image https://imgur.com/a/wrZET97, I'm trying to
> deal with late data (intentionally stopped my consumer so data has been
> accumulating for several days now). Now, with the following Window... I'm
> using Beam 2.27 and Flink 1.12.
> > >>>
> > >>>
>  Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> > >>>
> > >>> And several parsing stages after, once it's time to write within the
> ParquetIO stage...
> > >>>
> > >>>   FileIO
> > >>>   .writeDynamic()
> > >>>   .by(...)
> > >>>   .via(...)
> > >>>   .to(...)
> > >>>   .withNaming(...)
> > >>>
>  .withDestinationCoder(StringUtf8Coder.of())

Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Eddy G
Hi Jan,

I've added --experiments=use_deprecated_read and it seems to work flawlessly 
(with my current Window and the one proposed by Evan).

Why is this? Do Splittable DoFn now break current implementations? Are there 
any posts of possible breaking changes?

On 2021/06/14 13:19:39, Jan Lukavský  wrote: 
> Hi Eddy,
> 
> answers inline.
> 
> On 6/14/21 3:05 PM, Eddy G wrote:
> > Hi Jan,
> >
> > Thanks for replying so fast!
> >
> > Regarding your questions,
> >
> > - "Does your data get buffered in a state?"
> > Yes, I do have a state within a stage prior ParquetIO writing together with 
> > a Timer with PROCESSING_TIME.
> >
> > The stage which contains the state does send bytes to the next one which is 
> > the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's 
> > not clearing the state. This however does work under normal circumstances 
> > without having too much data queued waiting to be processed.
> OK, this suggests, that the watermark is for some reason "stuck". If you 
> checkpoints enabled, you should see the size of the checkpoint to grow 
> over time.
> >
> > - "Do you see watermark being updated in your Flink WebUI?"
> > The stages that do have a watermark don't get updated. The same watermark 
> > value has been constant since the pipeline started.
> >
> > If no lateness is set, any late data should be admitted right?
> If no lateness is set, it means allowed lateness of Duration.ZERO, which 
> means that data that arrive after end-of-window will be dropped.
> >
> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
> > neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be 
> > a Dataflow specific metric right?
> Should not be Dataflow specific. But if you don't see it, it means it 
> could be zero. So, we can rule this out.
> >
> > We're using KinesisIO for reading messages.
> Kinesis uses UnboundedSource, which is expended to SDF starting from 
> Beam 2.25.0. The flag should change that as well. Can you try the 
> --experiments=use_deprecated_read and see if you Pipeline DAG changes 
> (should not contain Impulse transform at the beginning) and if it solves 
> your issues?
> >
> > On 2021/06/14 12:48:58, Jan Lukavský  wrote:
> >> Hi Eddy,
> >>
> >> does your data get buffered in a state - e.g. does the size of the state
> >> grow over time? Do you see watermark being updated in your Flink WebUI?
> >> When a stateful operation (and GroupByKey is a stateful operation) does
> >> not output any data, the first place to look at is if watermark
> >> correctly progresses. If it does not progress, then the input data must
> >> be buffered in state and the size of the state should grow over time. If
> >> it progresses, then it might be the case, that the data is too late
> >> after the watermark (the watermark estimator might need tuning) and the
> >> data gets dropped (note you don't set any allowed lateness, which
> >> _might_ cause issues). You could see if your pipeline drops data in
> >> "droppedDueToLateness" metric. The size of you state would not grow much
> >> in that situation.
> >>
> >> Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> >> using "--experiments=use_deprecated_read" on command line (which you
> >> then must pass to PipelineOptionsFactory). There is some suspicion that
> >> SDF wrapper for Kafka might not work as expected in certain situations
> >> with Flink.
> >>
> >> Please feel free to share any results,
> >>
> >>     Jan
> >>
> >> On 6/14/21 1:39 PM, Eddy G wrote:
> >>> As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal 
> >>> with late data (intentionally stopped my consumer so data has been 
> >>> accumulating for several days now). Now, with the following Window... I'm 
> >>> using Beam 2.27 and Flink 1.12.
> >>>
> >>>   
> >>> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> >>>
> >>> And several parsing stages after, once it's time to write within the 
> >>> ParquetIO stage...
> >>>
> >>>   FileIO
> >>>   .writeDynamic()
> >>>   .by(...)
> >>>   .via(...)
> >>>   .to(...)
> >>>   .withNaming(...)
> >>>   
> >>> .withDestinationCoder(StringUtf8Coder.of())
> >>>   .withNumShards(options.getNumShards())
> >>>
> >>> it won't send bytes across all stages so no data is being written, still 
> >>> it accumulates in the first stage seen in the image and won't go further 
> >>> than that.
> >>>
> >>> Any reason why this may be happening? Wrong windowing strategy?
> 


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský

Hi Eddy,

answers inline.

On 6/14/21 3:05 PM, Eddy G wrote:

Hi Jan,

Thanks for replying so fast!

Regarding your questions,

- "Does your data get buffered in a state?"
Yes, I do have a state within a stage prior ParquetIO writing together with a 
Timer with PROCESSING_TIME.

The stage which contains the state does send bytes to the next one which is the 
ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
clearing the state. This however does work under normal circumstances without 
having too much data queued waiting to be processed.
OK, this suggests that the watermark is for some reason "stuck". If you
have checkpoints enabled, you should see the size of the checkpoint grow
over time.


- "Do you see watermark being updated in your Flink WebUI?"
The stages that do have a watermark don't get updated. The same watermark value 
has been constant since the pipeline started.

If no lateness is set, any late data should be admitted right?
If no lateness is set, it means an allowed lateness of Duration.ZERO, which
means that data that arrive after the end of the window will be dropped.


Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, neither 
in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a Dataflow 
specific metric right?
Should not be Dataflow specific. But if you don't see it, it means it 
could be zero. So, we can rule this out.


We're using KinesisIO for reading messages.
Kinesis uses UnboundedSource, which is expanded to SDF starting from
Beam 2.25.0. The flag should change that as well. Can you try
--experiments=use_deprecated_read and see if your Pipeline DAG changes
(it should not contain an Impulse transform at the beginning) and if it solves
your issues?


On 2021/06/14 12:48:58, Jan Lukavský  wrote:

Hi Eddy,

does your data get buffered in a state - e.g. does the size of the state
grow over time? Do you see watermark being updated in your Flink WebUI?
When a stateful operation (and GroupByKey is a stateful operation) does
not output any data, the first place to look at is if watermark
correctly progresses. If it does not progress, then the input data must
be buffered in state and the size of the state should grow over time. If
it progresses, then it might be the case, that the data is too late
after the watermark (the watermark estimator might need tuning) and the
data gets dropped (note you don't set any allowed lateness, which
_might_ cause issues). You could see if your pipeline drops data in
"droppedDueToLateness" metric. The size of you state would not grow much
in that situation.

Another hint - If you use KafkaIO, try to disable SDF wrapper for it
using "--experiments=use_deprecated_read" on command line (which you
then must pass to PipelineOptionsFactory). There is some suspicion that
SDF wrapper for Kafka might not work as expected in certain situations
with Flink.

Please feel free to share any results,

    Jan

On 6/14/21 1:39 PM, Eddy G wrote:

As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late 
data (intentionally stopped my consumer so data has been accumulating for 
several days now). Now, with the following Window... I'm using Beam 2.27 and 
Flink 1.12.

  
Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

  FileIO
  .writeDynamic()
  .by(...)
  .via(...)
  .to(...)
  .withNaming(...)
  .withDestinationCoder(StringUtf8Coder.of())
  .withNumShards(options.getNumShards())

it won't send bytes across all stages so no data is being written, still it 
accumulates in the first stage seen in the image and won't go further than that.

Any reason why this may be happening? Wrong windowing strategy?


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Eddy G
Hi Jan,

Thanks for replying so fast!

Regarding your questions,

- "Does your data get buffered in a state?"
Yes, I do have a state within a stage prior ParquetIO writing together with a 
Timer with PROCESSING_TIME.

The stage which contains the state does send bytes to the next one which is the 
ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
clearing the state. This however does work under normal circumstances without 
having too much data queued waiting to be processed.

- "Do you see watermark being updated in your Flink WebUI?"
The stages that do have a watermark don't get updated. The same watermark value 
has been constant since the pipeline started.

If no lateness is set, any late data should be admitted, right?

Regarding the 'droppedDueToLateness' metric, I can't see it exposed anywhere,
neither in the Flink UI nor Prometheus. I've seen it in Dataflow but it seems to
be a Dataflow-specific metric, right?

We're using KinesisIO for reading messages.
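
(For readers of the archive, a minimal sketch of that kind of stage — element types, names, and the flush interval are made up: a bag of elements buffered in state plus a processing-time timer whose @OnTimer callback flushes and clears it. If the timer never fires, the state is never cleared.)

```
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

static class BufferAndFlush extends DoFn<KV<String, String>, String> {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(c.element().getValue());
    // (Re)arm a processing-time timer relative to now.
    flush.offset(Duration.standardMinutes(1)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
    for (String element : buffer.read()) {
      c.output(element);
    }
    // State is only cleared here, so if the timer never fires the bag keeps growing.
    buffer.clear();
  }
}
```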

On 2021/06/14 12:48:58, Jan Lukavský  wrote: 
> Hi Eddy,
> 
> does your data get buffered in a state - e.g. does the size of the state 
> grow over time? Do you see watermark being updated in your Flink WebUI? 
> When a stateful operation (and GroupByKey is a stateful operation) does 
> not output any data, the first place to look at is if watermark 
> correctly progresses. If it does not progress, then the input data must 
> be buffered in state and the size of the state should grow over time. If 
> it progresses, then it might be the case, that the data is too late 
> after the watermark (the watermark estimator might need tuning) and the 
> data gets dropped (note you don't set any allowed lateness, which 
> _might_ cause issues). You could see if your pipeline drops data in 
> "droppedDueToLateness" metric. The size of you state would not grow much 
> in that situation.
> 
> Another hint - If you use KafkaIO, try to disable SDF wrapper for it 
> using "--experiments=use_deprecated_read" on command line (which you 
> then must pass to PipelineOptionsFactory). There is some suspicion that 
> SDF wrapper for Kafka might not work as expected in certain situations 
> with Flink.
> 
> Please feel free to share any results,
> 
>    Jan
> 
> On 6/14/21 1:39 PM, Eddy G wrote:
> > As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with 
> > late data (intentionally stopped my consumer so data has been accumulating 
> > for several days now). Now, with the following Window... I'm using Beam 
> > 2.27 and Flink 1.12.
> >
> >  
> > Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> >
> > And several parsing stages after, once it's time to write within the 
> > ParquetIO stage...
> >
> >  FileIO
> >  .writeDynamic()
> >  .by(...)
> >  .via(...)
> >  .to(...)
> >  .withNaming(...)
> >  .withDestinationCoder(StringUtf8Coder.of())
> >  .withNumShards(options.getNumShards())
> >
> > it won't send bytes across all stages so no data is being written, still it 
> > accumulates in the first stage seen in the image and won't go further than 
> > that.
> >
> > Any reason why this may be happening? Wrong windowing strategy?
> 


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Evan Galpin
I believe that by default windows will only trigger one time [1]. This has
definitely caught me by surprise before.

I think that default strategy might be fine for a batch pipeline,
but typically is not for streaming (which I assume you’re using because
you mentioned Flink).

I believe you’ll want to add a non-default triggering mechanism to the
window strategy that you mentioned. I would recommend reading through the
triggering docs[2] for background. The Repeatedly.forever[3] function may
work for your use case. Something like:

Window.into(FixedWindows.of(Duration.standardMinutes(10)))
    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes();


[1]
https://beam.apache.org/documentation/programming-guide/#default-trigger
[2]
https://beam.apache.org/documentation/programming-guide/#triggers
[3]
https://beam.apache.org/releases/javadoc/2.29.0/org/apache/beam/sdk/transforms/windowing/Repeatedly.html

On Mon, Jun 14, 2021 at 07:39 Eddy G  wrote:

> As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal
> with late data (intentionally stopped my consumer so data has been
> accumulating for several days now). Now, with the following Window... I'm
> using Beam 2.27 and Flink 1.12.
>
>
> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>
> And several parsing stages after, once it's time to write within the
> ParquetIO stage...
>
> FileIO
> .writeDynamic()
> .by(...)
> .via(...)
> .to(...)
> .withNaming(...)
> .withDestinationCoder(StringUtf8Coder.of())
> .withNumShards(options.getNumShards())
>
> it won't send bytes across all stages so no data is being written, still
> it accumulates in the first stage seen in the image and won't go further
> than that.
>
> Any reason why this may be happening? Wrong windowing strategy?
>


Re: GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Jan Lukavský

Hi Eddy,

does your data get buffered in a state - e.g. does the size of the state
grow over time? Do you see the watermark being updated in your Flink WebUI?
When a stateful operation (and GroupByKey is a stateful operation) does
not output any data, the first place to look is whether the watermark
correctly progresses. If it does not progress, then the input data must
be buffered in state and the size of the state should grow over time. If
it progresses, then it might be the case that the data is too late
relative to the watermark (the watermark estimator might need tuning) and the
data gets dropped (note you don't set any allowed lateness, which
_might_ cause issues). You could check if your pipeline drops data in the
"droppedDueToLateness" metric. The size of your state would not grow much
in that situation.


Another hint - if you use KafkaIO, try to disable the SDF wrapper for it
using "--experiments=use_deprecated_read" on the command line (which you
then must pass to PipelineOptionsFactory). There is some suspicion that
the SDF wrapper for Kafka might not work as expected in certain situations
with Flink.
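
(A minimal sketch of passing that flag through PipelineOptionsFactory; the args array here is just an example command line:)

```
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

String[] args = {"--runner=FlinkRunner", "--experiments=use_deprecated_read"};
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);
```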


Please feel free to share any results,

  Jan

On 6/14/21 1:39 PM, Eddy G wrote:

As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late 
data (intentionally stopped my consumer so data has been accumulating for 
several days now). Now, with the following Window... I'm using Beam 2.27 and 
Flink 1.12.

 
Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

 FileIO
 .writeDynamic()
 .by(...)
 .via(...)
 .to(...)
 .withNaming(...)
 .withDestinationCoder(StringUtf8Coder.of())
 .withNumShards(options.getNumShards())

it won't send bytes across all stages so no data is being written, still it 
accumulates in the first stage seen in the image and won't go further than that.

Any reason why this may be happening? Wrong windowing strategy?


GroupIntoShards not sending bytes further when dealing with huge amount of data

2021-06-14 Thread Eddy G
As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late
data (I intentionally stopped my consumer, so data has been accumulating for
several days now). I'm using Beam 2.27 and Flink 1.12. Now, with the following
Window...


Window.into(FixedWindows.of(Duration.standardMinutes(10)))

And several parsing stages after, once it's time to write within the ParquetIO 
stage...

FileIO
.writeDynamic()
.by(...)
.via(...)
.to(...)
.withNaming(...)
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(options.getNumShards())

it won't send bytes across all stages, so no data is being written; it just
accumulates in the first stage seen in the image and won't go further than that.

Any reason why this may be happening? Wrong windowing strategy?