我也有这个需求,意思就是topic里实时新增了一种日志,然后想动态创建对应新的日志的topic表,并写入到新的topic表,在一个任务中完成
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
在2021年09月22日 11:23,Caizhi Weng 写道:
Hi!
不太明白这个需求,但如果希望发送给不同的 topic,需要给每个 topic 都定义 DDL。
如果是因为各 topic 之间的 schema 重复度比较高,只有些许字段以及 topic 名称等不同,可以看一下 DDL LIKE 语法:
不是,原因找到了,是函数多次嵌套导致,flink原始类型是not null,不能转换为string,这个报错信息真的是蛋疼,让人迷惑
在 2021-05-13 10:09:49,"allanqinjy" 写道:
>光看异常,应该是你插入了空值吧,你插入hbase的时候做个filter过滤吧,比如你的rowkey空了,你往hbase插入应该是不行的。你可以试试。
>
>
>| |
>allanqinjy
>|
>|
>allanqi...@163.com
>|
>签名由网
Mismatch of function's argument data type 'STRING NOT NULL' and actual
argument type 'STRING'.sql有些长,大概就是在执行 insert hbase sql时 报了上面的错误,请问这种错误是什么原因?
flink.version=1.12.0
Create table t1(
a varchar
)with (
connector=kafka
format=json
)
Create table t2(
a varchar
)with (
connector=filesystem
format=csv
)
SQL: Insert into t2 select a from t1
发送: {"a":[{"a1":1,"a2":2}]}
Hdfs的结果为:"[{""a1"":1,""a2"":2}]”
flink.version=1.12.0
Create table t1(
a varchar
)with (
connector=kafka
format=json
)
Create table t2(
a varchar
)with (
connector=filesystem
format=csv
)
SQL: Insert into t2 select a from t1
发送: {"a":[{"a1":1,"a2":2}]}
Hdfs的结果为:"[{""a1"":1,""a2"":2}]”
Create table t1(
a varchar
)with (
connector=kafka
format=json
)
Create table t2(
a varchar
)with (
connector=filesystem
format=csv
)
SQL: Insert into t2 select a from t1
发送: {"a":[{"a1":1,"a2":2}]}
Hdfs的结果为:"[{""a1"":1,""a2"":2}]”
问题:为什么一个双引号变成了2个双引号?
StatementSet statementSet = tableEnvironment.createStatementSet();
String sql1 = "insert into test select a,b,c from test_a_12342 /*+
OPTIONS('table-name'='test_a_1')*/";
String sql2 = "insert into test select a,b,c from test_a_12342 /*+
OPTIONS('table-name'='test_a_2')*/";
'scan.partition.column'='id',
'scan.partition.num'='15',
'scan.partition.lower-bound'='1',
'scan.partition.upper-bound'='680994'
我设置了上面这几个参数给source mysql分区,但是并没有生效,真实情况是只有一个task读的mysql全量数据
在使用flink batch sql的 union all时,任务并行度跟设置的-p参数不一致
例如 select a from t1 union all select a from t2……….
如果我-p设置了2,那么我union all了几个表,并行度就在-p基础上乘以几,-p=2 union
all了3个表,那么并行度就为变为6了,请问这块怎么限制并行度为’2’?
直接读topic,headers是空,我仅仅是想读key,不管topic是谁写入的
在2021年01月14日 16:03,酷酷的浑蛋 写道:
你意思是说,topic不是flink写入的,用flink sql就不能读到key?
在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道:
hi
你写入数据的时候设置 headers 了吗 没设置的话当然是空的了
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
官网没说在哪里读key啊
在2021年01月14日 14:52,Jark Wu 写道:
kafka 读 key fields:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#key-fields
On Wed, 13 Jan 2021 at 15:18, JasonLee <17610775...@163.com> wrote:
hi
你写入数据的时候设置 headers 了吗 没设置的话当然是空的了
-
Best Wishes
在2021年01月14日 16:03,酷酷的浑蛋 写道:
你意思是说,topic不是flink写入的,用flink sql就不能读到key?
在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道:
hi
你写入数据的时候设置 headers 了吗 没设置的话当然是空的了
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你意思是说,topic不是flink写入的,用flink sql就不能读到key?
在2021年01月13日 15:18,JasonLee<17610775...@163.com> 写道:
hi
你写入数据的时候设置 headers 了吗 没设置的话当然是空的了
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
WITH("`event_time` TIMESTAMP(3) METADATA FROM 'timestamp'," +
"`partition` BIGINT METADATA VIRTUAL," +
"`offset` BIGINT METADATA VIRTUAL," +
"`headers` MAP NOT NULL METADATA VIRTUAL,”+
在这里获取kakfa元数据时,官网没有说怎么获取kafka消息的key?,headers的信息是空的,请问怎么在flink sql中获取kafka消息key?
Flink-1.11.1, hive-2.2.0
在使用current_timestamp或者current_date函数时会报
Caused by: java.lang.NullPointerException
at
org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51)
at
flink读mysql分库分表可以自动识别吗? 还是只能一个一个读?
taskmanager.memory.process.size:
1728m1728改为2048就好了,这是啥原理taskmanager.memory.process.size: 2048m
在2020年11月4日 11:47,Yangze Guo 写道:
有更完整的am日志么?需要看一下rm那边资源申请情况。
Best,
Yangze Guo
On Wed, Nov 4, 2020 at 11:45 AM 酷酷的浑蛋 wrote:
下面是报错,说是没有资源,但资源是充足的,之后我把版本改为1.11.1,任务就可以运行了
]
atjava.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)~[?:1.8.0_191]
...25more
Causedby:java.util.concurrent.TimeoutException
...23more
在2020年11月4日 11:20,Guowei Ma 写道:
hi,
有看过am的日志没有,日志中有报什么异常么?
Best,
Guowei
On Wed, Nov 4, 2020 at 11:04 AM 酷酷的浑蛋 wrote:
flink
flink-1.11.2提交到yarn一直处于CREATED中,不会运行,flink-1.11.1没问题
资源已经分配
标题上写的就是flink1.11啊
在2020年11月2日 11:33,酷酷的浑蛋 写道:
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用
在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
1 语句保活连接
你看历史的回复,用的就是flink1.11,最新的flink-1.11.2也试过了还是有这个问题,而且我是在flink sql中使用
在2020年11月2日 11:30,史 正超 写道:
你用的flink哪个版本,flink-1.11已经加了连接失效重连的功能了,应该没有这个问题了。如果不是flink-1.11.x版本的,也可以参考flink-1.11的jdbc-connector的实现,或者用SELECT
1 语句保活连接。
发件人: 酷酷的浑蛋
发送时间: 2020年11月2日 2:28
收件人: user
没有解决,隔一段时间就会报这个超时错误
在 2020-10-14 17:33:30,"superainbower" 写道:
>HI
>链接超时的问题,你后来解决了吗?能告诉下怎么解决的吗?
>| |
>superainbower
>|
>|
>superainbo...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2020年08月31日 15:57,酷酷的浑蛋 写道:
>关键是在sql中怎么设置,connector=jdbc
>
&
我用flink sql实时写入hive表时发现sink的并行度为1? 我看了FileSystemTableSink类的226行,确实设置了1,这是为什么呢?
并行度1的写入速度很慢
放到flink/lib下了
在2020年09月2日 16:52,Danny Chan 写道:
Es connector 的包放到哪个目录下了 ?
Best,
Danny Chan
在 2020年9月2日 +0800 PM3:38,酷酷的浑蛋 ,写道:
Caused by: java.lang.ClassNotFoundException:
org.elasticsearch.client.RestClientBuilder
Flink sql 写入ES:总是报上面的错误,我检查了依赖并没有冲突这个类啊,而且我解压了jar,里面是有这个类的啊
Caused by: java.lang.ClassNotFoundException:
org.elasticsearch.client.RestClientBuilder
Flink sql 写入ES:总是报上面的错误,我检查了依赖并没有冲突这个类啊,而且我解压了jar,里面是有这个类的啊
你是说让我修改mysql配置? 怎么可能允许我修改啊
在2020年09月1日 10:12,amen...@163.com 写道:
如果是mysql5.x以上的版本,url中autoReconnect参数会无效吧,
可以尝试下修改配置文件wait_timeout/interactive_out参数
best,
amenhub
发件人: 酷酷的浑蛋
发送时间: 2020-08-31 20:48
收件人: user-zh@flink.apache.org
主题: 回复: flink1.11连接mysql问题
下面是我连接mysql的配置,用的flink-1.11.1,还是报那个
1. Create hive表(...)with(...)
我发现写入hive只能根据checkpoint去提交分区?可以按照文件大小或者间隔时间来生成吗?
2. Create table (connector=filesystem,format=json) with(…)
这种方式format只能等于json? 我怎么按照分隔符写入hdfs?
,15:02,酷酷的浑蛋 写道:
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet
successfully received from the server was 52,445,041 milliseconds ago. The last
packet sent successfully to the server was 52,445,045 milliseconds ago. is
longer than the server configured value
关键是在sql中怎么设置,connector=jdbc
在2020年08月31日 15:06,13580506953<13580506...@163.com> 写道:
这个问题本质是连接活性问题,
连接数据库超时设置autoReconnect=true(mysql5以上的,设置autoReconnect=true 是无效的 只有4.x版本,起作用)
建议使用连接池druid进行连接活性保持
原始邮件
发件人: 酷酷的浑蛋
收件人: user-zh
发送时间: 2020年8月28日(周五) 15:02
主题: flink1.11连接m
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet
successfully received from the server was 52,445,041 milliseconds ago. The last
packet sent successfully to the server was 52,445,045 milliseconds ago. is
longer than the server configured value of'wait_timeout'. You
好吧,谢谢
在2020年08月25日 18:40,Benchao Li 写道:
Hi,
这个功能已经在1.12支持了[1],如果着急使用,可以cherry-pick回去试试看。
用法就是直接把这个字段声明为varchar,json format会帮你自动处理
[1] https://issues.apache.org/jira/browse/FLINK-18002
酷酷的浑蛋 于2020年8月25日周二 下午6:32写道:
还没到udf那一步,直接用create table的方式,过来的数据就是获取不到值的,
CREATE TABLE test (
a VARCHAR
'
);
在2020年08月25日 16:14,Jim Chen 写道:
这个需要你自定义UDF
酷酷的浑蛋 于2020年8月25日周二 下午3:46写道:
关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
在2020年08月25日 15:34,taochanglian 写道:
flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
在 2020/8/25 14:59, 酷酷的浑蛋 写道:
还是这个问题,如果字段的值有时候是json有时候是json数组
,format=json是获取不到这个字段值的,直接显示为空
在2020年08月25日 16:23,zilong xiao 写道:
直接CAST不可以吗?
酷酷的浑蛋 于2020年8月25日周二 下午3:46写道:
关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
在2020年08月25日 15:34,taochanglian 写道:
flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
在 2020/8/25 14:59, 酷酷的浑蛋 写道:
还是这个问题
关键是那个值不是固定的,有时候是json,有时候是json数组,没办法固定写一个,现在我只想把value当做字符串获取到,难道没有办法吗
在2020年08月25日 15:34,taochanglian 写道:
flinksql,处理json ,对象的话用row,数组的话用array获取具体的值。
在 2020/8/25 14:59, 酷酷的浑蛋 写道:
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
在2020年08月25日 14:05,酷酷的浑蛋 写道:
我知道了
在2020年08月25日 13:58,酷
还是这个问题,如果字段的值有时候是json有时候是json数组,那么我只想把它当做字符串显示,该怎么写?
在2020年08月25日 14:05,酷酷的浑蛋 写道:
我知道了
在2020年08月25日 13:58,酷酷的浑蛋 写道:
flink1.11
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
我知道了
在2020年08月25日 13:58,酷酷的浑蛋 写道:
flink1.11
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
flink1.11
读取json数据时format=“json”,当数据中某个字段的值是[{"a1":{"a2":"v2"}}]类似这种嵌套,flink取到的值就是空,这个怎么处理?
flink1.11启动后报这个错,然后任务就自己挂了,没有其它错误,也没有报我代码错
org.apache.hadoop.yarn.exceptions.YarnException:Containercontainer_1590424616102_807478_01_02isnothandledbythisNodeManager
atsun.reflect.NativeConstructorAccessorImpl.newInstance0(NativeMethod)~[?:1.8.0_191]
拿到的
minimum-allocation-mb 和 Yarn RM 实际使用的不一致。
Thank you~
Xintong Song
On Mon, Jul 27, 2020 at 7:42 PM 酷酷的浑蛋 wrote:
首先,flink1.9提交到yarn集群是没有问题的,同等的配置提交flink1.11.1到yarn集群就报下面的错误
2020-07-27 17:08:14,661 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
我找到问题了,我觉得我发现了一个bug,很严重,会导致flink持续占资源,一直增加
在2020年07月22日 14:08,酷酷的浑蛋 写道:
这是我的启动命令:./bin/flink run -m yarn-cluster -p 2 -ys 2 -yqu rt_constant -c
com.xx.Main -yjm 1024 -ynm RTC_TEST xx.jar
任务到yarn上后就一直在占用core,core数量和内存数量一直在增加
在2020年07月22日 12:48,JasonLee<17610775...@163.com> 写道:
HI
你使
nature is customized by Netease Mail Master
在2020年07月22日 12:44,酷酷的浑蛋 写道:
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?
现在启动任务(yarn)怎么指定-ys 和-p 都不管用了? 自动就分配好多core?
默认启动个任务给我启动了100个core?100个container?我擦,啥情况啊,现在指定什么参数才生效啊?我默认配置也没有配置过多少个core啊,默认不是1个吗
jm里面没有日志啊,关键是配置都是一样的,我在1.9里运行就没问题,在flink1.11就一直卡在那里,不分配资源,到底启动方式改变了啥呢?
集群资源是有的,可是任务一直卡在那说没资源,这怎么办
在2020年07月21日 17:22,Shuiqiang Chen 写道:
Hi,
可以尝试在jm的log里看看是在申请哪个资源的时候超时了, 对比下所申请的资源规格和集群可用资源
Best,
Shuiqiang
酷酷的浑蛋 于2020年7月21日周二 下午4:37写道:
服了啊,这个flink1.11启动怎么净是问题啊
我1.7,1.8,1.9 都没有问题,到11就不
服了啊,这个flink1.11启动怎么净是问题啊
我1.7,1.8,1.9 都没有问题,到11就不行
./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024
-ynm sql_test ./examples/batch/WordCount.jar --input
hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a
报错:
Caused by:
突问题,报错信息看起来是classpath加载到了两个相同的jar, javax.ws.rs-api-2.1.1.jar
这个jar包是你集群需要的吗?
可以把你场景说细点,比如这个问题如何复现,这样大家可以帮忙一起排查
祝好,
Leonard Xu
在 2020年7月20日,15:36,酷酷的浑蛋 写道:
Flink1.11启动时报错:
java.lang.LinkageError: ClassCastException: attempting to
castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext
这flink1.11啥情况啊,一启动就报
java.lang.LinkageError: ClassCastException: attempting to
castjar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
to
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
Flink1.11启动时报错:
java.lang.LinkageError: ClassCastException: attempting to
castjar:file:/data/rt/jar_version/sql/6.jar!/javax/ws/rs/ext/RuntimeDelegate.class
to
jar:file:/data/server/flink-1.11.0/lib/javax.ws.rs-api-2.1.1.jar!/javax/ws/rs/ext/RuntimeDelegate.class
at
找到了,谢谢
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
在2020年07月17日 10:57,酷酷的浑蛋 写道:
我看您写了'format.type' = ‘custom',这个custom 是跟哪里关联的呢? 还是说这里要写类路径?
在2020年07月17日 10:47,夏帅 写道:
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE
我看您写了'format.type' = ‘custom',这个custom 是跟哪里关联的呢? 还是说这里要写类路径?
在2020年07月17日 10:47,夏帅 写道:
你好,这个是可以进行自定义的
参考https://jxeditor.github.io/2020/06/11/FlinkSQL%E8%87%AA%E5%AE%9A%E4%B9%89FORMAT_TYPE/
--
发件人:酷酷的浑蛋
发送时间:2020年7月17日(星期五) 10
请问flink可以自定义format吗,目前提供的format必须要进行一次数据过滤为规则数据才行,可不可以自定义format实现自己的数据格式source呢?
目前flink支持的:
| 格式 | 支持的连接器 |
| CSV | Apache Kafka, Filesystem |
| JSON | Apache Kafka, Filesystem, Elasticsearch |
| Apache Avro | Apache Kafka, Filesystem |
| Debezium CDC | Apache Kafka |
| Canal CDC | Apache Kafka |
您是说将那几个jar都放到flink/lib下吗?
在2020年06月11日 14:39,Leonard Xu 写道:
Hi
你服务器上是否配置了hadoop_classpath? 建议hbase在试用时 用 hadoop_classpath + flink-hbase
jar,不然依赖问题会比较麻烦。
祝好
Leonard Xu
在 2020年6月11日,14:24,酷酷的浑蛋 写道:
在使用flink sql ddl语句向hbase中写的时候报如下错误:
java.lang.NoClassDefFoundError: org/apache/hadoop
在使用flink sql ddl语句向hbase中写的时候报如下错误:
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
at
org.apache.flink.addons.hbase.HBaseUpsertTableSink.consumeDataStream(HBaseUpsertTableSink.java:87)
at
同问,我这里也会经常出现这种情况,我现在是写的代码自动kill,这是bug吗?
| |
apache22
邮箱:apach...@163.com
|
Signature is customized by Netease Mail Master
在2020年04月26日 11:01,Zhefu PENG 写道:
图好像挂了看不到。是不是和这两个场景描述比较相似
[1] http://apache-flink.147419.n8.nabble.com/flink-kafka-td2386.html
[2]
(AbstractKeyedStateBackend.java:328)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
... 10 more
在2020年4月17日 15:27,酷酷的浑蛋 写道
(AbstractKeyedStateBackend.java:328)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
... 10 more
在2020年4月17日 15:27,酷酷的浑蛋 写道
我在用StreamingFileSink
往hdfs写数据的时候,如果任务停止了,从前面的某个checkpoint启动(不是最新checkpoint),就会发生下面的情况:
其中part-4-9/part-4-13/part-4-14
好的,非常感谢您,我去按照您说的代码改下,非常感谢
在2020年4月17日 15:17,Benchao Li 写道:
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
酷酷的浑蛋 于2020年4月17日周五 下午3:09写道:
我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6));这种方式设置ttl
在2020年4月17日 14:54
ine
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}
酷酷的浑蛋 于2020年4月17日周五 下午2:47写道:
我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
在2020年4月17日 14:16,Benchao Li 写道:
这是两个问题,
- 状态只访问一次,可能不会清理。
/jira/browse/FLINK-16581
酷酷的浑蛋 于2020年4月17日周五 下午2:06写道:
其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
在2020年4月17日 13:07,Benchao Li 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。
[1] https
其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
在2020年4月17日 13:07,Benchao Li 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。
[1] https://issues.apache.org/jira/browse/FLINK-17199
酷酷的浑蛋 于2020年4月17日周五 下午12
我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
在2020年4月15日 18:04,Benchao Li 写道:
Hi,
你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
[1] https://issues.apache.org/jira/browse/FLINK-15938
酷酷的浑蛋 于
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
在2020年4月15日 18:04,Benchao Li 写道:
Hi,
你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
[1] https://issues.apache.org/jira/browse/FLINK-15938
酷酷的浑蛋 于2020年4月15日周三 下午5:40写道:
我在flink sql中设置了
我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a
当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明
ExecutionEnvUtil
这个没有
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
在2020年4月7日 12:23,苟刚 写道:
附件是两份主要代码
--
Best Wishes
Galen.K
在 2020-04-07 12:11:07,"酷酷的浑蛋" 写道:
>是不是代码中设置了从头消费,还有可能提交offset到kafka的代码中设置了false?因为你的代码应该不是全的,所以没法具体看
>
>
>| |
>apache2
是不是代码中设置了从头消费,还有可能提交offset到kafka的代码中设置了false?因为你的代码应该不是全的,所以没法具体看
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
在2020年4月7日 12:03,苟刚 写道:
latest 不是最后消费的位置吗?
另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。
--
Best Wishes
Galen.K
在 2020-04-07
://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#assigning-operator-ids
[2]
https://ci.apache.org/projects/flink/flink-docs-master/ops/state/savepoints.html#faq
Best,
Yangze Guo
On Thu, Apr 2, 2020 at 6:01 PM 酷酷的浑蛋 wrote:
Failed to rollback to checkpoint/savepoint
hdfs://xxx
Failed to rollback to checkpoint/savepoint
hdfs://xxx/savepoint-9d5b7a-66c0340f6672. Cannot map checkpoint/savepoint state
for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the
operator is not available in the new program. If you want to allow to skip
this, you can
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
- 转发邮件信息 -
发件人: 酷酷的浑蛋
发送日期: 2020年3月18日 15:15
发送至: user-zh
现在我发现个问题:flink sql实时 inner join ,结果会发生乱序,请问这是正常的吗
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制
- 转发邮件信息 -
发件人: 酷酷的浑蛋
发送日期: 2020年3月13日 19:18
发送至: user-zh@flink.apache.org
select a.x01,
udf_name(a.x02)
from (select a.x01,
a...
from tb_name) a
join
(select * from tb_name) b
70 matches
Mail list logo