private static class HiveRollingPolicy extends CheckpointRollingPolicy<RowData, String> {

    private final long rollingFileSize;
    private final long rollingTimeInterval;

    private HiveRollingPolicy(
            long rollingFileSize,
            long rollingTimeInterval) {
        Preconditions.checkArgument(rollingFileSize > 0L);
        Preconditions.checkArgument(rollingTimeInterval > 0L);
        this.rollingFileSize = rollingFileSize;
        this.rollingTimeInterval = rollingTimeInterval;
    }

    @Override
    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
        return true;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, RowData element) {
        return false;
    }

    @Override
    public boolean shouldRollOnProcessingTime(
            PartFileInfo<String> partFileState, long currentTime) {
        try {
            return currentTime - partFileState.getCreationTime() >= rollingTimeInterval ||
                    partFileState.getSize() > rollingFileSize;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
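
I don't quite understand why every checkpoint has to force the file to roll. With this policy, the rolling parameters I configured stop having any effect:
  'sink.rolling-policy.check-interval' = '30s',
  'sink.rolling-policy.rollover-interval' = '10min',
  'sink.rolling-policy.file-size' = '128MB'
What I want is for files to roll at a fixed size, a fixed number of records, or a fixed time interval. How can I do that?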
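
To make the interaction concrete, here is a minimal stand-alone sketch (plain Java, not Flink code) of how the two decisions in the policy above combine over time. The class `RollingSimulation` and the method `rollTimes` are hypothetical names invented for this illustration. The model is simplified: it ignores file size, works in whole seconds, and assumes a new part file is created at the instant the previous one rolls.

```java
import java.util.ArrayList;
import java.util.List;

public class RollingSimulation {

    /**
     * Returns the times (in seconds) at which the part file rolls.
     * A checkpoint always rolls the file, mirroring shouldRollOnCheckpoint
     * returning true. A periodic check rolls it only when the file is at
     * least rolloverInterval seconds old, mirroring the time condition in
     * shouldRollOnProcessingTime.
     */
    static List<Long> rollTimes(long checkpointInterval, long rolloverInterval,
                                long checkInterval, long duration) {
        List<Long> rolls = new ArrayList<>();
        long creationTime = 0L; // assumption: the first file is created at t = 0
        for (long now = 1; now <= duration; now++) {
            boolean onCheckpoint = now % checkpointInterval == 0;
            boolean onTimer = now % checkInterval == 0
                    && now - creationTime >= rolloverInterval;
            if (onCheckpoint || onTimer) {
                rolls.add(now);
                creationTime = now; // assumption: the next file starts immediately
            }
        }
        return rolls;
    }

    public static void main(String[] args) {
        // Checkpoint every 60 s, rollover-interval 600 s, check-interval 30 s.
        System.out.println(rollTimes(60, 600, 30, 300));
        // Checkpoint every 900 s, same rolling settings.
        System.out.println(rollTimes(900, 600, 30, 1200));
    }
}
```

In this model, when the checkpoint interval is shorter than the rollover interval, every roll is triggered by a checkpoint and the 10 min setting is never reached. The rollover interval only matters once checkpoints are spaced further apart than it is.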
