已解决。覆盖了flink这部分源码去除了对非hdfs的schema限制。

张锴 <zk357794...@gmail.com> 于2021年1月21日周四 下午7:35写道:

> @赵一旦
> 另外,上次我还提了一个问题请教你,我试了你说的那个想法,但是好像有点问题,你可以看一下
>
> 张锴 <zk357794...@gmail.com> 于2021年1月21日周四 下午7:13写道:
>
> > 我用的flink 1.10版,FlieSink就是BucketingSink,我是用这个写hdfs的
> >
> > 赵一旦 <hinobl...@gmail.com> 于2021年1月21日周四 下午7:05写道:
> >
> >> @Michael Ran; 嗯嗯,没关系。
> >>
> >> @张锴 你说的是flink哪个版本的connector,stream or sql。我搜了下我的没有。我是1.12,stream。
> >>
> >>
> 目前看文档有streamFileSink,还有FileSink,从文档内容来看使用方式差不多。我计划试一下FileSink,但不清楚FileSink和StreamFileSink啥区别,是否都能写hadoop类文件系统,因为涉及是否原子写,比较分布式文件系统不支持追加和编辑等。
> >>
> >> Michael Ran <greemqq...@163.com> 于2021年1月21日周四 下午7:01写道:
> >>
> >> >
> >> >
> >>
> 很抱歉,我已经很久没用过这个了。但是可以根据异常信息以及API源码执行进行分析,确定是否能直接写入。<br/>如果你要写入自定义的文件系统,那么只能实现自己的sink方式。或者你的文件系统的写入方式兼容hdfs的上层API<br/>可以参考各个sink端的写法
> >> > 在 2021-01-21 18:45:06,"张锴" <zk357794...@gmail.com> 写道:
> >> > >import
> >> org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink,
> >> > >DateTimeBucketer}
> >> > >
> >> > >sink.setBucketer sink.setWriter用这种方式试试
> >> > >
> >> > >
> >> > >
> >> > >赵一旦 <hinobl...@gmail.com> 于2021年1月21日周四 下午6:37写道:
> >> > >
> >> > >> @Michael Ran
> >> > >> 然后有什么解决方案吗,我这个是使用flink的streamFileSink方式写hdfs的时候出现的异常。
> >> > >>
> >> > >> Michael Ran <greemqq...@163.com> 于2021年1月21日周四 下午5:23写道:
> >> > >>
> >> > >> > 这里应该是用了hdfs 的特定API吧,文件系统没兼容<br/>public
> >> > >> > HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {...}
> >> > >> > 在 2021-01-21 17:18:23,"赵一旦" <hinobl...@gmail.com> 写道:
> >> > >> > >具体报错信息如下:
> >> > >> > >
> >> > >> > >java.lang.UnsupportedOperationException: Recoverable writers on
> >> > Hadoop
> >> > >> are
> >> > >> > >only supported for HDFS
> >> > >> > >    at
> >> > org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(
> >> > >> > >HadoopRecoverableWriter.java:61)
> >> > >> > >    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem
> >> > >> > >.createRecoverableWriter(HadoopFileSystem.java:210)
> >> > >> > >    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem
> >> > >> > >.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> >> > >> > >    at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> >
> >StreamingFileSink$RowFormatBuilder.createBucketWriter(StreamingFileSink
> >> > >> > >.java:260)
> >> > >> > >    at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> >StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:270)
> >> > >> > >    at org.apache.flink.streaming.api.functions.sink.filesystem.
> >> > >> > >StreamingFileSink.initializeState(StreamingFileSink.java:412)
> >> > >> > >    at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.tryRestoreFunction(StreamingFunctionUtils.java:185)
> >> > >> > >    at
> >> > org.apache.flink.streaming.util.functions.StreamingFunctionUtils
> >> > >> > >.restoreFunctionState(StreamingFunctionUtils.java:167)
> >> > >> > >    at
> >> > >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator
> >> > >> > >.initializeState(AbstractUdfStreamOperator.java:96)
> >> > >> > >    at
> >> > >> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler
> >> > >> > >.initializeOperatorState(StreamOperatorStateHandler.java:107)
> >> > >> > >    at
> >> > org.apache.flink.streaming.api.operators.AbstractStreamOperator
> >> > >> > >.initializeState(AbstractStreamOperator.java:264)
> >> > >> > >    at org.apache.flink.streaming.runtime.tasks.OperatorChain
> >> > >> > >.initializeStateAndOpenOperators(OperatorChain.java:400)
> >> > >> > >    at org.apache.flink.streaming.runtime.tasks.StreamTask
> >> > >> > >.lambda$beforeInvoke$2(StreamTask.java:507)
> >> > >> > >    at
> >> > >> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1
> >> > >> > >.runThrowing(StreamTaskActionExecutor.java:47)
> >> > >> > >    at
> >> > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> >> > >> > >StreamTask.java:501)
> >> > >> > >    at
> >> > >> >
> >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
> >> > >> > >.java:531)
> >> > >> > >    at
> >> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> >> > >> > >    at
> >> org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> >> > >> > >    at java.lang.Thread.run(Thread.java:748)
> >> > >> > >
> >> > >> > >
> >> > >> > >赵一旦 <hinobl...@gmail.com> 于2021年1月21日周四 下午5:17写道:
> >> > >> > >
> >> > >> > >> Recoverable writers on Hadoop are only supported for HDFS
> >> > >> > >>
> >> > >> > >> 如上,我们用的hadoop协议的,但底层不是hdfs,是公司自研的分布式文件系统。
> >> > >> > >>
> >> > >> > >> 使用spark写,spark-sql读等都没问题。但是flink写和读当前都没尝试成功。
> >> > >> > >>
> >> > >> > >>
> >> > >> > >>
> >> > >> >
> >> > >>
> >> >
> >>
> >
>

回复