Re: flink sql deduplication algorithm

2020-03-19 by LakeShen
Hi zhisheng,

I ran into a similar problem before. Flink state is kept forever by default, so for this kind of SQL job the
approach I came up with is to configure an idle state retention time.
For example, for a per-day aggregation, set the minimum idle state retention to 26 hours and the maximum to 48 hours
(tune the exact values to your business).
In any case you should always configure a minimum and a maximum idle state retention time; the state cannot be kept
forever.

For the Flink 1.10 Blink planner, the TTL is the configured minimum idle state retention time; the maximum idle
state retention time does not seem to be used.
In Flink 1.10 the default state cleanup strategy is "in background"; for RocksDBStateBackend the cleanup is done by
the compaction filter.

The second thing is to use incremental checkpoints.
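A minimal sketch of both settings with the Blink planner (the checkpoint path, interval and retention values are
illustrative, and the RocksDB backend needs the flink-statebackend-rocksdb dependency):

import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled (second argument).
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Idle state older than the minimum retention becomes eligible for cleanup;
        // with the 1.10 Blink planner effectively only the minimum value is used as the TTL.
        tEnv.getConfig().setIdleStateRetentionTime(Time.hours(26), Time.hours(48));

        // ... register tables and submit the SQL job here ...
    }
}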

Best wishes,
LakeShen



lucas.wu wrote on Fri, Mar 20, 2020 at 11:50 AM:

> You could consider implementing your own UDF based on a bitmap or HyperLogLog.
>
>
> Original message
> From: zhisheng zhisheng2...@gmail.com
> To: user-zh user...@flink.apache.org
> Sent: Friday, March 20, 2020 11:44
> Subject: Re: flink sql deduplication algorithm
>
>
> hi, I noticed that some of our production jobs use SQL count distinct for deduplication. Once a job has been
> running for a long time, its checkpoint state gets very large (this week I hit one of roughly 400 GB, which made
> checkpoints time out easily and may also put some pressure on the NICs of the HDFS cluster). The official docs
> mention using the query configuration:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
> Apart from that, does anyone have other good solutions?
>
> Benchao Li libenc...@gmail.com wrote on Fri, Mar 20, 2020 at 9:50 AM:
> Hi hiliuxg,
> count distinct deduplicates via MapView:
> in the batch case, MapView is backed by a HashMap;
> in the streaming case, MapView is backed by MapState, because state plus checkpoints are required so that the
> state is not lost after a job restart.
>
> hiliuxg 736742...@qq.com wrote on Thu, Mar 19, 2020 at 11:31 PM: hi all:
> What algorithm does flink sql count(distinct) use under the hood? Is it a bitmap,
> or simply deduplication via a Java Set?
> --
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel: +86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: ddl

2020-03-19 by Jingsong Li
Hi,

For the underlying implementation you can refer to [1].

[1] https://github.com/apache/bahir-flink/tree/master/flink-connector-kudu
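For orientation, the entry point in 1.10 is a TableFactory discovered via Java SPI (a
META-INF/services/org.apache.flink.table.factories.TableFactory file). A minimal skeleton might look like the
following; the "kudu" connector type and the property keys are only illustrative, and [1] is the real reference:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

public class KuduTableSinkFactory implements StreamTableSinkFactory<Row> {

    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        // Matched against 'connector.type' from the DDL WITH clause.
        context.put("connector.type", "kudu");
        return context;
    }

    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.masters");
        properties.add("connector.table");
        // schema properties ('#' is the index wildcard)
        properties.add("schema.#.name");
        properties.add("schema.#.data-type");
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        // Validate the properties here and return your actual sink, i.e. a class
        // implementing AppendStreamTableSink<Row> that writes to Kudu.
        throw new UnsupportedOperationException("build the Kudu sink from the properties here");
    }
}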

Best,
Jingsong Lee

On Thu, Mar 19, 2020 at 11:30 PM hiliuxg <736742...@qq.com> wrote:

> You can define your own TableSinkFactory; Flink has already provided this interface as an extension point.
>
>
>
>
> ---- Original message ----
> From: "LakeShen"  Sent: Saturday, March 14, 2020, 11:13 AM
> To: "user-zh"
> Subject: Re: ddl
>
>
>
> Hi 志华,
>  You can fully extend the Flink SQL DDL syntax yourself to support your company's own real-time data sources,
> sinks, and so on. For the concrete implementation, please refer to the link jinhai posted above.
>
> Best wishes,
> 沈磊
>
> jinhai wang 
>  Page on “User-defined Sources & Sinks”. For flink 1.10:
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
> 
>  Best Regards
> 
>  jinhai...@gmail.com
> 
>   On Mar 13, 2020, at 7:17 PM, 王志华 wrote:
>   For the DDL part of FLINK, which kinds of systems can currently serve as source or sink tables? Online I have
>   only seen ddl mysql sink, ddl hbase sink, and the like. Are other types supported? If not, does flink expose
>   the relevant interfaces so that DDL syntax support can be provided for other systems, e.g. to build a ddl kudu
>   sink?
>
>   王志华
>   a15733178...@163.com
>  
> 
> 



-- 
Best, Jingsong Lee


Re: rowtime type serialization problem

2020-03-19 by Jingsong Li
Hi lucas,

This looks like a bug when querying the event_time field, caused by TimeIndicatorTypeInfo.

If you are on 1.10, you can open a JIRA to track this issue.

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:40 AM lucas.wu  wrote:

> Hi all:
> Table DDL:
> create table `source_table`(
> `SeqNo` varchar,
> `Type` varchar,
> `Table` varchar,
> `ServerId` varchar,
> `Database` varchar,
> `OldData` varchar,
> `GTID` varchar,
> `Offset` varchar,
> `event_ts` as
> to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
> WATERMARK FOR event_ts AS event_ts - interval '60' second
> ) with(…)
>
>
> Query:
> insert into sinkTable select * from source_table;
>
>
>
> Error:
> java.lang.ClassCastException: java.sql.Timestamp cannot be cast to
> java.lang.Long at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
> at SinkConversion$51.processElement(Unknown Source)
> ……
>
>
>
> Finally, looking at the code, I found that for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while
> RowTypeInfo uses LongSerializer. The upstream and downstream operators use different serializers; with
> SqlTimestampSerializer upstream and LongSerializer downstream, this error is thrown.
> Can this problem be avoided?



-- 
Best, Jingsong Lee


Re: flink sql 1.10.0 support for hive GenericUDF (the earlier images did not display on the mailing list, re-edited)

2020-03-19 by Jingsong Li
 Hi,

The GenericUDFUnixTimeStamp UDF relies on Hive's SessionState, and our hive-integration currently lacks support for
that part.
Flink also provides this function, so for now you could consider using Flink's built-in function instead.
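For example, a rough sketch with the Blink planner (the dummy table and the format string are illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class BuiltinTimeFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // dummy single-row input so the query has something to run over
        tEnv.createTemporaryView("dummy", env.fromElements(1), "x");

        // Same intent as the Hive expression, expressed with Flink's built-in functions.
        Table t = tEnv.sqlQuery(
                "SELECT FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyyMMddHHmmss') AS ts_str FROM dummy");

        tEnv.toAppendStream(t, Row.class).print();
        env.execute("builtin-time-function-example");
    }
}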

I have created an issue for this and will try to get it fixed in 1.10.1.

[1] https://issues.apache.org/jira/browse/FLINK-16688

Best,
Jingsong Lee

On Fri, Mar 20, 2020 at 11:33 AM Chief  wrote:

> hi all:
> We are migrating our existing offline Hive jobs to flink sql 1.10.0. The docs say flink supports Hive
> GenericUDFs, but I found that a few of them fail in flink. Is there something wrong with my setup?
> For example current_timestamp():
> the statement in hive
> select
> from_unixtime(unix_timestamp(current_timestamp()),'MMddHHmmss'); runs fine,
> but submitting the flink sql job fails with:
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> java.lang.reflect.InvocationTargetException
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> at
> com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
> at
> com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
> at
> com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at
> java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at
> java.security.AccessController.doPrivileged(Native Method)
> at
> javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
> org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
> at
> org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
> at
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at
> org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
> at
> org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
> at
> org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
> at
> org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
> 

Re: Flink 1.10 JSON parsing

2020-03-19 by 宇张
hi,
OK, I gave it a try on my side: the schema definition for data needs to change to
ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
and after also deleting .jsonSchema(...), the data is parsed without problems. However, if .jsonSchema(...) is
kept, the following exception is thrown: Exception in thread "main"
org.apache.flink.table.api.ValidationException: Type
ARRAY> of table field
'data' does not match with the physical type ROW<`f0` ROW<`tracking_number`
STRING, `invoice_no` STRING>> of the 'data' field of the TableSource return
type.
The reason I kept the jsonSchema is that I want to try storing the metadata of this kind of complex JSON source in
Hive and then use it to derive the format of the statement below, because I do not know how to map the field
information when defining the SQL below for such complex JSON. Or is there an example of mapping such complex JSON
in SQL? Thanks.
[image: image.png]

On Fri, Mar 20, 2020 at 11:42 AM Jark Wu  wrote:

> Hi,
>
> Looking at your data, "data" is an array type, so the schema definition for data needs to change to
> ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
> I also suggest removing .jsonSchema(...): starting from 1.10, flink-json can infer the json
> schema automatically from the table schema.
>
> Best,
> Jark
>
> On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:
>
> > hi:
> > 1. When parsing JSON data, why is decimal used here rather than bigint?
> > [image: image.png]
> > 2. When using connect, parsing a JSON array element throws an exception. Is this a misuse on my side, or a bug?
> >
> >
> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
> >
> >
> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> > connect:
> >
> > streamTableEnv
> > .connect(
> > new Kafka()
> > .version("0.11")
> > .topic("mysql_binlog_test_str")
> > .startFromEarliest()
> > .property("zookeeper.connect", "localhost:2181")
> > .property("bootstrap.servers", "localhost:9092")
> > )
> > .withFormat(
> > new Json()
> >
>  
> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> > )
> > .withSchema(
> > new Schema()
> > .field("business", DataTypes.STRING())
> > .field("data",
> DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
> > DataTypes.FIELD("tracking_number",
> DataTypes.STRING()),
> > DataTypes.FIELD("invoice_no",
> DataTypes.STRING())
> > .field("database", DataTypes.STRING())
> > .field("table", DataTypes.STRING())
> > .field("ts", DataTypes.DECIMAL(38, 18))
> > .field("type", DataTypes.STRING())
> > .field("putRowNum", DataTypes.DECIMAL(38, 18))
> > )
> > .createTemporaryTable("Test");
> >
> > Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
> >
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
> > at
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> > Caused by: java.lang.ClassCastException:
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
> > cannot be cast to
> >
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
> > at
> >
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > at
> >
> 

Abnormal pushgateway memory usage

2020-03-19 by yanggang_it_job
Hi:

I'd like to ask about abnormal pushgateway memory usage when pushing metrics to pushgateway with
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter. The specific symptoms:
 1. Our actual memory usage is around 8 GB, but the pushgateway's memory keeps hovering around 35 GB.
 2. The pushgateway memory curve is not flat but fluctuates a lot, with swings of about 8 GB.

I hope someone can help identify the cause of the above. Thanks...

Re: flink sql deduplication algorithm

2020-03-19 by lucas.wu
You could consider implementing your own UDF based on a bitmap or HyperLogLog.
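For example, a rough sketch of such a UDF, assuming the stream-lib HyperLogLog class
(com.clearspring.analytics:stream) is on the classpath; the accumulator here falls back to Flink's generic
serialization, and a production version would more likely store the serialized sketch bytes:

import com.clearspring.analytics.stream.cardinality.HyperLogLog;
import org.apache.flink.table.functions.AggregateFunction;

// Approximate count-distinct: bounded sketch state instead of keeping every key.
public class ApproxCountDistinct
        extends AggregateFunction<Long, ApproxCountDistinct.HllAccumulator> {

    public static class HllAccumulator {
        // log2m = 14 gives roughly 1% relative error with around 10 KB of sketch state.
        public HyperLogLog hll = new HyperLogLog(14);
    }

    @Override
    public HllAccumulator createAccumulator() {
        return new HllAccumulator();
    }

    @Override
    public Long getValue(HllAccumulator acc) {
        return acc.hll.cardinality();
    }

    // Invoked once per input row.
    public void accumulate(HllAccumulator acc, String value) {
        if (value != null) {
            acc.hll.offer(value);
        }
    }
}

After registering it with tEnv.registerFunction("approx_count_distinct", new ApproxCountDistinct()), it can replace
COUNT(DISTINCT ...) wherever an approximate result is acceptable, which keeps the state small and bounded.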


Original message
From: zhisheng zhisheng2...@gmail.com
To: user-zh user...@flink.apache.org
Sent: Friday, March 20, 2020 11:44
Subject: Re: flink sql deduplication algorithm


hi, I noticed that some of our production jobs use SQL count distinct for deduplication. Once a job has been
running for a long time, its checkpoint state gets very large (this week I hit one of roughly 400 GB, which made
checkpoints time out easily and may also put some pressure on the NICs of the HDFS cluster). The official docs
mention using the query configuration:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html
Apart from that, does anyone have other good solutions?

Benchao Li libenc...@gmail.com wrote on Fri, Mar 20, 2020 at 9:50 AM:
Hi hiliuxg,
count distinct deduplicates via MapView:
in the batch case, MapView is backed by a HashMap;
in the streaming case, MapView is backed by MapState, because state plus checkpoints are required so that the
state is not lost after a job restart.

hiliuxg 736742...@qq.com wrote on Thu, Mar 19, 2020 at 11:31 PM: hi all:
What algorithm does flink sql count(distinct) use under the hood? Is it a bitmap,
or simply deduplication via a Java Set?
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel: +86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn

Re: flink sql deduplication algorithm

2020-03-19 by zhisheng
hi,

I noticed that some of our production jobs use SQL count distinct for deduplication. Once a job has been running
for a long time, its checkpoint state gets very large (this week I hit one of roughly 400 GB, which made
checkpoints time out easily and may also put some pressure on the NICs of the HDFS cluster). The official docs
mention using the query configuration
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/query_configuration.html>,
but apart from that I am not sure whether anyone has other good solutions?

Benchao Li wrote on Fri, Mar 20, 2020 at 9:50 AM:

> Hi hiliuxg,
>
> count distinct deduplicates via MapView:
> in the batch case, MapView is backed by a HashMap;
> in the streaming case, MapView is backed by MapState, because state plus checkpoints are required so that the
> state is not lost after a job restart.
>
> hiliuxg <736742...@qq.com> wrote on Thu, Mar 19, 2020 at 11:31 PM:
>
> > hi all:
> > What algorithm does flink sql count(distinct) use under the hood? Is it a bitmap,
> > or simply deduplication via a Java Set?
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: Flink 1.10 JSON parsing

2020-03-19 by Jark Wu
Hi,

Looking at your data, "data" is an array type, so the schema definition for data needs to change to
ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING)))
I also suggest removing .jsonSchema(...): starting from 1.10, flink-json can infer the json
schema automatically from the table schema.
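Concretely, the withSchema(...) part of your snippet would become something like the following (other fields
unchanged, and with the .jsonSchema(...) call removed):

.withSchema(
        new Schema()
                .field("business", DataTypes.STRING())
                // "data" is a JSON array of objects, so declare it as an ARRAY of ROW
                .field("data", DataTypes.ARRAY(
                        DataTypes.ROW(
                                DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                                DataTypes.FIELD("invoice_no", DataTypes.STRING()))))
                .field("database", DataTypes.STRING())
                .field("table", DataTypes.STRING())
                .field("ts", DataTypes.DECIMAL(38, 18))
                .field("type", DataTypes.STRING())
                .field("putRowNum", DataTypes.DECIMAL(38, 18))
)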

Best,
Jark

On Fri, 20 Mar 2020 at 11:34, 宇张  wrote:

> hi:
> 1. When parsing JSON data, why is decimal used here rather than bigint?
> [image: image.png]
> 2. When using connect, parsing a JSON array element throws an exception. Is this a misuse on my side, or a bug?
>
> json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
>
> jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
> connect:
>
> streamTableEnv
> .connect(
> new Kafka()
> .version("0.11")
> .topic("mysql_binlog_test_str")
> .startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")
> )
> .withFormat(
> new Json()
> 
> .jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
> )
> .withSchema(
> new Schema()
> .field("business", DataTypes.STRING())
> .field("data", DataTypes.ROW(DataTypes.FIELD("f0", 
> DataTypes.ROW(
> DataTypes.FIELD("tracking_number", 
> DataTypes.STRING()),
> DataTypes.FIELD("invoice_no", 
> DataTypes.STRING())
> .field("database", DataTypes.STRING())
> .field("table", DataTypes.STRING())
> .field("ts", DataTypes.DECIMAL(38, 18))
> .field("type", DataTypes.STRING())
> .field("putRowNum", DataTypes.DECIMAL(38, 18))
> )
> .createTemporaryTable("Test");
>
> Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.
>
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.lang.ClassCastException:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
> cannot be cast to
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more
>
>
>


rowtime type serialization problem

2020-03-19 by lucas.wu
Hi all:
Table DDL:
create table `source_table`(
`SeqNo` varchar,
`Type` varchar,
`Table` varchar,
`ServerId` varchar,
`Database` varchar,
`OldData` varchar,
`GTID` varchar,
`Offset` varchar,
`event_ts` as 
to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'-MM-ddHH:mm:ss'),
WATERMARK FOR event_ts AS event_ts - interval '60' second
) with(…)


Query:
insert into sinkTable select * from source_table;



Error:
java.lang.ClassCastException: java.sql.Timestamp cannot be cast to 
java.lang.Long at 
org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
 at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
 at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
 at SinkConversion$51.processElement(Unknown Source)
……


Finally, looking at the code, I found that for rowtime, BaseRowTypeInfo uses SqlTimestampSerializer while
RowTypeInfo uses LongSerializer. The upstream and downstream operators use different serializers; with
SqlTimestampSerializer upstream and LongSerializer downstream, this error is thrown.
Can this problem be avoided?

Flink 1.10 JSON parsing

2020-03-19 by 宇张
hi:
1. When parsing JSON data, why is decimal used here rather than bigint?
[image: image.png]
2. When using connect, parsing a JSON array element throws an exception. Is this a misuse on my side, or a bug?
json:{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":1}
jsonSchema:{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}
connect:

streamTableEnv
.connect(
new Kafka()
.version("0.11")
.topic("mysql_binlog_test_str")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(
new Json()

.jsonSchema("{\"type\":\"object\",\"properties\":{\"business\":{\"type\":\"string\"},\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},\"database\":{\"type\":\"string\"},\"table\":{\"type\":\"string\"},\"ts\":{\"type\":\"integer\"},\"type\":{\"type\":\"string\"},\"putRowNum\":{\"type\":\"integer\"}}}")
)
.withSchema(
new Schema()
.field("business", DataTypes.STRING())
.field("data",
DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
DataTypes.FIELD("tracking_number",
DataTypes.STRING()),
DataTypes.FIELD("invoice_no",
DataTypes.STRING())
.field("database", DataTypes.STRING())
.field("table", DataTypes.STRING())
.field("ts", DataTypes.DECIMAL(38, 18))
.field("type", DataTypes.STRING())
.field("putRowNum", DataTypes.DECIMAL(38, 18))
)
.createTemporaryTable("Test");

Exception: Caused by: java.io.IOException: Failed to deserialize JSON object.

at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode
cannot be cast to
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more


flink sql 1.10.0 support for hive GenericUDF (the earlier images did not display on the mailing list, re-edited)

2020-03-19 by Chief
hi all:
We are migrating our existing offline Hive jobs to flink sql 1.10.0. The docs say flink supports Hive
GenericUDFs, but I found that a few of them fail in flink. Is there something wrong with my setup?
For example current_timestamp():
the statement in hive
select
from_unixtime(unix_timestamp(current_timestamp()),'MMddHHmmss'); runs fine,
but submitting the flink sql job fails with:
org.apache.flink.table.api.ValidationException: SQL validation failed. 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
at 
com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1865)
at 

flink sql 1.10.0 support for hive GenericUDF

2020-03-19 by Chief
hi all:
We are migrating our existing offline Hive jobs to flink sql 1.10.0. The docs say flink supports Hive
GenericUDFs, but I found that a few of them fail in flink. Is there something wrong with my setup?
For example current_timestamp():
the statement in hive:
[image: image.png]
Submitting the flink sql job fails with:
org.apache.flink.table.api.ValidationException: SQL validation failed. 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callInsertInto(SqlSubmit.java:55)
at 
com.trusfort.twinkling.sql.util.SqlSubmit.callCommand(SqlSubmit.java:27)
at 
com.trusfort.twinkling.sql.batch.template.BatchSlidingWindowSqlTemplate.main(BatchSlidingWindowSqlTemplate.java:60)
at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at 
java.security.AccessController.doPrivileged(Native Method)
at 
javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
at 
org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType(HiveFunctionUtils.java:77)
at 
org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction.lambda$createReturnTypeInference$0(HiveScalarSqlFunction.java:83)
at 
org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:470)
at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:437)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:303)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)
at 
org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)
at 
org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)
at 
org.apache.calcite.sql.type.InferTypes.lambda$static$0(InferTypes.java:46)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1865)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.inferUnknownTypes(SqlValidatorImpl.java:1853)
at 

Re: flink sql deduplication algorithm

2020-03-19 by Benchao Li
Hi hiliuxg,

count distinct deduplicates via MapView:
in the batch case, MapView is backed by a HashMap;
in the streaming case, MapView is backed by MapState, because state plus checkpoints are required so that the
state is not lost after a job restart.
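For reference, a much simplified sketch of the pattern (the built-in count distinct implementation is more
elaborate; class and field names here are illustrative):

import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

public class CountDistinctSketch
        extends AggregateFunction<Long, CountDistinctSketch.Accumulator> {

    public static class Accumulator {
        // Backed by a HashMap in batch mode and by keyed MapState in streaming mode,
        // so in streaming the map is checkpointed and survives restarts.
        public MapView<String, Boolean> seen = new MapView<>();
        public long count = 0L;
    }

    @Override
    public Accumulator createAccumulator() {
        return new Accumulator();
    }

    @Override
    public Long getValue(Accumulator acc) {
        return acc.count;
    }

    // Called per input row; a key contributes to the count only the first time it is seen.
    public void accumulate(Accumulator acc, String key) throws Exception {
        if (key != null && !acc.seen.contains(key)) {
            acc.seen.put(key, true);
            acc.count += 1;
        }
    }
}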

hiliuxg <736742...@qq.com> wrote on Thu, Mar 19, 2020 at 11:31 PM:

> hi all:
> What algorithm does flink sql count(distinct) use under the hood? Is it a bitmap,
> or simply deduplication via a Java Set?



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink sql: triggering, every 5 minutes, an aggregation from 00:00 of the current day up to the current 5-minute mark

2020-03-19 by Jark Wu
Hi, you can take a look at this article and see whether it meets your needs:
http://wuchong.me/blog/2020/02/25/demo-building-real-time-application-with-flink-sql
#统计一天每10分钟累计独立用户数
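Roughly, the pattern there looks like the sketch below (10-minute steps as in the article; t_user_log, logintime
and userid follow your query below, pvuv_sink is assumed to be an upsert sink keyed by dt and etltime, and a
5-minute step would need a different time truncation):

// assuming t_user_log and an upsert sink pvuv_sink are already registered in tEnv
tEnv.sqlUpdate(
    "INSERT INTO pvuv_sink " +
    "SELECT dt, MAX(step_time) AS etltime, COUNT(1) AS pv, COUNT(DISTINCT userid) AS uv " +
    "FROM ( " +
    "  SELECT " +
    "    DATE_FORMAT(logintime, 'yyyy-MM-dd') AS dt, " +
    "    SUBSTR(DATE_FORMAT(logintime, 'HH:mm'), 1, 4) || '0' AS step_time, " +
    "    userid " +
    "  FROM t_user_log) t " +
    "GROUP BY dt");

Each time step_time advances, the sink receives a new row keyed by (dt, etltime) that holds the cumulative pv/uv
from 00:00 up to that step, while the row for the current step keeps updating.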

Best,
Jark


On Thu, 19 Mar 2020 at 23:30, hiliuxg <736742...@qq.com> wrote:

> hi all: I have a scenario where, every 5 minutes, I want to compute the pv and uv from 00:00 of the current day
> up to the current 5-minute mark. As a batch query it would roughly be:
> select
> '2020-03-19' as dt ,
> '2020-03-19 12:05:00' as etltime ,
> count(1) as pv ,
> count(distinct userid) as uv
> from t_user_log
> where logintime >= '2020-03-19 00:00:00' and logintime <
> '2020-03-19 12:05:00'
>
>
> I haven't found a way to express this with flink sql, yet this kind of scenario is very common here. Does anyone
> have a good way to handle it?


flink sql: triggering, every 5 minutes, an aggregation from 00:00 of the current day up to the current 5-minute mark

2020-03-19 by hiliuxg
hi all: I have a scenario where, every 5 minutes, I want to compute the pv and uv from 00:00 of the current day up
to the current 5-minute mark. As a batch query it would roughly be:
select 
'2020-03-19' as dt ,
'2020-03-19 12:05:00' as etltime ,
count(1) as pv ,
count(distinct userid) as uv
from t_user_log
where logintime >= '2020-03-19 00:00:00' and logintime < 
'2020-03-19 12:05:00'


I haven't found a way to express this with flink sql, yet this kind of scenario is very common here. Does anyone
have a good way to handle it?

flink sql deduplication algorithm

2020-03-19 by hiliuxg
hi all:
What algorithm does flink sql count(distinct) use under the hood? Is it a bitmap,
or simply deduplication via a Java Set?

Re: ddl

2020-03-19 by hiliuxg
You can define your own TableSinkFactory; Flink has already provided this interface as an extension point.




---- Original message ----
From: "LakeShen"

Page on “User-defined Sources & Sinks”. For flink 1.10:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html

 Best Regards

 jinhai...@gmail.com

  On Mar 13, 2020, at 7:17 PM

The question about the FLIP-45

2020-03-19 by LakeShen
Hi community,

Now I am reading FLIP-45 (Reinforce Job Stop Semantic), and I have three
questions about it:
1. Which command should be used to stop a Flink job, stop or cancel?

2. When using the stop command to stop a Flink job: looking at the Flink source code, the stop command lets us set
the savepoint dir, and if we don't set it, the default savepoint dir is used. If both the target savepoint dir and
the default savepoint dir are null, Flink throws an exception. But FLIP-45 says that if retained checkpoints are
enabled, we should always take a checkpoint when stopping the job, and I can't find that code.

Thanks to your reply.

Best regards,
LakeShen


flink????????????????

2020-03-19 by 512348363
??
DataStream