format.fail-on-missing-field
format.ignore-parse-errors
挺有用的,可以容错数据格式不对,缺少字段等。
但是不能容错对于使用事件时间的处理模型,因为容错的手段是将所有字段值设为null,事件时间也设置为null。
但是flink
sql需要从数据中提取事件时间且不能为null,否则就会抛出异常,从这一点来说,上述配置并没有完全容错,详细如下源码(这里应该可以考虑直接跳过这条数据吧?):
Thanks,
??historyServer,flink
1.12.0??Dockerfile ??CMD ["history-server"]
??8082,??
----
??:
git clone??,??
----
??:
"user-zh"
Flink代码里Json反序列化里有2个参数应该对你有帮助,你到官网上查询下怎么使用
上述2个配置项的参数名字分别是:
format.fail-on-missing-field
format.ignore-parse-errors
在 2020-12-20 10:48:04,"陈帅" 写道:
>业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
>请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka
??Kafka??schemacsv??json??avro??schema??
----
??:
业务上游数据源发出来的数据有可能会有脏数据导致数据无法解析成源表的结构,如kafka json topic映射成源表。
请问这种情况下flink sql要如何处理? 期望的是将脏数据发到一个专门的topic,是不是要自己写个connector? 标准kafka
connector支持这种需求么?
flink 1.11+ 支持yarn
application模式提交任务,我试着用这个模式提交examples下的TopSpeedWindowing任务,我将$FLINK_HOME/lib目录下的文件和要运行任务的jar文件都上传到了hdfs,运行如下命令:
./bin/flink run-application -p 1 -t yarn-application \
-yD yarn.provided.lib.dirs="hdfs://localhost:9000/flink/libs" \
是的
张锴 于2020年12月19日周六 下午5:45写道:
> 我按官网操作,重写了序列化方式
>
> val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
> props)kafkaSource.assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor[MyType] {
> def extractAscendingTimestamp(element: MyType): Long =
>
你只需要在Flink Client端设置HADOOP_CONF_DIR的环境就可以了
Flink
Client会自动把hdfs-site.xml、core-site.xml文件通过创建一个单独ConfigMap,然后挂载给JobManager和TaskManager的
同时这两个配置也会自动加载到classpath下,只需要lib下放了flink-shaded-hadoop,就不需要做其他事情,可以直接访问hdfs的
Best,
Yang
liujian <13597820...@qq.com> 于2020年12月19日周六 下午8:29写道:
>
>
HDFS??Ha,hdfs-site.xml,,configMap??hdfs-site.xml??$FLINK_HOME/conf??
----
??:
"user-zh"
我按官网操作,重写了序列化方式
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema,
props)kafkaSource.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long =
element.eventTimestamp})
val stream: DataStream[MyType] =
11 matches
Mail list logo