Hi Robert,

Due to the performance issues with the State Processor API, I would like to give it up and try the StreamingFileSink in a streaming manner instead. However, I need to store the files on GCS, and I encountered the error below. It looks like Flink doesn't support GCS for the StreamingFileSink yet (https://issues.apache.org/jira/browse/FLINK-11838). If you know of any solution to this issue, please let me know. Thanks.

java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)
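(For anyone hitting the same exception: it is thrown at sink-creation time because the Hadoop recoverable writer rejects any filesystem scheme other than hdfs://. A minimal, dependency-free illustration of that constraint; the helper below is hypothetical and not part of Flink's API:)

```java
import java.net.URI;

public class SchemeCheck {
    // Illustration only: as long as FLINK-11838 is open, the
    // HadoopRecoverableWriter behind StreamingFileSink accepts
    // hdfs:// paths only, so a gs:// URI fails immediately with
    // the UnsupportedOperationException quoted above.
    static boolean supportedByHadoopRecoverableWriter(String path) {
        String scheme = URI.create(path).getScheme();
        return "hdfs".equals(scheme);
    }

    public static void main(String[] args) {
        System.out.println(supportedByHadoopRecoverableWriter("hdfs://namenode:8020/out")); // true
        System.out.println(supportedByHadoopRecoverableWriter("gs://my-bucket/out"));       // false
    }
}
```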
Best regards,
Chen-Che

On 2021/04/16 06:53:37, Robert Metzger <rmetz...@apache.org> wrote:
> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the State Processor API (which I
> ignored a bit).
>
> I wonder why you are using the State Processor API. Can't you use the
> streaming job that created the state also for writing it to files using
> the StreamingFileSink?
>
> If you want to stick to the DataSet API, then I guess you have to
> implement a custom (File)OutputFormat.
>
> On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang <acmic...@gmail.com> wrote:
>
> > Hi Robert,
> >
> > Thanks for your code. It's really helpful!
> >
> > However, with the readKeyedState API of the State Processor API, we get
> > a DataSet for our data instead of a DataStream, and it seems the DataSet
> > doesn't support the StreamingFileSink (there is no addSink method like
> > the DataStream has). So I would need to transform the DataSet into a
> > DataStream. I'm not sure whether that's doable based on
> > https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> > If it's doable, then I'll be able to solve our problem by applying the
> > StreamingFileSink to the transformed DataSet.
> >
> > Best wishes,
> > Chen-Che Huang
> >
> > On 2021/04/15 19:23:43, Robert Metzger <rmetz...@apache.org> wrote:
> > > Hey Chen-Che Huang,
> > >
> > > I guess the StreamingFileSink is what you are looking for. It is
> > > documented here:
> > > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> > > I drafted a short example (that is not production ready), which does
> > > roughly what you are asking for:
> > > https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> > >
> > > Hope this helps!
> > > Best,
> > > Robert
> > >
> > > On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang <acmic...@gmail.com>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > We're going to use the State Processor API to write our keyed state
> > > > data to different files based on the keys. More specifically, we
> > > > want our data to be written to files key1.txt, key2.txt, ..., and
> > > > keyn.txt, where values with the same key are stored in the same
> > > > file. In each file, the data may be stored as follows. As far as I
> > > > know, I need to implement my own sink
> > > > (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to
> > > > meet this requirement. However, I wonder whether there is a native
> > > > way to achieve this without implementing my own sink, because using
> > > > an official solution is usually more efficient and reliable than
> > > > doing it myself. Many thanks for any comments.
> > > >
> > > > key1.txt
> > > > key1 value11
> > > > key1 value21
> > > > key1 value31
> > > >
> > > > key2.txt
> > > > key2 value21
> > > > key2 value22
> > > > key2 value23
> > > >
> > > > Best wishes,
> > > > Chen-Che Huang
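(Editor's note for readers of the archive: the per-key file layout asked for above is exactly what the StreamingFileSink's BucketAssigner provides; implementing its getBucketId(...) to return the record's key routes each record into a bucket directory named after that key. The sketch below models that routing without any Flink dependencies, so the class and method names here are illustrative stand-ins, not Flink API:)

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyBucketingSketch {
    // Dependency-free stand-in for a BucketAssigner: each record
    // (a {key, value} pair) is routed to the bucket named after its
    // key, mirroring the key1.txt / key2.txt layout in the thread.
    // In real Flink code, implement
    // org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
    // and return the key from getBucketId(element, context).
    static Map<String, List<String>> bucketByKey(List<String[]> records) {
        Map<String, List<String>> buckets = new TreeMap<>();
        for (String[] kv : records) {
            // kv[0] is the key (and the bucket/file name), kv[1] the value
            buckets.computeIfAbsent(kv[0], k -> new ArrayList<>())
                   .add(kv[0] + " " + kv[1]);
        }
        return buckets;
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[]{"key1", "value11"},
                new String[]{"key2", "value21"},
                new String[]{"key1", "value21"});
        // Print each bucket's lines, grouped as they would land in key<i>.txt
        bucketByKey(records).forEach((key, lines) ->
                System.out.println(key + ".txt -> " + lines));
    }
}
```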