Hi Jacob,

For your first question, I think it is fine to use Java's CompletableFuture
for your case. If you create lots of threads, it will of course consume
more CPU and affect the processing of records. But in your case, the
cleanup operation is probably not very heavy. One thing that comes to mind
is that if an exception happens during the cleanup, you may need a way to
propagate it to Flink's main task thread. You may want to take a
look at the Kafka connector's code
<https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java>
for more insight into using Flink's MailboxExecutor.
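A minimal sketch of the CompletableFuture approach, using plain JDK Java only. Everything named here is hypothetical: the class `AsyncCacheCleaner`, its methods, and the idea of calling `checkError()` from the sink's write or flush path are illustrations, not a Flink API. It passes an explicit single-thread executor to `CompletableFuture.runAsync` (without one, `runAsync` uses `ForkJoinPool.commonPool()`) and records the first failure so the task thread can rethrow it. This is a simpler stand-in for the `MailboxExecutor` pattern in the Kafka connector, not a copy of it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical helper owned by a sink: deletes cache files off the task thread
 * and records the first failure so the task thread can rethrow it.
 */
final class AsyncCacheCleaner implements AutoCloseable {
    // One dedicated daemon thread instead of the common pool, so all
    // deletions share a single bounded background thread.
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "cache-cleaner");
        t.setDaemon(true);
        return t;
    });
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    /** Starts deleting {@code file} in the background and returns immediately. */
    CompletableFuture<Void> deleteAsync(Path file) {
        return CompletableFuture.runAsync(() -> {
            try {
                Files.deleteIfExists(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e); // Runnable cannot throw checked exceptions
            }
        }, executor).whenComplete((ignored, error) -> {
            if (error != null) {
                // runAsync wraps failures in CompletionException; keep the underlying cause.
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                        ? error.getCause() : error;
                firstError.compareAndSet(null, cause);
            }
        });
    }

    /** Call from the task thread (e.g. at the start of write() or flush()). */
    void checkError() throws IOException {
        Throwable error = firstError.get();
        if (error != null) {
            throw new IOException("Asynchronous cache file deletion failed", error);
        }
    }

    @Override
    public void close() throws InterruptedException {
        // Let already-submitted deletions finish before the sink shuts down.
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```

In a sink, `deleteAsync` would be called after a successful write, `checkError()` at the start of the next write or flush, and `close()` from the sink's own `close()`, so a failed deletion fails the task instead of being silently lost.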

For your second question, it seems to me that your deletion logic is a
custom action performed when writing to the sink. You can use Java's
ScheduledExecutorService for that. Flink does have a timer
<https://nightlies.apache.org/flink/flink-docs-release-1.19/zh/docs/dev/datastream/operators/process_function/#timers>
mechanism and an async I/O
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/>
mechanism, but both are mainly designed for processing records, so they
may not be what you are looking for.
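A minimal sketch of the ScheduledExecutorService approach for the five-minute cleanup, again in plain JDK Java. The assumptions are all hypothetical: cache files end in `.cache` and live in one directory, a file counts as closed if its last-modified time is older than that of the currently open file, and the sink exposes the current file through a `Supplier<Path>`. The class `PeriodicCacheJanitor` is illustrative, not a Flink API. It catches every exception inside the scheduled task, because an exception escaping a `scheduleAtFixedRate` task suppresses all later runs.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical janitor: periodically deletes *.cache files older than the one in use. */
final class PeriodicCacheJanitor implements AutoCloseable {
    private final Path cacheDir;
    // Hypothetical hook: returns the cache file currently open and serving the sink.
    private final Supplier<Path> currentFile;
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "cache-janitor");
                t.setDaemon(true);
                return t;
            });

    PeriodicCacheJanitor(Path cacheDir, Supplier<Path> currentFile) {
        this.cacheDir = cacheDir;
        this.currentFile = currentFile;
    }

    /** Runs the cleanup every {@code period} units, e.g. start(5, TimeUnit.MINUTES). */
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::runSafely, period, period, unit);
    }

    // An exception escaping a fixed-rate task cancels all later runs,
    // so catch everything and keep the first failure for the task thread.
    private void runSafely() {
        try {
            cleanOnce();
        } catch (Throwable t) {
            firstError.compareAndSet(null, t);
        }
    }

    /** Deletes *.cache files older than the current file; returns how many were deleted. */
    int cleanOnce() throws IOException {
        Path current = currentFile.get();
        FileTime currentTime = Files.getLastModifiedTime(current);
        // Collect candidates first, then delete, rather than deleting while the stream is open.
        List<Path> stale = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(cacheDir, "*.cache")) {
            for (Path file : files) {
                if (!file.equals(current)
                        && Files.getLastModifiedTime(file).compareTo(currentTime) < 0) {
                    stale.add(file);
                }
            }
        }
        int deleted = 0;
        for (Path file : stale) {
            if (Files.deleteIfExists(file)) {
                deleted++;
            }
        }
        return deleted;
    }

    /** Call from the task thread (e.g. in write()) to surface background failures. */
    void checkError() throws IOException {
        Throwable error = firstError.get();
        if (error != null) {
            throw new IOException("Periodic cache cleanup failed", error);
        }
    }

    @Override
    public void close() throws InterruptedException {
        // shutdown() cancels the periodic task by default; wait briefly for an in-flight run.
        scheduler.shutdown();
        scheduler.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```

A sink would create the janitor and call `start(5, TimeUnit.MINUTES)` when it opens, call `checkError()` on the record path, and close the janitor when the sink closes. The record path itself never waits on the cleanup.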

Best,
Biao Geng



On Thu, Apr 18, 2024 at 11:06, Jacob Rollings <jacobrolling...@gmail.com> wrote:

> Hello,
>
> I have a use case where I need to do a cache file deletion after a
> successful sink operation (writing to a DB). My Flink pipeline is built using
> Java. I am contemplating using Java's CompletableFuture.runAsync() to perform
> the file deletion. I am wondering what issues this might cause in
> terms of thread management and processing of the next event.
>
> Also related to the same use case, in another Flink pipeline I might need
> to do this cache file deletion on a timer. For example, every five
> minutes I have to check for cache files that are older than the currently
> open cache file that is serving data to the sink function. All old
> cache files that are in closed status need to be deleted in a timely manner.
>
> All this deletion has to happen asynchronously without blocking the flow
> of events from the Flink source to the sink.
> Also, based on requirements, I cannot make the whole sink operation async.
> I have to make only the file-based cache deletion async inside the sink
> function.
>
> Does Flink support timers or async blocks?
>
> Any input will be very helpful.
>
> Thanks,
> JB
>
