[ANNOUNCE] Beam 2.24.0 Released

2020-09-18 Thread Daniel Oliveira
The Apache Beam team is pleased to announce the release of version 2.24.0.

Apache Beam is an open source unified programming model to define and
execute data processing pipelines, including ETL, batch and stream
(continuous) processing. See: https://beam.apache.org

You can download the release here:

https://beam.apache.org/get-started/downloads/

This release includes bug fixes, features, and improvements detailed on
the Beam blog: https://beam.apache.org/blog/beam-2.24.0/

Thanks to everyone who contributed to this release, and we hope you enjoy
using Beam 2.24.0.

-- Daniel Oliveira, on behalf of The Apache Beam team


Re: flink runner 1.10 checkpoint timeout issue

2020-09-18 Thread Deshpande, Omkar
I only have one operator because of operator chaining, and it does not
show any backpressure. There is no option for disabling the chaining.

Also, this same setup used to work with 1.9, so I am wondering what has changed
in the 1.10 runner.


From: Maximilian Michels 
Sent: Friday, September 18, 2020 7:08 AM
To: user@beam.apache.org ; Deshpande, Omkar 

Subject: Re: flink runner 1.10 checkpoint timeout issue

This email is from an external sender.


This type of stack trace occurs when the downstream operator is blocked
for some reason. Flink maintains a finite number of network buffers for
each network channel. If the receiving downstream operator does not
process incoming network buffers, the upstream operator blocks. This is
also called backpressure, and it is a useful feature to avoid data congestion.

I would check the stack traces downstream to find the cause of the
backpressure.

-Max

On 17.09.20 19:50, Deshpande, Omkar wrote:
> Flink 1.10
> 
> *From:* Kyle Weaver 
> *Sent:* Thursday, September 17, 2020 9:34 AM
> *To:* user@beam.apache.org 
> *Subject:* Re: flink runner 1.10 checkpoint timeout issue
> This email is from an external sender.
>
> What is the version of your Flink cluster?
>
> On Wed, Sep 16, 2020 at 9:10 PM Deshpande, Omkar
> <omkar_deshpa...@intuit.com> wrote:
>
> Hello,
>
> I recently upgraded to beam-flink-runner-1.10:2.23.0 from
> beam-flink-runner-1.9:2.23.0. My application was working as expected
> with the 1.9 runner, but after upgrading, the checkpoints are timing out.
> Even after increasing the timeout significantly, the checkpoints
> keep failing. I was trying to look at the stack dump to determine
> any deadlocks. There are no deadlocks. But this thread seems to be
> in the awaiting-confirmation stage for a long time -
>
> Legacy Source Thread - Source:
> read/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
> read/Remove Kafka Metadata/ParMultiDo(Anonymous) -> Random key
> assignment SPP/ParMultiDo(RandomPartitioner) -> Window for
> repartitioning SPP/Window.Assign.out -> ToKeyedWorkItem (1/4)
> 
> awaiting notification on [ 0x0007b83b7958 ], holding [ 0x0007bc786fd8 ]
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at
> 
> java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
> at
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> at
> 
> java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
>
>
> My application is IO bound, i.e. every record makes a REST call and
> takes a few seconds to complete.
> I did not face this issue with the 1.9 runner. What has changed in the 1.10
> runner? Any pointers for debugging?
>
> Omkar
>


Re: Output from Window not getting materialized

2020-09-18 Thread Luke Cwik
To answer your specific question, you should create and return the WallTime
estimator. You shouldn't need to interact with it from within
your @ProcessElement call since your elements are using the current time
for their timestamp.
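
As a concrete illustration (not code from this thread), here is a minimal
sketch of that wiring, assuming the SDF watermark-estimator hooks and the
WatermarkEstimators helpers in org.apache.beam.sdk.transforms.splittabledofn;
the DoFn name, element types, and the elided restriction methods are
placeholders:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.joda.time.Instant;

// Hypothetical SDF; only the watermark-estimator hooks are shown, the
// restriction/tracker methods and @ProcessElement stay as in your connector.
class ReadStreamFn extends DoFn<String, String> {

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant elementTimestamp) {
    // Seed the estimator state with the input element's timestamp.
    return elementTimestamp;
  }

  @NewWatermarkEstimator
  public WatermarkEstimator<Instant> newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    // WallTime reports the current wall-clock time as the watermark, so there
    // is nothing to call from @ProcessElement when outputs use Instant.now().
    return new WatermarkEstimators.WallTime(state);
  }

  // ... @GetInitialRestriction, @NewTracker, @ProcessElement, etc. unchanged ...
}

With something like this in place, the estimator reports wall-clock time as
the watermark, so downstream fixed windows can close once wall-clock time
passes the window end.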

On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik  wrote:

> Kafka is a complex example because it is adapting code from before there
> was an SDF implementation (namely the TimestampPolicy and the
> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>
> There are three types of watermark estimators that are in the Beam Java
> SDK today:
> Manual: Can be invoked from within your @ProcessElement method within your
> SDF allowing you precise control over what the watermark is.
> WallTime: Doesn't need to be interacted with, will report the current time
> as the watermark time. Once it is instantiated and returned via the
> @NewWatermarkEstimator method you don't need to do anything with it. This
> is functionally equivalent to calling setWatermark(Instant.now()) right
> before returning from the @ProcessElement method in the SplittableDoFn with a
> Manual watermark estimator.
> TimestampObserving: Is invoked using the output timestamp for every
> element that is output. This is functionally equivalent to calling
> setWatermark after each output within your @ProcessElement method in the
> SplittableDoFn. The MonotonicallyIncreasing implementation for
> the TimestampObserving estimator ensures that the largest timestamp seen so
> far will be reported for the watermark.
>
> The default is to not set any watermark estimate.
>
> For all watermark estimators you're allowed to set the watermark estimate
> to anything as the runner will recompute the output watermark as:
> new output watermark = max(previous output watermark, min(upstream
> watermark, watermark estimates))
> This effectively means that the watermark will never go backwards from the
> runner's point of view, but it does mean that setting the watermark
> estimate below the previous output watermark (which isn't observable) will
> not do anything beyond holding the watermark at the previous output
> watermark.
>
> Depending on the windowing strategy and allowed lateness, any records that
> are output with a timestamp that is too early can be considered droppably
> late; otherwise they will be late, on-time, or early.
>
> So as an author for an SDF transform, you need to figure out:
> 1) What timestamp you're going to output your records at
> * use the upstream input element's timestamp: use the default
> implementation, which gives you the upstream watermark by default
> * use data from within the record being output or external system state
> via an API call: use a watermark estimator
> 2) How you want to compute the watermark estimate (if at all)
> * the choice here depends on how the elements' timestamps progress: are
> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>
> For both of these it is up to you to choose how much flexibility in these
> decisions you want to give to your users, and that should guide what you
> expose within the API (like how KafkaIO exposes a TimestampPolicy, or how
> many other sources expose nothing at all).
>
>
> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
> harish.prav...@gmail.com> wrote:
>
>> Hi Luke,
>>
>> I am also looking at the `WatermarkEstimators.manual` option, in
>> parallel. Now we are getting data past our Fixed Window but the aggregation
>> is not as expected.  The doc says setWatermark will "set timestamp
>> before or at the timestamps of all future elements produced by the
>> associated DoFn". If I output with a timestamp as below then could you
>> please clarify on how we should set the watermark for this manual
>> watermark estimator?
>>
>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:
>>
>>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>>> close allowing for the Count transform to produce output?
>>>
>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>
>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
>>> wrote:
>>>
 Hi everyone!

 We are developing a new IO connector using the SDF API, and testing it
 with the following simple counting pipeline:



 p.apply(MyIO.read()
         .withStream(inputStream)
         .withStreamPartitions(Arrays.asList(0))
         .withConsumerConfig(config))      // gets a PCollection of KV pairs

     .apply(Values.create())               // PCollection of the values

     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))
         .withAllowedLateness(Duration.standardDays(1))
         .accumulatingFiredPanes())

     .apply(Count.perElement())

     // write the PCollection of (element, count) pairs back to a stream

Re: Output from Window not getting materialized

2020-09-18 Thread Luke Cwik
Kafka is a complex example because it is adapting code from before there
was an SDF implementation (namely the TimestampPolicy and the
TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).

There are three types of watermark estimators that are in the Beam Java SDK
today:
Manual: Can be invoked from within your @ProcessElement method within your
SDF allowing you precise control over what the watermark is.
WallTime: Doesn't need to be interacted with, will report the current time
as the watermark time. Once it is instantiated and returned via the
@NewWatermarkEstimator method you don't need to do anything with it. This
is functionally equivalent to calling setWatermark(Instant.now()) right
before returning from the @ProcessElement method in the SplittableDoFn with a
Manual watermark estimator.
TimestampObserving: Is invoked using the output timestamp for every element
that is output. This is functionally equivalent to calling setWatermark
after each output within your @ProcessElement method in the SplittableDoFn.
The MonotonicallyIncreasing implementation for the TimestampObserving
estimator ensures that the largest timestamp seen so far will be reported
for the watermark.

The default is to not set any watermark estimate.
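
For the Manual case above, a rough sketch of what the wiring might look like
(only a sketch: SourceRecord, PartitionDesc, and readNext() are hypothetical
stand-ins for your connector's own types and read logic, and the
restriction/tracker provider methods are elided):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class ReadPartitionFn extends DoFn<PartitionDesc, SourceRecord> {

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState() {
    return BoundedWindow.TIMESTAMP_MIN_VALUE;   // no estimate yet
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.Manual newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    return new WatermarkEstimators.Manual(state);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      RestrictionTracker<OffsetRange, Long> tracker,
      ManualWatermarkEstimator<Instant> watermarkEstimator,
      OutputReceiver<SourceRecord> receiver) {
    for (long offset = tracker.currentRestriction().getFrom();
        tracker.tryClaim(offset); offset++) {
      SourceRecord record = readNext(offset);      // readNext() is a hypothetical helper
      Instant ts = record.getEventTimestamp();     // timestamp carried by the record itself
      receiver.outputWithTimestamp(record, ts);
      watermarkEstimator.setWatermark(ts);         // advance the watermark by hand
    }
    return ProcessContinuation.stop();
  }
}

The key point is that the estimator is injected as a parameter of
@ProcessElement, so setWatermark can be called right after each
outputWithTimestamp.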

For all watermark estimators you're allowed to set the watermark estimate
to anything as the runner will recompute the output watermark as:
new output watermark = max(previous output watermark, min(upstream
watermark, watermark estimates))
This effectively means that the watermark will never go backwards from the
runner's point of view, but it does mean that setting the watermark
estimate below the previous output watermark (which isn't observable) will
not do anything beyond holding the watermark at the previous output
watermark.
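
For example, if the previous output watermark is 12:00:00, the upstream
watermark is 12:05:00, and the estimate is set to 11:50:00, then the new
output watermark is max(12:00:00, min(12:05:00, 11:50:00)) = 12:00:00, i.e. it
simply holds. If the estimate is instead 12:10:00, the result is
max(12:00:00, min(12:05:00, 12:10:00)) = 12:05:00, capped by the upstream
watermark.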

Depending on the windowing strategy and allowed lateness, any records that
are output with a timestamp that is too early can be considered droppably
late; otherwise they will be late, on-time, or early.

So as an author for an SDF transform, you need to figure out:
1) What timestamp you're going to output your records at
* use the upstream input element's timestamp: use the default
implementation, which gives you the upstream watermark by default
* use data from within the record being output or external system state via
an API call: use a watermark estimator
2) How you want to compute the watermark estimate (if at all)
* the choice here depends on how the elements' timestamps progress: are they
in exactly sorted order, almost sorted order, completely unsorted, ...?

For both of these it is up to you to choose how much flexibility in these
decisions you want to give to your users, and that should guide what you
expose within the API (like how KafkaIO exposes a TimestampPolicy, or how
many other sources expose nothing at all).
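
For the almost-sorted / exactly-sorted case, a small sketch of the
TimestampObserving option (same imports and assumptions as the earlier
sketches; nothing extra is needed in @ProcessElement because the estimator
observes every output timestamp):

@GetInitialWatermarkEstimatorState
public Instant getInitialWatermarkEstimatorState() {
  return BoundedWindow.TIMESTAMP_MIN_VALUE;
}

@NewWatermarkEstimator
public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
    @WatermarkEstimatorState Instant state) {
  // Reports the largest output timestamp observed so far as the watermark.
  return new WatermarkEstimators.MonotonicallyIncreasing(state);
}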


On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
harish.prav...@gmail.com> wrote:

> Hi Luke,
>
> I am also looking at the `WatermarkEstimators.manual` option, in parallel.
> Now we are getting data past our Fixed Window but the aggregation is not as
> expected.  The doc says setWatermark will "set timestamp before or at the
> timestamps of all future elements produced by the associated DoFn". If I
> output with a timestamp as below then could you please clarify on how we
> should set the watermark for this manual watermark estimator?
>
> receiver.outputWithTimestamp(ossRecord, Instant.now());
>
> Thanks,
> Praveen
>
> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik  wrote:
>
>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>> close allowing for the Count transform to produce output?
>>
>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>
>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum 
>> wrote:
>>
>>> Hi everyone!
>>>
>>> We are developing a new IO connector using the SDF API, and testing it
>>> with the following simple counting pipeline:
>>>
>>>
>>>
>>> p.apply(MyIO.read()
>>>         .withStream(inputStream)
>>>         .withStreamPartitions(Arrays.asList(0))
>>>         .withConsumerConfig(config))      // gets a PCollection of KV pairs
>>>
>>>     .apply(Values.create())               // PCollection of the values
>>>
>>>     .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))
>>>         .withAllowedLateness(Duration.standardDays(1))
>>>         .accumulatingFiredPanes())
>>>
>>>     .apply(Count.perElement())
>>>
>>>     // write the PCollection of (element, count) pairs back to a stream
>>>     .apply(MyIO.write()
>>>         .withStream(outputStream)
>>>         .withConsumerConfig(config));
>>>
>>>
>>>
>>>
>>>
>>> Without the window transform, we can read from the stream and write to
>>> it; however, I don’t see any output after the Window transform. Could you
>>> please help pin down the issue?
>>>
>>> Thank you,
>>>
>>> Gaurav
>>>
>>
>
> --
> Thanks,
> Praveen K Viswanathan
>


Re: [DISCUSS] Deprecation of AWS SDK v2 IO connectors

2020-09-18 Thread Alexey Romanenko
Hi David,

Thank you for your feedback, it sounds totally reasonable to me. I agree that
before deprecating any of the AWS V1 connectors we have to make sure that the
V2 version can completely substitute for the previous one.

> On 17 Sep 2020, at 19:19, David Hollands  wrote:
> 
> Hi Alexey –
>  
> As relatively new users of Beam, we recently selected v1 over v2 because we
> didn’t think v2 currently (as of 2.24.0-snapshot) had feature parity,
> especially given the lack of a v2-based S3FileSystem and KinesisIO.Write.
>  
> Ideally we would have selected v2. 
>  
> On a related note (and not really Beam’s problem), if I remember rightly,
> we also had a bit of trouble creating some LocalStack/Testcontainers-based
> integration tests with v2…
>  
> Cheers, David
>  
> David Hollands
> Audience Platform – Audience Data Engineering
> david.holla...@bbc.co.uk 
> BC5 C5, BBC Broadcast Centre, London, W12 7TQ
>  
> From: Alexey Romanenko
> Reply to: "user@beam.apache.org" <user@beam.apache.org>
> Date: Tuesday, 15 September 2020 at 17:06
> To: "user@beam.apache.org"
> Subject: Re: [DISCUSS] Deprecation of AWS SDK v2 IO connectors
>  
> I just want to cross-post this on users@ to find out which version of the AWS SDK
> connectors is mostly used in user applications, and whether there are any strong
> objections to switching mostly to AWS SDK v2.
>  
> Thank you for any feedback in advance.
> 
> 
> On 11 Sep 2020, at 19:13, Alexey Romanenko wrote:
>  
> Hello,
>  
> In Beam, there are two versions of the AWS IO connectors for the Java SDK - one
> based on AWS SDK v1 [1] and one on v2 [2]. For now, they are pretty equal in
> terms of functionality, but since AWS SDK v2 is more modern (it's a major
> rewrite of the version 1.x code base, it’s built on top of Java 8+ and adds
> more features [3]), it would be more logical to use only V2. Also, it’s not
> reasonable to support two versions of similar connectors: it’s a big
> pain for us, and it will be clearer for users which package of AWS
> connectors to use.
>  
> Based on this, I’d propose to deprecate all Java AWS IO V1 connectors (plus
> KinesisIO, which is in a different package for now) starting from Beam 2.25,
> and then add new features only to the V2 connectors. Bug fixes should be
> applied to the V2 connectors first, and to the V1 connectors only if
> necessary.
>  
> What are the community thoughts on this? Any pros and cons that I'm missing?
>  
>  
> [1] 
> https://github.com/apache/beam/tree/master/sdks/java/io/amazon-web-services 
> 
> [2] 
> https://github.com/apache/beam/tree/master/sdks/java/io/amazon-web-services2 
> 
> [3] https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/welcome.html 
> 


Re: flink runner 1.10 checkpoint timeout issue

2020-09-18 Thread Maximilian Michels
This type of stack trace occurs when the downstream operator is blocked 
for some reason. Flink maintains a finite number of network buffers for 
each network channel. If the receiving downstream operator does not 
process incoming network buffers, the upstream operator blocks. This is
also called backpressure, and it is a useful feature to avoid data congestion.


I would check the stack traces downstream to find the cause of the 
backpressure.


-Max

On 17.09.20 19:50, Deshpande, Omkar wrote:

Flink 1.10

*From:* Kyle Weaver 
*Sent:* Thursday, September 17, 2020 9:34 AM
*To:* user@beam.apache.org 
*Subject:* Re: flink runner 1.10 checkpoint timeout issue
This email is from an external sender.

What is the version of your Flink cluster?

On Wed, Sep 16, 2020 at 9:10 PM Deshpande, Omkar
<omkar_deshpa...@intuit.com> wrote:


Hello,

I recently upgraded to beam-flink-runner-1.10:2.23.0 from
beam-flink-runner-1.9:2.23.0. My application was working as expected
with the 1.9 runner, but after upgrading, the checkpoints are timing out.
Even after increasing the timeout significantly, the checkpoints
keep failing. I was trying to look at the stack dump to determine
any deadlocks. There are no deadlocks. But this thread seems to be
in the awaiting-confirmation stage for a long time -

Legacy Source Thread - Source:
read/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
read/Remove Kafka Metadata/ParMultiDo(Anonymous) -> Random key
assignment SPP/ParMultiDo(RandomPartitioner) -> Window for
repartitioning SPP/Window.Assign.out -> ToKeyedWorkItem (1/4)

awaiting notification on [ 0x0007b83b7958 ], holding [ 0x0007bc786fd8 ]
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at

java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at

java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at

org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)


My application is IO bound, i.e. every record makes a REST call and
takes a few seconds to complete.
I did not face this issue with the 1.9 runner. What has changed in the 1.10
runner? Any pointers for debugging?

Omkar