回复:flink 历史数据join

2020-05-14 文章 jimandlice
就是要用api的方式来继承 不是直接操作sql那样来出来




| |
jimandlice
|
|
邮箱:jimandl...@163.com
|

Signature is customized by Netease Mail Master

在2020年05月15日 11:38,jimandlice 写道:
api 做 还是用table sql 来做 谁做比较好集成 因为都要用来join 之后数据写入 hdfs 当中 因为刚刚接手 有很多不太明白 望给予帮助 谢谢




| |
jimandlice
|
|
邮箱:jimandl...@163.com
|

Signature is customized by Netease Mail Master

在2020年05月15日 11:34,Benchao Li 写道:
看起来就是一个异构数据源join的需求吧。
可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。

jimandlice  于2020年5月15日周五 上午11:16写道:

> 先工作上有一个需求  2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据
> 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道
> 是用datatream还是dataset  没有一个很好的 解决方案 望给与回复
>
>
>
> | |
> jimandlice
> |
> |
> 邮箱:jimandl...@163.com
> |
>
> Signature is customized by Netease Mail Master



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复:flink 历史数据join

2020-05-14 文章 jimandlice
api 做 还是用table sql 来做 谁做比较好集成 因为都要用来join 之后数据写入 hdfs 当中 因为刚刚接手 有很多不太明白 望给予帮助 谢谢




| |
jimandlice
|
|
邮箱:jimandl...@163.com
|

Signature is customized by Netease Mail Master

在2020年05月15日 11:34,Benchao Li 写道:
看起来就是一个异构数据源join的需求吧。
可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。

jimandlice  于2020年5月15日周五 上午11:16写道:

> 先工作上有一个需求  2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据
> 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道
> 是用datatream还是dataset  没有一个很好的 解决方案 望给与回复
>
>
>
> | |
> jimandlice
> |
> |
> 邮箱:jimandl...@163.com
> |
>
> Signature is customized by Netease Mail Master



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink 历史数据join

2020-05-14 文章 Benchao Li
看起来就是一个异构数据源join的需求吧。
可以直接用Flink SQL尝试一下。Flink SQL现在有batch读取Hbase、Mysql的能力,也有写入Hive的能力。

jimandlice  于2020年5月15日周五 上午11:16写道:

> 先工作上有一个需求  2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据
> 现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道
> 是用datatream还是dataset  没有一个很好的 解决方案 望给与回复
>
>
>
> | |
> jimandlice
> |
> |
> 邮箱:jimandl...@163.com
> |
>
> Signature is customized by Netease Mail Master



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


flink 历史数据join

2020-05-14 文章 jimandlice
先工作上有一个需求  2个数据源 一个是mysql 一个是Hbase 2者上 有很多历史数据 这2个数据源上 已经没有数据写入了 都是历史数据 
现在要把这2个数据源的某两张张表 进行join 生成之后的数据 存在放在hdfs上 导入到hive上去现在就是不知道 是用datatream还是dataset 
 没有一个很好的 解决方案 望给与回复



| |
jimandlice
|
|
邮箱:jimandl...@163.com
|

Signature is customized by Netease Mail Master

Re: save point容灾方案咨询

2020-05-14 文章 LakeShen
Hi ,

你可以把你的场景在描述的详细一些。

Best,
LakeShen

请叫我雷锋 <854194...@qq.com> 于2020年5月14日周四 下午9:42写道:

> 各位大佬好,请问有啥好的save point容灾方案嘛?
>
>
>
> 发自我的iPhone


save point容灾方案咨询

2020-05-14 文章 请叫我雷锋
各位大佬好,请问有啥好的save point容灾方案嘛?



发自我的iPhone

Flink Weekly | 每周社区动态更新 - 2020/05/14

2020-05-14 文章 forideal
大家好,本文为 Flink Weekly 的第十五期,由张成整理、李本超 Review。主要内容包括:近期社区开发进展,邮件问题答疑以及 Flink
最新社区动态及技术文章推荐。


社区开发进展 


Release

[releases] Flink 1.10.1 正式发布。

http://apache-flink.147419.n8.nabble.com/ANNOUNCE-Apache-Flink-1-10-1-released-td3054.html



[releases] Tzu-Li 发起了 Flink Stateful Functions Release 2.0.0 RC #4 
的投票,经过讨论后决定将会在新开一个 RC。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-Release-2-0-0-release-candidate-4-td39453.html=


DEV

阿里巴巴已经在 flink-packages.org 上发布了 SpillableHeapStateBackend 的预览版。 该状态后端在 
FLINK-12692 中贡献给 Apache Flink。 SpillableHeapStateBackend 是一个基于 Java 
堆的Statebackend(如 FilesystemStatebackend ),在堆耗尽之前将最冷的状态溢出到磁盘。

https://flink-packages.org/packages/spillable-state-backend-for-flink

https://issues.apache.org/jira/browse/FLINK-12692

Flip

[FLIP-108] Yangze Guo 发起了有关类加载器和依赖项问题的讨论。问题是 mainClassLoader 无法识别 
ExternalResourceInfo 的子类。 ExternalResourceInfo 位于 ExternalResourceDriver jar 
中,并通过 PluginManager 与 mainClassLoader 隔离。因此,程序会抛出 ClassNotFoundExeption 
异常。Yangze Guo 提出了如下 3 个备选方案,大家在讨论中认为第 3 个方案比较好。Yangze Guo 发起了使用第 3 个方案修改 API 
的投票,投票最终获得通过。

备选方案1:

不使用插件机制,只需将驱动程序加载到 mainClassLoader。缺点是用户需要处理依赖冲突。

备选方案2:

强制要求用户构建两个单独的 jar,分别用于 ExternalResourceDriver 和 ExternalResourceInfo。然后把包含 
ExternalResourceInfo 类的 jar 添加到“ / lib”目录。这种方法可能有效,但可能会使用户厌烦。

备选方案3:

修改 RuntimeContext#getExternalResourceInfos 方法,让其返回 ExternalResourceInfo,同时在 
ExternalResourceInfo 接口增加一个类似  “Properties getInfo()”  
的方法。该方法的返回值可以由驱动程序提供者和用户指定。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-108-Problems-regarding-the-class-loader-and-dependency-td40893.html

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/quot-VOTE-FLIP-108-edit-the-Public-API-quot-td40995.html

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/RESULT-VOTE-FLIP-108-edit-the-Public-API-td41061.html

Discuss

[docker] Ismaël Mejía 发起的是否可以在 Flink 正式 release 之外发布 docker 镜像的讨论有了新的更新。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Intermediary-releases-of-the-flink-docker-images-td40602.html




[udf/udaf] Leerho 发起了关于 Flink 集成 DataSketches 的讨论。Arvid Heise 建议先放在 
flink-package 中。

https://flink-packages.org/

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Integration-of-DataSketches-into-Flink-td40750.html




[connector] 李劲松发起了在 TableFactory 中引入 StatefulSequenceSource 
的讨论。StatefulSequenceSource 能够方便用户更简单的进行 Flink SQL 测试。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Introduce-TableFactory-for-StatefulSequenceSource-td39116.html




[connector] Leonard Xu 发起了重构 Flink JDBC Connector 的讨论。Leonard Xu 
介绍,在重构之后,我们可以很容易地为表和数据流引入统一的可插拔 JDBC 方言,并且我们可以有更好的模块组织和实现。目前大家已经达成一致,Leonard Xu 
已经创建了相关的 Jira。同时 Flink Hbase Connector 也有同样的问题,后续在做的时候会单独进行讨论。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Refactor-flink-jdbc-connector-structure-td40984.html

https://issues.apache.org/jira/browse/FLINK-17537

https://issues.apache.org/jira/browse/FLINK-17538




[configuration] Timo 发起了讨论如何在属性( Flink 配置以及 Connector 属性)中表示配置层次结构,以便生成的文件将是有效的 
JSON / YAML。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Hierarchies-in-ConfigOption-tp40920.html




[hadoop] Robert Metzge 发起了讨论增加对 Hadoop 3 的支持,并讨论了是否将通过 flink-shaded-hadoop 
的方式支持 Hadoop 3 的问题。

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-support-for-Hadoop-3-and-removing-flink-shaded-hadoop-td40570.html

用户问题

Kcz 在社区提问 Flink 内存设置问题(Metaspace OOM)。李劲松回答 Metaspace OOM 通常是 JVM 
加载的类过多导致的。比如增加 slot 数量也会导致加载的类变多。同时,社区已经有些用户反馈 Flink 1.10.0 的默认 metaspace 
大小可能不太合理。1.10.1 中会调大这个默认值。用户可以先通过 taskmanager.memory.metaspace.size 调到 256m 试一试。

http://apache-flink.147419.n8.nabble.com/flink-metaspace-td2835.html




Lucas Wu 想单独设置某些 Flink SQL Job 的并行度,李本超回复目前无法做到这一点。目前 SQL 还不支持对单独的 operator 
设置并行度,可以通过 table.exec.resource.default-parallelism 
设置全局的并行度。http://apache-flink.147419.n8.nabble.com/flink-sql-job-td2847.html

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-SQL-How-can-i-set-parallelism-in-clause-of-group-by-td33736.html

http://apache-flink.147419.n8.nabble.com/MySQL-td3014.html




王磊在社区提问关于 Flink SQL Retraction 的问题。Michael Ran、李劲松进行了详细的回答。有兴趣的同学可以参考如下链接。

http://apache-flink.147419.n8.nabble.com/FlinkSQL-Retraction-td2893.html




Luan Cooper 在社区提问 Sink 表 Append Mode/Upsert Mode 的问题。比如使用 Upsert Mode 写 
ElasticSearch 会遇到无法指定 primary key 问题。这个问题社区同学进行了详细的解答。其中 Jark Wu 回复 FLIP-95 和 
FLIP-105 后,问题中的 query 就可以原生支持了。FLIP-95 和 FLIP-105 的核心工作就是识别 binlog 中的 
update/delete/insert 消息,而不是全当成 append 消息。预计 1.11 能见到这些功能。

http://apache-flink.147419.n8.nabble.com/Streaming-SQL-Source-Sink-Append-Mode-Upsert-Mode-td2956.html




Hb 遇到了时区相关的问题,李劲松和 Jark Wu 进行了回答。这是一个 bug。Blink 中默认使用了没有时区的 
timestamp,生成时间的时候是直接取 “System.currentTimeMillis()”,Sink 的时候会按照 timestamp 
without time zone 来转成 String,结果就是差了8个小时。这个问题已经有对应的Jira。社区会以很高的优先级进行修复。


????SQLClient ????????????

2020-05-14 文章 AlfredFeng
Hi All,
SQL Client yaml??append1.10
??kafka source??field0,field_1,field_2...field_n 
??n??field0field0,

Re: flink1.10 ES sink 配置输出object字段问题

2020-05-14 文章 Yangze Guo
ES sink 是Json format的,你可以参考 [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#json-format

Best,
Yangze Guo

On Thu, May 14, 2020 at 3:37 PM XiaChang <13628620...@163.com> wrote:
>
> hi,大家好
>
>
> 请问ES index中存在object字段时,flink1.10版本ES sink 的ddl该如何配置。


Re: SQL DDL怎样使用Long类型的时间戳作为事件时间

2020-05-14 文章 Leonard Xu
Hi
目前还不支持,需要自己写个简单的udf转换下,
社区有个issue[1]在跟这个问题了


Best,
Leonard Xu
[1]https://issues.apache.org/jira/browse/FLINK-16889 



> 在 2020年5月14日,10:01,zzh...@foxmail.com 写道:
> 
> Hi,all
> kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值,
> 我的问题是:
> 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK
> 2、如果没有,后期是否有相应的规划
> 
> 测试版本:Flink 1.10
> 
> 
> 在Flink 1.10中测试SQL如下:
> CREATE TABLE session_login (
> deal_time TIMESTAMP(3)
> ,aaVARCHAR
> ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
>  )WITH(
> 'connector.type' = 'kafka',
> 'connector.version' = 'universal',
> 'connector.topic' = 'topic',
> 'connector.startup-mode' = 'latest-offset',
> 'connector.properties.group.id' = 'group.id',
> 'connector.properties.zookeeper.connect' = 'ip:port',
> 'connector.properties.bootstrap.servers' = 'ip:port',
> 'format.type' = 'json',
> 'format.derive-schema' = 'true'
>  );
> 
> 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.time.format.DateTimeParseException: Text '1589420545' could 
> not be parsed at index 0
> at 
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at 
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
> 
> 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下:
> Exception in thread "main" 
> org.apache.calcite.runtime.CalciteContextException: From line 4, column 33 to 
> line 4, column 64: Cannot apply '-' to arguments of type ' - 
> '. Supported form(s): ' - '
> ' - '
> ' - '
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
>   at 
> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
>   at 
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
>   at 
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
>   at 
> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:255)
>   at 
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:668)
>   at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:432)
>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:518)
>   at 
> org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144)
>   at 
> 

flink1.10 ES sink 配置输出object字段问题

2020-05-14 文章 XiaChang
hi,大家好


请问ES index中存在object字段时,flink1.10版本ES sink 的ddl该如何配置。

Flink SQL????????

2020-05-14 文章 Senior.Hu
Hi All??
 FlinkSqlParserImpl.FACTORYFlink DML SQLJoin with 
Temporal TableLATERAL
 LEFT JOIN
side_room FOR SYSTEM_TIME AS OF a1.proctime as a2
  ON
a1.rowkey_room = a2.rowkey
 
LEFT JOIN LATERAL `side_room` FOR SYSTEM_TIME AS OF `a1`.`proctime` 
AS `a2` ON `a1`.`rowkey_room` = `a2`.`rowkey`

 ??SQL??Flink SQL??
 Caused by: org.apache.flink.table.api.SqlParserException: SQL parse 
failed. Encountered "`side_room`" at line 7, column 19.
 Was expecting one of:
  "TABLE" ...
  "(" ...

SqlParser.Config
private final SqlParser.Config config = SqlParser.configBuilder()
  
.setParserFactory(FlinkSqlParserImpl.FACTORY)
  .setQuoting(Quoting.BACK_TICK)
  .setUnquotedCasing(Casing.UNCHANGED)
  .setQuotedCasing(Casing.UNCHANGED)
  .setCaseSensitive(true)
  .build();

 ??

Re: SQL DDL怎样使用Long类型的时间戳作为事件时间

2020-05-14 文章 fanchuanpo-163



> 在 2020年5月14日,上午11:12,zzh...@foxmail.com 写道:
> 
> 非常感谢Benchao Li,使用UDF测试通过,SQL示例如下:
> 
> CREATE TABLE session_login (
> ,deal_time BIGINT
> ,deal_time_obj as DY_FROM_UNIXTIME(deal_time*1000)
> ,WATERMARK FOR deal_time_obj AS deal_time_obj - INTERVAL '60' SECOND
> )WITH( 
>   ..
> )
> 
> 其中DY_FROM_UNIXTIME负责将long转成timestamp类型
> 
> oliver zhang,云长
> zzh...@foxmail.com
>  
> 发件人: Benchao Li
> 发送时间: 2020-05-14 10:23
> 收件人: user-zh
> 主题: Re: SQL DDL怎样使用Long类型的时间戳作为事件时间
> 你可以通过计算列写一个udf把long转成timestamp类型,在这个计算列上定义watermark就可以了。
>  
> zzh...@foxmail.com  于2020年5月14日周四 上午10:02写道:
>  
>> Hi,all
>>  kafka消息中存在消息的时间是Long类型,既有也有秒值,毫秒值,
>> 我的问题是:
>> 1、对于Long类型的时间数据,Flink SQL DDL现在怎样定义,才能使用WATERMARK
>> 2、如果没有,后期是否有相应的规划
>> 
>> 测试版本:Flink 1.10
>> 
>> 
>> 在Flink 1.10中测试SQL如下:
>> CREATE TABLE session_login (
>>  deal_time TIMESTAMP(3)
>>  ,aaVARCHAR
>>  ,WATERMARK FOR deal_time AS deal_time - INTERVAL '60' SECOND
>>   )WITH(
>>  'connector.type' = 'kafka',
>>  'connector.version' = 'universal',
>>  'connector.topic' = 'topic',
>>  'connector.startup-mode' = 'latest-offset',
>>  'connector.properties.group.id' = 'group.id',
>>  'connector.properties.zookeeper.connect' = 'ip:port',
>>  'connector.properties.bootstrap.servers' = 'ip:port',
>>  'format.type' = 'json',
>>  'format.derive-schema' = 'true'
>>   );
>> 
>> 如果将deal_time定义为TIMESTAMP(3),JSON反序列化会提示如下异常:
>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>> Caused by: java.time.format.DateTimeParseException: Text '1589420545'
>> could not be parsed at index 0
>> at
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> ... 7 more
>> 
>> 如果将deal_time定义成BIGINT,SQL校验失败,提示异常如下:
>> Exception in thread "main"
>> org.apache.calcite.runtime.CalciteContextException: From line 4, column 33
>> to line 4, column 64: Cannot apply '-' to arguments of type ' -
>> '. Supported form(s): ' - '
>> ' - '
>> ' - '
>>  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)
>>  at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>  at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>  at
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
>>  at
>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)
>>  at
>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)
>>  at
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4840)
>>  at
>> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:280)
>>  at
>> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:96)
>>  at
>> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:130)
>>  at
>>