How to write a UT for a Flink UDF?

2022-06-20 Thread forideal
Hello my friends: it has been too long since I last wrote Flink code. Today I wrote a Flink UDF, and I probably need to call the open function, but I don't know how to construct that context. Can anyone help? Many thanks! Best wishes!!!
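A minimal JUnit 4 sketch of such a test, assuming a hypothetical MyUpperUdf ScalarFunction and that your Flink version still exposes the public FunctionContext(RuntimeContext) constructor; a Mockito mock of RuntimeContext is usually enough as long as open() only touches what you stub.

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

public class MyUpperUdfTest {

    // Hypothetical UDF under test: upper-cases its input; open() is where a real
    // UDF would read job parameters or create clients.
    public static class MyUpperUdf extends ScalarFunction {
        @Override
        public void open(FunctionContext context) throws Exception {
            // e.g. context.getJobParameter("key", "default")
        }

        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    @Test
    public void testEvalAfterOpen() throws Exception {
        MyUpperUdf udf = new MyUpperUdf();
        // FunctionContext wraps a RuntimeContext, so a mock is enough for a unit test.
        udf.open(new FunctionContext(mock(RuntimeContext.class)));
        assertEquals("FLINK", udf.eval("flink"));
        udf.close();
    }
}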

How to deal with an abnormal time extracted by a computed column?

2021-05-13 Thread forideal
function and I don’t want to generate a default time or an error time, how should I deal with it? Looking forward to your answer, thank you very much. Best, forideal

Re: FileSystem Connector's Success File Was Submitted Late

2021-05-07 Thread forideal
to update the file. Best, Forideal At 2021-05-07 19:41:45, "forideal" wrote: Hi My friends: I use the FileSystem connector in Flink SQL, and I found that my success file was submitted late, probably dozens of minutes late. Here I provide some information: 1. Flink version is 1.11.1.

FileSystem Connector's Success File Was Submitted Late

2021-05-07 Thread forideal
le (iter.hasNext()) { String partition = iter.next(); LocalDateTime partTime = extractor.extract( partitionKeys, extractPartitionValues(new Path(partition))); if (watermark > toMills(partTime) + commitDelay) { needCommit.add(partition); iter.remove(); } } return needCommit; } Best, Forideal

Watermark time zone issue

2021-05-07 Thread forideal
Hi My friends: The watermark displayed on the Flink web UI is 8 hours later than my event timestamps. What is the reason for this? Looking at the actual data, it is correct, so I don't know where the problem is. Is it because of the time zone? Flink 1.11.1 Best Wishes!!! forideal

Re:Re: JM upload files to blob server is slow

2020-11-05 Thread forideal
Hi Arvid Heise, Thank you for your reply. Yes, my connection to the JM is bad!!! Best wishes, forideal At 2020-11-04 15:32:38, "Arvid Heise" wrote: A jar upload shouldn't take minutes. There are two possibilities that likely co-occurred: - your jar is much b

JM upload files to blob server is slow

2020-11-03 Thread forideal
Future.java:1595) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Thank you very much for your reply, forideal

Re:Re: Re: How to set the parallelism of Flink SQL

2020-08-24 Thread forideal
Hi Benchao, thanks for your reply. We did indeed modify the code in that place; the official code behaves correctly. Many thanks! > In my current use of Flink SQL I also cannot specify the parallelism of individual operators. > 1. When the parallelism exceeds the number of topic partitions, resources are wasted > 2. When the parallelism exceeds the number of topic partitions, checkpoints can no longer be triggered normally The second problem was caused by our own modification of the official Flink source code. Best forideal On 2020-08-22 11:37:20, "Benchao Li" wrote: >Hi f

Re:Re: Accessing state in a ScalarFunction

2020-08-19 Thread forideal
. Best, forideal On 2020-08-19 10:06:46, "godfrey he" wrote: >See whether the deduplication syntax [1] meets your need > >[1] >https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#deduplication > >forideal wrote on Mon, Aug 17, 2020 at 12:13 PM: > >> Hi, >

Accessing state in a ScalarFunction

2020-08-16 Thread forideal
Hi, I recently have a requirement to do simple data deduplication with Flink SQL. I wanted to use a Flink `ScalarFunction`, but after reading the API I found that FunctionContext does not support accessing state. I plan to use a Guava cache instead; does anyone have a better suggestion? Thanks. Best, forideal
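A rough sketch of the Guava-cache idea mentioned above; the function name IsDuplicate, the cache size and the TTL are made up, and because the cache lives inside each parallel subtask the deduplication is only best-effort rather than exact.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

import java.util.concurrent.TimeUnit;

public class IsDuplicate extends ScalarFunction {

    private transient Cache<String, Boolean> seen;

    @Override
    public void open(FunctionContext context) {
        // Bounded, expiring cache instead of keyed state, since FunctionContext
        // gives a ScalarFunction no access to state.
        seen = CacheBuilder.newBuilder()
                .maximumSize(1_000_000)
                .expireAfterWrite(1, TimeUnit.HOURS)
                .build();
    }

    public boolean eval(String key) {
        if (key == null || seen.getIfPresent(key) != null) {
            return true;   // treat null and already-seen keys as duplicates
        }
        seen.put(key, Boolean.TRUE);
        return false;      // first time this key is seen by this subtask
    }
}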

Re: Reply: How to get the evaluation result of a time-based window aggregation in time after a new event falls into the window?

2020-08-16 Thread forideal
-fire.delay = 60 s [1]http://apache-flink.147419.n8.nabble.com/FLINKSQL1-10-UV-td4003.html Best, forideal At 2020-08-16 13:21:25, "Chengcheng Zhang" <274522...@qq.com> wrote: Hi, forideal Thank you so much, it does help a lot. The approach you mentioned e
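For reference, a sketch of how the early-fire options quoted in this thread are typically applied; the key names are taken from the thread and are experimental, undocumented Blink planner settings, and the TableEnvironment API shown is the 1.10/1.11-era one.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EarlyFireExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        Configuration conf = tEnv.getConfig().getConfiguration();
        // Emit partial window results every 60 s instead of waiting for the window to close.
        conf.setString("table.exec.emit.early-fire.enabled", "true");
        conf.setString("table.exec.emit.early-fire.delay", "60 s");
    }
}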

Re: How to get the evaluation result of a time-based window aggregation in time after a new event falls into the window?

2020-08-15 Thread forideal
BY time_str; In this sql, time_str is an hour in 2020081600, 2020081601,...2020081623. [1]http://apache-flink.147419.n8.nabble.com/flink-sql-5-5-td2011.html [2]http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql/ Hope this helps. Best, forideal At 2020

Re:Re: How to set the parallelism of Flink SQL

2020-08-15 Thread forideal
Hi 赵一旦, in my current use of Flink SQL I also cannot specify the parallelism of individual operators. I am hitting two problems: 1. When the parallelism exceeds the number of topic partitions, resources are wasted. 2. When the parallelism exceeds the number of topic partitions, checkpoints can no longer be triggered normally. Best forideal On 2020-08-14 12:03:32, "赵一旦" wrote: >What about checkpoints? For most of you using Flink SQL, are your jobs the kind that can be rerun at any time, rather than having to guarantee an uninterrupted level of accuracy? > >Xingbo Huang 于2

Re:Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID

2020-08-15 Thread forideal
ts Arraylist public class ConcatString extends ArrayList { @Override public boolean add(String toString) { if (this.size() < 1000) { super.add(toString); return true; } return false; } public List getList() { return this; } } Best forideal At 2020-08-14 21:46:45,
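A reconstruction of the accumulator class from the snippet above, with generics filled in for readability; such an ArrayList subclass is handled as a generic type, which would explain why Kryo shows up in the exception in the subject at all.

import java.util.ArrayList;
import java.util.List;

public class ConcatString extends ArrayList<String> {

    @Override
    public boolean add(String toString) {
        // Cap the accumulator at 1000 elements, as in the original snippet.
        if (this.size() < 1000) {
            super.add(toString);
            return true;
        }
        return false;
    }

    public List<String> getList() {
        return this;
    }
}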

Flink SQL UDAF com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID

2020-08-14 Thread forideal
catString createAccumulator() { return new ConcatString(); } @Override public void open(FunctionContext context) throws Exception { } Best forideal

Re:Re:Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 Thread forideal
would reduce the cost for users. Best forideal On 2020-08-13 16:33:29, "Zhou Zach" wrote: > >Hi forideal, Shengkai Fang, > >After adding env.disableOperatorChaining(), I see 5 operators, > > >Source: TableSourceScan(table=[[default_catalog, default_databa

Re:Re:Re:Re:Flink SQL No Watermark

2020-08-13 Thread forideal
ime - INTERVAL '10' SECOND Best forideal On 2020-08-13 15:20:13, "Zhou Zach" wrote: > >Hi forideal, >I also hit the No Watermark problem, and I also set the table.exec.source.idle-timeout parameter, as follows: > >val streamExecutionEnv = StreamExecuti
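For completeness, a Java sketch of the option quoted in the reply above (the reply itself uses the Scala API); the key name is copied from the thread and the 10 s value is arbitrary. It marks a source subtask idle after the timeout so that watermarks from the other subtasks can still advance.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleTimeoutExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Sources that have been silent for 10 s no longer hold back the watermark.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "10 s");
    }
}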

Re:Re:Flink SQL No Watermark

2020-08-13 Thread forideal
the analysis path for this problem would be much simpler. I could directly see that most tasks generate watermarks while a few tasks have none, which would quickly cut down the debugging time. Disabling chaining to observe each operator is already very convenient for debugging Flink SQL; I wonder whether the community has a related option to help developers. Best forideal On 2020-08-13 12:56:57, "forideal" wrote: >Hi everyone > > >I did some debugging on this problem and found that wa

Re:Flink SQL No Watermark

2020-08-12 Thread forideal
really cannot debug any further. If anyone has a good way to debug the code generated by codegen, please let me know, thanks a lot. Best forideal On 2020-08-11 17:13:01, "forideal" wrote: >Hi everyone, I have a question > > > I have a SQL query that uses a session window. When it consumes a topic with a small amount of data, it can generate watermarks. When it consumes a large amount of data, no watermark is generated. > It has always been

Flink SQL No Watermark

2020-08-11 Thread forideal
test ) T where user_id is not null and user_id <> '' and CHARACTER_LENGTH(user_id) = 24 ) T group by SESSION(event_time, INTERVAL '10' SECOND), user_id Best forideal

Re:Re: What are the options for deliberately delaying data for a while in Flink SQL?

2020-07-03 Thread forideal
send it downstream a little later, which achieves the effect of waiting a while. Best forideal On 2020-07-03 23:05:06, "Benchao Li" wrote: >Oh, right, there is another approach. If your source MQ supports delayed messages, Flink does not need to do anything; just use the MQ's delayed messages directly. > >admin <17626017...@163.com> wrote on Fri, Jul 3, 2020 at 5:54 PM: > >> Hi all, >> We have a scenario with a dual-stream join, one fast stream and one slow stream, and we want the fast stream

Re: Flink AsyncTableFunction invocation exception

2020-07-03 Thread forideal
st(new Row(this.fieldNames.length))); } } catch (Exception e) { result.complete(Collections.singletonList(new Row(this.fieldNames.length))); } }); } Best forideal. On 2020-07-02 15:56:46, "sunfulin" wrote: >hi, >I am using the flink 1.10.1 blink >planner, and by extending tablesourcesinkfac
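A minimal sketch of the async lookup pattern the Java fragment above appears to come from, assuming a hypothetical AsyncDimLookup function and a placeholder query() call; as in the fragment, the important point is that the CompletableFuture is always completed, even on failure, otherwise the async operator hangs until it times out.

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class AsyncDimLookup extends AsyncTableFunction<Row> {

    private final String[] fieldNames = {"id", "name"};   // illustrative schema

    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        CompletableFuture.runAsync(() -> {
            try {
                Row row = query(key);                      // stand-in for the real client call
                result.complete(Collections.singletonList(
                        row != null ? row : new Row(fieldNames.length)));
            } catch (Exception e) {
                // Complete with an empty row rather than leaving the future dangling.
                result.complete(Collections.singletonList(new Row(fieldNames.length)));
            }
        });
    }

    private Row query(String key) {
        return null;                                       // placeholder
    }
}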

Re: A question about Flink SQL

2020-07-02 Thread forideal
Hi Benchao, regarding MySQL as a dimension table, do you have any suggestions for optimization after turning the cache off? For example, at 20k records per second, turning off the cache puts a lot of pressure on MySQL. I wonder whether making the MySQL lookup async + batch would improve performance or have side effects. Best forideal. On Jul 1, 2020 at 13:22, Benchao Li wrote: My understanding is that you just need to declare this same MySQL table once more as a dimension table. You can write the DDL twice, one for the dimension table and one for the sink. If you really consider it to change in real time, you can turn off the dimension table's cache to ensure
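A sketch of the dimension-table DDL under discussion, using the legacy 1.10/1.11 JDBC descriptor keys; the table name, columns and URL are made up, and leaving out the two connector.lookup.cache.* options is what disables the lookup cache entirely.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlDimTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.sqlUpdate(
                "CREATE TABLE dim_user (" +
                "  u_id BIGINT," +
                "  score DOUBLE" +
                ") WITH (" +
                "  'connector.type' = 'jdbc'," +
                "  'connector.url' = 'jdbc:mysql://localhost:3306/db'," +
                "  'connector.table' = 'dim_user'," +
                // Drop the next two options to turn the cache off and hit MySQL on every lookup.
                "  'connector.lookup.cache.max-rows' = '10000'," +
                "  'connector.lookup.cache.ttl' = '60s'" +
                ")");
    }
}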

Re:Re: Flink SQL UDF dynamic types

2020-06-09 Thread forideal
ns() you can specify the TypeInformation we parsed out, and this TypeInformation can be any dynamic type. >> But your scenario is a UDF. From the UDF example you posted I understand you want dynamically typed output, but I don't actually know your use case, what kind of scenario needs this handling, or why one UDF should produce different result types; in my understanding, for the sake of UDF manageability, readability and maintainability, it is better to pin down the UDF's output type. >>

Flink SQL UDF dynamic types

2020-06-08 Thread forideal
Hello, my friends: I am using the Flink 1.10 Blink planner. I want to build a Flink UDF that can return different types depending on its parameters. Why I want this: Scenario 1: my data is a pb bytes payload and I want to extract fields from it. If the UDF always returns string, the later casts are tedious; if I go the get_int, get_double, get_string route, there are very many functions to implement. Scenario 2: my data is JSON, same problem as above. For scenario 1, I tweaked the Flink source code a bit; in
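For later readers: from Flink 1.11 on, the new type inference stack can express this without patching the source. The sketch below closely follows the LiteralFunction example in the Flink documentation and will not work on the 1.10 planner used in this thread; the GetField name and the protobuf parsing are placeholders.

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

import java.util.Optional;

public class GetField extends ScalarFunction {

    public @DataTypeHint("RAW") Object eval(byte[] pbBytes, String fieldType) {
        // Parse pbBytes and return an Integer / Double / String accordingly (omitted).
        return null;
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
                .typedArguments(DataTypes.BYTES(), DataTypes.STRING())
                .outputTypeStrategy(callContext -> {
                    // The second argument must be a literal so the type is known at plan time.
                    if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
                        throw callContext.newValidationError("Literal expected for second argument.");
                    }
                    String type = callContext.getArgumentValue(1, String.class).orElse("STRING");
                    switch (type) {
                        case "INT":    return Optional.of(DataTypes.INT());
                        case "DOUBLE": return Optional.of(DataTypes.DOUBLE());
                        default:       return Optional.of(DataTypes.STRING());
                    }
                })
                .build();
    }
}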

Re: Reply: How to implement a custom Flink connector

2020-05-27 Thread forideal
] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/debugging_classloading.html#x-cannot-be-cast-to-x-exceptions Best forideal On 2020-05-28 10:16:45, "111" wrote: >Hi, >To use it inside the sql gateway, check the following conditions: >1 Meet the SPI requirements so that flink can automatically discover the implementation class >2 Configure FLINK_HO

Why doesn't Flink Connector JDBC support LocalDateTime?

2020-05-22 Thread forideal
/JdbcInputFormat.java#L236 Why don't we support LocalDateTime? Best wishes. forideal

Flink Weekly | Weekly Community Update - 2020/05/14

2020-05-14 Thread forideal
forideal

Re:Re: Flink built-in UDF performance is slow

2020-04-27 Thread forideal
Hi Jark: Thanks for your reply! 1. Which version and which planner is the test based on? Flink 1.9.0 Blink Planner 2. Streaming mode or batch mode? Streaming mode 3. Did you register your custom UDAF as "sum"? Could you use another name, such as "mysum", to avoid a possible naming conflict? It is registered as red_sum Best forideal On 2020-04-28 11:13:50, "Jark Wu" wrote: >Hi,

Flink built-in UDF performance is slow

2020-04-27 Thread forideal
Hi everyone: I have recently been doing some performance tests with Flink SQL, and I found that Flink's built-in aggregate functions, such as COUNT and LISTAGG, are all quite slow. A count I wrote myself is more than twice as fast as the built-in COUNT function (I tested with all kinds of windows; I am not sure whether I am using it incorrectly). SQL: select query_nor, sum(cast(1 as bigint)) as query_nor_counter from ods_search_track group by query_nor,
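A sketch of the kind of hand-written counter being compared against the built-in COUNT here; the names are illustrative and the code uses the pre-FLIP-65 AggregateFunction style that the 1.9 Blink planner expects.

import org.apache.flink.table.functions.AggregateFunction;

public class MyCount extends AggregateFunction<Long, MyCount.CountAcc> {

    public static class CountAcc {
        public long count = 0L;
    }

    @Override
    public CountAcc createAccumulator() {
        return new CountAcc();
    }

    public void accumulate(CountAcc acc, Object value) {
        if (value != null) {
            acc.count++;
        }
    }

    public void retract(CountAcc acc, Object value) {
        if (value != null) {
            acc.count--;
        }
    }

    @Override
    public Long getValue(CountAcc acc) {
        return acc.count;
    }
}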

Flink Lookup Filter Pushdown

2020-04-27 Thread forideal
Hello, my friend. I have a dimension table: create table dim_u_score (u_id bigint, varchar, score_a double, score_b double) with {xxx}. In one scenario, the lookup condition is filter score_a > 0.9. In another scenario, the lookup condition is filter score_b > 1. In Flink, at present, lookup join can use on

Re:Re: Blink SQL java.lang.ArrayIndexOutOfBoundsException

2020-04-20 Thread forideal
ey, value); } @Override public String getValue(Map acc) { return JSON.toJSONString(acc); } @Override public TypeInformation getResultType() { return Types.STRING; } } Best forideal At 2020-04-21 10:05:05, "Kurt Young" wrote: Thanks, once you can reproduce this issue locally,
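A reconstruction of the Map-accumulator UDAF shape shown in the snippet above, with the missing pieces filled in for illustration (fastjson for the JSON dump, String keys and values assumed); a plain HashMap accumulator is handled as a generic type, which may be related to the serialization trouble in the subject.

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;

import java.util.HashMap;
import java.util.Map;

public class CollectToJson extends AggregateFunction<String, Map<String, String>> {

    @Override
    public Map<String, String> createAccumulator() {
        return new HashMap<>();
    }

    public void accumulate(Map<String, String> acc, String key, String value) {
        acc.put(key, value);
    }

    @Override
    public String getValue(Map<String, String> acc) {
        return JSON.toJSONString(acc);
    }

    @Override
    public TypeInformation<String> getResultType() {
        return Types.STRING;
    }
}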

Re:Re: multi-sql checkpoint fail

2020-04-19 Thread forideal
forideal At 2020-04-18 21:51:13, "Jark Wu" wrote: Hi, Which statebackend are you using? Is it the Heap statebackend? Best, Jark On Sat, 18 Apr 2020 at 07:06, tison wrote: Hi, Could you share the stack traces? Best, tison. forideal wrote on Sat, Apr 18, 2020 at 12:33 AM: Hello

multi-sql checkpoint fail

2020-04-17 Thread forideal
/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg Best, forideal

Re:Re: Multiple SQL Optimization

2020-04-10 Thread forideal
ate(sql); }); env.execute(jobName); Best Wishes At 2020-04-10 16:35:33, "Jark Wu" wrote: Hi forideal, Are you using `StreamTableEnvironment` or SQL CLI? Currently, only `TableEnvironment` with the Blink planner has the multi-sink optimization (reuse shared upstream operators).
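A sketch of the setup Jark describes: several sqlUpdate() calls against a plain TableEnvironment with the Blink planner, followed by a single execute(), so the planner can share the scan of the source table (Flink 1.10-era API; the 'console' connector is the thread's toy connector, not a real one).

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkJob {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.sqlUpdate("CREATE TABLE src       (data VARCHAR) WITH ('connector.type' = 'console')");
        tEnv.sqlUpdate("CREATE TABLE good_sink (data VARCHAR) WITH ('connector.type' = 'console')");
        tEnv.sqlUpdate("CREATE TABLE bad_sink  (data VARCHAR) WITH ('connector.type' = 'console')");

        tEnv.sqlUpdate("INSERT INTO good_sink SELECT data FROM src WHERE data = 'good'");
        tEnv.sqlUpdate("INSERT INTO bad_sink  SELECT data FROM src WHERE data <> 'good'");

        // One submit: the multi-sink optimization turns this into a single DAG
        // with one source and two sinks instead of independent topologies.
        tEnv.execute("multi-sink-job");
    }
}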

Multiple SQL Optimization

2020-04-10 Thread forideal
Hello, there are 3 SQLs all querying the same table, but the generated DAG is 3 independent topologies. I think the better result would be one Source and 3 Sinks. create table good_sink (data varchar) with ( 'connector.type' = 'console',

Flink Weekly | Weekly Community Update - 2020/03/26

2020-03-26 Thread forideal
Hi everyone, this is the tenth issue of Flink Weekly, compiled by 张成. It mainly covers recent community development progress, mailing list Q&A, community live streams and related technical blog posts. Community development progress: [release] The discussion about releasing Flink 1.10.1 is in full swing; for the latest news see the discussion started by Yu Li. [1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-10-1-td38689.html [Checkpoint] Arvid Heise started FLIP-76

Re:Re: How can I set the value of taskmanager.network.numberOfBuffers?

2020-03-23 Thread forideal
the job is fully initiated? No, the job can't initialize. Topology: op1-hash->op2-hash->op3-hash->op4 | |-hash->op5. op1 parallelism is 200, op2 parallelism is 400, op3 parallelism is 400, op4 parallelism is 400, op5 parallelism is 400. Best Wishes forideal At 2020-03-20 15:20:07, "Xin

How can I set the value of taskmanager.network.numberOfBuffers?

2020-03-19 Thread forideal
', and 'taskmanager.network.memory.max'. But this actually wastes too many resources. Memory Segments (Type / Count): Available 698,838, Total 700,000. Direct: count 700,103, used 21.4 GB, capacity 21.4 GB. Mapped: count 0, used 0 B, capacity 0 B. Best Wishes forideal

Flink SQL, how can I set the parallelism of a GROUP BY clause?

2020-03-17 Thread forideal
operator, parallelism is 3. The second is the GroupWindowAggregate operator, parallelism is 3. The third is the LookupJoin operator, parallelism is 3. I want to change the parallelism of GroupWindowAggregate, but I can't. Best wishes forideal

Flink Weekly | Weekly Community Update - 2020/03/14

2020-03-13 Thread forideal
Hi everyone, this is the eighth issue of Flink Weekly, compiled by 张成. It mainly covers recent community development progress, mailing list Q&A, community live streams and related technical blog posts. Community development progress: Yangze Guo proposes in FLIP-108 that Flink support resource management for GPUs. [1]http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-108-Add-GPU-support-in-Flink-tp38286.html Starting from Flink 1.10.0, the Apache Flink project maintains

FLIP 27 is not ready yet, how can I work around it?

2020-03-13 Thread forideal
Hello everyone, I now have a job with a big state in RocksDB. This job's source is Kafka. If I want to replay data, the job will crash. One of the motivations of FLIP 27 is event time alignment; however, it is not ready yet for me. How can I work around this? Here is an immature solution, I