Re: slot question
A slot can run multiple tasks (different tasks of the same job); each task executes in its own thread.

ゞ野蠻遊戲χ wrote:
> Hi everyone,
>
> Can a slot run only one thread at a time, or can one slot run multiple threads in parallel?
>
> Thanks,
> Jiazhi

--
Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Object reuse in Reduce and similar functions
Has anyone looked into this?

赵一旦 wrote on Mon, Nov 16, 2020 at 2:48 PM:
> To be more specific: the object returned from reduce() becomes the output of the reduce (is it serialized immediately at that point?).
>
> reduce(new ReduceFunction<ObjCls>() {
>
>     @Override
>     public ObjCls reduce(ObjCls ele1, ObjCls ele2) {
>         long resultPv = ele1.getPv() + ele2.getPv();
>
>         ele1.setPv(999); // If this extra setPv is added here, which operators could it affect (under the various possible DAG shapes)?
>
>         ele1.setPv(resultPv);
>         return ele1;
>     }
>
> })
>
> 赵一旦 wrote on Mon, Nov 16, 2020 at 2:40 PM:
>> As in the subject: when implementing a reduce function, under what circumstances can reusing the input object cause problems? Or can it never cause problems?
>>
>> For example, the job graph contains a lot of repeated computation:
>>
>> streamA.reduce(reduceFunction1, ...);
>>
>> streamA.reduce(reduceFunction2, ...);
>>
>> streamA. ...
>>
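Not an authoritative answer to the thread, but the hazard being asked about can be demonstrated without Flink at all: if two downstream consumers hold references to the same record object and one branch mutates it in place, the other branch observes the mutation. A minimal plain-Java sketch (`ObjCls` here is a hypothetical stand-in for the POJO in the thread, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseHazardDemo {
    // Hypothetical stand-in for the ObjCls POJO from the thread.
    static class ObjCls {
        private long pv;
        ObjCls(long pv) { this.pv = pv; }
        long getPv() { return pv; }
        void setPv(long pv) { this.pv = pv; }
    }

    public static void main(String[] args) {
        ObjCls shared = new ObjCls(1);

        // Two "branches" that both receive a reference to the same record,
        // as can happen when operators are chained and objects are not copied.
        List<ObjCls> branchA = new ArrayList<>();
        List<ObjCls> branchB = new ArrayList<>();
        branchA.add(shared);
        branchB.add(shared);

        // Branch A mutates the record in place, like the setPv(999) example.
        branchA.get(0).setPv(999);

        // Branch B now observes the mutated value, not the original one.
        System.out.println(branchB.get(0).getPv()); // prints 999

        // Defensive copy: returning a new object instead of mutating the
        // input sidesteps the problem regardless of how the DAG is wired.
        ObjCls copy = new ObjCls(shared.getPv());
        copy.setPv(1);
        System.out.println(shared.getPv()); // still 999; the copy is independent
    }
}
```

Whether Flink actually hands the same reference to both branches depends on chaining and on the object-reuse setting, which is why the safe habit is to avoid mutating inputs.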
Re: Timestamp field type conversion in Flink SQL
You can use the docker setup from this article:
https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html
https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml
The ts values in that container are already in the SQL format.

1. What type should such a time field be parsed into in Flink SQL? TIMESTAMP WITH LOCAL TIME ZONE; only the json format in 1.12 supports it.
2. Yes.
3. Flink does not support TIMESTAMP WITH TIME ZONE yet. 'yyyy-MM-dd HH:mm:ss' corresponds to TIMESTAMP, a time-zone-free epoch long value; 'yyyy-MM-dd HH:mm:ssZ' corresponds to TIMESTAMP WITH LOCAL TIME ZONE, a timestamp in the session time zone.

Best,
Jark

On Wed, 25 Nov 2020 at 12:03, 陈帅 wrote:
> The data source is the kafka topic from Jark's project https://github.com/wuchong/flink-sql-submit; a user_behavior message looks like
> {"user_id": "470572", "item_id":"3760258", "category_id": "1299190", "behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
> As you can see, the ts value is '2017-11-26T01:00:00Z'. I now want to define a Flink SQL source table for it:
>
> CREATE TABLE user_log (
>     user_id VARCHAR,
>     item_id VARCHAR,
>     category_id VARCHAR,
>     behavior VARCHAR,
>     ts TIMESTAMP(3),
>     WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'user_behavior',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.group.id' = 'testGroup',
>     'format' = 'json',
>     -- 'json.timestamp-format.standard' = 'ISO-8601', -- without this line the default is 'SQL'
>     'scan.startup.mode' = 'earliest-offset'
> );
>
> Running the program throws:
> Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10
>
> I checked the official Flink json docs:
> https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> Only two standards are supported: SQL and ISO-8601.
> SQL accepts 'yyyy-MM-dd HH:mm:ss', while ISO-8601 accepts 'yyyy-MM-ddTHH:mm:ss.s{precision}'.
> Indeed, neither supports the 'yyyy-MM-ddTHH:mm:ssZ' above (note the trailing Z).
>
> My questions:
> 1. What type should such a time field be parsed into in Flink SQL?
> 2. If it cannot be supported directly, should I read it as VARCHAR first and then convert it with the UNIX_TIMESTAMP(ts_string, pattern_string) function? But how do I escape single quotes inside pattern_string? UNIX_TIMESTAMP('2017-11-26T01:00:00Z', 'yyyy-MM-dd'T'HH:mm:ss'Z'')?
> 3. When would the TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types be used? Is there an example?
>
> Thanks!
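For intuition about what the trailing 'Z' means, the conversion the question is after can be sketched outside Flink with plain java.time: `Instant.parse` accepts the ISO-8601 'Z' (UTC) suffix directly, and the value can then be re-rendered in the 'yyyy-MM-dd HH:mm:ss' shape that the json format's default 'SQL' standard expects. A minimal sketch (UTC output zone is my assumption):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TsConvertDemo {
    public static void main(String[] args) {
        // The raw value from the Kafka message, with the trailing 'Z'
        // (UTC designator) that the SQL/ISO-8601 standards reject.
        String raw = "2017-11-26T01:00:01Z";

        // Instant.parse understands the ISO-8601 'Z' suffix directly.
        Instant instant = Instant.parse(raw);

        // Render it in the 'yyyy-MM-dd HH:mm:ss' SQL style, here in UTC.
        String sqlStyle = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
                .withZone(ZoneId.of("UTC"))
                .format(instant);

        System.out.println(sqlStyle); // 2017-11-26 01:00:01
    }
}
```

The same idea can be done preprocessing-side (rewrite the field before it reaches Flink) when the format itself cannot parse the 'Z' variant.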
Re: Unsubscribe
Hi,
To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org; see the documentation [1] for details.

[1] https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

Best,
Congxian

回响 <939833...@qq.com> wrote on Tue, Nov 24, 2020 at 8:42 PM:
>
Re: Unsubscribe
-help

Xev Orm wrote on Wed, Nov 25, 2020 at 12:25 PM:
> Unsubscribe
>
delete
delete
Timestamp field type conversion in Flink SQL
The data source is the kafka topic from Jark's project https://github.com/wuchong/flink-sql-submit; a user_behavior message looks like
{"user_id": "470572", "item_id":"3760258", "category_id": "1299190", "behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
As you can see, the ts value is '2017-11-26T01:00:00Z'. I now want to define a Flink SQL source table for it:

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'json',
    -- 'json.timestamp-format.standard' = 'ISO-8601', -- without this line the default is 'SQL'
    'scan.startup.mode' = 'earliest-offset'
);

Running the program throws:
Caused by: java.time.format.DateTimeParseException: Text '2017-11-26T01:00:00Z' could not be parsed at index 10

I checked the official Flink json docs:
https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
Only two standards are supported: SQL and ISO-8601.
SQL accepts 'yyyy-MM-dd HH:mm:ss', while ISO-8601 accepts 'yyyy-MM-ddTHH:mm:ss.s{precision}'.
Indeed, neither supports the 'yyyy-MM-ddTHH:mm:ssZ' above (note the trailing Z).

My questions:
1. What type should such a time field be parsed into in Flink SQL?
2. If it cannot be supported directly, should I read it as VARCHAR first and then convert it with the UNIX_TIMESTAMP(ts_string, pattern_string) function? But how do I escape single quotes inside pattern_string? UNIX_TIMESTAMP('2017-11-26T01:00:00Z', 'yyyy-MM-dd'T'HH:mm:ss'Z'')?
3. When would the TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE types be used? Is there an example?

Thanks!
Is a Flink SinkFunction's close() guaranteed to be called when the job stops?
I wrote my own SinkFunction that writes to a database; it performs the insert only once a certain number of records (100) has accumulated, buffering them in a List:

public class SinkToJDBCWithJDBCStatementBatch extends RichSinkFunction<JDBCStatement> {

    private List<JDBCStatement> statementList = new ArrayList<>();

    @Override
    public void close() throws Exception {
        writeToDatabase();
        this.statementList.clear();
        super.close();
        if (dataSource != null) {
            dataSource.close();
        }
    }

    @Override
    public void invoke(JDBCStatement statement, Context context) throws Exception {
        if (statementList.size() < 100) {
            statementList.add(statement);
            return;
        }
        writeToDatabase();
        this.statementList.clear();
        statementList.add(statement); // keep the record that triggered the flush
    }

    public void writeToDatabase() {
        // ...
    }
}

I'd like to confirm: is this close() method guaranteed to be called when the job stops? What mechanism ensures that?

Thanks,
Wang Lei
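Independent of what Flink guarantees about close(), the buffer-and-flush pattern in the question can be exercised in isolation. A dependency-free sketch (`BatchBuffer` and its names are illustrative, not Flink API): flushing once more in close() is what prevents the tail of the buffer from being silently dropped.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBufferDemo {
    // Illustrative stand-in for the batching sink: buffers items and
    // flushes every `batchSize` records, plus once more on close().
    static class BatchBuffer implements AutoCloseable {
        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();
        int flushedCount = 0;

        BatchBuffer(int batchSize) { this.batchSize = batchSize; }

        void add(String item) {
            buffer.add(item);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        private void flush() {
            flushedCount += buffer.size(); // stand-in for writeToDatabase()
            buffer.clear();
        }

        @Override
        public void close() {
            flush(); // flush the tail so no buffered records are lost
        }
    }

    public static void main(String[] args) {
        BatchBuffer sink = new BatchBuffer(100);
        for (int i = 0; i < 250; i++) {
            sink.add("record-" + i);
        }
        sink.close(); // without this final flush, 50 records would be lost
        System.out.println(sink.flushedCount); // 250
    }
}
```

The point of the sketch: 250 records with a batch size of 100 means two in-flight flushes plus 50 leftovers, and only the close-time flush accounts for those leftovers. For crash scenarios (where no close() runs), checkpoint-time flushing is the usual complement.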
Re: slot question
Yes — a single slot can run multiple threads. What it cannot do is run multiple parallel subtasks of the same operator, nor tasks of operators belonging to other jobs.

ゞ野蠻遊戲χ wrote on Wed, Nov 25, 2020 at 10:33 AM:
> Hi everyone,
>
> Can a slot run only one thread at a time, or can one slot run multiple threads in parallel?
>
> Thanks,
> Jiazhi
JobManager process lag in a standalone cluster
As in the subject: in my standalone cluster I currently start a JobManager (StandaloneSessionClusterEntrypoint) plus a TaskManager on every machine.

The problem: when submitting or cancelling jobs, the Flink Web UI lags badly. Sometimes it only lags briefly and then recovers; other times multiple nodes across the cluster fail one after another (slots disappear, sometimes coming back on their own — presumably a network issue).

(1) Is this caused by the performance of the machine the JobManager runs on? Would running the JobManager on a dedicated machine help?

(2) I asked a question earlier about HA, where a failed ZK process caused all jobs to restart. I'd appreciate a summary of how the following failures affect running jobs in a Flink standalone cluster:
<1> a JobManager process fails (but not all of them — there are multiple JobManagers);
<2> a ZK process fails without breaking the ZK service (e.g. 1 node out of 3, possibly the leader);
<3> a TaskManager process fails.

For <3>: since I use the slot-spread scheduling strategy, losing one TM essentially restarts all jobs from the latest checkpoint; I can accept that.

But what is the expected behaviour for <1> and <2>? I haven't experimented much, but at minimum I've seen case <2> take down the whole cluster (e.g. all jobs restarting).
slot question
Hi everyone,

Can a slot run only one thread at a time, or can one slot run multiple threads in parallel?

Thanks,
Jiazhi
Re: Incorrect results for a Hive table self inner join with Flink 1.11.2
Hi,
Could you open an issue on the community JIRA [1]? If this is a bug, it also needs to be fixed in the 1.11 line.

Best,
Leonard

[1] https://issues.apache.org/jira/projects/FLINK/issues/

> On Nov 24, 2020, at 01:03, macdoor wrote:
>
> Answering my own question, for others' reference.
>
> After switching to flink 1.12.0-rc1 and running the same SQL on the same data, the results match what Hive computes. Confirmed this is a bug in 1.11.2 that 1.12 appears to have fixed.
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
(no subject)
Unsubscribe

gfjia
Email: gfjia_t...@163.com
Re: Flink CDC stalls because no packet is sent — how can this be fixed?
Btw, may I ask why you are using the Stream API rather than Flink SQL directly?

On Wed, 25 Nov 2020 at 00:21, Jark Wu wrote:
> See the docs:
> https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts
>
> On Tue, 24 Nov 2020 at 23:54, yujianbo <15205029...@163.com> wrote:
>> 1. Environment:
>>    - Version: 1.11.2
>>    - Flink CDC, syncing from MySQL to Kudu with the Stream API
>>
>> 2. Symptom:
>> Several MySQL tables (each around 30 million rows) have already been synced to Kudu, but one table has hit the same problem on several attempts — as far as I can tell while still in the full-snapshot phase, before reaching the incremental phase.
>>
>> The error log is below. I'm considering trying "autoReconnect=true" as a workaround, but I'm not sure where to add it, and judging from the log it treats the symptom rather than the cause: the real question is why no packet is sent, causing the stall.
>>
>> The exact error:
>> ==
>> 2020-11-24 20:00:37,547 ERROR io.debezium.connector.mysql.SnapshotReader
>> [] - Failed due to error: Aborting snapshot due to error when last running
>> 'SELECT * FROM `uchome`.`forums_post_12`': The last packet successfully
>> received from the server was 39 milliseconds ago. The last packet sent
>> successfully to the server was 6,772,615 milliseconds ago. is longer than
>> the server configured value of 'wait_timeout'. You should consider either
>> expiring and/or testing connection validity before use in your application,
>> increasing the server configured values for client timeouts, or using the
>> Connector/J connection property 'autoReconnect=true' to avoid this problem.
>> org.apache.kafka.connect.errors.ConnectException: The last packet
>> successfully received from the server was 39 milliseconds ago. The last
>> packet sent successfully to the server was 6,772,615 milliseconds ago. is
>> longer than the server configured value of 'wait_timeout'. You should
>> consider either expiring and/or testing connection validity before use in
>> your application, increasing the server configured values for client
>> timeouts, or using the Connector/J connection property 'autoReconnect=true'
>> to avoid this problem. Error code: 0; SQLSTATE: 08S01.
>> at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
>> Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
>> packet successfully received from the server was 39 milliseconds ago. The
>> last packet sent successfully to the server was 6,772,615 milliseconds ago.
>> is longer than the server configured value of 'wait_timeout'. You should
>> consider either expiring and/or testing connection validity before use in
>> your application, increasing the server configured values for client
>> timeouts, or using the Connector/J connection property 'autoReconnect=true'
>> to avoid this problem.
>> at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> ===
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
Re: Flink CDC stalls because no packet is sent — how can this be fixed?
See the docs:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts

On Tue, 24 Nov 2020 at 23:54, yujianbo <15205029...@163.com> wrote:
> 1. Environment:
>    - Version: 1.11.2
>    - Flink CDC, syncing from MySQL to Kudu with the Stream API
>
> 2. Symptom:
> Several MySQL tables (each around 30 million rows) have already been synced to Kudu, but one table has hit the same problem on several attempts — as far as I can tell while still in the full-snapshot phase, before reaching the incremental phase.
>
> The error log is below. I'm considering trying "autoReconnect=true" as a workaround, but I'm not sure where to add it, and judging from the log it treats the symptom rather than the cause: the real question is why no packet is sent, causing the stall.
>
> The exact error:
> ==
> 2020-11-24 20:00:37,547 ERROR io.debezium.connector.mysql.SnapshotReader
> [] - Failed due to error: Aborting snapshot due to error when last running
> 'SELECT * FROM `uchome`.`forums_post_12`': The last packet successfully
> received from the server was 39 milliseconds ago. The last packet sent
> successfully to the server was 6,772,615 milliseconds ago. is longer than
> the server configured value of 'wait_timeout'. You should consider either
> expiring and/or testing connection validity before use in your application,
> increasing the server configured values for client timeouts, or using the
> Connector/J connection property 'autoReconnect=true' to avoid this problem.
> org.apache.kafka.connect.errors.ConnectException: The last packet
> successfully received from the server was 39 milliseconds ago. The last
> packet sent successfully to the server was 6,772,615 milliseconds ago. is
> longer than the server configured value of 'wait_timeout'. You should
> consider either expiring and/or testing connection validity before use in
> your application, increasing the server configured values for client
> timeouts, or using the Connector/J connection property 'autoReconnect=true'
> to avoid this problem. Error code: 0; SQLSTATE: 08S01.
> at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
> Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
> packet successfully received from the server was 39 milliseconds ago. The
> last packet sent successfully to the server was 6,772,615 milliseconds ago.
> is longer than the server configured value of 'wait_timeout'. You should
> consider either expiring and/or testing connection validity before use in
> your application, increasing the server configured values for client
> timeouts, or using the Connector/J connection property 'autoReconnect=true'
> to avoid this problem.
> at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> ===
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>
Flink CDC stalls because no packet is sent — how can this be fixed?
1. Environment:
   - Version: 1.11.2
   - Flink CDC, syncing from MySQL to Kudu with the Stream API

2. Symptom:
Several MySQL tables (each around 30 million rows) have already been synced to Kudu, but one table has hit the same problem on several attempts — as far as I can tell while still in the full-snapshot phase, before reaching the incremental phase.

The error log is below. I'm considering trying "autoReconnect=true" as a workaround, but I'm not sure where to add it, and judging from the log it treats the symptom rather than the cause: the real question is why no packet is sent, causing the stall.

The exact error:
==
2020-11-24 20:00:37,547 ERROR io.debezium.connector.mysql.SnapshotReader
[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `uchome`.`forums_post_12`': The last packet successfully
received from the server was 39 milliseconds ago. The last packet sent
successfully to the server was 6,772,615 milliseconds ago. is longer than
the server configured value of 'wait_timeout'. You should consider either
expiring and/or testing connection validity before use in your application,
increasing the server configured values for client timeouts, or using the
Connector/J connection property 'autoReconnect=true' to avoid this problem.
org.apache.kafka.connect.errors.ConnectException: The last packet
successfully received from the server was 39 milliseconds ago. The last
packet sent successfully to the server was 6,772,615 milliseconds ago. is
longer than the server configured value of 'wait_timeout'. You should
consider either expiring and/or testing connection validity before use in
your application, increasing the server configured values for client
timeouts, or using the Connector/J connection property 'autoReconnect=true'
to avoid this problem. Error code: 0; SQLSTATE: 08S01.
at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_231]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
packet successfully received from the server was 39 milliseconds ago. The
last packet sent successfully to the server was 6,772,615 milliseconds ago.
is longer than the server configured value of 'wait_timeout'. You should
consider either expiring and/or testing connection validity before use in
your application, increasing the server configured values for client
timeouts, or using the Connector/J connection property 'autoReconnect=true'
to avoid this problem.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
===

--
Sent from: http://apache-flink.147419.n8.nabble.com/
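The wiki page linked in the replies amounts to raising the MySQL session timeouts so the snapshot connection survives the long full-table `SELECT`. A hedged sketch of the server-side settings (the 86400-second values are illustrative, and whether you set them globally or per session is a judgment call for your DBA):

```sql
-- Illustrative: extend MySQL timeouts so the CDC snapshot connection is
-- not killed by the server while the long snapshot SELECT is running.
-- Requires a privileged account; values are in seconds.
SET GLOBAL interactive_timeout = 86400;
SET GLOBAL wait_timeout = 86400;

-- Verify the effective values:
SHOW VARIABLES LIKE '%timeout%';
```

Note this addresses the server-side kill of an idle-looking connection; it does not by itself explain why the client went quiet for ~6,772 seconds, which is the deeper question raised in the thread.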
Debugging problems with test cases
When running the test cases locally, a pile of Scala files sometimes report errors, even though the whole project compiles fine. Could someone explain what to do in this situation? Is it possible to ignore the Scala files?
How can a custom Flink AggregateFunction recognize a HyperLogLog object?
My custom AggregateFunction implements approximate UV counting with HLL. The problem: HyperLogLog comes from a third-party package — how do I make Flink recognize it? I don't know how to write the TypeInformation for it. The code:

import io.airlift.slice.Slices;
import io.airlift.stats.cardinality.HyperLogLog;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Iterator;

public class FlinkUDAFCardinalityEstimationFunction
        extends AggregateFunction<Long, HyperLogLog> {

    private static final Logger LOG = LoggerFactory.getLogger(JsonArrayParseUDTF.class);
    private static final int NUMBER_OF_BUCKETS = 4096;

    @Override
    public HyperLogLog createAccumulator() {
        return HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
    }

    @Override
    public Long getValue(HyperLogLog acc) {
        if (acc == null) {
            return 0L;
        }
        return acc.cardinality();
    }

    public void accumulate(HyperLogLog acc, String element) {
        if (element == null) {
            return;
        }
        acc.add(Slices.utf8Slice(element));
    }

    public void retract(HyperLogLog acc, byte[] element) {
        // do nothing
        LOG.info("-- retract:" + new String(element));
    }

    public void merge(HyperLogLog acc, Iterable<HyperLogLog> it) {
        Iterator<HyperLogLog> iter = it.iterator();
        while (iter.hasNext()) {
            HyperLogLog a = iter.next();
            if (a != null) {
                acc.mergeWith(a);
            }
        }
    }

    public void resetAccumulator(HyperLogLog acc) {
        acc = HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
    }
}
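Not a definitive answer, but in the 1.11-era table AggregateFunction API one common route is to override getAccumulatorType() and hand Flink a generic TypeInformation for the third-party class, which falls back to Kryo serialization. A hedged, non-runnable sketch (added to the class above; whether Kryo handles airlift's HyperLogLog correctly and efficiently should be verified — a custom TypeSerializer built on HyperLogLog's own byte-level serialization would be the more robust alternative):

```java
// Sketch only (assumes the UDAF above, plus imports for
// org.apache.flink.api.common.typeinfo.TypeInformation and
// org.apache.flink.api.common.typeinfo.Types):

@Override
public TypeInformation<HyperLogLog> getAccumulatorType() {
    // Generic type info: Flink will treat HyperLogLog as a generic type
    // and serialize it with Kryo instead of trying to analyze it as a POJO.
    return TypeInformation.of(HyperLogLog.class);
}

@Override
public TypeInformation<Long> getResultType() {
    return Types.LONG;
}
```

Also worth noting for the code above: reassigning the `acc` parameter inside resetAccumulator only changes the local reference, so the framework's accumulator is not actually reset by that method as written.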
Re: How do I define a Row with a nested Array in DDL for Flink SQL?
Got it — thanks for the explanation, Benchao!

Benchao Li wrote on Tue, Nov 24, 2020 at 7:49 PM:
> I could tell from this line of code:
> https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107
>
> The community does not officially support a ProtoBuf format yet, but there is already an issue and discussion about it [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-18202
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 4:46 PM:
>> Where can you see that? And if I want to use the schema written in the DDL, what should I do?
>>
>> Benchao Li wrote on Tue, Nov 24, 2020 at 4:33 PM:
>>> It looks like this format infers the schema automatically instead of using the schema written in the DDL.
>>>
>>> zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
>>>> I'm on Flink 1.11, but using a format written by someone else — probably a bug in there:
>>>> https://github.com/yangyichao-mango/flink-protobuf
>>>>
>>>> Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
>>>>> Your DDL looks fine.
>>>>> Which Flink version are you on? And could you post a more complete exception stack?
>>>>>
>>>>> zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
>>>>>> Hi Benchao, the image is at https://imgchr.com/i/DtoGge — looking forward to your answer~
>>>>>>
>>>>>> Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
>>>>>>> Your image didn't come through; please upload it to a third-party image host, or just paste the text directly.
>>>>>>>
>>>>>>> zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
>>>>>>>> [image: image.png]
>>>>>>>> As in the subject: defining it this way hits an exception — could someone point out the right way to do it?
>
> --
> Best,
> Benchao Li
Re: How do I define a Row with a nested Array in DDL for Flink SQL?
I could tell from this line of code:
https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107

The community does not officially support a ProtoBuf format yet, but there is already an issue and discussion about it [1].

[1] https://issues.apache.org/jira/browse/FLINK-18202

zilong xiao wrote on Tue, Nov 24, 2020 at 4:46 PM:
> Where can you see that? And if I want to use the schema written in the DDL, what should I do?
>
> Benchao Li wrote on Tue, Nov 24, 2020 at 4:33 PM:
>> It looks like this format infers the schema automatically instead of using the schema written in the DDL.
>>
>> zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
>>> I'm on Flink 1.11, but using a format written by someone else — probably a bug in there:
>>> https://github.com/yangyichao-mango/flink-protobuf
>>>
>>> Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
>>>> Your DDL looks fine.
>>>> Which Flink version are you on? And could you post a more complete exception stack?
>>>>
>>>> zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
>>>>> Hi Benchao, the image is at https://imgchr.com/i/DtoGge — looking forward to your answer~
>>>>>
>>>>> Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
>>>>>> Your image didn't come through; please upload it to a third-party image host, or just paste the text directly.
>>>>>>
>>>>>> zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
>>>>>>> [image: image.png]
>>>>>>> As in the subject: defining it this way hits an exception — could someone point out the right way to do it?

--
Best,
Benchao Li
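For reference, the nested type the subject asks about is expressible in plain DDL once a format actually honours the declared schema. A hedged sketch — the field names and the datagen connector are illustrative, not taken from the thread:

```sql
-- Illustrative DDL: a ROW column containing an ARRAY of ROWs.
CREATE TABLE nested_demo (
    id BIGINT,
    payload ROW<
        name STRING,
        events ARRAY<ROW<event_type STRING, ts BIGINT>>
    >
) WITH (
    'connector' = 'datagen'
);

-- Accessing nested fields and array elements (array indices are 1-based):
-- SELECT payload.name, payload.events[1].event_type FROM nested_demo;
```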
Re: DeserializationSchemaFactory not found in the SQL CLI
Sources and sinks defined in a YAML file are discovered through the old factory interface, while the debezium format only implements the new one, so it cannot be found.

There is no plan to support the new interface in YAML either, because the YAML approach is already deprecated. See this issue: https://issues.apache.org/jira/browse/FLINK-20260

Best,
Jark

On Tue, 24 Nov 2020 at 18:52, jy l wrote:
> Hi:
> Flink version 1.12.0.
>
> I want to configure a table in sql-client-defaults.yaml as follows:
>
> tables:
>   - name: t_users
>     type: source-table
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ods.userAnalysis.user_profile
>       startup-mode: latest-offset
>       properties:
>         bootstrap.servers: hostname:9092
>         group.id: flink-analysis
>     format:
>       type: debezium-avro-confluent
>       property-version: 1
>       debezium-avro-confluent.schema-registry.url: http://hostname:8081
>       #schema-registry.url: http://hostname:8081
>     schema:
>       - name: userId
>         data-type: STRING
>       - name: province
>         data-type: STRING
>       - name: city
>         data-type: STRING
>       - name: age
>         data-type: INT
>       - name: education
>         data-type: STRING
>       - name: jobType
>         data-type: STRING
>       - name: marriage
>         data-type: STRING
>       - name: sex
>         data-type: STRING
>       - name: interest
>         data-type: STRING
>
> I have put all the relevant jars under the lib directory, but starting the sql cli fails with:
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
> at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
> at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
> the classpath.
>
> Reason: Required context properties mismatch.
>
> The following properties are requested:
> connector.properties.bootstrap.servers=henghe66:9092
> connector.properties.group.id=flink-analysis
> connector.property-version=1
> connector.startup-mode=latest-offset
> connector.topic=ods.userAnalysis.user_profile
> connector.type=kafka
> connector.version=universal
> format.debezium-avro-confluent.schema-registry.url=http://192.168.101.43:8081
> format.property-version=1
> format.type=debezium-avro-confluent
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=userId
> schema.1.data-type=VARCHAR(2147483647)
> schema.1.name=province
> schema.2.data-type=VARCHAR(2147483647)
> schema.2.name=city
> schema.3.data-type=INT
> schema.3.name=age
> schema.4.data-type=VARCHAR(2147483647)
> schema.4.name=education
> schema.5.data-type=VARCHAR(2147483647)
> schema.5.name=jobType
> schema.6.data-type=VARCHAR(2147483647)
> schema.6.name=marriage
> schema.7.data-type=VARCHAR(2147483647)
> schema.7.name=sex
> schema.8.data-type=VARCHAR(2147483647)
> schema.8.name=interest
>
> The following factories have been considered:
> org.apache.flink.formats.avro.AvroRowFormatFactory
> org.apache.flink.formats.csv.CsvRowFormatFactory
> org.apache.flink.formats.json.JsonRowFormatFactory
>
> at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
> at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
> at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
> at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
> at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
> at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
> at
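Since the YAML path is deprecated, the usual migration is to declare the same table as a DDL statement in the SQL CLI, where the new factory stack (and thus the debezium-avro-confluent format) is available. A hedged sketch based on the options in the YAML above — option names are as I recall them for 1.12, so verify against your version's documentation:

```sql
CREATE TABLE t_users (
    userId STRING,
    province STRING,
    city STRING,
    age INT,
    education STRING,
    jobType STRING,
    marriage STRING,
    sex STRING,
    interest STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods.userAnalysis.user_profile',
    'properties.bootstrap.servers' = 'hostname:9092',
    'properties.group.id' = 'flink-analysis',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'debezium-avro-confluent',
    'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'
);
```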
DeserializationSchemaFactory not found in the SQL CLI
Hi:
Flink version 1.12.0.

I want to configure a table in sql-client-defaults.yaml as follows:

tables:
  - name: t_users
    type: source-table
    connector:
      property-version: 1
      type: kafka
      version: universal
      topic: ods.userAnalysis.user_profile
      startup-mode: latest-offset
      properties:
        bootstrap.servers: hostname:9092
        group.id: flink-analysis
    format:
      type: debezium-avro-confluent
      property-version: 1
      debezium-avro-confluent.schema-registry.url: http://hostname:8081
      #schema-registry.url: http://hostname:8081
    schema:
      - name: userId
        data-type: STRING
      - name: province
        data-type: STRING
      - name: city
        data-type: STRING
      - name: age
        data-type: INT
      - name: education
        data-type: STRING
      - name: jobType
        data-type: STRING
      - name: marriage
        data-type: STRING
      - name: sex
        data-type: STRING
      - name: interest
        data-type: STRING

I have put all the relevant jars under the lib directory, but starting the sql cli fails with:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=henghe66:9092
connector.properties.group.id=flink-analysis
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ods.userAnalysis.user_profile
connector.type=kafka
connector.version=universal
format.debezium-avro-confluent.schema-registry.url=http://192.168.101.43:8081
format.property-version=1
format.type=debezium-avro-confluent
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=userId
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=province
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=city
schema.3.data-type=INT
schema.3.name=age
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=education
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=jobType
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=marriage
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=sex
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=interest

The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.formats.csv.CsvRowFormatFactory
org.apache.flink.formats.json.JsonRowFormatFactory

at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
at org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
at org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
at org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)
at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:185)
at org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:138)
at
Unsubscribe
Re: How do I define a Row with a nested Array in DDL for Flink SQL?
Where can you see that? And if I want to use the schema written in the DDL, what should I do?

Benchao Li wrote on Tue, Nov 24, 2020 at 4:33 PM:
> It looks like this format infers the schema automatically instead of using the schema written in the DDL.
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
>> I'm on Flink 1.11, but using a format written by someone else — probably a bug in there:
>> https://github.com/yangyichao-mango/flink-protobuf
>>
>> Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
>>> Your DDL looks fine.
>>> Which Flink version are you on? And could you post a more complete exception stack?
>>>
>>> zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
>>>> Hi Benchao, the image is at https://imgchr.com/i/DtoGge — looking forward to your answer~
>>>>
>>>> Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
>>>>> Your image didn't come through; please upload it to a third-party image host, or just paste the text directly.
>>>>>
>>>>> zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
>>>>>> [image: image.png]
>>>>>> As in the subject: defining it this way hits an exception — could someone point out the right way to do it?
>
> --
> Best,
> Benchao Li
Re: How to define an Array nested inside a Row in Flink SQL DDL?
It looks like this format derives the schema automatically instead of using the schema written in the DDL.

zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
> I'm on Flink 1.11, but the format is one someone else wrote, so I guess the bug is in there:
> https://github.com/yangyichao-mango/flink-protobuf
>
> Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
> > Your DDL looks fine to me.
> >
> > Which Flink version are you using?
> > Also, could you post a more complete exception stack?
> >
> > zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
> > > Hi Benchao, the image is at https://imgchr.com/i/DtoGge, looking forward to your answer~
> > >
> > > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> > > > Your image didn't come through. Please upload it to a third-party image host and resend, or just post the text directly.
> > > >
> > > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > > > > [image: image.png]
> > > > > As the subject says: defining it the way shown above raises an exception. Could someone in the community point me to the right way to do this?
> > > >
> > > > --
> > > > Best,
> > > > Benchao Li
> >
> > --
> > Best,
> > Benchao Li

--
Best,
Benchao Li
Re: flink on native k8s deploy issue
Sorry, that earlier error turned out to be a memory problem. The error I actually meant is the one below.

2020-11-24 16:19:33,569 ERROR org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient [] - A Kubernetes exception occurred.
java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or service not known
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_252]
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) ~[?:1.8.0_252]
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName(InetAddress.java:1193) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName(InetAddress.java:1127) ~[?:1.8.0_252]
	at java.net.InetAddress.getByName(InetAddress.java:1077) ~[?:1.8.0_252]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) [flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188) [flink-dist_2.12-1.11.2.jar:1.11.2]
2020-11-24 16:19:33,606 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli [] - Error while running the Flink session.
java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient.
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:117) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188) [flink-dist_2.12-1.11.2.jar:1.11.2]
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient.
	...
6 more
Caused by: java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or service not known
	at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[?:1.8.0_252]
	at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) ~[?:1.8.0_252]
	at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName0(InetAddress.java:1277) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName(InetAddress.java:1193) ~[?:1.8.0_252]
	at java.net.InetAddress.getAllByName(InetAddress.java:1127) ~[?:1.8.0_252]
	at java.net.InetAddress.getByName(InetAddress.java:1077) ~[?:1.8.0_252]
	at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	... 5 more

The program finished with the following exception:

java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not create the RestClusterClient.
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:117)
	at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142)
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109)
	at
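For what it's worth, the UnknownHostException is for tuiwen-flink-rest.flink, i.e. the rest Service name plus namespace, which only resolves through the cluster's internal DNS. With kubernetes.rest-service.exposed.type=ClusterIP the submitting client must itself be able to reach that Service. Two common workarounds, sketched with the service and namespace names taken from the log above (commands are illustrative and not verified against this cluster):

```shell
# Option 1: expose the rest endpoint outside the cluster instead of ClusterIP
./bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=tuiwen-flink \
  -Dkubernetes.namespace=flink \
  -Dkubernetes.rest-service.exposed.type=NodePort

# Option 2: keep ClusterIP and tunnel from the client machine to the rest Service
kubectl -n flink port-forward service/tuiwen-flink-rest 8081:8081
```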
Re: How to define an Array nested inside a Row in Flink SQL DDL?
I'm on Flink 1.11, but the format is one someone else wrote, so I guess the bug is in there:
https://github.com/yangyichao-mango/flink-protobuf

Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
> Your DDL looks fine to me.
>
> Which Flink version are you using?
> Also, could you post a more complete exception stack?
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
> > Hi Benchao, the image is at https://imgchr.com/i/DtoGge, looking forward to your answer~
> >
> > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> > > Your image didn't come through. Please upload it to a third-party image host and resend, or just post the text directly.
> > >
> > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > > > [image: image.png]
> > > > As the subject says: defining it the way shown above raises an exception. Could someone in the community point me to the right way to do this?
> > >
> > > --
> > > Best,
> > > Benchao Li
>
> --
> Best,
> Benchao Li
Re: Using Hive UDFs in Flink
Hi, this is a known issue [1][2]. In newer versions we simply disabled these functions in the hive module [3]; for now I'd suggest working around it with Flink's built-in functions.

[1] https://issues.apache.org/jira/browse/FLINK-16688
[2] https://issues.apache.org/jira/browse/FLINK-16618
[3] https://issues.apache.org/jira/browse/FLINK-18995

On Tue, Nov 24, 2020 at 11:54 AM 酷酷的浑蛋 wrote:
> Flink 1.11.1, Hive 2.2.0.
> Calling current_timestamp or current_date throws:
> Caused by: java.lang.NullPointerException
> at org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51)
> at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)

--
Best regards!
Rui Li
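To make the suggested workaround concrete: it just means letting the call resolve to Flink's own built-in functions instead of Hive's GenericUDF implementations, assuming the core module is resolved before the hive module (the default order). A sketch with an invented table name:

```sql
-- Resolved from Flink's core module, so the call never reaches
-- Hive's GenericUDFCurrentTimestamp and avoids the NPE above.
SELECT CURRENT_TIMESTAMP AS now_ts,
       CURRENT_DATE      AS now_dt
FROM my_table;
```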
flink on native k8s deploy issue
When starting Flink with the -Dkubernetes.rest-service.exposed.type=ClusterIP setting, it fails with the following error:

2020-11-24 15:49:19,796 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, 0.0.0.0
2020-11-24 15:49:19,800 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-11-24 15:49:19,801 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m
2020-11-24 15:49:19,801 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1800m
2020-11-24 15:49:19,801 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-11-24 15:49:19,802 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2020-11-24 15:49:19,802 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability, zookeeper
2020-11-24 15:49:19,803 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.cluster-id, /tuiwen-flink
2020-11-24 15:49:19,803 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.storageDir, file:/usr/flink/tuiwen-flink
2020-11-24 15:49:19,804 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: high-availability.zookeeper.quorum, data-kafka-zookeeper-headless.tuiwen-public:2181
2020-11-24 15:49:19,804 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.backend, rocksdb
2020-11-24 15:49:19,805 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.dir, file:/usr/flink/flink-checkpoints
2020-11-24 15:49:19,805 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.checkpoints.num-retained, 100
2020-11-24 15:49:19,805 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, file:/usr/flink/flink-savepoints
2020-11-24 15:49:19,806 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-11-24 15:49:19,806 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: web.upload.dir, /usr/flink
2020-11-24 15:49:19,990 INFO org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could not load factory due to missing dependencies.
2020-11-24 15:49:22,366 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-11-24 15:49:22,399 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction jvm overhead memory (70.000mb (73400321 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-11-24 15:49:22,401 INFO org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived from fraction network memory (25.200mb (26424115 bytes)) is less than its min value 64.000mb (67108864 bytes), min value will be used instead
2020-11-24 15:49:22,405 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli [] - Error while running the Flink session.
org.apache.flink.configuration.IllegalConfigurationException: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (100.800mb (105696462 bytes)) and Network Memory (64.000mb (67108864 bytes)) exceed configured Total Flink Memory (252.000mb (264241152 bytes)).
	at org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:136) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:42) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.deriveProcessSpecWithTotalProcessMemory(ProcessMemoryUtils.java:105) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:79) ~[flink-dist_2.12-1.11.2.jar:1.11.2]
	at
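As a sanity check on the numbers in that exception: the fixed and fraction-derived components alone sum to roughly 420.8 MB, well above the 252 MB of Total Flink Memory that was derived from the configured sizes. A small back-of-the-envelope script using the exact byte counts printed above (this only redoes the arithmetic; it is not Flink's actual derivation code):

```python
# Component sizes exactly as printed in the IllegalConfigurationException (bytes).
framework_heap     = 134_217_728   # Framework Heap Memory, 128.000mb
framework_off_heap = 134_217_728   # Framework Off-Heap Memory, 128.000mb
task_off_heap      = 0             # Task Off-Heap Memory
managed            = 105_696_462   # Managed Memory, 100.800mb
network            = 67_108_864    # Network Memory, 64.000mb

total_flink_memory = 264_241_152   # configured Total Flink Memory, 252.000mb

required = framework_heap + framework_off_heap + task_off_heap + managed + network
print(f"required = {required / 2**20:.1f} MB, available = {total_flink_memory / 2**20:.1f} MB")
# prints: required = 420.8 MB, available = 252.0 MB
```

Since the components can never fit in the derived total, either taskmanager.memory.process.size (or taskmanager.memory.flink.size) needs to be raised, or the framework/network minimums lowered.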