Table options do not contain an option key 'connector' for discovering a connector.

2020-07-12 文章 Zhou Zach
flink 1.11 sink hive table的connector设置为什么啊,尝试设置 WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file'); 也报错误 query: streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) streamTableEnv.executeSql( """

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 王松
@Leonard Xu, 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1] 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下: [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html = org.apache.flink flink-sql-connector-kafka_${sca

Re:Re: flink on yarn日志问题

2020-07-12 文章 程龙
不好意思 怪我灭有描述清楚 1 目前开启日志收集功能 2 目前已是 per-job模式 3 集群使用cdh flink.1.10 在 2020-07-13 11:18:46,"Yangze Guo" 写道: >Hi, > >第一个问题,您可以尝试开启Yarn的日志收集功能[1] > >第二个问题,您可以尝试一下per-job mode [2][3] > >[1] >https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-file

Re: flink-benchmarks使用求助

2020-07-12 文章 zilong xiao
是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗? Congxian Qiu 于2020年7月10日周五 下午7:18写道: > Hi > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的

Re: flink sql 侧输出

2020-07-12 文章 Dream-底限
hi、 好的,感谢 On Mon, Jul 13, 2020 at 12:07 PM Jark Wu wrote: > Hi, > > Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据, > 你可以给你的作业配上: > > table.exec.emit.late-fire.enabled = true > table.exec.emit.late-fire.delay = 1min > > 同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window

Re: Re: flink 1.11 es未定义pk的sink问题

2020-07-12 文章 Yangze Guo
INSERT走的就是processUpsert这个方法,当不指定PK时,生成的key会是null,然后创建一个IndexRequest。 Best, Yangze Guo On Mon, Jul 13, 2020 at 2:00 PM sunfulin wrote: > > > hi, Leonard > 我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。 > 我将DDL更换为之前版本的with参数(声明使用update-mode = > ‘upsert’),不使用1.11最

Re:Re: flink 1.11 sql作业提交JM报错

2020-07-12 文章 sunfulin
hi, 感谢详细的解释和回复。那问题就清楚了。之前我们的job提交框架里统一都使用了StreamExecutionEnvironment.execute(jobName)方法,现在基于这个解释就明白了。 在 2020-07-12 22:55:34,"godfrey he" 写道: >hi sunfulin, > >1.11 对 StreamTableEnvironment.execute() >和 StreamExecutionEnvironment.execute() 的执行方式有所调整, >简单概述为: >1. StreamTableEnvironme

Re:how to set table.sql-dialect in flink1.11 StreamTableEnvironment

2020-07-12 文章 JasonLee
hi 把insert into 语句中的table去掉执行. -- Best JasonLee 在 2020-07-13 14:01:45,"Zhou Zach" 写道: >hi all, > > >我像下面那种方式尝试,报错了 > > >streamTableEnv.executeSql( >""" >| >| >|SET table.sql-dialect=hive; >|CREATE TABLE hive_table ( >| user_id STRING, >| age INT >|)

Re:how to set table.sql-dialect in flink1.11 StreamTableEnvironment

2020-07-12 文章 Zhou Zach
找到了: tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); 在 2020-07-13 14:01:45,"Zhou Zach" 写道: >hi all, > > >我像下面那种方式尝试,报错了 > > >streamTableEnv.executeSql( >""" >| >| >|SET table.sql-dialect=hive; >|CREATE TABLE hive_table ( >| user_id STRING, >| age INT

Re:Re: flink 1.11 es未定义pk的sink问题

2020-07-12 文章 sunfulin
hi, Leonard 我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。 我将DDL更换为之前版本的with参数(声明使用update-mode = ‘upsert’),不使用1.11最新的with参数,观察到sink结果就正常了。不确定是不是我哪边配置的方式不太对,还是说使用方式有问题。 @karma...@gmail.com 我看了下给的源代码,貌似这个是处理upsert的情况,如果不声明pk的话,是不是会是processInsert? CREATE TA

Re:Re: flink 1.11 local execution oom问题

2020-07-12 文章 sunfulin
hi, 感谢大神的回复。看了下是因为我执行的SQL作业,有多个sink,在执行TableEnvironment.executeSQL时,Flink提交了多个job,在本地执行时貌似就走到了这个异常。我将多sink修改为TableEnvironment.createStatementSet来提交,就没有这个问题了。谢谢回复。 在 2020-07-13 13:51:55,"Xintong Song" 写道: >Local execution 模式下,Flink 是无法实际控制 JVM 的 Xmx, Xms, MaxDirectMemorySize >等参数的,

how to set table.sql-dialect in flink1.11 StreamTableEnvironment

2020-07-12 文章 Zhou Zach
hi all, 我像下面那种方式尝试,报错了 streamTableEnv.executeSql( """ | | |SET table.sql-dialect=hive; |CREATE TABLE hive_table ( | user_id STRING, | age INT |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( | 'partition.time-extractor.timestamp-patte

Re: flink 1.11 local execution oom问题

2020-07-12 文章 Xintong Song
Local execution 模式下,Flink 是无法实际控制 JVM 的 Xmx, Xms, MaxDirectMemorySize 等参数的,这些参数取决于你的 IDE 设置。 检查一下 idea 的 run configuration 是否有配置过 -XX:MaxDirectMemorySize。 Thank you~ Xintong Song On Sat, Jul 11, 2020 at 3:48 PM Congxian Qiu wrote: > Hi > > 这个问题可以看下是否和 releasenote[1] 中 memory configuration >

Re: flink 1.11 es未定义pk的sink问题

2020-07-12 文章 Leonard Xu
Hello, fulin 这个问题能提供段可以复现的代码吗? 祝好, Leonard Xu > 在 2020年7月13日,09:50,Yangze Guo 写道: > > Hi, > > 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2]. > 所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。 > > [1] > https://github.com/apache/flink/blob/f0eeaec530e001ab02cb88

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 Leonard Xu
Hi, 王松 这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector 的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version} 可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream connector 同时引用是会冲突的,请根据你的需要使用。 祝好, Leonard Xu [1]https://ci.apac

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 tison
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了 Best, tison. 王松 于2020年7月13日周一 下午12:54写道: > 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错: > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any factory for identifier 'kafka' that implements > 'org.apache.f

Re: Re: Re: Re: Flink 多Sink 数据一致性保证

2020-07-12 文章 Yun Gao
是的,社区现在正在加exactly-oncer jdbc sink实现[1]。 另外,如果要实现两阶段提交的sink的话,总是需要有能跨session的transaction机制,就是在作业挂了之后,下次起来的时候这个事务还可以abort掉或者继续提交(取决于是否已经snapshot过了)。像jdbc必须要用xa事务,用单纯的jdbc事务应该就是有问题的,因为即使在snapshot的时候precommit过了,如果作业挂掉连接中断这个事务仍然会被abort掉。 [1] https://issues.apache.org/jira/browse/FLINK-15578

flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-12 文章 王松
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. 请问是什么原因导致的呢? 代码如下:

Re: flink sql 侧输出

2020-07-12 文章 Jark Wu
Hi, Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据, 你可以给你的作业配上: table.exec.emit.late-fire.enabled = true table.exec.emit.late-fire.delay = 1min 同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window allowLateness 。 具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmit

Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 Leonard Xu
Hi, 可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。 祝好, Leonard Xu > 在 2020年7月13日,11:46,wind.fly@outlook.com 写道: > > Hi, all: > 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, > 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session > web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下: > insert into > x.r

Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 zilong xiao
topic是几个分区呢?如果是一个分区,要加一个rebalance参数吧? wind.fly@outlook.com 于2020年7月13日周一 上午11:46写道: > Hi, all: > 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, > 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session > web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下: > insert into > x.report.bi_report_fence_common_in

Re: flink sql连接HBase报错

2020-07-12 文章 Leonard Xu
hello, 这应该是碰到了Hbase connector的bug [1], 用户配置的hbaseconf 相关的参数,如connector.zookeeper.quorum 不会生效,这个 bug 在1.11.0 已经修复,可以升级下版本。 在1.10.0版本上一种 walkwaround 的方式是把把这些参数放在 hbase-site.xml 的配置文件中,然后将把配置文件添加到 HADOOP_CLASSPATH中,这样Flink程序也可以加载到正确的配置。 祝好, Leonard Xu [1] https://issues.apache.org/jira/browse/

flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark

2020-07-12 文章 wind.fly....@outlook.com
Hi, all: 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下: insert into x.report.bi_report_fence_common_indicators select fence_id, 'finishedOrderCnt' as indicator_name, TUMBLE_END(dt, INTERVAL '5' MIN

flink sql 侧输出

2020-07-12 文章 Dream-底限
hi、 大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?

flink sql连接HBase报错

2020-07-12 文章 flink_learner
Hello: 在使用如下语句创建Flink SQL任务,执行查询报错,我想问下,是我遗漏了什么配置项导致flink在“/hbase” node去取元数据,实际集群的hbase配置是在zk的“/hbase-unsecure” node下的 Flink 版本是1.10,hbase的t1表有数据 create table t1 ( rowkey string, f1 ROW ) WITH ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name'

Re: flink on yarn日志问题

2020-07-12 文章 Yangze Guo
Hi, 第一个问题,您可以尝试开启Yarn的日志收集功能[1] 第二个问题,您可以尝试一下per-job mode [2][3] [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files [2] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode [3] https://ci.apache.org/projects

Re: Re: Re: Flink 多Sink 数据一致性保证

2020-07-12 文章 jindy_liu
原理大概理解了,想自己实现一个。比如kafka与mysql的实现,并想最大程度的复用些代码。 看了下源码,感觉要把现在的connector(kafka, jdbc)中的代码都看一下,然后扣出来,再去按twophasecommitsinkfunction的实现,重组一些代码,一个个方法实现。 另外问一下,好像现在源码里的jdbc只是at-least-once实现? -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink on yarn日志问题

2020-07-12 文章 程龙
请问一下两个问题 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看 ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在, 有没有好的方式或者策略 , 可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-12 文章 Jark Wu
你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。 datastream.map(new MySQLSinkMapFunction()).addSink(new FlinkKafkaProducer()). 也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。 另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。 Best, Jark On Fri, 10 Jul 2

Re:Re: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-12 文章 程龙
问题不是很常见 ,但是同一个任务,提交在flink1.10 和 flink1.10.1上都会复现, 准备尝试一下升级一下jdk试试 在 2020-07-06 16:11:17,"Congxian Qiu" 写道: >@chenkaibit 多谢你的回复~ > >Best, >Congxian > > >chenkaibit 于2020年7月6日周一 下午3:53写道: > >> hi,Congxian。我在发现这个问题时也很奇怪,但是在打印了一些日志后,确实验证了我的想法。因为 <低版本jdk+flink1.9> 和 >> <高版本jdk+1.10>

Re: flink 双流join报错,java.lang.AssertionError

2020-07-12 文章 Jark Wu
cc @Danny Chan 也许 Danny 老师知道。 On Thu, 9 Jul 2020 at 17:29, sunfulin wrote: > > hi, > 我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。 > > > > > > > 在 2020-07-09 16:53:34,"sunfulin" 写道: > >hi, > >我使用flink 1.10.1 > >blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把j

Re: flink 1.11 es未定义pk的sink问题

2020-07-12 文章 Yangze Guo
Hi, 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2]. 所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。 [1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/st

Re: flink 1.11 sql作业提交JM报错

2020-07-12 文章 godfrey he
hi sunfulin, 1.11 对 StreamTableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 的执行方式有所调整, 简单概述为: 1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业; 2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业; 3. 新引入的 TableEnvironment.executeSql() 和 St