我在使用flink 1.8 自定义 FileInputFormat 的时候遇到了不兼容问题,初步看了源代码,不知道自己得出的结论是否正确,并想了解一下后续趋势和进展,麻烦各位大佬抽空指点一下,先行感谢~~
问题1: StreamExecutionEnvironment 为什么要做这样的限制?ContinuousFileMonitoringFunction 的作用是什么? 相关的代码描述如下 StreamExecutionEnvironment 对 FileInputFormat 对象有特殊的处理逻辑 if (inputFormat instanceof FileInputFormat) { @SuppressWarnings("unchecked") FileInputFormat<OUT> format = (FileInputFormat<OUT>) inputFormat; source = createFileInput(format, typeInfo, "Custom File source", FileProcessingMode.PROCESS_ONCE, -1); } else { source = createInput(inputFormat, typeInfo, "Custom Source"); } return source; createFileInput 方法内 使用 ContinuousFileMonitoringFunction 对 inputFormat 进行处理,在其构造函数中,对 FileInputFormat<OUT> format 进行了一些条件约束 Preconditions.checkArgument( format.getFilePaths().length == 1, "FileInputFormats with multiple paths are not supported yet."); 这里就将 FileInputFormat 限制为只能添加一个file path。 问题2: 在flink 1.10 版本情况是否有改善?(在 FileInputFormat.supportsMultiPaths 方法中我看到flink 2.0 中,所有的FileInputFormat 都会支持多路径) /** * Override this method to supports multiple paths. * When this method will be removed, all FileInputFormats have to support multiple paths. * * @return True if the FileInputFormat supports multiple paths, false otherwise. * * @deprecated Will be removed for Flink 2.0. */ @Deprecated public boolean supportsMultiPaths() { return false; }