获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。
可以参考 OpenLineage[1] 的实现.
1.
好的,已经贴了sql片段
在 2024-03-08 11:02:34,"Xuyang" 写道:
>Hi, 你的图挂了,可以用图床或者直接贴SQL
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>在 2024-03-08 10:54:19,"iasiuide" 写道:
>
>
>
>
>
>下面的sql片段中
>ods_ymfz_prod_sys_divide_order 为kafka source表
>dim_ymfz_prod_sys_trans_log 为mysql为表
你好,我们用的是1.13.2和1.15.4版本的,看了下flink ui,这两种版本针对下面sql片段的lookup执行计划中的关联维表条件是一样的
在 2024-03-08 11:08:51,"Yu Chen" 写道:
>Hi iasiuide,
>方便share一下你使用的flink版本与jdbc connector的版本吗?据我所了解,jdbc
>connector在FLINK-33365[1]解决了lookup join条件丢失的相关问题。
>
>[1] https://issues.apache.org/jira/browse/FLINK-33365
>
>祝好~
>
Hi iasiuide,
方便share一下你使用的flink版本与jdbc connector的版本吗?据我所了解,jdbc
connector在FLINK-33365[1]解决了lookup join条件丢失的相关问题。
[1] https://issues.apache.org/jira/browse/FLINK-33365
祝好~
> 2024年3月8日 11:02,iasiuide 写道:
>
>
>
>
> 图片可能加载不出来,下面是图片中的sql片段
> ..
> END AS trans_type,
>
>
Hi, 你的图挂了,可以用图床或者直接贴SQL
--
Best!
Xuyang
在 2024-03-08 10:54:19,"iasiuide" 写道:
下面的sql片段中
ods_ymfz_prod_sys_divide_order 为kafka source表
dim_ymfz_prod_sys_trans_log 为mysql为表
dim_ptfz_ymfz_merchant_info 为mysql为表
flink web ui界面的执行计划片段如下:
图片可能加载不出来,下面是图片中的sql片段
..
END AS trans_type,
a.div_fee_amt,
a.ts
FROM
ods_ymfz_prod_sys_divide_order a
LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time
AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
AND
下面的sql片段中
ods_ymfz_prod_sys_divide_order 为kafka source表
dim_ymfz_prod_sys_trans_log 为mysql为表
dim_ptfz_ymfz_merchant_info 为mysql为表
flink web ui界面的执行计划片段如下:
[1]:TableSourceScan(table=[[default_catalog, default_database,
ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS
Hi, casel chan,
社区已经对增量框架实现动态加表(https://github.com/apache/flink-cdc/pull/3024
),预计3.1对mongodb和postgres暴露出来,但是Oracle和Sqlserver目前并没暴露,你可以去社区参照这两个框架,将参数打开,并且测试和适配。
Best,
Hongshun
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
ListPerson list = new ArrayList();list.add(new
Person("Fred",35));
在非窗口化的表上使用窗口属性
At 2024-03-08 09:28:10, "ha.fen...@aisino.com" wrote:
>public static void main(String[] args) {
>StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
Hi, fengqi.
这看起来像是select语句中,不能直接使用非来源于window
agg的proctime或者event函数。目前不确定这是不是预期行为,方便的话可以在社区jira[1]上提一个bug看看。
快速绕过的话,可以试试下面的代码:
DataStream flintstones = env.fromCollection(list); // Table select =
table.select($("name"), $("age"), $("addtime").proctime()); Table table =
tEnv.fromDataStream(
我使用注册kafka topic对应的schema到confluent schema registry时报错,想知道问题的原因是什么?如何fix?
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:
Schema being registered is incompatible with an earlier schema for subject
"rtdp_test-test_schema-value", details:
12 matches
Mail list logo