你好我的朋友:
太久没有写 flink 的代码了.今天写了一个 flink 的 udf.可能需要调用一下 open 函数.但是我不知道如何构造那个
context.有同学能帮忙吗?
非常感谢!
Best wishes!!!
function and I don’t want to generate a
default time or an error time, how should I deal with it?
Looking forward to your answer, thank you very much.
Best,
forideal
to update the file.
Best,
Forideal
At 2021-05-07 19:41:45, "forideal" wrote:
Hi My friends:
I use FlieSystem in Flink SQL, and I found that my success file was
submitted late, probably dozens of minutes late.
Here I provide some information:
1.Flink version is 1.11.1.
le (iter.hasNext()) {
String partition = iter.next();
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}
Best,
Forideal
Hi My friends:
My watermark added 8 more hours to the timestamp displayed on the flink
web. What is the reason for this? Actually looking at the data, it is correct.
I don't know where the problem occurred? Is it because of the time zone?
Flink 1.11.1
Best Wishes!!!
forideal
Hi Arvid Heise,
Thank you for your reply.
Yes,my connection to the JM is bad !!!
Best wishes,forideal
At 2020-11-04 15:32:38, "Arvid Heise" wrote:
A jar upload shouldn't take minutes. There are two possibilities that likely
co-occured:
- your jar is much b
Future.java:1595)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Thank you very much for your reply, forideal
Hi 本超,
感谢你的回复,这个地方的代码我们确实改动过,官方代码的行为是正常的。非常感谢!
> 目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。
> 1.并行度超过 topic partition 的时候会造成资源浪费
> 2.并行度超过 topic partition 后,checkpoint 也无法正常触发了
其中第二个问题是我们自己改动官方 Flink 源码造成的。
Best forideal
在 2020-08-22 11:37:20,"Benchao Li" 写道:
>Hi f
。
Best, forideal
在 2020-08-19 10:06:46,"godfrey he" 写道:
>看看deduplication语法[1] 是否满足你的需求
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication
>
>forideal 于2020年8月17日周一 下午12:13写道:
>
>> Hi,
>
Hi,
最近我有一个使用 Flink SQL 做简单的数据去重的需求,想使用 Flink 的 `ScalarFunction`,通过阅读 API 发现
FunctionContext context 并不支持访问 state。
我准备使用 Guava cache 做,不知道小伙伴有没有更好的建议哈!感谢。
Best,forideal
-fire.delay = 60 s
[1]http://apache-flink.147419.n8.nabble.com/FLINKSQL1-10-UV-td4003.html
Best, forideal
At 2020-08-16 13:21:25, "Chengcheng Zhang" <274522...@qq.com> wrote:
Hi, forideal
Thank you so much, it does help a lot.
The approach you mentioned e
BY time_str;
In this sql, time_str is an hour in 2020081600, 2020081601,...2020081623.
[1]http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html
[2]http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/
Hope this helps.
Best, forideal
At 2020
Hi 赵一旦,
目前 Flink SQL 我这边使用也是无法指定各个算子的并行度。目前我这边遇到两个问题。
1.并行度超过 topic partition 的时候会造成资源浪费
2.并行度超过 topic partition 后,checkpoint 也无法正常触发了
Best forideal
在 2020-08-14 12:03:32,"赵一旦" 写道:
>检查点呢,大多数用FlinkSQL的同学们,你们的任务是随时可运行那种吗,不是必须保证不可间断的准确性级别吗?
>
>Xingbo Huang 于2
ts Arraylist
public class ConcatString extends ArrayList {
@Override
public boolean add(String toString) {
if (this.size() < 1000) {
super.add(toString);
return true;
}
return false;
}
public List getList() {
return this;
}
}
Best forideal
At 2020-08-14 21:46:45,
catString createAccumulator() {
return new ConcatString();
}
@Override
public void open(FunctionContext context)
throws Exception {
}
Best forideal
将能减少用户的成本。
Best forideal
在 2020-08-13 16:33:29,"Zhou Zach" 写道:
>
>
>
>Hi forideal, Shengkai Fang,
>
>加上env.disableOperatorChaining()之后,发现5个算子,
>
>
>
>
>Source: TableSourceScan(table=[[default_catalog, default_databa
ime - INTERVAL '10'
SECOND
Best forideal
在 2020-08-13 15:20:13,"Zhou Zach" 写道:
>
>
>
>Hi forideal,
>我也遇到了No Watermark问题,我也设置了table.exec.source.idle-timeout 参数,如下:
>
>
>val streamExecutionEnv = StreamExecuti
这个问题的分析路径会更简单。我应该直接可以发现大部分 task 可以生成
watermark,少部分 task 无 watermark,能够快速的减少debug的时间。当前使用 disable chain 观察每个 op
的情况,对于 Flink sql 的 debug 有很大的便利之处,不知社区是否有相关参数帮助开发者。
Best forideal
在 2020-08-13 12:56:57,"forideal" 写道:
>大家好
>
>
>关于这个问题我进行了一些 debug,发现了 wa
实无法进一步debug了。
如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
Best forideal
在 2020-08-11 17:13:01,"forideal" 写道:
>大家好,请教一个问题
>
>
> 我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成
> watermark。消费大量的数据的时候,就无法生成watermark。
> 一直是
test
) T
where
user_id is not null
and user_id <> ''
and CHARACTER_LENGTH(user_id) = 24
) T
group by
SESSION(event_time, INTERVAL '10' SECOND),
user_id
Best forideal
等一会儿发到下游。起到等一会的效果。
Best forideal
在 2020-07-03 23:05:06,"Benchao Li" 写道:
>奥,对,还有一种思路。如果你的source的mq支持延迟消息,这个应该就不需要Flink做什么了,直接用mq的延迟消息就可以了。
>
>admin <17626017...@163.com> 于2020年7月3日周五 下午5:54写道:
>
>> Hi,all
>> 我们有这样一个场景,双流join,一个快流,一个慢流,想让快流
st(new Row(this.fieldNames.length)));
}
} catch (Exception e) {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
});
}
Best forideal.
在 2020-07-02 15:56:46,"sunfulin" 写道:
>hi,
>我在使用flink 1.10.1 blink
>planner,通过扩展tablesourcesinkfac
Hi 本超
关于Mysql 做维表,关掉cache后的优化手段,有什么建议吗?
比如,20k records per second 的流量,关掉 cache 会对 mysql 产生很大的压力。不知道 MySQL Lookup 做成
async + batch 会不会提升性能或者有副作用。
Best forideal.
在 Benchao Li ,2020年7月1日 13:22写道:
我理解你只需要把这同一个Mysql表再做一个维表即可。可以写两次DDL,一个给维表用,一个给sink用。
如果你就觉得它是实时变化的,你可以把维表的cache关掉,保证
ns()内指定我们解析出的TypeInformation,这个TypeInformation可以是任何动态类型的。
>> >
>> >
>> >
>> >
>> 但是你的场景是使用udf,根据你发的udf示例来看,我明白你想要动态类型输出,但是实际上我不了解你们的场景,不明白什么样的场景需要用到这种处理,以及会用到一个udf产出不同的结果,因为我理解为了udf的管理方便、可读性以及可维护性,udf的输出参数类型应该定下来比较好一点。
>> >
>> >
>> &g
你好,我的朋友:
我使用的是 Flink 1.10 Blink Planer。
我想构造一个Flink UDF ,这个 UDF 可以根据不同的参数返回不同的类型。
为什么我想要这个功能:
场景1: 我的数据是一个 pb 的 bytes,我想从里面获取数据,如果统一的返回 string,后面还需要 cast 比较繁琐,如果使用
get_int、get_double、get_string 这样的方式,实现起来又非常多
场景2: 我的数据是一个 Json ,问题同上。
在场景1中,我改了下 Flink 的源码,在
]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions
Best
forideal
在 2020-05-28 10:16:45,"111" 写道:
>Hi,
>想要在sqlgateway里面使用,那么可以看看下面几个条件:
>1 满足SPI的要求,能让flink自动发现实现类
>2 配置FLINK_HO
/JdbcInputFormat.java#L236
Why don't we support LocalDateTime?
Best wishes.
forideal
forideal
Hi Jark:
Thanks for your replay!
1. 是基于哪个版本,哪个 planner 进行的测试?
Flink 1.9.0 Blink Planner
2. 流计算模式还是批计算模式?
流计算模式
3. 你的自定义 UDAF 有注册成 "sum" 吗?能使用另一个名字,比如"mysum" 来避免可能的命名冲突吗?
注册的名字为 red_sum
Best forideal
在 2020-04-28 11:13:50,"Jark Wu" 写道:
>Hi,
大家好:
我最近在使用 Flink SQL 做一些性能测试,我发现 Flink Buildin 的 Aggr 性能都很慢,比如 COUNT,LISTAGG 等等。
我采用自己写的 count 性能是 buildin 的 COUNT 的函数的两倍都不止。(各种窗口都测试过,不知道是不是我使用错误)
SQL:
select
query_nor,
sum(cast (1asbigint))as query_nor_counter
from ods_search_track
groupby
query_nor,
Hello, my friend.
I have a dimension table.
createtabledim_u_score(u_idbigint,varchar,score_adouble,score_bdouble)with{xxx}Inascene
The condition of lookup is fliter score_a > 0.9
In another scenario
The condition of lookup is fliter score_b > 1
In Flink, at present, lookup join can use on
ey, value);
}
@Override
public String getValue(Map acc) {
return JSON.toJSONString(acc);
}
@Override
public TypeInformation getResultType() {
return Types.STRING;
}
}
Best forideal
At 2020-04-21 10:05:05, "Kurt Young" wrote:
Thanks, once you can reproduce this issue locally,
forideal
At 2020-04-18 21:51:13, "Jark Wu" wrote:
Hi,
What's the statebackend are you using? Is it Heap statebackend?
Best,
Jark
On Sat, 18 Apr 2020 at 07:06, tison wrote:
Hi,
Could you share the stack traces?
Best,
tison.
forideal 于2020年4月18日周六 上午12:33写道:
Hello
/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg
https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg
Best, forideal
ate(sql);
});
env.execute(jobName);
Best Wishes
At 2020-04-10 16:35:33, "Jark Wu" wrote:
Hi forideal,
Are you using `StreamTableEnvironment` or SQL CLI?
Currently, only `TableEnvironemnt` with Blink planner have the multi-sink
optimization (reuse shared upstream operators).
Hello
There are 3 SQLs all querying the same table, but the generated GAG is 3
independent topologies.I think, the better result is that there is one Source
and 3 Sinks.
createtablegood_sink(datavarchar)with(
'connector.type'='console',
大家好,本文为 Flink Weekly 的第十期,由张成整理,主要内容包括:近期社区开发进展,邮件问题答疑以及社区直播和相关技术博客。
社区开发进展
[release] 关于发布 Flink 1.10.1 的讨论正在火热进行,最新消息请参考 Yu Li 发起的讨论。
[1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html
[Checkpoint] Arvid Heise 发起 FLIP-76
the job is fully initiated?
No,the job can't init.
Topology
op1-hash->op2-hash->op3-hash->op4
|
|-hash->op5
op1 parallelism is 200
op2 parallelism is 400
op3 parallelism is 400
op4 parallelism is 400
op5 parallelism is 400
Best Wishes
forideal
At 2020-03-20 15:20:07, "Xin
', and 'taskmanager.network.memory.max'.
But actually this waste too many resource.
Memory Segments
| Type | Count |
| Available | 698,838 |
|
Total
| 700,000 |
| Direct | 700,103 | 21.4 GB | 21.4 GB |
| Mapped | 0 | 0 B | 0 B |
Best Wishes
forideal
operator, parallelism is 3
Two is GroupWindowAggregate operator,parallelism is 3
Three is LookupJoin operator,parallelism is 3
I want to change the parallelism of GroupWindowAggregate,but i can't.
Best wishes
forideal
大家好,本文为 Flink Weekly 的第八期,由张成整理,主要内容包括:近期社区开发进展,邮件问题答疑以及社区直播和相关技术博客。
社区开发进展
Yangze Guo 在 FLIP-108 中建议 Flink 支持对 GPU 的资源管理。
[1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-108-Add-GPU-support-in-Flink-tp38286.html
从 Flink 1.10.0 开始,Apache Flink 项目开始维护
Hello everyone
Now i have a job with big state in RocksDB.This job's source is Kafka. If i
want to replay data, the job will crash.
One of the motivation of FLIP 27 is event time alignment , however , it is
not already for me.
How can i work around?
Here is an immature solution, I
42 matches
Mail list logo