RE: Batching in kinesis sink

2022-05-12 Thread Teoh, Hong
Hi Zain,

For Flink 1.13, we use the Kinesis Producer Library (KPL). If you are using 
aggregation, you can control the maximum size of aggregated records by 
configuring the AggregationMaxSize in the producer config when constructing the 
FlinkKinesisProducer. (See [1] for more docs)


producerConfig.put("AggregationMaxSize", "1048576");
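
For reference, a minimal sketch of passing that property when constructing the 
producer (the region, schema and stream name below are placeholders, not taken 
from your setup):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");   // placeholder region
producerConfig.put("AggregationMaxSize", "1048576");              // cap aggregated records at 1MB

FlinkKinesisProducer<String> producer =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
producer.setDefaultStream("your-output-stream");                  // placeholder stream name
producer.setDefaultPartition("0");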

However, since the default value is actually <1MB here, I doubt this is the 
issue. A possibility I can think of is that a single record is larger than 1MB, 
so the aggregation limit doesn’t apply. If this is the case, the way forward 
would be to change the record size to be lower than 1MB.

In general, I would recommend upgrading to Flink 1.15 and using the newer 
KinesisStreamsSink. That sink is more configurable (see below and see [2] for 
more docs), and will surface the problem explicitly if the issue is really that 
a single record is larger than 1MB.

(Note that we use the PutRecords API, so individual records still need to be 
smaller than 1MB, but batches can be up to 5MB.) See [3] for more info.


.setMaxBatchSizeInBytes(5 * 1024 * 1024)
.setMaxRecordSizeInBytes(1 * 1024 * 1024)
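
For completeness, a rough sketch of the full builder on 1.15+ (the region, 
stream name, partition key generator and schema below are placeholders, not 
requirements of your setup):

Properties sinkProperties = new Properties();
sinkProperties.put("aws.region", "us-east-1");                       // placeholder region

KinesisStreamsSink<String> kdsSink = KinesisStreamsSink.<String>builder()
        .setKinesisClientProperties(sinkProperties)
        .setSerializationSchema(new SimpleStringSchema())
        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
        .setStreamName("your-output-stream")                         // placeholder stream name
        .setMaxBatchSizeInBytes(5 * 1024 * 1024)                     // PutRecords batch limit
        .setMaxRecordSizeInBytes(1 * 1024 * 1024)                    // single record limit
        .build();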



Thanks,
Hong


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/#kinesis-producer
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/#kinesis-streams-sink
[3] https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html




On 2022/05/12 10:30:47 Zain Haider Nemati wrote:
> Hi,
> I am using a kinesis sink with flink 1.13.
> The amount of data is in the millions and it chokes the 1MB cap for kinesis data
> streams.
> Is there any way to send data to kinesis sink in batches of less than 1MB?
> or any other workaround
>


Re: Flink Job Manager unable to recognize Task Manager Available slots

2022-05-24 Thread Teoh, Hong
Hi Sunitha,

Without more information about your setup, I would assume you are trying to 
restore the JobManager (and HA setup) to a stable state. A couple of questions:

  *   Since your job is cancelled, I would assume that the current job’s HA 
state is not important, so we can delete the checkpoint pointer and data.
  *   Are there other jobs running on the same cluster whose HA state you want 
to salvage?

I can think of the following options:

  1.  If there are no other jobs running on the same cluster, and the HA state 
is not important, the easiest way is to totally replace your Zookeeper 
instances. (this will start the JobManager afresh, but will cause the HA state 
for all other jobs running on the same cluster to be lost)
  2.  Manually clear the Zookeeper HA state for the problematic job. This will 
keep the HA state of other jobs running on the same cluster.

To perform step 2, see below:
Zookeeper stores “Active” jobs in a znode hierarchy as shown below (you can 
imagine this like a pseudo file system). I am assuming the job ID is the one 
you pasted in the logs.


  *   /flink/default/running_job_registry/3a97d1d50f663027ae81efe0f0aa
This has the status of the job (e.g. RUNNING)

  *   /flink/default/leader/resource_manager_lock
This has the information about which JM holds the ResourceManager (which is the 
component responsible for registering the task slots in the cluster).

There are other znodes as well, which are all interesting (e.g. 
/flink/default/checkpoints, /flink/default/checkpoint-counter), but I’ve 
highlighted the relevant ones.

To clear this, you can simply log onto your Zookeeper nodes and delete the 
znodes. The JobManager will repopulate them when the job starts up.

  1.  Log onto your Zookeeper nodes (e.g. execute into your Zookeeper container).
  2.  Run the Zookeeper CLI. This usually comes prepackaged with Zookeeper, so 
you can simply run the bundled script bin/zkCli.sh. You can then explore the 
pseudo-file system with ls or get (e.g. ls /flink/default).
  3.  Delete the znodes associated with your job:

rmr /flink/default/running_job_registry/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/jobgraphs/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/checkpoints/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/checkpoint-counter/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/leaderlatch/3a97d1d50f663027ae81efe0f0aa
rmr /flink/default/leader/3a97d1d50f663027ae81efe0f0aa

This should result in your JobManager recovering from the faulty job.

Regards,
Hong






From: "s_penakalap...@yahoo.com" 
Date: Tuesday, 24 May 2022 at 18:40
To: User 
Subject: RE: [EXTERNAL]Flink Job Manager unable to recognize Task Manager 
Available slots




Hi Team,

Any inputs please badly stuck.

Regards,
Sunitha

On Sunday, May 22, 2022, 12:34:22 AM GMT+5:30, s_penakalap...@yahoo.com 
 wrote:


Hi All,

Help please!

We have a standalone Flink service installed on individual VMs, clubbed together 
to form a cluster with HA and checkpointing in place. When cancelling a job, the 
Flink cluster went down and it is unable to start up normally, as the Job Manager 
keeps going down with the below error:

2022-05-21 14:33:09,314 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 
3a97d1d50f663027ae81efe0f0aa.

Each attempt to restart the cluster failed with the same error, so the whole 
cluster became unrecoverable and is not operating. Please help on the below points:
1> In which Flink/Zookeeper folder are the job recovery details stored, and how 
can we clear all old job instances so that the Flink cluster will not try to 
recover and instead starts fresh, letting us manually submit all jobs?

2> Since the cluster is HA, we have 2 Job Managers; even though one JM is going 
down, Flink is started, but available slots show up as 0 (Task Managers 
are up but not displayed in the web UI).

Regards
Sunitha.



Re: Using RocksDBStateBackend and SSD to store states, application runs slower..

2022-07-21 Thread Teoh, Hong
Hi,

I’d say it seems you are trying to identify bottlenecks in your job, and are 
currently looking at RocksDB disk I/O as one of them. However, there are also 
other possible bottlenecks (e.g. CPU/memory/network/sink throttling), and from 
what you described, it’s possible that the HDFS sink is the bottleneck. Are you 
using Flink >= 1.13? If so, you can use flame graphs in the Flink dashboard to 
see what the busy operator is doing.
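
Note that the flame graph view has to be enabled explicitly before it shows up 
in the dashboard, e.g. in flink-conf.yaml:

rest.flamegraph.enabled: true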

Regards,
Hong



From: Jing Ge 
Date: Thursday, 21 July 2022 at 21:14
To: Yaroslav Tkachenko 
Cc: vtygoss , "user@flink.apache.org" 
Subject: RE: [EXTERNAL]Using RocksDBStateBackend and SSD to store states, 
application runs slower..




Hi,

using FLASH_SSD_OPTIMIZED already sets the number of threads to 4. This 
optimization can improve the source throughput and reduce the delayed write 
rate.

If this optimization didn't fix the back pressure, could you share more 
information about your job? Could you check the metrics of the back-pressured 
operator, e.g. check whether it is caused by write-heavy or read-heavy tasks? You 
could try tuning rocksdb.writebuffer for write-heavy tasks.
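
For example, via the advanced RocksDB options in flink-conf.yaml (the values 
below are purely illustrative):

state.backend.rocksdb.writebuffer.size: 128mb
state.backend.rocksdb.writebuffer.count: 4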

On Thu, Jul 21, 2022 at 5:59 PM Yaroslav Tkachenko <yaros...@goldsky.io> wrote:
Hi!

I'd try re-running the SSD test with the following config options:

state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED


On Thu, Jul 21, 2022 at 4:11 AM vtygoss <vtyg...@126.com> wrote:

Hi, community!



I am doing some performance tests based on my scene.



1. Environment

- Flink: 1.13.5

- StateBackend: RocksDB, incremental

- use case: a complex SQL job containing 7 joins and 2 aggregations; input of 
30,000,000 records and output of 60,000,000 records, about 80GB.

- resource: flink on yarn. JM 2G, one TM 24G(8G on-heap, 16G off-heap). 3 slots 
per TM

- only difference: different config 'state.backend.rocksdb.localdir', one SATA 
disk or one SSD disk.



2. Random write performance difference between SATA and SSD

   4.8M/s is achieved using SATA, while 48.2M/s using SSD.

   ```

   fio -direct=1 -iodepth 64 -thread -rw=randwrite -ioengine=sync  -fsync=1 
-runtime=300 -group_reporting -name=xxx -size=100G --allow_mounted_write=1 
-bs=8k  -numjobs=64 -filename=/mnt/disk11/xx

   ```



3. In my use case, the Flink SQL application finished in 41 minutes using SATA, 
while taking 45 minutes using SSD.



Does this comparison suggest that the way to improve RocksDB performance by 
using SSD is not effective?

The direct downstream of the back-pressured operator is the HDFS sink; does that 
mean the best target for improving application performance is HDFS?



Thanks for any replies or suggestions.



Best Regards!














Re: Is Flink SQL a good fit for alerting?

2022-07-27 Thread Teoh, Hong
Re-pasting from Slack

Hong Teoh  7 hours 
ago
I can give some examples, but they are all using DataStream API
https://aws.amazon.com/blogs/big-data/building-a-real-time-notification-system-with-[…]-dynamodb-and-amazon-kinesis-data-analytics-for-apache-flink/
https://aws.amazon.com/blogs/big-data/real-time-bushfire-alerting-with-complex-event[…]cessing-in-apache-flink-on-amazon-emr-and-iot-sensor-network/
Flink SQL is quite powerful though; are there any operations that you would like 
that are not currently supported in SQL?

salvalcantara  8 hours 
ago
Thanks a lot @Hong Teoh! For 
my use case, Flink SQL should be capable enough...what worries me is how to 
manage/deploy those alerts, if implemented as SQL scripts. In particular, 
having one sql job per user alert looks impractical...even if deployed on the 
same cluster (session mode?). (edited)

Hong Teoh  7 hours 
ago
I see… Probably I’d try to design the job to not have to change per user, but 
use the user as a key [:thinking_face:] Or at least split it into typical job 
families, with filters for the “types” of users that should be following each 
code path. If you have to have a custom job graph per user, it sounds like you 
want to design some form of platform to run Flink jobs in general…

salvalcantara  7 hours 
ago
yeah...the thing is that I need alerts to run as separate jobs so that I can 
enable/disable specific alerts without affecting the others... (edited)

salvalcantara  7 hours 
ago
Or...maybe a user changes the definition for a given alert, I just want to 
redeploy this specific alert definition, without affecting the others which 
should continue running without interruption

Hong Teoh  7 hours 
ago
Maybe consider having a control stream (with a user key and an enable/disable 
field) that can update an in-memory table? Or use a lookup join? 
https://github.com/ververica/flink-sql-cookbook/blob/main/joins/04_lookup_joins/04_lookup_joins.md

salvalcantara  7 hours 
ago
From what I'm seeing...Flink SQL is very good for doing adhoc / low-in-code 
analytics here and there but I don't think it could tackle my use case...having 
said that, I might be wrong since I'm just getting started with Flink SQL...

Hong Teoh  7 hours 
ago
That way you can adjust the “user-specific” configuration in the external 
database without redeploying the job

salvalcantara  7 hours 
ago
there are two features that I like from Flink SQL that I thought could 
be very useful for alerting purposes: JSON Functions & MATCH_RECOGNIZE (CEP)

salvalcantara  7 hours 
ago
coming back to your comment, I guess that I should not try to implement each 
alert as a separate (self-contained) job (SQL script) but inste

Re: How to write custom serializer for dynamodb connector

2022-11-08 Thread Teoh, Hong
Hi Matt,

First of all, awesome that you are using the DynamoDB sink!

To resolve your issue with serialization in the DDB sink, you are right, the 
issue only happens when you create the AttributeValue object in a previous 
operator and send it to the sink.
The issue here is with serialization of ImmutableMap. Kryo tries to call put(), 
which is unsupported since the map is immutable, so you can register a specific 
serializer for it, like below:

env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, 
ImmutableMapSerializer.class);

You can get ImmutableMapSerializer.class from a pre-packaged library such as 
https://github.com/magro/kryo-serializers
Just add the following to your pom.xml:


<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>


Regarding resources, I find the following helpful:

  *   Article on serialization: 
https://alibaba-cloud.medium.com/data-types-and-serialization-flink-advanced-tutorials-b363241c8836
  *   The FlinkForward youtube channel has a couple of useful deep dives on 
Flink in general : 
https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists

Hope the above helps.


A more general question on your use case: what is the reason you want to 
generate the AttributeValue in a previous operator rather than in the sink 
directly? Is it for some dynamic generation of objects to write into DDB?
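
For reference, if the goal is just dynamic item construction, one option is to 
build the AttributeValue map inside the sink’s element converter instead, so it 
never needs to be serialized between operators. A rough sketch (MyEvent and the 
attribute names are placeholders; worth double-checking the exact converter 
classes against the connector version you use):

public class MyEventToDynamoDbConverter implements ElementConverter<MyEvent, DynamoDbWriteRequest> {
    @Override
    public DynamoDbWriteRequest apply(MyEvent event, SinkWriter.Context context) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("pk", AttributeValue.builder().s(event.getId()).build());      // placeholder attribute
        item.put("nested", AttributeValue.builder().m(
                Map.of("innerkey", AttributeValue.builder().s("innervalue").build())).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}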

Regards,
Hong


From: Matt Fysh 
Date: Tuesday, 8 November 2022 at 14:04
To: User 
Subject: [EXTERNAL] How to write custom serializer for dynamodb connector




I'm attempting to use the dynamodb sink located at 
https://github.com/apache/flink-connector-aws

The example in the repo is working as expected, however when I try to create a nested data 
structure, I receive a Kryo serialization error message:

Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.UnsupportedOperationException
Serialization trace:
m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

The value that cannot be serialized is produced by this code:
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

AttributeValue.builder().m(
  ImmutableMap.of(
"innerkey", AttributeValue.builder().s("innervalue").build()
  )
).build();

There are tests in the connector repo for nested map structures, but they do not test that the structure can be 
ser/de by Flink, which I believe occurs when the operator that produces the 
value is separate to the sink operator.

Given that this is a fairly simple data type, I should be able to register a 
custom serializer with Flink, but since I'm new to java I'm having trouble 
making sense of the docs and was hoping to find someone more knowledgeable in this area for some 
pointers on what else I could start reading

Thanks
Matt


Re: OOM taskmanager

2023-01-26 Thread Teoh, Hong
Hi Marco,

When you say OOM, I assume you mean the TM pod being OOMKilled, is that correct? If 
so, this usually means that the TM is using more than the actual memory 
allocated to the pod. First, I would check your memory configuration to figure 
out where this extra memory use is coming from. This is a non-trivial task, and 
I’ll list some common situations I’ve seen in the past to get you started.


  *   Misconfigured process memory. The Flink configuration 
`taskmanager.memory.process.size` sets the memory of the entire TM, which 
Flink will use and break down into smaller buckets. If this is higher than the 
memory resource of the container, it will cause OOMKilled situations.
  *   User code has a memory leak (e.g. spins up too many threads). It would be 
useful to test the Flink job you have on a local cluster and monitor the memory 
use.
  *   State backend (if you use rocksdb) using too much memory.

You can also look at [1] and [2] for more information.
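
On the first point, a minimal illustration (the value is only an example) is to 
make sure Flink’s view of the process memory is not larger than the pod’s limit, 
e.g. in flink-conf.yaml:

taskmanager.memory.process.size: 4096m   # should be <= the container memory limit in your pod spec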

Regards,
Hong

[1] Talk on Flink memory utilisation https://www.youtube.com/watch?v=F5yKSznkls8
[2] Flink description of TM memory breakdown 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/


From: marco andreas 
Date: Wednesday, 25 January 2023 at 19:57
To: user 
Subject: [EXTERNAL] OOM taskmanager






Hello,

We are deploying a flink application cluster in kubernetes, 2 pods one for the 
JM and the other for the TM.

The problem is that when we launch load tests, we see that task manager memory 
usage increases. After the tests are finished and Flink stops processing data, 
the memory usage never comes back down to where it was before. Eventually, when 
we launch tests again and again, the memory of the TM continues to grow until it 
reaches the memory resource limit specified in the container templates and it 
gets killed because of OOM.


Has anyone faced the same issue, and what is the best way to investigate this 
error in order to know the root cause of why the memory usage of the TM never 
comes down when Flink finishes processing?

Flink version is 1.16.0.
Thanks,


Re: AsyncFunction vs Async Sink

2023-06-14 Thread Teoh, Hong
Hi Lu,

Thanks for your question. See below for my understanding.

I would recommend using the Async Sink if you are writing to the external 
service as the final output of your job graph, and if you don’t have the 
ordering requirement that updates to the external system must be done before 
updates to some other external system within the same job graph. (More on this 
later.)

The abstraction of the Async Sink is a sink, meaning it is a terminal operator 
in the job graph. The abstraction is intended to simplify the writing of a sink 
- meaning the base implementation will handle batching, state management and 
rate limiting. You only need to provide the client and request structure to be 
used to interact with the external service. This makes writing and maintaining 
the sink easier (if you simply want to write to a destination with at least 
once processing).

The AsyncFunction, as I understand it, is used more for data enrichment, and is 
not a terminal operator in the job graph. This means the return value from the 
external service will continue to be passed on down the Flink Job graph. This 
is useful for data enrichment using the external service, or if we want to 
ensure the system being called in the AsyncFunction is updated BEFORE any data 
is written to the sinks further down the job graph.

For example:

Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink

We can be sure that the updates to DynamoDB for a particular record happen 
before the record is written to the Kinesis Sink.
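
A rough sketch of that wiring (the class names, timeout and capacity are 
placeholders):

DataStream<Record> mapped = kinesisSource.map(new MyMapFunction());
DataStream<Record> updated = AsyncDataStream.orderedWait(
        mapped, new DynamoDbUpdateAsyncFunction(), 30, TimeUnit.SECONDS, 100);   // at most 100 in-flight requests
updated.sinkTo(kinesisSink);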


Hope the above clarifies your question!

Regards,
Hong


On 14 Jun 2023, at 19:27, Lu Niu  wrote:





Hi, Flink dev and users

If I want to async write to an external service, which API shall I use, 
AsyncFunction or Async Sink?

My understanding after checking the code are:

  1.  Both APIs guarantee at least once write to external service. As both API 
internally stores in-flight requests in the checkpoint.
  2.  Async Sink provides a batching request feature. This can be implemented 
with Map + AsyncFunction: the Map function groups requests into batches and 
passes them to the AsyncFunction. The batching implementation can refer to 
AbstractMapBundleOperator if you don’t want to use state.
  3.  Async Sink supports retry on failed requests. AsyncFunction also supports 
retry in the latest Flink version.
  4.  Async Sink supports rate limiting, AsyncFunction doesn’t.
  5.  AsyncFunction can be used to implement read-update-write. Async Sink 
cannot.

Best

Lu



Re: Async IO operator to write to DB

2023-06-14 Thread Teoh, Hong
Hi Karthik,

As an additional update, since Flink 1.15 we have introduced the asynchronous 
sink base, which allows easy writing of an asynchronous sink (you simply 
provide the client, request and response handling) and the sink base will 
handle the state management, retries and batching.

See the blogpost [1] for more details!

Hope the above helps.

Regards,
Hong

[1] https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/


On 13 Jun 2023, at 05:29, Karthik Deivasigamani  wrote:




Thanks Martijn, the documentation for Async IO was also indicating the same and 
that's what prompted me to post this question here.
~
Karthik

On Mon, Jun 12, 2023 at 7:45 PM Martijn Visser <martijnvis...@apache.org> wrote:
Hi Karthik,

In my opinion, it makes more sense to use a sink to leverage Scylla over using 
Async IO. The primary use case for Async IO is enrichment, not writing to a 
sink.

Best regards,

Martijn

On Mon, Jun 12, 2023 at 4:10 PM Karthik Deivasigamani <karthi...@gmail.com> wrote:
Thanks Martijn for your response.
One thing I did not mention was that we are in the process of moving away from 
Cassandra to Scylla and would like to use the Scylla Java Driver for the 
following reason :

The Scylla Java driver is shard aware and contains extensions for a 
tokenAwareHostPolicy. Using this policy, the driver can select a connection to 
a particular shard based on the shard’s token. As a result, latency is 
significantly reduced because there is no need to pass data between the shards.

We were considering writing our own Sink to leverage Scylla Java Driver once 
the migration is done.
~
Karthik


On Mon, Jun 12, 2023 at 4:56 PM Martijn Visser <martijnvis...@apache.org> wrote:
Hi,

Why wouldn't you just use the Flink Kafka connector and the Flink Cassandra 
connector for your use case?

Best regards,

Martijn

On Mon, Jun 12, 2023 at 12:03 PM Karthik Deivasigamani <karthi...@gmail.com> wrote:
Hi,
   I have a use case where I need to read messages from a Kafka topic, parse it 
and write it to a database (Cassandra). Since Cassandra supports async APIs I 
was considering using Async IO operator for my writes. I do not need 
exactly-once semantics for my use-case.
Is it okay to leverage the Async IO operator as a Sink (writing data into a DB)?
~
Karthik



Re: Interaction between idling sources and watermark alignment

2023-06-15 Thread Teoh, Hong
Hi Alexis, below is my understanding:


> I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) 
> within a single source's splits and across different sources. I don't see 
> this explicitly mentioned in the documentation, but I assume that the concept 
> of "maximal drift" used for alignment also takes idleness into account, 
> resuming any readers that were paused due to an idle split or source. Is my 
> understanding correct?

As far as I understand, the decision to “unpause” a given split that might 
have been paused due to watermark alignment is re-evaluated at fixed intervals. 
See [1].

We see that the SourceCoordinator calls announceCombinedWatermark(), which 
calculates the global watermark and subsequently sends a 
WatermarkAlignmentEvent to each subtask. On each subtask, there is an 
evaluation of whether to “wake up” the operator. [2] [3]

This means that there is a periodic evaluation of whether to “wake up”, 
controlled by the update interval, which defaults to 1s [4]

> Also, something that isn't 100% clear to me when comparing to the previous 
> watermark alignment documentation, even if I only wanted alignment within a 
> single source's splits, I still need to call withWatermarkAlignment in the 
> watermark strategy, right? Otherwise alignment will not take place, 
> regardless of pipeline.watermark-alignment.allow-unaligned-source-splits.

Yes, this is correct. Watermark groups are used to check whether multiple 
sources need to coordinate watermarks. If two sources A and B both belong to 
the same watermark group, then their watermarks will be aligned.
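
For example (the group name, max drift and update interval below are just 
illustrative values):

WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));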

Hope the above helps.


Cheers,
Hong


[1] 
https://github.com/apache/flink/blob/45ba7ee87caee63a0babfd421b7c5eabaa779baa/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L160
[2] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L556-L559
[3] 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java#L659
[4] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarksWithWatermarkAlignment.java#L29



On 13 Jun 2023, at 21:08, Alexis Sarda-Espinosa  
wrote:




Hi again, I'm not a fan of bumping questions, but I think this might be 
relevant, maybe enough to include it in the official documentation?

Regards,
Alexis.

On Tue, 30 May 2023, 16:07 Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
Hello,

I see that, in Flink 1.17.1, watermark alignment will be supported (as beta) 
within a single source's splits and across different sources. I don't see this 
explicitly mentioned in the documentation, but I assume that the concept of 
"maximal drift" used for alignment also takes idleness into account, resuming 
any readers that were paused due to an idle split or source. Is my 
understanding correct?

Also, something that isn't 100% clear to me when comparing to the previous 
watermark alignment documentation, even if I only wanted alignment within a 
single source's splits, I still need to call withWatermarkAlignment in the 
watermark strategy, right? Otherwise alignment will not take place, regardless 
of pipeline.watermark-alignment.allow-unaligned-source-splits.

Regards,
Alexis.




Re: AsyncFunction vs Async Sink

2023-06-15 Thread Teoh, Hong
Hi Lu,

> 1. Is there any problem if we use Async Function for such a user case? We can 
> simply drop the output and use Unordered mode.


As far as I can tell, it is similar, other than the retry strategy available 
for AsyncFunctions and batching for Async Sink. Both should work on Flink.


> 2. For AsyncFunction and  Async Sink. does it make sense that both could 
> share the same underlying implementation and the features like batching and 
> rate limiting can benefit both?

Good question - I think there are quite a lot of similarities; that’s why the 
interface is similar. However, I think the end use case is different. For 
example, the Async Sink might want to implement support for some form of 
two-phase commit on the sink (at-least-once guarantee). This is slightly more 
complicated on an AsyncFunction.



Regards,
Hong



On 15 Jun 2023, at 00:26, Lu Niu  wrote:




Thanks, Hong!

I understand that if the use case is to simply write something to an external 
service, Async Sink is a good option that provides features like batching, 
state management and rate limiting. I have some follow-up questions:

1. Is there any problem if we use Async Function for such a user case? We can 
simply drop the output and use Unordered mode.
2. For AsyncFunction and  Async Sink. does it make sense that both could share 
the same underlying implementation and the features like batching and rate 
limiting can benefit both?

Best
Lu


On Wed, Jun 14, 2023 at 2:20 PM Teoh, Hong <lian...@amazon.co.uk> wrote:
Hi Lu,

Thanks for your question. See below for my understanding.

I would recommend using the Async Sink if you are writing to the external 
service as the final output of your job graph, and if you don’t have the 
ordered requirement that updates to the external system must be done before 
updates to some other external system within the same job graph. (More 
explained later).

The abstraction of the Async Sink is a sink, meaning it is a terminal operator 
in the job graph. The abstraction is intended to simplify the writing of a sink 
- meaning the base implementation will handle batching, state management and 
rate limiting. You only need to provide the client and request structure to be 
used to interact with the external service. This makes writing and maintaining 
the sink easier (if you simply want to write to a destination with at least 
once processing).

The AsyncFunction, as I understand it is more used for data enrichment, and is 
not a terminal operator in the job graph. This means the return value from the 
external service will continue to be passed on down the Flink Job graph. This 
is useful for data enrichment using the external service, or if we want to 
ensure the system being called in the AsyncFunction is updated BEFORE any data 
is written to the sinks further down the job graph.

For example:

Kinesis Source -> Map -> AsyncFunction (Updates DynamoDB) -> Kinesis Sink

We can be sure that the updates to DynamoDB for a particular record happens 
before the record is written to the Kinesis Sink.


Hope the above clarifies your question!

Regards,
Hong


On 14 Jun 2023, at 19:27, Lu Niu <qqib...@gmail.com> wrote:





Hi, Flink dev and users

If I want to async write to an external service, which API shall I use, 
AsyncFunction or Async Sink?

My understanding after checking the code are:

  1.  Both APIs guarantee at least once write to external service. As both API 
internally stores in-flight requests in the checkpoint.
  2.  Async Sink provides a batching request feature. This can be implemented 
with Map + AsyncFunction: the Map function groups requests into batches and 
passes them to the AsyncFunction. The batching implementation can refer to 
AbstractMapBundleOperator if you don’t want to use state.
  3.  Async Sink supports retry on failed requests. AsyncFunction also supports 
retry in the latest Flink version.
  4.  Async Sink supports rate limiting, AsyncFunction doesn’t.
  5.  AsyncFunction can be used to implement read-update-write. Async Sink 
cannot.

Best

Lu