Hi, Leonard

OK, I will open a PR to fix this issue.
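For reference, here is a minimal plain-Java sketch of the idea Leonard describes below. It has no Flink dependency. `BatchingFormat` and `CheckpointAwareSink` are hypothetical stand-ins for `JdbcBatchingOutputFormat` and `GenericJdbcSinkFunction`, and `snapshotState` only imitates `CheckpointedFunction#snapshotState`. The wrapper flushes the buffer before a checkpoint completes, instead of relying only on the batch size or a flush timer.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushOnCheckpointSketch {

    /** Hypothetical stand-in for a batching JDBC output format (not Flink's JdbcBatchingOutputFormat). */
    static class BatchingFormat {
        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();
        /** Stands in for the database table; a real format would execute a JDBC batch here. */
        final List<String> database = new ArrayList<>();

        BatchingFormat(int batchSize) {
            this.batchSize = batchSize;
        }

        /** Buffers the record and only writes once a full batch has accumulated. */
        void writeRecord(String record) {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        /** Moves every buffered record into the "database" and clears the buffer. */
        void flush() {
            database.addAll(buffer);
            buffer.clear();
        }

        int buffered() {
            return buffer.size();
        }
    }

    /**
     * Hypothetical stand-in for a checkpoint-aware wrapper (the role GenericJdbcSinkFunction plays
     * in this thread). snapshotState only imitates CheckpointedFunction#snapshotState: it flushes
     * the buffer so nothing received before the checkpoint is still pending when it completes.
     */
    static class CheckpointAwareSink {
        private final BatchingFormat format;

        CheckpointAwareSink(BatchingFormat format) {
            this.format = format;
        }

        void invoke(String value) {
            format.writeRecord(value);
        }

        void snapshotState(long checkpointId) {
            format.flush();
        }
    }

    public static void main(String[] args) {
        BatchingFormat format = new BatchingFormat(3);
        CheckpointAwareSink sink = new CheckpointAwareSink(format);
        sink.invoke("a");
        sink.invoke("b");
        // The batch is not full yet, so "a" and "b" exist only in memory.
        System.out.println("before checkpoint: buffered=" + format.buffered() + ", written=" + format.database.size());
        sink.snapshotState(1L);
        // prints "after checkpoint: buffered=0, written=2"
        System.out.println("after checkpoint: buffered=" + format.buffered() + ", written=" + format.database.size());
    }
}
```

Without the `snapshotState` hook, which is the `OutputFormatSinkFunction` situation, the two rows would stay in memory until the batch filled up or a flush timer fired. That is exactly the window in which a failure loses data.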


On Thu, Dec 10, 2020 at 12:10 PM Leonard Xu <xbjt...@gmail.com> wrote:

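> Your analysis is correct; this is a bug. It should use SinkFunctionProvider here,
> wrapping the format in a GenericJdbcSinkFunction, instead of OutputFormatProvider, because
> OutputFormatSinkFunction does not implement CheckpointedFunction and therefore cannot guarantee that buffered data is flushed to the database on checkpoint.
> Put differently, the OutputFormat does not take part in checkpointing, so not even at-least-once is guaranteed.
>
> The fix should be simple. @jie mei, would you be interested in helping fix it?
>
> Best,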
> Leonard
>
>
>
> On Dec 10, 2020, at 11:22, jie mei <meijie.w...@gmail.com> wrote:
>
> Hi, Jark
>
> OK, I will create an issue for this.
>
> On Thu, Dec 10, 2020 at 11:17 AM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Jie,
>>
>> It does look like a problem.
>> The JdbcBatchingOutputFormat returned by the sink is not wrapped in GenericJdbcSinkFunction.
>> Could you help create an issue?
>>
>> Best,
>> Jark
>>
>> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:
>>
>> > Hi,
>> >    Yes, I think you are right.
>> >    `JdbcOutputFormat` gets wrapped in `OutputFormatSinkFunction`, and
>> > `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so it cannot call
>> > format.flush in snapshotState.
>> >    WDYT @Jark @Leonard
>> >
>> > Best,
>> > Hailong
>> >
>> >
>> > On 2020-12-09 17:13:14, "jie mei" <meijie.w...@gmail.com> wrote:
>> > >Hi, Community
>> > >
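>> > >The JDBC connector does not seem to guarantee that all data in its buffer is written
>> > >to the database when a checkpoint is taken. This is because OutputFormat has no method
>> > >that can be called at checkpoint time. Judging from the JDBC connector code, the only
>> > >option is to set a timeout after which the data is flushed, but data can still be lost in some cases.
>> > >
>> > >My question is: is there a way to force the buffered data to be flushed to the database?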
>> > >
>> > >
>> > >import java.io.IOException;
>> > >import java.io.Serializable;
>> > >
>> > >import org.apache.flink.annotation.Public;
>> > >import org.apache.flink.configuration.Configuration;
>> > >
>> > >@Public
>> > >public interface OutputFormat<IT> extends Serializable {
>> > >
>> > >   /**
>> > >    * Configures this output format. Since output formats are instantiated generically and hence parameterless,
>> > >    * this method is the place where the output formats set their basic fields based on configuration values.
>> > >    * <p>
>> > >    * This method is always called first on a newly instantiated output format.
>> > >    *
>> > >    * @param parameters The configuration with all parameters.
>> > >    */
>> > >   void configure(Configuration parameters);
>> > >
>> > >   /**
>> > >    * Opens a parallel instance of the output format to store the result of its parallel instance.
>> > >    * <p>
>> > >    * When this method is called, the output format is guaranteed to be configured.
>> > >    *
>> > >    * @param taskNumber The number of the parallel instance.
>> > >    * @param numTasks The number of parallel tasks.
>> > >    * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
>> > >    */
>> > >   void open(int taskNumber, int numTasks) throws IOException;
>> > >
>> > >   /**
>> > >    * Adds a record to the output.
>> > >    * <p>
>> > >    * When this method is called, the output format is guaranteed to be opened.
>> > >    *
>> > >    * @param record The record to add to the output.
>> > >    * @throws IOException Thrown, if the record could not be added due to an I/O problem.
>> > >    */
>> > >   void writeRecord(IT record) throws IOException;
>> > >
>> > >   /**
>> > >    * Method that marks the end of the life-cycle of parallel output instance. Should be used to close
>> > >    * channels and streams and release resources.
>> > >    * After this method returns without an error, the output is assumed to be correct.
>> > >    * <p>
>> > >    * When this method is called, the output format is guaranteed to be opened.
>> > >    *
>> > >    * @throws IOException Thrown, if the output could not be closed properly.
>> > >    */
>> > >   void close() throws IOException;
>> > >}
>> > >
>> > >
>> > >--
>> > >
>> > >*Best Regards*
>> > >*Jeremy Mei*
>> >
>> >
>> >
>> >
>> >
>> >
>>
>
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>
>
>

-- 

*Best Regards*
*Jeremy Mei*
