Re: Define custom trigger

2021-10-18 Thread Eleanore Jin
Thanks a lot Luke, I will try it out!

Eleanore

On Mon, Oct 18, 2021 at 9:41 AM Luke Cwik  wrote:

> You could use a stateful DoFn and buffer the first message and everything
> that you see for the first 5 seconds and then afterwards pass everything
> through. Something like:
>
> processElement(...) {
>   if (value state is false) {
> if (bag state is empty) {
>   schedule processing timer for 5 seconds from now
> }
> buffer element in bag state
>   }
>   output element
> }
>
> onTimer(...) {
>   output everything in bag state buffer
>   set value state to true
> }
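
For illustration, below is a rough, untested Java sketch of the stateful-DoFn approach described above. The class name and the KV<String, String> element type are made up for this example, a stateful DoFn needs a keyed (KV) input, and imports from org.apache.beam.sdk.state / org.apache.beam.sdk.transforms are omitted for brevity.

class BufferFirstFiveSecondsFn extends DoFn<KV<String, String>, KV<String, String>> {

  @StateId("donePriming")
  private final StateSpec<ValueState<Boolean>> donePrimingSpec = StateSpecs.value();

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, String>>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("donePriming") ValueState<Boolean> donePriming,
      @StateId("buffer") BagState<KV<String, String>> buffer,
      @TimerId("flush") Timer flush,
      OutputReceiver<KV<String, String>> out) {
    if (!Boolean.TRUE.equals(donePriming.read())) {
      if (!buffer.read().iterator().hasNext()) {
        // First element seen: start the 5-second window.
        flush.offset(Duration.standardSeconds(5)).setRelative();
      }
      // Buffer everything seen during the first 5 seconds.
      buffer.add(element);
    }
    out.output(element);
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("donePriming") ValueState<Boolean> donePriming,
      @StateId("buffer") BagState<KV<String, String>> buffer,
      OutputReceiver<KV<String, String>> out) {
    // Re-emit the buffered elements (as in the pseudocode above) and stop buffering.
    for (KV<String, String> buffered : buffer.read()) {
      out.output(buffered);
    }
    buffer.clear();
    donePriming.write(true);
  }
}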
>
>
>
> On Sun, Oct 17, 2021 at 2:31 PM Eleanore Jin 
> wrote:
>
>> Hi community,
>>
>> Here is my use case:
>> My pipeline uses another Kafka topic as a side input that contains the
>> filter criteria. Then, when processing the main stream, the pipeline
>> checks whether each message from the main stream matches *any* existing
>> filter criteria.
>>
>> The side-input logic is: whenever at least 1 element is seen from the
>> sideInputs topic -> fire -> and accumulate all the elements seen
>>
>> Trigger trigger = Repeatedly.forever(
>>   AfterFirst.of(
>> AfterPane.elementCountAtLeast(1),
>> AfterProcessingTime.pastFirstElementInPane()
>>   ));
>>
>> return kafkaValues.apply("sideInputFromTopic-" + topicName,
>>
>> Window.>into(new GlobalWindows())
>>   .triggering(trigger)
>>   .accumulatingFiredPanes()
>>   .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
>> );
>>
>>
>> Here is my problem:
>> During application restarts, the 1st message from the main stream matches
>> the 3rd element in the side input,
>> but since the pane fires as soon as the 1st element from the side input is
>> consumed, the 1st message
>> from the main stream is marked as not passing the filter.
>>
>> Then I switched to the trigger below, which fires 5 seconds after
>> reading the 1st element from the side input.
>> This works around the application-restart problem, but any subsequently
>> published elements in the side input
>> also have to wait 5 seconds before a firing, which could lead to some
>> messages from the main stream incorrectly being marked as not passing the filters.
>>
>> Trigger trigger = Repeatedly.forever(
>>   AfterFirst.of(
>> AfterPane.elementCountAtLeast(3000),
>> 
>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))
>>   ));
>>
>> Question:
>>
>> I want to define a custom trigger whose first firing waits for, say,
>> 5 - 10 seconds before the main stream is processed.
>>
>> But afterwards, it fires as soon as it sees a new element from the side input.
>>
>> Is this possible? Or can I leverage the existing API to do it?
>>
>>
>> Beam version: 2.23, Flink runner: 1.10.2
>>
>> Thanks a lot!
>>
>> Eleanore
>>
>>
>>


Define custom trigger

2021-10-17 Thread Eleanore Jin
Hi community,

Here is my use case:
My pipeline uses another Kafka topic as a side input that contains the
filter criteria. Then, when processing the main stream, the pipeline
checks whether each message from the main stream matches *any* existing
filter criteria.

The side-input logic is: whenever at least 1 element is seen from the sideInputs
topic -> fire -> and accumulate all the elements seen

Trigger trigger = Repeatedly.forever(
  AfterFirst.of(
AfterPane.elementCountAtLeast(1),
AfterProcessingTime.pastFirstElementInPane()
  ));

return kafkaValues.apply("sideInputFromTopic-" + topicName,

Window.>into(new GlobalWindows())
  .triggering(trigger)
  .accumulatingFiredPanes()
  .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
);


Here is my problem:
During application restarts, the 1st message from the main stream matches
the 3rd element in the side input,
but since the pane fires as soon as the 1st element from the side input is
consumed, the 1st message
from the main stream is marked as not passing the filter.

Then I switched to the trigger below, which fires 5 seconds
after reading the 1st element from the side input.
This works around the application-restart problem, but any subsequently
published elements in the side input also have to wait 5 seconds
before a firing, which could lead to some messages from the main stream
incorrectly being marked as not passing the filters.

Trigger trigger = Repeatedly.forever(
  AfterFirst.of(
AfterPane.elementCountAtLeast(3000),

AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))
  ));

Question:

I want to define a custom trigger whose first firing waits for, say,
5 - 10 seconds before the main stream is processed.

But afterwards, it fires as soon as it sees a new element from the side input.

Is this possible? Or can I leverage the existing API to do it?
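
One possibility, sketched below and untested, is to compose existing triggers with AfterEach.inOrder so that the first firing waits for a processing-time delay while every later firing happens per element; whether this gives exactly the desired behavior for a side input on the Flink runner would need to be verified:

Trigger trigger = AfterEach.inOrder(
    // First firing: 5 seconds after the first side-input element arrives.
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5)),
    // Later firings: as soon as a new side-input element is seen.
    Repeatedly.forever(AfterPane.elementCountAtLeast(1)));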


Beam version: 2.23, Flink runner: 1.10.2

Thanks a lot!

Eleanore


Re: Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-12 Thread Eleanore Jin
Thanks a lot Jan,

I'm clear now.

Best,
Eleanore

On Mon, Apr 12, 2021 at 11:22 AM Jan Lukavský  wrote:

> Hi Eleanore,
>
> that's a good question. :)
>
> There has been discussion about this in the PR of the mentioned Jira [1].
> Generally, objectReuse enables Flink to hypothetically reuse instances of
> deserialized objects instead of creating new ones. But due to how Beam
> defines Coders it creates new instances nevertheless. See the discussion
> for details.
>
>  Jan
>
> [1] https://github.com/apache/beam/pull/13240#issuecomment-721635620
> On 4/12/21 7:29 PM, Eleanore Jin wrote:
>
> Hi Jan,
>
> Thanks a lot for the reply! This helps, I wonder if you have any idea
> whats the difference between fasterCopy vs objectReuse option?
>
> Eleanore
>
> On Fri, Apr 9, 2021 at 11:53 AM Jan Lukavský  wrote:
>
>> Hi Eleanore,
>>
>> the --fasterCopy option disables clone between operators (see [1]). It
>> should be safe to use it, unless your pipeline outputs an object and
>> later modifies the same instance. This is generally not supported by the
>> Beam model and is considered a user error. FlinkRunner
>> historically took a "better-safe-than-sorry" approach and
>> explicitly cloned every received object between (non-shuffle) operators.
>> Enabling this option should increase performance; you can verify your
>> Pipeline is not doing any disallowed mutations using DirectRunner, which
>> checks this by default (unless --enforceImmutability=false is set).
>>
>>   Jan
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-11146
>>
>> On 4/9/21 7:57 AM, Eleanore Jin wrote:
>> > Hi community,
>> >
>> > I am upgrading from Beam 2.23.0 -> 2.28.0, and a new
>> > FlinkPipelineOption is introduced: fasterCopy.
>> >
>> > Can you please help me understand what is the difference between the
>> > option objectReuse vs fasterCopy?
>> >
>> > Thanks a lot!
>> > Eleanore
>>
>
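
For reference, a minimal sketch of setting the two options discussed in this thread on FlinkPipelineOptions; the setter names are assumed from the option names (equivalent to passing --objectReuse=true and --fasterCopy=true on the command line):

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
// Lets Flink reuse deserialized object instances (limited effect with Beam coders, per the discussion above).
options.setObjectReuse(true);
// Skips the defensive clone between (non-shuffle) operators.
options.setFasterCopy(true);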


Re: Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-12 Thread Eleanore Jin
Hi Jan,

Thanks a lot for the reply! This helps, I wonder if you have any idea whats
the difference between fasterCopy vs objectReuse option?

Eleanore

On Fri, Apr 9, 2021 at 11:53 AM Jan Lukavský  wrote:

> Hi Eleanore,
>
> the --fasterCopy option disables clone between operators (see [1]). It
> should be safe to use it, unless your pipeline outputs an object and
> later modifies the same instance. This is generally not supported by the
> Beam model and is considered a user error. FlinkRunner
> historically took a "better-safe-than-sorry" approach and
> explicitly cloned every received object between (non-shuffle) operators.
> Enabling this option should increase performance; you can verify your
> Pipeline is not doing any disallowed mutations using DirectRunner, which
> checks this by default (unless --enforceImmutability=false is set).
>
>   Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-11146
>
> On 4/9/21 7:57 AM, Eleanore Jin wrote:
> > Hi community,
> >
> > I am upgrading from Beam 2.23.0 -> 2.28.0, and a new
> > FlinkPipelineOption is introduced: fasterCopy.
> >
> > Can you please help me understand what is the difference between the
> > option objectReuse vs fasterCopy?
> >
> > Thanks a lot!
> > Eleanore
>


Beam 2.28.0 objectReuse and fasterCopy for FlinkPipelineOption

2021-04-08 Thread Eleanore Jin
Hi community,

I am upgrading from Beam 2.23.0 -> 2.28.0, and a new FlinkPipelineOption is
introduced: fasterCopy.

Can you please help me understand what is the difference between the option
objectReuse vs fasterCopy?

Thanks a lot!
Eleanore


Re: Beam support Flink Async I/O operator

2021-01-27 Thread Eleanore Jin
ha, yes, sorry I forgot about that

On Wed, Jan 27, 2021 at 4:31 AM Alexey Romanenko 
wrote:

> I guess it was already discussed a while ago [1] and the conclusion was
> that Flink Async I/O is a specific Flink operator and it can’t be used
> directly from Beam since Beam should provide a unified model and its
> implementation for all supported runners.
>
> Did a proposed workaround, based on GroupIntoBatches [2], work for you?
>
> [1]
> https://lists.apache.org/thread.html/rc1e087a15036c18564d3147c3570c7d80c4963ff7d48cf8eaf180758%40%3Cuser.beam.apache.org%3E
> [2]
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>
> On 26 Jan 2021, at 22:09, Boyuan Zhang  wrote:
>
> +dev 
>
> On Tue, Jan 26, 2021 at 1:07 PM Eleanore Jin 
> wrote:
>
>> Hi community,
>>
>> Does Beam support Flink Async I/O operator? if so, can you please share
>> the doc, and if not, is there any workaround to achieve the same in Beam
>> semantics?
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
>>
>> Thanks a lot!
>> Eleanore
>>
>
>


Beam support Flink Async I/O operator

2021-01-26 Thread Eleanore Jin
Hi community,

Does Beam support the Flink Async I/O operator? If so, can you please share the
doc, and if not, is there any workaround to achieve the same with Beam
semantics?

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

Thanks a lot!
Eleanore


Re: KafkaIO memory issue

2020-10-28 Thread Eleanore Jin
Hi Alex,

Thanks for sharing this! I think my problem is that I did not reserve enough
memory for JVM non-heap usage; by default Flink sets -Xms and -Xmx to
be the same, and I had allocated almost all of the memory to the heap.

After adding more memory, the memory usage seems to have stabilized.

We do use a global window along with a side input for other Beam pipelines; I'm
not sure a sliding window would work for us.

Is there a ticket the Beam community is working on to fix it?

Thanks a lot
Eleanore

On Wed, Oct 28, 2020 at 10:20 Alexey Romanenko 
wrote:

> I don’t think it’s a KafkaIO issue since checkpoints are handled by
> runner.
>
>
>
> Could it be similar to this issue?
>
>
> https://lists.apache.org/thread.html/r4a454a40197f2a59280ffeccfe44837ec072237aea56d50599f12184%40%3Cuser.beam.apache.org%3E
>
>
>
> Could you try a workaround with sliding windows proposed there?
>
>
>
>
>
> > On 22 Oct 2020, at 05:18, Eleanore Jin  wrote:
>
> >
>
> > Hi all,
>
> >
>
> > I am using beam 2.23 (java), and flink 1.10.2, my pipeline is quite
> simple read from a kafka topic and write to another kafka topic.
>
> >
>
> > When I enabled checkpoint, I see the memory usage of the flink job
> manager keeps on growing
>
> > 
>
> >
>
> > The Flink cluster is running on kubernetes, with 1 job manager, and 12
> task managers each with 4 slots, kafka input topic has 96 partitions. The
> checkpoint is stored in azure blob storage.
>
> >
>
> > Checkpoint happens every 3 seconds, with timeout 10 seconds, with
> minimum pause of 1 second.
>
> >
>
> > Any ideas why this happens?
>
> > Thanks a lot!
>
> > Eleanore
>
>
>
>


KafkaIO memory issue

2020-10-21 Thread Eleanore Jin
Hi all,

I am using Beam 2.23 (Java) and Flink 1.10.2; my pipeline is quite simple:
read from a Kafka topic and write to another Kafka topic.

When I enabled checkpointing, I see that the memory usage of the Flink job
manager keeps growing:
[image: image.png]

The Flink cluster is running on kubernetes, with 1 job manager, and 12 task
managers each with 4 slots, kafka input topic has 96 partitions. The
checkpoint is stored in azure blob storage.

Checkpointing happens every 3 seconds, with a 10-second timeout and a minimum
pause of 1 second.
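
(For reference, those checkpoint settings roughly correspond to the FlinkPipelineOptions below; the setter names are assumed and the values are in milliseconds.)

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setCheckpointingInterval(3_000L);       // checkpoint every 3 seconds
options.setCheckpointTimeoutMillis(10_000L);    // 10-second checkpoint timeout
options.setMinPauseBetweenCheckpoints(1_000L);  // at least 1 second between checkpoints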

Any ideas why this happens?
Thanks a lot!
Eleanore


Re: Is KafkaIO KafkaWriter stateless

2020-08-31 Thread Eleanore Jin
Thanks Alex!

On Mon, Aug 31, 2020 at 9:32 AM Alexey Romanenko 
wrote:

> Yes, afaik, checkpoints are supported and used only for UnboundedRead
> transforms.
>
>
> On 27 Aug 2020, at 19:18, Eleanore Jin  wrote:
>
> Hi all,
>
> Just would like to confirm, KafkaWriter
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java>
>  has
> no state, so that means, when checkpointing is enabled, no state will be
> checkpointed from KafkaWriter?
>
> Thanks a lot!
> Eleanore
>
>
>


Is KafkaIO KafkaWriter stateless

2020-08-27 Thread Eleanore Jin
Hi all,

Just would like to confirm: KafkaWriter
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java>
has
no state, so that means, when checkpointing is enabled, no state will be
checkpointed from KafkaWriter?

Thanks a lot!
Eleanore


Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-15 Thread Eleanore Jin
Hi Sandeep,

As I am also exploring the Beam KafkaIO, just to share some of my thoughts.
1. My understanding is that in order to guarantee no message loss, you will
need to use KafkaExactlyOnceSink [1], and it is not possible to relax this to
at-least-once with the current KafkaIO.
2. When KafkaExactlyOnceSink is enabled, the checkpoint will include the
offset from the source and all the messages between the last checkpoint and
the current checkpoint.
3. Only after the checkpoint is completed will KafkaExactlyOnceSink
start publishing the messages that have been checkpointed.
4. If a failure occurs while publishing these messages, they will be
retried; a sequenceId is assigned to each message to determine
which messages were published successfully and which ones need to be retried. E.g.
say messages 5 - 10 are in the checkpoint and only 5 and 6 were published
successfully; then, when restarting from the checkpoint, only 7 to 10 will be
published again.

My question for your setup:
if you just enable checkpointing and still use KafkaWriter [2], and your
application is stateless, then the only state is the source offset.
Consider the scenario below:
offset 10 is checkpointed and the checkpoint succeeds, then the message with offset
10 fails to be published; the job restarts, resumes from the checkpoint,
and starts from offset 11, so message 10 gets lost.

Please correct me if I am missing anything.

Thanks a lot!
Eleanore

[1]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java
[2]
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java

On Tue, Aug 11, 2020 at 11:19 PM Eleanore Jin 
wrote:

>
> Hi Sandeep,
>
> Thanks a lot for the information! I am on a similar track which requires
> to scale up/down the stateless pipeline from a savepoint.
>
> It’s good to learn from your experience.
>
> Thanks!
> Eleanore
>
> On Tue, Aug 11, 2020 at 10:21 AM Kathula, Sandeep <
> sandeep_kath...@intuit.com> wrote:
>
>> Hi Eleanore,
>>
>> We are using at-least-once semantics when writing to Kafka. We are OK with
>> duplicate messages.
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>
>> *From: *Eleanore Jin 
>> *Date: *Monday, August 10, 2020 at 11:32 AM
>> *To: *"Kathula, Sandeep" 
>> *Cc: *"user@beam.apache.org" , "Vora, Jainik" <
>> jainik_v...@intuit.com>, "Benenson, Mikhail" ,
>> "Deshpande, Omkar" , "LeVeck, Matt" <
>> matt_lev...@intuit.com>
>> *Subject: *Re: Beam flink runner job not keeping up with input rate
>> after downscaling
>>
>>
>>
>> This email is from an external sender.
>>
>>
>>
>> Hi Sandeep,
>>
>>
>>
>> Thanks a lot for sharing! On a separate note, I see you are using the
>> KafkaIO.write, but not with EOS (exactly once semantics). From my
>> understanding, just enabling a checkpoint will not be enough to guarantee
>> no message loss? I pasted part of my DAG with KakfaIO EOS enabled. I am
>> also read and write to Kafka with KafkaIO.
>>
>>
>>
>> Thanks a lot!
>>
>> Eleanore
>>
>>
>>
>> On Mon, Aug 10, 2020 at 11:07 AM Kathula, Sandeep <
>> sandeep_kath...@intuit.com> wrote:
>>
>> Hi Eleanore,
>>
>> We are also observing that a few task managers are able to
>> keep up with the incoming load but a few task managers are lagging behind after
>> starting from a savepoint with less parallelism. Not all task managers are
>> affected by this problem. We repeated this test multiple times to confirm.
>>
>>
>>
>> Thanks
>>
>> Sandeep Kathula
>>
>>
>>
>> *From: *"Kathula, Sandeep" 
>> *Reply-To: *"user@beam.apache.org" 
>> *Date: *Monday, August 10, 2020 at 11:04 AM
>> *To: *"user@beam.apache.org" , "
>> eleanore@gmail.com" 
>> *Cc: *"Vora, Jainik" , "Benenson, Mikhail" <
>> mikhail_benen...@intuit.com>, "Deshpande, Omkar" <
>> omkar_deshpa...@intuit.com>, "LeVeck, Matt" 
>> *Subject: *Re: Beam flink runner job not keeping up with input rate
>> after downscaling
>>
>>
>>
>> This email is from an external sender.
>>
>>
>>
>> Hi Eleanore,
>>
>> Our DAG:
>>
>> Source: Strip Metadata/EventBusIO.Read/Read Bytes From
>> Kafka/Read(KafkaUnboundedSource) -> Flat Map -> Strip
>> Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous) -> Strip

Re: Beam flink runner job not keeping up with input rate after downscaling

2020-08-10 Thread Eleanore Jin
Hi Sandeep,

Can you please share your DAG? Is your job read and write to some sink?

Thanks a lot!

On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep 
wrote:

> Hi,
>
>We started a Beam application with Flink runner with parallelism as 50.
> It is a *stateless application.*  With initial parallelism of 50, our
> application is able to process up to *50,000 records* per second. After a
> week, we took a savepoint and restarted from savepoint with the parallelism
> of *18.* We are seeing that our application is only able to process *7000* 
> records
> per second but we expect it to process almost 18,000 records per second.
> Records processed per task manager was almost *half* of what is used to
> process previously with 50 task managers.
>
>
>
>  When we started a new application with 18 pods without any savepoint, it
> is able to process ~18500 records per second. This problem *occurs only
> when we downscale after taking a savepoint*. We ported same application
> to simple *Flink application without Apache Beam*, and there *it scales
> well without any issues* after restarting from savepoint with less
> parallelism.  So the problem should be with Apache Beam or some config we
> are passing to Beam/Flink. We are using the following config:
>
>
>
> numberOfExecutionRetries=2
>
> externalizedCheckpointsEnabled=true
>
> retainExternalizedCheckpointsOnCancellation=true
>
>
>
>
>
> We didn’t give any maxParallelism in our Beam application but just
> specifying parallelism.
>
>
>
> Beam version - 2.19
>
> Flink version- 1.9
>
>
>
> Any suggestions/help would be appreciated.
>
>
>
>
>
> Thanks
>
> Sandeep Kathula
>
>
>
>
>


Re: KafkaIO Exactly once vs At least Once

2020-07-31 Thread Eleanore Jin
Hi Alex,

You previously suggested that for KafkaIO exactly-once I do not need to
configure KafkaIO.write().withEOS() and only need to add the producer property
enable.idempotence=true. However, from the code:
https://github.com/apache/beam/blob/release-2.23.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1311,
only with withEOS() will it use KafkaExactlyOnceSink; otherwise it will
use KafkaWriter.

[image: image.png]

Did I misunderstand something? Or does it require EOS() for end-to-end
exactly once with KafkaIO as source and sink?

Thanks a lot!
Eleanore

On Wed, Jun 24, 2020 at 12:48 PM Eleanore Jin 
wrote:

>
> Hi Alex,
>
> Thanks a lot for the info.
>
> Eleanore
>
> On Wed, Jun 24, 2020 at 9:26 AM Alexey Romanenko 
> wrote:
>
>> Well, I think, in general, it will be a question of trade-off between
>> latency and performance in case of EOS sink (since EOS can’t be "for
>> free").
>>
>> I can’t recommend specific numbers for Flink (maybe Maximilian Michels or
>> others with more Flink knowledge can do), but I’d just try different
> numbers to see how it will affect the results.
>>
>> On 23 Jun 2020, at 23:27, Eleanore Jin  wrote:
>>
>> Hi Alexey,
>>
>> Thanks a lot for the information! I will give it a try.
>>
>> Regarding the checkpoint intervals, I think the Flink community suggested
>> something between 3-5 minutes, I am not sure yet if the checkpoint interval
>> can be in milliseconds? Currently, our beam pipeline is stateless, there is
>> no other operator state or user defined state.
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>>
>>> On 23 Jun 2020, at 07:49, Eleanore Jin  wrote:
>>>
>>> the KafkaIO EOS-sink you referred is: KafkaExactlyOnceSink? If so, the
>>> way I understand to use is 
>>> KafkaIO.write().withEOS(numPartitionsForSinkTopic,
>>> "mySlinkGroupId"), reading from your response, do I need additionally
>>> configure KafkaProducer property enable.idempotence=true, or I only need to
>>> configure this property?
>>>
>>>
>>> No, you don’t need to do that. New KafkaProducer will be created with
>>> this option set in KafkaExactlyOnceSink [1].
>>>
>>> So can you please correct me if the above settings is not the optimal
>>> and if there is anyway to reduce the latency by introducing checkpointing
>>> for EOS?
>>>
>>>
>>> Your settings look fine for me. You probably could play with checkpoint
>>> intervals (why it’s 10 secs?) to reduce a latency.
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>>>
>>>
>>>
>>>
>>> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>
>>>> I think you don’t need to enable EOS in this case since KafkaIO has a
>>>> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
>>>> all runners) and it relies on setting “enable.idempotence=true” for
>>>> KafkaProducer.
>>>> I’m not sure that you can achieve “at least once” semantics with
>>>> current KafkaIO implementation.
>>>>
>>>> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
>>>>
>>>> Hi All,
>>>>
>>>> I previously asked a few questions regarding enable EOS (exactly once
>>>> semantics) please see below.
>>>>
>>>> Our Beam pipeline uses KafkaIO to read from source topic, and then use
>>>> KafkaIO to publish to sink topic.
>>>>
>>>> According to Max's answer to my previous questions, enable EOS with
>>>> KafkaIO will introduce latency,
>>>> as only after checkpoints of all messages within the checkpoint
>>>> interval, then the KafkaIO.ExactlyOnceWriter
>>>> processElement method will be called. So the latency depends on the
>>>> checkpoint interval.
>>>>
>>>> I just wonder if I relax to At Least Once, do I still need to enable
>>>> EOS on KafkaIO? Or it is not required?
>>>> If not, can you please provide some instruction how should it be done?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> > Thanks for the response! the reason

Re: [ANNOUNCE] Beam 2.23.0 Released

2020-07-30 Thread Eleanore Jin
Hi Kyle,

Thanks a lot for the information.

Eleanore

On Thu, Jul 30, 2020 at 3:06 PM Kyle Weaver  wrote:

> Hi Eleanore, there have been no changes to Beam's supported Flink versions
> since Beam 2.21.0. Beam supports Flink 1.8, 1.9, and 1.10.
>
> If you are looking for Flink 1.11 support, I didn't find an existing
> issue, so I filed https://issues.apache.org/jira/browse/BEAM-10612.
>
> On Thu, Jul 30, 2020 at 2:56 PM Eleanore Jin 
> wrote:
>
>> Hi Valentyn,
>>
>> Thanks for the information! I am trying to see what Flink version does
>> Beam 2.23.0 support, but the information is not available here
>> https://beam.apache.org/documentation/runners/flink/.
>>
>> Can you please clarify?
>>
>> Thanks a lot!
>> Eleanore
>>
>> On Wed, Jul 29, 2020 at 5:47 PM Valentyn Tymofieiev 
>> wrote:
>>
>>> The Apache Beam team is pleased to announce the release of version
>>> 2.23.0.
>>>
>>> Apache Beam is an open source unified programming model to define and
>>> execute data processing pipelines, including ETL, batch and stream
>>> (continuous) processing. See: https://beam.apache.org
>>>
>>> You can download the release here:
>>>
>>> https://beam.apache.org/get-started/downloads/
>>>
>>> This release includes bug fixes, features, and improvements detailed on
>>> the Beam blog: https://beam.apache.org/blog/beam-2.23.0/
>>>
>>> Thanks to everyone who contributed to this release, and we hope you enjoy
>>> using Beam 2.23.0.
>>>
>>> -- Valentyn Tymofieiev, on behalf of The Apache Beam team
>>>
>>


Re: [ANNOUNCE] Beam 2.23.0 Released

2020-07-30 Thread Eleanore Jin
Hi Valentyn,

Thanks for the information! I am trying to see which Flink versions Beam
2.23.0 supports, but the information is not available here:
https://beam.apache.org/documentation/runners/flink/.

Can you please clarify?

Thanks a lot!
Eleanore

On Wed, Jul 29, 2020 at 5:47 PM Valentyn Tymofieiev 
wrote:

> The Apache Beam team is pleased to announce the release of version 2.23.0.
>
> Apache Beam is an open source unified programming model to define and
> execute data processing pipelines, including ETL, batch and stream
> (continuous) processing. See: https://beam.apache.org
>
> You can download the release here:
>
> https://beam.apache.org/get-started/downloads/
>
> This release includes bug fixes, features, and improvements detailed on
> the Beam blog: https://beam.apache.org/blog/beam-2.23.0/
>
> Thanks to everyone who contributed to this release, and we hope you enjoy
> using Beam 2.23.0.
>
> -- Valentyn Tymofieiev, on behalf of The Apache Beam team
>


Re: Beam supports Flink Async IO operator

2020-07-13 Thread Eleanore Jin
Hi Kaymak,

Sorry for the late reply and thanks for sharing the blog, I went through
it.

Here is my understanding:

Timely processing could `buffer` data and send it to the external
system in a batch fashion, but in order for it to work similarly to Flink's
async I/O operator, it also requires the external system to be able to accept
input data in bulk and return the responses synchronously. Otherwise it
would still be like making multiple sync calls to the external system and getting
back responses one by one.

Thanks a lot for sharing!

Best,
Eleanore

On Thu, Jul 9, 2020 at 1:56 AM Kaymak, Tobias 
wrote:

> Hi Eleanore,
>
> Maybe batched RPC is what you are looking for?
> https://beam.apache.org/blog/timely-processing/
>
> On Wed, Jul 8, 2020 at 6:20 PM Eleanore Jin 
> wrote:
>
>> Thanks Luke and Max for the information.
>>
>> We have the use case that inside a DoFn, we will need to call external
>> services to trigger some other flows. The calls to other services are REST
>> based sync calls, and it will take 150 milliseconds plus to return. We are
>> using Flink as the runner and I came across this Async I/O operator from
>> flink, trying to figure out if this is the right approach and if Beam
>> provides any similar concept for it.
>>
>> Thanks!
>> Eleanore
>>
>> On Wed, Jul 8, 2020 at 2:55 AM Maximilian Michels  wrote:
>>
>>> Just to clarify: We could make the AsnycIO operator also available in
>>> Beam but the operator has to be represented by a concept in Beam.
>>> Otherwise, there is no way to know when to produce it as part of the
>>> translation.
>>>
>>> On 08.07.20 11:53, Maximilian Michels wrote:
>>> > Flink's AsycIO operator is useful for processing io-bound operations,
>>> > e.g. sending network requests. Like Luke mentioned, it is not
>>> available
>>> > in Beam.
>>> >
>>> > -Max
>>> >
>>> > On 07.07.20 22:11, Luke Cwik wrote:
>>> >> Beam is a layer that sits on top of execution engines like Flink and
>>> >> provides its own programming model thus native operators like Flink's
>>> >> async IO operator are not exposed.
>>> >>
>>> >> Most people use a DoFn to do all their IO and sometimes will compose
>>> >> it with another transform such as GroupIntoBatches[1] to simplify
>>> >> their implementation.
>>> >>
>>> >> Why do you need async?
>>> >>
>>> >> 1:
>>> >>
>>> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>>> >>
>>> >>
>>> >>
>>> >> On Tue, Jul 7, 2020 at 11:03 AM Eleanore Jin >> >> <mailto:eleanore@gmail.com>> wrote:
>>> >>
>>> >> Hi community,
>>> >>
>>> >> I cannot find any documentation for Beam supporting Flink async IO
>>> >> operator
>>> >>
>>> >> (
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
>>>
>>> >>
>>> >> just wonder is this not supported right now?
>>> >>
>>> >> Thanks a lot!
>>> >> Eleanore
>>> >>
>>>
>>


Re: Beam supports Flink Async IO operator

2020-07-08 Thread Eleanore Jin
Thanks Luke and Max for the information.

We have a use case where, inside a DoFn, we need to call external
services to trigger some other flows. The calls to the other services are
REST-based sync calls, and each takes 150+ milliseconds to return. We are
using Flink as the runner, and I came across this Async I/O operator from
Flink, trying to figure out if this is the right approach and whether Beam
provides any similar concept for it.

Thanks!
Eleanore

On Wed, Jul 8, 2020 at 2:55 AM Maximilian Michels  wrote:

> Just to clarify: We could make the AsnycIO operator also available in
> Beam but the operator has to be represented by a concept in Beam.
> Otherwise, there is no way to know when to produce it as part of the
> translation.
>
> On 08.07.20 11:53, Maximilian Michels wrote:
> > Flink's AsycIO operator is useful for processing io-bound operations,
> > e.g. sending network requests. Like Luke mentioned, it is not available
> > in Beam.
> >
> > -Max
> >
> > On 07.07.20 22:11, Luke Cwik wrote:
> >> Beam is a layer that sits on top of execution engines like Flink and
> >> provides its own programming model thus native operators like Flink's
> >> async IO operator are not exposed.
> >>
> >> Most people use a DoFn to do all their IO and sometimes will compose
> >> it with another transform such as GroupIntoBatches[1] to simplify
> >> their implementation.
> >>
> >> Why do you need async?
> >>
> >> 1:
> >>
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
> >>
> >>
> >>
> >> On Tue, Jul 7, 2020 at 11:03 AM Eleanore Jin  >> <mailto:eleanore@gmail.com>> wrote:
> >>
> >> Hi community,
> >>
> >> I cannot find any documentation for Beam supporting Flink async IO
> >> operator
> >>
> >> (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
>
> >>
> >> just wonder is this not supported right now?
> >>
> >> Thanks a lot!
> >> Eleanore
> >>
>


Beam supports Flink Async IO operator

2020-07-07 Thread Eleanore Jin
Hi community,

I cannot find any documentation on Beam supporting the Flink async I/O operator
(
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html),
just wondering, is this not supported right now?

Thanks a lot!
Eleanore


Re: KafkaIO Exactly once vs At least Once

2020-06-24 Thread Eleanore Jin
Hi Alex,

Thanks a lot for the info.

Eleanore

On Wed, Jun 24, 2020 at 9:26 AM Alexey Romanenko 
wrote:

> Well, I think, in general, it will be a question of trade-off between
> latency and performance in case of EOS sink (since EOS can’t be "for
> free").
>
> I can’t recommend specific numbers for Flink (maybe Maximilian Michels or
> others with more Flink knowledge can do), but I’d just try different
> numbers to see how it will affect the results.
>
> On 23 Jun 2020, at 23:27, Eleanore Jin  wrote:
>
> Hi Alexey,
>
> Thanks a lot for the information! I will give it a try.
>
> Regarding the checkpoint intervals, I think the Flink community suggested
> something between 3-5 minutes, I am not sure yet if the checkpoint interval
> can be in milliseconds? Currently, our beam pipeline is stateless, there is
> no other operator state or user defined state.
>
> Thanks a lot!
> Eleanore
>
> On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko 
> wrote:
>
>>
>> On 23 Jun 2020, at 07:49, Eleanore Jin  wrote:
>>
>> the KafkaIO EOS-sink you referred is: KafkaExactlyOnceSink? If so, the
>> way I understand to use is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
>> "mySlinkGroupId"), reading from your response, do I need additionally
>> configure KafkaProducer property enable.idempotence=true, or I only need to
>> configure this property?
>>
>>
>> No, you don’t need to do that. New KafkaProducer will be created with
>> this option set in KafkaExactlyOnceSink [1].
>>
>> So can you please correct me if the above settings is not the optimal and
>> if there is anyway to reduce the latency by introducing checkpointing for
>> EOS?
>>
>>
>> Your settings look fine for me. You probably could play with checkpoint
>> intervals (why it’s 10 secs?) to reduce a latency.
>>
>>
>> [1]
>> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>>
>>
>>
>>
>> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>>
>>> I think you don’t need to enable EOS in this case since KafkaIO has a
>>> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
>>> all runners) and it relies on setting “enable.idempotence=true” for
>>> KafkaProducer.
>>> I’m not sure that you can achieve “at least once” semantics with current
>>> KafkaIO implementation.
>>>
>>> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
>>>
>>> Hi All,
>>>
>>> I previously asked a few questions regarding enable EOS (exactly once
>>> semantics) please see below.
>>>
>>> Our Beam pipeline uses KafkaIO to read from source topic, and then use
>>> KafkaIO to publish to sink topic.
>>>
>>> According to Max's answer to my previous questions, enable EOS with
>>> KafkaIO will introduce latency,
>>> as only after checkpoints of all messages within the checkpoint interval,
>>> then the KafkaIO.ExactlyOnceWriter
>>> processElement method will be called. So the latency depends on the
>>> checkpoint interval.
>>>
>>> I just wonder if I relax to At Least Once, do I still need to enable EOS
>>> on KafkaIO? Or it is not required?
>>> If not, can you please provide some instruction how should it be done?
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>> > Thanks for the response! the reason to setup the state backend is to
>>> > experiment Kafka EOS with Beam running on Flink.  Reading through the
>>> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
>>> > you please help me clarify my understanding?
>>> >
>>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>>> > EOS, ExactlyOnceWriter processElement method is annotated
>>> > with @RequiresStableInput, so all the messages will be cached
>>> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
>>> > messages will be processed by ExactlyOnceWriter?
>>>
>>> That's correct.
>>>
>>> >
>>> > 2. Upon checkpoint, will those messages cached by
>>> > KeyedBufferingEleementsHandler also checkpointed?
>>>
>>> Yes, the buffered elements will be checkpointed.
>>>
>>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in

Re: KafkaIO Exactly once vs At least Once

2020-06-23 Thread Eleanore Jin
Hi Alexey,

Thanks a lot for the information! I will give it a try.

Regarding the checkpoint intervals, I think the Flink community suggested
something between 3-5 minutes, I am not sure yet if the checkpoint interval
can be in milliseconds? Currently, our beam pipeline is stateless, there is
no other operator state or user defined state.

Thanks a lot!
Eleanore

On Tue, Jun 23, 2020 at 9:58 AM Alexey Romanenko 
wrote:

>
> On 23 Jun 2020, at 07:49, Eleanore Jin  wrote:
>
> the KafkaIO EOS-sink you referred is: KafkaExactlyOnceSink? If so, the
> way I understand to use is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
> "mySlinkGroupId"), reading from your response, do I need additionally
> configure KafkaProducer property enable.idempotence=true, or I only need to
> configure this property?
>
>
> No, you don’t need to do that. New KafkaProducer will be created with this
> option set in KafkaExactlyOnceSink [1].
>
> So can you please correct me if the above settings is not the optimal and
> if there is anyway to reduce the latency by introducing checkpointing for
> EOS?
>
>
> Your settings look fine for me. You probably could play with checkpoint
> intervals (why it’s 10 secs?) to reduce a latency.
>
>
> [1]
> https://github.com/apache/beam/blob/05f676ea255587ccf52ad56d84191838bb981ebb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L711
>
>
>
>
> On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
>
>> I think you don’t need to enable EOS in this case since KafkaIO has a
>> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
>> all runners) and it relies on setting “enable.idempotence=true” for
>> KafkaProducer.
>> I’m not sure that you can achieve “at least once” semantics with current
>> KafkaIO implementation.
>>
>> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
>>
>> Hi All,
>>
>> I previously asked a few questions regarding enable EOS (exactly once
>> semantics) please see below.
>>
>> Our Beam pipeline uses KafkaIO to read from source topic, and then use
>> KafkaIO to publish to sink topic.
>>
>> According to Max's answer to my previous questions, enable EOS with
>> KafkaIO will introduce latency,
>> as only after checkpoints of all message within the checkpoint interval,
>> then the KakfaIO.ExactlyOnceWriter
>> processElement method will be called. So the latency depends on the
>> checkpoint interval.
>>
>> I just wonder if I relax to At Least Once, do I still need to enable EOS
>> on KafkaIO? Or it is not required?
>> If not, can you please provide some instruction how should it be done?
>>
>> Thanks a lot!
>> Eleanore
>>
>> > Thanks for the response! the reason to setup the state backend is to
>> > experiment Kafka EOS with Beam running on Flink.  Reading through the
>> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
>> > you please help me clarify my understanding?
>> >
>> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
>> > EOS, ExactlyOnceWriter processElement method is annotated
>> > with @RequiresStableInput, so all the messages will be cached
>> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
>> > messages will be processed by ExactlyOnceWriter?
>>
>> That's correct.
>>
>> >
>> > 2. Upon checkpoint, will those messages cached by
>> > KeyedBufferingEleementsHandler also checkpointed?
>>
>> Yes, the buffered elements will be checkpointed.
>>
>> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
>> > stream processing, the delay is based on the checkpoint interval? How to
>> > reduce the latency while still have EOS guarantee?
>>
>> Indeed, the checkpoint interval and the checkpoint duration limits the
>> latency. Given the current design and the guarantees, there is no other
>> way to influence the latency.
>>
>> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
>> > checkpoint successfully, the checkpointed offset will be committed back
>> > to kafka, but if this operation does not finish successfully, and then
>> > the job gets cancelled/stopped, and re-submit the job again (with the
>> > same consumer group for source topics, but different jobID), then it is
>> > possible duplicated processing still exists? because the consumed offset
>> > is not committed back to kafka?
>>
>> This option is for the Kafka consumer. AFAIK this is just a convenience
>> method to commit the latest checkpointed offset to Kafka. This offset is
>> not used when restoring from a checkpoint. However, if you don't restore
>> from a checkpoint, you can resume from that offset which might be
>> convenient or not, depending on your use case.
>>
>>
>>
>


Re: KafkaIO Exactly once vs At least Once

2020-06-22 Thread Eleanore Jin
Hi Alexey,

Thanks for the response, below are some of my follow up question:

the KafkaIO EOS sink you referred to is KafkaExactlyOnceSink? If so, the way
I understand it is used is KafkaIO.write().withEOS(numPartitionsForSinkTopic,
"mySlinkGroupId"). Reading from your response, do I need to additionally
configure the KafkaProducer property enable.idempotence=true, or do I only need to
configure this property?

The configuration I currently tried:
- from FlinkRunnerOptions:

   1. enable checkpoints: options.setCheckpointingInterval(10_000L);
   2. set state backend to Filesystem:
   options.setStateBackendFactory(FsBackendFactory. class);
   3. optionally set number of retries when pipeline fails before exit the
   application: options.setNumberOfExecutionRetries(2);


- from KafkaIO.read():

   1. set enable.auto.commit to false for Kafka ConsumerConfig
   2. set Kafka ConsumerConfig isolation.level to read_committed via Beam
   KafkaIO.Read: withReadCommitted()
   3. set to commit finalized offset to Kafka, called when finalize
   checkpoint: commitOffsetsInFinalize()

- from KafkaIO.write(), I only configured:
withEOS(numPartitionsForSinkTopic, "mySlinkGroupId")
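
Putting those settings together, a rough sketch of the wiring is below. The broker address, topic names, types, and partition count are placeholders, the business logic is elided, and this only mirrors the options listed above rather than a verified configuration:

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setCheckpointingInterval(10_000L);   // 1. enable checkpoints
options.setNumberOfExecutionRetries(2);      // 3. retries before the pipeline exits
// 2. the state backend is configured separately via setStateBackendFactory(...)

Pipeline pipeline = Pipeline.create(options);

PCollection<KV<String, String>> input = pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("input-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(
            ImmutableMap.<String, Object>of(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false))  // 1.
        .withReadCommitted()           // 2.
        .commitOffsetsInFinalize()     // 3.
        .withoutMetadata());

// ... business logic ...

int numPartitionsForSinkTopic = 4;  // placeholder
input.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")
        .withTopic("output-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(numPartitionsForSinkTopic, "mySlinkGroupId"));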

With the above settings, the current observation is: when I inject an
artificial Exception, the Beam job is triggered to restart and there is no
message loss, but messages will not show up in the output topic until
the checkpoint finishes, so the latency depends on the checkpoint interval.

So can you please correct me if the above settings are not optimal, and
is there any way to reduce the latency introduced by checkpointing for
EOS?

Thanks a lot!
Eleanore


On Mon, Jun 22, 2020 at 10:17 AM Alexey Romanenko 
wrote:

> I think you don’t need to enable EOS in this case since KafkaIO has a
> dedicated EOS-sink implementation for Beam part (btw, it’s not supported by
> all runners) and it relies on setting “enable.idempotence=true” for
> KafkaProducer.
> I’m not sure that you can achieve “at least once” semantics with current
> KafkaIO implementation.
>
> On 16 Jun 2020, at 17:16, Eleanore Jin  wrote:
>
> Hi All,
>
> I previously asked a few questions regarding enable EOS (exactly once
> semantics) please see below.
>
> Our Beam pipeline uses KafkaIO to read from source topic, and then use
> KafkaIO to publish to sink topic.
>
> According to Max's answer to my previous questions, enable EOS with
> KafkaIO will introduce latency,
> as only after checkpoints of all messages within the checkpoint interval,
> then the KafkaIO.ExactlyOnceWriter
> processElement method will be called. So the latency depends on the
> checkpoint interval.
>
> I just wonder if I relax to At Least Once, do I still need to enable EOS
> on KafkaIO? Or it is not required?
> If not, can you please provide some instruction how should it be done?
>
> Thanks a lot!
> Eleanore
>
> > Thanks for the response! the reason to setup the state backend is to
> > experiment Kafka EOS with Beam running on Flink.  Reading through the
> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS, ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all the messages will be cached
> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
> > messages will be processed by ExactlyOnceWriter?
>
> That's correct.
>
> >
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingEleementsHandler also checkpointed?
>
> Yes, the buffered elements will be checkpointed.
>
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, the delay is based on the checkpoint interval? How to
> > reduce the latency while still have EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limits the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
>
> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
> > checkpoint successfully, the checkpointed offset will be committed back
> > to kafka, but if this operation does not finish successfully, and then
> > the job gets cancelled/stopped, and re-submit the job again (with the
> > same consumer group for source topics, but different jobID), then it is
> > possible duplicated processing still exists? because the consumed offset
> > is not committed back to kafka?
>
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset which might be
> convenient or not, depending on your use case.
>
>
>


KafkaIO Exactly once vs At least Once

2020-06-16 Thread Eleanore Jin
Hi All,

I previously asked a few questions regarding enable EOS (exactly once
semantics) please see below.

Our Beam pipeline uses KafkaIO to read from source topic, and then use
KafkaIO to publish to sink topic.

According to Max's answer to my previous questions, enabling EOS with KafkaIO
will introduce latency:
only after all messages within the checkpoint interval have been checkpointed
will the KafkaIO.ExactlyOnceWriter
processElement method be called. So the latency depends on the
checkpoint interval.

I just wonder, if I relax this to at-least-once, do I still need to enable EOS on
KafkaIO, or is it not required?
If not, can you please provide some instructions on how it should be done?

Thanks a lot!
Eleanore

> Thanks for the response! the reason to setup the state backend is to
> experiment Kafka EOS with Beam running on Flink.  Reading through the
> code and this PR , can
> you please help me clarify my understanding?
>
> 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> EOS, ExactlyOnceWriter processElement method is annotated
> with @RequiresStableInput, so all the messages will be cached
> by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
> messages will be processed by ExactlyOnceWriter?

That's correct.

>
> 2. Upon checkpoint, will those messages cached by
> KeyedBufferingEleementsHandler also checkpointed?

Yes, the buffered elements will be checkpointed.

> 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> stream processing, the delay is based on the checkpoint interval? How to
> reduce the latency while still have EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limits the
latency. Given the current design and the guarantees, there is no other
way to influence the latency.

> 4. commitOffsetsInFinalize is also enabled, does this mean, upon
> checkpoint successfully, the checkpointed offset will be committed back
> to kafka, but if this operation does not finish successfully, and then
> the job gets cancelled/stopped, and re-submit the job again (with the
> same consumer group for source topics, but different jobID), then it is
> possible duplicated processing still exists? because the consumed offset
> is not committed back to kafka?

This option is for the Kafka consumer. AFAIK this is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is
not used when restoring from a checkpoint. However, if you don't restore
from a checkpoint, you can resume from that offset which might be
convenient or not, depending on your use case.


Re: Beam supports Flink RichAsyncFunction

2020-06-16 Thread Eleanore Jin
Thanks Luke for the info. I will take a look.

Eleanore

On Mon, Jun 15, 2020 at 12:48 PM Luke Cwik  wrote:

> The intent is that users shouldn't have to use async I/O since the idea is
> that the runner should increase the number of workers/threads being
> processed automatically so that you never need to special case this.
> Unfortunately Dataflow is the only one who does this today so you are
> forced to use something like GroupIntoBatches[1] to gather input elements
> that you convert into requests you want to send and manage your own threads
> / completion.
>
> 1:
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
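
A rough sketch of that GroupIntoBatches pattern is below; the key/value types, the batch size, and the callExternalService helper are made up for illustration:

// Hypothetical helper standing in for the user's own client: one bulk request per batch.
static List<String> callExternalService(Iterable<String> payloads) {
  return Collections.emptyList();  // placeholder body
}

static PCollection<String> enrichInBatches(PCollection<KV<String, String>> keyed) {
  return keyed
      // Buffer up to 100 elements per key before calling the external service.
      .apply(GroupIntoBatches.<String, String>ofSize(100))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void process(@Element KV<String, Iterable<String>> batch,
                            OutputReceiver<String> out) {
          // One request per batch instead of one per element.
          for (String response : callExternalService(batch.getValue())) {
            out.output(response);
          }
        }
      }));
}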
>
> On Sun, Jun 14, 2020 at 7:21 PM Eleanore Jin 
> wrote:
>
>> Hi Community,
>>
>> I am trying to convert an existing Flink job into Beam pipeline. In the
>> current Flink job, we have async I/O operator (
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html)
>> which extends RichAsyncFunction
>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/async/RichAsyncFunction.html>
>> .
>>
>> I did not find any document online for Beam to support this, if it is
>> documented somewhere, can you please point to me?
>>
>> In case Beam does not support it, is there any suggested 'work around'
>> for it?
>>
>> Thanks a lot!
>> Eleanore
>>
>


Beam supports Flink RichAsyncFunction

2020-06-14 Thread Eleanore Jin
Hi Community,

I am trying to convert an existing Flink job into a Beam pipeline. In the
current Flink job, we have an async I/O operator (
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html)
which extends RichAsyncFunction
<https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/async/RichAsyncFunction.html>
.

I did not find any documentation online about Beam supporting this; if it is
documented somewhere, can you please point me to it?

In case Beam does not support it, is there any suggested workaround
for it?

Thanks a lot!
Eleanore


Re: KafkaIO.write dynamically fanout to different kafka topics

2020-05-14 Thread Eleanore Jin
Hi Alex,

Thanks a lot for the information!

Best
Eleanore

On Thu, May 14, 2020 at 1:53 AM Alexey Romanenko 
wrote:

> Hi Eleanore,
>
> Yes, to define the output topic dynamically for every record, you may want to
> use KafkaIO.writeRecords(), which takes a PCollection of ProducerRecords as
> its input; for every processed ProducerRecord it takes its topic name (if
> it was specified there) and uses it as the output topic. So, in this way,
> you can set for every record the topic name where it will be published.
>
> If I got your question right, you need to have an intermediate PTransform
> before KafkaIO.writeRecords() that will use information from a message
> field to define the topic where your record should be published, and then
> create a new ProducerRecord with the proper topic name.
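
For illustration, a rough sketch of that pattern; "MyMessage", its getters, the `messages` collection, and the broker address are placeholders:

// Build a ProducerRecord per element, taking the topic from a message field.
PCollection<ProducerRecord<String, String>> records = messages.apply(
    "RouteToTopic",
    ParDo.of(new DoFn<MyMessage, ProducerRecord<String, String>>() {
      @ProcessElement
      public void process(@Element MyMessage msg,
                          OutputReceiver<ProducerRecord<String, String>> out) {
        out.output(new ProducerRecord<>(msg.getDestinationTopic(), msg.getKey(), msg.getPayload()));
      }
    }));

// writeRecords() publishes each record to the topic carried by the record itself.
records.apply(KafkaIO.<String, String>writeRecords()
    .withBootstrapServers("broker:9092")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));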
>
> > On 14 May 2020, at 07:09, Eleanore Jin  wrote:
> >
> > Hi all,
> >
> > I have a beam pipeline, which will read from kafka topic via KafkaIO,
> and based on the message field, add additional field in the message for the
> destination topic.
> >
> > I see KafkaIO.write can be used to publish to Kafka topics.
> >
> > In KafkaIO.java, it construct the ProducerRecord, and getTopic()
> determines which topic to publish, and this information is passed when
> create PTransforms via KafkaIO.write.
> >
> > Any suggestions to dynamically set kafka topic from message field?
> >
> > Thanks a lot!
> > Eleanore
>
>


KafkaIO.write dynamically fanout to different kafka topics

2020-05-13 Thread Eleanore Jin
Hi all,

I have a Beam pipeline which reads from a Kafka topic via KafkaIO and,
based on a message field, adds an additional field to the message for the
destination topic.

I see KafkaIO.write can be used to publish to Kafka topics.

In KafkaIO.java, it constructs the ProducerRecord, and getTopic()
determines which topic to publish to; this information is passed when
creating PTransforms via KafkaIO.write.

Any suggestions on how to dynamically set the Kafka topic from a message field?

Thanks a lot!
Eleanore


Re: GC overhead limit exceeded

2020-05-13 Thread Eleanore Jin
Hi Reuven,

This is the code snippet for the side input; we don't have a window & trigger in
the main business logic PTransforms.

Thanks!
Eleanore

private  PCollection> createSideCollection(String topicName,
  Class> deserializerClass) {

  Map consumerProperties = ImmutableMap.builder()
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString())
.build();

  PCollection> kafkaValues =
pipeline.apply("collectionFromTopic-" + topicName,
KafkaIO.read()
  .withBootstrapServers(kafkaSettings.getBootstrapServers())
  .withTopics(Collections.singletonList(topicName))
  .withKeyDeserializer(KeyDeserializer.class)
  .withValueDeserializer(deserializerClass)
  .withConsumerConfigUpdates(consumerProperties)
  .withoutMetadata());

  Trigger trigger = Repeatedly.forever(
AfterFirst.of(
  AfterPane.elementCountAtLeast(1),
  AfterProcessingTime.pastFirstElementInPane()
));

  return kafkaValues.apply(Window.>into(new GlobalWindows())
.triggering(trigger)
.accumulatingFiredPanes()
.withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
  );
}


On Wed, May 13, 2020 at 1:39 PM Reuven Lax  wrote:

> Do you have a window or trigger set up in your pipeline?
>
> On Mon, May 11, 2020 at 2:41 PM Eleanore Jin 
> wrote:
>
>>
>> Hi Max,
>>
>> No I did not introduce RocksDB at this point since the pipeline is
>> stateless apart from Kafka offset.
>>
>> So what we do is to ensure there is a dummy message in the side input to
>> avoid this situation.
>>
>> Thanks!
>> Eleanore
>>
>> On Mon, May 11, 2020 at 2:57 AM Maximilian Michels 
>> wrote:
>>
>>> Generally, it is to be expected that the main input is buffered until
>>> the side input is available. We really have no other option to correctly
>>> process the data.
>>>
>>> Have you tried using RocksDB as the state backend to prevent too much GC
>>> churn?
>>>
>>> -Max
>>>
>>> On 07.05.20 06:27, Eleanore Jin wrote:
>>> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
>>> >
>>> > Thanks a lot!
>>> > Eleanore
>>> >
>>> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka >> > <mailto:goe...@google.com>> wrote:
>>> >
>>> > Thanks for sharing the response. It makes sense to me.
>>> > Please file a jira in Beam so that we can prioritize it.
>>> >
>>> > Thanks,
>>> > Ankur
>>> >
>>> > On Wed, May 6, 2020 at 9:08 PM Eleanore Jin <
>>> eleanore@gmail.com
>>> > <mailto:eleanore@gmail.com>> wrote:
>>> >
>>> > Hi Ankur,
>>> >
>>> > Thanks for your response.
>>> >
>>> > I also checked with Flink Community, here is there response, in
>>> > short, flink does not cache the main input data if there is no
>>> > data available in side input  (flink broadcast stream)
>>> >
>>> > - quote from flink community:
>>> >
>>> > Coming back to your question, Flink's Broadcast stream does
>>> > *not* block or collect events from the non-broadcasted side if
>>> > the broadcast side doesn't serve events.
>>> > However, the user-implemented operators (Beam or your code in
>>> > this case) often puts non-broadcasted events into state to wait
>>> > for input from the other side.
>>> > Since the error is not about lack of memory, the buffering in
>>> > Flink state might not be the problem here.
>>> >
>>> > Thanks a lot for the help!
>>> > Eleanore
>>> >
>>> > On Wed, May 6, 2020 at 8:59 PM Ankur Goenka >> > <mailto:goe...@google.com>> wrote:
>>> >
>>> > The relevant code should be
>>> > here
>>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>>>
>>> >
>>> > Given that the problem goes away after publishing Side
>>> input
>>> > suggests that this might be a problem with synchronizing 2
>>> > streams of data on Flink using Beam.
>>> >
>>> > I am not sure if flink

Re: Behavior of KafkaIO

2020-05-12 Thread Eleanore Jin
Hi Alex,

Thanks a lot for the suggestion; it seems that in my previous experiment
I did not pre-ingest a large enough amount of messages. So it looks like each
partition gets a slice of time to be consumed by the same consumer, and
maybe during partition 1's time slice the lag already drilled down to zero,
hence the observation.

I tried to ingest more data, and I see all of the partitions are making
progress. I will update if I have more findings.

Thanks a lot!
Eleanore

On Tue, May 12, 2020 at 10:44 AM Alexey Romanenko 
wrote:

> Hi Eleanore,
>
> Interesting topic, thank you for more information. I don’t see that this
> is unexpected behavior for KafkaIO since, as Heejong said before,  it
> relies on implementation of KafkaConsumer that is used in your case.
>
> According to KafkaConsumer Javadoc [1], in most cases it should read
> fairly from different partitions in case if one consumer handles several of
> them:
>
> “Consumption Flow Control
>
> If a consumer is assigned multiple partitions to fetch data from, it will
> try to consume from all of them at the same time, effectively giving these
> partitions the same priority for consumption. However in some cases
> consumers may want to first focus on fetching from some subset of the
> assigned partitions at full speed, and only start fetching other partitions
> when these partitions have few or no data to consume.”
>
> Perhaps, you may want to try to change
> fetch.max.bytes or max.partition.fetch.bytes options and see if it will
> help.
>
>
> [1]
> http://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
>
>
>
>
> On 12 May 2020, at 07:52, Eleanore Jin  wrote:
>
> Hi Chamikara and Lee,
>
> Thanks for the information, I did more experiment on my local laptop.
> (Flink Runner local mode, Job Manager and Task Manager runs in the same JVM)
> setup: input topic 4 partitions
> 1. with parallelism 1: KafkaIO read will drain 1 partition completely to 0
> lag, then move on to another partition
> 2. with parallelism 2: KafkaIO read will read 2 partitions together, then
> move on to the rest of the partitions
> 3. with parallelism 4: KafkaIO read will read all 4 partitions together.
>
> In production, we run multiple Flink Task managers, from the consumer lag
> reported, we also see some partitions go to 0, while other
> partitions remain at a high lag.
>
> Thanks!
> Eleanore
>
> On Mon, May 11, 2020 at 8:19 PM Heejong Lee  wrote:
>
>> If we assume that there's only one reader, all partitions are assigned to
>> a single KafkaConsumer. I think the order of reading each partition depends
>> on KafkaConsumer implementation i.e. how KafkaConsumer.poll() returns
>> messages.
>>
>> Reference:
>> assigning partitions:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
>> polling records:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
>> creating a record batch:
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>>
>> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath 
>> wrote:
>>
>>> The number of partitions assigned to a given split depends on the
>>> desiredNumSplits value provided by the runner.
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>>
>>> (This is assuming that you are using Beam Kafka source not a native
>>> Flink override).
>>>
>>> Do you see the same behavior when you increase the number of workers of
>>> your Flink cluster ?
>>>
>>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin 
>>> wrote:
>>>
>>>> Hi community,
>>>>
>>>> In my pipeline, I am using KafkaIO to read and write. The source topic
>>>> has 4 partitions and pipeline parallelism is 1.
>>>>
>>>> I noticed from consumer lag metrics, it will consume from 1 partition
>>>> until all the messages from that partition is processed then it will
>>>> consume from another partition.
>>>>
>>>> Is this the expected behavior?
>>>>
>>>> Runner is Flink.
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>
>
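
For reference, a minimal sketch of passing those consumer fetch options
through KafkaIO (broker, topic and the byte values are illustrative;
withConsumerConfigUpdates, or updateConsumerProperties on older Beam
releases, is the hook for extra KafkaConsumer properties):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Assumes an existing Pipeline named `pipeline`.
Map<String, Object> consumerConfig = new HashMap<>();
// Cap how much a single partition may contribute to one fetch ...
consumerConfig.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 256 * 1024);
// ... and how much one fetch may return overall, so one partition is less
// likely to starve the others.
consumerConfig.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 4 * 1024 * 1024);

pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")
        .withTopic("input-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(consumerConfig)
        .withoutMetadata());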


Re: Behavior of KafkaIO

2020-05-11 Thread Eleanore Jin
Hi Chamikara and Lee,

Thanks for the information, I did more experiment on my local laptop.
(Flink Runner local mode, Job Manager and Task Manager runs in the same JVM)
setup: input topic 4 partitions
1. with parallelism 1: KafkaIO read will drain 1 partition completely to 0
lag, then move on to another partition
2. with parallelism 2: KafkaIO read will read 2 partitions together, then
move on to the rest of the partitions
3. with parallelism 4: KafkaIO read will read all 4 partitions together.

In production, we run multiple Flink Task managers, from the consumer lag
reported, we also see some partitions go to 0, while other
partitions remain at a high lag.

Thanks!
Eleanore

On Mon, May 11, 2020 at 8:19 PM Heejong Lee  wrote:

> If we assume that there's only one reader, all partitions are assigned to
> a single KafkaConsumer. I think the order of reading each partition depends
> on KafkaConsumer implementation i.e. how KafkaConsumer.poll() returns
> messages.
>
> Reference:
> assigning partitions:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
> polling records:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
> creating a record batch:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>
> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath 
> wrote:
>
>> The number of partitions assigned to a given split depends on the
>> desiredNumSplits value provided by the runner.
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> (This is assuming that you are using Beam Kafka source not a native Flink
>> override).
>>
>> Do you see the same behavior when you increase the number of workers of
>> your Flink cluster ?
>>
>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin 
>> wrote:
>>
>>> Hi community,
>>>
>>> In my pipeline, I am using KafkaIO to read and write. The source topic
>>> has 4 partitions and pipeline parallelism is 1.
>>>
>>> I noticed from consumer lag metrics, it will consume from 1 partition
>>> until all the messages from that partition is processed then it will
>>> consume from another partition.
>>>
>>> Is this the expected behavior?
>>>
>>> Runner is Flink.
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>


Behavior of KafkaIO

2020-05-11 Thread Eleanore Jin
Hi community,

In my pipeline, I am using KafkaIO to read and write. The source topic has
4 partitions and pipeline parallelism is 1.

I noticed from the consumer lag metrics that it will consume from 1 partition
until all the messages from that partition are processed, then it will consume
from another partition.

Is this the expected behavior?

Runner is Flink.

Thanks a lot!
Eleanore


Re: GC overhead limit exceeded

2020-05-11 Thread Eleanore Jin
Hi Max,

No I did not introduce RocksDB at this point since the pipeline is
stateless apart from Kafka offset.

So what we do is to ensure there is a dummy message in the side input to
avoid this situation.

Thanks!
Eleanore
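
For anyone hitting the same issue, a rough sketch of the dummy-message
workaround (all names here are illustrative): flatten a one-element bounded
seed into the side-input stream so its first pane can fire even before any
real filter criteria have been published to the topic.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionView;

// `sideFromKafka` is the KV stream of filter criteria read from the
// side-input topic; the "__seed__" entry only guarantees a first firing.
static PCollectionView<Map<String, Iterable<String>>> seededFilterView(
    Pipeline pipeline, PCollection<KV<String, String>> sideFromKafka) {
  PCollection<KV<String, String>> seed =
      pipeline.apply("SeedSideInput", Create.of(KV.of("__seed__", "")));
  return PCollectionList.of(sideFromKafka).and(seed)
      .apply(Flatten.pCollections())
      .apply(Window.<KV<String, String>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
          .accumulatingFiredPanes())
      .apply(View.asMultimap());
}

Downstream code would simply ignore the "__seed__" key when matching.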

On Mon, May 11, 2020 at 2:57 AM Maximilian Michels  wrote:

> Generally, it is to be expected that the main input is buffered until
> the side input is available. We really have no other option to correctly
> process the data.
>
> Have you tried using RocksDB as the state backend to prevent too much GC
> churn?
>
> -Max
>
> On 07.05.20 06:27, Eleanore Jin wrote:
> > Please see: https://issues.apache.org/jira/browse/BEAM-9914
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 9:17 PM Ankur Goenka  > <mailto:goe...@google.com>> wrote:
> >
> > Thanks for sharing the response. It makes sense to me.
> > Please file a jira in Beam so that we can prioritize it.
> >
> > Thanks,
> > Ankur
> >
> > On Wed, May 6, 2020 at 9:08 PM Eleanore Jin  > <mailto:eleanore@gmail.com>> wrote:
> >
> > Hi Ankur,
> >
> > Thanks for your response.
> >
> > I also checked with Flink Community, here is their response, in
> > short, flink does not cache the main input data if there is no
> > data available in side input  (flink broadcast stream)
> >
> > - quote from flink community:
> >
> > Coming back to your question, Flink's Broadcast stream does
> > *not* block or collect events from the non-broadcasted side if
> > the broadcast side doesn't serve events.
> > However, the user-implemented operators (Beam or your code in
> > this case) often puts non-broadcasted events into state to wait
> > for input from the other side.
> > Since the error is not about lack of memory, the buffering in
> > Flink state might not be the problem here.
> >
> > Thanks a lot for the help!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 8:59 PM Ankur Goenka  > <mailto:goe...@google.com>> wrote:
> >
> > The relevant code should be here:
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L595
>
> >
> > Given that the problem goes away after publishing Side input
> > suggests that this might be a problem with synchronizing 2
> > streams of data on Flink using Beam.
> >
> > I am not sure if the flink optimizer waits for the side input to be
> > available before processing the main input. We might
> > potentially handle this on the Beam side as well or use a
> > different set of flink apis to let us do better optimization
> > if possible. In any case this would require a new sdk
> > release if we decide to fix.
> >
> > On Wed, May 6, 2020 at 7:54 PM Eleanore Jin
> > mailto:eleanore@gmail.com>>
> wrote:
> >
> > Hi Ankur,
> >
> > Thanks for the answer! Can you please point to me the
> > source code where the buffering is? I would like to
> > learn how beam works, thanks!
> >
> > To your question, in my case, side input does not have
> > any data, meaning no one is publishing to the side input
> > topic.
> >
> > After publishing some data into the side input topic,
> > the OOM goes away.
> >
> > Thanks!
> > Eleanore
> >
> > On Wed, May 6, 2020 at 6:37 PM Ankur Goenka
> > mailto:goe...@google.com>> wrote:
> >
> > Hi Eleanore,
> >
> > The operation requires buffering the data until the
> > data from the side input becomes available, which might
> > be causing the OOM issue.
> > You mention that OOM happens when there is no data
> > in side input. Does it mean that the side input is
> > not yet ready or does side input have no data at all?
> >
> > Thanks,
> > Ankur
> >
> > On Tue, May 5, 2020 at 5

Re: Set parallelism for each operator

2020-05-07 Thread Eleanore Jin
Hi John,

Thanks for the information.

Eleanore

On Wed, May 6, 2020 at 11:37 PM John Youngseok Yang 
wrote:

> Hi,
>
> The Nemo runner supports setting parallelism for each PTransform.
> You can configure a Nemo optimization pass that traverses the operators of
> your Beam program, and annotates the ParallelismProperty of operators of
> your choice.
>
> Here is an example pass for setting parallelism:
>
> https://github.com/apache/incubator-nemo/blob/master/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultParallelismPass.java
>
> Thanks,
> John
>
> On 2020/05/06 16:27:19, Alexey Romanenko  wrote:
> > One of the option when reading from topics with a small number of
> partitions could be to do a Reshuffle right after read transform to
> parallelize better other pipeline steps.>
> >
> > We had a discussion in this Jira about that a while ago:>
> > https://issues.apache.org/jira/browse/BEAM-8121 <
> https://issues.apache.org/jira/browse/BEAM-8121>>
> >
> > > On 30 Apr 2020, at 03:56, Eleanore Jin  wrote:>
> > > >
> > > Thanks all for the information! >
> > > >
> > > Eleanore >
> > > >
> > > On Wed, Apr 29, 2020 at 6:36 PM Ankur Goenka  > wrote:>
> > > Beam does support parallelism for the job which applies to all the
> transforms in the job when executing on Flink using the "--parallelism"
> flag.>
> > > >
> > > From the usecase you mentioned, Kafka read operations will be over
> parallelised but it should be ok as they will only have a small amount of
> memory impact in loading some state for kafka client etc.>
> > > Also flink can run multiple operations for the same Job in a single
> task slot so having higher parallelism for lightweight operations should
> not be a problem.>
> > > >
> > > On Wed, Apr 29, 2020 at 6:28 PM Luke Cwik  google.com>> wrote:>
> > > Beam doesn't expose such a thing directly but the FlinkRunner may be
> able to take some pipeline options to configure this.>
> > > >
> > > On Wed, Apr 29, 2020 at 5:51 PM Eleanore Jin  > wrote:>
> > > Hi Kyle, >
> > > >
> > > I am using Flink Runner (v1.8.2)>
> > > >
> > > Thanks!>
> > > Eleanore>
> > > >
> > > On Wed, Apr 29, 2020 at 10:33 AM Kyle Weaver  > wrote:>
> > > Which runner are you using?>
> > > >
> > > On Wed, Apr 29, 2020 at 1:32 PM Eleanore Jin  > wrote:>
> > > Hi all, >
> > > >
> > > I just wonder can Beam allow to set parallelism for each operator
> (PTransform) separately? Flink provides such feature. >
> > > >
> > > The usecase I have is the source is kafka topics, which has less
> partitions, while we have heavy PTransform and would like to scale it with
> more parallelism. >
> > > >
> > > Thanks a lot!>
> > > Eleanore>
> >
> >
>
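
The Reshuffle suggestion quoted above could look roughly like this
(`records` and HeavyDoFn are placeholder names, not code from this thread):

import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;

// `records` is assumed to be the PCollection produced by the KafkaIO read;
// a 4-partition topic caps the source at 4 parallel readers, but after the
// reshuffle the heavy DoFn can run at the full pipeline parallelism.
records
    .apply("Redistribute", Reshuffle.viaRandomKey())
    .apply("HeavyTransform", ParDo.of(new HeavyDoFn()));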


Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

2020-05-05 Thread Eleanore Jin
Hi Max,

Thanks for the info!

Eleanore

On Tue, May 5, 2020 at 4:01 AM Maximilian Michels  wrote:

> Hey Eleanore,
>
> The change will be part of the 2.21.0 release.
>
> -Max
>
> On 04.05.20 19:14, Eleanore Jin wrote:
> > Hi Max,
> >
> > Thanks for the information and I saw this PR is already merged, just
> > wonder is it backported to the affected versions already
> > (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0)? Or I have
> > to wait for the 2.20.1 release?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels  > <mailto:m...@apache.org>> wrote:
> >
> > Hi Eleanore,
> >
> > Exactly-once is not affected but the pipeline can fail to checkpoint
> > after the maximum number of state cells have been reached. We are
> > working on a fix [1].
> >
> > Cheers,
> > Max
> >
> > [1] https://github.com/apache/beam/pull/11478
> >
> > On 22.04.20 07:19, Eleanore Jin wrote:
> > > Hi Maxi,
> > >
> > > I assume this will impact the Exactly Once Semantics that beam
> > provided
> > > as in the KafkaExactlyOnceSink, the processElement method is also
> > > annotated with @RequiresStableInput?
> > >
> > > Thanks a lot!
> > > Eleanore
> > >
> > > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels
> > mailto:m...@apache.org>
> > > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> > >
> > > Hi Stephen,
> > >
> > > Thanks for reporting the issue! David, good catch!
> > >
> > > I think we have to resort to only using a single state cell for
> > > buffering on checkpoints, instead of using a new one for every
> > > checkpoint. I was under the assumption that, if the state cell
> was
> > > cleared, it would not be checkpointed but that does not seem
> to be
> > > the case.
> > >
> > > Thanks,
> > > Max
> > >
> > > On 21.04.20 09:29, David Morávek wrote:
> > > > Hi Stephen,
> > > >
> > > > nice catch and awesome report! ;) This definitely needs a
> > proper fix.
> > > > I've created a new JIRA to track the issue and will try to
> > resolve it
> > > > soon as this seems critical to me.
> > > >
> > > > https://issues.apache.org/jira/browse/BEAM-9794
> > > >
> > > > Thanks,
> > > > D.
> > > >
> > > > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel
> > > mailto:stephenpate...@gmail.com>
> > <mailto:stephenpate...@gmail.com <mailto:stephenpate...@gmail.com>>
> > > > <mailto:stephenpate...@gmail.com
> > <mailto:stephenpate...@gmail.com>
> > > <mailto:stephenpate...@gmail.com
> > <mailto:stephenpate...@gmail.com>>>> wrote:
> > > >
> > > > I was able to reproduce this in a unit test:
> > > >
> > > > @Test
> > > >
> > > >   *public* *void* test() *throws*
> InterruptedException,
> > > > ExecutionException {
> > > >
> > > > FlinkPipelineOptions options =
> > > >
> >  PipelineOptionsFactory./as/(FlinkPipelineOptions.*class*);
> > > >
> > > > options.setCheckpointingInterval(10L);
> > > >
> > > > options.setParallelism(1);
> > > >
> > > > options.setStreaming(*true*);
> > > >
> > > > options.setRunner(FlinkRunner.*class*);
> > > >
> > > > options.setFlinkMaster("[local]");
> > > >
> > > > options.setStateBackend(*new*
> > > > MemoryStateBackend(Integer.*/MAX_VALUE/*));
> > > >
> > > > Pipeline pipeline = Pipeline./create/(options);
> > > >
> > > > pipeline
> > > >
&

Re: Set parallelism for each operator

2020-04-29 Thread Eleanore Jin
Thanks all for the information!

Eleanore

On Wed, Apr 29, 2020 at 6:36 PM Ankur Goenka  wrote:

> Beam does support parallelism for the job which applies to all the
> transforms in the job when executing on Flink using the "--parallelism"
> flag.
>
> From the usecase you mentioned, Kafka read operations will be over
> parallelised but it should be ok as they will only have a small amount of
> memory impact in loading some state for kafka client etc.
> Also flink can run multiple operations for the same Job in a single task
> slot so having higher parallelism for lightweight operations should not be
> a problem.
>
> On Wed, Apr 29, 2020 at 6:28 PM Luke Cwik  wrote:
>
>> Beam doesn't expose such a thing directly but the FlinkRunner may be able
>> to take some pipeline options to configure this.
>>
>> On Wed, Apr 29, 2020 at 5:51 PM Eleanore Jin 
>> wrote:
>>
>>> Hi Kyle,
>>>
>>> I am using Flink Runner (v1.8.2)
>>>
>>> Thanks!
>>> Eleanore
>>>
>>> On Wed, Apr 29, 2020 at 10:33 AM Kyle Weaver 
>>> wrote:
>>>
>>>> Which runner are you using?
>>>>
>>>> On Wed, Apr 29, 2020 at 1:32 PM Eleanore Jin 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I just wonder can Beam allow to set parallelism for each operator
>>>>> (PTransform) separately? Flink provides such feature.
>>>>>
>>>>> The usecase I have is the source is kafka topics, which has less
>>>>> partitions, while we have heavy PTransform and would like to scale it with
>>>>> more parallelism.
>>>>>
>>>>> Thanks a lot!
>>>>> Eleanore
>>>>>
>>>>
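
For completeness, a minimal sketch of setting the single job-wide
parallelism on the Flink runner, either with --parallelism=8 on the command
line or programmatically (the value 8 and the use of `args` are
illustrative):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setParallelism(8); // applies to every operator in the job
Pipeline pipeline = Pipeline.create(options);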


Re: Set parallelism for each operator

2020-04-29 Thread Eleanore Jin
Hi Kyle,

I am using Flink Runner (v1.8.2)

Thanks!
Eleanore

On Wed, Apr 29, 2020 at 10:33 AM Kyle Weaver  wrote:

> Which runner are you using?
>
> On Wed, Apr 29, 2020 at 1:32 PM Eleanore Jin 
> wrote:
>
>> Hi all,
>>
>> I just wonder can Beam allow to set parallelism for each operator
>> (PTransform) separately? Flink provides such feature.
>>
>> The usecase I have is the source is kafka topics, which has less
>> partitions, while we have heavy PTransform and would like to scale it with
>> more parallelism.
>>
>> Thanks a lot!
>> Eleanore
>>
>


Set parallelism for each operator

2020-04-29 Thread Eleanore Jin
Hi all,

I just wonder, does Beam allow setting parallelism for each operator
(PTransform) separately? Flink provides such a feature.

The usecase I have is that the source is kafka topics, which have fewer
partitions, while we have a heavy PTransform and would like to scale it with
more parallelism.

Thanks a lot!
Eleanore


Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

2020-04-21 Thread Eleanore Jin
Hi Maxi,

I assume this will impact the exactly-once semantics that Beam provides,
since in the KafkaExactlyOnceSink the processElement method is also annotated
with @RequiresStableInput?

Thanks a lot!
Eleanore

On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels  wrote:

> Hi Stephen,
>
> Thanks for reporting the issue! David, good catch!
>
> I think we have to resort to only using a single state cell for
> buffering on checkpoints, instead of using a new one for every
> checkpoint. I was under the assumption that, if the state cell was
> cleared, it would not be checkpointed but that does not seem to be the
> case.
>
> Thanks,
> Max
>
> On 21.04.20 09:29, David Morávek wrote:
> > Hi Stephen,
> >
> > nice catch and awesome report! ;) This definitely needs a proper fix.
> > I've created a new JIRA to track the issue and will try to resolve it
> > soon, as this seems critical to me.
> >
> > https://issues.apache.org/jira/browse/BEAM-9794
> >
> > Thanks,
> > D.
> >
> > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel  > > wrote:
> >
> > I was able to reproduce this in a unit test:
> >
> > @Test
> > public void test() throws InterruptedException, ExecutionException {
> >   FlinkPipelineOptions options =
> >       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> >   options.setCheckpointingInterval(10L);
> >   options.setParallelism(1);
> >   options.setStreaming(true);
> >   options.setRunner(FlinkRunner.class);
> >   options.setFlinkMaster("[local]");
> >   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> >
> >   Pipeline pipeline = Pipeline.create(options);
> >   pipeline
> >       .apply(Create.of((Void) null))
> >       .apply(
> >           ParDo.of(
> >               new DoFn<Void, Void>() {
> >                 private static final long serialVersionUID = 1L;
> >
> >                 @RequiresStableInput
> >                 @ProcessElement
> >                 public void processElement() {}
> >               }));
> >   pipeline.run();
> > }
> >
> >
> > It took a while to get to checkpoint 32,767, but eventually it did,
> > and it failed with the same error I listed above.
> >
> > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> > mailto:stephenpate...@gmail.com>> wrote:
> >
> > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > emr-5.26.0) that uses the RequiresStableInput feature.
> >
> > Currently it's configured to checkpoint once a minute, and after
> > around 32000-33000 checkpoints, it fails with:
> >
> > 2020-04-15 13:15:02,920 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >   - Triggering checkpoint 32701 @ 1586956502911 for job
> > 9953424f21e240112dd23ab4f8320b60.
> > 2020-04-15 13:15:05,762 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >   - Completed checkpoint 32701 for job
> > 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667
> ms).
> > 2020-04-15 13:16:02,919 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >   - Triggering checkpoint 32702 @ 1586956562911 for job
> > 9953424f21e240112dd23ab4f8320b60.
> > 2020-04-15 13:16:03,147 INFO
> >  org.apache.flink.runtime.executiongraph.ExecutionGraph
> >-  (1/2)
> > (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to
> > FAILED.
> > AsynchronousException{java.lang.Exception: Could not
> > materialize checkpoint 32702 for operator 
> > (1/2).}
> > at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > at
> >
>  
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > at
> >
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
>  
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at
> >
>  
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused 

Re: FlinkStateBackendFactory

2020-03-11 Thread Eleanore Jin
Hi Max,

Thanks a lot for the clarification!

Best
Eleanore
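
For reference, the two approaches Max describes in the quoted reply below
could look roughly like this (the checkpoint URI is an illustrative value,
and the factory interface is assumed to expose a single
createStateBackend(FlinkPipelineOptions) method, which is worth checking
against the Beam version in use):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Option 1: implement the factory and point the pipeline options at it.
public class FsStateBackendFactory implements FlinkStateBackendFactory {
  @Override
  public StateBackend createStateBackend(FlinkPipelineOptions options) {
    return new FsStateBackend("file:///tmp/beam-checkpoints");
  }
}
// ... then: options.setStateBackendFactory(FsStateBackendFactory.class);

// Option 2: leave the Beam option unset and configure it in flink-conf.yaml:
//   state.backend: filesystem
//   state.checkpoints.dir: file:///tmp/beam-checkpoints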

On Wed, Mar 11, 2020 at 11:32 AM Maximilian Michels  wrote:

> Please see my answers inline.
>
> -Max
>
> On 10.03.20 05:02, Eleanore Jin wrote:
> > Hi Max,
> >
> > Thanks for the response! the reason to setup the state backend is to
> > experiment Kafka EOS with Beam running on Flink.  Reading through the
> > code and this PR <https://github.com/apache/beam/pull/7991/files>, can
> > you please help me clarify my understanding?
> >
> > 1. Beam uses KafkaExactlyOnceSink to publish messages to achieve
> > EOS, ExactlyOnceWriter processElement method is annotated
> > with @RequiresStableInput, so all the messages will be cached
> > by KeyedBufferingElementsHandler, only after checkpoint succeeds, those
> > messages will be processed by ExactlyOnceWriter?
>
> That's correct.
>
> >
> > 2. Upon checkpoint, will those messages cached by
> > KeyedBufferingEleementsHandler also checkpointed?
>
> Yes, the buffered elements will be checkpointed.
>
> > 3. It seems the way Beam provides Kafka EOS will introduce delays in the
> > stream processing, the delay is based on the checkpoint interval? How to
> > reduce the latency while still have EOS guarantee?
>
> Indeed, the checkpoint interval and the checkpoint duration limits the
> latency. Given the current design and the guarantees, there is no other
> way to influence the latency.
>
> > 4. commitOffsetsInFinalize is also enabled, does this mean, upon
> > checkpoint successfully, the checkpointed offset will be committed back
> > to kafka, but if this operation does not finish successfully, and then
> > the job gets cancelled/stopped, and re-submit the job again (with the
> > same consumer group for source topics, but different jobID), then it is
> > possible duplicated processing still exists? because the consumed offset
> > is not committed back to kafka?
>
> This option is for the Kafka consumer. AFAIK this is just a convenience
> method to commit the latest checkpointed offset to Kafka. This offset is
> not used when restoring from a checkpoint. However, if you don't restore
> from a checkpoint, you can resume from that offset which might be
> convenient or not, depending on your use case.
>
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels  > <mailto:m...@apache.org>> wrote:
> >
> > Hi Eleanore,
> >
> > Good question. I think the easiest way is to configure this in the
> > Flink
> > configuration file, i.e. flink-conf.yaml. Then you don't need to set
> > anything in Beam.
> >
> > If you want to go with your approach, then just use
> > getClass().getClassLoader() unless you have some custom classloader
> for
> > loading your state backend.
> >
> > Cheers,
> > Max
> >
> > On 04.03.20 01:39, Jin Yi wrote:
> > > Hi Experts,
> > >
> > > I am running Beam application with Flink Runner. I would like to
> set
> > > State Backend to be FsStateBackend instead of MemoryStateBackend.
> > >
> > > in FlinkPipelineOptions.java
> > >
> > > I should be able to call setStateBackendFactory(), but I did not
> find
> > > any provided implementations for FlinkStateBackendFactory
> > interface, so
> > > that means I have to implement my own?
> > >
> > > Thanks a lot!
> > > Eleanore
> > >
> > > /**
> > > * State backend to store Beam's state during computation. Note:
> Only
> > > applicable when executing in
> > > * streaming mode.
> > > */
> > > @Description(
> > >  "Sets the state backend factory to use in streaming mode. "
> > > +"Defaults to the flink cluster's state.backend configuration.")
> > > Class getStateBackendFactory();
> > >
> > > void setStateBackendFactory(Class > FlinkStateBackendFactory> stateBackendFactory);
> > >
> >
>


Re: FlinkStateBackendFactory

2020-03-09 Thread Eleanore Jin
Hi Max,

Thanks for the response! The reason to set up the state backend is to
experiment with Kafka EOS with Beam running on Flink. Reading through the code
and this PR <https://github.com/apache/beam/pull/7991/files>, can you
please help me clarify my understanding?

1. Beam uses KafkaExactlyOnceSink to publish messages to achieve EOS. The
ExactlyOnceWriter processElement method is annotated with
@RequiresStableInput, so all the messages will be cached by
KeyedBufferingElementsHandler, and only after the checkpoint succeeds will
those messages be processed by ExactlyOnceWriter?

2. Upon checkpoint, will those messages cached by
KeyedBufferingElementsHandler also be checkpointed?

3. It seems the way Beam provides Kafka EOS will introduce delays in the
stream processing, and the delay is bounded by the checkpoint interval? How
can I reduce the latency while still having the EOS guarantee?

4. commitOffsetsInFinalize is also enabled. Does this mean that when a
checkpoint completes successfully, the checkpointed offset will be committed
back to kafka, but if this operation does not finish successfully, and the
job then gets cancelled/stopped and re-submitted (with the same consumer
group for the source topics, but a different jobID), it is possible that
duplicated processing still exists, because the consumed offset was not
committed back to kafka?

Thanks a lot!
Eleanore
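
For context on questions 1, 3 and 4, a rough sketch of the EOS wiring being
discussed (broker, topics, the interval and the processing DoFn are
placeholders; checkpointing has to be enabled for the EOS sink to make
progress):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setStreaming(true);
// With the EOS sink, records are only published after a checkpoint
// completes, so end-to-end latency is bounded below by this interval (Q3).
options.setCheckpointingInterval(5000L);

Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("ReadFromKafka",
        KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("input-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Commits the checkpointed offsets back to Kafka when the
            // checkpoint is finalized (Q4).
            .commitOffsetsInFinalize()
            .withoutMetadata())
    .apply("Process", ParDo.of(new MyKvDoFn())) // placeholder, KV in / KV out
    .apply("WriteToKafka",
        KafkaIO.<String, String>write()
            .withBootstrapServers("broker:9092")
            .withTopic("output-topic")
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            // Routes writes through KafkaExactlyOnceSink, whose
            // processElement is annotated with @RequiresStableInput (Q1).
            .withEOS(1, "eos-sink-group"));
pipeline.run();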

On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels  wrote:

> Hi Eleanore,
>
> Good question. I think the easiest way is to configure this in the Flink
> configuration file, i.e. flink-conf.yaml. Then you don't need to set
> anything in Beam.
>
> If you want to go with your approach, then just use
> getClass().getClassLoader() unless you have some custom classloader for
> loading your state backend.
>
> Cheers,
> Max
>
> On 04.03.20 01:39, Jin Yi wrote:
> > Hi Experts,
> >
> > I am running Beam application with Flink Runner. I would like to set
> > State Backend to be FsStateBackend instead of MemoryStateBackend.
> >
> > in FlinkPipelineOptions.java
> >
> > I should be able to call setStateBackendFactory(), but I did not find
> > any provided implementations for FlinkStateBackendFactory interface, so
> > that means I have to implement my own?
> >
> > Thanks a lot!
> > Eleanore
> >
> > /**
> > * State backend to store Beam's state during computation. Note: Only
> > applicable when executing in
> > * streaming mode.
> > */
> > @Description(
> >  "Sets the state backend factory to use in streaming mode. "
> > +"Defaults to the flink cluster's state.backend configuration.")
> > Class getStateBackendFactory();
> >
> > void setStateBackendFactory(Class
> stateBackendFactory);
> >
>