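Your analysis is correct, this is a bug. The sink should use SinkFunctionProvider
and wrap the format in a GenericJdbcSinkFunction instead of using OutputFormatProvider, because OutputFormatSinkFunction does not implement CheckpointedFunction,
so there is no way to guarantee that buffered data is flushed to the database at checkpoint time.
Put another way, an OutputFormat does not take part in checkpointing, so even at-least-once is not necessarily guaranteed.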
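
To illustrate the difference, here is a minimal self-contained sketch in plain Java. It does not use the real Flink classes: `BufferingFormat`, `CheckpointHook`, `PlainFormatSink` and `FlushingSink` are hypothetical stand-ins for a batching output format, the checkpoint callback, a sink that only forwards records, and a sink that also flushes on checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a batching output format (not a Flink class). */
class BufferingFormat {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    // Stands in for the external database.
    private final List<String> database = new ArrayList<>();

    BufferingFormat(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Buffers the record and flushes only once the batch is full. */
    void writeRecord(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Moves every buffered record into the "database". */
    void flush() {
        database.addAll(buffer);
        buffer.clear();
    }

    int buffered() {
        return buffer.size();
    }

    int persisted() {
        return database.size();
    }
}

/** Hypothetical checkpoint callback, playing the role of snapshotState. */
interface CheckpointHook {
    void snapshotState();
}

/** Sink that only forwards records; nothing flushes the buffer at checkpoint time. */
class PlainFormatSink {
    final BufferingFormat format;

    PlainFormatSink(BufferingFormat format) {
        this.format = format;
    }

    void invoke(String record) {
        format.writeRecord(record);
    }
}

/** Sink that takes part in checkpoints and flushes the buffer when one is taken. */
class FlushingSink extends PlainFormatSink implements CheckpointHook {
    FlushingSink(BufferingFormat format) {
        super(format);
    }

    @Override
    public void snapshotState() {
        format.flush();
    }
}

class CheckpointFlushSketch {
    /** Writes some records, simulates one checkpoint, returns how many records are still buffered. */
    static int bufferedAfterCheckpoint(PlainFormatSink sink, int records) {
        for (int i = 0; i < records; i++) {
            sink.invoke("row-" + i);
        }
        // Only sinks that implement the hook are notified of the checkpoint.
        if (sink instanceof CheckpointHook) {
            ((CheckpointHook) sink).snapshotState();
        }
        return sink.format.buffered();
    }

    public static void main(String[] args) {
        int plain = bufferedAfterCheckpoint(new PlainFormatSink(new BufferingFormat(100)), 3);
        int flushing = bufferedAfterCheckpoint(new FlushingSink(new BufferingFormat(100)), 3);
        System.out.println("plain sink, still buffered after checkpoint: " + plain);
        System.out.println("flushing sink, still buffered after checkpoint: " + flushing);
    }
}
```

With a batch size of 100 and only 3 records written, the plain sink still holds all 3 records in memory after the checkpoint, so a failure right after it would lose them, while the flushing sink has written them out.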

The fix should be simple. @jie mei, would you be interested in helping fix it?

Best,
Leonard



> On Dec 10, 2020, at 11:22, jie mei <meijie.w...@gmail.com> wrote:
> 
> Hi, Jark
> 
> OK, I will create an issue for this.
> 
> On Thu, Dec 10, 2020 at 11:17 AM, Jark Wu <imj...@gmail.com <mailto:imj...@gmail.com>> wrote:
> Hi Jie,
> 
> This does look like a problem.
> The JdbcBatchingOutputFormat returned by the sink is not wrapped in a GenericJdbcSinkFunction.
> Could you help create an issue?
> 
> Best,
> Jark
> 
> On Thu, 10 Dec 2020 at 02:05, hailongwang <18868816...@163.com 
> <mailto:18868816...@163.com>> wrote:
> 
> > Hi,
> >    Yes, I think you are right.
> >   `JdbcOutputFormat` is wrapped in `OutputFormatSinkFunction`, and
> > `OutputFormatSinkFunction` does not implement `CheckpointedFunction`, so there is no way to call
> > `format.flush` during `snapshotState`.
> >    WDYT @Jark @Leonard
> >
> > Best,
> > Hailong
> >
> >
> > At 2020-12-09 17:13:14, "jie mei" <meijie.w...@gmail.com 
> > <mailto:meijie.w...@gmail.com>> wrote:
> > >Hi, Community
> > >
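> > >The JDBC connector does not seem to guarantee that all buffered data is written to the
> > >database at checkpoint time. This is because OutputFormat has no method that can be called
> > >when a checkpoint is taken. Judging from the JDBC connector code, the only option is to set a
> > >timeout for flushing the data, but there can still be cases where data is lost.
> > >
> > >My question is: is there a way to force the buffered data to be flushed to the database?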
> > >
> > >
> > >@Public
> > >public interface OutputFormat<IT> extends Serializable {
> > >
> > >   /**
> > >    * Configures this output format. Since output formats are instantiated generically and
> > >    * hence parameterless, this method is the place where the output formats set their basic
> > >    * fields based on configuration values.
> > >    * <p>
> > >    * This method is always called first on a newly instantiated output format.
> > >    *
> > >    * @param parameters The configuration with all parameters.
> > >    */
> > >   void configure(Configuration parameters);
> > >
> > >   /**
> > >    * Opens a parallel instance of the output format to store the result of its parallel
> > >    * instance.
> > >    * <p>
> > >    * When this method is called, the output format is guaranteed to be configured.
> > >    *
> > >    * @param taskNumber The number of the parallel instance.
> > >    * @param numTasks The number of parallel tasks.
> > >    * @throws IOException Thrown, if the output could not be opened due to an I/O problem.
> > >    */
> > >   void open(int taskNumber, int numTasks) throws IOException;
> > >
> > >
> > >   /**
> > >    * Adds a record to the output.
> > >    * <p>
> > >    * When this method is called, the output format is guaranteed to be opened.
> > >    *
> > >    * @param record The records to add to the output.
> > >    * @throws IOException Thrown, if the records could not be added due to an I/O problem.
> > >    */
> > >   void writeRecord(IT record) throws IOException;
> > >
> > >   /**
> > >    * Method that marks the end of the life-cycle of parallel output instance. Should be used
> > >    * to close channels and streams and release resources.
> > >    * After this method returns without an error, the output is assumed to be correct.
> > >    * <p>
> > >    * When this method is called, the output format is guaranteed to be opened.
> > >    *
> > >    * @throws IOException Thrown, if the input could not be closed properly.
> > >    */
> > >   void close() throws IOException;
> > >}
> > >
> > >
> > >--
> > >
> > >*Best Regards*
> > >*Jeremy Mei*
> >
> >
> >
> >
> >
> >
> 
> 
> -- 
> 
> Best Regards
> Jeremy Mei
