Hi,

this is indeed the correct behaviour right now, which doesn't mean that it's the behaviour we would like to have.

The reason why we can't move the "pending" files to "final" is that we don't have a point where we can do this in an idempotent and retryable fashion. When we do regular STREAMING execution we do checkpoints, and when a checkpoint completes we can move the pending files. If this fails, it will be retried, because the checkpoint still contains all the information we need for that. (It's basically a two-phase commit protocol.) When a bounded STREAMING program simply finishes, we don't have such a point. A colleague of mine (Yun, in cc) is actually working on a proposal to do one "final checkpoint" for exactly this.
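
To make the two-phase commit idea concrete, here is a minimal sketch of the pattern. This is NOT the actual StreamingFileSink internals; the class name and the file handling below are illustrative only:

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PendingFileSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // Files written since the last checkpoint; not yet safe to publish.
    private final List<String> inProgress = new ArrayList<>();
    // Pending files, keyed by the checkpoint that included them.
    private final Map<Long, List<String>> pending = new HashMap<>();

    @Override
    public void invoke(String value, Context ctx) {
        inProgress.add(writeToTempFile(value)); // write to an "in-progress" file
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        // Pre-commit: the pending file list becomes part of the checkpoint,
        // so after a failure a restarted job still knows what to move.
        pending.put(ctx.getCheckpointId(), new ArrayList<>(inProgress));
        inProgress.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Commit: runs only once the checkpoint is durable. Renaming a file
        // is idempotent, so this step can safely be retried.
        List<String> files = pending.remove(checkpointId);
        if (files != null) {
            files.forEach(this::renamePendingToFinal);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // A real implementation would restore "pending" from state here.
    }

    private String writeToTempFile(String value) { return value; } // illustrative stub
    private void renamePendingToFinal(String file) { }             // illustrative stub
}

The crucial bit is that the commit step only ever runs after a completed checkpoint. A bounded job that simply finishes never reaches such a point, which is what the "final checkpoint" proposal addresses.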

We're also working on better support for bounded programs on the DataStream API, I'll try and summarise this below.

A couple of colleagues and I are currently thinking about how we can bring good BATCH execution support to the DataStream API. The starting point is https://s.apache.org/FLIP-131, which discusses the eventual deprecation of the DataSet API, followed by https://s.apache.org/FLIP-134, which outlines the semantics of BATCH execution on the DataStream API, and https://s.apache.org/FLIP-140, which discusses improved runtime behaviour for BATCH programs on the DataStream API.

The last piece of the puzzle will be sinks, which also need to work well for both BATCH and STREAMING programs on the DataStream API. We're expecting to publish a FLIP for this shortly.

Best,
Aljoscha

On 07.09.20 19:29, Ken Krugler wrote:
Hi Fred,

I think this is the current behavior (though it would be helpful to know which 
version of Flink you’re using).

From an email conversation with Kostas in January of this year:

Hi Ken, Jingsong and Li,

Sorry for the late reply.

As Jingsong pointed out, upon calling close() the StreamingFileSink
does not commit the in-progress/pending files.
The reason for this is that the close() method of any UDF, including
sink functions, is called on both normal termination and termination
due to failure.
Given this, we cannot commit the files, because in case of failure
they should be reverted.
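
To illustrate, purely as a sketch (the sink class and the commented-out helper below are made up):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class NaiveFileSink extends RichSinkFunction<String> {
    @Override
    public void close() {
        // close() is called on BOTH clean shutdown and failure/cancellation,
        // and carries no flag saying which one happened. Committing here
        // would publish files that a restarted job will write again, so the
        // files would be duplicated instead of reverted after a failure.
        // commitAllPendingFiles(); // unsafe -- hypothetical helper
    }
}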

Actually, we are currently updating the StreamingFileSink docs to
include this, among other things.
Also the differentiation between normal termination and termination
due to failure will hopefully be part of Flink 1.11 and
this is the FLIP to check:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-46%3A+Graceful+Shutdown+Handling+by+UDFs

Cheers,
Kostas

Though it looks like FLIP-46 is still under discussion, and thus 1.11 doesn’t 
have a fix for this?

— Ken

On Sep 7, 2020, at 8:39 AM, Teunissen, F.G.J. (Fred) <fred.teunis...@ing.com> wrote:

Hi All,
My flink-job is using bounded input sources and writes the results to a StreamingFileSink.
When it has processed all the input, the job is finished and closes. But the output files
are still named "<prefix>-0-0.<ext>.inprogress.<guid>". I expected them to be named
"<prefix>-0-0.<ext>".
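
For reference, a minimal sketch of the kind of job that shows this behaviour (the paths, values, and job name are illustrative, not my actual job):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class BoundedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // commits completed files while the job runs

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // Bounded source: the job finishes after these records, and files from
        // after the last completed checkpoint stay ".inprogress".
        env.fromElements("a", "b", "c").addSink(sink);
        env.execute("bounded-streaming-job");
    }
}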

Did I forget some setting or something else?
Regards,
Fred

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr


