java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
at
1.flink 小文件合并
测试下来发现,同一分区在不同时期去查询数据量不断增长,直到分区下所有的文件都compact完成,才定下来 ?
这个是什么原因。目前看起来像是分区提交没有等到分区下所有文件compact完成
2. 某些分区没数据时无法触发分区提交问题
我们实现了自定分区提交策略,为了通知离线下游。这样如果分区没数据,不能提交的话,就会把下游调度hang住。这种问题,怎么解决
1.flink 小文件合并
测试下来发现,同一分区在不同时期去查询数据量不断增长,直到分区下所有的文件都compact完成,才定下来 ?
这个是什么原因。目前看起来像是分区提交没有等到分区下所有文件compact完成
2. 某些分区没数据时无法触发分区提交问题
我们实现了自定分区提交策略,为了通知离线下游。这样如果分区没数据,不能提交的话,就会把下游调度hang住。这种问题,怎么解决
flink版本:1.12
hive版本:2.3.4
flink 1.12分支写入hive decimal类型报错:
java.lang.NoSuchMethodError:
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
at
org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
at
flink-jdbc写入clickhouse 在flink 1.12版本 tps只能到35W左右的tps,各位,有什么可以性能调优?
flink 如何升级hadoop3 ?
flink 如何升级hadoop3 ?
java.lang.NoSuchMethodError:
org.apache.hadoop.hive.serde2.io.HiveDecimalWritable.serialize64(I)J
at
org.apache.orc.impl.ColumnStatisticsImpl$Decimal64StatisticsImpl.updateDecimal(ColumnStatisticsImpl.java:1010)
at
1min的滚动窗口:
table.exec.emit.early-fire.enabled=true;
table.exec.emit.early-fire.delay=10 s;
设置窗口定期trigger之后,参数不生效
查看执行计划:
{
"id": 6,
"type": "GroupWindowAggregate(groupBy=[mid, code, floor_id],
window=[TumblingGroupWindow('w$, log_ts, 6)], properties=[w$start, w$end,
w$rowtime,
以直接转成TIMESTAMP
>select someFunc(field)
>from `someTable`
>group by TUMBLE(eventTime, INTERVAL '1' SECOND)
>
>
>
>guoliubi...@foxmail.com
>
>发件人: kandy.wang
>发送时间: 2020-12-14 11:23
>收件人: user-zh
>主题: Re:回复: Window aggregate can only be defined over a time attrib
个参数需要的就是bigint,你这边time_local 直接用就好,不用转另外TIMESTAMP
>
>
>
>guoliubi...@foxmail.com
>
>发件人: kandy.wang
>发送时间: 2020-12-14 10:28
>收件人: user-zh
>主题: Window aggregate can only be defined over a time attribute column, but
>TI
[ERROR] Could not execute SQL statement.
Reason:org.apache.flink.table.api.TableException: Window aggregate can only be
defined over a time attribute column, but TIMESTAMP(3) encountered.
SQL 如下:
create temporary view expose as
select
mid
,time_local
JdbcBatchingOutputFormat:
for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
try {
attemptFlush();
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
if (i >= executionOptions.getMaxRetries()) {
throw new
@Jianzhi Zhang
嗯,是这个原因,感谢 回复。 就是decimal的精度问题
在 2020-12-01 13:24:23,"Jianzhi Zhang" 写道:
>是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现
>
>> 2020年11月19日 下午10:41,kandy.wang 写道:
>>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_lea
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as
leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss')
FROM XX.XX.XX
group by v_spu_id;
XX.XX.XX 是通过自定义cdc format消费公司的cdc数据源,cdc数据源在kafka,数据只保留7天数据,都是增量消费,如何保证结果准确。
insert into kudu.default_database.index_agg
SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as
leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss')
FROM XX.XX.XX
group by v_spu_id;
XX.XX.XX 是通过自定义cdc format消费公司的cdc数据源,cdc数据源在kafka,数据只保留7天数据,都是增量消费,如何保证结果准确。
自定义AggregateFunction 实现了UV的 HLL 近似计算,问题是 HyperLogLog 是第三方包,这个如何让flink 识别 ?
就不知道这个TypeInformation该如何写。
代码如下:
import io.airlift.slice.Slices;
import io.airlift.stats.cardinality.HyperLogLog;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import
问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
问题的。
>
>3. 你的 format 再解析 update 时,时先发的 before 还是 after?
>4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?
>
>On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote:
>
>>
>>
>>
>>
>>
>>
>> 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>>
>> 2
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
2. 没有开启
在 2020-11-20 11:49:44,"Jark Wu" 写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
>
>> hi
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
2. 没有开启
在 2020-11-20 11:49:44,"Jark Wu" 写道:
>实现上应该没什么问题。
>
>1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
>2. 是否开启 mini-batch了?
>
>Best,
>Jark
>
>On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote:
>
>> hi Jark:
>>
atabase.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
ase.table; 看看每个字段的值是否是正常正确的,有无 null
>值的,以验证你的自定义 format 没有问题。
>
>Best,
>Jark
>
>On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote:
>
>> --mysql表
>> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
>>`id` INT UNSIGNED AUTO_INCREMENT,
--mysql表
CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`(
`id` INT UNSIGNED AUTO_INCREMENT,
`spu_id` BIGINT NOT NULL,
`leaving_price` DECIMAL(10, 5)
PRIMARY KEY ( `id` ),
unique key idx_spu_id (spu_id)
)ENGINE=InnoDB DEFAULT CHARSET=utf8
--flink表
CREATE
hi:
按照我的理解,partition time提交分区,是会在current watermark > partition time + commit
delay 时机触发分区提交,得看你的sink.partition-commit.delay
设置的多久,如果超过之后,应当默认是会丢弃的吧。
https://cloud.tencent.com/developer/article/1707182
这个连接可以看一下
在 2020-11-12 11:58:22,"admin" <17626017...@163.com> 写道:
>Hi,all
>Flink
看了一下hbase的维表关联主要是通过org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction
实现的,测试了一下性能tps只有大概3-4w, 经加本地cache之后性能仍然没有提升。 分析了一下flink ui LookupJoin 是与kafka
source的算子 chain 在一起了,这样整个算子的并行度就受限于kafka分区的并行度。
1.想问一下这块的 hbase connector开发,是否有做过connector的性能测试。
想了解一下flink sql state里的东西,是否可以用datastream里的queryable api 查询 ? 怎么查询呢,是需要知道key
才可以查询么。
诉求就是想知道state里到底存的啥
group agg 开启了mini batch之后,state ttl不生效的问题:
现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。
sql-client-defaults.yaml对应的参数应该是这2个吧:
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle
hi
你建mysql要指定主键,另外创建flink表时也要指定一下主键
PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了
在 2020-09-27 13:36:25,"xiao cai" 写道:
>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。
>这是我很困惑的地方。
>
>
> 原始邮件
>发件人: lec ssmi
>收件人: flink-user-cn
>发送时间: 2020年9月27日(周日)
hi Leonard:
实际在HBaseSinkFunction中打log测试下来发现,都是UPDATE_AFTER类型的RowData数据写Hbase,没有你说的那种retract消息呢。如果是retract
应该是 会先发一条UPDATE_BEFORE 消息,再发一条UPDATE_AFTER消息吧。实际测下来
都是UPDATE_AFTER,转成了hbase的Put操作,就好比每次都是upsert一样。
在 2020-09-25 10:03:34,"Leonard Xu" 写道:
>Hi
>>
insert into hive.temp_dw.day_order_index select rowkey, ROW(orderN,)
from
(
select order_date as rowkey,
count(distinct parent_sn) as orderN,
group by order_date
)
通过sql查hbase时,有时查到数据,有时候查不到数据。是不是group操作,会有下游算子 发送撤回消息,导致在delete
hbase的某条rowkey数据,导致客户端查不到数据?
我理解 hbase sink
sql如下:
select
(case when act_name is not null then act_name else 'default_value'
end) as act_name,
(case when fst_plat is not null then fst_plat else 'default_value'
end) as fst_plat,
sum(amount) as saleN
from
因flink目前不支持pb format,调用了,protobuf-java-util
com.google.protobuf.utilJsonFormat.printer().preservingProtoFieldNames().print(message)
先再pb 转成json 再套用 JsonRowDataDeserializationSchema处理json,
发现处理的性能就只能达到20w左右的tps,而如果是处理json格式的数据,tps是可以达到50-60w的tps.
想问一下,1、flink要是处理pb格式的数据,有什么好的办法? 2
、社区对pb format
投票通过,即将发布
>>
>> On Thu, Sep 17, 2020 at 12:46 PM kandy.wang
>
>> kandy1203@
>
>> wrote:
>>
>>> @Jingsong Li
>>>
>>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>>CatalogTable table = checkNotN
ter不应该有较大性能差距。
>
>> 为何要强制滚动文件
>
>因为要保证Exactly-Once, 像Orc和parquet类似的 format,它并不能把一个文件拆成多次来写。
>
>On Thu, Sep 17, 2020 at 2:05 PM kandy.wang wrote:
>
>>
>>
>>
>> ok. 就是用hadoop mr writer vs flink 自实现的native
>> writer之间的性能对比了。至少目前看了一下table.exec.hive.fa
-policy.rollover-interval参数就不work了,如果5min一个分区,2min做一次checkpoint,那文件还不到几十M就滚动了。配置的参数就没意义了
在 2020-09-17 13:43:04,"Jingsong Li" 写道:
>可以再尝试下最新的1.11.2吗?
>
>https://flink.apache.org/downloads.html
>
>On Thu, Sep 17, 2020 at 1:33 PM kandy.wang wrote:
>
>> 是master分支代码
>&
g/jira/browse/FLINK-19121
>它是影响性能的,1.11.2已经投票通过,即将发布
>
>On Thu, Sep 17, 2020 at 12:46 PM kandy.wang wrote:
>
>> @Jingsong Li
>>
>> public TableSink createTableSink(TableSinkFactory.Context context) {
>>CatalogTable table = checkNotNull(context.getTable())
;
>On Wed, Sep 16, 2020 at 8:16 PM kandy.wang wrote:
>
>> 场景很简单,就是kafka2hive
>> --5min入仓Hive
>>
>> INSERT INTO hive.temp_.hive_5min
>>
>> SELECT
>>
>> arg_service,
>>
>> time_local
>>
>> .
>>
>> FROM_
StreamingFileWriter算子的并行度,有什么好的办法么?然后StreamingFileWriter
这块,有没有什么可以提升性能相关的优化参数?
在 2020-09-16 19:29:50,"Jingsong Li" 写道:
>Hi,
>
>可以分享下具体的测试场景吗?有对比吗?比如使用手写的DataStream作业来对比下,性能的差距?
>
>另外,压测时是否可以看下jstack?
>
>Best,
>Jingsong
>
>On Wed, Sep 16, 2020 at 2:03 PM kandy.wa
加上这个参数'sink.partition-commit.policy.kind'='metastore,success-file'
这个应该是可以work的
在 2020-09-16 15:01:35,"highfei2011" 写道:
>Hi,各位好!
> 目前遇到一个问题,在使用 FLink -1.11.0 消费 Kafka 数据后,使用 Streaming File Sink 的
> BucketAssigner 的分桶策略 sink 到 hdfs ,默认没有生成 _SUCCESS 标记文件。
> 我在配置中新增了
>val hadoopConf =
压测下来,发现streaming方式写入hive StreamingFileWriter ,在kafka partition=40 ,source
writer算子并行度 =40的情况下,kafka从头消费,tps只能达到 7w
想了解一下,streaming方式写Hive 这块有压测过么?性能能达到多少
private static class HiveRollingPolicy extends CheckpointRollingPolicy {
private final long rollingFileSize;
private final long rollingTimeInterval;
private HiveRollingPolicy(
long rollingFileSize,
long rollingTimeInterval) {
Preconditions.checkArgument(rollingFileSize > 0L);
自实现了kudu connector报错:
2020-09-09 18:34:59,442 WARN org.apache.flink.table.client.cli.CliClient
[] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
statement.
at
@Jingsong orc格式,都看过了,还是没有commit。感觉你们可以测一下这个场景
在 2020-08-12 16:04:13,"Jingsong Li" 写道:
>另外问一下,是什么格式?csv还是parquet。
>有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗?
>
>On Wed, Aug 12, 2020 at 2:45 PM kandy.wang wrote:
>
>>
>>
>>
>>
>>
&g
有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉,
重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
就是感觉停止之前正在写的那个分区,没有触发commit
在 2020-08-12 14:26:53,"Jingsong Li" 写道:
>那你之前的分区除了in-progress文件,有已完成的文件吗?
>
>On Wed, Aug 12, 2020 at 1:57 PM kandy.wang wrote:
>
>>
>
; 写道:
>你的source是exactly-once的source吗?
>
>in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>
>On Wed, Aug 12, 2020 at 12:51 PM kandy.wang wrote:
>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
t; 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>>>
>>> On Tue, 11 Aug 2020 at 21:15, kandy.wang wrote:
>>>
>>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>>> > 21:0
2 中支持。
>>
>> On Tue, 11 Aug 2020 at 21:15, kandy.wang wrote:
>>
>> > 1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> &g
1.StreamingFileWriter 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在 21:04分左右的时候做一次checkpoint
或savepoint,重启任务的时候,hm =2100分区的数据还存在很多的in-progress文件。
另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
2. sql-client不支持 checkpoint
设置 'csv.field-delimiter'='\t'
,查询数据的时候,报错:org.apache.flink.table.api.ValidationException: Option
'csv.field-delimiter' must be a string with single character, but was: \t
请问,该怎么搞?
@ godfrey
你说的这种StatementSet 提交方式,在sql-client提交任务的时候不支持吧? 可以给加上么。
在 2020-08-04 19:36:56,"godfrey he" 写道:
>调用 StatementSet#explain() 把结果打出来看看是否因 Deduplicate的digest不一样导致的没法复用
>
>kandy.wang 于2020年8月4日周二 下午6:21写道:
>
>>
>>
>>
>>
>>
尽量复用重复计算部分。
>1.11里用StatementSet来提交job,1.11之前用sqlUpdate/insertInto + execute提交任务
>
>kandy.wang 于2020年8月4日周二 下午5:20写道:
>
>> FLINK SQL view相关问题:
>> create view order_source
>>
>> as
>>
>> select order_id, order_goods_id, user_id,...
>>
>
FLINK SQL view相关问题:
create view order_source
as
select order_id, order_goods_id, user_id,...
from (
.. proctime,row_number() over(partition by order_id, order_goods_id
order by proctime desc) as rownum
from hive.temp_dw.dm_trd_order_goods/*+
现象:
CREATE TABLE test.xxx_5min (
..
) PARTITIONED BY (dt string , hm string) stored as orc TBLPROPERTIES(
'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.delay'='5 min',
'sink.partition-commit.policy.kind'='metastore,success-file',
json格式,如果是一个json array 该如何定义 schema,array里还可能存在嵌套json array的情况。
如数据:
55 matches
Mail list logo