如果解决写hdfs的文件描述符问题?

2020-07-13 文章 李宇彬
hi, everyone



环境信息:flink-1.10.0、hadoop 2.6.0

我从kafka 
topic消费数据,通过BucketSink写到hdfs,在0点的时候会遇到这样的问题,要同时消费当天和昨天的数据,这样在写hdfs时会产生两倍的文件描述符,对hdfs造成很大的压力,请问在不增加文件描述符上限的情况下,如何解决?

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Peihui He
Hi Congxian,

这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word
抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10
都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢?

Best wishes.

Congxian Qiu  于2020年7月14日周二 下午1:54写道:

> Hi
>
> 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
> 另外你可以看下 tm log 看看有没有其他异常
>
> Best,
> Congxian
>
>
> Yun Tang  于2020年7月14日周二 上午11:57写道:
>
> > Hi Peihui
> >
> >
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> > cause。
> >
> > [1]
> >
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
> >
> >
> > 祝好
> > 唐云
> > 
> > From: Peihui He 
> > Sent: Tuesday, July 14, 2020 10:42
> > To: user-zh@flink.apache.org 
> > Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
> >
> > hello,
> >
> > 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
> >
> >
> > Caused by: java.nio.file.NoSuchFileException:
> >
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> > ->
> >
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
> >
> > 配置和1.9.2 一样:
> > state.backend: rocksdb
> > state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> > state.savepoints.dir: hdfs:///flink/savepoints/wc/
> > state.backend.incremental: true
> >
> > 代码上都有
> >
> > env.enableCheckpointing(1);
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> > env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> > org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
> >
> >
> >   是1.10.0 需要做什么特别配置么?
> >
>


Re: flink-1.11使用executeSql()执行DDL语句问题

2020-07-13 文章 Benchao Li
看起来是format找不到实现,你可以添加一下flink-json的依赖看一下。

amen...@163.com  于2020年7月14日周二 下午2:38写道:

> hi, everyone
>
> 环境信息:flink-1.11.0, blink-planner, 本地ide开发测试(IDEA)
>
> 问题描述:使用executeSql()方法执行DDL语句,控制台打印如下异常信息。(flink-connector-kafka_2.11依赖已添加)
>
> 我不确定是否还有某个必要的依赖没有添加,还是有其他的地方没有考虑完整,请大佬赐教。
>
>
> --分割线-
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.kafka_out'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='flink-1.11'
> 'scan.startup.mode'='group-offsets'
> 'topic'='flink-kafka'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at example.Example.main(Example.java:77)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factories that implement
> 'org.apache.flink.table.factories.DeserializationFormatFactory' in the
> classpath.
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 25 more
>
>
> --分割线-
>
> 祝好!
> amenhub
>


-- 

Best,
Benchao Li


Re: flink-1.11使用executeSql()执行DDL语句问题

2020-07-13 文章 Jingsong Li
还要添加flink-json

Best,
Jingsong

On Tue, Jul 14, 2020 at 2:38 PM amen...@163.com  wrote:

> hi, everyone
>
> 环境信息:flink-1.11.0, blink-planner, 本地ide开发测试(IDEA)
>
> 问题描述:使用executeSql()方法执行DDL语句,控制台打印如下异常信息。(flink-connector-kafka_2.11依赖已添加)
>
> 我不确定是否还有某个必要的依赖没有添加,还是有其他的地方没有考虑完整,请大佬赐教。
>
>
> --分割线-
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a source for reading table
> 'default_catalog.default_database.kafka_out'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='flink-1.11'
> 'scan.startup.mode'='group-offsets'
> 'topic'='flink-kafka'
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
> at example.Example.main(Example.java:77)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factories that implement
> 'org.apache.flink.table.factories.DeserializationFormatFactory' in the
> classpath.
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
> at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> ... 25 more
>
>
> --分割线-
>
> 祝好!
> amenhub
>


-- 
Best, Jingsong Lee


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Peihui He
Hi Yun,

我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
yarn。

Best wishes.

Yun Tang  于2020年7月14日周二 上午11:57写道:

> Hi Peihui
>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> cause。
>
> [1]
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
>
> 祝好
> 唐云
> 
> From: Peihui He 
> Sent: Tuesday, July 14, 2020 10:42
> To: user-zh@flink.apache.org 
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>


flink-1.11使用executeSql()执行DDL语句问题

2020-07-13 文章 amen...@163.com
hi, everyone

环境信息:flink-1.11.0, blink-planner, 本地ide开发测试(IDEA)

问题描述:使用executeSql()方法执行DDL语句,控制台打印如下异常信息。(flink-connector-kafka_2.11依赖已添加)

我不确定是否还有某个必要的依赖没有添加,还是有其他的地方没有考虑完整,请大佬赐教。

--分割线-
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a source for reading table 
'default_catalog.default_database.kafka_out'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'properties.group.id'='flink-1.11'
'scan.startup.mode'='group-offsets'
'topic'='flink-kafka'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2178)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
at example.Example.main(Example.java:77)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factories that implement 
'org.apache.flink.table.factories.DeserializationFormatFactory' in the 
classpath.
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:229)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalFormatFactory(FactoryUtil.java:538)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverOptionalDecodingFormat(FactoryUtil.java:426)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.discoverDecodingFormat(FactoryUtil.java:413)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactoryBase.createDynamicTableSource(KafkaDynamicTableFactoryBase.java:73)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 25 more

--分割线-

祝好!
amenhub


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
没有窗口,就简单的表join,有kafka流表 ,kudu维表,使用了group by

> Jul 14, 2020; 12:36pm — by zhisheng zhisheng
> 有没有窗口啊?

Robin Zhang <[hidden email]> 于2020年7月14日周二 上午11:48写道:

> 
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink state

2020-07-13 文章 Congxian Qiu
Hi Robert

Boardcast state[1] 是否满足你的需求呢?另外也可以看下这篇文章[2]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html
[2] https://cloud.tencent.com/developer/article/1509789
Best,
Congxian


Robert.Zhang <173603...@qq.com> 于2020年7月13日周一 下午9:50写道:

> Hello,all
> 目前stream中遇到一个问题,
> 想使用一个全局的state 在所有的keyed stream中使用,或者global
> parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream
> operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽
>
>
> Best regards


Re: flink-benchmarks使用求助

2020-07-13 文章 Congxian Qiu
Hi zilong

之前没有使用 `-t max` 跑过,你可以分享一下你使用的全部命令么?我可以本地看看

Best,
Congxian


zilong xiao  于2020年7月14日周二 上午10:16写道:

> `-t max`之后出现的~ 改小并发后貌似没问题
>
> Congxian Qiu  于2020年7月13日周一 下午8:14写道:
>
> > Hi
> >
> > 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?
> >
> > Best,
> > Congxian
> >
> >
> > zilong xiao  于2020年7月13日周一 下午2:32写道:
> >
> > > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> > > timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
> > >
> > > Congxian Qiu  于2020年7月10日周五 下午7:18写道:
> > >
> > > > Hi
> > > > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme
> 能跑出一个结果(csv
> > > > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> > > >
> > > > [1] https://github.com/dataArtisans/flink-benchmarks
> > > > [2] http://openjdk.java.net/projects/code-tools/jmh/
> > > >
> > > > Best,
> > > > Congxian
> > > >
> > > >
> > > > zilong xiao  于2020年7月10日周五 下午3:54写道:
> > > >
> > > > >
> > 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > > > >
> > > >
> > >
> >
>


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Congxian Qiu
Hi

这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
另外你可以看下 tm log 看看有没有其他异常

Best,
Congxian


Yun Tang  于2020年7月14日周二 上午11:57写道:

> Hi Peihui
>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> cause。
>
> [1]
> https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
>
> 祝好
> 唐云
> 
> From: Peihui He 
> Sent: Tuesday, July 14, 2020 10:42
> To: user-zh@flink.apache.org 
> Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
> hello,
>
> 当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
>
>
> Caused by: java.nio.file.NoSuchFileException:
>
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst
>
> 配置和1.9.2 一样:
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
> state.savepoints.dir: hdfs:///flink/savepoints/wc/
> state.backend.incremental: true
>
> 代码上都有
>
> env.enableCheckpointing(1);
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
> org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));
>
>
>   是1.10.0 需要做什么特别配置么?
>


Re: 退订

2020-07-13 文章 Congxian Qiu
Hi

退订需要发送邮件到  user-zh-unsubscr...@flink.apache.org

Best,
Congxian


成欢晴  于2020年7月14日周二 下午12:44写道:

> 退订
>
>
> | |
> chq19970719
> |
> |
> 邮箱:chq19970...@163.com
> |
>
> Signature is customized by Netease Mail Master


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 Congxian Qiu
Hi

如果可以的话,建议先调用 RestClient 的 stop 等命令(这样可以在最后做一次 savepoint,或者 checkpoint -- 这个
FLINK-12619 想做),然后失败再使用 yarn 的 kill 命令,这样能够减少后续启动时的回放数据量

Best,
Congxian


zhisheng  于2020年7月14日周二 下午12:53写道:

> 如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业
>
> Jeff Zhang  于2020年7月11日周六 下午11:23写道:
>
> > Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
> > api来做到的,对zeppelin感兴趣的话,可以参考这个视频
> >
> > https://www.bilibili.com/video/BV1Te411W73b?p=21
> >
> >
> > jianxu  于2020年7月11日周六 下午4:52写道:
> >
> > > Hi:
> > >
> > >
> >
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
> > > jobId)取消流任务。
> > > Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考
> > > https://github.com/todd5167/flink-spark-submiter
> > > 项目的任务提交部分,取消任务时构建ClusterClient即可。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > | |
> > > jianxu
> > > |
> > > |
> > > rjia...@163.com
> > > |
> > >
> > >
> > >
> > >
> > > 在2020年07月11日 16:19,Congxian Qiu 写道:
> > > Hi
> > >
> > > 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
> > > 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道
> applicationId,另外你还需要知道
> > > flink 的 JobId,接下来就是调用 Flink 的接口了
> > >
> > > 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
> > >
> > > [1]
> > >
> > >
> >
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > godfrey he  于2020年7月9日周四 上午10:08写道:
> > >
> > > 可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
> > > JobClient 可以 cancel 作业,获取 job status。
> > >
> > > [1]
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> > >
> > > Best,
> > > Godfrey
> > >
> > > Evan  于2020年7月9日周四 上午9:40写道:
> > >
> > > 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> > > API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
> > >
> > >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


Re: State 0点清除的问题

2020-07-13 文章 Congxian Qiu
Hi
如果你需要精确的控制每天 0 点清除  state 的话,或许你可以考虑使用 processFunction[1], 然后自己使用 timer
实现相关逻辑

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/process_function.html
Best,
Congxian


ゞ野蠻遊戲χ  于2020年7月14日周二 下午1:10写道:

> 大家好:     
>  我想问下,在ProcessAllWindowFunction中,在每天的0点清除state如何清除?
>
>
> Thanks
> 嘉治


Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
没有使用窗口呢,就多表关联,涉及到流表join流表,流表join维表,group by 、topN等



--
Sent from: http://apache-flink.147419.n8.nabble.com/

State 0????????????

2020-07-13 文章 ?g???U?[????
      
 ProcessAllWindowFunction0??state??


Thanks


Re: 代码中如何取消正在运行的Flink Streaming作业

2020-07-13 文章 zhisheng
如果是 on yarn 的话,也可以直接调用 yarn 的 kill 命令停止作业

Jeff Zhang  于2020年7月11日周六 下午11:23写道:

> Zeppelin 能够帮你提交和cancel job,就是通过上面jianxu说的ClusterClient
> api来做到的,对zeppelin感兴趣的话,可以参考这个视频
>
> https://www.bilibili.com/video/BV1Te411W73b?p=21
>
>
> jianxu  于2020年7月11日周六 下午4:52写道:
>
> > Hi:
> >
> >
> 我想,你可能打算通过API的方式来取消正在运行的流任务。Flink任务提交时需要构建ClusterClient,提交成功后会返回任务对应的JobId。任务取消时,通过调用ClusterClient的cancel(JobID
> > jobId)取消流任务。
> > Flink源码可以看看 CliFrontend[1]中的逻辑,如果觉得比较麻烦可以参考
> > https://github.com/todd5167/flink-spark-submiter
> > 项目的任务提交部分,取消任务时构建ClusterClient即可。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > | |
> > jianxu
> > |
> > |
> > rjia...@163.com
> > |
> >
> >
> >
> >
> > 在2020年07月11日 16:19,Congxian Qiu 写道:
> > Hi
> >
> > 如果你是想做一个作业管理的平台,可以尝试看一下 CliFrontend[1] 中相关的逻辑,对于 On Yarn
> > 的作业,简单地说你需要能够正确的初始化一个 client 和 Yarn RM 交互,然后你需要知道 applicationId,另外你还需要知道
> > flink 的 JobId,接下来就是调用 Flink 的接口了
> >
> > 如果像更多的了解参数如从和命令行传到 java 代码的,你可以自己写一个单元测试,单步调试一下整个流程。
> >
> > [1]
> >
> >
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
> >
> > Best,
> > Congxian
> >
> >
> > godfrey he  于2020年7月9日周四 上午10:08写道:
> >
> > 可以通过 StreamExecutionEnvironment#executeAsync 提交作业,返回 JobClient [1], 通过
> > JobClient 可以 cancel 作业,获取 job status。
> >
> > [1]
> >
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API
> >
> > Best,
> > Godfrey
> >
> > Evan  于2020年7月9日周四 上午9:40写道:
> >
> > 这个问题之前看到过有人在问,但是没有看到答案,我想问一下,Flink Streaming
> > API有没有提供类似的接口,调用后就能停止这个Stream作业呢?
> >
> >
>
> --
> Best Regards
>
> Jeff Zhang
>


退订

2020-07-13 文章 成欢晴
退订


| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

Re: (无主题)

2020-07-13 文章 Jingsong Li
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Tue, Jul 14, 2020 at 12:36 PM 成欢晴  wrote:

> 退订
>
>
> | |
> chq19970719
> |
> |
> 邮箱:chq19970...@163.com
> |
>
> Signature is customized by Netease Mail Master



-- 
Best, Jingsong Lee


(无主题)

2020-07-13 文章 成欢晴
退订


| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

Re: Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 zhisheng
有没有窗口啊?

Robin Zhang  于2020年7月14日周二 上午11:48写道:

> 
> 我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
> 代码如下:
>tEnv.getConfig()
>  .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime),
>
> Time.hours(maxIdleStateRetentionTime));
>
> 程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: flink on yarn日志问题

2020-07-13 文章 zhisheng
知道 YARN 的 applicationId,应该也可以去 HDFS 找对应的 taskmanager 的日志(可以拼出路径),然后复制到本地去查看

Yangze Guo  于2020年7月14日周二 上午11:58写道:

> Hi, 王松
>
> 我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。
>
> Best,
> Yangze Guo
>
> On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
> >
> > 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
> >
> > Yangze Guo  于2020年7月13日周一 下午5:03写道:
> >
> > > 1.
> > >
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> > > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
> > >
> > > [1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> > >
> > > Best,
> > > Yangze Guo
> > >
> > >
> > > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> > > >
> > > > 不好意思  怪我灭有描述清楚
> > > > 1 目前开启日志收集功能
> > > > 2 目前已是 per-job模式
> > > > 3 集群使用cdh flink.1.10
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > > > >Hi,
> > > > >
> > > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > > > >
> > > > >第二个问题,您可以尝试一下per-job mode [2][3]
> > > > >
> > > > >[1]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > > > >[2]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > > > >[3]
> > >
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > > >
> > > > >
> > > > >Best,
> > > > >Yangze Guo
> > > > >
> > > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > > > >>
> > > > >> 请问一下两个问题
> > > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> > > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> > > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > > > >>
> > >
>


退出邮件组

2020-07-13 文章 成欢晴




| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

回复:flink1.9状态及作业迁移

2020-07-13 文章 成欢晴
退订




| |
chq19970719
|
|
邮箱:chq19970...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月14日 12:15,Yun Tang 写道:
对于Flink本身机制不支持的场景,可以通过直接修改Checkpoint meta 
文件同时将meta以及data文件迁移到新HDFS集群也能做到,加载Checkpoint的具体代码可以参照Checkpoints#loadAndValidateCheckpoint
 [1],而存储Checkpoint的代码可以参照Checkpoints#storeCheckpointMetadata [2]


[1] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L124
[2] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L81


祝好
唐云

From: Dream-底限 
Sent: Tuesday, July 14, 2020 11:57
To: user-zh@flink.apache.org 
Subject: Re: flink1.9状态及作业迁移

hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> 
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
对于Flink本身机制不支持的场景,可以通过直接修改Checkpoint meta 
文件同时将meta以及data文件迁移到新HDFS集群也能做到,加载Checkpoint的具体代码可以参照Checkpoints#loadAndValidateCheckpoint
 [1],而存储Checkpoint的代码可以参照Checkpoints#storeCheckpointMetadata [2]


[1] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L124
[2] 
https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L81


祝好
唐云

From: Dream-底限 
Sent: Tuesday, July 14, 2020 11:57
To: user-zh@flink.apache.org 
Subject: Re: flink1.9状态及作业迁移

hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> 
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


Re: Re: flink on yarn日志问题

2020-07-13 文章 Yangze Guo
Hi, 王松

我理解拼接url就可以了,不用实际去登陆机器然后进到对应目录。

Best,
Yangze Guo

On Tue, Jul 14, 2020 at 8:26 AM 王松  wrote:
>
> 我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。
>
> Yangze Guo  于2020年7月13日周一 下午5:03写道:
>
> > 1.
> > 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> > 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
> >
> > Best,
> > Yangze Guo
> >
> >
> > On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> > >
> > > 不好意思  怪我灭有描述清楚
> > > 1 目前开启日志收集功能
> > > 2 目前已是 per-job模式
> > > 3 集群使用cdh flink.1.10
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > > >Hi,
> > > >
> > > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > > >
> > > >第二个问题,您可以尝试一下per-job mode [2][3]
> > > >
> > > >[1]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > > >[2]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > > >[3]
> > https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > > >
> > > >
> > > >Best,
> > > >Yangze Guo
> > > >
> > > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > > >>
> > > >> 请问一下两个问题
> > > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> > ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> > 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > > >>
> >


Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Yun Tang
Hi Peihui

你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
 cause。

[1] 
https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473


祝好
唐云

From: Peihui He 
Sent: Tuesday, July 14, 2020 10:42
To: user-zh@flink.apache.org 
Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

hello,

当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

配置和1.9.2 一样:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

代码上都有

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  是1.10.0 需要做什么特别配置么?


Re: flink1.9状态及作业迁移

2020-07-13 文章 Dream-底限
hi、
请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗
》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Yun Tang  于2020年7月14日周二 上午11:54写道:

> Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。
>
> Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-5763
>
> 祝好
> 唐云
>
> 
> From: Dream-底限 
> Sent: Tuesday, July 14, 2020 11:07
> To: user-zh@flink.apache.org 
> Subject: flink1.9状态及作业迁移
>
> hi:
>
> flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?
>


Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。

Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。


[1] https://issues.apache.org/jira/browse/FLINK-5763

祝好
唐云


From: Dream-底限 
Sent: Tuesday, July 14, 2020 11:07
To: user-zh@flink.apache.org 
Subject: flink1.9状态及作业迁移

hi:
flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?


Flink1.10 flinksql 多表join状态ttl不成功的问题

2020-07-13 文章 Robin Zhang
 
我用flink sql 设置了 空闲状态的清理时间,但是 状态还是一直增加,里面有 多次 group by  和多次 流表的关联 。
代码如下:
   tEnv.getConfig()
 .setIdleStateRetentionTime(Time.hours(minIdleStateRetentionTime), 

Time.hours(maxIdleStateRetentionTime));

程序运行一周之后状态现在2.2G. 最近几天越来越大,表现在ttl没有成功,请教一下各位大佬



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi

那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 
在导入Kafka,之后再FlinkSQL 处理。

可爱的木兰


发件人: Benchao Li 
发送时间: 2020年7月14日 11:00
收件人: user-zh 
主题: Re: Flink SQL处理Array型的JSON

我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu  于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan  写道:
> >
> > 可爱的木兰
>
>

--

Best,
Benchao Li


回复: Flink SQL复杂JSON解析

2020-07-13 文章 hua mulan
Hi

那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 
在导入Kafka,之后再FlinkSQL 处理。

可爱的木兰

发件人: Benchao Li 
发送时间: 2020年7月8日 20:46
收件人: user-zh 
主题: Re: Flink SQL复杂JSON解析

看代码应该是支持复合类型的,你可以试下。

hua mulan  于2020年7月8日周三 下午8:34写道:

> 我试了下 Array里是基本元素可以CROSS JOIN
> UNNEST直接解开。如果Array里是Row、POJO、Tuple这种复合类型我就只能UDTF了是吧。
>
> 来自 Outlook
>
> 
> 发件人: Benchao Li 
> 发送时间: 2020年7月6日 22:35
> 收件人: user-zh 
> 主题: Re: Flink SQL复杂JSON解析
>
> 我理解最佳实践是第一种,先读出来array,再用table function展开成多行。
> 实际上把array转成多行是Flink 内置支持的,可以参考[1]的”Expanding arrays into a relation“部分
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>
> 王 outlook  于2020年7月6日周一 下午9:29写道:
>
> > 像如下这种JSON输入,
> >
> > {
> >   "id": 1,
> >   "many_names": [
> > {"name": "foo"},
> > {"name": "bar"}
> >   ]
> > }
> >
> > 输出表两行  id 1, name foo  |  id 1, name bar
> >
> > 最佳实践是从Kafka读到后,调用TableFunction这个UDTF转出多行?
> > 还是Flink SQL有更方便的操作,从Source读出来就能把数组展开。
> >
> >
> > 来自 Outlook
> >
>
>
> --
>
> Best,
> Benchao Li
>


--

Best,
Benchao Li


flink1.9状态及作业迁移

2020-07-13 文章 Dream-底限
hi:
flink1.9的检查点或保存点中会保留hadoop集群的nameservice数据吗?现在想将一个集群的flink作业迁移到另一个集群,但两个集群的nameservice名称不一样,迁移会有问题吗?如果有问题的话对应状态保存的nameservice可以修改吗?或者说迁移的时候还有哪些其他需要注意的问题?


flink cep 如何处理超时事件?

2020-07-13 文章 drewfranklin
Hello all.
 想请教下各位。
 
我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。
 
但是我定义pattern 后发现,我的这个没办法在一条事件数据上完成判定。必须借助和上一事件数据比较之后判断是不是超时。


想知道该如何定义pattern 能够,取到排序之后前后两个两个事件。

Re:回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach
Hi,
感谢社区热心答疑!

















在 2020-07-14 11:00:18,"夏帅"  写道:
>你好,
>本质还是StreamingFileSink,所以目前只能append
>
>
>--
>发件人:Zhou Zach 
>发送时间:2020年7月14日(星期二) 10:56
>收件人:user-zh 
>主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录
>
>
>
>
>Hi Leonard,
>原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>>Hi,
>>
>>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>>> 
>   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>>
>>看下这个抽取出来的rowkey是否有重复的呢?
>>
>>祝好,
>>Leonard Xu


回复:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 夏帅
你好,
本质还是StreamingFileSink,所以目前只能append


--
发件人:Zhou Zach 
发送时间:2020年7月14日(星期二) 10:56
收件人:user-zh 
主 题:Re:Re: flink 同时sink hbase和hive,hbase少记录




Hi Leonard,
原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式














在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>Hi,
>
>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>> 
   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
 cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>看下这个抽取出来的rowkey是否有重复的呢?
>
>祝好,
>Leonard Xu


Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Benchao Li
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu  于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan  写道:
> >
> > 可爱的木兰
>
>

-- 

Best,
Benchao Li


Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach



Hi Leonard,
原来是有重复key,hbase做了upsert,请问Hive Streaming Writing是不是目前只支持append模式,不支持upsert模式














在 2020-07-14 09:56:00,"Leonard Xu"  写道:
>Hi,
>
>> 在 2020年7月14日,09:52,Zhou Zach  写道:
>> 
   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
 cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>
>看下这个抽取出来的rowkey是否有重复的呢?
>
>祝好,
>Leonard Xu


回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hello,Leonard Xu

我这边JSON 不是

{
"id": 2,
"heap": [
{
"foo": 14,
"bar": "foo"

},
{
"foo": 16,
"bar": "bar"
}
],
}

而是直接一个Array

[
{
"foo": 14,
"bar": "foo"

},
{
"foo": 16,
"bar": "bar"
}
]

我发现DDL没法声明,SQL层面我不知道怎么做了。

可爱的木兰


发件人: Leonard Xu 
发送时间: 2020年7月14日 10:42
收件人: user-zh 
主题: Re: Flink SQL处理Array型的JSON

Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
 


> 在 2020年7月14日,10:34,hua mulan  写道:
>
> 可爱的木兰



Re: 【Flink Join内存问题】

2020-07-13 文章 admin
regular join会缓存两边流的所有数据,interval join只存一段时间内的,相比当然节省很大的状态存储

> 2020年7月13日 下午10:30,忝忝向仧 <153488...@qq.com> 写道:
> 
> Hi:
> 
> 
> interval join可以缓解key值过多问题么?
> interval join不也是计算某段时间范围内的join么,跟regular join相比,如何做到避免某个stream的key过多问题?
> 谢谢.
> 
> 
> 
> 
> -- 原始邮件 --
> 发件人:  
>   "user-zh"   
>  
> <17626017...@163.com>;
> 发送时间: 2020年7月6日(星期一) 中午11:12
> 收件人: "user-zh" 
> 主题: Re: 【Flink Join内存问题】
> 
> 
> 
> regular join确实是这样,所以量大的话可以用interval join 、temporal join
> 
> > 2020年7月5日 下午3:50,忝忝向仧 <153488...@qq.com> 写道:
> > 
> > Hi,all:
> > 
> > 我看源码里写到JoinedStreams:
> > 也就是说join时候都是走内存计算的,那么如果某个stream的key值过多,会导致oom
> > 那么有什么预防措施呢?
> > 将key值多的一边进行打散?
> > 
> > 
> > Right now, the join is being evaluated in memory so you need to ensure 
> that the number
> > * of elements per key does not get too high. Otherwise the JVM might 
> crash.



自定义的sql connector在sql-cli中运行问题

2020-07-13 文章 admin
hi all,
我自定义了一个sql 
connector,在本地idea里面是调试通过的,数据能正常写入,但是整个flink编译之后,用编译后的包在本地起了standalone集群,在sql-cli中运行报错如下
2020-07-14 10:36:29,148 WARN  org.apache.flink.table.client.cli.CliClient   
   [] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update 
statement.
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:698)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:551) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299) 
~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_251]
   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) 
[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
Caused by: scala.MatchError: null
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.inferSinkPhysicalSchema(TableSinkUtils.scala:165)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateLogicalPhysicalTypesCompatible(TableSinkUtils.scala:305)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:194)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.Option.map(Option.scala:146) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.Iterator$class.foreach(Iterator.scala:891) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:767)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:571)
 ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:341)
 ~[flink-table-api-java-bridge_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$17(LocalExecutor.java:691)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:246)
 ~[flink-sql-client_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
   at 
org

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
好吧,感谢回答



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Peihui He
hello,

当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示


Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/09.sst
-> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/09.sst

配置和1.9.2 一样:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

代码上都有

env.enableCheckpointing(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));


  是1.10.0 需要做什么特别配置么?


Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Leonard Xu
Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
 


> 在 2020年7月14日,10:34,hua mulan  写道:
> 
> 可爱的木兰



Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 jindy_liu
本来还想尽最大可能的复用源码,看了下JdbcOutputFormat的源码实现,batch size是sql语句的个数据,kafka的batch
size是字节数,两个协调不好,两个各sink各自的时间阈值也同步不了。

我准备按你的说的方式,用RichFlatMapFunction,里面实现实现一个buffer。
等buffer达阈值或定时时间条件满足时,一次性手动调用JdbcOutputFormat(可以设置更大的buffer值)的writeRecord和flush;不满足的时候,RichFlatMapFunction里不输出元素;
这样kafka的batch sinka节奏应该就不用管了,两者的batch条件相互独立。
我自己初步看了下,应该可以?
初学者,望大佬提点,还有其它的注意事项要注意不?






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi

Kafka中的JSON结构是个Array例子如下。
[
 { "id": 1},
 { "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。


可爱的木兰


flink1.9.1-消费kafka落pg库任务出错

2020-07-13 文章 nicygan
dear all:
  
我有一个消费kafka数据写到pg库的任务,任务发生过重启,yarn日志显示jobmanager发生oom,但找不到具体原因,因为数据量非常小,按道理不该发生oom。
  详细如下:


1、部署方式:
flink on yarn ,pre-job,每个container 1024 M
jobmanager的jvmoption(默认的)  -Xms424m-Xmx424m


2、数据情况:
kafka数据,约1分钟1条,文本数据,每条数据都非常小。


3、任务情况:
很简单,消费kafka然后直接写到pg库,中间没有任何处理,没有自定义的状态。
消费采用 FlinkKafkaConsumer
写库采用 JDBCAppendTableSink
并行度 1
checkpoint 2分钟一次,每次checkpoint约100ms
statebackend rocksdb


4、报错情况:
2020-07-10 11:51:54,237 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 555 @ 1594353114226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:51:54,421 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 555 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 77 ms).
2020-07-10 11:53:54,253 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 556 @ 1594353234226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:53:54,457 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 556 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 124 ms).
2020-07-10 11:55:54,246 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 557 @ 1594353354226 for job cd5ceeedeb35e8e094991edf09233483.
2020-07-10 11:55:54,402 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed 
checkpoint 557 for job cd5ceeedeb35e8e094991edf09233483 (1238 bytes in 115 ms).
2020-07-10 11:56:34,155 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-akka.actor.default-dispatcher-4673' produced an uncaught exception. 
Stopping the process...
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:717)
at 
akka.dispatch.forkjoin.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1672)
at 
akka.dispatch.forkjoin.ForkJoinPool.signalWork(ForkJoinPool.java:1966)
at 
akka.dispatch.forkjoin.ForkJoinPool.fullExternalPush(ForkJoinPool.java:1905)
at 
akka.dispatch.forkjoin.ForkJoinPool.externalPush(ForkJoinPool.java:1834)
at akka.dispatch.forkjoin.ForkJoinPool.execute(ForkJoinPool.java:2955)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool.execute(ForkJoinExecutorConfigurator.scala:30)
at 
akka.dispatch.ExecutorServiceDelegate.execute(ThreadPoolBuilder.scala:211)
at 
akka.dispatch.ExecutorServiceDelegate.execute$(ThreadPoolBuilder.scala:211)
at 
akka.dispatch.Dispatcher$LazyExecutorServiceDelegate.execute(Dispatcher.scala:39)
at akka.dispatch.Dispatcher.registerForExecution(Dispatcher.scala:115)
at akka.dispatch.Dispatcher.dispatch(Dispatcher.scala:55)
at akka.actor.dungeon.Dispatch.sendMessage(Dispatch.scala:142)
at akka.actor.dungeon.Dispatch.sendMessage$(Dispatch.scala:136)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
at akka.actor.Cell.sendMessage(ActorCell.scala:350)
at akka.actor.Cell.sendMessage$(ActorCell.scala:349)
at akka.actor.ActorCell.sendMessage(ActorCell.scala:429)
at akka.actor.RepointableActorRef.$bang(RepointableActorRef.scala:173)
at akka.actor.Scheduler$$anon$3.run(Scheduler.scala:171)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

thanks all / by nicygan


Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
这可能是 connect API  的某个 bug 吧。 建议先用 DDL 。

Best,
Jark

On Tue, 14 Jul 2020 at 08:54, Hito Zhu  wrote:

> rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。
> Rowtime rowtime = new Rowtime()
> .timestampsFromField("searchTime")
> .watermarksPeriodicBounded(5 * 1000);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
补充一下,kubernetes版本是1.18
Yvette zhai  于2020年7月13日周一 下午9:10写道:

> 1. 执行的脚本,产生的日志是:
> 2020-07-13 21:00:25,248 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.address, localhost
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-07-13 21:00:25,251 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: parallelism.default, 1
> 2020-07-13 21:00:25,252 INFO
>  org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-07-13 21:00:25,344 INFO
>  org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] -
> Could not load factory due to missing dependencies.
> 2020-07-13 21:00:26,136 INFO
>  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
> 2020-07-13 21:00:26,154 INFO
>  org.apache.flink.kubernetes.utils.KubernetesUtils[] -
> Kubernetes deployment requires a fixed port. Configuration blob.server.port
> will be set to 6124
> 2020-07-13 21:00:26,154 INFO
>  org.apache.flink.kubernetes.utils.KubernetesUtils[] -
> Kubernetes deployment requires a fixed port. Configuration
> taskmanager.rpc.port will be set to 6122
> 2020-07-13 21:00:26,204 INFO
>  org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
> 2020-07-13 21:00:26,220 WARN
>  org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
> [] - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop
> Configuration ConfigMap.
> 2020-07-13 21:00:26,220 WARN
>  org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
> [] - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop
> Configuration ConfigMap.
> 2020-07-13 21:00:26,958 INFO
>  org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster k8s-session-1 successfully, JobManager Web Interface:
> http://172.16.5.175:8081
>
> 2. 查看 desrcibe 日志是:
> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-k8s-session-1" not found
>
> 3. logs 日志是:
>
> Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath
> $FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824
> -XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> 1> /opt/flink/log/jobmanager.out 2> /opt/flink/log/jobmanager.err
>
> 4. kubectl get cm 可以看到
> NAME DATA   AGE
> flink-config-k8s-session-1   3  5m45s
>
> 麻烦大佬帮忙看看~是不是我的语句有问题还是缺什么文件~
> 我是直接官网下的包,没有改任何文件~
>
> Leonard Xu  于2020年7月13日周一 下午8:41写道:
>
>> Hi, zhai
>>
>> 可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao
>>
>> 祝好
>>
>> > 在 2020年7月13日,20:11,Yvette zhai  写道:
>> >
>> > 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
>> > "flink-config-k8s-session-1" not found
>> >
>> >
>> > Leonard Xu  于2020年7月13日周一 下午8:03写道:
>> >
>> >> Hi, zhai
>> >>
>> >> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
>> >>
>> >> Best,
>> >> Leonard Xu
>> >>
>> >>> 在 2020年7月13日,19:59,Yvette zhai  写道:
>> >>>
>> >>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
>> >>> 下载的flink-1.11.0-bin-scala_2.11.tgz
>> >>>
>> >>> 执行命令是
>> >>> ./bin/kubernetes-session.sh \
>> >>>-Dkubernetes.cluster-id=k8s-session-1 \
>> >>>-Dtaskmanager.memory.process.size=4096m \
>> >>>-Dkubernetes.taskmanager.cpu=2 \
>> >>>-Dtaskmanager.numberOfTaskSlots=4 \
>> >>>-Dresourcemanager.taskmanager-timeout=360 \
>> >>>-Dkubernetes.container.image=flink:1.11.0-scala_2.11
>> >>>
>> >>> 但是会报错,找不到configmap
>> >>>
>> >>>
>> >>> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
>> >>>
>> >>
>> >>
>>
>>


Re: flink-benchmarks使用求助

2020-07-13 文章 zilong xiao
`-t max`之后出现的~ 改小并发后貌似没问题

Congxian Qiu  于2020年7月13日周一 下午8:14写道:

> Hi
>
> 没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?
>
> Best,
> Congxian
>
>
> zilong xiao  于2020年7月13日周一 下午2:32写道:
>
> > 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> > timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
> >
> > Congxian Qiu  于2020年7月10日周五 下午7:18写道:
> >
> > > Hi
> > > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> > > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> > >
> > > [1] https://github.com/dataArtisans/flink-benchmarks
> > > [2] http://openjdk.java.net/projects/code-tools/jmh/
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > zilong xiao  于2020年7月10日周五 下午3:54写道:
> > >
> > > >
> 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > > >
> > >
> >
>


Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Leonard Xu
Hi,

> 在 2020年7月14日,09:52,Zhou Zach  写道:
> 
>>>   |   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
>>> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,

看下这个抽取出来的rowkey是否有重复的呢?

祝好,
Leonard Xu

Re:Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach






Hi, Leonard
我设置了 'connector.write.buffer-flush.interval' = ‘1s',然后重启运行程序,
再消息发送刚开始,比如说发送了4条,hive和hbase接收的消息都是4条,再消息发送48条的时候,我停止了producer,
再去查结果hbase是19条,hive是48条,如果说每1s钟flink查一下sink hbase 
buffer是不是到1mb,到了就sink,没到就不sink,但是这解释不了,为啥刚开始,hbase和hive接收到到数据是同步的,奇怪











在 2020-07-13 21:50:54,"Leonard Xu"  写道:
>Hi, Zhou
>
>
>>   'connector.write.buffer-flush.max-size' = '1mb',
>>   'connector.write.buffer-flush.interval' = ‘0s'
>
>(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 
>BufferredMutator 
>做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval
> 设置为 0s 
>时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval
> 设置成 1s 应该就能看到数据了。
>
>(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 
>所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 
>1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1]
>
>
>Best,
>Leonard Xu
>[1] 
>http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674
> 
>
>
>
>> 在 2020年7月13日,21:09,Zhou Zach  写道:
>> 
>> 
>> 
>> flink订阅kafka消息,同时sink到hbase和hive中,
>> 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
>> 
>> 
>> query:
>> streamTableEnv.executeSql(
>>  """
>>|
>>|CREATE TABLE hbase_table (
>>|rowkey VARCHAR,
>>|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>>|) WITH (
>>|'connector.type' = 'hbase',
>>|'connector.version' = '2.1.0',
>>|'connector.table-name' = 'ods:user_hbase6',
>>|'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>|'connector.zookeeper.znode.parent' = '/hbase',
>>|'connector.write.buffer-flush.max-size' = '1mb',
>>|'connector.write.buffer-flush.max-rows' = '1',
>>|'connector.write.buffer-flush.interval' = '0s'
>>|)
>>|""".stripMargin)
>> 
>>val statementSet = streamTableEnv.createStatementSet()
>>val insertHbase =
>>  """
>>|insert into hbase_table
>>|SELECT
>>|   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
>> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>>|   ROW(sex, age, created_time ) as cf
>>|FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
>> created_time from kafka_table)
>>|
>>|""".stripMargin
>> 
>>statementSet.addInsertSql(insertHbase)
>> 
>>val insertHive =
>>  """
>>|
>>|INSERT INTO odsCatalog.ods.hive_table
>>|SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), 
>> DATE_FORMAT(created_time, 'HH')
>>|FROM kafka_table
>>|
>>|""".stripMargin
>>statementSet.addInsertSql(insertHive)
>> 
>> 
>>statementSet.execute()
>> 
>> 
>> 是因为参数'connector.write.buffer-flush.max-size' = 
>> '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下:
>> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
>> bytes) value but was: 1kb
>> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
>> bytes) value but was: 10b
>> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
>> bytes) value but was: 1
>> 
>> 
>> 
>> 
>> 
>> 
>> 并且,按照官网文档
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
>> 
>> 
>> 设置参数也不识别,报错:
>> Caused by: org.apache.flink.table.api.ValidationException: Could not find 
>> any factory for identifier 'hbase-2.1.0' that implements 
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
>> 
>> 
>> 看了一下源码,
>> org.apache.flink.table.descriptors.HBaseValidator
>> public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
>>public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
>>public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
>>public static final String CONNECTOR_ZK_QUORUM = 
>> "connector.zookeeper.quorum";
>>public static final String CONNECTOR_ZK_NODE_PARENT = 
>> "connector.zookeeper.znode.parent";
>>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = 
>> "connector.write.buffer-flush.max-size";
>>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = 
>> "connector.write.buffer-flush.max-rows";
>>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = 
>> "connector.write.buffer-flush.interval";
>> 参数还是老参数
>


Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
rowtime 定义如下,我发现 SchemaValidator#deriveFieldMapping 方法给移除了。
Rowtime rowtime = new Rowtime()
.timestampsFromField("searchTime")
.watermarksPeriodicBounded(5 * 1000);



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink on yarn日志问题

2020-07-13 文章 王松
我们也有问题 1,和 Yangze Guo 说的一样,每次都要去对应的tm目录中去找日志,很麻烦,不知道有没有更简单的办法。

Yangze Guo  于2020年7月13日周一 下午5:03写道:

> 1.
> 我验证了一下,如果开启了日志收集,那tm的日志是会保存的,但是你整个application结束前可能看不到,有一个trick的方法,首先在jm日志中找到tm分配到了哪个NodeManager上,通过拼接url的方式来获取container的日志
> 2. 你是否需要调整一下重启策略[1]? 如果开启了ck,默认情况下就会一直尝试重启job
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/task_failure_recovery.html
>
> Best,
> Yangze Guo
>
>
> On Mon, Jul 13, 2020 at 2:40 PM 程龙 <13162790...@163.com> wrote:
> >
> > 不好意思  怪我灭有描述清楚
> > 1 目前开启日志收集功能
> > 2 目前已是 per-job模式
> > 3 集群使用cdh flink.1.10
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-07-13 11:18:46,"Yangze Guo"  写道:
> > >Hi,
> > >
> > >第一个问题,您可以尝试开启Yarn的日志收集功能[1]
> > >
> > >第二个问题,您可以尝试一下per-job mode [2][3]
> > >
> > >[1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
> > >[2]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
> > >[3]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#run-a-single-flink-job-on-yarn
> > >
> > >
> > >Best,
> > >Yangze Guo
> > >
> > >On Mon, Jul 13, 2020 at 10:49 AM 程龙 <13162790...@163.com> wrote:
> > >>
> > >> 请问一下两个问题
> > >> 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看
> ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。
> > >> 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在,
> 有没有好的方式或者策略 ,   可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
> > >>
>


?????? ??Flink Join??????????

2020-07-13 文章 ????????
Hi:


interval joinkey?
interval join??join??regular 
join??stream??key?
.




--  --
??: 
   "user-zh"

<17626017...@163.com>;
: 2020??7??6??(??) 11:12
??: "user-zh"

Re: flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Leonard Xu
Hi, Zhou


>   'connector.write.buffer-flush.max-size' = '1mb',
>   'connector.write.buffer-flush.interval' = ‘0s'

(1) connector.write.buffer-flush.max-size这个配置项支持的单位只有mb,其他不支持,所以会报对应的错。这个参数用于 
BufferredMutator 
做buffer优化的参数,表示buffer存多大的size就触发写,flush.interval参数是按照多长的时间轮询写入,两个参数根据需要配合使用。当connector.write.buffer-flush.interval
 设置为 0s 
时,表示不会轮询,所以只会等connector.write.buffer-flush.max-size到最大size再写入。你把connector.write.buffer-flush.interval
 设置成 1s 应该就能看到数据了。

(2) Hbase connector 1.11.0 之前的版本只支持1.4.3,所以你填2.1.0会报错,在1.11.0开始支持为1.4.x, 
所以1.11.0新的connector里支持的参数为’connector’ = ‘hbase-1.4’, 因为hbase 
1.4.x版本API是兼容的,另外社区也在讨论支持HBase 2.x[1]


Best,
Leonard Xu
[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Upgrade-HBase-connector-to-2-2-x-tc42657.html#a42674
 



> 在 2020年7月13日,21:09,Zhou Zach  写道:
> 
> 
> 
> flink订阅kafka消息,同时sink到hbase和hive中,
> 当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条
> 
> 
> query:
> streamTableEnv.executeSql(
>  """
>|
>|CREATE TABLE hbase_table (
>|rowkey VARCHAR,
>|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
>|) WITH (
>|'connector.type' = 'hbase',
>|'connector.version' = '2.1.0',
>|'connector.table-name' = 'ods:user_hbase6',
>|'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>|'connector.zookeeper.znode.parent' = '/hbase',
>|'connector.write.buffer-flush.max-size' = '1mb',
>|'connector.write.buffer-flush.max-rows' = '1',
>|'connector.write.buffer-flush.interval' = '0s'
>|)
>|""".stripMargin)
> 
>val statementSet = streamTableEnv.createStatementSet()
>val insertHbase =
>  """
>|insert into hbase_table
>|SELECT
>|   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
> cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
>|   ROW(sex, age, created_time ) as cf
>|FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
> created_time from kafka_table)
>|
>|""".stripMargin
> 
>statementSet.addInsertSql(insertHbase)
> 
>val insertHive =
>  """
>|
>|INSERT INTO odsCatalog.ods.hive_table
>|SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), 
> DATE_FORMAT(created_time, 'HH')
>|FROM kafka_table
>|
>|""".stripMargin
>statementSet.addInsertSql(insertHive)
> 
> 
>statementSet.execute()
> 
> 
> 是因为参数'connector.write.buffer-flush.max-size' = 
> '1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下:
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
> bytes) value but was: 1kb
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
> bytes) value but was: 10b
> Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
> bytes) value but was: 1
> 
> 
> 
> 
> 
> 
> 并且,按照官网文档
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html
> 
> 
> 设置参数也不识别,报错:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
> factory for identifier 'hbase-2.1.0' that implements 
> 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.
> 
> 
> 看了一下源码,
> org.apache.flink.table.descriptors.HBaseValidator
> public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
>public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
>public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
>public static final String CONNECTOR_ZK_QUORUM = 
> "connector.zookeeper.quorum";
>public static final String CONNECTOR_ZK_NODE_PARENT = 
> "connector.zookeeper.znode.parent";
>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = 
> "connector.write.buffer-flush.max-size";
>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = 
> "connector.write.buffer-flush.max-rows";
>public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = 
> "connector.write.buffer-flush.interval";
> 参数还是老参数



flink state

2020-07-13 文章 Robert.Zhang
Hello,all
目前stream中遇到一个问题,
想使用一个全局的state 在所有的keyed stream中使用,或者global 
parameter,主要的需求在于是这个state是可变的,需要对其进行修改并且对所有stream 
operator可见,大家有遇到过类似场景或者可以提供相关思路么,感激不尽


Best regards

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 godfrey he
1.11 对 StreamTableEnvironment.execute()
和 StreamExecutionEnvironment.execute() 的执行方式有所调整,
简单概述为:
1. StreamTableEnvironment.execute() 只能执行 sqlUpdate 和 insertInto 方法执行作业;
2. Table 转化为 DataStream 后只能通过 StreamExecutionEnvironment.execute() 来执行作业;
3. 新引入的 TableEnvironment.executeSql() 和 StatementSet.execute() 方法是直接执行sql作业
(异步提交作业),不需要再调用 StreamTableEnvironment.execute()
或 StreamExecutionEnvironment.execute()

TableEnvironment.executeSql() 和 StatementSet.execute()
提交的作业都是异步的,如果是在本地测试的话,不会等有最终结果才会推出。针对这个问题,1.12里准备引入 await 方法
[3],代码还在review中。

TableResult是用来描述一个statement执行的结果。对于SELECT和INSERT,TableResult中还包含了JobClient
[4]
用来操作对应的job,例如获取job状态,cancel作业,等待作业结束等。TableResult还可以collect方法拿到statement执行的schema和结果数据,例如
select/show的结果。


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E7%BF%BB%E8%AF%91%E4%B8%8E%E6%89%A7%E8%A1%8C%E6%9F%A5%E8%AF%A2
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/common.html#%E5%B0%86%E8%A1%A8%E8%BD%AC%E6%8D%A2%E6%88%90-datastream-%E6%88%96-dataset
[3] https://issues.apache.org/jira/browse/FLINK-18337
[4]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-74%3A+Flink+JobClient+API

Best,
Godfrey


小学生 <201782...@qq.com> 于2020年7月13日周一 下午9:12写道:

> 嗯嗯,尝试了,这下没问题了,想问下这个TableResult对象,设计的目的是啥呢,不是特别懂呢,谢谢!
>


回复: 使用Flink Array Field Type

2020-07-13 文章 叶贤勋
谢谢 Leonard的解答。刚刚也看到了这个jira单[1]


[1] https://issues.apache.org/jira/browse/FLINK-17847
| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制


在2020年07月13日 20:50,Leonard Xu 写道:
Hi,

SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where 
CARDINALITY(arr) >= 5 这种方式防止数组访问越界。


祝好,
Leonard Xu

在 2020年7月13日,20:34,叶贤勋  写道:

test_array_string[0]



Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
??TableResult

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
你的源码中 new
Schema().field("searchTime",DataTypes.TIMESTAMP()).rowtime(rowtime);
里面的 rowtime 的定义能贴下吗?

On Mon, 13 Jul 2020 at 20:53, Hito Zhu  wrote:

> Hi Jark 异常信息如下:
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field null does not exist
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85)
> at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58)
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at
>
> java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
> at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
> at
>
> java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at
> java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73)
> at
>
> org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65)
> at
>
> org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118)
> at scala.Option.map(Option.scala:146)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
> at
>
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
>
> 

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
1. 执行的脚本,产生的日志是:
2020-07-13 21:00:25,248 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.address, localhost
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.rpc.port, 6123
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 1
2020-07-13 21:00:25,251 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: parallelism.default, 1
2020-07-13 21:00:25,252 INFO
 org.apache.flink.configuration.GlobalConfiguration   [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-07-13 21:00:25,344 INFO
 org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] -
Could not load factory due to missing dependencies.
2020-07-13 21:00:26,136 INFO
 org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
less than its min value 192.000mb (201326592 bytes), min value will be used
instead
2020-07-13 21:00:26,154 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils[] -
Kubernetes deployment requires a fixed port. Configuration blob.server.port
will be set to 6124
2020-07-13 21:00:26,154 INFO
 org.apache.flink.kubernetes.utils.KubernetesUtils[] -
Kubernetes deployment requires a fixed port. Configuration
taskmanager.rpc.port will be set to 6122
2020-07-13 21:00:26,204 INFO
 org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
less than its min value 192.000mb (201326592 bytes), min value will be used
instead
2020-07-13 21:00:26,220 WARN
 org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
[] - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop
Configuration ConfigMap.
2020-07-13 21:00:26,220 WARN
 org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator
[] - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop
Configuration ConfigMap.
2020-07-13 21:00:26,958 INFO
 org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
flink session cluster k8s-session-1 successfully, JobManager Web Interface:
http://172.16.5.175:8081

2. 查看 desrcibe 日志是:
MountVolume.SetUp failed for volume "flink-config-volume" : configmap
"flink-config-k8s-session-1" not found

3. logs 日志是:

Start command : /bin/bash -c $JAVA_HOME/bin/java -classpath
$FLINK_CLASSPATH -Xmx1073741824 -Xms1073741824
-XX:MaxMetaspaceSize=268435456 -Dlog.file=/opt/flink/log/jobmanager.log
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j.properties
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
1> /opt/flink/log/jobmanager.out 2> /opt/flink/log/jobmanager.err

4. kubectl get cm 可以看到
NAME DATA   AGE
flink-config-k8s-session-1   3  5m45s

麻烦大佬帮忙看看~是不是我的语句有问题还是缺什么文件~
我是直接官网下的包,没有改任何文件~

Leonard Xu  于2020年7月13日周一 下午8:41写道:

> Hi, zhai
>
> 可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao
>
> 祝好
>
> > 在 2020年7月13日,20:11,Yvette zhai  写道:
> >
> > 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> > "flink-config-k8s-session-1" not found
> >
> >
> > Leonard Xu  于2020年7月13日周一 下午8:03写道:
> >
> >> Hi, zhai
> >>
> >> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
> >>
> >> Best,
> >> Leonard Xu
> >>
> >>> 在 2020年7月13日,19:59,Yvette zhai  写道:
> >>>
> >>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> >>> 下载的flink-1.11.0-bin-scala_2.11.tgz
> >>>
> >>> 执行命令是
> >>> ./bin/kubernetes-session.sh \
> >>>-Dkubernetes.cluster-id=k8s-session-1 \
> >>>-Dtaskmanager.memory.process.size=4096m \
> >>>-Dkubernetes.taskmanager.cpu=2 \
> >>>-Dtaskmanager.numberOfTaskSlots=4 \
> >>>-Dresourcemanager.taskmanager-timeout=360 \
> >>>-Dkubernetes.container.image=flink:1.11.0-scala_2.11
> >>>
> >>> 但是会报错,找不到configmap
> >>>
> >>>
> >>> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
> >>>
> >>
> >>
>
>


flink 同时sink hbase和hive,hbase少记录

2020-07-13 文章 Zhou Zach


flink订阅kafka消息,同时sink到hbase和hive中,
当向kafka发送42条记录,然后停止producer发消息,去hive中查可以精准地查到42条,但是在hbase中却只查到30条


query:
streamTableEnv.executeSql(
  """
|
|CREATE TABLE hbase_table (
|rowkey VARCHAR,
|cf ROW(sex VARCHAR, age INT, created_time VARCHAR)
|) WITH (
|'connector.type' = 'hbase',
|'connector.version' = '2.1.0',
|'connector.table-name' = 'ods:user_hbase6',
|'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
|'connector.zookeeper.znode.parent' = '/hbase',
|'connector.write.buffer-flush.max-size' = '1mb',
|'connector.write.buffer-flush.max-rows' = '1',
|'connector.write.buffer-flush.interval' = '0s'
|)
|""".stripMargin)

val statementSet = streamTableEnv.createStatementSet()
val insertHbase =
  """
|insert into hbase_table
|SELECT
|   CONCAT(SUBSTRING(MD5(CAST(uid AS VARCHAR)), 0, 6), 
cast(CEILING(UNIX_TIMESTAMP(created_time)/60) as string), sex) as uid,
|   ROW(sex, age, created_time ) as cf
|FROM  (select uid,sex,age, cast(created_time as VARCHAR) as 
created_time from kafka_table)
|
|""".stripMargin

statementSet.addInsertSql(insertHbase)

val insertHive =
  """
|
|INSERT INTO odsCatalog.ods.hive_table
|SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'), 
DATE_FORMAT(created_time, 'HH')
|FROM kafka_table
|
|""".stripMargin
statementSet.addInsertSql(insertHive)


statementSet.execute()


是因为参数'connector.write.buffer-flush.max-size' = 
'1mb'吗?我尝试设置‘0’,‘10b','1kb',都失败了,报错如下:
Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
bytes) value but was: 1kb
Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
bytes) value but was: 10b
Property 'connector.write.buffer-flush.max-size' must be a memory size (in 
bytes) value but was: 1






并且,按照官网文档
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html


设置参数也不识别,报错:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'hbase-2.1.0' that implements 
'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.


看了一下源码,
org.apache.flink.table.descriptors.HBaseValidator
public static final String CONNECTOR_TYPE_VALUE_HBASE = "hbase";
public static final String CONNECTOR_VERSION_VALUE_143 = "2.1.0";
public static final String CONNECTOR_TABLE_NAME = "connector.table-name";
public static final String CONNECTOR_ZK_QUORUM = 
"connector.zookeeper.quorum";
public static final String CONNECTOR_ZK_NODE_PARENT = 
"connector.zookeeper.znode.parent";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_SIZE = 
"connector.write.buffer-flush.max-size";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_MAX_ROWS = 
"connector.write.buffer-flush.max-rows";
public static final String CONNECTOR_WRITE_BUFFER_FLUSH_INTERVAL = 
"connector.write.buffer-flush.interval";
参数还是老参数

Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 Leonard Xu
Hi,知道了
source.execute_insert("g_source_tab”) 
返回的结果是一个TableResult对象,如果不显示地等待任务的执行,这个任务会直接返回,你试下这个

result.execute_insert("g_source_tab") \
.get_job_client() \
.get_job_execution_result() \
.result()

这是Flip-84引入的一个改动,为了更好地处理table程序的返回值。

祝好,
Leonard Xu

> 在 2020年7月13日,20:57,小学生 <201782...@qq.com> 写道:
> 
> 不像吧,这个是1.10版的,我执行这个程序很快就结束了,不会挂着。



Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
??1.10

Re: flink sql报错 Could not find any factory for identifier 'kafka'

2020-07-13 文章 王松
感谢大家的热情解答,最后问题解决了。原因正是 Leonard Xu所说的,我应该引入的是
flink-sql-connector-kafka-${version}_${scala.binary.version},然后当时改成
flink-sql-connector-kafka
后继续报错的原因是:我还在pom文件中引入了flink-table-planner-blink,如下:

org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}

添加provided后就没有问题了。

最后附上正确的pom文件 (如 Jingsong
所说,也可以把flink-sql-connector-kafka、flink-json这些都在pom文件中去掉,直接将jar报放入lib中):




org.apache.flink

flink-table-planner-blink_${scala.binary.version}
${flink.version}



org.apache.flink
flink-json
${flink.version}


org.apache.flink

flink-sql-connector-kafka_${scala.binary.version}
${flink.version}


org.apache.flink
flink-core
${flink.version}


org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}




Jingsong Li  于2020年7月13日周一 下午4:35写道:

> Hi,
>
> 1.推荐方式:把flink-sql-connector-kafka-0.11_2.11-1.11.0.jar放入lib下。下载链接:[1]
>
> 2.次推荐方式:你的java工程打包时,需要用shade插件把kafka相关类shade到最终的jar中。(不能用jar-with-deps,因为它会覆盖掉java
> spi)
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
>
> Best,
> Jingsong
>
> On Mon, Jul 13, 2020 at 4:04 PM 王松  wrote:
>
> > 你好本超,
> > 是的,我尝试解压打包好的jar包,里边是包含我pom中写的依赖的
> >
> > Benchao Li  于2020年7月13日周一 下午3:42写道:
> >
> > > 你的程序打包的时候是不是把依赖都shade进去了呢?像这种connector,一般最好是在用户程序中打进去;
> > > 或者你不打进去的话,也可以在提交作业的时候把这些connector放到classpath里面。
> > > 当然,直接粗暴的放到lib下,也是可以的。
> > >
> > > Leonard Xu  于2020年7月13日周一 下午3:38写道:
> > >
> > > > Hi
> > > > 你可以试下把 flink-connector-kafka_2.11-1.11.0.jar
> > > > 的依赖也放lib下试下(pom中删掉),排除是否因为提交作业的方式导致没有正确加载 还是 其他原因。
> > > >
> > > > 祝好
> > > >
> > > > > 在 2020年7月13日,15:28,王松  写道:
> > > > >
> > > > > 您好,我只加载了flink-sql-connector-kafka,另外 scope没有设置,使用了默认值compile。
> > > > >
> > > > > 我机器上flink/lib下jar包如下:
> > > > > -rw-rw-r-- 1 hadoop hadoop117719 6月  30 12:41
> > flink-avro-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 90782 7月   8 10:09
> > flink-csv-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 108349203 7月   8 10:09
> > > > flink-dist_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 94863 7月   8 10:09
> > flink-json-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   7712156 7月   8 10:09
> > > > > flink-shaded-zookeeper-3.4.14.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  33325754 7月   8 10:09
> > > > > flink-table_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop  37330521 7月   8 10:09
> > > > > flink-table-blink_2.11-1.11.0.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 67114 7月   8 10:09
> > > > log4j-1.2-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop276771 7月   8 10:09
> > log4j-api-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop   1674433 7月   8 10:09
> > log4j-core-2.12.1.jar
> > > > > -rw-r--r-- 1 hadoop hadoop 23518 7月   8 10:09
> > > > > log4j-slf4j-impl-2.12.1.jar
> > > > >
> > > > > Leonard Xu  于2020年7月13日周一 下午3:05写道:
> > > > >
> > > > >> Hi,
> > > > >> flink-connector-kafka_${scala.binary.version 和
> > > > >> flink-sql-connector-kafka_${scala.binary.version
> > > > >> 只用加载一个应该就好了,前者的话是dataStream 或者 Table API 程序使用,
> > > > >> 后者的话主要是对前者做了shade处理,方便用户在 SQL
> > > > >> Client的环境中使用。理论上两个都应该ok的,还是同样的错误看起来是依赖没有正确的加载,不知道你的依赖的scope是如何制定的,
> > > > >> 可以检查下yarn集群上Flink对应的lib下是否有对应的依赖了或者依赖的版本是否正确。
> > > > >>
> > > > >> [1] 中的话是有SQL Client JAR 的下载链接,就是
> > > > >> flink-sql-connector-kafka_${scala.binary.version jar 包的下载链接,你看一看下。
> > > > >>
> > > > >> 祝好
> > > > >> Leonard Xu
> > > > >>
> > > > >>> 在 2020年7月13日,14:42,王松  写道:
> > > > >>>
> > > > >>> @Leonard Xu,
> > > > >>> 非常感谢您的回复,我试了试您说的方式,还是报同样的错误,另外,我在 [1]
> > > > >>> 中并没有看到关于flink-sql-connecter-kafka相关的信息重新的pom如下:
> > > > >>>
> > > > >>> [1]
> > > > >>>
> > > > >>
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> > > > >>> =
> > > > >>> 
> > > > >>>   org.apache.flink
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> flink-sql-connector-kafka_${scala.binary.version}
> > > > >>>   ${flink.version}
> > > > >>>   
> > > > >>>
> > > > >>>
> > > > >>>   
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
> 
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>
> > > > >> 
> > > > >>>   
> > > > >>>   
> > > > >>>   
> > > > >>>
> > > > >>>   
> > > > >>>   
> > > > >>>
> >  
> > > > >>>   
> > > > >>>   
> > > > >>> =
> > > > >>>
> > > > >>> Leonard Xu  于2020年7月13日周一 下午1:39写道:
> > > > >>>
> > > >  Hi, 王松
> > > > 
> > > >  这个报错是pom中缺少了 Kafka SQL connector的依赖,

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Hito Zhu
Hi Jark 异常信息如下:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field null does not exist
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$mapToResolvedField$4(TimestampExtractorUtils.java:85)
at java.util.OptionalInt.orElseThrow(OptionalInt.java:189)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.mapToResolvedField(TimestampExtractorUtils.java:85)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.lambda$getAccessedFields$0(TimestampExtractorUtils.java:58)
at
java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at
java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at 
java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:73)
at
org.apache.flink.table.sources.tsextractors.TimestampExtractorUtils.getAccessedFields(TimestampExtractorUtils.java:65)
at
org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:244)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:119)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan$$anonfun$1.apply(StreamExecLegacyTableSourceScan.scala:118)
at scala.Option.map(Option.scala:146)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:118)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlanInternal(StreamExecLegacyTableSourceScan.scala:63)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacyTableSourceScan.translateToPlan(StreamExecLegacyTableSourceScan.scala:63)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:127)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:321)
at
org.apache.flink.table.

回复: flink 1.11运算结果存mysql出错

2020-07-13 文章 Zhonghan Tang
有新数据进来吗,看起来和这个jira很像
https://issues.apache.org/jira/browse/FLINK-15262




在2020年07月13日 20:38,Leonard Xu 写道:
Hi,

简单看了下代码应该没啥问题,alarm_test_g 这个kafka 
topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点

Best,
Leonard Xu

在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道:

各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python 
*.py执行的。完整代码如下


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,   
 trck_id VARCHAR


) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',   
 'scan.startup.mode' = 'earliest-offset',
 'properties.bootstrap.servers' = '10.2.2.73:2181',
 'properties.bootstrap.servers' = '10.2.2.73:9092',
 'format' = 'json' 
)
"""

sink="""
CREATE TABLE g_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,     
 trck_id VARCHAR


) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
        .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")


Re: 使用Flink Array Field Type

2020-07-13 文章 Leonard Xu
Hi, 

SQL 中数据下标是从1开始的,不是从0,所以会有数组越界问题。建议使用数组时通过 select arr[5] from T where 
CARDINALITY(arr) >= 5 这种方式防止数组访问越界。


祝好,
Leonard Xu

> 在 2020年7月13日,20:34,叶贤勋  写道:
> 
> test_array_string[0]



Re: flink 1.11??????????mysql????

2020-07-13 文章 ??????
topic??,??flink1.10??insert_into

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Leonard Xu
Hi, zhai

可以贴详细点吗?我帮你 CC 了熟悉这块的大佬 Yun Gao

祝好

> 在 2020年7月13日,20:11,Yvette zhai  写道:
> 
> 报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> "flink-config-k8s-session-1" not found
> 
> 
> Leonard Xu  于2020年7月13日周一 下午8:03写道:
> 
>> Hi, zhai
>> 
>> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
>> 
>> Best,
>> Leonard Xu
>> 
>>> 在 2020年7月13日,19:59,Yvette zhai  写道:
>>> 
>>> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
>>> 下载的flink-1.11.0-bin-scala_2.11.tgz
>>> 
>>> 执行命令是
>>> ./bin/kubernetes-session.sh \
>>>-Dkubernetes.cluster-id=k8s-session-1 \
>>>-Dtaskmanager.memory.process.size=4096m \
>>>-Dkubernetes.taskmanager.cpu=2 \
>>>-Dtaskmanager.numberOfTaskSlots=4 \
>>>-Dresourcemanager.taskmanager-timeout=360 \
>>>-Dkubernetes.container.image=flink:1.11.0-scala_2.11
>>> 
>>> 但是会报错,找不到configmap
>>> 
>>> 
>>> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
>>> 
>> 
>> 



Re: flink 1.11运算结果存mysql出错

2020-07-13 文章 Leonard Xu
Hi,

简单看了下代码应该没啥问题,alarm_test_g 这个kafka 
topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点

Best,
Leonard Xu

> 在 2020年7月13日,20:06,小学生 <201782...@qq.com> 写道:
> 
> 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python
>  *.py执行的。完整代码如下
> 
> 
> from pyflink.datastream import StreamExecutionEnvironment, 
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> source="""
> CREATE TABLE kafka_source_tab (
>  id VARCHAR,   
>  alarm_id VARCHAR,   
>  trck_id VARCHAR
> 
> 
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'alarm_test_g',   
>  'scan.startup.mode' = 'earliest-offset', 
>  'properties.bootstrap.servers' = '10.2.2.73:2181',
>  'properties.bootstrap.servers' = '10.2.2.73:9092',
>  'format' = 'json'  
> )
> """
> 
> sink="""
> CREATE TABLE g_source_tab (
>  id VARCHAR,    
>  alarm_id VARCHAR,     
>  trck_id VARCHAR
> 
> 
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',  
>  'table-name' = 'g',   
>  'username' = 'root',
>  'password' = '123456t',
>  'sink.buffer-flush.interval' = '1s'
> )
> """
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings = 
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
> 
> 
> 
> t_env.execute_sql(source)
> t_env.execute_sql(sink)
> 
> 
> source = t_env.from_path("kafka_source_tab")\
>         .select("id,alarm_id,trck_id")
> source.execute_insert("g_source_tab")



使用Flink Array Field Type

2020-07-13 文章 叶贤勋
Flink 1.10.0
问题描述:source表中有个test_array_string 
ARRAY字段,在DML语句用test_array_string[0]获取数组中的值会报数组越界异常。另外测试过Array也是相同错误,Array,Array等类型也会报数组越界问题。
请问这是Flink1.10的bug吗?


SQL:
CREATETABLE source (
……
test_array_string ARRAY
) WITH (
'connector.type'='kafka',
'update-mode'='append',
'format.type'='json'
  ……
);


CREATETABLE sink(
v_string string
) WITH (
  ……
);


INSERTINTO
sink
SELECT
test_array_string[0] as v_string
from
source;


kafka样例数据:{"id":1,"test_array_string":["ff”]}


Flink 执行的时候报以下错误:
java.lang.ArrayIndexOutOfBoundsException: 33554432
at 
org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598)
at 
org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590)
at 
org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534)
at 
org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117)
at StreamExecCalc$9.processElement(UnknownSource)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SourceConversion$1.processElement(UnknownSource)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:408)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)


| |
叶贤勋
|
|
yxx_c...@163.com
|
签名由网易邮箱大师定制



Re:Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
好的,感谢答疑

















在 2020-07-13 19:49:10,"Jingsong Li"  写道:
>创建kafka_table需要在default dialect下。
>
>不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach  wrote:
>
>> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
>> 如果是default Dialect创建的表,是不是只是在临时会话有效
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-13 19:27:44,"Jingsong Li"  写道:
>> >Hi,
>> >
>> >问题一:
>> >
>> >只要current catalog是HiveCatalog。
>> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
>> >
>> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
>> >
>> >问题二:
>> >
>> >用filesystem创建出来的是filesystem的表,它和hive
>> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
>> >
>> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
>> >但是它的partition commit是不支持metastore的,所以不会有自动add
>> >partition到hive的默认实现,你需要自定义partition-commit-policy.
>> >
>> >[1]
>> >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
>> >
>> >> 尴尬
>> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
>> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
>> >> 还有两个问题问下,
>> >> 问题1:
>> >> 创建的kafka_table,在hive和Flink
>> >>
>> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 问题2:
>> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
>> >> java.util.concurrent.CompletionException:
>> >>
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> >> Could not execute application.
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> >> ~[?:1.8.0_161]
>> >> at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >> at
>> >>
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> >> at
>> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >> [?:1.8.0_161]
>> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> >> [?:1.8.0_161]
>> >> at
>> >>
>> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >>
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >> [qile-data-flow-1.0.jar:?]
>> >> at
>> >>
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >> [qile-data-flow-1.0.jar:?]
>> >> Caused by:
>> >>
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> >> Could not execute application.
>> >> ... 11 more
>> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> The
>> >> main method caused an error: Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table1'.
>> >>
>> >> Table options are:
>> >>
>> >> 'connector'='filesystem'
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='0s'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> at
>> >>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:30

Re: flink-benchmarks使用求助

2020-07-13 文章 Congxian Qiu
Hi

没有遇到过这个错误,这个错误是在指定 `-t max` 之后出现的,还是说其他情况下也会遇到呢?

Best,
Congxian


zilong xiao  于2020年7月13日周一 下午2:32写道:

> 是的,用的 flink-benchmarks 代码,在跑的时候,指定参数-t max(最大工程线程),在运行中会出现异常: `shutdown
> timeout of 30 seconds expired, forcing forked VM to exit`,前辈有遇到过这种情况吗?
>
> Congxian Qiu  于2020年7月10日周五 下午7:18写道:
>
> > Hi
> > 你说的 flink-benchmarks 是指 这个仓库[1]的代码吗? 是这个仓库的代码的话,你按照 readme 能跑出一个结果(csv
> > 文件,或者终端能看到最终的结果),这个结果就是 JMH 的的结果,具体的可以阅读 JMH 的相关文档[2]
> >
> > [1] https://github.com/dataArtisans/flink-benchmarks
> > [2] http://openjdk.java.net/projects/code-tools/jmh/
> >
> > Best,
> > Congxian
> >
> >
> > zilong xiao  于2020年7月10日周五 下午3:54写道:
> >
> > > 如题,最近在新机器上跑flink-benchmarks验证下机器性能,但是不太会对跑出的结果进行分析,不知是否有大神也用过这个,可否指点一二
> > >
> >
>


Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
报错是MountVolume.SetUp failed for volume "flink-config-volume" : configmap
"flink-config-k8s-session-1" not found


Leonard Xu  于2020年7月13日周一 下午8:03写道:

> Hi, zhai
>
> 图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月13日,19:59,Yvette zhai  写道:
> >
> > 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> > 下载的flink-1.11.0-bin-scala_2.11.tgz
> >
> > 执行命令是
> > ./bin/kubernetes-session.sh \
> > -Dkubernetes.cluster-id=k8s-session-1 \
> > -Dtaskmanager.memory.process.size=4096m \
> > -Dkubernetes.taskmanager.cpu=2 \
> > -Dtaskmanager.numberOfTaskSlots=4 \
> > -Dresourcemanager.taskmanager-timeout=360 \
> > -Dkubernetes.container.image=flink:1.11.0-scala_2.11
> >
> > 但是会报错,找不到configmap
> >
> >
> > 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
> >
>
>


Re: Re: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常

2020-07-13 文章 Congxian Qiu
Hi  程龙

如果可以的话,也麻烦使用 1.11.0 测试下看问题是否还存在。

Best,
Congxian


程龙 <13162790...@163.com> 于2020年7月13日周一 上午10:45写道:

>
>
>
>
>
>
> 问题不是很常见 ,但是同一个任务,提交在flink1.10 和 flink1.10.1上都会复现, 准备尝试一下升级一下jdk试试
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-06 16:11:17,"Congxian Qiu"  写道:
> >@chenkaibit 多谢你的回复~
> >
> >Best,
> >Congxian
> >
> >
> >chenkaibit  于2020年7月6日周一 下午3:53写道:
> >
> >> hi,Congxian。我在发现这个问题时也很奇怪,但是在打印了一些日志后,确实验证了我的想法。因为 <低版本jdk+flink1.9> 和
> >> <高版本jdk+1.10> 都不会抛 NPE(见 FLINK-17479),我猜测和 lambda 表达式中外部变量的垃圾回收机制以及 1.10
> >> 引入的 MailBox 模型有关,外部 checkpointMetaData 实例被意外回收了。所以在修复的 patch 中我在 lambda
> >> 表达式内部实例化了一个新的 checkpointMetaData,目前看这个方法是有效的,没有再发现过
> >> NPE。这是个临时的修复方法,根本原因可能还需要进一步分析。
> >>
> >>
> >> --
> >> Best, yuchuan
> >>
> >>
> >>
> >> 在 2020-07-06 14:04:58,"Congxian Qiu"  写道:
> >> >@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
> >> >CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
> >> >
> >> >Best,
> >> >Congxian
> >> >
> >> >
> >> >陈凯  于2020年7月6日周一 上午9:53写道:
> >> >
> >> >>
> >> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> >> >> 我之前提了个jira 描述了这个问题
> >> >> https://issues.apache.org/jira/browse/FLINK-18196
> >> >>
> >> >> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk
> 版本,可以参考下面的patch:
> >> >>
> >> >>
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
> >> >>
> >> >>
> >> >>
> >> >> -邮件原件-
> >> >> 发件人: zhisheng 
> >> >> 发送时间: 2020年7月5日 15:01
> >> >> 收件人: user-zh 
> >> >> 主题: Re: 回复:flink1.10 checkpoint 运行一段时间空指针异常
> >> >>
> >> >> 生产集群 JDK 使用的地方比较多,不敢轻易换版本,后面再观察一下,如果频繁出现这种问题再考虑更换版本,感谢 Congxian
> >> >>
> >> >> Best!
> >> >> zhisheng
> >> >>
> >> >> Congxian Qiu  于2020年7月4日周六 下午3:21写道:
> >> >>
> >> >> > @zhisheng 你们有尝试过更换 jdk 版本吗?更换版本是否能解决这个问题呢?
> >> >> >
> >> >> > Best,
> >> >> > Congxian
> >> >> >
> >> >> >
> >> >> > zhisheng  于2020年7月4日周六 下午12:27写道:
> >> >> >
> >> >> > > 我们也有遇到过这个异常,但是不是很常见
> >> >> > >
> >> >> > > Congxian Qiu  于2020年7月3日周五 下午2:08写道:
> >> >> > >
> >> >> > > > 你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
> >> >> > > > [1]  https://issues.apache.org/jira/browse/FLINK-17479
> >> >> > > > Best,
> >> >> > > > Congxian
> >> >> > > >
> >> >> > > >
> >> >> > > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> >> >> > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > >
> >> >> > > > > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> >> >> > > > > >你到具体的tm上找到相关的operator看看是不是有异常信息
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >| |
> >> >> > > > > >JasonLee
> >> >> > > > > >|
> >> >> > > > > >|
> >> >> > > > > >邮箱:17610775...@163.com
> >> >> > > > > >|
> >> >> > > > > >
> >> >> > > > > >Signature is customized by Netease Mail Master
> >> >> > > > > >
> >> >> > > > > >在2020年07月01日 20:43,程龙 写道:
> >> >> > > > > >flink1.10上 程序运行几个小时后就会报不能执行checkpoint 空指针异常 具体如下:
> >> >> > > > > >
> >> >> > > > > >
> >> >> > > > > >java.lang.Exception: Could not perform checkpoint 3201 for
> >> >> operator
> >> >> > > > > Filter -> Map (2/8).
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:816)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> .CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> .CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> .CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> .StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133)
> >> >> > > > > >   at org.apache.flink.streaming.runtime.io
> >> >> > > > >
> >> >> >
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> >> >> > > > > >   at
> >> >> > > > >
> >> >> > > >
> >> >> > >
> >> >> >
> >> >>
> org.apache.f

flink 1.11??????????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysqllinuxpython
 *.py


from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,   
 trck_id VARCHAR


) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',   
 'scan.startup.mode' = 'earliest-offset', 
 'properties.bootstrap.servers' = '10.2.2.73:2181',
 'properties.bootstrap.servers' = '10.2.2.73:9092',
 'format' = 'json'  
)
"""

sink="""
CREATE TABLE g_source_tab (
 id VARCHAR,    
 alarm_id VARCHAR,     
 trck_id VARCHAR


) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',  
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
        .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

Re: flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Leonard Xu
Hi, zhai

图挂了。。可以整个图床工具贴出来,如果是异常直接贴文本也可以的。

Best,
Leonard Xu

> 在 2020年7月13日,19:59,Yvette zhai  写道:
> 
> 大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
> 下载的flink-1.11.0-bin-scala_2.11.tgz
> 
> 执行命令是
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=k8s-session-1 \
> -Dtaskmanager.memory.process.size=4096m \
> -Dkubernetes.taskmanager.cpu=2 \
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dresourcemanager.taskmanager-timeout=360 \
> -Dkubernetes.container.image=flink:1.11.0-scala_2.11
> 
> 但是会报错,找不到configmap
> 
> 
> 我看是执行上述命令是会生成configmap的,为什么还会报找不到。
> 



Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Congxian Qiu
Hi Zhefu

感谢你在邮件列表分享你的解决方法,这样其他人遇到类似问题也有一个参考。

Best,
Congxian


Zhefu PENG  于2020年7月13日周一 下午7:51写道:

> Hi all,
>
> 这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。
>
>
> 问题的根本原因:Kafka集群的性能不足(怀疑是CPU负荷过大)。问题出现的时候线上kakfa集群只有七台机器,在排除所有别的原因以及能进行到的尝试方案后,决定进行扩容。扩到15台机器。目前来看,平稳运行,没有再报出类似错误。
>
> 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。
>
> Best,
> Zhefu
>
> LakeShen  于2020年6月12日周五 上午9:49写道:
>
> > Hi ZheFu,
> >
> > 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> > 的数据是否都已经 Sink 到了 kafka.
> >
> > 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
> > 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
> >
> > 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
> >
> > Best,
> > LakeShen
> >
> > Congxian Qiu  于2020年6月11日周四 上午9:50写道:
> >
> > > Hi
> > >
> > > 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> > > java.lang.IllegalStateException: Pending record count must be zero at
> > this
> > > point: 5”,需要看一下为什么会走到这里
> > >
> > > Best,
> > > Congxian
> > >
> > >
> > > 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
> > >
> > > >
> > > >
> > >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> > > >
> > > > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > > > >
> > > > > 补充一下,在TaskManager发现了如下错误日志:
> > > > >
> > > > > 2020-06-10 12:44:40,688 ERROR
> > > > > org.apache.flink.streaming.runtime.tasks.StreamTask   -
> Error
> > > > > during disposal of stream operator.
> > > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> > Failed
> > > > to
> > > > > send data to Kafka: Pending record count must be zero at this
> point:
> > 5
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > > > at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: java.lang.IllegalStateException: Pending record count
> must
> > > be
> > > > > zero at this point: 5
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > > > ... 8 more
> > > > >
> > > > > 希望得到帮助,感谢!
> > > > >
> > > > >
> > > > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > > > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > > > Field_Filter
> > > > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink:
> Unnamed
> > > > >>
> > > > >>
> > > > >>
> > > >
> > >
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > > > >>
> > > > >> 部分报错信息如下:
> > > > >> 2020-06-10 12:02:49,083 INFO
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > > Triggering
> > > > >> checkpoint 1 @ 1591761769060 for job
> > c41f4811262db1c4c270b136571c8201.
> > > > >> 2020-06-10 12:04:47,898 INFO
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Decline
> > > > >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> > > > >> c41f4811262db1c4c270b136571c8201 at
> > > > >> container_e27_1591466310139_21670_01_06 @
> > > > >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> > > > >> 2020-06-10 12:04:47,899 INFO
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > > Discarding
> > > > >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> > > > >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> > > > >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> > > > Source_Map
> > > > >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter ->
> Map
> > > ->
> > > > Map

Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Leonard Xu


> 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。
> 
> Best,
> Zhefu

谢谢 zhefu,  给你大大点赞,很社区的方式,相信这样的积累越多,小伙伴们都能学习到更多。

祝好,
Leonard Xu
 


> 
> LakeShen  于2020年6月12日周五 上午9:49写道:
> 
>> Hi ZheFu,
>> 
>> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
>> 的数据是否都已经 Sink 到了 kafka.
>> 
>> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
>> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>> 
>> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>> 
>> Best,
>> LakeShen
>> 
>> Congxian Qiu  于2020年6月11日周四 上午9:50写道:
>> 
>>> Hi
>>> 
>>> 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
>>> java.lang.IllegalStateException: Pending record count must be zero at
>> this
>>> point: 5”,需要看一下为什么会走到这里
>>> 
>>> Best,
>>> Congxian
>>> 
>>> 
>>> 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
>>> 
 
 
>>> 
>> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
 
> 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> 
> 补充一下,在TaskManager发现了如下错误日志:
> 
> 2020-06-10 12:44:40,688 ERROR
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> during disposal of stream operator.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>> Failed
 to
> send data to Kafka: Pending record count must be zero at this point:
>> 5
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> at
> 
 
>>> 
>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> at
> 
 
>>> 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> at
> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> at
> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> at
> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Pending record count must
>>> be
> zero at this point: 5
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> at
> 
 
>>> 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> ... 8 more
> 
> 希望得到帮助,感谢!
> 
> 
> Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> 
>> Hi all,
>> 
>> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
>> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
 Field_Filter
>> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
>> 
>> 
>> 
 
>>> 
>> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
>> 
>> 部分报错信息如下:
>> 2020-06-10 12:02:49,083 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
 Triggering
>> checkpoint 1 @ 1591761769060 for job
>> c41f4811262db1c4c270b136571c8201.
>> 2020-06-10 12:04:47,898 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>> Decline
>> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
>> c41f4811262db1c4c270b136571c8201 at
>> container_e27_1591466310139_21670_01_06 @
>> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
>> 2020-06-10 12:04:47,899 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
 Discarding
>> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
>> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
>> complete snapshot 1 for operator Source: Custom Source -> Map ->
 Source_Map
>> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map
>>> ->
 Map
>> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was
>>> declined.
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1354)
>> at
>> 
 
>>> 
>> org.apache.flink.streaming.runtime.ta

flink 1.11 on k8s native session cluster模式报找不到configmap

2020-07-13 文章 Yvette zhai
大佬们,请教个问题,我是在k8s上部署flink1.11 natvie session模式。
下载的flink-1.11.0-bin-scala_2.11.tgz

执行命令是
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=k8s-session-1 \
-Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.taskmanager.cpu=2 \
-Dtaskmanager.numberOfTaskSlots=4 \
-Dresourcemanager.taskmanager-timeout=360 \
-Dkubernetes.container.image=flink:1.11.0-scala_2.11

但是会报错,找不到configmap
[image: image.png]

我看是执行上述命令是会生成configmap的,为什么还会报找不到。


Re: 滑动窗口数据存储多份问题

2020-07-13 文章 Congxian Qiu
Hi

从 HeapListState#add 这里看是的,我跟了一个 WindowOperator 到最终 HeapListState
的逻辑,这里确实是只有一份数据,没有拷贝。这个东西的实现可能是因为性能好,我尝试确认下这个原因,多谢你的提问。

Best,
Congxian


Jimmy Zhang <13669299...@163.com> 于2020年7月12日周日 上午8:13写道:

> Hi,all!
>
> 从WindowOperator.java的processElement方法跟进去,使用windowState.add(element.getValue());添加数据,这里面找到add方法的HeapListState类的实现,
>
>
> @Override
>  public void add(V value) {
>   Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
>   final N namespace = currentNamespace;
>   final StateTable> map = stateTable;
>   List list = map.get(namespace);
>   if (list == null) {
>list = new ArrayList<>();
>map.put(namespace, list);
>   }
>   list.add(value);
>  }
> 就是这个方法,让我产生了 “此value只真实存在一份”的困惑!
> |
> Best,
> Jimmy
> |
> 签名由网易邮箱大师定制
> 在2020年7月11日 21:02,Congxian Qiu 写道:
> Hi
> 你说的 HeapListState 的困惑具体是什么呢?
>
> Best,
> Congxian
>
>
> Jimmy Zhang <13669299...@163.com> 于2020年7月11日周六 下午4:50写道:
>
> 嗯嗯,之前没有选择回复全部,不好意思。
>
>
> 我看源码关于RocksDB这块确实是需要序列化的,所以肯定是多份保存,如果状态后端是heap呢,也是一样的吗?从我测试内存来看,感觉也是多份,只是heapliststate那个类给了我一些困惑😦
>
>
> 在2020年07月11日 16:23,Congxian Qiu 写道:
> Hi
>
>
> 每个窗口都是一个单独的 state,至于你认为的不同 state 仅保持引用是不对的。这个你可以使用 RocksDBStateBackend
> 来考虑,RocksDBStateBackend 中会把 state 序列化成 bytes,然后写到 RocksDB 中,就是每个 State
> 中都会有一份。
>
>
> PS:回复邮件的时候可以选择「全部回复」这样就能够加上 "user-zh@flink.apache.org"),这样我们的邮件所有人都能看到了
>
>
> Best,
> Congxian
>
>
>
>
> 张浩  于2020年7月7日周二 上午10:34写道:
>
>
>
> Hi,我通过看源码发现每条数据到达时,是分配给了所有的窗口,但是我理解这单条数据是不是只是传递给了每个窗口,其实在内存中只有一份,窗口状态保持对它的引用,触发一次窗口就删掉对这些数据的引用?
> 很高兴与您探讨!
>
>
>
>
> | |
> 张浩
> |
> |
> 邮箱:zhanghao_w...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月06日 20:56,Congxian Qiu 写道:
> Hi
>
>
> 我理解,如果只存取一份的话,state 的管理会变得麻烦一些(所有需要这份数据的窗口都需要去某个地方取, state
> 什么时候清理逻辑也会变得麻烦一些)
>
>
> Best,
> Congxian
>
>
>
>
> 张浩  于2020年7月6日周一 下午1:57写道:
>
> 你好,我的思考是便于在状态信息中清除或者提取每一个窗口的数据信息。
> 不知道,我这样理解的对吗?
> 另外,为什么我们不能只存储一份数据呢?
> 非常感谢与您交流!
>
>
>
>
> | |
> 张浩
> |
> |
> 邮箱:zhanghao_w...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月06日 13:46,Congxian Qiu 写道:
> Hi
> 现在的实现是这样的,每条数据会在每个窗口中存一份
>
> Best,
> Congxian
>
>
> 张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:
>
> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> | |
> 张浩
> |
> |
> 13669299...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: flink任务checkpoint无法完成snapshot,且报kafka异常

2020-07-13 文章 Zhefu PENG
Hi all,

这封邮件最开始发出已经一个月了,这一个月里尝试了很多朋友或者各位大佬的建议,目前经过一周末加上两个工作日的查看,问题看来是解决了。

问题的根本原因:Kafka集群的性能不足(怀疑是CPU负荷过大)。问题出现的时候线上kakfa集群只有七台机器,在排除所有别的原因以及能进行到的尝试方案后,决定进行扩容。扩到15台机器。目前来看,平稳运行,没有再报出类似错误。

反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。

Best,
Zhefu

LakeShen  于2020年6月12日周五 上午9:49写道:

> Hi ZheFu,
>
> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> 的数据是否都已经 Sink 到了 kafka.
>
> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>
> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>
> Best,
> LakeShen
>
> Congxian Qiu  于2020年6月11日周四 上午9:50写道:
>
> > Hi
> >
> > 从错误栈看是因为 task 端 snapshot 出问题了,原因是 “Caused by:
> > java.lang.IllegalStateException: Pending record count must be zero at
> this
> > point: 5”,需要看一下为什么会走到这里
> >
> > Best,
> > Congxian
> >
> >
> > 李奇 <359502...@qq.com> 于2020年6月10日周三 下午5:57写道:
> >
> > >
> > >
> >
> 哈喽,根据我自己遇到checkpoint失败,一般是因为你数据有问题,导致算子失败,有可能是数据格式,或者字段类型不匹配,字段数量等相关的原因造成,我看你补充的内容,好像是你kafka数据有问题样,你可以往这个方向看看数据是否正常。解析是否正确。
> > >
> > > > 在 2020年6月10日,下午1:24,Zhefu PENG  写道:
> > > >
> > > > 补充一下,在TaskManager发现了如下错误日志:
> > > >
> > > > 2020-06-10 12:44:40,688 ERROR
> > > > org.apache.flink.streaming.runtime.tasks.StreamTask   - Error
> > > > during disposal of stream operator.
> > > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed
> > > to
> > > > send data to Kafka: Pending record count must be zero at this point:
> 5
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1218)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:861)
> > > > at
> > > >
> > >
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
> > > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> > > > at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.lang.IllegalStateException: Pending record count must
> > be
> > > > zero at this point: 5
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:969)
> > > > at
> > > >
> > >
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:834)
> > > > ... 8 more
> > > >
> > > > 希望得到帮助,感谢!
> > > >
> > > >
> > > > Zhefu PENG  于2020年6月10日周三 下午1:03写道:
> > > >
> > > >> Hi all,
> > > >>
> > > >> 现在有一个简单的flink任务,大概chain在一起后的执行图为:
> > > >> Source: Custom Source -> Map -> Source_Map -> Empty_Filer ->
> > > Field_Filter
> > > >> -> Type_Filter -> Value_Filter -> Map -> Map -> Map -> Sink: Unnamed
> > > >>
> > > >>
> > > >>
> > >
> >
> 但是在上线一段时间后,开始报错,日志中有说到无法完成checkpoint,还提到有kafka的网络和连接异常。但还有别的flink任务在相同的broker上进行数据的读写,并且没有报错。我们暂时定位在,有可能每个checkpoint的完成时间比较长,需要几百毫秒,我们设的时间间隔又比较短,只有一秒,可能是这部分影响到了任务的性能。但是这只是一个不太靠谱的猜想,现在也没有什么排查的切入点,想看看大家有没有一些看法或者建议意见,非常感谢。
> > > >>
> > > >> 部分报错信息如下:
> > > >> 2020-06-10 12:02:49,083 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Triggering
> > > >> checkpoint 1 @ 1591761769060 for job
> c41f4811262db1c4c270b136571c8201.
> > > >> 2020-06-10 12:04:47,898 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > Decline
> > > >> checkpoint 1 by task 0cb03590fdf18027206ef628b3ef5863 of job
> > > >> c41f4811262db1c4c270b136571c8201 at
> > > >> container_e27_1591466310139_21670_01_06 @
> > > >> hdp1-hadoop-datanode-4.novalocal (dataPort=44778).
> > > >> 2020-06-10 12:04:47,899 INFO
> > > >> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> > > Discarding
> > > >> checkpoint 1 of job c41f4811262db1c4c270b136571c8201.
> > > >> org.apache.flink.runtime.checkpoint.CheckpointException: Could not
> > > >> complete snapshot 1 for operator Source: Custom Source -> Map ->
> > > Source_Map
> > > >> -> Empty_Filer -> Field_Filter -> Type_Filter -> Value_Filter -> Map
> > ->
> > > Map
> > > >> -> Map -> Sink: Unnamed (7/12). Failure reason: Checkpoint was
> > declined.
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434)
> > > >> at
> > > >>
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1420)
> > > >> at
> > > >>
> > >
> >
> org.apache.fli

Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
创建kafka_table需要在default dialect下。

不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)

Best,
Jingsong

On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach  wrote:

> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
> 如果是default Dialect创建的表,是不是只是在临时会话有效
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 19:27:44,"Jingsong Li"  写道:
> >Hi,
> >
> >问题一:
> >
> >只要current catalog是HiveCatalog。
> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
> >
> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
> >
> >问题二:
> >
> >用filesystem创建出来的是filesystem的表,它和hive
> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
> >
> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
> >但是它的partition commit是不支持metastore的,所以不会有自动add
> >partition到hive的默认实现,你需要自定义partition-commit-policy.
> >
> >[1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
> >
> >> 尴尬
> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> >> 还有两个问题问下,
> >> 问题1:
> >> 创建的kafka_table,在hive和Flink
> >>
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
> >>
> >>
> >>
> >>
> >>
> >>
> >> 问题2:
> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
> >> java.util.concurrent.CompletionException:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >> at
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> ~[?:1.8.0_161]
> >> at
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> ~[?:1.8.0_161]
> >> at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> at
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> [?:1.8.0_161]
> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> [?:1.8.0_161]
> >> at
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> [qile-data-flow-1.0.jar:?]
> >> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> [qile-data-flow-1.0.jar:?]
> >> at
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> [qile-data-flow-1.0.jar:?]
> >> Caused by:
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> Could not execute application.
> >> ... 11 more
> >> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The
> >> main method caused an error: Unable to create a sink for writing table
> >> 'default_catalog.default_database.hive_table1'.
> >>
> >> Table options are:
> >>
> >> 'connector'='filesystem'
> >> 'hive.storage.file-format'='parquet'
> >> 'is_generic'='false'
> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> 'sink.partition-commit.delay'='0s'
> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]

Re:Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
如果是default Dialect创建的表,是不是只是在临时会话有效

















在 2020-07-13 19:27:44,"Jingsong Li"  写道:
>Hi,
>
>问题一:
>
>只要current catalog是HiveCatalog。
>理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
>
>明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
>
>问题二:
>
>用filesystem创建出来的是filesystem的表,它和hive
>metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
>
>filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
>但是它的partition commit是不支持metastore的,所以不会有自动add
>partition到hive的默认实现,你需要自定义partition-commit-policy.
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
>
>> 尴尬
>> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
>> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
>> 还有两个问题问下,
>> 问题1:
>> 创建的kafka_table,在hive和Flink
>> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
>>
>>
>>
>>
>>
>>
>> 问题2:
>> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
>> java.util.concurrent.CompletionException:
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> Could not execute application.
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> ~[?:1.8.0_161]
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>> ~[?:1.8.0_161]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> [?:1.8.0_161]
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> [?:1.8.0_161]
>> at
>> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
>> [qile-data-flow-1.0.jar:?]
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> [qile-data-flow-1.0.jar:?]
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> [qile-data-flow-1.0.jar:?]
>> Caused by:
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> Could not execute application.
>> ... 11 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
>> main method caused an error: Unable to create a sink for writing table
>> 'default_catalog.default_database.hive_table1'.
>>
>> Table options are:
>>
>> 'connector'='filesystem'
>> 'hive.storage.file-format'='parquet'
>> 'is_generic'='false'
>> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> 'sink.partition-commit.delay'='0s'
>> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
>> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
>> ... 10 more
>> Caused by: org.apache.flink.table.api.ValidationException: Unable to
>> create a sink for 

Re: 单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及sink顺序,是否可以做到?

2020-07-13 文章 Jark Wu
你可以在 mysqlSinkFunction 中攒 buffer,在 timer trigger 或者 checkpoint 时 flush
mysql database,以及 output。

On Mon, 13 Jul 2020 at 15:36, jindy_liu <286729...@qq.com> wrote:

>
>
> 如果可以chain在一起,这个可以保证顺序性,我去试试。
>
> 这里再追问下,实际中,如里单流源里的数据也要顺序处理,可以设置并行度为1;
>
> 这里可能也要考虑下mysqlSinkFunction里的sink效率问题,需要积累一些数据再sink,这里可以做到跟kafka
> sink的batch_size和linger.ms配合联动吗?比如满1000条,或超过1s,就同时sink?
>
> 谢谢~
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
Hi,

问题一:

只要current catalog是HiveCatalog。
理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.

明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?

问题二:

用filesystem创建出来的是filesystem的表,它和hive
metastore是没有关系的,你需要使用创建filesystem表的语法[1]。

filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
但是它的partition commit是不支持metastore的,所以不会有自动add
partition到hive的默认实现,你需要自定义partition-commit-policy.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html

Best,
Jingsong

On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:

> 尴尬
> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> 还有两个问题问下,
> 问题1:
> 创建的kafka_table,在hive和Flink
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
>
>
>
>
>
>
> 问题2:
> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
> java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> ~[?:1.8.0_161]
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> ~[?:1.8.0_161]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_161]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_161]
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> [qile-data-flow-1.0.jar:?]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [qile-data-flow-1.0.jar:?]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [qile-data-flow-1.0.jar:?]
> Caused by:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
> ... 11 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> main method caused an error: Unable to create a sink for writing table
> 'default_catalog.default_database.hive_table1'.
>
> Table options are:
>
> 'connector'='filesystem'
> 'hive.storage.file-format'='parquet'
> 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> 'sink.partition-commit.delay'='0s'
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> ... 10 more
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create a sink for writing table
> 'default_catalog.default_database.hive_table1'.
>
> Table options are:
>
> 'connector'='filesystem'
> 'hive.storage.file-format'='parquet'
> 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> 'sink.partition-commit.delay'='0s'
> 'sink.parti

Re: flink 1.11 createTemporaryTable 指定 rowtime 字段报 Field null does not exist 错误

2020-07-13 文章 Jark Wu
能贴下完整的异常栈么?

Btw,TableEnvironment上的 connect API 目前不建议使用,有许多已知的问题和缺失的 feature,建议用
executeSql(ddl) 来替代。
社区计划在 1.12 中系统地重构和修复 connect API 。

Best,
Jark

On Mon, 13 Jul 2020 at 17:24, Hito Zhu  wrote:

> 使用 flink 1.11 的 tableEnv 的 createTemporaryTable 取注册表,指定
> createTemporaryTable
> 为事件时间,程序包 Field null does not exist 错误,是我用法有问题?
> 看了下  https://issues.apache.org/jira/browse/FLINK-16160
>    这个 issue 是解决的这个问题吗?
>
> tableEnv.connect(kafka)
> .withSchema(
>   new Schema().field("searchTime",
> DataTypes.TIMESTAMP()).rowtime(rowtime);
> )
> .withFormat(
> new Json().failOnMissingField(false)
> )
> .createTemporaryTable("tablename");
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.11写入mysql问题

2020-07-13 文章 Jark Wu
请问你是怎么提交的作业呢? 是在本地 IDEA 里面执行的,还是打成 jar 包后提交到集群运行的呢?

On Mon, 13 Jul 2020 at 17:58, 小学霸  wrote:

> 各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。
> from pyflink.datastream import StreamExecutionEnvironment,
> TimeCharacteristic, CheckpointingMode
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings
> source="""
> CREATE TABLE kafka_source_tab (
>  id VARCHAR,   
>  alarm_id VARCHAR,   
>  trck_id VARCHAR
>
>
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'alarm_test_g',   
>  'scan.startup.mode' = 'earliest-offset',
>  'properties.bootstrap.servers' = '10.2.2.73:2181',
>  'properties.bootstrap.servers' = '10.2.2.73:9092',
>  'format' = 'json' 
> )
> """
>
> sink="""
> CREATE TABLE g_source_tab (
>  id VARCHAR,   
>  alarm_id VARCHAR,     
>  trck_id VARCHAR
>
>
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false', 
>  'table-name' = 'g',   
>  'username' = 'root',
>  'password' = '123456t',
>  'sink.buffer-flush.interval' = '1s'
> )
> """
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> env.set_parallelism(1)
> env_settings =
> EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(env,
> environment_settings=env_settings)
>
>
>
> t_env.execute_sql(source)
> t_env.execute_sql(sink)
>
>
> source = t_env.from_path("kafka_source_tab")\
>         .select("id,alarm_id,trck_id")
> source.execute_insert("g_source_tab")


Re:Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
尴尬
我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅 
这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
还有两个问题问下,
问题1:
创建的kafka_table,在hive和Flink 
SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore






问题2:
刚才有热心社区同学回答,不用hivecatalog,用filesystem connector 也是可以创建hive表,我尝试了一下,报错了:
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
 ~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) 
~[?:1.8.0_161]
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
 ~[?:1.8.0_161]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_161]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_161]
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 [qile-data-flow-1.0.jar:?]
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 [qile-data-flow-1.0.jar:?]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[qile-data-flow-1.0.jar:?]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[qile-data-flow-1.0.jar:?]
Caused by: 
org.apache.flink.client.deployment.application.ApplicationExecutionException: 
Could not execute application.
... 11 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: Unable to create a sink for writing table 
'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149) 
~[flink-clients_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:230)
 ~[flink-clients_2.11-1.11.0.jar:1.11.0]
... 10 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a 
sink for writing table 'default_catalog.default_database.hive_table1'.

Table options are:

'connector'='filesystem'
'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='0s'
'sink.partition-commit.policy.kind'='metastore,success-file'
at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
 ~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.sca

回复: Flink es7 connector认证问题

2020-07-13 文章 李宇彬
感谢,已找到问题原因,这个provider变量应该放到setHttpClientConfigCallback内部,之前是作为私有成员变量transient声明的,会导致taskmanager无法拿到认证信息
String user = pt.get("es.user.name");
String password = pt.get("es.user.password");
esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setHttpClientConfigCallback(httpClientBuilder 
->
{
CredentialsProvider provider = new 
BasicCredentialsProvider();
provider.setCredentials(AuthScope.ANY,
new 
UsernamePasswordCredentials(user, password));
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);
在2020年7月13日 15:33,Yangze Guo 写道:
Hi,

请问您有检查过pt.get("es.user.name"),
pt.get("es.user.password")这两个参数读出来是否都是正确的,另外更完整的错误栈方便提供下么?

Best,
Yangze Guo

On Mon, Jul 13, 2020 at 3:10 PM 李宇彬  wrote:

各位好,
请教一个问题
我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 
里就报错了
org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], 
status line [HTTP/1.1 401 Unauthorized]
ParameterTool pt = ParameterTool.fromArgs(args);
String confFile = pt.get("confFile");
pt = ParameterTool.fromPropertiesFile(new File(confFile));
provider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(pt.get("es.user.name"), 
pt.get("es.user.password")));

esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(18)
.setConnectionRequestTimeout(1)
)
.setHttpClientConfigCallback(httpClientBuilder ->
{
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);


Re: flink 1.11 es未定义pk的sink问题

2020-07-13 文章 Leonard Xu
HI,fulin

如 Yangze所说,这是es6 new connector 引入的一个bug,  你可以使用用old 
connector的语法绕过,就是connector.type=’xx’ ,这样代码路径还走之前的代码, 或者使用es7 nconnector。

祝好,
Leonard Xu

> 在 2020年7月13日,17:19,Yangze Guo  写道:
> 
> 验证了一下,这确实是一个bug,原因出在这一行[1]。我会提一个ticket来解决它,争取在1.11.1修复。
> 
> [1] 
> https://github.com/apache/flink/blob/0fbea46ac0271dd84fa8acd7f99f449a9a0d458c/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java#L285
> 
> Best,
> Yangze Guo
> 
> On Mon, Jul 13, 2020 at 3:44 PM sunfulin  wrote:
>> 
>> hi,YangZe,Leonard,
>> 我增加了一个可以复现问题的测试类,可以执行下看看。可以明显观察到,两个sink在有PK时写入正常,在没有PK时只有一条记录(id是索引名)。
>> 
>> import org.apache.flink.api.common.typeinfo.Types;
>> 
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> 
>> import 
>> org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
>> 
>> import org.apache.flink.table.api.EnvironmentSettings;
>> 
>> import org.apache.flink.table.api.StatementSet;
>> 
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> 
>> import org.apache.flink.types.Row;
>> 
>> 
>> import static org.apache.flink.table.api.Expressions.$;
>> 
>> 
>> public class ESNewJobTest {
>> 
>> 
>>//构建StreamExecutionEnvironment
>> 
>>public static final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> 
>>//构建EnvironmentSettings 并指定Blink Planner
>> 
>>private static final EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> 
>> 
>>//构建StreamTableEnvironment
>> 
>>public static final StreamTableEnvironment tEnv = 
>> StreamTableEnvironment.create(env, bsSettings);
>> 
>> 
>>//DDL语句
>> 
>>public static final String ES_SINK_DDL_NO_PK = "CREATE TABLE 
>> es_sink_test_no_pk (\n" +
>> 
>>"  idx integer,\n" +
>> 
>>"  firstx varchar\n" +
>> 
>>") WITH (\n" +
>> 
>>"'connector' = 'elasticsearch-6',\n" +
>> 
>>"'hosts' = '168.61.113.171:9200',\n" +
>> 
>>"'index' = 'es_sink_test_no_pk',\n" +
>> 
>>"'document-type' = 'default',\n" +
>> 
>>"'document-id.key-delimiter' = '$',\n" +
>> 
>>"'sink.bulk-flush.interval' = '1000',\n" +
>> 
>>"'failure-handler' = 'fail',\n" +
>> 
>>"'format' = 'json'\n" +
>> 
>>")";
>> 
>>public static final String ES_SINK_DDL_WITH_PK = "CREATE TABLE 
>> es_sink_test_with_pk (\n" +
>> 
>>"  idx integer,\n" +
>> 
>>"  firstx varchar,\n" +
>> 
>>"  primary key (idx, firstx) not enforced\n" +
>> 
>>") WITH (\n" +
>> 
>>"'connector' = 'elasticsearch-6',\n" +
>> 
>>"'hosts' = '168.61.113.171:9200',\n" +
>> 
>>"'index' = 'es_sink_test_with_pk',\n" +
>> 
>>"'document-type' = 'default',\n" +
>> 
>>"'document-id.key-delimiter' = '$',\n" +
>> 
>>"'sink.bulk-flush.interval' = '1000',\n" +
>> 
>>"'failure-handler' = 'fail',\n" +
>> 
>>"'format' = 'json'\n" +
>> 
>>")";
>> 
>> 
>>public static String getCharAndNumr(int length) {
>> 
>>StringBuffer valSb = new StringBuffer();
>> 
>>for (int i = 0; i < length; i++) {
>> 
>>String charOrNum = Math.round(Math.random()) % 2 == 0 ? "char" : 
>> "num"; // 输出字母还是数字
>> 
>>if ("char".equalsIgnoreCase(charOrNum)) {
>> 
>>// 字符串
>> 
>>int choice = Math.round(Math.random()) % 2 == 0 ? 65 : 97;  
>> // 取得大写字母还是小写字母
>> 
>>valSb.append((char) (choice + Math.round(Math.random()*25)));
>> 
>>} else if ("num".equalsIgnoreCase(charOrNum)) {
>> 
>>// 数字
>> 
>>valSb.append(String.valueOf(Math.round(Math.random()*9)));
>> 
>>}
>> 
>>}
>> 
>>return valSb.toString();
>> 
>> 
>>}
>> 
>> 
>>public static void main(String[] args) throws Exception {
>> 
>> 
>>DataStream ds = env.addSource(new 
>> RichParallelSourceFunction() {
>> 
>> 
>>volatile boolean flag = true;
>> 
>> 
>>@Override
>> 
>>public void run(SourceContext ctx) throws Exception {
>> 
>>while (flag) {
>> 
>>Row row = new Row(2);
>> 
>>row.setField(0, 2207);
>> 
>>row.setField(1, getCharAndNumr(4));
>> 
>>ctx.collect(row);
>> 
>>Thread.sleep(1000);
>> 
>>}
>> 
>> 
>>}
>> 
>> 
>>@Override
>> 
>>public void cancel() {
>> 
>>flag = false;
>> 
>>}
>> 
>>}).setParallelism(1).returns(Types.ROW(Types.INT, Types.STRING));
>> 
>> 
>> 
>>//ES sink测试ddl
>> 

flink 1.11????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysql??
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,   
 trck_id VARCHAR


) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',   
 'scan.startup.mode' = 'earliest-offset', 
 'properties.bootstrap.servers' = '10.2.2.73:2181',
 'properties.bootstrap.servers' = '10.2.2.73:9092',
 'format' = 'json'  
)
"""

sink="""
CREATE TABLE g_source_tab (
 id VARCHAR,    
 alarm_id VARCHAR,     
 trck_id VARCHAR


) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',  
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
        .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

回复:退订

2020-07-13 文章 苑士旸
谢谢,已经找到


| |
yuanshiyang
|
|
邮箱yuanshiy...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月13日 17:55,Jingsong Li 写道:
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Mon, Jul 13, 2020 at 5:53 PM 苑士旸  wrote:

>
>
>
> | |
> yuanshiyang
> |
> |
> 邮箱yuanshiy...@163.com
> |
>
> 签名由 网易邮箱大师 定制



--
Best, Jingsong Lee


Re: 退订

2020-07-13 文章 Jingsong Li
Hi

退订应该发这个邮箱:user-zh-unsubscr...@flink.apache.org

Best
Jingsong

On Mon, Jul 13, 2020 at 5:53 PM 苑士旸  wrote:

>
>
>
> | |
> yuanshiyang
> |
> |
> 邮箱yuanshiy...@163.com
> |
>
> 签名由 网易邮箱大师 定制



-- 
Best, Jingsong Lee


Re: pyflink问题求助

2020-07-13 文章 Xingbo Huang
Hi hieule,
This work around method is used in flink 1.10, in flink 1.11 you can use
ddl directly (blink planner) which you can refer to [1].
For how to use blink planner in PyFlink, you can refer to following code:

t_env = BatchTableEnvironment.create(
environment_settings=EnvironmentSettings.new_instance()
.in_batch_mode().use_blink_planner().build())

t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
'80m')

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

Best,
Xingbo

hieule  于2020年7月13日周一 下午4:46写道:

>  hello Xingbo Huang,
>
> when I run , I had some error
>
> `TypeError: Could not found the Java class
> 'org.apache.flink.api.java.io.jdbc.JDBCAppendTableSinkBuilder'. The Java
> dependencies could be specified via command line argument '--jarfile' or
> the
> config option 'pipeline.jars' `
>
>
> how to solve issue ?
>
> Thank
> hieule
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Jingsong Li
你把完整的程序再贴下呢

Best,
Jingsong

On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach  wrote:

> Hi,
>
>
> 我现在改成了:
> 'sink.partition-commit.delay'='0s'
>
>
> checkpoint完成了20多次,hdfs文件也产生了20多个,
> hive表还是查不到数据
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 17:23:34,"夏帅"  写道:
>
> 你好,
> 你设置了1个小时的
> SINK_PARTITION_COMMIT_DELAY
>
>
> --
> 发件人:Zhou Zach 
> 发送时间:2020年7月13日(星期一) 17:09
> 收件人:user-zh 
> 主 题:Re:Re: Re: Table options do not contain an option key 'connector' for
> discovering a connector.
>
>
> 开了checkpoint,
> val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>
> streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamExecutionEnv.enableCheckpointing(5 * 1000,
> CheckpointingMode.EXACTLY_ONCE)
> streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>
>
>
>
> 间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 16:52:16,"Jingsong Li"  写道:
> >有开checkpoint吧?delay设的多少?
> >
> >Add partition 在 checkpoint完成 + delay的时间后
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
> >
> >> Hi,
> >> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
> >> partition到hive表吗,我当前设置了参数
> >> 'sink.partition-commit.policy.kind'='metastore'
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
> >> >Hi,
> >> >
> >> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
> >> >
> >> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
> >> >
> >> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
> >> >>
> >>
> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
> >> >> h','sink.partition-commit.policy.kind'='success-file');
> >> >> 也报错误
> >> >> query:
> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |
> >> >> |CREATE TABLE hive_table (
> >> >> |  user_id STRING,
> >> >> |  age INT
> >> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
> >> >> TBLPROPERTIES (
> >> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
> >> >> |  'sink.partition-commit.trigger'='partition-time',
> >> >> |  'sink.partition-commit.delay'='1 h',
> >> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> |)
> >> >> |
> >> >> |""".stripMargin)
> >> >>
> >> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |CREATE TABLE kafka_table (
> >> >> |uid VARCHAR,
> >> >> |-- uid BIGINT,
> >> >> |sex VARCHAR,
> >> >> |age INT,
> >> >> |created_time TIMESTAMP(3),
> >> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
> >> SECOND
> >> >> |) WITH (
> >> >> |'connector.type' = 'kafka',
> >> >> |'connector.version' = 'universal',
> >> >> | 'connector.topic' = 'user',
> >> >> |-- 'connector.topic' = 'user_long',
> >> >> |'connector.startup-mode' = 'latest-offset',
> >> >> |'connector.properties.zookeeper.connect' =
> >> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
> >> >> |'connector.properties.bootstrap.servers' =
> >> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
> >> >> |'connector.properties.group.id' = 'user_flink',
> >> >> |'format.type' = 'json',
> >> >> |'format.derive-schema' = 'true'
> >> >> |)
> >> >> |""".stripMargin)
> >> >>
> >> >>
> >> >>
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |INSERT INTO hive_table
> >> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
> >> >> DATE_FORMAT(created_time, 'HH')
> >> >> |FROM kafka_table
> >> >> |
> >> >> |""".stripMargin)
> >> >>
> >> >> streamTableEnv.executeSql(
> >> >> """
> >> >> |
> >> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
> >> >> |
> >> >> |""".stripMargin)
> >> >> .print()
> >> >> 错误栈:
> >> >> Exception in thread "main"
> >> org.apache.flink.table.api.ValidationException:
> >> >> Unable to create a sink for writing table
> >> >> 'default_catalog.default_database.hive_table'.
> >> >>
> >> >> Table options are:
> >> >>
> >> >> 'hive.storage.file-format'='parquet'
> >> >> 'is_generic'='false'
> >> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
> >> >> 'sink.partition-commit.delay'='1 h'
> >> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
> >> >> 'sink.partition-commit.trigger'='partition-time'
> >> >> at
> >> >>
> >>
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> >> >> at
> >> >>
> >>
> org.apache.flink.table.planner.delegation.Plann

退订

2020-07-13 文章 苑士旸



| |
yuanshiyang
|
|
邮箱yuanshiy...@163.com
|

签名由 网易邮箱大师 定制

flink 1.11????mysql????

2020-07-13 文章 ??????
flink??Kafka??mysqlmysql??
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, 
CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
 id VARCHAR,   
 alarm_id VARCHAR,   
 trck_id VARCHAR


) WITH (
 'connector' = 'kafka',
 'topic' = 'alarm_test_g',   
 'scan.startup.mode' = 'earliest-offset', 
 'properties.bootstrap.servers' = '10.2.2.73:2181',
 'properties.bootstrap.servers' = '10.2.2.73:9092',
 'format' = 'json'  
)
"""

sink="""
CREATE TABLE g_source_tab (
 id VARCHAR,    
 alarm_id VARCHAR,     
 trck_id VARCHAR


) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',  
 'table-name' = 'g',   
 'username' = 'root',
 'password' = '123456t',
 'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = 
EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)



t_env.execute_sql(source)
t_env.execute_sql(sink)


source = t_env.from_path("kafka_source_tab")\
        .select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")

Re:回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-13 文章 Zhou Zach
Hi,


我现在改成了:
'sink.partition-commit.delay'='0s'


checkpoint完成了20多次,hdfs文件也产生了20多个,
hive表还是查不到数据













在 2020-07-13 17:23:34,"夏帅"  写道:

你好,
你设置了1个小时的
SINK_PARTITION_COMMIT_DELAY


--
发件人:Zhou Zach 
发送时间:2020年7月13日(星期一) 17:09
收件人:user-zh 
主 题:Re:Re: Re: Table options do not contain an option key 'connector' for 
discovering a connector.


开了checkpoint,
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)




间隔5s,超时10s,不过,等了2分多钟,hdfs上写入了10几个文件了,查hive还是没数据














在 2020-07-13 16:52:16,"Jingsong Li"  写道:
>有开checkpoint吧?delay设的多少?
>
>Add partition 在 checkpoint完成 + delay的时间后
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach  wrote:
>
>> Hi,
>> 根据你的提示,加上HiveCatalog,已经成功写入数据到hdfs了,不过,为什么,直接通过hue查hive表,没数据,必须手动add
>> partition到hive表吗,我当前设置了参数
>> 'sink.partition-commit.policy.kind'='metastore'
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> At 2020-07-13 15:01:28, "Jingsong Li"  wrote:
>> >Hi,
>> >
>> >你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
>> >
>> >不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach  wrote:
>> >
>> >> flink 1.11 sink hive table的connector设置为什么啊,尝试设置
>> >>
>> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
>> >> h','sink.partition-commit.policy.kind'='success-file');
>> >> 也报错误
>> >> query:
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |
>> >> |CREATE TABLE hive_table (
>> >> |  user_id STRING,
>> >> |  age INT
>> >> |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >> |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >> |  'sink.partition-commit.trigger'='partition-time',
>> >> |  'sink.partition-commit.delay'='1 h',
>> >> |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> |)
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |CREATE TABLE kafka_table (
>> >> |uid VARCHAR,
>> >> |-- uid BIGINT,
>> >> |sex VARCHAR,
>> >> |age INT,
>> >> |created_time TIMESTAMP(3),
>> >> |WATERMARK FOR created_time as created_time - INTERVAL '3'
>> SECOND
>> >> |) WITH (
>> >> |'connector.type' = 'kafka',
>> >> |'connector.version' = 'universal',
>> >> | 'connector.topic' = 'user',
>> >> |-- 'connector.topic' = 'user_long',
>> >> |'connector.startup-mode' = 'latest-offset',
>> >> |'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >> |'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >> |'connector.properties.group.id' = 'user_flink',
>> >> |'format.type' = 'json',
>> >> |'format.derive-schema' = 'true'
>> >> |)
>> >> |""".stripMargin)
>> >>
>> >>
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |INSERT INTO hive_table
>> >> |SELECT uid, age, DATE_FORMAT(created_time, '-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >> |FROM kafka_table
>> >> |
>> >> |""".stripMargin)
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >> |
>> >> |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >> |
>> >> |""".stripMargin)
>> >> .print()
>> >> 错误栈:
>> >> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:
>> >> Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table'.
>> >>
>> >> Table options are:
>> >>
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='1 h'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> 'sink.partition-commit.trigger'='partition-time'
>> >> at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >> at
>> >>
>> scala.collec

  1   2   >