    private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {

        private final long rollingFileSize;
        private final long rollingTimeInterval;

        private HiveRollingPolicy(long rollingFileSize, long rollingTimeInterval) {
            Preconditions.checkArgument(rollingFileSize > 0L);
            Preconditions.checkArgument(rollingTimeInterval > 0L);
            this.rollingFileSize = rollingFileSize;
            this.rollingTimeInterval = rollingTimeInterval;
        }

        @Override
        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
            return true;
        }

        @Override
        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
            return false;
        }

        @Override
        public boolean shouldRollOnProcessingTime(
                PartFileInfo<String> partFileState, long currentTime) {
            try {
                return currentTime - partFileState.getCreationTime() >= rollingTimeInterval
                        || partFileState.getSize() > rollingFileSize;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

I don't quite understand why this policy has to force a file roll on every checkpoint. With shouldRollOnCheckpoint always returning true, the rolling parameters I configured effectively stop working:

    'sink.rolling-policy.check-interval' = '30s',
    'sink.rolling-policy.rollover-interval' = '10min',
    'sink.rolling-policy.file-size' = '128MB'

My goal is to have files roll at a fixed size, a fixed record count, or a fixed time interval. How can I achieve that?
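
To make the behavior I am asking about concrete, here is a minimal, self-contained sketch that mirrors the two decisions in the policy quoted above, without depending on Flink. The class `RollingPolicySketch` and its nested `PartFile` are hypothetical stand-ins (not Flink classes) for `HiveRollingPolicy` and `PartFileInfo`; the thresholds are the 128 MB and 10 min values from the options quoted above, and the sample file size and timestamps are made up for illustration.

```java
public class RollingPolicySketch {

    /** Hypothetical stand-in for Flink's PartFileInfo: only the two values the policy reads. */
    static final class PartFile {
        final long creationTime; // milliseconds
        final long size;         // bytes

        PartFile(long creationTime, long size) {
            this.creationTime = creationTime;
            this.size = size;
        }
    }

    private final long rollingFileSize;     // bytes
    private final long rollingTimeInterval; // milliseconds

    RollingPolicySketch(long rollingFileSize, long rollingTimeInterval) {
        if (rollingFileSize <= 0L || rollingTimeInterval <= 0L) {
            throw new IllegalArgumentException("thresholds must be positive");
        }
        this.rollingFileSize = rollingFileSize;
        this.rollingTimeInterval = rollingTimeInterval;
    }

    /** Same as the quoted policy: every checkpoint rolls the file, whatever its size or age. */
    boolean shouldRollOnCheckpoint(PartFile file) {
        return true;
    }

    /** Same condition as the quoted policy: roll once the file is old enough or large enough. */
    boolean shouldRollOnProcessingTime(PartFile file, long currentTime) {
        return currentTime - file.creationTime >= rollingTimeInterval
                || file.size > rollingFileSize;
    }

    public static void main(String[] args) {
        // 128 MB and 10 min, matching the configured options.
        RollingPolicySketch policy =
                new RollingPolicySketch(128L * 1024 * 1024, 10L * 60 * 1000);

        // A 1 KB part file created at t = 0 (made-up values).
        PartFile smallFile = new PartFile(0L, 1024L);

        // Periodic check 30 s after creation: neither threshold is reached.
        System.out.println("roll on 30s check:  "
                + policy.shouldRollOnProcessingTime(smallFile, 30_000L));

        // A checkpoint arrives: the file rolls anyway.
        System.out.println("roll on checkpoint: "
                + policy.shouldRollOnCheckpoint(smallFile));
    }
}
```

In this sketch the size and time thresholds only decide the outcome when one of them is reached before the next checkpoint; otherwise the checkpoint rolls the file first, which is the effect I am seeing.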