Re: flink jdbc connector: how to ensure all data in the buffer is sunk to the database at checkpoint time

2020-12-09 Post by jie mei
Hi, Leonard


OK, I will open a PR to fix this issue.


Leonard Xu wrote on Thursday, December 10, 2020 at 12:10 PM:

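> Your analysis is correct; this is a bug. It should use SinkFunctionProvider
> and wrap the format in another layer with GenericJdbcSinkFunction instead of
> using OutputFormatProvider, because OutputFormatSinkFunction does not implement
> CheckpointedFunction and therefore cannot guarantee that buffered data is flushed
> to the database on checkpoint. Put another way, an OutputFormat does not take part
> in checkpointing, so not even at-least-once can be guaranteed.
>
> The fix should be simple. @jie mei, would you be interested in helping fix it?
>
> Best regards,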
> Leonard

-- 

*Best Regards*
*Jeremy Mei*


Re: flink jdbc connector: how to ensure all data in the buffer is sunk to the database at checkpoint time

2020-12-09 Post by Leonard Xu
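Your analysis is correct; this is a bug. It should use SinkFunctionProvider
and wrap the format in another layer with GenericJdbcSinkFunction instead of using OutputFormatProvider, because OutputFormatSinkFunction does not implement CheckpointedFunction
and therefore cannot guarantee that buffered data is flushed to the database on checkpoint.
Put another way, an OutputFormat does not take part in checkpointing, so not even at-least-once can be guaranteed.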
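The difference can be sketched in plain Java without any Flink dependency. Every type below is a hypothetical stand-in, not a Flink class: `CheckpointHook` plays the role of `CheckpointedFunction` (whose real `snapshotState` takes a `FunctionSnapshotContext` and may throw), `BatchingFormat` stands in for `JdbcBatchingOutputFormat`, `PlainFormatSink` mirrors the shape of `OutputFormatSinkFunction`, and `CheckpointedFormatSink` mirrors the idea behind `GenericJdbcSinkFunction`. `CheckpointSimulator.triggerCheckpoint` assumes the runtime calls `snapshotState` only on functions that implement the hook.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for CheckpointedFunction (simplified: takes only a checkpoint id). */
interface CheckpointHook {
    void snapshotState(long checkpointId);
}

/** Hypothetical buffering format, standing in for JdbcBatchingOutputFormat. */
class BatchingFormat {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the rows that actually reached the database.
    final List<String> database = new ArrayList<>();

    BatchingFormat(int batchSize) { this.batchSize = batchSize; }

    void writeRecord(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) flush();
    }

    void flush() {
        database.addAll(buffer);
        buffer.clear();
    }

    int buffered() { return buffer.size(); }
}

/** Mirrors the shape of OutputFormatSinkFunction: forwards records, has no checkpoint hook. */
class PlainFormatSink {
    final BatchingFormat format;
    PlainFormatSink(BatchingFormat format) { this.format = format; }
    void invoke(String value) { format.writeRecord(value); }
}

/** Mirrors the idea behind GenericJdbcSinkFunction: flushes the format on every checkpoint. */
class CheckpointedFormatSink extends PlainFormatSink implements CheckpointHook {
    CheckpointedFormatSink(BatchingFormat format) { super(format); }

    @Override
    public void snapshotState(long checkpointId) {
        // Everything buffered before the barrier is written before the checkpoint completes.
        format.flush();
    }
}

final class CheckpointSimulator {
    /** Hypothetical runtime behaviour: only sinks implementing the hook are notified. */
    static void triggerCheckpoint(Object sink, long checkpointId) {
        if (sink instanceof CheckpointHook) {
            ((CheckpointHook) sink).snapshotState(checkpointId);
        }
    }

    public static void main(String[] args) {
        PlainFormatSink plain = new PlainFormatSink(new BatchingFormat(100));
        PlainFormatSink fixed = new CheckpointedFormatSink(new BatchingFormat(100));
        for (String row : new String[] {"a", "b", "c"}) {
            plain.invoke(row);
            fixed.invoke(row);
        }
        triggerCheckpoint(plain, 1L);
        triggerCheckpoint(fixed, 1L);
        System.out.println("plain still buffered: " + plain.format.buffered());
        System.out.println("fixed still buffered: " + fixed.format.buffered());
    }
}
```

Running `main` should report 3 rows still buffered for the plain sink and 0 for the checkpointed one: only the wrapper that implements the hook gets a chance to flush before the checkpoint completes.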

The fix should be simple. @jie mei, would you be interested in helping fix it?

Best regards,
Leonard



> On December 10, 2020, at 11:22, jie mei wrote:
> 
> Hi, Jark
> 
> OK, I will create an issue for this.



Re: flink jdbc connector: how to ensure all data in the buffer is sunk to the database at checkpoint time

2020-12-09 Post by jie mei
Hi, Jark

OK, I will create an issue for this.

Jark Wu wrote on Thursday, December 10, 2020 at 11:17 AM:

> Hi Jie,
>
> This does look like a problem.
> The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
> Could you help create an issue?
>
> Best,
> Jark


-- 

*Best Regards*
*Jeremy Mei*


Re: flink jdbc connector: how to ensure all data in the buffer is sunk to the database at checkpoint time

2020-12-09 Post by Jark Wu
Hi Jie,

This does look like a problem.
The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
Could you help create an issue?

Best,
Jark

On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:

> Hi,
> Yes, I think you're right.
> `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
> `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so it cannot
> call format.flush during snapshotState.
> WDYT @Jark @Leonard
>
> Best,
> Hailong
>
>
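> On 2020-12-09 17:13:14, "jie mei" wrote:
> >Hi, Community
> >
> >The JDBC connector does not seem to guarantee that all data in its buffer is written to the
> >database at checkpoint time. This is because OutputFormat has no method that can be called
> >during a checkpoint. Judging from the JDBC connector's code, the only option is to set a
> >timeout after which data is flushed, but data can still be lost in some cases.
> >
> >My question is: is there a way to force the buffered data to be flushed to the database?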


flink jdbc connector: how to ensure all data in the buffer is sunk to the database at checkpoint time

2020-12-09 Post by jie mei
Hi, Community

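The JDBC connector does not seem to guarantee that all data in its buffer is written to the
database at checkpoint time. This is because OutputFormat has no method that can be called
during a checkpoint. Judging from the JDBC connector's code, the only option is to set a
timeout after which data is flushed, but data can still be lost in some cases.

My question is: is there a way to force the buffered data to be flushed to the database?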


import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

@Public
public interface OutputFormat<IT> extends Serializable {

    /**
     * Configures this output format. Since output formats are instantiated generically and
     * hence parameterless, this method is the place where the output formats set their basic
     * fields based on configuration values.
     * <p>
     * This method is always called first on a newly instantiated output format.
     *
     * @param parameters The configuration with all parameters.
     */
    void configure(Configuration parameters);

    /**
     * Opens a parallel instance of the output format to store the result of its parallel
     * instance.
     * <p>
     * When this method is called, the output format is guaranteed to be configured.
     *
     * @param taskNumber The number of the parallel instance.
     * @param numTasks The number of parallel tasks.
     * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
     */
    void open(int taskNumber, int numTasks) throws IOException;

    /**
     * Adds a record to the output.
     * <p>
     * When this method is called, the output format is guaranteed to be opened.
     *
     * @param record The record to add to the output.
     * @throws IOException Thrown, if the record could not be added due to an I/O problem.
     */
    void writeRecord(IT record) throws IOException;

    /**
     * Method that marks the end of the life-cycle of parallel output instance. Should be used
     * to close channels and streams and release resources. After this method returns without
     * an error, the output is assumed to be correct.
     * <p>
     * When this method is called, the output format is guaranteed to be opened.
     *
     * @throws IOException Thrown, if the output could not be closed properly.
     */
    void close() throws IOException;
}
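The data-loss window described above can be illustrated with a minimal plain-Java sketch. `BufferingOutputFormat` and `BufferDemo` are hypothetical and simplified (they are not Flink's `JdbcBatchingOutputFormat`): records are buffered and copied to an in-memory list, standing in for the database, only when the batch is full. Time-based flushing is left out; a flush interval shortens the window but does not close it.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified stand-in for a batching JDBC output format (not Flink's class).
 * Records are buffered and only written to the "database" when the batch is full.
 */
class BufferingOutputFormat<T> {
    private final int batchSize;
    private final List<T> buffer = new ArrayList<>();
    // Stands in for the rows that actually reached the database.
    final List<T> database = new ArrayList<>();

    BufferingOutputFormat(int batchSize) {
        this.batchSize = batchSize;
    }

    void writeRecord(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes all buffered records; a real connector would execute a JDBC batch here. */
    void flush() {
        database.addAll(buffer);
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }
}

class BufferDemo {
    public static void main(String[] args) {
        BufferingOutputFormat<Integer> format = new BufferingOutputFormat<>(3);
        for (int i = 1; i <= 5; i++) {
            format.writeRecord(i);
        }
        // prints "[1, 2, 3] written, 2 buffered": rows 4 and 5 exist only in memory.
        System.out.println(format.database + " written, " + format.buffered() + " buffered");
    }
}
```

Nothing in the `OutputFormat` interface is invoked at checkpoint time, so nothing forces `flush()` to run before a checkpoint completes. If the task fails at this point and restores from a checkpoint taken after rows 4 and 5 were consumed, those two rows are never written.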


-- 

*Best Regards*
*Jeremy Mei*