回复:请教设置uid的最佳实践

2020-11-25 文章 flink
你这里绝大多数都不用设置吧


| |
18579099920
|
|
邮箱:18579099...@163.com
|

签名由 网易邮箱大师 定制

在2020年11月26日 12:09,范瑞 写道:
无状态算子可以不设置


Best
fanrui



---原始邮件---
发件人: "kingdomad"

回复:Flink cdc 多表关联处理延迟很大

2020-11-17 文章 flink
cdc关联


| |
18579099920
|
|
邮箱:18579099...@163.com
|

签名由 网易邮箱大师 定制

在2020年11月18日 13:07,hailongwang 写道:
抱歉,描述错了。。
你的业务需求是流表数据与 CDC mysql 流数据互相关联还是 CDC mysql 流数据去关联维表数据的呢

在 2020-11-18 11:59:52,"hailongwang" <18868816...@163.com> 写道:
>我看你的 SQL 和 截图上的算子名称,应该是用的流表 JOIN[1],而不是维表 JOIN[2] ?
>你的业务需求是流表数据与 CDC mysql 数据互相关联还是流表单边去关联 CDC mysql 数据呢?
>如果是流表 JOIN 的话,也可以看下是否有 join key数据倾斜问题导致单个 task 压力大,而导致 checkpoint 不成功。
>
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#regular-joins
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
>在 2020-11-18 10:34:48,"Jark Wu"  写道:
>>另外,join 节点的并发可以再增加一些,提升 join 的处理性能。
>>
>>On Wed, 18 Nov 2020 at 11:34, Jark Wu  wrote:
>>
>>> 估计是你的全量数据太大,但是 join 节点处理性能太慢,导致半小时内全量数据还没有处理完,而 checkpoint 超时了。
>>> 注意全量数据阶段,是做不了 checkpoint 的。 具体可以看下这篇文章的第四点。
>>> https://mp.weixin.qq.com/s/Mfn-fFegb5wzI8BIHhNGvQ
>>>
>>> 解决办法文中也有提及:
>>>
>>> 解决办法:在 flink-conf.yaml 配置 failed checkpoint 容忍次数,以及失败重启策略,如下:
>>>
>>> execution.checkpointing.interval: 10min   # checkpoint间隔时间
>>> execution.checkpointing.tolerable-failed-checkpoints: 100  # checkpoint
>>> 失败容忍次数
>>> restart-strategy: fixed-delay  # 重试策略
>>> restart-strategy.fixed-delay.attempts: 2147483647   # 重试次数
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 18 Nov 2020 at 11:28, 丁浩浩 <18579099...@163.com> wrote:
>>>
 即使我将not
 exists改成了join,join节点的checkpoint也无法完成,是我设置的checkpint时间太短了嘛,我设置的是每隔半小时发起checkpoint一次,超时时间也是半小时。
 下面是截图,(我上传图片每次都看不了啥情况)
 https://imgchr.com/i/DeqixU
 https://imgchr.com/i/DeqP2T

 > 在 2020年11月16日,上午10:29,Jark Wu  写道:
 >
 > 瓶颈应该在两个 not exists 上面,not exists 目前只能单并发来做,所以无法水平扩展性能。
 > 可以考虑把 not exists 替换成其他方案,比如 udf,维表 join。
 >
 > Best,
 > Jark
 >
 > On Mon, 16 Nov 2020 at 10:05, 丁浩浩 <18579099...@163.com> wrote:
 >
 >> select
 >>ri.sub_clazz_number,
 >>prcrs.rounds,
 >>count(*) as num
 >> from
 >>subclazz gs
 >> JOIN
 >>(SELECT gce.number, min( gce.extension_value ) AS grade FROM
 >> course_extension gce WHERE gce.isdel = 0 AND gce.extension_type = 4
 GROUP
 >> BY gce.number) AS temp
 >> ON
 >>temp.number = gs.course_number AND temp.grade>30
 >> JOIN
 >>right_info ri
 >> ON
 >>gs.number = ri.sub_clazz_number
 >> join
 >>wide_subclazz ws
 >> on
 >>ws.number = ri.sub_clazz_number
 >> join
 >>course gc
 >> on
 >>gc.number = ws.course_number and gc.course_category_id in (30,40)
 >> left join
 >>performance_regular_can_renewal_sign prcrs
 >> on prcrs.order_number = ri.order_number andprcrs.rounds in (1,2)
 >> where ri.is_del = 0 and ri.end_idx = -1 and prcrs.rounds is not null
 >> and not exists (select 1 from internal_staff gis where gis.user_id =
 >> ri.user_id)
 >> and not exists (select 1 from clazz_extension ce where ws.clazz_number
 =
 >> ce.number
 >>and ce.extension_type = 3 and ce.isdel = 0
 >>and ce.extension_value in (1,3,4,7,8,11))
 >> group by ri.sub_clazz_number, prcrs.rounds
 >> Sql代码是这样的。
 >> 瓶颈在所有的join节点上,每次的checkpoint无法完成的节点都是join节点。
 >>
 >>> 在 2020年11月14日,下午5:53,Jark Wu  写道:
 >>>
 >>> 能展示下你的代码吗?是用的维表关联的语法 (FOR SYSTEM TIME AS OF)?
 >>> 需要明确下,到底是什么节点慢了。
 >>>
 >>> On Fri, 13 Nov 2020 at 19:02, 丁浩浩 <18579099...@163.com> wrote:
 >>>
  我用flink cdc 对mysql的表进行关联查询,发现flink只能两张表关联之后再跟下一张表关联,导致最后落库的延迟非常大。
  有没有比较好的优化方案能缓解这样的问题?
 >>
 >>
 >>





Re:Re:flink sql cdc任务提交报错

2020-11-02 文章 flink


 The program finished with the following exception:




org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Unable to create a sink for writing table 
'default_catalog.default_database.test_wide_performance_ceres_can_renewal'.




Table options are:




'connector'='jdbc'

'password'='123456'

'sink.buffer-flush.interval'='2s'

'sink.buffer-flush.max-rows'='1000'

'table-name'='test_wide_performance_ceres_can_renewal'

'url'='jdbc:mysql://localhost:3306/stat'

'username'='root'

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)

at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)

at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)

at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)

at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
sink for writing table 
'default_catalog.default_database.test_wide_performance_ceres_can




以上是堆栈信息。








在 2020-11-03 11:00:44,"flink小猪" <18579099...@163.com> 写道:
>
>
>
>
>
>
>我明明上传了图片呀
>
>
>
>
>
>在 2020-11-03 10:41:57,"flink小猪" <18579099...@163.com> 写道:
>>当我提交flink sql任务到集群上时,报以下错误。
>>首先感觉像是缺少依赖jar包,但是我使用反编译软件打开时发现依赖存在,那这是什么问题呢,我应该如何去解决?


Re:flink sql cdc任务提交报错

2020-11-02 文章 flink






我明明上传了图片呀





在 2020-11-03 10:41:57,"flink小猪" <18579099...@163.com> 写道:
>当我提交flink sql任务到集群上时,报以下错误。
>首先感觉像是缺少依赖jar包,但是我使用反编译软件打开时发现依赖存在,那这是什么问题呢,我应该如何去解决?


flink sql cdc任务提交报错

2020-11-02 文章 flink
当我提交flink sql任务到集群上时,报以下错误。
首先感觉像是缺少依赖jar包,但是我使用反编译软件打开时发现依赖存在,那这是什么问题呢,我应该如何去解决?

Re:Re: Re:Re: 用hive streaming写 orc文件的问题

2020-08-14 文章 flink
]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource.run(DataGeneratorSource.java:82)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) 
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
 ~[flink-dist_2.11-1.11.1.jar:1.11.1]
这个文件是存在的,并且无法关闭,然后又会起新的文件,然后无法关闭,一直重复。
在使用sql 
client的过程中,并行度好像只能需要读取的文件数有关。我有一张分区表,进行查询,需要58个并行度,而我的集群只有10个,导致无法查询到数据,我应该
如果能解决这个问题呢














在 2020-08-13 15:40:54,"Rui Li"  写道:
>如果是IDE里执行的话,tableEnv.executeSql是马上返回的,然后就退出了,可以用类似这种写法等作业结束:
>
>val tableResult = tEnv.executeSql(insert)
>// wait to finish
>tableResult.getJobClient.get
>  .getJobExecutionResult(Thread.currentThread.getContextClassLoader)
>  .get
>
>> 为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要?
>
>这里其实是缺少orc的依赖,按说只有table.exec.hive.fallback-mapred-writer设置为false的时候才会发生,我后面修复一下
>
>> sql client 我想要设置checkpoint生成间隔我应该在哪里设置?
>
>可以在flink-conf.yaml里设置execution.checkpointing.interval
>
>
>On Thu, Aug 13, 2020 at 10:23 AM flink小猪 <18579099...@163.com> wrote:
>
>> 添加不了附件,我就直接贴代码了
>>
>> import java.time.Duration
>>
>>
>> import org.apache.flink.streaming.api.{CheckpointingMode,
>> TimeCharacteristic}
>> import
>> org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect,
>> TableResult}
>> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
>> import org.apache.flink.table.catalog.hive.HiveCatalog
>>
>>
>>
>>
>> /**
>>   * author dinghh
>>   * time 2020-08-11 17:03
>>   */
>> object WriteHiveStreaming {
>> def main(args: Array[String]): Unit = {
>>
>>
>> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> streamEnv.setParallelism(3)
>>
>>
>> val tableEnvSettings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build()
>> val tableEnv = StreamTableEnvironment.create(streamEnv,
>> tableEnvSettings)
>>
>> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
>> CheckpointingMode.EXACTLY_ONCE)
>>
>> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
>> Duration.ofSeconds(20))
>>
>>
>>
>>
>>
>>
>> val catalogName = "my_catalog"
>> val catalog = new HiveCatalog(
>> catalogName,  // catalog name
>> "default",// default database
>>
>> "D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources",
>> // Hive config (hive-site.xml) directory
>> "1.1.0"   // Hive version
>> )
>> tableEnv.registerCatalog(catalogName, catalog)
>> tableEnv.useCatalog(catalogName)
>>
>>
>>
>>
>> //删除流表
>> tableEnv.executeSql(
>> """
>>   |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
>> """.stripMargin)
>>
>>
>> //创建流表
>> tableEnv.executeSql(

Re:Re:Re: 用hive streaming写 orc文件的问题

2020-08-12 文章 flink
ql(
"""
  |INSERT INTO `default`.`hive_user_parquet`
  |SELECT
  |id,name,
  |    DATE_FORMAT(dt,'-MM-dd'),
  |DATE_FORMAT(dt,'HH'),
  |DATE_FORMAT(dt,'mm')
  |FROM
  |stream_db.datagen_user
""".stripMargin)


}


}


















在 2020-08-13 10:08:55,"flink小猪" <18579099...@163.com> 写道:




尴尬,我明明上传了附件但是找不到了- -,我又上传了一次。




1.写orc和写parquet的作业在同一个作业中,并没有报错,但是hive中查不到数据,在hdfs目录里面有但是并没有按照checkpoint间隔生成,也没有生成_success文件。
2.没有设置table.exec.hive.fallback-mapred-writer。
以下是我的几个疑问
1.为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要?
2.sql client 我想要设置checkpoint生成间隔我应该在哪里设置? 以下是hdfs目录图片
 
这是orc生成的文件

这是parquet生成的文件





在 2020-08-12 17:33:30,"Rui Li"  写道:
>Hi,
>
>写orc表的作业有报错么?还是成功执行但是hive查不到数据呢?
>看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer?
>
>On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com>
>wrote:
>
>>
>>
>> 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到,
>>
>> 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多,
>> 而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception
>> in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval
>> 会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。
>> --
>> 18579099...@163.com
>>
>
>
>-- 
>Best regards!
>Rui Li






 

Re:Re: 用hive streaming写 orc文件的问题

2020-08-12 文章 flink



尴尬,我明明上传了附件但是找不到了- -,我又上传了一次。




1.写orc和写parquet的作业在同一个作业中,并没有报错,但是hive中查不到数据,在hdfs目录里面有但是并没有按照checkpoint间隔生成,也没有生成_success文件。
2.没有设置table.exec.hive.fallback-mapred-writer。
以下是我的几个疑问
1.为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要?
2.sql client 我想要设置checkpoint生成间隔我应该在哪里设置? 以下是hdfs目录图片
 
这是orc生成的文件

这是parquet生成的文件





在 2020-08-12 17:33:30,"Rui Li"  写道:
>Hi,
>
>写orc表的作业有报错么?还是成功执行但是hive查不到数据呢?
>看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer?
>
>On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com>
>wrote:
>
>>
>>
>> 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到,
>>
>> 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多,
>> 而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception
>> in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval
>> 会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。
>> --
>> 18579099...@163.com
>>
>
>
>-- 
>Best regards!
>Rui Li


Re:回复:FlinkKafkaProducer没有写入多个topic的功能

2020-07-07 文章 flink
兄弟,感谢
















在 2020-07-08 11:04:40,"夏帅"  写道:

你好,可以尝试自定义KafkaSerializationSchema来实现你的业务场景
class DemoSerializationSchema extends KafkaSerializationSchema[DemoBean] {
override def serialize(element: DemoBean, timestamp: lang.Long): 
ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]](element.getTopic, element.getValue)
  }
}
--
发件人:18579099...@163.com <18579099...@163.com>
发送时间:2020年7月8日(星期三) 10:59
收件人:user-zh 
主 题:FlinkKafkaProducer没有写入多个topic的功能


我有一个需求是通过读取一个kafka的主题的数据经过flink处理再写入到多个kafka的主题中(写入的主题是动态的,数据中能解析到需要写入到的目的地主题),
但是FlinkKafkaProducer好像只能写入一个主题里面?



18579099...@163.com




Re:Re: 如何在窗口关闭的时候清除状态

2020-07-06 文章 flink



[1].设置TTL应该也能达到相同的效果,我还是希望在窗口关闭的时候能够做一些自定义的操作(比如这里的清除状态,也许之后会有其他的操作TTL就不一样好用了)
[2].KeyedProcessFunction,应该自己注册定时器把,在我的代码里面是timeWIndow().trigger().process(), 
ProcessWindowFunction方法我只需要处理逻辑即可,不需要管定时的窗口。














在 2020-07-05 11:56:03,"Congxian Qiu"  写道:
>看上去这个需求是 一天的窗口,每个小时都 trigger 一次,希望 state 在 1 天之后进行清理。
>你可以尝试一下 TTL[1] State
>另外想问一下,你自己写 ProcessWindowFunction 的话,为什么不考虑 KeyedProcessFunction[2] 呢
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/state/state.html#%E7%8A%B6%E6%80%81%E6%9C%89%E6%95%88%E6%9C%9F-ttl
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/operators/process_function.html#the-keyedprocessfunction
>Best,
>Congxian
>
>
>JasonLee <17610775...@163.com> 于2020年7月4日周六 下午8:29写道:
>
>> 设置一下状态过期的时间呢
>>
>>
>> | |
>> JasonLee
>> |
>> |
>> 邮箱:17610775...@163.com
>> |
>>
>> Signature is customized by Netease Mail Master
>>
>> 在2020年07月03日 14:02,18579099...@163.com 写道:
>>
>> 大家好,我有一个需求,我在ProcessWindowFunction算子中定义了一个valueState,我希望在窗口关闭的时候能够将状态清理。我应该在哪里清理呢?
>>
>> 1.刚开始我选择在ProcessWindowFunction算子的process方法中清理,但是这里会有一个问题,我事件时间窗口开1天,我写了一个trigger,每隔一个小时输出一次结果。
>>
>> 如果我在process方法中清理,每隔一个小时就会被清理,而valueState中存的是我的中间结果,应该在窗口关闭的时候被清理(即一天之后)。这应该怎么办呢?
>>
>>
>>
>> 18579099...@163.com
>>


Re:Re: flinksql如何控制结果输出的频率

2020-03-28 文章 flink



感谢您的回复,我试了一下,的确通过您说的这种方式,可以得到一个retract流的数据。换一个场景
我需要每小时计算当天的交易额(例如两点到了,我应该输出从0点到2点的总交易额)我想获得如下结果:
2020-03-28T01:00 100
2020-03-28T02:00 280

2020-03-28T23:00 18000
2020-03-28T00:00 19520
2020-03-29T01:00 120
2020-03-29T01:00 230
我应该获得是一个不断append的数据流,而不是retract数据流。
并且设置提前发射的事件,flink应该是选取的处理时间而不是事件时间?








在 2020-03-27 15:23:39,"Benchao Li"  写道:
>Hi,
>
>对于第二个场景,可以尝试一下fast emit:
>table.exec.emit.early-fire.enabled = true
>table.exec.emit.early-fire.delay = 5min
>
>PS:
>1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>2. window加了emit之后,会由原来输出append结果变成输出retract结果
>
>Jingsong Li  于2020年3月27日周五 下午2:51写道:
>
>> Hi,
>>
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>
>> Best,
>> Jingsong Lee
>>
>
>
>-- 
>
>Benchao Li
>School of Electronics Engineering and Computer Science, Peking University
>Tel:+86-15650713730
>Email: libenc...@gmail.com; libenc...@pku.edu.cn


flinksql如何控制结果输出的频率

2020-03-26 文章 flink
我有两个需求
1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办?
2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?

Re:Re: flinksql创建源表添加水位线失败

2020-03-24 文章 flink
)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 39 more


另外有一个问题是当我把flink-connector-kafkajar包放入lib目录下时,我的项目没有吧flink-connector-kafka打入jar包里,运行时提示我缺少flink-connector-kafka-base包,当我再添加时flink-connector-kafka-base包时
提示我缺少org/apache/kafka/common/serialization/ByteArrayDeserializer,我观察maven依赖是发现,flink-connector-kafka_2.11-1.10.0.jar它依赖着flink-connector-kafka-base和kafka-client包,当我只把flink-connector-kafka加入lib
目录下时,由于缺少它所依赖的包,导入执行不成功。在这里,我想知道一般提交flink任务,是尽量少添加这类jar包,然后通过maven将这些依赖打到项目中去吗?
期待您的回复


















在 2020-03-25 09:56:56,"Jark Wu"  写道:
>Emm... 这个好奇怪,理论上 IDEA 中运行的时候可能会有问题 (Calcite bug 导致的问题),SQL CLI 中不应该有问题。
>你的集群/作业中有其他的依赖吗? 比如自己依赖了 Calcite?
>
>Best,
>Jark
>
>On Tue, 24 Mar 2020 at 23:37, flink小猪 <18579099...@163.com> wrote:
>
>> 当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
>> CREATE TABLE user_behavior (
>> user_id BIGINT,
>> item_id BIGINT,
>> category_id BIGINT,
>> behavior STRING,
>> ts TIMESTAMP(3),
>> proctime as PROCTIME(),
>> WATERMARK FOR ts as ts - INTERVAL '5' SECOND
>> ) WITH (
>> 'connector.type' = 'kafka',
>> 'connector.version' = 'universal',
>> 'connector.topic' = 'user_behavior',
>> 'connector.startup-mode' = 'earliest-offset',
>> 'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
>> 'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
>> 'format.type' = 'json'
>> )
>> 出现错误
>> org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier
>> 'ts'
>> 我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
>> Unknown identifier 'ts'
>> ,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
>> flink-sql-connector-kafka_2.11-1.10.0.jar
>> flink-json-1.10.0.jar
>> 在sql-client上执行还是错误,是我缺少什么jar包吗?
>>
>>
>>
>>
>>
>>


flinksql创建源表添加水位线失败

2020-03-24 文章 flink
当我尝试通过sql client创建kafka源表时(这里借鉴了云邪大佬的《Demo:基于 Flink SQL 构建流式应用》文章),
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = '192.168.1.214:2181',
'connector.properties.bootstrap.servers' = '192.168.1.214:9092',
'format.type' = 'json'
)
出现错误
org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'ts'
我以为是我的建表语句有问题,但是当我通过代码(在idea上运行)创建这个表时,没有任何问题,数据也是可以正常查询到。当我提交到集群上运行时又出现org.apache.calcite.sql.validate.SqlValidatorException:
 Unknown identifier 'ts'
,我猜测是jar不全的问题,但是我按照云邪大佬的文章导入了
flink-sql-connector-kafka_2.11-1.10.0.jar
flink-json-1.10.0.jar
在sql-client上执行还是错误,是我缺少什么jar包吗?