我之前对源码进行了修复,测试的时候没有恢复之前的源码状态,后来发现Map这种方式是不可以的
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi、
我想到是一个实现方案是在flink端ddl建立lookup表的时候,一张flink表对应上面说的那个外部子查询虚拟表,相当于flink建了一个视图吧
Dream-底限 于2020年10月14日周三 下午2:23写道:
> hi、
>
> 》》你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
> 是的,可以理解为用一个key查询一个视图,这个视图来自于多表关联;在不做视图的情况下,直接点查外部系统的子查询,在flink端依然是原查询样式
> 依然是:JOIN t
hi、
》》你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
是的,可以理解为用一个key查询一个视图,这个视图来自于多表关联;在不做视图的情况下,直接点查外部系统的子查询,在flink端依然是原查询样式 依然是:
JOIN table2 FOR SYSTEM_TIME AS OF
table1.proctime,只不过table2不再是一个物理实表,如:table2=(select
col from table)
Leonard Xu 于2020年10月13日周二 下
试了下一种解决方案,如下,可以调整sql并行度。
val table1: Table = stenv.sqlQuery("select * from test")
val schema = table1.getSchema
val table2 = stenv.fromDataStream(table1.toAppendStream[Row].map(item
=> Row.of(item.getField(0), item.getField(1)))(new
RowTypeInfo(schema.getFieldTypes.toList.take(2).toArray,
schema.
想获取到的话其实可以通过 REST API 去如下图的 metrics 处获取作业 source 往下 send
的数据量和速度,不过这个是单个并行度的,可以去将每个并行度的累加起来。
http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-10-14-060508.png
[image: image.png]
Best
zhisheng
Kevin Liu 于2020年10月14日周三 上午12:35写道:
> 可以参考 https://blog.csdn.net/weixin_41608066/article/detai
Benchao Li的那个方法是对的,avro的一个bug:
private static AvroToRowDataConverter createMapConverter(LogicalType type) {
final AvroToRowDataConverter keyConverter =
createConverter(DataTypes.STRING().getLogicalType());
final AvroToRowDataConverter valueConverter =
createConverter(extractValueTypeToAvro
??:
https://paste.ubuntu.com/p/KqpKwTw5zH/
$FLINK_HOME/bin/flink run -py /wordcount.py
data.output()
确定吗?我这边测试还是有问题,这应该是avro 的一个bug。
发件人: 奔跑的小飞袁
发送时间: 2020年10月14日 3:29
收件人: user-zh@flink.apache.org
主题: Re: 回复: flink-SQL1.11版本对map类型中value的空指针异常
我尝试使用MAP来定义我的类型,问题已经解决,谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hello,
stenv.fromDataStream(stream, $"")
请教下,如果stream中数据是org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
类型,field应该如何设置呢?
比如:
{
a: 1,
b: {
c: "test"
}
}
Best Wishes.
shizk233 于2020年9月28日周一 下午7:15写道:
> flink sql似乎不能设置rebalance,在Data Stream API可以设。
>
> 一种思路是拆
hi,我使用 1.10 测试过,发现 history server 查到 cancel job 的时间比较长(超过默认的
10s),但是最终还是会出现的。
如图:http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-10-14-033612.png
[image: image.png]
刘建刚 于2020年9月28日周一 下午4:13写道:
> 修复方案为:https://issues.apache.org/jira/browse/FLINK-18959
>
> xiao cai 于2020年9月27日周日 下午6:42写道
我尝试使用MAP来定义我的类型,问题已经解决,谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
好的,我尝试一下
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi, everyone
近期有做一个关于从外部路径加载UDF的开发,但报了如下异常:(截取主要的异常信息)
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user
class: com.xxx.xxx.udf.Uppercase
ClassLoader info: URL ClassLoader:
file:
'/tmp/blobStore-1318a525-1b5a-4c07-808e-f62083c3fb11/job_a5501605ff554915a81ae12e3018e7
所以我的建议是用avro的规范,你可以这样定义你的MAP类型:
MAP
发件人: 史 正超
发送时间: 2020年10月14日 2:45
收件人: user-zh
主题: 回复: flink-SQL1.11版本对map类型中value的空指针异常
但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is not
null.
代码这样写的前提是,不允许对象的值为null的。
是的,所以应该用createNullableConverter,而不是createConverter
史 正超 于2020年10月14日周三 上午10:45写道:
>
> 但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is
> not null.
> 代码这样写的前提是,不允许对象的值为null的。
>
> 发件人: Benchao Li
> 发送时间: 2020年10月14日 2:34
> 收件人: user-zh
>
我在用 flink最新的版本 (1.11.2) 去连接mysql的数据
下面是我的环境
docker flink 环境 是 flink:scala_2.12-java8
docker pull flink:scala_2.12-java8
jdbc 使用的是最新的 flink-connector-jdbc_2.11-1.11.2.jar,并且已经使用了
flink-connector-jdbc_2.11-1.11.2, mysql-connector-java-8.0.21.jar,
postgresql-42.2.17.jar 这几个jdbc的库已经放到了{FLINK}
但是 方法上有这样的一个注释:Creates a runtime converter which assuming input object is not
null.
代码这样写的前提是,不允许对象的值为null的。
发件人: Benchao Li
发送时间: 2020年10月14日 2:34
收件人: user-zh
主题: Re: flink-SQL1.11版本对map类型中value的空指针异常
嗯,这应该是一个实现的bug,可以提个issue修复一下~
史 正超 于2020年10月14日周三 上午10:1
好的,谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
嗯,这应该是一个实现的bug,可以提个issue修复一下~
史 正超 于2020年10月14日周三 上午10:19写道:
> 从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的:
>
> case CHAR:
> case VARCHAR:
>return avroObject -> StringData.fromString(avroObject.toString());
>
> 所以,你的map类型的value值为null,会报空指针异常的。
> ___
从你的异常来看,你用的format是 avro, 我看了下源码,他对varchar类型的covert和json不一样,avro的代码是这样的:
case CHAR:
case VARCHAR:
return avroObject -> StringData.fromString(avroObject.toString());
所以,你的map类型的value值为null,会报空指针异常的。
发件人: 奔跑的小飞袁
发送时间: 2020年10月14日 1:46
收件人: user-zh@flink.apache.or
版本:flink1.11
场景描述:flink sql注册kafka映射表,读取数据实时写入hive
报错:No operators defined in streaming topology. Cannot generate StreamGraph.
具体代码:
val flinkKafkaSqlSouce: String =
s"""create table slog(
|`f1` string,
|`f2` string,
|`f3` string,
|`f4` string,
|`f5` string,
|collect_date string
|) with (
建议先确认下瓶颈是不是 kafka sink, 一般来说 kafka 网卡打满都不会到瓶颈的, 猜测有可能其他逻辑导致的瓶颈
hailongwang <18868816...@163.com> 于2020年10月13日周二 下午10:22写道:
>
>
> Hi xyq,
> 1. 可以确认下下游 kakfa 6个分区写入数据量都是均匀的吗,看下 Partitioner 有没有设置好。
> 2. 还有 11000 条的数据量大小有多少呢,有没有存在 flink 集群 与 kafka 集群
> 跨机房的限制。(在我们内部多个机房,其中延迟比较大的机房的速率只有 3M/s 单并发)
> 3.
plan visualizer 应该是 stream graph, 不是一个图吧
Paul Lam 于2020年10月13日周二 下午9:23写道:
> Hi,
>
> 可以利用 Flink 的 plan visualizer,见[1]
>
> 1.
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_plans.html
>
> Best,
> Paul Lam
>
> hailongwang <18868816...@163.com> 于2020年10月12日周一 下午11:38写道:
>
other_para MAP
--
Sent from: http://apache-flink.147419.n8.nabble.com/
other_para MAP这是我定义的map类型
--
Sent from: http://apache-flink.147419.n8.nabble.com/
可以参考 https://blog.csdn.net/weixin_41608066/article/details/108557869。“目前flink
sql是不支持source/sink并行度配置的,flink sql中各算子并行度默认是根据source的partition数或文件数来决定。”
如果想实现在设置 source 时指定并行度,可以参考该文章对源码做些修改。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
在我们 1.10 版本的生产环境上这个问题也确实出现过,也有几个 issue 在讨论这个,比如:
https://issues.apache.org/jira/browse/FLINK-18196
其中说了2个方法,曾经也试过:
1、是换 JDK 版本,这个没有试过,因为需要更新 NodeManeger 的 JDK,代价比较高;
2、重新 new 一个 CheckpointMetaData,通过修改这个,生产环境上确实没有出现过这个问题了,但是本质原因不太清楚。
希望这些可以帮助到你
Best,
Hailong Wang
在 2020-10-13 18:04:11,"Stor
Hi xyq,
1. 可以确认下下游 kakfa 6个分区写入数据量都是均匀的吗,看下 Partitioner 有没有设置好。
2. 还有 11000 条的数据量大小有多少呢,有没有存在 flink 集群 与 kafka 集群
跨机房的限制。(在我们内部多个机房,其中延迟比较大的机房的速率只有 3M/s 单并发)
3. 可以确认下,Kafka sink 有没有一些耗时的序列化操作的
个人认为,这种问题我们最好分而治之,可以测试下 正常情况下两集群间的传输速率(比如scp),然后再看看 kafka 集群,然后 flink 任务等。
希望可以帮助到你
Best,
Hailong Wan
是的,我误写了,sorry。
在 2020-10-13 16:42:49,"丁浩浩" <18579099...@163.com> 写道:
>source应该是没有输入指标吧
>
>> 在 2020年10月13日,下午5:39,hailongwang <18868816...@163.com> 写道:
>>
>> Hi chenxuying,
>>
>>
>> 如果你的算子都是 chain 成一个 operator 的话,那么是没有的。因为 Source 没有 输出指标,Sink 没有输出指标,所以
>> chain在一起是话就没有了。
>> 如果你想要看的话,可以并发设置成不一样
Hi,
可以利用 Flink 的 plan visualizer,见[1]
1.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_plans.html
Best,
Paul Lam
hailongwang <18868816...@163.com> 于2020年10月12日周一 下午11:38写道:
> Hi,
> 你是想要自己做一个产品,将图显示在Web上?我们是只拿 DAG 中 json 值,然后前端进行处理的。
> 希望能帮助到你~
>
>
> Best,
> Hailong Wang
Hi
看起来你经确定了是在写入kafka时过慢导致了高峰期sink反压,
生产环境上1万的tps不算高的,可以先确定下是否是kafka集群写入瓶颈问题,kafka 自带的 perf
脚本可以测试下6个分区的topic写入的能力,测试时数据大小和tpoic配置可以和生产一致,如果是kafka写入瓶颈问题,那就需要增加分区,对应修改flink作业的写入并发。
另外,如果开启exactly-once配置,写入速度会慢一些,没有特别的业务需求可以用at-least-once.
祝好
Leonard
> 在 2020年10月13日,19:38,xyq 写道:
>
> kafka
Hi,
我理解网络开销更多来自于当前的lookup实现每次都需要访问外部系统,如果能做一些cache机制,这样能省掉较多的开销。
你说的点查sql子表可以节省开销,不是很理解,是指同一个key关联多张维表,然后查询外部系统时一个key带出多个表的数据吗?这个应该和目前flink的实现机制不太一致。
> 在 2020年10月13日,10:03,Dream-底限 写道:
>
> hi、
> 现在流表查询外部维表的时候,在有多张维表的情况下会多次查询外部系统,这就导致多次网络请求回传,社区后续会不会支持时态表子查询,就是根据指定的key查询外部系统的时候不再是一次查询一个指定的表,可以点查
Hi,
你的map是什么类型呢?我来复现一下。
奔跑的小飞袁 于2020年10月13日周二 下午6:07写道:
> hello
> 我在使用flink-sql1.11版本是使用到了map类型,但是我遇到了问题,当map中的value为空时会产生空指针异常,下面附上我的错误以及源代码
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
>
> org.apache.flink.runtime.executiongrap
Hi
这里涉及到的问题比较多。
1. 为什么心跳会超时?是因为Full
GC么,如果是使用的FsStateBackend/MemoryStateBackend,这是比较好解释的,因为数据在JVM堆上。如果使用的是RocksDB,这里是解释不通的。
2. window确实是使用state来存储数据,如果认为自己的state太大的话,是不是因为使用不当呢?可以参考文档 [1] 进行调优
3. 仍在运行的TM里面在做什么呢,为什么没有被JM释放,需要检查相关孤儿TM的日志以及jstack查看进程操作判断。
[1]
https://ci.apache.org/projec
hi,all
我在使用flink处理数据写入kafka的过程中,写入kafka的速度过慢,导致数据高峰期数据有堆积,
在数据达到每秒11000条的时候就开始堆积,处理逻辑很简单,就有坐标点经纬度转换成省市区,其他逻辑不复杂,
sink端kafka 6个分区,source端kafka6个分区,每天在晚高峰的时候会堆积30分钟,有没有办法大幅度提高写入能力,
难道只能加大kafka的分区吗?
flink submit 参数如下
p=6(与kafka分区对应)
m=yarn-cluster
yjm=1024
ytm=4096
ys=6
checkpoint_timeout=3
ma
的sink表的 url后面加上 &serverTimezone=Asia/Shanghai
发件人: dushang
发送时间: 2020年10月13日 8:38
收件人: user-zh@flink.apache.org
主题: Re: flink-connector-jdbc 落入mysql的时候timestamp 有时差问题
time_zone SYSTEM
system_time_zone SYSTEM
我是通过 flink-sql-connector-mysql-cdc获取mysql的binlo
flink版本:Flink1.10.1
部署方式:flink on yarn
hadoop版本:cdh5.15.2-2.6.0
现状:Checkpoint CountsTriggered: 9339In Progress: 0Completed: 8439Failed:
900Restored: 7
错误信息:
ava.lang.Exception: Could not perform checkpoint 1194 for operator Map
(3/3).
at
org.apache.flink.streaming.runtime.tasks.StreamT
贴代码看看?
发自我的iPhone
-- Original --
From: 熊云昆
你的state是用rocksdb存储的吗?
| |
熊云昆
|
|
邮箱:xiongyun...@163.com
|
签名由 网易邮箱大师 定制
On 10/13/2020 18:20, 宁吉浩 wrote:
hi,all
最近在使用flink遇到了问题,问题描述如下:
checkpoint失败,jm报tm心跳丢失,然后重新开启tm,不一会yarn上就先后开启了多个tm。
数据量并不大,给的内存也足够,tm的内存都被沾满了,不管给多少运行到一定时间就会满,就会出现上述情况。
正好跑了很多程序,有带windows
也有不带的,在这里发现了端倪,带windows的程序checkpoi
我好像悟了,试了一下stream api 不同的并行度 ,
source节点的输出其实就是整个job的输入
sink节点的输入就是整个job的输出
flink在统计这些节点的时候 , 是没有统计开头结尾
但是还不知道flinksql如何设置不同的并发
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi,all
最近在使用flink遇到了问题,问题描述如下:
checkpoint失败,jm报tm心跳丢失,然后重新开启tm,不一会yarn上就先后开启了多个tm。
数据量并不大,给的内存也足够,tm的内存都被沾满了,不管给多少运行到一定时间就会满,就会出现上述情况。
正好跑了很多程序,有带windows
也有不带的,在这里发现了端倪,带windows的程序checkpoint的文件非常之大,给我的感觉是把窗口内的数据都加入到状态了。
个人推测是state把窗口接收到的元素都落盘了。
windows的半小时一次,滚动,非滑动。
checkpoint是1分钟一次。
有没有人遇到过这种情况
我理解:
这个Operator的并行度一样,chain成一个Operator,所以它的头尾就是source和sink,是计算的作业内部的,source输入的不会被统计到,只会统计到source输出到下游Operator的,同理,sink输入的可以统计到,输出的不能被统计到
cxydeve...@163.com wrote
> 我用的是flinksql , 不过以我的理解, 一个节点不是应该也有输入输出吗, 想问下这有对应的文档吗
> 详细代码
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExec
我用的是flinksql , 不过以我的理解, 一个节点不是应该也有输入输出吗, 想问下这有对应的文档吗
详细代码
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment table
hello
我在使用flink-sql1.11版本是使用到了map类型,但是我遇到了问题,当map中的value为空时会产生空指针异常,下面附上我的错误以及源代码
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHan
source应该是没有输入指标吧
> 在 2020年10月13日,下午5:39,hailongwang <18868816...@163.com> 写道:
>
> Hi chenxuying,
>
>
> 如果你的算子都是 chain 成一个 operator 的话,那么是没有的。因为 Source 没有 输出指标,Sink 没有输出指标,所以
> chain在一起是话就没有了。
> 如果你想要看的话,可以并发设置成不一样,或者显示的调用 disableChain。
>
>
> Best,
> Hailong Wang
>
>
> 在 2020-10-13 16:22:41
Hi chenxuying,
如果你的算子都是 chain 成一个 operator 的话,那么是没有的。因为 Source 没有 输出指标,Sink 没有输出指标,所以
chain在一起是话就没有了。
如果你想要看的话,可以并发设置成不一样,或者显示的调用 disableChain。
Best,
Hailong Wang
在 2020-10-13 16:22:41,"chenxuying" 写道:
>集群是local Standalone模式,任务可以正常的运行, sink是print,能在Stdout看到数据输出,
>但是在任务详情页面没有Bytes Received,
如果你的算子全都串在一个节点里面的话,是看不到输入输出的。
> 在 2020年10月13日,下午5:22,chenxuying 写道:
>
> 集群是local Standalone模式,任务可以正常的运行, sink是print,能在Stdout看到数据输出,
> 但是在任务详情页面没有Bytes Received, Records Received, Bytes Sent , Records Sent等实时数据
> 都是0
集群是local Standalone模式,任务可以正常的运行, sink是print,能在Stdout看到数据输出,
但是在任务详情页面没有Bytes Received, Records Received, Bytes Sent , Records Sent等实时数据 都是0
time_zone SYSTEM
system_time_zone SYSTEM
我是通过 flink-sql-connector-mysql-cdc获取mysql的binlog。通过flink-connector-jdbc
sink到mysql中。
source 中有调节时区的参数。所以读取到的是正确的。但是sink 中没有调节时区的参数。时间就有了时差。
source:
CREATE TABLE student (
id INT,
name STRING,
create_time TIMESTAMP(0),
update_time TIMESTAMP(0
感觉是时区的问题 ,是使用ddl做的么
*show* variables *like* '%time_zone%’ 看一下
On Tue, Oct 13, 2020 at 2:56 PM 姬洪超 wrote:
> flink-connector-jdbc 获取mysql的timestamp类型的数据后,sink到mysql后时间会晚八个小时。Ex:
> 获取到的是2020-05-12T11:53:08,写入mysql后变成2020-05-11 22:53:08
50 matches
Mail list logo