各位大佬好, flink 升级1.12.0,在使用类似如下语句时
select trade_id,sku_id,max(case f= 1 then 'aaa' else '' end ) as a,max(case f=2
then 10 else 0 end) as b from order group by trade_id,sku_id
出现 MaxWithRetractAggFunction$MaxWithRetractAccumulatr$Converter.toInternal
java.lang.interger cannot to org.apache.flink.ta
INTO `tablename`(`key1`, `key2`, `f1`, `f2`) VALUES (?, ?, ?, ?) ON
DUPLICATE KEY UPDATE `key1`=VALUES(`key1`), `key2`=VALUES(`key2`),
`f1`=VALUES(`f1`), `f2`=VALUES(`f2`)
里面已经包含了定义的key, 当发生唯一键冲突时,会执行更新。所以无需指定uniqueKeyFields的
发件人: wind.fly@outlook.com
发送时间
Hi,all:
近日因为用到JdbcDynamicTableSink,发现往mysql插入数据时没有按我指定的primary
key更新数据,无意间追踪到org.apache.flink.connector.jdbc.dialect.MySQLDialect类中getUpsertStatement方法:
/**
*
Mysql upsert query use DUPLICATE KEY UPDATE.
*
*
NOTE: It requires Mysql's primary key to be consistent with pkFields.
*
*
We d
补充一下,sql中dt是timestamp(3)类型,同时是watermark
发件人: wind.fly@outlook.com
发送时间: 2020年11月4日 17:29
收件人: user-zh@flink.apache.org
主题: flink1.11.0 sql自定义UDAF包含复合类型时报Incompatible types
Hi,all
本人使用flink版本为1.11.0,自定义udaf如下:
public class GetContinuousListenDuration
Hi,all
本人使用flink版本为1.11.0,自定义udaf如下:
public class GetContinuousListenDuration extends AggregateFunction {
private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("-MM-dd HH:mm");
@Override
@DataTypeHint("ROW")
public Row getValue(Continuo
org.apache.hadoop.hbase.util.ByteStringer时调用了new
LiteralByteString(),这样就无法找到该类,从而报了java.lang.NoClassDefFoundError: Could not
initialize class org.apache.hadoop.hbase.util.ByteStringer。
解决方法:flink打包时去掉了protobuf-java(3.7.1)依赖,提交时将protobuf-java:2.5.0作为依赖即可。
发件人: wind.fly@outlook.com
Hi, all:
本人试图将flink-sql-gateway(https://github.com/ververica/flink-sql-gateway)升级到1.11支持版本,将flink
sql(用到hbase connector)提交到yarn session后运行时报:
org.apache.hadoop.hbase.DoNotRetryIOException:
java.lang.NoClassDefFoundError: Could not initialize class
org.apache.hadoop.hbase.util.B
PM wind.fly@outlook.com <
wind.fly....@outlook.com> wrote:
> Hi,all:
> 最近在升级flink1.11,sql中用到hbase connctor,发布到yarn-session时,报如下异常:
> 2020-07-29 11:49:55
> org.apache.hadoop.hbase.DoNotRetryIOException:
> java.lang.NoClassDefFoundError: Could
Hi,all:
最近在升级flink1.11,sql中用到hbase connctor,发布到yarn-session时,报如下异常:
2020-07-29 11:49:55
org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.NoClassDefFoundError:
Could not initialize class org.apache.hadoop.hbase.util.ByteStringer
at
org.apache.hadoop.hbase.client.RpcRetryingCaller.translate
就是:
当session创建的时候,worker thread1 会创建一个TableEnvironment,
然后当后续其他该session请求过来时候,可能是 worker thread2使用该TableEnvironment执行sql。
这个其实就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。
wind.fly@outlook.com 于2020年7月28日周二 下午2:09写道:
>
> gateway就类似于一个web服务,大概流程是建立连接时会初始化一个session,在sessio
RelMetadataQueryBase.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
这句话临时fix。
wind.fly@outlook.com 于2020年7月28日周二 上午11:02写道:
> 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
>
> 发件人: godfrey he
> 发
周二 上午9:55写道:
> hi 能给出详细的schema信息吗?
>
> wind.fly@outlook.com 于2020年7月27日周一
> 下午7:02写道:
>
>> 补充一下,执行的sql如下:
>>
>> select order_no, order_time from
>> x.ods.ods_binlog_test_trip_create_t_order_1
>>
>> ________
>>
d' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'connector.topic' = 'ods-test_trip_create-t_order'
)
________
发件人: godfrey he
发送时间: 2020年7月28日 9:55
收件人: user-zh
主题: Re:
补充一下,执行的sql如下:
select order_no, order_time from x.ods.ods_binlog_test_trip_create_t_order_1
发件人: wind.fly@outlook.com
发送时间: 2020年7月27日 18:49
收件人: user-zh@flink.apache.org
主题: flink1.11.0 执行sqlQuery时报NullPointException
Hi,all:
本人正在为公司之前基于flink1.10的
Hi,all:
本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
Caused by: java.lang.NullPointerException
at java.util.Objects.requireNonNull(Objects.java:203)
at
org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMe
exactly once语义,需要配置checkpoint才能得到结果。
Best,
Godfrey
wind.fly@outlook.com 于2020年7月23日周四 下午7:22写道:
> Hi, all:
>
> 本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
> sql如下:
>
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment
Hi, all:
本人当前使用flink版本1.11.0,但是在执行executeSql后,print时没有在console打印出结果(查看kafka是一直有数据产生的),
sql如下:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build
:46,wind.fly....@outlook.com 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.report.bi_report_fence_common_indicators
> select
> fenc
Hi, all:
本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
当parallelism为1时正常运行,但讲parallelism修改为2时,在yarn-session
web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
insert into
x.report.bi_report_fence_common_indicators
select
fence_id,
'finishedOrderCnt' as indicator_name,
TUMBLE_END(dt, INTERVAL '5' MIN
Hi,all:
本人当前用的flink版本1.10,通过yarn-session发布job,通过jobs/job/stop api 停止任务会报Unable to load
requested file,问一下在yarn-session模式下没有这个api吗?
Best,
Junbao Zhang
交
Hi,
时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
wind.fly@outlook.com 于2020年5月28日周四 下午5:27写道:
> Hi, Benchao:
>
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMES
kafka源表的offset提交
嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
wind.fly@outlook.com 于2020年5月28日周四 下午5:02写道:
> Hi, Benchao:
> 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
>
>
>
>
>
,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
wind.fly@outlook.com 于2020年5月28日周四 下午3:14写道:
> Hi,all:
> 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
>
> tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> tEnv.sqlUp
Hi,all:
当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
其中a是kafka表,connector属性为:
'connector.properti
Hi,all
使用flink版本1.10.0,在hive catalog下建了映射kafka的表:
CREATE TABLE x.log.yanfa_log (
dt TIMESTAMP(3),
conn_id STRING,
sequence STRING,
trace_id STRING,
span_info STRING,
service_id STRING,
msg_id STRING,
servicename STRING,
ret_code STRING,
durati
g 包含 “.”
这个关键字符,这和表的全名:catalogName.databaseName.tableName 会冲突,应该在建表时会报catalog x
不存在的问题,没复现proctime field不支持的问题。
Best,
Leonard
> 在 2020年5月20日,11:01,wind.fly@outlook.com 写道:
>
> Hi,
>建表语句为:
>CREATE TABLE x.log.yanfa_log (
>dt TIMESTAMP(3),
>conn
nk-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql
<https://github.com/leonardBang/flink-sql-etl/blob/master/etl-job/src/main/resources/job-sql-1.10/kafka2hbase/KafkaJoinHbaseJoinMysql2Hbase.sql>
> 在 2020年5月19日,09:23,wind.fly@outlook.com 写道:
&
#x27;s proctime field, doesn't support 'PROCTIME()'
可以的吧,jark大佬的例子http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/
<http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/>
也是这么用的,我也试过sql client和tab
Hi,
经过尝试,select时候调用PROCTIME()函数生成proctime字段是可行的,谢谢。
发件人: 111
发送时间: 2020年5月18日 16:07
收件人: user-zh@flink.apache.org
主题: 回复: flink sql使用维表关联时报Temporal table join currently only supports 'FOR
SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIM
()'
你第二次贴的DDL好像也有些问题,是不是`proctime AS PROCTIME(),`?
wind.fly@outlook.com 于2020年5月18日周一 上午9:48写道:
> Sorry, 之前建表语句copy错了,应该是这样:
> CREATE TABLE x.log.yanfa_log (
> dt TIMESTAMP(3),
> conn_id STRING,
> sequence STRING,
> trace_id STRING,
log',
'connector.properties.bootstrap.servers' = '**:9092',
'connector.properties.zookeeper.connect' = '**:2181',
'connector.startup-mode' = 'latest-offset',
'update-mode' = 'append',
'form
Hi, all:
本人使用的flink 版本为1.10.0,planner为BlinkPlanner,用LEFT JOIN FOR SYSTEM_TIME AS
OF 语法关联维表:
select TUMBLE_END(l.dt, INTERVAL '30' SECOND) as index_time,
l.extra_info['cityCode'] as city_code, v.vehicle_level as vehicle_level,
CAST(COUNT(DISTINCT req_body['driverId']) as STRING) as
32 matches
Mail list logo