sink.rolling-policy.file-size not taking effect

2020-12-03 by admin
Hi all,
I'm using the filesystem connector in Flink 1.11.1 with sink.rolling-policy.file-size=50MB, yet files of 100+ MB still show up.
The DDL is below; the checkpoint interval is 1 min.
CREATE TABLE cpc_bd_recall_log_hdfs (
log_timestamp BIGINT,
ip STRING,
`raw` STRING,
`day` STRING, `hour` STRING,`minute` STRING
) PARTITIONED BY (`day` , `hour` ,`minute`) WITH (
'connector'='filesystem',
'path'='hdfs://xxx/test.db/cpc_bd_recall_log_hdfs',
'format'='parquet',
'parquet.compression'='SNAPPY',
'sink.rolling-policy.file-size' = '50MB',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.delay'='60s'
);


The HDFS files look like this:

  0 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/_SUCCESS
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-0-2500
-rw-r--r--   3 hadoop hadoop 121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-0-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-1-2499
-rw-r--r--   3 hadoop hadoop 122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-1-2500
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-10-2501
-rw-r--r--   3 hadoop hadoop 121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-10-2502
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-11-2500
-rw-r--r--   3 hadoop hadoop 122.2 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-11-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-12-2500
-rw-r--r--   3 hadoop hadoop 122.2 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-12-2501
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-13-2499
-rw-r--r--   3 hadoop hadoop 122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-13-2500
-rw-r--r--   3 hadoop hadoop 31.6 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-14-2500
-rw-r--r--   3 hadoop hadoop 122.1 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-14-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-15-2498
-rw-r--r--   3 hadoop hadoop 121.8 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-15-2499
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-16-2501
-rw-r--r--   3 hadoop hadoop 122.0 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-16-2502
-rw-r--r--   3 hadoop hadoop 31.7 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-17-2500
-rw-r--r--   3 hadoop hadoop 122.5 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-17-2501
-rw-r--r--   3 hadoop hadoop 31.8 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-18-2500
-rw-r--r--   3 hadoop hadoop 121.7 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-18-2501
-rw-r--r--   3 hadoop hadoop 31.9 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-19-2501
-rw-r--r--   3 hadoop hadoop 121.7 M 2020-12-04 14:56 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-19-2502
-rw-r--r--   3 hadoop hadoop 31.6 M 2020-12-04 14:55 
hdfs://xxx/test.db/hdfs_test/day=2020-12-04/hour=14/minute=55/part-3dca3b00-fd94-4f49-bdf8-a8b65bcfa92c-2-2499
-rw-r--r--   3 hadoop hadoop121
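
Not a confirmed diagnosis, but two usual suspects are worth checking here: a parquet (bulk-format) part file only becomes finished on a checkpoint and its apparent size lags because the writer buffers a whole row group in memory, and time-based rolling is only evaluated every sink.rolling-policy.check-interval (1 minute by default). Below is a minimal sketch of the same table with an explicit rollover interval and a tighter check interval, using option names from the Flink 1.11 filesystem connector; whether this fully removes the overshoot is an assumption to verify.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RollingPolicySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk-format part files only become finished on checkpoints, so the
        // checkpoint interval also bounds how often files can roll.
        env.enableCheckpointing(60_000);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env,
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE cpc_bd_recall_log_hdfs (\n"
                        + "  log_timestamp BIGINT, ip STRING, `raw` STRING,\n"
                        + "  `day` STRING, `hour` STRING, `minute` STRING\n"
                        + ") PARTITIONED BY (`day`, `hour`, `minute`) WITH (\n"
                        + "  'connector' = 'filesystem',\n"
                        + "  'path' = 'hdfs://xxx/test.db/cpc_bd_recall_log_hdfs',\n"
                        + "  'format' = 'parquet',\n"
                        + "  'parquet.compression' = 'SNAPPY',\n"
                        + "  'sink.rolling-policy.file-size' = '50MB',\n"
                        // Roll on time as well as on size.
                        + "  'sink.rolling-policy.rollover-interval' = '2min',\n"
                        // Evaluate the policy more often than the 1 min default.
                        + "  'sink.rolling-policy.check-interval' = '10s',\n"
                        + "  'sink.partition-commit.policy.kind' = 'success-file',\n"
                        + "  'sink.partition-commit.delay' = '60s'\n"
                        + ")");
    }
}

If files must be capped close to 50 MB, lowering the parquet row group size (for example via 'parquet.block.size', assuming it is forwarded to the writer the same way 'parquet.compression' is) may also be worth trying.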

Re: Why is Calcite's implicit type conversion disabled

2020-12-03 by stgztsw
Since the community intends to be Hive-compatible, implicit conversion and the other Hive syntax compatibility features really are a must. The Hive SQL
running in real production environments is usually complex, and at Flink's current level of Hive compatibility most Hive SQL simply cannot run successfully. (Other gaps include the unsupported `!=` operator,
CREATE TABLE AS, and more; I won't list them all here.) I hope the community can make the Hive support more complete.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink TableAPI Issue: cannot assign instance of org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap to field

2020-12-03 by Zed
It runs fine locally, but the problem appears once I upload it to the server. Both local and server are on 1.11.2, and flink-shaded-jackson is 2.9.8-7.0. I'm not sure how flink-shaded-jackson versions map to Flink versions.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Hive SQL on Flink 1.11 (subject garbled in the archive)

2020-12-03 by (sender name unreadable)
Trying to run existing Hive SQL on Flink 1.11.2; most of this message was lost to an encoding problem, and only the following points are recoverable:
1. (unreadable)
2. `!=` is not supported
3. (unreadable)
4. `split` (details unreadable)
5. a Hive vs. Flink difference (details unreadable)
6. joins hit a Calcite bug: https://issues.apache.org/jira/browse/CALCITE-2152
7. `create table table1 as select * from pokes;` (CREATE TABLE AS is not supported)


Hoping Flink 1.11 can run Hive SQL (remainder unreadable)

Re: Re:Re: Flink SQL 1.11: after a long time with no data written to MySQL, java.sql.SQLException: No operations allowed after statement closed.

2020-12-03 by yanzi
1. Error log for retry times = 1:
[2020-12-02 22:01:00.800] [ERROR] [jdbc-upsert-output-format-thread-1]
[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
executeBatch error, retry times = 1
java.sql.SQLException: No operations allowed after statement closed.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at 
com.mysql.jdbc.PreparedStatement.clearBatch(PreparedStatement.java:1051)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1323)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:954)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:200)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:153)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_262]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
2. Error log for retry times = 2:
[2020-12-02 22:01:01.402] [ERROR] [jdbc-upsert-output-format-thread-1]
[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
executeBatch error, retry times = 2
java.sql.SQLException: No operations allowed after statement closed.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at 
com.mysql.jdbc.PreparedStatement.setString(PreparedStatement.java:3943)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:219)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:193)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:86)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcRowDataOutputFormat.java:164)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:154)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.ap
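
For background on this error: the MySQL server closes connections that sit idle longer than wait_timeout, and the JDBC sink's periodic flush thread then reuses the dead statement, which is what "No operations allowed after statement closed" means. The sketch below shows one commonly suggested stop-gap, with placeholder host, credentials and table name; whether autoReconnect is acceptable for your driver version, or whether raising wait_timeout on the MySQL side is preferable, is an assumption to check rather than a verified fix.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcSinkUrlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE mysql_sink (\n"
                        + "  id BIGINT,\n"
                        + "  cnt BIGINT,\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        // autoReconnect is a MySQL driver URL option, used here as a
                        // stop-gap against connections being dropped while idle.
                        + "  'url' = 'jdbc:mysql://mysql-host:3306/test?useSSL=false&autoReconnect=true',\n"
                        + "  'table-name' = 'sink_table',\n"
                        + "  'username' = 'user',\n"
                        + "  'password' = 'pass',\n"
                        + "  'sink.buffer-flush.interval' = '1s'\n"
                        + ")");
    }
}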

Re: Problems writing ORC to Hive 2.1.1 from Flink 1.11.2

2020-12-03 by yang xu
Hi,
If ACID isn't supported, does that mean updates and deletes captured from the binlog need a separate job to handle them? How can true batch/stream unification be achieved then?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Configuring taskmanager.out to roll

2020-12-03 by zilong xiao
Got it, understood. Thanks for the explanation.

Yang Wang  wrote on Fri, Dec 4, 2020 at 10:33 AM:

> This is not supported at the moment, because STDOUT/STDERR is not written through slf4j.
> To support it, stdout would have to be redirected inside the Flink code first, and then log4j configured accordingly.
>
> Best,
> Yang
>
> zilong xiao  wrote on Thu, Dec 3, 2020 at 7:50 PM:
>
> > A question for the community experts: can the standard-output file taskmanager.out be configured to roll?
> >
>


Re: Configuring taskmanager.out to roll

2020-12-03 by Yang Wang
This is not supported at the moment, because STDOUT/STDERR is not written through slf4j.
To support it, stdout would have to be redirected inside the Flink code first, and then log4j configured accordingly.

Best,
Yang

zilong xiao  wrote on Thu, Dec 3, 2020 at 7:50 PM:

> A question for the community experts: can the standard-output file taskmanager.out be configured to roll?
>
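
To illustrate the redirect-then-log4j idea described above, here is a rough user-side sketch; the StdoutToSlf4j class is hypothetical helper code, not a Flink or slf4j API. Once stdout goes through a logger, the usual log4j rolling-file configuration applies to it.

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical helper: forwards everything printed to System.out into an slf4j logger. */
public final class StdoutToSlf4j extends OutputStream {

    private static final Logger LOG = LoggerFactory.getLogger("stdout");

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void write(int b) {
        if (b == '\n') {
            // Flush one completed line into the logger; a log4j rolling appender
            // can then rotate the file like any other log output.
            LOG.info(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
            buffer.reset();
        } else {
            buffer.write(b);
        }
    }

    /** Call once at startup, e.g. from the user main() before building the job. */
    public static void install() {
        System.setOut(new PrintStream(new StdoutToSlf4j(), true));
    }
}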


Re: Using multiple keytabs with Flink

2020-12-03 by amen...@163.com
hi,

You can do the Kerberos (UGI) authentication yourself when calling executeSql() or execute() (for multiple SQL statements) to submit the job, and pass the two settings you mentioned dynamically on the run command in -yD form.

best,
amenhub



 
From: zhengmao776
Sent: 2020-12-03 17:16
To: user-zh
Subject: Using multiple keytabs with Flink
Hello. When submitting jobs with flink
run on a kerberized Hadoop cluster, I want to authenticate different users with different keytabs. I saw the security.kerberos.login.keytab and security.kerberos.login.principal settings in flink-conf.yaml, but they cannot be configured dynamically; I tried setting them with
-yD, but it has no effect. How should I handle this situation? Looking forward to your reply.
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-cdc cannot read the binlog, and the program reports no error

2020-12-03 by chenjb
Thanks for following up. I had mapped an int column to bigint, assuming bigint would be compatible because its range is larger, and I hadn't read the docs carefully. MySQL logged an error but the program reported nothing, so my attention stayed on the MySQL side. Over the last two days I've also found that some failure scenarios in flink-cdc SQL are silent, e.g. when MySQL can't be reached (wrong password) the program just exits with exit
0, with no error or hint at all. Do I need to configure something else? I'm new to Java, so I don't know this area well.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: What visual client do people recommend for Flink?

2020-12-03 by yinghua...@163.com
Great, thanks for the recommendation!

> On Dec 3, 2020, at 21:57, Jark Wu  wrote:
> 
> You can try Zeppelin; its integration with Flink SQL is quite good.
> 
> Best,
> Jark
> 
>> On Thu, 3 Dec 2020 at 21:55, yinghua...@163.com  wrote:
>> 
>> I didn't make this clear: I mean a Flink SQL client. We want to give other departments in the company a client to use; those colleagues only know some SQL and have little programming background.
>> 
 On Dec 3, 2020, at 21:52, Shawn Huang  wrote:
>>> 
>>> What do you mean by a client? Flink serves a Web UI on port 8081 by default, where you can submit and cancel jobs and view logs and some basic metrics.
>>> 
>>> Best,
>>> Shawn Huang
>>> 
>>> 
>>> yinghua...@163.com  wrote on Thu, Dec 3, 2020 at 8:46 PM:
>>> 
 
>> 


Re: How to compute percentiles in real time with Flink SQL

2020-12-03 by Jark Wu
You can look at the UDAF (aggregate function) documentation:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions
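
As a rough illustration of the AggregateFunction approach, below is a minimal percentile UDAF sketch; the class name, the hard-coded 0.95 quantile and the brute-force accumulator that buffers every value are assumptions made for brevity, not a production design (memory grows with the number of rows, and the planner's type extraction may require DataTypeHint annotations on the accumulator).

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.functions.AggregateFunction;

/** Hypothetical UDAF computing the 95th percentile by buffering all values. */
public class Percentile95 extends AggregateFunction<Double, Percentile95.Acc> {

    /** Accumulator: simply keeps every observed value. */
    public static class Acc {
        public List<Double> values = new ArrayList<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // Called by the framework for every input row.
    public void accumulate(Acc acc, Double value) {
        if (value != null) {
            acc.values.add(value);
        }
    }

    @Override
    public Double getValue(Acc acc) {
        if (acc.values.isEmpty()) {
            return null;
        }
        List<Double> sorted = new ArrayList<>(acc.values);
        Collections.sort(sorted);
        int index = (int) Math.ceil(0.95 * sorted.size()) - 1;
        return sorted.get(Math.max(index, 0));
    }
}

It could then be registered with something like tEnv.createTemporarySystemFunction("p95", Percentile95.class) and used as p95(col) in a GROUP BY query.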




On Thu, 3 Dec 2020 at 12:06, 爱成绕指柔 <1194803...@qq.com> wrote:

> Hello:
>     Does Flink SQL currently lack a percentile function for real-time computation? If so, how can this be implemented?
>     Looking forward to your reply, thanks!


Re: What visual client do people recommend for Flink?

2020-12-03 by Jark Wu
You can try Zeppelin; its integration with Flink SQL is quite good.

Best,
Jark

On Thu, 3 Dec 2020 at 21:55, yinghua...@163.com  wrote:

> I didn't make this clear: I mean a Flink SQL client. We want to give other departments in the company a client to use; those colleagues only know some SQL and have little programming background.
>
> > On Dec 3, 2020, at 21:52, Shawn Huang  wrote:
> >
> > What do you mean by a client? Flink serves a Web UI on port 8081 by default, where you can submit and cancel jobs and view logs and some basic metrics.
> >
> > Best,
> > Shawn Huang
> >
> >
> > yinghua...@163.com  wrote on Thu, Dec 3, 2020 at 8:46 PM:
> >
> >>
>


Re: What visual client do people recommend for Flink?

2020-12-03 by yinghua...@163.com
I didn't make this clear: I mean a Flink SQL client. We want to give other departments in the company a client to use; those colleagues only know some SQL and have little programming background.

> On Dec 3, 2020, at 21:52, Shawn Huang  wrote:
> 
> What do you mean by a client? Flink serves a Web UI on port 8081 by default, where you can submit and cancel jobs and view logs and some basic metrics.
> 
> Best,
> Shawn Huang
> 
> 
> yinghua...@163.com  wrote on Thu, Dec 3, 2020 at 8:46 PM:
> 
>> 


Re: What visual client do people recommend for Flink?

2020-12-03 by Shawn Huang
What do you mean by a client? Flink serves a Web UI on port 8081 by default, where you can submit and cancel jobs and view logs and some basic metrics.

Best,
Shawn Huang


yinghua...@163.com  wrote on Thu, Dec 3, 2020 at 8:46 PM:

>


Re: flink-cdc cannot read the binlog, and the program reports no error

2020-12-03 by Jark Wu
Could unsigned int be the culprit...

On Thu, 3 Dec 2020 at 15:15, chenjb  wrote:

> Case closed: the field types had not been mapped as the docs require; after mapping them correctly everything works.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL shared source question

2020-12-03 by Jark Wu
4. Check whether your sink and source are chained into the same task; if they are, a slow sink task
will directly slow down reading from that source partition.

On Thu, 3 Dec 2020 at 21:42, Jark Wu  wrote:

> 1. Whether the source is shared can be seen from the topology graph in the web UI.
> 2. When catching up on data, or when downstream consumers run at different speeds, uneven consumption across partitions is perfectly normal.
> 3. You can raise the sink parallelism and increase the buffer size to alleviate this.
>
> Best,
> Jark
>
> On Wed, 2 Dec 2020 at 19:22, zz  wrote:
>
>> Hi all:
>> I have a job whose source table reads from a single topic, but there are 6 sinks: multiple INSERT
>> statements write into the same MySQL table. As I understand it, these INSERT statements
>> should all share the source table, so Kafka only needs to be read once. While the job runs, however, some Kafka
>> topic partitions are consumed quickly and others very slowly. What could the reason be?
>> The topic has 18 partitions and the job parallelism is 18.
>
>


Re: Flink SQL shared source question

2020-12-03 by Jark Wu
1. Whether the source is shared can be seen from the topology graph in the web UI.
2. When catching up on data, or when downstream consumers run at different speeds, uneven consumption across partitions is perfectly normal.
3. You can raise the sink parallelism and increase the buffer size to alleviate this.

Best,
Jark

On Wed, 2 Dec 2020 at 19:22, zz  wrote:

> Hi all:
> I have a job whose source table reads from a single topic, but there are 6 sinks: multiple INSERT
> statements write into the same MySQL table. As I understand it, these INSERT statements
> should all share the source table, so Kafka only needs to be read once. While the job runs, however, some Kafka
> topic partitions are consumed quickly and others very slowly. What could the reason be?
> The topic has 18 partitions and the job parallelism is 18.


Re: I defined a Kafka dynamic table in SQL-Client, but the Kafka topic had some elements in the wrong format, so an exception was thrown in SQL-Client. Can we define the Kafka dynamic table with some

2020-12-03 by Jark Wu
I think this is a bug; I've created an issue: https://issues.apache.org/jira/browse/FLINK-20470

On Wed, 2 Dec 2020 at 18:02, mr.meng...@ouglook.com 
wrote:

> <
> http://apache-flink.147419.n8.nabble.com/file/t1146/QQ%E6%88%AA%E5%9B%BE111.jpg>
>
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t1146/%E6%97%A0%E6%A0%87%E9%A2%981211.png
> >
> Caused by: java.io.IOException: Failed to deserialize JSON ''.
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:126)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:76)
> ~[flink-json-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> ~[flink-sql-connector-kafka_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at
>
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> Caused by: java.lang.ClassCastException:
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.MissingNode
> cannot be cast to
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
>
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
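
Until FLINK-20470 is resolved, one mitigation worth trying (an assumption for this setup, to be checked against the 1.11 json format docs) is letting the format skip records it cannot parse; note that the linked issue is precisely about a case the current error handling misses, so the empty-message case may still fail until the fix lands. Topic, servers and schema below are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TolerantJsonSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE kafka_source (\n"
                        + "  id BIGINT,\n"
                        + "  msg STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'my_topic',\n"
                        + "  'properties.bootstrap.servers' = 'kafka-host:9092',\n"
                        + "  'properties.group.id' = 'my_group',\n"
                        + "  'scan.startup.mode' = 'latest-offset',\n"
                        + "  'format' = 'json',\n"
                        // Skip rows whose bytes cannot be parsed as JSON instead of failing the job.
                        + "  'json.ignore-parse-errors' = 'true'\n"
                        + ")");
    }
}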


Re: Problems writing ORC to Hive 2.1.1 from Flink 1.11.2

2020-12-03 by Rui Li
Hi,

I'll find a Hive 2.1.1 environment and try to reproduce this. First, though, note that Flink currently does not support Hive ACID tables:
even if the data in your example were written successfully, it would not satisfy ACID semantics, and Hive might not be able to read it.

On Thu, Dec 3, 2020 at 5:23 PM yang xu <316481...@qq.com> wrote:

> Hi Rui Li
> The jars under lib are:
>  flink-csv-1.11.2.jar
>  flink-dist_2.11-1.11.2.jar
>  flink-json-1.11.2.jar
>  flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar
>  flink-shaded-zookeeper-3.4.14.jar
>  flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
>  flink-table_2.11-1.11.2.jar
>  flink-table-api-java-bridge_2.11-1.11.2.jar
>  flink-table-blink_2.11-1.11.2.jar
>  flink-table-planner-blink_2.11-1.11.2.jar
>  log4j-1.2-api-2.12.1.jar
>  log4j-api-2.12.1.jar
>  log4j-core-2.12.1.jar
>  log4j-slf4j-impl-2.12.1.jar
>
> The statement writing to Hive is a simple insert:
> insert into hive_t1  SELECT  address  FROM  users
>
> The table was created with:
> create table hive_t1(address string)
> clustered by (address) into 8 buckets
> stored as orc TBLPROPERTIES ('transactional'='true','orc.compress' =
> 'SNAPPY');
>
> Thanks a lot for your help!
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li
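
If ACID semantics are not actually needed, a plain (non-transactional) ORC table avoids the limitation described above. A minimal sketch, assuming the Hive connector jars are on the classpath and using placeholder catalog name, hive-conf path and source table:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class NonAcidOrcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Catalog name, hive-conf directory and Hive version are placeholders.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.1.1");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // Create a plain ORC table (no 'transactional' property) using the Hive dialect.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "CREATE TABLE hive_t1_plain (address STRING) "
                        + "STORED AS ORC TBLPROPERTIES ('orc.compress'='SNAPPY')");

        // Assumes a 'users' table already exists, as in the thread above.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql("INSERT INTO hive_t1_plain SELECT address FROM users");
    }
}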


Re: Why is Calcite's implicit type conversion disabled

2020-12-03 by Jark Wu
Implicit type conversion is a very important public API and needs careful discussion in the community, for example which types may be converted to which.
The community has not planned this feature yet; if you need it, you can open an issue in the community.

Best,
Jark

On Wed, 2 Dec 2020 at 18:33, stgztsw  wrote:

> Currently neither Flink SQL nor Flink's Hive
> SQL supports implicit type conversion. While debugging we found that Calcite itself supports it, but Flink forcibly disables it, whereas Hive does support implicit conversion. This prevents our Hive jobs from being migrated to Flink. What is the reason for disabling it, and would enabling it on our side cause any problems?


Re: Flink SQL 1.11.1: possible bug

2020-12-03 by Jark Wu
It looks like the job submission timed out. Please confirm:
1. The Flink cluster is up.
2. The SQL client environment can reach the Flink cluster.
3. sql-client-defaults.yaml has the correct gateway-address configured (not needed for a local cluster).

Best,
Jark

On Wed, 2 Dec 2020 at 14:12, zzy  wrote:

> The problem is as follows. Flink version 1.11.1, using Flink SQL in the SQL client.
>
>
> The SQL statements are:
> CREATE TABLE sls_log_sz_itsp (
>   request STRING,
>   http_bundleId STRING,
>   upstream_addr STRING,
>   http_appid STRING,
>   bodyUserId STRING,
>   http_sequence STRING,
>   http_version STRING,
>   response_body STRING,
>   uri STRING,
>   bytes_sent STRING,
>   http_userId STRING,
>   http_cityId STRING,
>   http_user_agent STRING,
>   http_deviceType STRING,
>   record_time STRING,
>   rt AS TO_TIMESTAMP(DATE_FORMAT(record_time,'-MM-dd HH:mm:ss')),
>   WATERMARK FOR rt AS rt - INTERVAL '5' SECOND,
>   request_time STRING,
>   request_body STRING,
>   request_length STRING,
>   nginx_id STRING,
>   proxy_add_x_forwarded_for STRING,
>   http_deviceId STRING,
>   host STRING,
>   upstream_response_time STRING,
>   status STRING
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.version' = '0.11',
>  'connector.topic' = 'sls',
>  'connector.properties.zookeeper.connect' =
> 'hadoop85:2181,hadoop86:2181,hadoop87:2181',
>  'connector.properties.bootstrap.servers' =
> 'hadoop85:9092,hadoop86:9092,hadoop87:9092',
>  'connector.properties.group.id' = 'log-sz-itsp',
>  'connector.startup-mode' = 'latest-offset',
>  'format.type' = 'json'
> );
>
>
>
>  CREATE TABLE sz_itsp_test(
> request STRING,
> request_count BIGINT NOT NULL,
> window_end TIMESTAMP(3)
> ) WITH (
> 'connector.type' = 'jdbc',
> 'connector.url' =
> 'jdbc:mysql://hadoop85:3306/test?useSSL=false&autoReconnect=true',
> 'connector.table' = 'sz_itsp_test',
> 'connector.driver' = 'com.mysql.jdbc.Driver',
> 'connector.username' = 'root',
> 'connector.password' = '00',
> 'connector.write.flush.max-rows' = '1',
> 'connector.write.flush.interval' = '2s',
> 'connector.write.max-retries' = '3'
> );
>
>
> INSERT INTO sz_itsp_test
> SELECT
>request,
>count(request) request_count,
>TUMBLE_END(rt, INTERVAL '5' MINUTE) AS window_end
>  FROM sls_log_sz_itsp
>  WHERE nginx_id = 'sz-itsp' AND nginx_id IS NOT NULL
>  GROUP BY TUMBLE(rt, INTERVAL '5' MINUTE), request
>  ;
>
>
> The SQL client reports the following error:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: java.lang.RuntimeException: Error running SQL job.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:608)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:529)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:537)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:299)
> at java.util.Optional.ifPresent(Optional.java:159)
> at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:605)
> ... 8 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(Complet

Re: Flink TableAPI Issue: cannot assign instance of org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap to field

2020-12-03 by Jark Wu
Check whether the Flink version used to submit the job matches the Flink version deployed on the YARN cluster.
Alternatively, you may have two different versions of the flink-shaded-jackson jar on the cluster.

On Wed, 2 Dec 2020 at 11:55, Zed  wrote:

> When I submitted a flink-table-sql job to yarn, the following exception
> came
> out. Wondering how to solve it. Anyone can help me with that? Appreciate
> it
>
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
>
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassCastException: cannot assign instance of
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.LRUMap
> to field
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
> of type java.util.concurrent.ConcurrentHashMap in instance of
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache
> at
>
> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
> at
> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
>
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
>
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> at
>
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
> 

Flink 1.9: TTL setting not taking effect

2020-12-03 by Yang Peng
Hi, I have a question. Our production environment runs Flink 1.9 with RocksDB as the state backend; the relevant code is set up as follows:

private static final String EV_STATE_FLAG = "EV_EID_FLAG";

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(60))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
.cleanupInRocksdbCompactFilter(1000)
.build();
MapStateDescriptor<String, Integer> eidMapStateDesc = new
MapStateDescriptor<>(EV_STATE_FLAG, BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO);
eidMapStateDesc.enableTimeToLive(ttlConfig);
eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);

The TTL expiration time is set to 60 minutes.
The job has now been running for a day, and the RocksDB metrics show that the SST files for the EV_STATE_FLAG state keep growing with no downward trend. In the TM logs we found:
WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
compaction filter for state < EV_EID_FLAG >: feature is disabled for the
state backend.
After adding state.backend.rocksdb.ttl.compaction.filter.enabled:
true and restarting the job, the warning above
disappears, but after the job runs for a while checkpoints start to fail; jstack shows the checkpoint is stuck at the code that reads state. Removing the parameter brings the job back to normal, but then the TTL
warning about the configuration not taking effect reappears. Has anyone run into this problem?
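
For reference, the compaction-filter switch can also be set in code instead of flink-conf.yaml; the sketch below assumes the Flink 1.9 RocksDBStateBackend still exposes enableTtlCompactionFilter() (it was deprecated in later releases once the filter became the default). This only addresses the "feature is disabled" warning, not the separate checkpoint-failure symptom, which looks like its own problem.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TtlFilterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint path is a placeholder.
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

        // Equivalent to state.backend.rocksdb.ttl.compaction.filter.enabled: true,
        // applied in code so it cannot be forgotten on one of the clusters.
        backend.enableTtlCompactionFilter();

        env.setStateBackend(backend);
    }
}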


Re: Converting a flink-cdc SQL table into a DataStream

2020-12-03 by Jark Wu
The data inside the Row follows the fields and order defined in your schema; you can read values by index.

On Tue, 1 Dec 2020 at 13:59, jsqf  wrote:

> You can use this approach:
> DataStream dstream = tableEnv.toAppendStream(sourceTable,
> RowData.class);
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
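
To make the answer above concrete, here is a minimal sketch of reading a CDC-backed table as a DataStream; the table name is a placeholder, and a retract stream is used because a CDC source emits updates and deletes that an append-only stream cannot represent.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class CdcToStreamSketch {

    // tEnv and the table name are placeholders for whatever the job already has.
    static DataStream<Tuple2<Boolean, Row>> toStream(StreamTableEnvironment tEnv) {
        Table sourceTable = tEnv.from("my_cdc_table");

        // The Boolean flag marks insert (true) vs. retract (false) messages.
        DataStream<Tuple2<Boolean, Row>> stream = tEnv.toRetractStream(sourceTable, Row.class);

        // Fields can be read positionally, in the order declared in the table schema.
        stream.map(t -> String.valueOf(t.f1.getField(0))).print();

        return stream;
    }
}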


Re: flink sql client error java.net.NoRouteToHostException: No route to host

2020-12-03 by Jark Wu
Try pinging localhost and see whether it goes through.
Also check whether a network proxy is enabled locally; if so, try turning it off.

Best,
Jark

On Tue, 1 Dec 2020 at 09:38, 奚焘 <759928...@qq.com> wrote:

> I'm just starting to learn Flink. I downloaded and unpacked it, started ./sql-client.sh embedded, and typing SELECT 'Hello
> World'; produces an error.
> Flink SQL> SELECT 'Hello World';
> [ERROR] Could not execute SQL statement. Reason:
> java.net.NoRouteToHostException: No route to host
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


What visual client do people recommend for Flink?

2020-12-03 by yinghua...@163.com


Re: Collecting video streams from cameras

2020-12-03 by Jark Wu
Yes, it's a good fit.

On Tue, 1 Dec 2020 at 09:37, Xia(Nate) Qu  wrote:

> A question for everyone:
>
>
> We want to build a video-stream collection platform for multiple surveillance cameras, roughly 1000-5000 of them. The camera streams are sent directly to the platform, which then writes the data to Hadoop or feeds it to machine-learning consumers. Is Flink suitable for this kind of scenario? Thanks
>
>
> 屈夏
>


Re: How to ensure correct group agg results with flink cdc

2020-12-03 by Jark Wu
Is it that your data source has no historical full snapshot, which is why the results don't match?

The usual recommendation is to sync full + incremental data into Kafka and have Flink consume that topic from the beginning.
Alternatively, the mysql-cdc connector [1] provides full + incremental reading.

Best,
Jark

[1]:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector


On Mon, 30 Nov 2020 at 22:54, kandy.wang  wrote:

> insert into kudu.default_database.index_agg
> SELECT v_spu_id as spu_id,sum(leaving_num*vipshop_price) as
> leaving_price,DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss')
> FROM  XX.XX.XX
> group by v_spu_id;
>
>
> XX.XX.XX consumes the company's CDC feed via a custom cdc
> format. The CDC data sits in Kafka and is only retained for 7 days, and consumption is incremental only; how can the results be kept accurate?
> How should the initialization be done? Does initializing mean loading the data into state? Right now, reconciliation shows the numbers don't match.
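
A minimal sketch of the mysql-cdc source mentioned in [1]: the connector first snapshots the full table and then follows the binlog, so a downstream aggregate starts from complete data. Host, credentials, database and column names below are placeholders borrowed from the query in this thread.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlCdcSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Full snapshot + binlog follow-up, so downstream aggregates start from complete data.
        tEnv.executeSql(
                "CREATE TABLE products_cdc (\n"
                        + "  v_spu_id BIGINT,\n"
                        + "  leaving_num DECIMAL(10, 2),\n"
                        + "  vipshop_price DECIMAL(10, 2)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'mysql-host',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'user',\n"
                        + "  'password' = 'pass',\n"
                        + "  'database-name' = 'mydb',\n"
                        + "  'table-name' = 'products'\n"
                        + ")");
    }
}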



Re: Running Flink SQL from code fails with Multiple factories for identifier 'jdbc' that implement

2020-12-03 by Wei Zhong
This error-message display issue has been fixed in later versions; once the new release is out and you upgrade, the error message will show directly which TableFactory implementations conflict:

https://issues.apache.org/jira/browse/FLINK-20186 




> On Dec 3, 2020, at 20:08, Wei Zhong  wrote:
> 
> Hi,
> 
> The current TableFactory lookup code seems to have a problem in how it reports errors: the real class names are not shown. You can run the following code manually to see exactly which classes are being picked up as JDBC DynamicTableSinkFactory implementations:
> 
> List<Factory> result = new LinkedList<>();
> ServiceLoader
>     .load(Factory.class, Thread.currentThread().getContextClassLoader())
>     .iterator()
>     .forEachRemaining(result::add);
> List<Factory> jdbcResult = result.stream().filter(f ->
>     DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
>     f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
> System.out.println(jdbcResult);
> 
> 
>> On Dec 3, 2020, at 19:50, hailongwang <18868816...@163.com 
>> > wrote:
>> 
>> Hi,
>> Which version are you using? Do you have a connector of your own that extends DynamicTableSinkFactory and whose factoryIdentifier method returns
>> `jdbc`?
>> 
>> 
>> Best,
>> Hailong
>> On 2020-12-03 14:44:18, "xuzh" <huazhe...@qq.com> wrote:
>>> Error:
>>> 
>>> 
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>>> factories for identifier 'jdbc' that implement 
>>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>>> classpath
>>> 
>>> 
>>> It looks like two factory classes with the same identifier were found: DynamicTableSinkFactory
>>> 
>>> 
>>> The code is:
>>> package org.apache.flink.examples;
>>> 
>>> 
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>> 
>>> 
>>> public class CDC2ss2 {
>>>     public static void main(String[] args) throws Exception {
>>> 
>>> 
>>>         // set up execution environment
>>>         StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tEnv;
>>> 
>>> 
>>>         EnvironmentSettings settings = 
>>> EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>         tEnv = StreamTableEnvironment.create(env, 
>>> settings);
>>>         String src_sql = "CREATE TABLE userss (\n" +
>>>                 "    
>>>  user_id INT,\n" +
>>>                 "    
>>>  user_nm STRING\n" +
>>>                 ") WITH (\n" +
>>>                 "    
>>>   'connector' = 'mysql-cdc',\n" +
>>>                 "    
>>>   'hostname' = '10.12.5.37',\n" +
>>>                 "    
>>>   'port' = '3306',\n" +
>>>                 "    
>>>   'username' = 'dps',\n" +
>>>                 "    
>>>   'password' = 'dps1234',\n" +
>>>                 "    
>>>   'database-name' = 'rpt',\n" +
>>>                 "    
>>>   'table-name' = 'users'\n" +
>>>                 "    
>>>   )";
>>> 
>>> 
>>>         tEnv.executeSql(src_sql); // create the table
>>> 
>>> 
>>>         String sink="CREATE TABLE sink (\n" +
>>>                 "    
>>>  user_id INT,\n" +
>>>                 "    
>>>  user_nm STRING,\n" +
>>>                 "    
>>>  primary key(user_id)  NOT ENFORCED \n" +
>>>                 ") WITH (\n" +
>>>                 "    
>>>   'connector' = 'jdbc',\n" +
>>>                 "    
>>>   'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n 
>>> " +
>>>                 "    
>>>   'username' = 'dps',\n" +
>>>                 "    
>>>   'password' = 'dps1234',\n" +
>>>                 "    
>>>   'table-name' = 'sink'\n" +
>>>                 "    
>>>   )";
>>>         String to_print_sql="insert into sink select 
>>> user_id  ,user_nm   from userss";
>>>          tEnv.executeSql(sink);
>>>         tEnv.executeSql(to_print_sql);
>>>         env.execute();
>>>     }
>>> 
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Full error:
>>> 
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> Unable to create a sink for writing table 
>>> 'default_catalog.default_database.sink'.
>>> 
>>> 
>>> Table options are:
>>> 
>>> 
>>> 'connector'='jdbc'
>>> 'password'='dps1234'
>>> 'table-name'='sink'
>>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false' 
>>> 
>>> 'username'='dps'
>>> at 
>>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.

Re:Re: Flink SQL 1.11: after a long time with no data written to MySQL, java.sql.SQLException: No operations allowed after statement closed.

2020-12-03 by hailongwang
This is probably just an error-level log entry; could you also share the logs for retry times = 1 and retry times = 2?





On 2020-12-03 16:17:27, "yanzi"  wrote:
>hi Leonard:
>
>The error message is:
>[2020-12-02 22:01:03.403] [ERROR] [jdbc-upsert-output-format-thread-1]
>[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
>executeBatch error, retry times = 3
>java.sql.SQLException: No operations allowed after statement closed.
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at 
> com.mysql.jdbc.PreparedStatement.setString(PreparedStatement.java:3943)
>~[mysql-connector-java-5.1.49.jar:5.1.49]
>   at
>org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:219)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:193)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:86)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcRowDataOutputFormat.java:164)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:154)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at
>org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
>~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>[?:1.8.0_262]
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>[?:1.8.0_262]
>   at
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>[?:1.8.0_262]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Dynamically loading patterns in Flink CEP

2020-12-03 by huang botao
OK, thanks for the answer. I'm currently reading Alex's two posts (
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html ;
https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html
), which are quite inspiring.

On Wed, Dec 2, 2020 at 8:36 PM Wei Zhong  wrote:

> Hi,
>
> Flink CEP does not yet support dynamically loading rules. There is a JIRA tracking this request:
>
> https://issues.apache.org/jira/browse/FLINK-7129 <
> https://issues.apache.org/jira/browse/FLINK-7129>
>
> You can follow that JIRA for the latest progress.
>
> > On Dec 2, 2020, at 17:48, huang botao  wrote:
> >
> > Hi, in our projects the rules change frequently. How do people usually load these rules dynamically? Does Flink CEP have a native API for dynamically loading rules?
>
>


Re: Running Flink SQL from code fails with Multiple factories for identifier 'jdbc' that implement

2020-12-03 by Wei Zhong
Hi,

The current TableFactory lookup code seems to have a problem in how it reports errors: the real class names are not shown. You can run the following code manually to see exactly which classes are being picked up as JDBC DynamicTableSinkFactory implementations:

List<Factory> result = new LinkedList<>();
ServiceLoader
    .load(Factory.class, Thread.currentThread().getContextClassLoader())
    .iterator()
    .forEachRemaining(result::add);
List<Factory> jdbcResult = result.stream().filter(f ->
    DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
    f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
System.out.println(jdbcResult);
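
Building on the snippet above, it can also help to print where each matching factory class was loaded from, which usually points straight at the duplicated connector jar. The helper below is only a debugging sketch, not part of Flink, and assumes the conflicting classes expose a code source (shaded fat jars normally do).

import java.security.CodeSource;
import java.util.LinkedList;
import java.util.List;
import java.util.ServiceLoader;

import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.Factory;

public class JdbcFactoryLocator {
    public static void main(String[] args) {
        List<Factory> factories = new LinkedList<>();
        ServiceLoader
                .load(Factory.class, Thread.currentThread().getContextClassLoader())
                .iterator()
                .forEachRemaining(factories::add);

        for (Factory f : factories) {
            if (f instanceof DynamicTableSinkFactory && "jdbc".equals(f.factoryIdentifier())) {
                // Print the class name and the jar it was loaded from, which usually
                // reveals the duplicated connector dependency directly.
                CodeSource source = f.getClass().getProtectionDomain().getCodeSource();
                System.out.println(f.getClass().getName() + " <- "
                        + (source == null ? "unknown location" : source.getLocation()));
            }
        }
    }
}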


> On Dec 3, 2020, at 19:50, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> Which version are you using? Do you have a connector of your own that extends DynamicTableSinkFactory and whose factoryIdentifier method returns
> `jdbc`?
> 
> 
> Best,
> Hailong
> On 2020-12-03 14:44:18, "xuzh"  wrote:
>> Error:
>> 
>> 
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>> factories for identifier 'jdbc' that implement 
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>> classpath
>> 
>> 
>> It looks like two factory classes with the same identifier were found: DynamicTableSinkFactory
>> 
>> 
>> The code is:
>> package org.apache.flink.examples;
>> 
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>> 
>> 
>> public class CDC2ss2 {
>>     public static void main(String[] args) throws Exception {
>> 
>> 
>>         // set up execution environment
>>         StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tEnv;
>> 
>> 
>>         EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance()
>>                 .useBlinkPlanner()
>>                 .inStreamingMode()
>>                 .build();
>>         tEnv = StreamTableEnvironment.create(env, 
>> settings);
>>         String src_sql = "CREATE TABLE userss (\n" +
>>                 "    
>>  user_id INT,\n" +
>>                 "    
>>  user_nm STRING\n" +
>>                 ") WITH (\n" +
>>                 "    
>>   'connector' = 'mysql-cdc',\n" +
>>                 "    
>>   'hostname' = '10.12.5.37',\n" +
>>                 "    
>>   'port' = '3306',\n" +
>>                 "    
>>   'username' = 'dps',\n" +
>>                 "    
>>   'password' = 'dps1234',\n" +
>>                 "    
>>   'database-name' = 'rpt',\n" +
>>                 "    
>>   'table-name' = 'users'\n" +
>>                 "    
>>   )";
>> 
>> 
>>         tEnv.executeSql(src_sql); // create the table
>> 
>> 
>>         String sink="CREATE TABLE sink (\n" +
>>                 "    
>>  user_id INT,\n" +
>>                 "    
>>  user_nm STRING,\n" +
>>                 "    
>>  primary key(user_id)  NOT ENFORCED \n" +
>>                 ") WITH (\n" +
>>                 "    
>>   'connector' = 'jdbc',\n" +
>>                 "    
>>   'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>>                 "    
>>   'username' = 'dps',\n" +
>>                 "    
>>   'password' = 'dps1234',\n" +
>>                 "    
>>   'table-name' = 'sink'\n" +
>>                 "    
>>   )";
>>         String to_print_sql="insert into sink select 
>> user_id  ,user_nm   from userss";
>>          tEnv.executeSql(sink);
>>         tEnv.executeSql(to_print_sql);
>>         env.execute();
>>     }
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> Full error:
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> Unable to create a sink for writing table 
>> 'default_catalog.default_database.sink'.
>> 
>> 
>> Table options are:
>> 
>> 
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>>  at 
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>  at 
>> org.apac

Re: Running Flink SQL from code fails with Multiple factories for identifier 'jdbc' that implement

2020-12-03 by hailongwang
Hi,
 Which version are you using? Do you have a connector of your own that extends DynamicTableSinkFactory and whose factoryIdentifier method returns
`jdbc`?


Best,
Hailong
On 2020-12-03 14:44:18, "xuzh"  wrote:
>Error:
>
>
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
>for identifier 'jdbc' that implement 
>'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>classpath
>
>
>It looks like two factory classes with the same identifier were found: DynamicTableSinkFactory
>
>
>The code is:
>package org.apache.flink.examples;
>
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.factories.DynamicTableSinkFactory;
>
>
>public class CDC2ss2 {
>    public static void main(String[] args) throws Exception {
>
>
>        // set up execution environment
>        StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>        StreamTableEnvironment tEnv;
>
>
>        EnvironmentSettings settings = 
>EnvironmentSettings.newInstance()
>                .useBlinkPlanner()
>                .inStreamingMode()
>                .build();
>        tEnv = StreamTableEnvironment.create(env, 
>settings);
>        String src_sql = "CREATE TABLE userss (\n" +
>                "    
> user_id INT,\n" +
>                "    
> user_nm STRING\n" +
>                ") WITH (\n" +
>                "      
>'connector' = 'mysql-cdc',\n" +
>                "      
>'hostname' = '10.12.5.37',\n" +
>                "      
>'port' = '3306',\n" +
>                "      
>'username' = 'dps',\n" +
>                "      
>'password' = 'dps1234',\n" +
>                "      
>'database-name' = 'rpt',\n" +
>                "      
>'table-name' = 'users'\n" +
>                "      
>)";
>
>
>        tEnv.executeSql(src_sql); // create the table
>
>
>        String sink="CREATE TABLE sink (\n" +
>                "    
> user_id INT,\n" +
>                "    
> user_nm STRING,\n" +
>                "    
> primary key(user_id)  NOT ENFORCED \n" +
>                ") WITH (\n" +
>                "      
>'connector' = 'jdbc',\n" +
>                "      
>'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>                "      
>'username' = 'dps',\n" +
>                "      
>'password' = 'dps1234',\n" +
>                "      
>'table-name' = 'sink'\n" +
>                "      
>)";
>        String to_print_sql="insert into sink select 
>user_id  ,user_nm   from userss";
>         tEnv.executeSql(sink);
>        tEnv.executeSql(to_print_sql);
>        env.execute();
>    }
>
>
>}
>
>
>
>
>
>Full error:
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Unable to create a sink for writing table 
>'default_catalog.default_database.sink'.
>
>
>Table options are:
>
>
>'connector'='jdbc'
>'password'='dps1234'
>'table-name'='sink'
>'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>'username'='dps'
>   at 
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>   at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
>connector using option ''connector'='jdbc''.
>   at 
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(Factor

Configuring taskmanager.out to roll

2020-12-03 by zilong xiao
A question for the community experts: can the standard-output file taskmanager.out be configured to roll?


Re: Impact of a ZooKeeper leader change on Flink

2020-12-03 by Yang Wang
I checked; the community already has a related ticket and PR you can follow:

https://issues.apache.org/jira/browse/FLINK-10052
https://github.com/apache/flink/pull/11338

Best,
Yang

Yang Wang  于2020年12月3日周四 下午4:52写道:

> My understanding is that Curator keeps long-lived connections to each ZooKeeper node (both the leader and the followers).
> If you restart one of the ZK nodes, the Curator clients connected to that node should all go into Suspended state,
> which makes the corresponding JobManagers lose leadership, so the current jobs are cancelled and rerun.
>
> You could verify whether restarting one ZK node only fails over the Flink jobs connected to that particular node, rather than all of them.
>
> Finally, there is currently no way to solve this purely through Flink configuration, unless the handling of the Suspended state in Curator#LeaderLatch is improved,
> and Flink's LeaderRetrieval also improves its handling of the Suspended state instead of directly notifying an empty leader.
>
>
> Best,
> Yang
>
> 赵一旦  wrote on Thu, Dec 3, 2020 at 10:28 AM:
>
>>
>> Then why would Curator's state become suspended or lost? I always restart ZK one node at a time, and I just tried it again recently: I restarted a follower
>> ZK node first, and the moment it was killed every Flink job ran into trouble.
>>
>> Yang Wang  wrote on Tue, Dec 1, 2020 at 8:18 PM:
>>
>> > Flink uses the Curator framework for leader election and retrieval. When Curator's state
>> > becomes Suspended or Lost, the leadership is revoked, which means the previous jobs have to be cancelled
>> > and rescheduled once a new leader appears.
>> >
>> > Could you provide the JobManager log, or check yourself whether the JobManager log shows a Curator connection
>> state change
>> > that then led to the failover?
>> >
>> >
>> > Best,
>> > Yang
>> >
>> > 赵一旦  wrote on Tue, Dec 1, 2020 at 7:13 PM:
>> >
>> > > Sunk without a trace again. Is there anyone who understands this and can explain?
>> > >
>> > > RS  wrote on Tue, Nov 17, 2020 at 9:35 AM:
>> > >
>> > > > Ha, same here. If Flink loses its connection to ZK, all jobs restart. I've tested all sorts of scenarios here, such as deploying HA
>> > > > and deploying multiple JobManagers, and the jobs always restart; I don't know how to solve it either.
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On 2020-11-16 18:39:29, "赵一旦"  wrote:
>> > > >
>> > > >
>> > >
>> >
>> >In my experience at work, there have been a few times I needed to restart the ZK cluster, restarting the ZK nodes one by one. The result was that all jobs in the Flink cluster restarted automatically (from the latest checkpoint). That has some impact on the jobs, because checkpoints are taken every 10 minutes, so it causes a momentary spike in load.
>> > > > >
>> > > > >So I'd like to ask: is this reasonable, or is something wrong with my configuration or my procedure?
>> > > >
>> > >
>> >
>>
>


Re: Problems writing ORC to Hive 2.1.1 from Flink 1.11.2

2020-12-03 by yang xu
Hi Rui Li
The jars under lib are:
 flink-csv-1.11.2.jar
 flink-dist_2.11-1.11.2.jar
 flink-json-1.11.2.jar
 flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar
 flink-shaded-zookeeper-3.4.14.jar
 flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
 flink-table_2.11-1.11.2.jar
 flink-table-api-java-bridge_2.11-1.11.2.jar
 flink-table-blink_2.11-1.11.2.jar
 flink-table-planner-blink_2.11-1.11.2.jar
 log4j-1.2-api-2.12.1.jar
 log4j-api-2.12.1.jar
 log4j-core-2.12.1.jar
 log4j-slf4j-impl-2.12.1.jar

The statement writing to Hive is a simple insert:
insert into hive_t1  SELECT  address  FROM  users

The table was created with:
create table hive_t1(address string) 
clustered by (address) into 8 buckets 
stored as orc TBLPROPERTIES ('transactional'='true','orc.compress' =
'SNAPPY');

Thanks a lot for your help!




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 1.11: after a long time with no data written to MySQL, java.sql.SQLException: No operations allowed after statement closed.

2020-12-03 by yanzi
hi Leonard:

The error message is:
[2020-12-02 22:01:03.403] [ERROR] [jdbc-upsert-output-format-thread-1]
[org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat] >>> JDBC
executeBatch error, retry times = 3
java.sql.SQLException: No operations allowed after statement closed.
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:861)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at com.mysql.jdbc.StatementImpl.checkClosed(StatementImpl.java:426)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at 
com.mysql.jdbc.PreparedStatement.setString(PreparedStatement.java:3943)
~[mysql-connector-java-5.1.49.jar:5.1.49]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$createExternalConverter$57fde215$8(AbstractJdbcRowConverter.java:219)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableExternalConverter$2bf50691$1(AbstractJdbcRowConverter.java:193)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:86)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.lambda$createKeyedRowExecutor$3fd497bb$1(JdbcRowDataOutputFormat.java:164)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.executor.KeyedBatchStatementExecutor.executeBatch(KeyedBatchStatementExecutor.java:71)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.table.JdbcRowDataOutputFormat.attemptFlush(JdbcRowDataOutputFormat.java:154)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:171)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:120)
~[flink-connector-jdbc_2.11-1.11.0.jar:1.11.0]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_262]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_262]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_262]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_262]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Using multiple keytabs with Flink

2020-12-03 by zhengmao776
Hello. When submitting jobs with flink
run on a kerberized Hadoop cluster, I want to authenticate different users with different keytabs. I saw the security.kerberos.login.keytab and security.kerberos.login.principal settings in flink-conf.yaml, but they cannot be configured dynamically; I tried setting them with
-yD, but it has no effect. How should I handle this situation? Looking forward to your reply.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Why is Calcite's implicit type conversion disabled

2020-12-03 by Rui Li
Hi,

My understanding is that Calcite's implicit type conversion is still fairly new, so it hasn't been enabled yet. Even if it were enabled,
its conversion logic wouldn't necessarily match Hive's exactly; for example, some conversions Hive allows might not be allowed by Calcite. The community is also working on Hive syntax compatibility, and once that is in place migrating Hive jobs will be easier.

On Wed, Dec 2, 2020 at 6:43 PM tangshiwei 
wrote:

> Currently neither Flink SQL nor Flink's Hive
> SQL supports implicit type conversion. While debugging we found that Calcite itself supports it, but Flink forcibly disables it, whereas Hive does support implicit conversion. This prevents our Hive jobs from being migrated to Flink. What is the reason for disabling it, and would enabling it on our side cause any problems?



-- 
Best regards!
Rui Li


Re: Problems writing ORC to Hive 2.1.1 from Flink 1.11.2

2020-12-03 by Rui Li
Hi,

Which dependencies have you added under Flink's lib directory, and what does the failing SQL look like?

On Thu, Dec 3, 2020 at 4:15 PM yang xu <316481...@qq.com> wrote:

> Flink version 1.11.2
> Hive version 2.1.1, on CDH 6.2.1
> Writing plain tables or parquet is fine; writing ORC fails with the following error:
> <
> http://apache-flink.147419.n8.nabble.com/file/t1150/flink_hive%E5%8C%85%E5%86%B2%E7%AA%81.png>
>
>
> I also saw on other mailing-list threads that one should modify
> flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar
> OrcFile:
> WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083
> and recompile, but after trying that I still get the same error. Does Hive have to be upgraded to 3.x?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Impact of a ZooKeeper leader change on Flink

2020-12-03 by Yang Wang
My understanding is that Curator keeps long-lived connections to each ZooKeeper node (both the leader and the followers).
If you restart one of the ZK nodes, the Curator clients connected to that node should all go into Suspended state,
which makes the corresponding JobManagers lose leadership, so the current jobs are cancelled and rerun.

You could verify whether restarting one ZK node only fails over the Flink jobs connected to that particular node, rather than all of them.

Finally, there is currently no way to solve this purely through Flink configuration, unless the handling of the Suspended state in Curator#LeaderLatch is improved,
and Flink's LeaderRetrieval also improves its handling of the Suspended state instead of directly notifying an empty leader.


Best,
Yang

赵一旦  wrote on Thu, Dec 3, 2020 at 10:28 AM:

>
> Then why would Curator's state become suspended or lost? I always restart ZK one node at a time, and I just tried it again recently: I restarted a follower
> ZK node first, and the moment it was killed every Flink job ran into trouble.
>
> Yang Wang  wrote on Tue, Dec 1, 2020 at 8:18 PM:
>
> > Flink uses the Curator framework for leader election and retrieval. When Curator's state
> > becomes Suspended or Lost, the leadership is revoked, which means the previous jobs have to be cancelled
> > and rescheduled once a new leader appears.
> >
> > Could you provide the JobManager log, or check yourself whether the JobManager log shows a Curator connection
> state change
> > that then led to the failover?
> >
> >
> > Best,
> > Yang
> >
> > 赵一旦  wrote on Tue, Dec 1, 2020 at 7:13 PM:
> >
> > > Sunk without a trace again. Is there anyone who understands this and can explain?
> > >
> > > RS  wrote on Tue, Nov 17, 2020 at 9:35 AM:
> > >
> > > > Ha, same here. If Flink loses its connection to ZK, all jobs restart. I've tested all sorts of scenarios here, such as deploying HA
> > > > and deploying multiple JobManagers, and the jobs always restart; I don't know how to solve it either.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 2020-11-16 18:39:29, "赵一旦"  wrote:
> > > >
> > > >
> > >
> >
> >In my experience at work, there have been a few times I needed to restart the ZK cluster, restarting the ZK nodes one by one. The result was that all jobs in the Flink cluster restarted automatically (from the latest checkpoint). That has some impact on the jobs, because checkpoints are taken every 10 minutes, so it causes a momentary spike in load.
> > > > >
> > > > >So I'd like to ask: is this reasonable, or is something wrong with my configuration or my procedure?
> > > >
> > >
> >
>


Re: Flink SQL Tumble window throws NoSuchMethodError functions/AggregateFunction

2020-12-03 by JasonLee
hi

From the error message it looks like a jar conflict. Could you paste the relevant dependencies?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Problems writing ORC to Hive 2.1.1 from Flink 1.11.2

2020-12-03 by yang xu
Flink version 1.11.2
Hive version 2.1.1, on CDH 6.2.1
Writing plain tables or parquet is fine; writing ORC fails with the following error:

 

I also saw on other mailing-list threads that one should modify
flink-sql-connector-hive-2.2.0_2.11-1.11.2.jar 
OrcFile:
WriterVersion CURRENT_WRITER = WriterVersion.HIVE_13083
and recompile, but after trying that I still get the same error. Does Hive have to be upgraded to 3.x?



--
Sent from: http://apache-flink.147419.n8.nabble.com/