Hi Jie,

看起来确实是个问题。
sink 返回的 JdbcBatchingOutputFormat 没有 wrap 到 GenericJdbcSinkFunction 里面。
可以帮忙创建个 issue 么?

Best,
Jark

On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com> wrote:

> Hi,
>    是的,感觉你是对的。
>   `JdbcOutputFormat` 会被 wrap 在 `OutputFormatSinkFunction` 中,而
> `OutputFormatSinkFunction` 没有继承 `CheckpointedFunction`,所以没法在 snapshotState
> 时候调用format.flush。
>    WDYT @Jark @ Leonard
>
> Best,
> Hailong
>
>
> 在 2020-12-09 17:13:14,"jie mei" <meijie.w...@gmail.com> 写道:
> >Hi, Community
> >
> >JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为
> >OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的
> >代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。
> >
> >我的问题是:是否有办法强制刷新buffer中的数据入库?
> >
> >
> >@Public
> >public interface OutputFormat<IT> extends Serializable {
> >
> >   /**
> >    * Configures this output format. Since output formats are
> >instantiated generically and hence parameterless,
> >    * this method is the place where the output formats set their
> >basic fields based on configuration values.
> >    * <p>
> >    * This method is always called first on a newly instantiated output 
> > format.
> >    *
> >    * @param parameters The configuration with all parameters.
> >    */
> >   void configure(Configuration parameters);
> >
> >   /**
> >    * Opens a parallel instance of the output format to store the
> >result of its parallel instance.
> >    * <p>
> >    * When this method is called, the output format it guaranteed to
> >be configured.
> >    *
> >    * @param taskNumber The number of the parallel instance.
> >    * @param numTasks The number of parallel tasks.
> >    * @throws IOException Thrown, if the output could not be opened
> >due to an I/O problem.
> >    */
> >   void open(int taskNumber, int numTasks) throws IOException;
> >
> >
> >   /**
> >    * Adds a record to the output.
> >    * <p>
> >    * When this method is called, the output format it guaranteed to be 
> > opened.
> >    *
> >    * @param record The records to add to the output.
> >    * @throws IOException Thrown, if the records could not be added to
> >to an I/O problem.
> >    */
> >   void writeRecord(IT record) throws IOException;
> >
> >   /**
> >    * Method that marks the end of the life-cycle of parallel output
> >instance. Should be used to close
> >    * channels and streams and release resources.
> >    * After this method returns without an error, the output is
> >assumed to be correct.
> >    * <p>
> >    * When this method is called, the output format it guaranteed to be 
> > opened.
> >    *
> >    * @throws IOException Thrown, if the input could not be closed properly.
> >    */
> >   void close() throws IOException;
> >}
> >
> >
> >--
> >
> >*Best Regards*
> >*Jeremy Mei*
>
>
>
>
>
>

回复