flink 1.11 sink hive table的connector设置为什么啊,尝试设置
WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
h','sink.partition-commit.policy.kind'='success-file');
也报错误
query:
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
@Leonard Xu,
非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
=
org.apache.flink
flink-sql-connector-kafka_${sca
不好意思 怪我灭有描述清楚
1 目前开启日志收集功能
2 目前已是 per-job模式
3 集群使用cdh flink.1.10
在 2020-07-13 11:18:46,"Yangze Guo" 写道:
>Hi,
>
>第一个问题,您可以尝试开启Yarn的日志收集功能[1]
>
>第二个问题,您可以尝试一下per-job mode [2][3]
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-file
是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
Congxian Qiu 于2020年7月10日周五 下午7:18写道:
> Hi
> 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的
hi、
好的,感谢
On Mon, Jul 13, 2020 at 12:07 PM Jark Wu wrote:
> Hi,
>
> Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,
> 你可以给你的作业配上:
>
> table.exec.emit.late-fire.enabled = true
> table.exec.emit.late-fire.delay = 1min
>
> 同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window
INSERT走的就是processUpsert这个方法,当不指定PK时,生成的key会是null,然后创建一个IndexRequest。
Best,
Yangze Guo
On Mon, Jul 13, 2020 at 2:00 PM sunfulin wrote:
>
>
> hi, Leonard
> 我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。
> 我将DDL更换为之前版本的with参数(声明使用update-mode =
> ‘upsert’),不使用1.11最
hi,
感谢详细的解释和回复。那问题就清楚了。之前我们的job提交框架里统一都使用了StreamExecutionEnvironment.execute(jobName)方法,现在基于这个解释就明白了。
在 2020-07-12 22:55:34,"godfrey he" 写道:
>hi sunfulin,
>
>1.11 对 StreamTableEnvironment.execute()
>和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
>简单概述为:
>1. StreamTableEnvironme
hi
把insert into 语句中的table去掉执行.
--
Best
JasonLee
在 2020-07-13 14:01:45,"Zhou Zach" 写道:
>hi all,
>
>
>我像下面那种方式尝试,报错了
>
>
>streamTableEnv.executeSql(
>"""
>|
>|
>|SET table.sql-dialect=hive;
>|CREATE TABLE hive_table (
>| user_id STRING,
>| age INT
>|)
找到了:
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
在 2020-07-13 14:01:45,"Zhou Zach" 写道:
>hi all,
>
>
>我像下面那种方式尝试,报错了
>
>
>streamTableEnv.executeSql(
>"""
>|
>|
>|SET table.sql-dialect=hive;
>|CREATE TABLE hive_table (
>| user_id STRING,
>| age INT
hi, Leonard
我定义了一个ddl和一个dml,sql如下。ddl中没有定义PK,我观察到的现象是:这样在sink到es结果中,结果生成的id是index名,导致只有一条记录。
我将DDL更换为之前版本的with参数(声明使用update-mode =
‘upsert’),不使用1.11最新的with参数,观察到sink结果就正常了。不确定是不是我哪边配置的方式不太对,还是说使用方式有问题。
@karma...@gmail.com 我看了下给的源代码,貌似这个是处理upsert的情况,如果不声明pk的话,是不是会是processInsert?
CREATE TA
hi,
感谢大神的回复。看了下是因为我执行的SQL作业,有多个sink,在执行TableEnvironment.executeSQL时,Flink提交了多个job,在本地执行时貌似就走到了这个异常。我将多sink修改为TableEnvironment.createStatementSet来提交,就没有这个问题了。谢谢回复。
在 2020-07-13 13:51:55,"Xintong Song" 写道:
>Local execution 模式下,Flink 是无法实际控制 JVM 的 Xmx, Xms, MaxDirectMemorySize
>等参数的,
hi all,
我像下面那种方式尝试,报错了
streamTableEnv.executeSql(
"""
|
|
|SET table.sql-dialect=hive;
|CREATE TABLE hive_table (
| user_id STRING,
| age INT
|) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
| 'partition.time-extractor.timestamp-patte
Local execution 模式下,Flink 是无法实际控制 JVM 的 Xmx, Xms, MaxDirectMemorySize
等参数的,这些参数取决于你的 IDE 设置。
检查一下 idea 的 run configuration 是否有配置过 -XX:MaxDirectMemorySize。
Thank you~
Xintong Song
On Sat, Jul 11, 2020 at 3:48 PM Congxian Qiu wrote:
> Hi
>
> 这个问题可以看下是否和 releasenote[1] 中 memory configuration
>
Hello, fulin
这个问题能提供段可以复现的代码吗?
祝好,
Leonard Xu
> 在 2020年7月13日,09:50,Yangze Guo 写道:
>
> Hi,
>
> 如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].
> 所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。
>
> [1]
> https://github.com/apache/flink/blob/f0eeaec530e001ab02cb88
Hi, 王松
这个报错是pom中缺少了 Kafka SQL connector的依赖,你引入的依赖都是Kafka datastream connector
的依赖,正确的依赖是:flink-sql-connector-kafka-${version}_${scala.binary.version}
可以参考官网文档[1], 查看和下载SQL Client Jar. 另外, Kafka SQL connector 和 Kafka datastream
connector 同时引用是会冲突的,请根据你的需要使用。
祝好,
Leonard Xu
[1]https://ci.apac
那就要看下你是什么 Flink 版本,怎么提交到 YARN 上的,以及 YARN 的日志上的 classpath 是啥了
Best,
tison.
王松 于2020年7月13日周一 下午12:54写道:
> 各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.f
是的,社区现在正在加exactly-oncer jdbc sink实现[1]。
另外,如果要实现两阶段提交的sink的话,总是需要有能跨session的transaction机制,就是在作业挂了之后,下次起来的时候这个事务还可以abort掉或者继续提交(取决于是否已经snapshot过了)。像jdbc必须要用xa事务,用单纯的jdbc事务应该就是有问题的,因为即使在snapshot的时候precommit过了,如果作业挂掉连接中断这个事务仍然会被abort掉。
[1] https://issues.apache.org/jira/browse/FLINK-15578
各位好,写了个demo,代码如下,在本地跑没有问题,提交到yarn session上报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.
请问是什么原因导致的呢?
代码如下:
Hi,
Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,
你可以给你的作业配上:
table.exec.emit.late-fire.enabled = true
table.exec.emit.late-fire.delay = 1min
同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window
allowLateness 。
具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmit
Hi,
可以先看下 Kakfa topic 对应的partition有几个?是否每个分区都有数据。
祝好,
Leonard Xu
> 在 2020年7月13日,11:46,wind.fly@outlook.com 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.r
topic是几个分区呢?如果是一个分区,要加一个rebalance参数吧?
wind.fly@outlook.com 于2020年7月13日周一 上午11:46写道:
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.report.bi_report_fence_common_in
hello,
这应该是碰到了Hbase connector的bug [1], 用户配置的hbaseconf
相关的参数,如connector.zookeeper.quorum 不会生效,这个 bug 在1.11.0 已经修复,可以升级下版本。
在1.10.0版本上一种 walkwaround 的方式是把把这些参数放在 hbase-site.xml 的配置文件中,然后将把配置文件添加到
HADOOP_CLASSPATH中,这样Flink程序也可以加载到正确的配置。
祝好,
Leonard Xu
[1] https://issues.apache.org/jira/browse/
Hi, all:
本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session
web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
insert into
x.report.bi_report_fence_common_indicators
select
fence_id,
'finishedOrderCnt' as indicator_name,
TUMBLE_END(dt, INTERVAL '5' MIN
hi、
大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql
api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?
Hello:
在使用如下语句创建Flink SQL任务,执行查询报错,我想问下,是我遗漏了什么配置项导致flink在“/hbase”
node去取元数据,实际集群的hbase配置是在zk的“/hbase-unsecure” node下的
Flink 版本是1.10,hbase的t1表有数据
create table t1 (
rowkey string,
f1 ROW
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.table-name'
Hi,
第一个问题,您可以尝试开启Yarn的日志收集功能[1]
第二个问题,您可以尝试一下per-job mode [2][3]
[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
[2]
https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
[3]
https://ci.apache.org/projects
原理大概理解了,想自己实现一个。比如kafka与mysql的实现,并想最大程度的复用些代码。
看了下源码,感觉要把现在的connector(kafka,
jdbc)中的代码都看一下,然后扣出来,再去按twophasecommitsinkfunction的实现,重组一些代码,一个个方法实现。
另外问一下,好像现在源码里的jdbc只是at-least-once实现?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
请问一下两个问题
1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看 ,除了使用es收集日志的这种方案,
还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在, 有没有好的方式或者策略 ,
可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。
datastream.map(new MySQLSinkMapFunction()).addSink(new
FlinkKafkaProducer()).
也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。
另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。
Best,
Jark
On Fri, 10 Jul 2
问题不是很常见 ,但是同一个任务,提交在flink1.10 和 flink1.10.1上都会复现, 准备尝试一下升级一下jdk试试
在 2020-07-06 16:11:17,"Congxian Qiu" 写道:
>@chenkaibit 多谢你的回复~
>
>Best,
>Congxian
>
>
>chenkaibit 于2020年7月6日周一 下午3:53写道:
>
>> hi,Congxian。我在发现这个问题时也很奇怪,但是在打印了一些日志后,确实验证了我的想法。因为 <低版本jdk+flink1.9> 和
>> <高版本jdk+1.10>
cc @Danny Chan 也许 Danny 老师知道。
On Thu, 9 Jul 2020 at 17:29, sunfulin wrote:
>
> hi,
> 我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。
>
>
>
>
>
>
> 在 2020-07-09 16:53:34,"sunfulin" 写道:
> >hi,
> >我使用flink 1.10.1
> >blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把j
Hi,
如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].
所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。
[1]
https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/st
hi sunfulin,
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 St
33 matches
Mail list logo