Re: Question about state processor data outputs

2021-05-05 Thread Chen-Che Huang
Hi Robert,

Due to performance issues with the State Processor API, I am inclined to give 
it up and am trying StreamingFileSink in a streaming job instead. However, I 
need to store the files on GCS, and I encountered the error below. It looks 
like Flink doesn't support GCS for StreamingFileSink yet 
(https://issues.apache.org/jira/browse/FLINK-11838). If you know of any 
solution to this issue, please let me know. Thanks.
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)

Best regards,
Chen-Che

On 2021/04/16 06:53:37, Robert Metzger wrote:
> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the state processor API (which I
> ignored a bit).
> 
> I wonder why you are using the state processor API. Can't you use the
> streaming job that created the state also for writing it to files using the
> StreamingFileSink?
> 
> If you want to stick to the DataSet API, then I guess you have to implement
> a custom (File)OutputFormat.


Re: Question about state processor data outputs

2021-04-16 Thread Chen-Che Huang
Hi Robert,

Due to some concerns, we had planned to use the State Processor API to achieve 
our goal. We will now reevaluate using the DataStream API for the job while 
also exploring the possibility of implementing a custom FileOutputFormat. 
Thanks for your comments!

Best wishes,
Chen-Che Huang 

On 2021/04/16 06:53:37, Robert Metzger wrote:
> Hi,
> I assumed you are using the DataStream API, because you mentioned the
> streaming sink. But you also mentioned the state processor API (which I
> ignored a bit).
> 
> I wonder why you are using the state processor API. Can't you use the
> streaming job that created the state also for writing it to files using the
> StreamingFileSink?
> 
> If you want to stick to the DataSet API, then I guess you have to implement
> a custom (File)OutputFormat.


Re: Question about state processor data outputs

2021-04-16 Thread Robert Metzger
Hi,
I assumed you are using the DataStream API, because you mentioned the
streaming sink. But you also mentioned the state processor API (which I
ignored a bit).

I wonder why you are using the state processor API. Can't you use the
streaming job that created the state also for writing it to files using the
StreamingFileSink?

If you want to stick to the DataSet API, then I guess you have to implement
a custom (File)OutputFormat.


On Fri, Apr 16, 2021 at 5:37 AM Chen-Che Huang wrote:

> Hi Robert,
>
> Thanks for your code. It's really helpful!
>
> However, the readKeyedState API of the State Processor API returns a DataSet
> rather than a DataStream, and it seems that DataSet does not support
> StreamingFileSink (it has no addSink method like DataStream). If that is the
> case, I need to transform the DataSet into a DataStream. I'm not sure whether
> that is doable based on
> https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
> If it is doable, I can solve our problem by applying StreamingFileSink to
> the converted DataStream.
>
> Best wishes,
> Chen-Che Huang


Re: Question about state processor data outputs

2021-04-15 Thread Chen-Che Huang
Hi Robert,

Thanks for your code. It's really helpful!

However, the readKeyedState API of the State Processor API returns a DataSet 
rather than a DataStream, and it seems that DataSet does not support 
StreamingFileSink (it has no addSink method like DataStream). If that is the 
case, I need to transform the DataSet into a DataStream. I'm not sure whether 
that is doable based on 
https://www.alibabacloud.com/blog/deep-insights-into-flink-sql-flink-advanced-tutorials_596628.
If it is doable, I can solve our problem by applying StreamingFileSink to 
the converted DataStream.

Best wishes,
Chen-Che Huang

On 2021/04/15 19:23:43, Robert Metzger wrote:
> Hey Chen-Che Huang,
> 
> I guess the StreamingFileSink is what you are looking for. It is documented
> here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> I drafted a short example (that is not production ready), which does
> roughly what you are asking for:
> https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d
> 
> Hope this helps!
> 
> Best,
> Robert


Re: Question about state processor data outputs

2021-04-15 Thread Robert Metzger
Hey Chen-Che Huang,

I guess the StreamingFileSink is what you are looking for. It is documented
here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
I drafted a short example (that is not production ready), which does
roughly what you are asking for:
https://gist.github.com/rmetzger/7d5dbdaa118c63f5875c8c9520cc311d

Hope this helps!

Best,
Robert


On Thu, Apr 15, 2021 at 11:33 AM Chen-Che Huang wrote:

> Hi all,
>
> We're going to use the State Processor API to write our keyed state data to
> different files based on the keys. More specifically, we want our data to be
> written to files key1.txt, key2.txt, ..., and keyn.txt, where values with the
> same key are stored in the same file. Within each file, the data may be
> stored as shown below. As far as I know, I need to implement my own sink
> (org.apache.flink.streaming.api.functions.sink.RichSinkFunction) to meet this
> requirement. However, I wonder whether there is a native way to achieve this
> without implementing my own sink, because an official solution is usually
> more efficient and reliable than one I write myself. Many thanks for any
> comments.
>
> key1.txt
> key1 value11
> key1 value21
> key1 value31
>
> key2.txt
> key2 value21
> key2 value22
> key2 value23
>
> Best wishes,
> Chen-Che Huang
>
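
For readers of this thread, the per-key layout requested in the original
question can be sketched in plain Java. This is only an illustration of the
target output (one <key>.txt file holding "key value" lines). It is not Flink
code and not the approach from Robert's gist. The class PerKeyFileWriter and
its methods groupByKey and writeFiles are hypothetical names invented for this
sketch, and it assumes every key is safe to use as a file name.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not a Flink class): writes "key value" lines into one file per key. */
class PerKeyFileWriter {

    /** Groups {key, value} records by key, preserving the order in which keys first appear. */
    static Map<String, List<String>> groupByKey(List<String[]> records) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (String[] record : records) {
            String key = record[0];
            String value = record[1];
            // Each output line has the form "key value", as in the question.
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(key + " " + value);
        }
        return grouped;
    }

    /** Writes each group to outputDir/<key>.txt, one record per line. */
    static void writeFiles(Path outputDir, Map<String, List<String>> grouped) throws IOException {
        Files.createDirectories(outputDir);
        for (Map.Entry<String, List<String>> entry : grouped.entrySet()) {
            // Assumption: the key contains no characters that are invalid in a file name.
            Path file = outputDir.resolve(entry.getKey() + ".txt");
            Files.write(file, entry.getValue(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // Sample records taken from the example in the question.
        List<String[]> records = Arrays.asList(
                new String[] {"key1", "value11"},
                new String[] {"key1", "value21"},
                new String[] {"key1", "value31"},
                new String[] {"key2", "value21"},
                new String[] {"key2", "value22"},
                new String[] {"key2", "value23"});

        Path outputDir = Files.createTempDirectory("per-key-output");
        writeFiles(outputDir, groupByKey(records));
        System.out.println("Wrote per-key files to " + outputDir);
    }
}
```

Grouping first and writing each file once resembles what a batch-style custom
(File)OutputFormat would need to do. In Flink itself, as far as I understand,
a StreamingFileSink with a key-based bucket assigner produces one directory
per key containing part files rather than a single key1.txt, so the exact
file names would differ from this sketch.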