回复:请教设置uid的最佳实践
你这里绝大多数都不用设置吧 | | 18579099920 | | 邮箱:18579099...@163.com | 签名由 网易邮箱大师 定制 在2020年11月26日 12:09,范瑞 写道: 无状态算子可以不设置 Best fanrui ---原始邮件--- 发件人: "kingdomad"
回复:Flink cdc 多表关联处理延迟很大
cdc关联 | | 18579099920 | | 邮箱:18579099...@163.com | 签名由 网易邮箱大师 定制 在2020年11月18日 13:07,hailongwang 写道: 抱歉,描述错了。。 你的业务需求是流表数据与 CDC mysql 流数据互相关联还是 CDC mysql 流数据去关联维表数据的呢 在 2020-11-18 11:59:52,"hailongwang" <18868816...@163.com> 写道: >我看你的 SQL 和 截图上的算子名称,应该是用的流表 JOIN[1],而不是维表 JOIN[2] ? >你的业务需求是流表数据与 CDC mysql 数据互相关联还是流表单边去关联 CDC mysql 数据呢? >如果是流表 JOIN 的话,也可以看下是否有 join key数据倾斜问题导致单个 task 压力大,而导致 checkpoint 不成功。 > > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins >[2] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table >在 2020-11-18 10:34:48,"Jark Wu" 写道: >>另外,join 节点的并发可以再增加一些,提升 join 的处理性能。 >> >>On Wed, 18 Nov 2020 at 11:34, Jark Wu wrote: >> >>> 估计是你的全量数据太大,但是 join 节点处理性能太慢,导致半小时内全量数据还没有处理完,而 checkpoint 超时了。 >>> 注意全量数据阶段,是做不了 checkpoint 的。 具体可以看下这篇文章的第四点。 >>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ >>> >>> 解决办法文中也有提及: >>> >>> 解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下: >>> >>> execution.checkpointing.interval: 10min # checkpoint间隔时间 >>> execution.checkpointing.tolerable-failed-checkpoints: 100 # checkpoint >>> 失败容忍次数 >>> restart-strategy: fixed-delay # 重试策略 >>> restart-strategy.fixed-delay.attempts: 2147483647 # 重试次数 >>> >>> Best, >>> Jark >>> >>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote: >>> 即使我将not exists改成了join,join节点的checkpoint也无法完成,是我设置的checkpint时间太短了嘛,我设置的是每隔半小时发起checkpoint一次,超时时间也是半小时。 下面是截图,(我上传图片每次都看不了啥情况) https://imgchr.com/i/DeqixU https://imgchr.com/i/DeqP2T > 在 2020年11月16日,上午10:29,Jark Wu 写道: > > 瓶颈应该在两个 not exists 上面,not exists 目前只能单并发来做,所以无法水平扩展性能。 > 可以考虑把 not exists 替换成其他方案,比如 udf,维表 join。 > > Best, > Jark > > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote: > >> select >>ri.sub_clazz_number, >>prcrs.rounds, >>count(*) as num >> from >>subclazz gs >> JOIN >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4 GROUP >> BY gce.number) AS temp >> ON >>temp.number = gs.course_number AND temp.grade>30 >> JOIN >>right_info ri >> ON >>gs.number = ri.sub_clazz_number >> join >>wide_subclazz ws >> on >>ws.number = ri.sub_clazz_number >> join >>course gc >> on >>gc.number = ws.course_number and gc.course_category_id in (30,40) >> left join >>performance_regular_can_renewal_sign prcrs >> on prcrs.order_number = ri.order_number andprcrs.rounds in (1,2) >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null >> and not exists (select 1 from internal_staff gis where gis.user_id = >> ri.user_id) >> and not exists (select 1 from clazz_extension ce where ws.clazz_number = >> ce.number >>and ce.extension_type = 3 and ce.isdel = 0 >>and ce.extension_value in (1,3,4,7,8,11)) >> group by ri.sub_clazz_number, prcrs.rounds >> Sql代码是这样的。 >> 瓶颈在所有的join节点上,每次的checkpoint无法完成的节点都是join节点。 >> >>> 在 2020年11月14日,下午5:53,Jark Wu 写道: >>> >>> 能展示下你的代码吗?是用的维表关联的语法 (FOR SYSTEM TIME AS OF)? >>> 需要明确下,到底是什么节点慢了。 >>> >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote: >>> 我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。 有没有比较好的优化方案能缓解这样的问题? >> >> >>
Re:Re:flink sql cdc任务提交报错
The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.test_wide_performance_ceres_can_renewal'. Table options are: 'connector'='jdbc' 'password'='123456' 'sink.buffer-flush.interval'='2s' 'sink.buffer-flush.max-rows'='1000' 'table-name'='test_wide_performance_ceres_can_renewal' 'url'='jdbc:mysql://localhost:3306/stat' 'username'='root' at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992) at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992) Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.test_wide_performance_ceres_can 以上是堆栈信息。 在 2020-11-03 11:00:44,"flink小猪" <18579099...@163.com> 写道: > > > > > > >我明明上传了图片呀 > > > > > >在 2020-11-03 10:41:57,"flink小猪" <18579099...@163.com> 写道: >>当我提交flink sql任务到集群上时,报以下错误。 >>首先感觉像是缺少依赖jar包,但是我使用反编译软件打开时发现依赖存在,那这是什么问题呢,我应该如何去解决?
Re:flink sql cdc任务提交报错
我明明上传了图片呀 在 2020-11-03 10:41:57,"flink小猪" <18579099...@163.com> 写道: >当我提交flink sql任务到集群上时,报以下错误。 >首先感觉像是缺少依赖jar包,但是我使用反编译软件打开时发现依赖存在,那这是什么问题呢,我应该如何去解决?
flink sql cdc任务提交报错
当我提交flink sql任务到集群上时,报以下错误。 首先感觉像是缺少依赖jar包,但是我使用反编译软件打开时发现依赖存在,那这是什么问题呢,我应该如何去解决?
Re:Re: Re:Re: 用hive streaming写 orc文件的问题
] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1] 这个文件是存在的,并且无法关闭,然后又会起新的文件,然后无法关闭,一直重复。 在使用sql client的过程中,并行度好像只能需要读取的文件数有关。我有一张分区表,进行查询,需要58个并行度,而我的集群只有10个,导致无法查询到数据,我应该 如果能解决这个问题呢 在 2020-08-13 15:40:54,"Rui Li" 写道: >如果是IDE里执行的话,tableEnv.executeSql是马上返回的,然后就退出了,可以用类似这种写法等作业结束: > >val tableResult = tEnv.executeSql(insert) >// wait to finish >tableResult.getJobClient.get > .getJobExecutionResult(Thread.currentThread.getContextClassLoader) > .get > >> 为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要? > >这里其实是缺少orc的依赖,按说只有table.exec.hive.fallback-mapred-writer设置为false的时候才会发生,我后面修复一下 > >> sql client 我想要设置checkpoint生成间隔我应该在哪里设置? > >可以在flink-conf.yaml里设置execution.checkpointing.interval > > >On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote: > >> 添加不了附件,我就直接贴代码了 >> >> import java.time.Duration >> >> >> import org.apache.flink.streaming.api.{CheckpointingMode, >> TimeCharacteristic} >> import >> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions >> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment >> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, >> TableResult} >> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment >> import org.apache.flink.table.catalog.hive.HiveCatalog >> >> >> >> >> /** >> * author dinghh >> * time 2020-08-11 17:03 >> */ >> object WriteHiveStreaming { >> def main(args: Array[String]): Unit = { >> >> >> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment >> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) >> streamEnv.setParallelism(3) >> >> >> val tableEnvSettings = EnvironmentSettings.newInstance() >> .useBlinkPlanner() >> .inStreamingMode() >> .build() >> val tableEnv = StreamTableEnvironment.create(streamEnv, >> tableEnvSettings) >> >> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, >> CheckpointingMode.EXACTLY_ONCE) >> >> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, >> Duration.ofSeconds(20)) >> >> >> >> >> >> >> val catalogName = "my_catalog" >> val catalog = new HiveCatalog( >> catalogName, // catalog name >> "default",// default database >> >> "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources", >> // Hive config (hive-site.xml) directory >> "1.1.0" // Hive version >> ) >> tableEnv.registerCatalog(catalogName, catalog) >> tableEnv.useCatalog(catalogName) >> >> >> >> >> //删除流表 >> tableEnv.executeSql( >> """ >> |DROP TABLE IF EXISTS `stream_db`.`datagen_user` >> """.stripMargin) >> >> >> //创建流表 >> tableEnv.executeSql(
Re:Re:Re: 用hive streaming写 orc文件的问题
ql( """ |INSERT INTO `default`.`hive_user_parquet` |SELECT |id,name, | DATE_FORMAT(dt,'-MM-dd'), |DATE_FORMAT(dt,'HH'), |DATE_FORMAT(dt,'mm') |FROM |stream_db.datagen_user """.stripMargin) } } 在 2020-08-13 10:08:55,"flink小猪" <18579099...@163.com> 写道: 尴尬,我明明上传了附件但是找不到了- -,我又上传了一次。 1.写orc和写parquet的作业在同一个作业中,并没有报错,但是hive中查不到数据,在hdfs目录里面有但是并没有按照checkpoint间隔生成,也没有生成_success文件。 2.没有设置table.exec.hive.fallback-mapred-writer。 以下是我的几个疑问 1.为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要? 2.sql client 我想要设置checkpoint生成间隔我应该在哪里设置? 以下是hdfs目录图片 这是orc生成的文件 这是parquet生成的文件 在 2020-08-12 17:33:30,"Rui Li" 写道: >Hi, > >写orc表的作业有报错么?还是成功执行但是hive查不到数据呢? >看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer? > >On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com> >wrote: > >> >> >> 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到, >> >> 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多, >> 而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception >> in thread "main" java.lang.NoClassDefFoundError: >> org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval >> 会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。 >> -- >> 18579099...@163.com >> > > >-- >Best regards! >Rui Li
Re:Re: 用hive streaming写 orc文件的问题
尴尬,我明明上传了附件但是找不到了- -,我又上传了一次。 1.写orc和写parquet的作业在同一个作业中,并没有报错,但是hive中查不到数据,在hdfs目录里面有但是并没有按照checkpoint间隔生成,也没有生成_success文件。 2.没有设置table.exec.hive.fallback-mapred-writer。 以下是我的几个疑问 1.为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要? 2.sql client 我想要设置checkpoint生成间隔我应该在哪里设置? 以下是hdfs目录图片 这是orc生成的文件 这是parquet生成的文件 在 2020-08-12 17:33:30,"Rui Li" 写道: >Hi, > >写orc表的作业有报错么?还是成功执行但是hive查不到数据呢? >看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer? > >On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com> >wrote: > >> >> >> 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到, >> >> 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多, >> 而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception >> in thread "main" java.lang.NoClassDefFoundError: >> org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval >> 会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。 >> -- >> 18579099...@163.com >> > > >-- >Best regards! >Rui Li
Re:回复:FlinkKafkaProducer没有写入多个topic的功能
兄弟,感谢 在 2020-07-08 11:04:40,"夏帅" 写道: 你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景 class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] { override def serialize(element: DemoBean, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = { new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue) } } -- 发件人:18579099...@163.com <18579099...@163.com> 发送时间:2020年7月8日(星期三) 10:59 收件人:user-zh 主 题:FlinkKafkaProducer没有写入多个topic的功能 我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题), 但是FlinkKafkaProducer好像只能写入一个主题里面? 18579099...@163.com
Re:Re: 如何在窗口关闭的时候清除状态
[1].设置TTL应该也能达到相同的效果,我还是希望在窗口关闭的时候能够做一些自定义的操作(比如这里的清除状态,也许之后会有其他的操作TTL就不一样好用了) [2].KeyedProcessFunction,应该自己注册定时器把,在我的代码里面是timeWIndow().trigger().process(), ProcessWindowFunction方法我只需要处理逻辑即可,不需要管定时的窗口。 在 2020-07-05 11:56:03,"Congxian Qiu" 写道: >看上去这个需求是 一天的窗口,每个小时都 trigger 一次,希望 state 在 1 天之后进行清理。 >你可以尝试一下 TTL[1] State >另外想问一下,你自己写 ProcessWindowFunction 的话,为什么不考虑 KeyedProcessFunction[2] 呢 > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl >[2] >https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction >Best, >Congxian > > >JasonLee <17610775...@163.com> 于2020年7月4日周六 下午8:29写道: > >> 设置一下状态过期的时间呢 >> >> >> | | >> JasonLee >> | >> | >> 邮箱:17610775...@163.com >> | >> >> Signature is customized by Netease Mail Master >> >> 在2020年07月03日 14:02,18579099...@163.com 写道: >> >> 大家好,我有一个需求,我在ProcessWindowFunction算子中定义了一个valueState,我希望在窗口关闭的时候能够将状态清理。我应该在哪里清理呢? >> >> 1.刚开始我选择在ProcessWindowFunction算子的process方法中清理,但是这里会有一个问题,我事件时间窗口开1天,我写了一个trigger,每隔一个小时输出一次结果。 >> >> 如果我在process方法中清理,每隔一个小时就会被清理,而valueState中存的是我的中间结果,应该在窗口关闭的时候被清理(即一天之后)。这应该怎么办呢? >> >> >> >> 18579099...@163.com >>
Re:Re: flinksql如何控制结果输出的频率
感谢您的回复,我试了一下,的确通过您说的这种方式,可以得到一个retract流的数据。换一个场景 我需要每小时计算当天的交易额(例如两点到了,我应该输出从0点到2点的总交易额)我想获得如下结果: 2020-03-28T01:00 100 2020-03-28T02:00 280 2020-03-28T23:00 18000 2020-03-28T00:00 19520 2020-03-29T01:00 120 2020-03-29T01:00 230 我应该获得是一个不断append的数据流,而不是retract数据流。 并且设置提前发射的事件,flink应该是选取的处理时间而不是事件时间? 在 2020-03-27 15:23:39,"Benchao Li" 写道: >Hi, > >对于第二个场景,可以尝试一下fast emit: >table.exec.emit.early-fire.enabled = true >table.exec.emit.early-fire.delay = 5min > >PS: >1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature >2. window加了emit之后,会由原来输出append结果变成输出retract结果 > >Jingsong Li 于2020年3月27日周五 下午2:51写道: > >> Hi, >> >> For #1: >> 创建级联的两级window: >> - 1分钟窗口 >> - 5分钟窗口,计算只是保存数据,发送明细数据结果 >> >> Best, >> Jingsong Lee >> > > >-- > >Benchao Li >School of Electronics Engineering and Computer Science, Peking University >Tel:+86-15650713730 >Email: libenc...@gmail.com; libenc...@pku.edu.cn
flinksql如何控制结果输出的频率
我有两个需求 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办? 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?
Re:Re: flinksql创建源表添加水位线失败
) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 39 more 另外有一个问题是当我把flink-connector-kafkajar包放入lib目录下时,我的项目没有吧flink-connector-kafka打入jar包里,运行时提示我缺少flink-connector-kafka-base包,当我再添加时flink-connector-kafka-base包时 提示我缺少org/apache/kafka/common/serialization/ByteArrayDeserializer,我观察maven依赖是发现,flink-connector-kafka_2.11-1.10.0.jar它依赖着flink-connector-kafka-base和kafka-client包,当我只把flink-connector-kafka加入lib 目录下时,由于缺少它所依赖的包,导入执行不成功。在这里,我想知道一般提交flink任务,是尽量少添加这类jar包,然后通过maven将这些依赖打到项目中去吗? 期待您的回复 在 2020-03-25 09:56:56,"Jark Wu" 写道: >Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。 >你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite? > >Best, >Jark > >On Tue, 24 Mar 2020 at 23:37, flink小猪 <18579099...@163.com> wrote: > >> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章), >> CREATE TABLE user_behavior ( >> user_id BIGINT, >> item_id BIGINT, >> category_id BIGINT, >> behavior STRING, >> ts TIMESTAMP(3), >> proctime as PROCTIME(), >> WATERMARK FOR ts as ts - INTERVAL '5' SECOND >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = 'universal', >> 'connector.topic' = 'user_behavior', >> 'connector.startup-mode' = 'earliest-offset', >> 'connector.properties.zookeeper.connect' = '192.168.1.214:2181', >> 'connector.properties.bootstrap.servers' = '192.168.1.214:9092', >> 'format.type' = 'json' >> ) >> 出现错误 >> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier >> 'ts' >> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException: >> Unknown identifier 'ts' >> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了 >> flink-sql-connector-kafka_2.11-1.10.0.jar >> flink-json-1.10.0.jar >> 在sql-client上执行还是错误,是我缺少什么jar包吗? >> >> >> >> >> >>
flinksql创建源表添加水位线失败
当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章), CREATE TABLE user_behavior ( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_behavior', 'connector.startup-mode' = 'earliest-offset', 'connector.properties.zookeeper.connect' = '192.168.1.214:2181', 'connector.properties.bootstrap.servers' = '192.168.1.214:9092', 'format.type' = 'json' ) 出现错误 org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts' 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts' ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了 flink-sql-connector-kafka_2.11-1.10.0.jar flink-json-1.10.0.jar 在sql-client上执行还是错误,是我缺少什么jar包吗?