最近看了 云邪 大佬关于flink cdc sql的视频,并且动手操作了 例子
https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B
感受到了flink sql 在实时流计算的便捷性以及强大,但同时也有一些疑问。如下:
flink connector cdc 直接对接订单表,物流表,商品表表的binlog
1、通过flink进行3流join的时候,这个join是对应flink底层api的哪种join,是否受窗口大小以及时间现在?
2、假如是全量join , 这
我使用UDAF的方式解决了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我希望能将某些维度下过去24小时的每一小时的统计结果计算出,然后合并保存在一个map中
在写SQL时,我尝试将多条计算结果合并保存至Map中:
create table to_redis(
biz_name STRING,
mchnt_id STRING,
zb_value MAP
) WITH (
'connector' = 'redis',
'redis-mode' = 'single',
'host' = '172.30.251.225',
'port' = '10006',
'password' = 'xxx',
我希望能将某些维度下过去24小时的每一小时的统计结果计算出,然后合并保存在一个map中
在写SQL时,我尝试将多条计算结果合并保存至Map中:
create table to_redis(
biz_name STRING,
mchnt_id STRING,
zb_value MAP
) WITH (
'connector' = 'redis',
'redis-mode' = 'single',
'host' = '172.30.251.225',
'port' = '10006',
'password' =
flink run -yD yarn.application.queue=x
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi
可以通过在 flink-conf.yaml 配置文件中添加 yarn.application.queue 参数来设置
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Hi,guomuhua
`The number of inputs accumulated by local aggregation every time is
based on mini-batch interval. It means local-global aggregation depends on
mini-batch optimization is enabled `
,关于本地聚合,官网有这么一段话,也就是说,需要先开启批次聚合,然后才能使用本地聚合,加起来有三个参数.
Hi,Jark
我理解疑问中的sql是一个普通的agg操作,只不过分组的键是时间字段,不知道您说的 `我看你的作业里面是window agg`
,这个怎么理解
Best,
Robin
Jark wrote
>> 如果不是window agg,开启参数后flink会自动打散是吧
> 是的
>
>> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
> 文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。
>
> Best,
> Jark
>
> [1]:
>
1??yarn-session??
2??yarn-perjob?? --
??
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with
id container_e66_1616483562588_2358_01_02() timed out.
at
> 如果不是window agg,开启参数后flink会自动打散是吧
是的
> 那关于window agg, 不能自动打散,这部分的介绍,在文档中可以找到吗?
文档中没有说明。 这个文档[1] 里说地都是针对 unbounded agg 的优化。
Best,
Jark
[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
On Fri,
Jark wrote
> 我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
> agg支持这个参数了。可以期待下。
>
> Best,
> Jark
>
> On Wed, 24 Mar 2021 at 19:29, Robin Zhang
> vincent2015qdlg@
>
> wrote:
>
>> Hi,guomuhua
>> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>>
>> Best,
>> Robin
>>
>>
>>
我看你的作业里面是window agg,目前 window agg 还不支持自动拆分。1.13 的基于 window tvf 的 window
agg支持这个参数了。可以期待下。
Best,
Jark
On Wed, 24 Mar 2021 at 19:29, Robin Zhang
wrote:
> Hi,guomuhua
> 开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
>
> Best,
> Robin
>
>
> guomuhua wrote
> > 在SQL中,如果开启了 local-global 参数:set
> >
Hi,guomuhua
开启本地聚合,是不需要自己打散进行二次聚合的哈,建议看看官方的文档介绍。
Best,
Robin
guomuhua wrote
> 在SQL中,如果开启了 local-global 参数:set
> table.optimizer.agg-phase-strategy=TWO_PHASE;
> 或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
> set
>
Hi, 请问:
env.setParallelism(8);
source = select * from table1,
Table filterTable = source.filter(x-> x>10).limit(1);
try (CloseableIterator rows = filterTable.execute().collect()) {
while (rows.hasNext()) {
Row r = rows.next();
String a = r.getField(1).toString();
Hi, 请问:
env.setParallelism(8);
source = select * from table1,
Table filterTable = source.filter(x-> x>10);
try (CloseableIterator rows = filterTable.execute().collect()) {
while (rows.hasNext()) {
Row r = rows.next();
String a = r.getField(1).toString();
在SQL中,如果开启了 local-global 参数:set table.optimizer.agg-phase-strategy=TWO_PHASE;
或者开启了Partial-Final 参数:set table.optimizer.distinct-agg.split.enabled=true;
set
table.optimizer.distinct-agg.split.bucket-num=1024;
还需要对应的将SQL改写为两段式吗?
例如:
原SQL:
SELECT day,
//ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins
best,
amenhub
发件人: Gengshen Zhao
发送时间: 2021-03-18 16:26
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Flink SQL JDBC connector不能checkpoint
Flink开发者们,你们好:
我在使用flink开发过程中遇到一个问题,在使用jdbc做
//ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins
best,
amenhub
发件人: Gengshen Zhao
发送时间: 2021-03-18 16:26
收件人: user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>
主题: Flink SQL JDBC connector不能checkpoint
Flink开发者们,你们好:
我在使用flink开发过程中遇到一个问题,在使用jdbc做
hi,
请问使用的Flink版本是什么呢?猜测你应该是写成普通的join方式了,可参考 [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#joins
best,
amenhub
发件人: Gengshen Zhao
发送时间: 2021-03-18 16:26
收件人: user-zh@flink.apache.org
主题: Flink SQL JDBC connector不能checkpoint
Flink开发者们,你们好:
我
Flink开发者们,你们好:
我在使用flink开发过程中遇到一个问题,在使用jdbc做维度表关联时,该算子很快就finished了,从而导致无法正常的checkoint(我看源码中checkpoint前会检查所有算子状态必须为running),请问目前有什么参数可以使jdbc不finished或者在算子finished后依然可以checkpoint么?如果没有,那对这种情况的支持是否列入flink未来版本的开发计划中?
期待你们的回信
祝各位工作顺利,谢谢
赵庚申
赵庚申
Phone:15383463958
Flink开发者们,你们好:
我在使用flink开发过程中遇到一个问题,在使用jdbc做维度表关联时,该算子很快就finished了,从而导致无法正常的checkoint(我看源码中checkpoint前会检查所有算子状态必须为running),请问目前有什么参数可以使jdbc不finished或者在算子finished后依然可以checkpoint么?如果没有,那对这种情况的支持是否列入flink未来版本的开发计划中?
期待你们的回信
祝各位工作顺利,谢谢
赵庚申
Phone:15383463958
这个问题换种写法解决了,从MySQL数据库表里取时间戳字段再转timestamp,可以实现滚动窗口,没报错。
从MySQL表里直接取datetime类型,jdbc表flink设置timestamp类型,会报错,直接取source的时间类型字段是不是转换有点问题。
---原始邮件---
发件人: "guoyb"<861277...@qq.com
发送时间: 2021年3月17日(周三) 下午5:43
收件人: "user-zh"
直接 SQL Top-N 即可:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
Best,
Kurt
On Tue, Mar 16, 2021 at 3:40 PM Tian Hengyu wrote:
> 咋么有人啊~~~
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
版本1.12.1
请问,支持事件时间吗?应该设置为哪种时间类型。
as_of_time time
group by xx
, tumble( as_of_time, interval "5" second)
sql client报错,
window aggregate can only be defined over a time attribute column, but time(0)
encountered
Hi,
你可以理解为用的是MapState来保存的状态。
op <520075...@qq.com> 于2021年3月16日周二 下午3:00写道:
> 各位大佬好,想问下flinksql里的count (distinct)默认是用哪种state保存的状态
--
Best,
Benchao Li
咋么有人啊~~~
--
Sent from: http://apache-flink.147419.n8.nabble.com/
??flinksqlcount (distinct??state??
在做实时数仓的时候,有需求要使用flink sql实现全局的row_number(),请教下各位有啥方案吗?
目前想的是,将流进行row
number处理后存储到hbase中,然后每次处理流数据都和hbase进行关联,row_number处理后将最新结果存入hbase中,即通过对hbase的实时读写实现全局row_number().
请问以上方法可行不,,实时读hbase关联,然后在写入最新数据到hbase,效率会有问题吗,这样能满足实时的需求吗?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
Flink版本flink1.11
从第二个窗口后的结果就出现了,更新数据和废弃数据的,现在的输出是table转datastream然后filter为true的结果,
但是我想实现的TOP2 每个窗口只输出俩条数据,现在filter为true的结果是>2条的,求问怎么才能正常输出我想要的TOP2的数据?
执行SQL:select * from (
select *,ROW_NUMBER() OVER (PARTITION BY window_end ORDER BY counter DESC) as
row_num
from (
select world,count(world)
Flink sql中如何插入null值,有人了解吗?目前,insert 语句values中直接写null在zeppelin上报错了。
|
Best,
Jimmy
|
Signature is customized by Netease Mail Master
Zeppelin 支持加载UDF jar的,可以参考下面的代码,不过架构上可能与你们的原有架构会有所差别
https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2#8iONE
https://github.com/apache/zeppelin/blob/master/flink/interpreter/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala#L469
chenxyz 于2021年3月12日周五 上午9:42写道:
>
目前这种方法不可行,在公司的平台化系统里提交flink任务,自己能掌控的只有代码这块。
在 2021-03-11 16:39:24,"silence" 写道:
>启动时通过-C加到classpath里试试
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
请user-zh 不要再发邮件了
--
发件人:silence
发送时间:2021年3月11日(星期四) 16:39
收件人:user-zh
主 题:Re: flink sql如何从远程加载jar包中的udf
启动时通过-C加到classpath里试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
启动时通过-C加到classpath里试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
1.10应该是registerFunction吧,当前jar包中没有这个类(这个类在远程jar包中),这种方法没办法实例化TableFunction。
> 2021年3月11日 上午11:21,HunterXHunter <1356469...@qq.com> 写道:
>
> 通过 createTemporarySystemFunction 试试看呢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
通过 createTemporarySystemFunction 试试看呢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我们将开发的udf放在远程服务器,需要动态地加载jar包。Flink版本1.10,代码如下
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment exeEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings envSet = EnvironmentSettings
.newInstance()
请教一下flink sql多条数据sink用 statement set 语句时,
1. 如果其中一条sink条发生背压或故障,会影响其他sink流吗?
2. 在flink sql cdc 消费同一张mysql表sink进多种数据源场景下,例如 mysql -> fink cdc -> mongodb &
polardb 建议是启多个作业分别etl,还是分两段 mysql -> flink cdc -> kafka -> flink -> mongodb &
polardb ... 呢?关系数据库端接入同时多个cdc会不会影响性能?
1.两套逻辑结果,只能定时任务做check2.同一套逻辑,就要具体分析了,只要不是一个人、一套代码逻辑出来的,都有可能出问题
在 2021-03-09 12:51:50,"Smile" 写道:
>对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。
>有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
对,离线和实时的计算语义本来就是不一样的,所以这个地方也没有特别完美的解决方案,一般都是 case by case 看一下。
有一些显而易见的问题比如 Join 是否关联成功这种还是比较容易查,其他的确实不太好判断。
--
Sent from: http://apache-flink.147419.n8.nabble.com/
好的,谢谢!
---原始邮件---
发件人: "Rui Li"https://issues.apache.org/jira/browse/FLINK-20913
有关了,这个issue是1.12.2修复的,可以升级一下试试。
On Mon, Mar 8, 2021 at 2:15 PM guoyb <861277...@qq.com wrote:
您好!
hive.metastore.sasl.enabled 是true
启动sql client的时候,可以正常读取到认证信息,并读取metastore的表名。
读和写,认证就失败了。
---原始邮件---
;
>
>
> ---原始邮件---
> 发件人: "Rui Li" 发送时间: 2021年3月8日(周一) 中午12:12
> 收件人: "user-zh" 主题: Re: 【flink sql-client 读写 Kerberos认证的hive】
>
>
> Hi,
>
>
> 从你发的stacktrace来看,走到了set_ugi方法说明client认为server没有开启kerberos。确认一下你HiveCatalog这边指定的hive-site.xml是否配置
恩,这里有个问题就是,假设我们以离线结果为基准去对比,但离线结果一般天级或小时级,但实时部分可能是秒级的,两个结果在连线环境做比较,也不好去看这个结果有差异的时候,到底实时计算部分有没有问题!
有很多种原因可能会导致这个结果不准确。。。比如flink sql的bug或都流式消息丢失了等等!
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,
实时和离线对数的问题确实也比较难,没有很完美的解决方案。
一般可以考虑把实时产出结果也落离线表,然后对两张离线表做对比,离线 Join 上然后跑具体对比逻辑即可。
Smile
jindy_liu wrote
> 有没有大佬有思路可以参考下?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
--
Sent from: http://apache-flink.147419.n8.nabble.com/
您好!
hive.metastore.sasl.enabled 是true
启动sql client的时候,可以正常读取到认证信息,并读取metastore的表名。
读和写,认证就失败了。
---原始邮件---
发件人: "Rui Li"
7, 2021 at 5:49 PM 861277...@qq.com <861277...@qq.com> wrote:
>
>> 环境:
>> flink1.12.1
>> hive2.1.0
>> CDH6.2.0
>>
>>
>> 【问题描述】
>> 在没开启Kerberos认证时,可以正常读写hive表
>>
>> 开启Kerberos认证后,
>> 启动时可以正常读取到hive metastor
【问题描述】
> 在没开启Kerberos认证时,可以正常读写hive表
>
> 开启Kerberos认证后,
> 启动时可以正常读取到hive metastore的元数据信息,读写不了表。
>
>
> 【sql-client.sh embedded】
> Flink SQL show tables;
> dimension_table
> dimension_table1
> test
>
>
> Flink SQL
有没有大佬有思路可以参考下?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
环境:
flink1.12.1
hive2.1.0
CDH6.2.0
【问题描述】
在没开启Kerberos认证时,可以正常读写hive表
开启Kerberos认证后,
启动时可以正常读取到hive metastore的元数据信息,读写不了表。
【sql-client.sh embedded】
Flink SQL show tables;
dimension_table
dimension_table1
test
Flink SQL select * from test;
[ERROR] Could not execute SQL statement. Reason
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
flink sql?? temporal-table join io
casel.chen
Hi Jark.
对于 upsert-kafka connector 有两个疑问:
1. upsert-kafka 没有像 kafka connector 里面设置 offset 的参数 `scan.startup.* `
,我试了下每次都是从 earliest 开始;
2. 中间的 operator ChangelogNormalize 会放大数据量,输入一条数据,经过 ChangelogNormalize
算子之后会变成2条,这个不是很理解?
Qishang 于2021年3月5日周五 上午11:14写道:
>
> 某些原因导致上游 kafka partition
某些原因导致上游 kafka partition 只有一个,业务逻辑大都是关联维表或者 UDF 调用 API,这个就很NICE。。
学到了,感谢。
Jark Wu 于2021年3月4日周四 下午11:11写道:
> 1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
> 2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
> 也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
>
目前Flink SQL 中的connector都没实现异步io关联维表,接口是上已经支持了的,如果是自己实现可以参考[1]
另外,HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用[2]
祝好
[1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3e47c09eef7983c9aa180c7/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/legacy.html#temporal-table-function-join
flink sql的 temporal-table join 应该都是通过异步io来关联维表的
casel.chen 于2021年3月3日周三 下午10:54写道:
> flink sql中如何使用异步io关联维表?官网文档有介绍么?
1. 对于 upsert-kafka 会默认加上 ChangelogNormalize
2. ChangelogNormalize 会用 env 并发,所以可以认为能突破你说的并发限制。kafka + canal-json
也能用,但是要加上 table.exec.source.cdc-events-duplicate = true
参数[1]才能开启。但是要注意 ChangelogNormalize 是一个 stateful 节点,本身也是有性能开销的,总体性能可能还不如
forward。
Best,
Jark
[1]:
flink??sql??sql?
example:
tEnv.registerDataStream("tableName", dataStream, "id,
name, age ,time");
Table result = tEnv.sqlQuery("SQL" );
??SQL??
如题
官方的恢复是flink run -s path xxx.jar
那么flink sql没有jar包如何恢复呢
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx")貌
如题
官方的恢复是flink run -s path xxx.jar
那么flink sql没有jar包如何恢复呢
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx")貌
如题
官方的恢复是flink run -s path xxx.jar
那么flink sql没有jar包如何恢复呢
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv,
bsSettings);
Configuration configuration = bsTableEnv.getConfig().getConfiguration();
configuration.setString("execution.savepoint.path","xxx")貌
Hi 社区。
Flink 1.12.1
现在的业务是通过 canal-json 从kafka 接入数据写入DB ,但是由于 Topic 是1个 Partition ,设置大的并发,对于只有
forword 的ETL没有作用。
insert into table_a select id,udf(a),b,c from table_b;
发现 upsert-kafka connector expain 生成的 Stage 有 ChangelogNormalize 可以分区
1. ChangelogNormalize 是会默认加上的吗,还是在哪里可以设置?
2. 这个可以改变默认
定义一个 sourcetable
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql中如何使用异步io关联维表?官网文档有介绍么?
在将旧版本升级至1.12版本中,需要支持proctime和eventime时发现
DefinedProctimeAttribute该方法已过期,但是查看官方文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E4%BD%BF%E7%94%A8-tablesource-%E5%AE%9A%E4%B9%89-1
实例仍然使用 DefinedProctimeAttribute该方法 且并没有说明替换方法?
gt;
>
>发件人: yinghua...@163.com
>发送时间: 2021-02-27 14:23
>收件人: user-zh
>主题: Re: Flink SQL 应用情况请教
>这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No
> Data,sink显示的是No Watermark
>我的SQL语句如下:
>CREATE TABLE t_stock_match_p_1(
> id
1503,61,15811,1614405166858
1504,61,15813,1614405333871
1505,61,15814,1614405544862
1506,61,15814,1614405673863
就这几条数据,并行度设置为1
发件人: yinghua...@163.com
发送时间: 2021-02-27 14:23
收件人: user-zh
主题: Re: Flink SQL 应用情况请教
这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其
这个问题不知道是不是这个原因导致的,我在Flink的webUI监控界面source和sink任务中都没看到watermark的值,其中source的Watermarks显示No
Data,sink显示的是No Watermark
我的SQL语句如下:
CREATE TABLE t_stock_match_p_1(
id VARCHAR,
stkcode INT,
volume INT,
matchtime BIGINT,
ts as TO_TIMESTAMP(FROM_UNIXTIME(matchtime/1000,'-MM-dd HH:mm:ss')),
不是指标显示问题,是数据一直没写到mysql中,也没啥错误日志,然后今天早上我把任务重启了下,数据就全部写入到mysql中了
> 在 2021年2月26日,15:02,Smile 写道:
>
> 你好,
>
> 关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
> numRecordsOut,看是哪个算子开始有输入没输出的。
> 上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
>
> GroupWindowAggregate(groupBy=[stkcode],
我也遇到类似的问题了, 求问楼主最后怎么解决的.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我也遇到相同的问题了, 区别在于我是有一个springboot的项目提交的sql, 1.11.3上是好的,
换成1.12.1之后就不行了.sql-client本身可以执行, 但是我自己在springboot里面就提交不了sql了. 报的错是一样的,
求问楼主最后怎么解决的, 我以为应该是包有冲突, 但是具体是哪个jar包有冲突我还说不上来.
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好,
关于指标的问题,可以进到具体的算子里面的 Metrics 页面看看每个算子的 numRecordsIn 和
numRecordsOut,看是哪个算子开始有输入没输出的。
上面贴的指标看起来是 Overview 页面上的,这个地方展示的指标是对整个 Chain 起来的整体算的。
GroupWindowAggregate(groupBy=[stkcode], window=[TumblingGroupWindow('w$,
matchtime, 6)], properties=[w$start, w$end, w$rowtime, w$proctime],
我们在使用Flink SQL 统计一分钟内各个股票的交易量,SQL代码如下:
CREATE TABLE t_stock_match_p_1(
id VARCHAR,
stkcode INT,
volume INT,
matchtime TIMESTAMP,
WATERMARK FOR matchtime as matchtime
) WITH (
'connector' = 'kafka-0.10',
'topic' = 'stock_match_p_1',
'scan.startup.mode' = 'latest-offset
Hi Jeff,
对于 SQL,现在只能设置整个 SQL 的并发,不能单独提高某个算子的并发。
不过可以考虑把消费 Kafka 的部分用 DataStream 来实现,然后再把 DataStream 转成 Table 去跑 SQL。这样消费
Kafka 的并发和 SQL 的并发就可以分开来设置了。
还有一个想法是如果你的 Kafka Source 到 UDF 之间有 hash (比如 Group By)之类的重分发的逻辑,是否可以忽略 Kafka
HI,
如题,想要在Flink
SQL中通过自定义UDF增加指标,从而实现自定义告警。那么如何在UDF中获取到RuntimeContext从而修改Metrics呢?
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi all:
使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。
使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime
后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。
代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/
hi all,
用flink sql消费kafka数据,有效并发数是由kafka分区数来决定的,请问有什么方法提高有效并发数吗? 因为有一个UDF是请求python
http服务,速度不快,有没有方法单独提高这一块的并发数呢?
是的,hive表必须存在HiveCatalog里才能正常读写
On Tue, Feb 23, 2021 at 10:14 AM yinghua...@163.com
wrote:
>
> Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行?
>
>
>
> yinghua...@163.com
>
> 发件人: Rui Li
> 发送时间: 2021-02-23 10:05
> 收件人: user-zh
>
Flink的版本是1.11.3,目前我们所有表的catalog类型都是GenericInMemoryCatalog,是不是Hive表要用HiveCatalog才行?
yinghua...@163.com
发件人: Rui Li
发送时间: 2021-02-23 10:05
收件人: user-zh
主题: Re: Re: Flink SQL 写入Hive问题请教
你好,
用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么?
On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote:
> 我增加调试
你好,
用的flink是什么版本呢?另外这张hive表是创建在HiveCatalog里的么?
On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心 wrote:
> 我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert
> into时创建Hive表时提示没有连接器的配置
> Table options are: 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr'
>
在hive catalog下创建kafka source表会在hive
metastore中创建一张仅包含元数据的表,hive不可查,flink任务中可以识别并当成hive表读,然后只需要在hive
dialect下正常读出写入即可。
参考 https://my.oschina.net/u/2828172/blog/4415970
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我增加调试日志后,发现执行DDL语句创建hive表时,设置了dialect 为hive,现在报错根据堆栈信息是在执行DML语句insert
into时创建Hive表时提示没有连接器的配置
Table options are: 'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr'
'sink.partition-commit.delay'='0S'
'sink.partition-commit.policy.kind'='metastore,success-file'
flink-jdbc写入clickhouse 在flink 1.12版本 tps只能到35W左右的tps,各位,有什么可以性能调优?
你这没有把dialect set成hive吧,走到了else分支。default
dialect是需要指定connector的,参考文档的kafka到hive代码
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我们在开发一个Flink SQL 框架,在从kafka读取数据加工写入到Hive时一直不成功,sql脚本如下:
CREATE TABLE hive_table_from_kafka (
collect_time STRING,
content1 STRING,
content2 STRING
) PARTITIONED BY (
dt STRING,hr STRING
) TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr',
'sink.partition
这几天研究了flink table 转化为stream node 的源码,发现是某个算子的并发度取决于上一个算子的并发度。
但是在实际测试过程中发现使用window aggregate 语句时候 该算子的并发度和上游的source不一致 和我cli 命令配置的并发度一致
这是为什么呢?
guaishushu1...@163.com
MP,
primary key (building_id, sofa_id, local_date, `hour`)
NOT ENFORCED
) with (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'username' = 'x'
'pass
>
> 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key
>
> 这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' =
> 'all',但是还是因为没有 primary Key,所以无法 run。
>
> 现在就是针对第二种情况,因为Hive的维度表不是我维护的,很多人都在用,所以不能修改去加上 primary key,无法进行 join.
第二种情况,hive表不是streaming读的,相当于是一张静态表,每次都是加载最新的全量,所以配置如下参数即可
Flink-1.12.0 SQL定义timestamp(3)格式出现时间解析问题
CREATE TABLE
user_log1 (
user_id string,
ts TIMESTAMP(3),
proc_time as PROCTIME()) WITH (
Caused by: java.io.IOException: Failed to deserialize JSON
'{"user_id":"1188","ts":"2021-02-19T17:52:20.921Z"}'.
at
Hi, Leonard
我们的业务变得越来越复杂,所以现在需要 Join Hive 维表的情况非常普遍。现在维表分三种情况
一,维表没有分区,没有 primary key
这时候 `'streaming-source.partition.include' = 'latest',因为没有
parition,所以 latest 应该加载的就是全部的数据。
二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key
这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' =
Hi, macia
> 在 2021年2月9日,10:40,macia kk 写道:
>
> SELECT *FROM
>(
>SELECT tt.*
>FROM
>input_tabe_01 tt
>FULL OUTER JOIN input_tabe_02 mt
>ON (mt.transaction_sn = tt.reference_id)
>and tt.create_time >= mt.create_time + INTERVAL
SELECT *FROM
(
SELECT tt.*
FROM
input_tabe_01 tt
FULL OUTER JOIN input_tabe_02 mt
ON (mt.transaction_sn = tt.reference_id)
and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
and tt.create_time <=
Hi,
那join的语句是怎么写的呢?
On Mon, Feb 8, 2021 at 2:45 PM macia kk wrote:
> 图就是哪个报错
>
> 建表语句如下,表示公共表,我也没有改的权限.
>
> CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> 'country', `currency` string COMMENT 'currency', `exchange_rate`
> decimal(25,10) COMMENT 'exchange rate')
>
图就是哪个报错
建表语句如下,表示公共表,我也没有改的权限.
CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
'country', `currency` string COMMENT 'currency', `exchange_rate`
decimal(25,10) COMMENT 'exchange rate')
PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
ROW FORMAT SERDE
你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?
On Mon, Feb 8, 2021 at 10:33 AM macia kk wrote:
> Currently the join key in Temporal Table Join can not be empty.
>
> 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
>
> [image: image.png]
>
--
Best regards!
Rui Li
Currently the join key in Temporal Table Join can not be empty.
我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
[image: image.png]
Hi
> 在 2021年2月5日,09:47,macia kk 写道:
>
> the `latest` only works` when the
> streaming hive source table used as temporal table.
只能用在temporal(时态)表中,时态表只能在 temporal join(也就是我们常说的维表join) 中使用
祝好
the
streaming hive source table used as temporal table. By default the option
is `all`.
报错
Flink SQL> SELECT * FROM exrate_table /*+
OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
= 'latest') */; [ERROR] Could not execute SQL statement. Rea
我做了。。
添加了一个sql语法类似
"select " +
"msg," +
"count(1) cnt" +
" from test" +
" where msg = 'hello' " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
" EMIT \n" +
" WITH
??
??hdfs??sink??
??
CREATE TABLE csvTableSink ( id BIGINT,name STRING) WITH ('connector.path'=
'hdfs://hacluster/flink/qyq_qyq13','connector.type'='filesystem','format.type'='csv','update-mode'
= 'append')
今天在使用Flink 1.11.3版本使用Flink SQL将kafka中数据导入到HDFS上时提示如下的错误
Caused by: org.apache.flink.table.api.TableException: Could not load service
provider for factories.
at
org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:346
hi, ?? ??1.12flink sql
??datastream?,
共有 1922 项搜索結果,以下是第 601 - 700 matches
Mail list logo