Hi all
现有一个场景:
消费kafka消息,逐条处理加工每条kafka数据,每隔15分钟将不同数据写进hive表(多张表)
之后,对上面的多张表进行一系列join merge等操作写到新表,生成最终的数据。
这样的场景如果用Flink去处理,是不是需要启动两个flink job,一个处理流数据,一个处理批数据
因为两个执行环境不一样
流处理:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
批处理:
EnvironmentSettings
发现是flink sql 消费kafka 不管有没有解析成功。先去提交offset到kafka 但是实际 是解析失败了。
在 2021-01-06 14:01:34,"Evan" 写道:
>flinksql 貌似是目前做不到你说的这样
>
>
>
>
>发件人: air23
>发送时间: 2021-01-06 12:29
>收件人: user-zh
>主题: flink sql消费kafka sink到mysql问题
>你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
>然后再重启 发现报错的数据 会丢失
>采用的scan.startup.mode' =
hi everyone,
flink version: 1.12.0
job dag: kafka ->hive
今天碰到一个问题,我在第一次启动作业的时候,通过hivecatalog成功在hive中创建hive
table,并正常入数,正常做checkpoint,但由于kafka含有脏数据,导致作业在重启3次仍无法跳过脏数据后变为Failed状态,于是修改作业kafka配置,开启可跳过解析异常行参数,再通过-s
Dear All,在Flink SQL
job中,如果有多个sql语句,需要按顺序执行,即下一个sql的执行依赖上一个sql的执行结果。由于tableEnv.executeSql(sql)是*异步*提交的,那么如何保证多个sql是*顺序执行*?eg:在一个main函数中,有如下代码:String
sql1 = "";tableEnv.executeSql(sql1 );String sql2 =
"";tableEnv.executeSql(sql2 );问题:如何保证sql1先执行完成,再执行sql2
-
Thanks!
Jacob
--
Sent from:
flinksql 貌似是目前做不到你说的这样
发件人: air23
发送时间: 2021-01-06 12:29
收件人: user-zh
主题: flink sql消费kafka sink到mysql问题
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据
CREATE TABLE source1 (
id BIGINT ,
flink??flink-on-yarn??jobTimeStampcurrent_dateenv.setStateBackend(new
Hi:
我自定义一个InputFormat,在处理数据的过程中,发生异常,我想要将发生异常的上一个状态保存下来,以便于在问题修复后重启时能接着已经保存的状态点继续处理对应split剩余的数据,但是我又不需要像checkpoint那样,每个隔一段时间保存一下状态。这样的需求该怎么就现有的flink去实现呢?
我用的flink版本:1.12.0
祝好!
automths
谢谢回复
这个问题困扰了很久
已经解决
原因是写orc时候指定的字段名是column0、column1.、column33
而hive创建表的字段是实际字段的名字,两个不匹配,因此在flink sql中读不到
数据
-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/
好的,非常感谢。
赵一旦 于2021年1月6日周三 下午1:08写道:
> 这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。
>
> Carmen Free 于2021年1月6日周三 上午10:58写道:
>
> > 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
> >
> > 紧接着我这边出现了新的异常
> >
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> >
可以看下文档去,配置忽略解析错误。
air23 于2021年1月6日周三 上午10:41写道:
> 你好 这边使用flink sql有如下问题;
>
>
>
>
>
>
> CREATE TABLE source1 (
> id BIGINT ,
> username STRING ,
> password STRING ,
> AddTime TIMESTAMP ,
> origin_table STRING METADATA FROM 'value.table' VIRTUAL
> ) WITH (
> 'connector' =
这个的话去看看KafkaConnector相关的参数,比较新的版本支持配置解析错误忽略。
Carmen Free 于2021年1月6日周三 上午10:58写道:
> 感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
>
> 紧接着我这边出现了新的异常
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input at [Source:UNKONWN;
我不是很清楚,不过难度应该很大,不然社区早改了。当前任务经常导致机器资源不均衡,这个问题很常见。
penguin. 于2021年1月6日周三 上午11:15写道:
> Hi,请问大家知道怎么更改flink默认的任务调度方式吗?
你好。我这边在跑任务时候 发现使用flink sql消费kafka如果报错了
然后再重启 发现报错的数据 会丢失
采用的scan.startup.mode' = 'group-offsets'
按理说 不是要重新消费 失败的那条数据 开始消费吗?
请问如何配置 可以不丢失数据
CREATE TABLE source1 (
id BIGINT ,
username STRING ,
password STRING ,
AddTime TIMESTAMP ,
origin_table STRING METADATA FROM
HI Wei Zhong,
感谢您的回复!
发现是软链的问题(lib目录下的jar包不能用软链),去掉后就可以了。
在 2021/1/6 上午11:06,“Wei
Zhong” 写入:
Hi Zhizhao,
能检查一下'file://' 后面跟的是绝对路径吗?这个报错是因为对应的路径在本地磁盘上找不到导致的。
> 在 2021年1月6日,10:23,Zhizhao Shangguan 写道:
>
> Hi:
> PyFlink on Yarn,
Hi,请问大家知道怎么更改flink默认的任务调度方式吗?
Hi Zhizhao,
能检查一下'file://' 后面跟的是绝对路径吗?这个报错是因为对应的路径在本地磁盘上找不到导致的。
> 在 2021年1月6日,10:23,Zhizhao Shangguan 写道:
>
> Hi:
> PyFlink on Yarn,
> Per-Job模式,如何增加多个外部依赖jar包?比如flink-sql-connector-kafka、flink-connector-jdbc等。
>
> 环境信息
> Flink 版本:1.11.0
> Os: mac
>
> 尝试了如下方案,遇到了一些问题
> 1、
感谢帮忙解决问题,确实包的路径有问题,换成这个包就解决了这个问题。
紧接着我这边出现了新的异常
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
No content to map due to end-of-input at [Source:UNKONWN; line: -1, column:
-1;]
这个问题的原因,主要是由于kafka消息为空导致的,只要kafka消息不为空,就可以正常消费。
hello,请问大家知道怎么更改flink默认的任务调度方式吗?
hello,请问大家知道怎么更改flink默认的任务调度方式吗?
你好 这边使用flink sql有如下问题;
CREATE TABLE source1 (
id BIGINT ,
username STRING ,
password STRING ,
AddTime TIMESTAMP ,
origin_table STRING METADATA FROM 'value.table' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'plink_canal',
'properties.bootstrap.servers' = '',
那为什么没有日志呢,去机器看日志呗。
于2021年1月6日周三 上午10:11写道:
> 应该是状态大,超时设了10分钟,还没有达到超时时间。到处找不到相关日志。
>
> 发自我的iPhone
>
> > 在 2021年1月6日,10:03,赵一旦 写道:
> >
> > 没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
> >
> > 于2021年1月6日周三 上午9:53写道:
> >
> >> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
> >>
> >>
你这个方法就可以的哈,至于第二个窗口又聚到一个结点的问题本身就是原始问题,基于你的方法缓解即可,第二层不可避免的。
你需要做的是调整合理的参数,使得第二层的数据虽然不均衡,但数据量以及足够低就可以了。
此外,还需要注意,当前key数量假设1w,加10随机就是10w,加100随机就是100w。这个key的膨胀也很严重的。最好的做法是仅针对高数据量的key分拆。
syumialiu 于2021年1月5日周二 下午11:53写道:
>
>
@chengyanan1...@foxmail.com 你那里也遇过吗,这个要是bug,也不会修复呀
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hello
目前我碰到一个问题,当我同时使用flink-orc_2.11-1.11.1.jar与flink-sql-connector-hive-1.2.2_2.11-1.11.0.jar时发现针对orc这种数据格式所依赖的版本不同,我们hive版本是cdh
1.1.0,官网看到依赖的orc版本是1.4.3并且无需orc-shims这个依赖,但是flink-orc这个模块需要同时依赖orc-core
1.5.6与orc-shims 1.5.6,这两个模块如何同时使用
--
Sent from:
应该是状态大,超时设了10分钟,还没有达到超时时间。到处找不到相关日志。
发自我的iPhone
> 在 2021年1月6日,10:03,赵一旦 写道:
>
> 没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
>
> 于2021年1月6日周三 上午9:53写道:
>
>> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
>>
>> 发自我的iPhone
没日志咋排查,为啥失败总得说下。超时的话可能就是任务压力大,状态大等。
于2021年1月6日周三 上午9:53写道:
> flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
>
> 发自我的iPhone
看下 Flink 任务运行,是否是其他机器上的资源先达到瓶颈,而不是 CPU,比如 IO,同时看下你的 flatmap 处理单条记录的时间。
同时也参考上面同学的,是否存在反压,如果 flatmap 逻辑比较复杂,也有这个可能。
Best,
LakeShen
赵一旦 于2021年1月5日周二 下午9:13写道:
>
> 可以看看是否反压。反压说明并行度还是不够,不反压的话看处理速度是否符合预期。符合预期就不用调了,说明你的任务不复杂,那点cpu占用就够了。如果不符合预期,也没有任何反压,那就是source消费速度太慢。
>
>
>
> housezhang
thank you
----
??:
"user-zh"
flink 1.11.2环境下,三个kafka topic 的数据进行join,出现checkpoint失败,没有日志,请问怎么排查?
发自我的iPhone
/etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_181-cloudera/
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export
ZOOKEEPER_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/zookeeper
export
??bug
?? 2021-01-05 20:20
user-zh
?? flinksql1.11 phoenixCaused by:
org.apache.calcite.avatica.NoSuchStatementException
??
flinkv1.11phoneix 1.14.1
CREATE TABLE
$HADOOP_CLASSPATH??/home/xjia/opt/module/hadoop3.2.1/lib/native??
----
??: "zhisheng"https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
<
-??crontabflink-jobazkaban??
----
??: "zhisheng"https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html
<
我在一个job中有一些很大的数据(key的种类很少,但是单个key下的数据数量很多),基本要实现的是一个时间滑动窗口结束时,当某个key的数量大于一个固定值后,将该key下的所有原数据输出。我现在的方法是将key加后缀,然后keyBy做窗口,但是这个做完之后还是需要再次keyBy把数据还原回去,并且这个过程又将全量数据拉到了一个节点上,请问有没有一些别的解决方法?
| |
syumialiu
|
|
syumia...@163.com
|
签名由网易邮箱大师定制
我在一个job中有一些很大的数据(key的种类很少,但是单个key下的数据数量很多),基本要实现的是一个时间滑动窗口结束时,当某个key的数量大于一个固定值后,将该key下的所有原数据输出。我现在的方法是将key加后缀,然后keyBy做窗口,但是这个做完之后还是需要再次keyBy把数据还原回去,并且这个过程又将全量数据拉到了一个节点上,请问有没有一些别的解决方法?
| |
syumialiu
|
|
syumia...@163.com
|
签名由网易邮箱大师定制
我感觉还是jar的问题。如下尝试下,我懒得去试了。
将 org.apache.kafka.common.security.plain.PlainLoginModule 替换为
org.apache.flink.kafka.shaded.org.apache.kafka.common.securi
ty.plain.PlainLoginModule
因为你用的是sql-connector-kafka,这个包把kafka-clients的包shade了。
Carmen Free 于2021年1月5日周二 下午5:09写道:
> flink sql
可以看看是否反压。反压说明并行度还是不够,不反压的话看处理速度是否符合预期。符合预期就不用调了,说明你的任务不复杂,那点cpu占用就够了。如果不符合预期,也没有任何反压,那就是source消费速度太慢。
housezhang 于2021年1月5日周二 下午5:44写道:
> 有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
好的,谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
??
flinkv1.11phoneix 1.14.1
CREATE TABLE pe_login_kafka (
id INT,
region_id INT,
ts TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 't-region,
'properties.bootstrap.servers' = '',
'properties.group.id' = gid');
CREATE
这个问题en...出在如下地方:
KeyedStream keyByStream =
signoutTimeAndWM.keyBy(new KeySelector() {
@Override
public String getKey(ShareRealTimeData value) throws Exception {
return DateUtilMinutes.timeStampToDate(new
Date().getTime()); // 此处,不可以使用new
按@Yang Wang 的指导,在 flink 脚本中设置了 HADOOP_CONF_DIR 可以成功运行了,感谢!
Yang Wang 于2021年1月4日周一 下午9:12写道:
> 1.11版本以后可以直接在Flink Client的机器上export HADOOP_CONF_DIR
> 然后运行flink run-application或者kubernetes_session.sh启动Flink任务,这样Flink
> Client会自动通过ConfigMap将Hadoop配置ship到JobManager和TaskManager pod
> 并且加到classpath的
>
operator操作:processWindowFunction的代码如下:
class MyProcessWindowFuncation extends
ProcessWindowFunction>, String, TimeWindow>{
private transient MapState>
eveShareNoMaxPrice;
private transient ValueState>> shareAndMaxPrice;
@Override
public void process(String s, Context
我有如下代码,从kafka消费数据,然后根据数据所在的秒(服务器时钟)进行keyby,获取数据所在的分钟的代码:
public static String timeStampToDate(Long timestamp){
ThreadLocal threadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("-MM-dd HH:mm:ss"));
String format = threadLocal.get().format(new Date(timestamp));
hi
可以检查一下提交任务的 flink 客户端的 lib 目录下面是否有 flink-sql-parquet_2.11-1.12.0.jar 依赖
Best
zhisheng
冯嘉伟 <1425385...@qq.com> 于2021年1月4日周一 上午9:58写道:
> hi!
>
> java.io.FileNotFoundException: File file:/home/xjia/.flink/...
> 可以看出,从本地加载jar包,而不是hdfs。
>
> 我觉得可能是hadoop环境的问题,导致读取的scheme是file,使用 echo
This is most likely a bug, could you reiterate a bit how it is invalid?
I'm also CCing Jark since he is one of the SQL experts.
On Mon, Dec 28, 2020 at 10:37 AM Jun Zhang
wrote:
> when I query hive table by sql, like this `select * from hivetable where
> id = 1 limit 1`, I found that the
请问需要在flink源码的哪些地方修改才能实现自己的任务调度呢 1214316932 邮箱:1214316...@qq.com 签名由 网易邮箱大师 定制
在2021年01月05日 11:27,Waldeinsamkeit. 写道: 是的,目前是想重写任务调度器,按自己的方式来将任务调度到集群的节点中。
--原始邮件-- 发件人:
可以看下flink jobmanager 上的日志,会不会有什么异常出现了。s
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi,
我遇到一个问题,消费的source里有字段定义为array>这种类型,然后想通过一个udf将它处理成一个字符串。udf的入参定义如下:
public String eval(Row[] item, String char1, String char2);
但是在函数处理时,debug发现拿到的item里的row信息始终为null。也通过DataTypeHint注解给出了item的实际类型。这是不是1.10的bug呀?如果有相关的issue单的话,烦请有知道的发我下哈。
我在1.11里验证同样的逻辑,是没这个问题的。
有可能是cpu能够处理得过来,网络io处理不过来了,看看网络使用情况
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql 消费鉴权的kafka,是怎么消费的呢?在kafka未添加鉴权之前,flink SQL消费kafka是正常的
1、版本说明
flink版本:1.10.2
kafka版本:1.1.0
2、kafka鉴权说明
仅使用了sasl鉴权方式
在kafka客户端有配置 kafka_server-jass.conf、
server.properties、producer.properties、consumer.properties
3、主要配置参数
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
我在测试的时候12个并行度,16,24都测试了但启任务后的cpu利用率还是 140%左右,不管并行度设置为多少。
在 2021-01-05 16:49:02,"赵一旦" 写道:
>不纠结几核。如果任务结点本身不多的话,可以提一提再,只要network buffer数量够就好。
>
>爱吃鱼 于2021年1月5日周二 下午4:39写道:
>
>> 24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
不纠结几核。如果任务结点本身不多的话,可以提一提再,只要network buffer数量够就好。
爱吃鱼 于2021年1月5日周二 下午4:39写道:
> 24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-01-05 16:24:30,"赵一旦" 写道:
> >加大并行度。
> >
> >爱吃鱼 于2021年1月5日周二 下午4:18写道:
> >
> >> 怎么提高flink cpu利用率。
> >> 业务场景,flink batch
在2021年01月05日 16:37,爱吃鱼 写道:
24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
在 2021-01-05 16:24:30,"赵一旦" 写道:
>加大并行度。
>
>爱吃鱼 于2021年1月5日周二 下午4:18写道:
>
>> 怎么提高flink cpu利用率。
>> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> flatmap处理出来的数据转换成表,并用sql 对表的数据进行
24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
在 2021-01-05 16:24:30,"赵一旦" 写道:
>加大并行度。
>
>爱吃鱼 于2021年1月5日周二 下午4:18写道:
>
>> 怎么提高flink cpu利用率。
>> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
>>
24核的机器,已经加到了24的并行度了,然后会 24个并行度的cpu利用率加起来也是140%左右
在 2021-01-05 16:24:30,"赵一旦" 写道:
>加大并行度。
>
>爱吃鱼 于2021年1月5日周二 下午4:18写道:
>
>> 怎么提高flink cpu利用率。
>> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
>> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
>>
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
加大并行度。
爱吃鱼 于2021年1月5日周二 下午4:18写道:
> 怎么提高flink cpu利用率。
> 业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
> flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
> cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
怎么提高flink cpu利用率。
业务场景,flink batch ,读取大概两个亿数据的文件,然后进行flatmap处理。
flatmap处理出来的数据转换成表,并用sql 对表的数据进行 max(time),min(time),count等处理。
cpu利用率只有140%左右,机器还有很多资源,请问怎么可以提高flink cpu的利用率。
62 matches
Mail list logo