Re:Table中的java.util.Date类型对应sql中的什么类型

2024-03-04 Thread Xuyang
Hi, 
java.util.Date没有sql中的常规类型和它对应,因此使用的兜底的Raw类型(结构化类型)。实际上java.sql.Date 
对应的是sql中的Date。
具体可以参考下这张表:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction




--

Best!
Xuyang





在 2024-03-05 09:23:38,"ha.fen...@aisino.com"  写道:
>从流转换成Table
>DataStream streamSource = env.addSource(new OrdersSourceObject());
>Table table = 
>tEnv.fromDataStream(streamSource).select($("addtime"),$("cusname"),$("price"),$("status"));
>tEnv.createTemporaryView("itemtable",table);
>
>Orders定义
>private Date addtime;
>private String cusname;
>private BigDecimal price;
>private int status;
>
>输出到kafka
>String creatDDL = "CREATE TABLE kafka_sink (\n" +
>"  `addtime` TIMESTAMP(0),\n" +
>"  `cusname` STRING,\n" +
>"  `price` DECIMAL(15, 2),\n" +
>"  `status` INT\n" +
>") WITH (\n" +
>"  'connector' = 'kafka',\n" +
>"  'format' = 'json'\n" +
>.
>")";
>
>String query = "INSERT INTO kafka_sink SELECT * FROM itemtable;";
>tEnv.executeSql(query);
>
>报错
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Column types of query result and sink for 
>'default_catalog.default_database.kafka_sink' do not match.
>Cause: Incompatible types for sink column 'addtime' at position 0.
>
>Query schema: [addtime: RAW('java.util.Date', '...'), cusname: STRING, price: 
>DECIMAL(38, 18), status: INT NOT NULL]
>Sink schema:  [addtime: TIMESTAMP(0), cusname: STRING, price: DECIMAL(15, 2), 
>status: INT]


Re: flink sql作业如何统计端到端延迟

2024-03-04 Thread Shawn Huang
Flink有一个端到端延迟的指标,可以参考以下文档[1],看看是否有帮助。

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/ops/metrics/#end-to-end-latency-tracking

Best,
Shawn Huang


casel.chen  于2024年2月21日周三 15:31写道:

> flink sql作业从kafka消费mysql过来的canal
> json消息,经过复杂处理后写入doris,请问如何统计doris表记录的端到端时延?mysql表有update_time字段代表业务更新记录时间。
> doris系统可以在表schema新增一个更新时间列ingest_time,所以在doris表上可以通过ingest_time -
> update_time算出端到端时延,但这种方法只能离线统计,有没有实时统计以方便实时监控的方法?
>
> 查了SinkFunction类的invoke方法虽然带有Context类型参数可以获取当前处理时间和事件时间,但因为大部分sink都是采用攒微批方式再批量写入的,所以这两个时间直接相减得到的时间差并不能代表真实落库的时延。有没有精确获取时延的方法呢?


Re: 退订

2024-03-04 Thread Shawn Huang
Hi,退订可以发送任意内容的邮件到  user-zh-unsubscr...@flink.apache.org  来取消订阅来自
user-zh@flink.apache.org  邮件列表的邮件,邮件列表的订阅管理,可以参考[1]

[1] https://flink.apache.org/zh/what-is-flink/community/

Best,
Shawn Huang


雷刚  于2024年2月29日周四 14:41写道:

> 退订