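You must have checkpointing enabled to use the StreamingFileSink. The sink relies on checkpointing to achieve exactly-once semantics: part files are only finalized when a checkpoint completes, so with checkpointing disabled the data stays in in-progress or pending files and never becomes visible at the target path.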
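
As a rough illustration, here is a minimal sketch of a job with checkpointing turned on, modelled on the job from the original message. The object name and the 10-second interval are placeholders of mine, not recommendations. The s3a path is the one from the original message, and the sketch assumes an S3 filesystem is already configured for the cluster. It uses the sink's default rolling policy instead of the custom one.

```scala
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Hypothetical job name; the structure mirrors the job in the original message.
object CheckpointedFileSinkJob {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Part files are finalized when a checkpoint completes, so checkpointing
    // has to be enabled. The 10 s interval is an arbitrary example value.
    env.enableCheckpointing(10000L)

    val testStream = env.fromCollection(Seq("test1", "test2", "test3"))

    // Row-format sink with the default rolling policy.
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
      .build()

    testStream.addSink(sink)
    env.execute("checkpointed-file-sink")
  }
}
```

One caveat, as far as I know: with a small bounded input like this the job may finish before any checkpoint completes, and then the last records can remain in in-progress files. For a quick test, a shorter interval (the original job used 100 ms) or a longer-running source makes that less likely.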
>> This is integrated with the checkpointing mechanism to provide exactly once semantics.

https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.html

Tim

On Mon, May 27, 2019 at 9:27 PM Vishal Sharma <vishal.sha...@grab.com> wrote:

> Hello everyone,
>
> I want to use aws s3 as sink for a data stream in flink. I am using
> StreamingFileSink class to create a sink.
>
> I don't need checkpointing for my job, but when I disable checkpointing,
> data is no longer written to S3.
>
> case 1 : checkpointing enabled
> When checkpointing is enabled, the data is successfully ingested to the
> mentioned s3 path.
>
> case 2 : checkpointing disabled
> When checkpointing is disabled, the data is not written to s3.
>
> I tried executing the job multiple times, but every time I got the same
> result. I am facing this on local machine as well as on kubernetes cluster.
>
> Following is a code I tried having bounded stream -
>
> object FlinkTestJob {
>
>   def main(args: Array[String]): Unit = {
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     // with checkpointing enabled
>     env.enableCheckpointing(100)
>
>     // Sinks
>     val streamStrings: Seq[String] =
>       Seq("test1", "test2", "test3", "test4", "test5", "test6", "test7",
>         "test8", "test9", "test10")
>
>     val testStream = env.fromCollection(streamStrings)
>
>     val rollingPolicy = new RollingPolicy[String, String] {
>
>       override def shouldRollOnCheckpoint(partFileState: PartFileInfo[String]): Boolean =
>         partFileState.getSize > 1
>
>       override def shouldRollOnEvent(
>           partFileState: PartFileInfo[String],
>           element: String): Boolean = true
>
>       override def shouldRollOnProcessingTime(
>           partFileState: PartFileInfo[String],
>           currentTime: Long): Boolean = true
>     }
>
>     val sink: StreamingFileSink[String] = StreamingFileSink
>       .forRowFormat(new Path("s3a://testbucket/sink"), new SimpleStringEncoder[String]("UTF-8"))
>       .withRollingPolicy(rollingPolicy)
>       .build()
>
>     testStream.addSink(sink)
>     env.execute("test-job")
>   }
> }
>
> When I write to s3 using "writeAsText("s3a://testbucket/sink")" instead of
> StreamingFileSink, it works perfectly fine regardless of whether or not
> checkpointing is enabled.
>
> Flink version : 1.8.0
> I want to understand the relation between checkpointing and
> StreamingFileSink.
>
> - Thanks