You must have checkpointing enabled to use the StreamingFileSink. The
sink relies on checkpointing to achieve exactly-once semantics: part
files are only committed (moved from pending to finished state) when a
checkpoint completes, so with checkpointing disabled the data stays in
in-progress files and never becomes visible at the sink path.

>>  This is integrated with the checkpointing mechanism to provide exactly
once semantics.

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html
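
Concretely, the only required change is to keep enableCheckpointing in
place. Here is a minimal sketch of the idea (the 10-second interval,
object name, bucket path, and toy collection source are arbitrary
examples, not requirements):

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object CheckpointedSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Required for StreamingFileSink: pending part files are committed
    // only when a checkpoint completes. 10s is an example interval.
    env.enableCheckpointing(10000L)

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"),
        new SimpleStringEncoder[String]("UTF-8"))
      .build()

    env.fromCollection(Seq("a", "b", "c")).addSink(sink)
    env.execute("checkpointed-sink-sketch")
  }
}

With checkpointing disabled, whatever the sink has written so far lives
only in in-progress part files (on S3, typically an uncommitted
multipart upload), so nothing shows up at the sink path, which matches
what you see in case 2.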


Tim

On Mon, May 27, 2019 at 9:27 PM Vishal Sharma <vishal.sha...@grab.com>
wrote:

> Hello everyone,
>
> I want to use AWS S3 as the sink for a data stream in Flink. I am using
> the StreamingFileSink class to create the sink.
>
> I don't need checkpointing for my job, but when I disable checkpointing,
> data is no longer written to S3.
>
> case 1 : checkpointing enabled
> When checkpointing is enabled, the data is successfully written to the
> specified S3 path.
>
> case 2 : checkpointing disabled
> When checkpointing is disabled, no data is written to S3.
>
> I tried executing the job multiple times, and every time I got the same
> result. I see this on my local machine as well as on a Kubernetes
> cluster.
>
>
> Following is the code I tried, using a bounded stream:
>
> import org.apache.flink.api.common.serialization.SimpleStringEncoder
> import org.apache.flink.core.fs.Path
> import org.apache.flink.streaming.api.functions.sink.filesystem.{PartFileInfo, RollingPolicy, StreamingFileSink}
> import org.apache.flink.streaming.api.scala._
>
> object FlinkTestJob {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     // with checkpointing enabled
>     env.enableCheckpointing(100)
>
>     // Source: a small bounded collection
>     val streamStrings: Seq[String] =
>       Seq("test1", "test2", "test3", "test4", "test5",
>         "test6", "test7", "test8", "test9", "test10")
>
>     val testStream = env.fromCollection(streamStrings)
>
>     // Roll on every event/checkpoint so part files close quickly
>     val rollingPolicy = new RollingPolicy[String, String] {
>
>       override def shouldRollOnCheckpoint(
>           partFileState: PartFileInfo[String]): Boolean =
>         partFileState.getSize > 1
>
>       override def shouldRollOnEvent(
>           partFileState: PartFileInfo[String],
>           element: String): Boolean = true
>
>       override def shouldRollOnProcessingTime(
>           partFileState: PartFileInfo[String],
>           currentTime: Long): Boolean = true
>     }
>
>     val sink: StreamingFileSink[String] = StreamingFileSink
>       .forRowFormat(new Path("s3a://testbucket/sink"),
>         new SimpleStringEncoder[String]("UTF-8"))
>       .withRollingPolicy(rollingPolicy)
>       .build()
>
>     testStream.addSink(sink)
>     env.execute("test-job")
>   }
> }
>
>
> When I write to S3 using "writeAsText("s3a://testbucket/sink")" instead
> of the StreamingFileSink, it works fine regardless of whether
> checkpointing is enabled.
>
> Flink version: 1.8.0
> I want to understand the relationship between checkpointing and the
> StreamingFileSink.
>
> - Thanks
>
