Re: slot question

2020-11-24 Post by caozhen

 
One slot can run multiple tasks (different tasks of the same job); each task is executed in its own thread.




ゞ野蠻遊戲χ wrote
> Hi everyone
> 
> 
> Can a slot only run one thread at a time, or can one slot run multiple threads in parallel?
> 
> 
> Thanks,
> 嘉治





--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Object reuse question for Reduce and similar functions

2020-11-24 Post by 赵一旦
Could anyone weigh in on this question?

赵一旦 wrote on Mon, Nov 16, 2020 at 2:48 PM:

> To be more specific: the object returned from reduce becomes the output of the reduce operation (does this involve immediate serialization?).
>
> reduce(new ReduceFunction<ObjCls>() {
>
>   @Override
>   public ObjCls reduce( ObjCls ele1, ObjCls ele2 ){
> long resultPv = ele1.getPv() + ele2.getPv();
>
> ele1.setPv(999);   //  if this setPv call is added here, which operators could it affect? (under the various possible DAG shapes)
>
> ele1.setPv( resultPv );
> return ele1;
>   }
>
> })
>
> 赵一旦 wrote on Mon, Nov 16, 2020 at 2:40 PM:
>
>> As the subject says: when implementing a reduce function, in which cases can reusing the object cause problems, and in which cases can it never cause problems?
>>
>>
>> For example, when the computation graph contains a lot of repeated computation:
>>
>> streamA.reduce(reduceFunction1,);
>>
>> streamA.reduce(reduceFunction2,);
>>
>> streamA.
>>
>
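A defensive sketch of the pattern under discussion (my own illustration, reusing the ObjCls name from the question above): build a fresh output object instead of mutating ele1, so that object reuse or several reduce functions consuming the same streamA cannot observe the mutation.

import org.apache.flink.api.common.functions.ReduceFunction;

// Sketch only: ObjCls is the (hypothetical) mutable POJO with a pv field from the question.
public class SafePvReduce implements ReduceFunction<ObjCls> {

    @Override
    public ObjCls reduce(ObjCls ele1, ObjCls ele2) {
        // Create a new result object rather than mutating ele1, so any operator that
        // still holds a reference to ele1 (e.g. when object reuse is enabled via
        // env.getConfig().enableObjectReuse(), or when several downstream operators
        // read the same record without serialization in between) is unaffected.
        ObjCls result = new ObjCls();
        result.setPv(ele1.getPv() + ele2.getPv());
        return result;
    }
}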


Re: Flink SQL timestamp field type conversion question

2020-11-24 Post by Jark Wu
You can use the docker setup from this article:
https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html
https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml

The ts values in that container are in the SQL format.

1. What type should a time field in the above format be parsed into in Flink SQL?
TIMESTAMP WITH LOCAL TIME ZONE; only the json format in 1.12 supports it.

2. Yes.

3. Flink does not support TIMESTAMP WITH TIME ZONE yet.
'yyyy-MM-dd HH:mm:ss' corresponds to TIMESTAMP, i.e. a timestamp long value
without a time zone, while 'yyyy-MM-dd HH:mm:ssZ' corresponds to TIMESTAMP WITH
LOCAL TIME ZONE, i.e. a timestamp in the session time zone.

Best,
Jark
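A minimal sketch of one possible workaround for question 2 below (an assumption, not an official recipe: receive the raw ts as a STRING and convert it with the built-in REPLACE and TO_TIMESTAMP functions via a computed column):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TsWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'ts' matches the raw JSON field; 'event_time' strips the 'T' and 'Z'
        // and parses the remainder as a SQL-format timestamp for the watermark.
        tEnv.executeSql(
            "CREATE TABLE user_log (\n"
          + "  user_id VARCHAR,\n"
          + "  item_id VARCHAR,\n"
          + "  category_id VARCHAR,\n"
          + "  behavior VARCHAR,\n"
          + "  ts VARCHAR,\n"
          + "  event_time AS TO_TIMESTAMP(REPLACE(REPLACE(ts, 'T', ' '), 'Z', '')),\n"
          + "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND\n"
          + ") WITH (\n"
          + "  'connector' = 'kafka',\n"
          + "  'topic' = 'user_behavior',\n"
          + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
          + "  'properties.group.id' = 'testGroup',\n"
          + "  'format' = 'json',\n"
          + "  'scan.startup.mode' = 'earliest-offset'\n"
          + ")");
    }
}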



On Wed, 25 Nov 2020 at 12:03, 陈帅  wrote:

> The data source is the Kafka messages from Jark's project
> https://github.com/wuchong/flink-sql-submit, where a user_behavior message looks like
> {"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
> "behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
> As you can see, the ts value is '2017-11-26T01:00:00Z'. Now I want to define a Flink SQL source table for it, as follows:
>
> CREATE TABLE user_log (
> user_id VARCHAR,
> item_id VARCHAR,
> category_id VARCHAR,
> behavior VARCHAR,
> ts TIMESTAMP(3),
> WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'format' = 'json',
> -- 'json.timestamp-format.standard' = 'ISO-8601', // without this line the default is 'SQL'
> 'scan.startup.mode' = 'earliest-offset'
> );
>
> The program throws an error at runtime:
> Caused by: java.time.format.DateTimeParseException: Text
> '2017-11-26T01:00:00Z' could not be parsed at index 10
>
> I checked the official Flink JSON format documentation:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
> Currently only two formats are supported: SQL and ISO-8601.
> The SQL format is 'yyyy-MM-dd HH:mm:ss',
> while the ISO-8601 format is 'yyyy-MM-ddTHH:mm:ss.s{precision}',
> so the 'yyyy-MM-ddTHH:mm:ssZ' above (note the trailing Z) is indeed not supported.
>
> My questions:
> 1. What type should a time field in the above format be parsed into in Flink SQL?
> 2. If it cannot be supported directly, do I have to receive it as VARCHAR first and then
> convert it to a supported time format with the UNIX_TIMESTAMP(ts_string,
> pattern_string) function? But then, how do I escape single quotes inside
> pattern_string? UNIX_TIMESTAMP('2017-11-26T01:00:00Z',
> 'yyyy-MM-dd'T'HH:mm:ss'Z'')?
> 3. In which situations would the TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME
> ZONE types be used? Are there examples?
>
> Thanks!
>


Re: Unsubscribe

2020-11-24 Post by Congxian Qiu
Hi
   To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org; see the documentation [1] for details.

[1]
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
Best,
Congxian


回响 <939833...@qq.com> wrote on Tue, Nov 24, 2020 at 8:42 PM:

>


Re: Unsubscribe

2020-11-24 Post by Xev Orm
-help

Xev Orm wrote on Wed, Nov 25, 2020 at 12:25 PM:

> Unsubscribe
>


delete

2020-11-24 Post by Xev Orm
delete


Flink SQL timestamp field type conversion question

2020-11-24 Post by 陈帅
The data source is the Kafka messages from Jark's project
https://github.com/wuchong/flink-sql-submit, where a user_behavior message looks like
{"user_id": "470572", "item_id":"3760258", "category_id": "1299190",
"behavior": "pv", "ts": "2017-11-26T01:00:01Z"}
As you can see, the ts value is '2017-11-26T01:00:00Z'. Now I want to define a Flink SQL source table for it, as follows:

CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
-- 'json.timestamp-format.standard' = 'ISO-8601', // without this line the default is 'SQL'
'scan.startup.mode' = 'earliest-offset'
);

The program throws an error at runtime:
Caused by: java.time.format.DateTimeParseException: Text
'2017-11-26T01:00:00Z' could not be parsed at index 10

I checked the official Flink JSON format documentation:
https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/table/connectors/formats/json.html#json-timestamp-format-standard
Currently only two formats are supported: SQL and ISO-8601.
The SQL format is 'yyyy-MM-dd HH:mm:ss',
while the ISO-8601 format is 'yyyy-MM-ddTHH:mm:ss.s{precision}',
so the 'yyyy-MM-ddTHH:mm:ssZ' above (note the trailing Z) is indeed not supported.

My questions:
1. What type should a time field in the above format be parsed into in Flink SQL?
2. If it cannot be supported directly, do I have to receive it as VARCHAR first and then
convert it to a supported time format with the UNIX_TIMESTAMP(ts_string, pattern_string)
function? But then, how do I escape single quotes inside pattern_string?
UNIX_TIMESTAMP('2017-11-26T01:00:00Z', 'yyyy-MM-dd'T'HH:mm:ss'Z'')?
3. In which situations would the TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME
ZONE types be used? Are there examples?

Thanks!


Will a Flink sink function's close() definitely be called when the program stops?

2020-11-24 Post by Lei Wang
I wrote my own SinkFunction that writes to a database, and it performs the actual insert only once a certain number of records (100)
have arrived. I implemented this by buffering the records to be written in a List.


public class SinkToJDBCWithJDBCStatementBatch extends
RichSinkFunction<JDBCStatement> {

private List<JDBCStatement> statementList = new
ArrayList<>();


@Override
public void close() throws Exception {
writeToDatabase();
this.statementList.clear();
super.close();
if (dataSource != null) {
dataSource.close();
}
}

@Override
public void invoke(JDBCStatement statement, Context context) throws
Exception {

if (statementList.size() < 100) {
statementList.add(statement);
return;
}
writeToDatabase();
this.statementList.clear();
}

public void writeToDatabase(){
.
}
}

I want to confirm: will this close() method definitely be called when the program stops? What mechanism ensures that?

Thanks,
王磊


Re: slot question

2020-11-24 Post by 赵一旦
Yes. A single slot can itself run multiple threads. However, it cannot run multiple parallel tasks of the same operator, nor can it run tasks of operators belonging to different jobs.
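A minimal sketch illustrating this (the job and class names are made up): all operators of one job can be scheduled into a single slot through the default slot sharing group, while each subtask still runs in its own thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Source, map and reduce belong to the same job and to the default slot
        // sharing group, so their subtasks can all end up in one slot, each
        // running in its own thread inside the TaskManager JVM.
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .keyBy(i -> i % 2)
           .reduce((a, b) -> a + b)
           .print();

        env.execute("slot-sharing-sketch");
    }
}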



ゞ野蠻遊戲χ wrote on Wed, Nov 25, 2020 at 10:33 AM:

> Hi everyone
>
>
> 
> Can a slot only run one thread at a time, or can one slot run multiple threads in parallel?
>
>
> Thanks,
> 嘉治


A question about JobManager process stalls in a standalone cluster

2020-11-24 Post by 赵一旦
As the subject says: in my standalone cluster, my deployment mode so far starts a JobManager (StandaloneSessionClusterEntrypoint) plus a TaskManager on every machine.

The problem is that when submitting or cancelling jobs, the Flink Web UI becomes very sluggish. Sometimes it only stalls and then recovers; other times several nodes of the cluster fail one after another (the number of slots drops and sometimes comes back by itself; presumably a network issue).

(1) Is this caused by the performance of the machine hosting the JobManager process? Would it help to run the JobManager on a dedicated machine?
(2) I asked a question before that was mainly about HA, namely that a failed ZK process caused all jobs to restart.
Here I hope someone can summarize, for a Flink standalone cluster, what the impact on jobs is when <1> a JobManager process fails (but not all of them; there are multiple JobManagers),
<2> a ZK process fails (without affecting the ZK service, e.g. only 1 of 3 nodes fails, and that one may be the leader), or <3>
a TaskManager process fails.


For <3>, I currently use the slot-spread scheduling strategy, so when one TM fails, basically all jobs automatically restart from the latest checkpoint. I can accept that; no problem there.
So what is the expected behavior for <1> and <2>?

I have not run many experiments yet, but from past experience, at least case <2> has caused problems for the whole cluster (for example, all jobs being restarted).


slot question

2020-11-24 Post by ゞ野蠻遊戲χ
Hi everyone


 
Can a slot only run one thread at a time, or can one slot run multiple threads in parallel?


Thanks,
嘉治


Re: Incorrect results from a Hive table self inner join queried with Flink 1.11.2

2020-11-24 Post by Leonard Xu
Hi,

Could you open an issue on the community JIRA? If there is a bug, it also needs to be fixed in the 1.11 releases.

Best,
Leonard
[1] https://issues.apache.org/jira/projects/FLINK/issues/

> On Nov 24, 2020, at 01:03, macdoor wrote:
> 
> Answering my own question, for others' reference.
> 
> After switching to Flink 1.12.0-rc1 and processing the same data with the same SQL, the results match what Hive
> computes, so this is confirmed to be a bug in 1.11.2 that 1.12 has apparently already fixed.
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



(no subject)

2020-11-24 Post by gfjia
Unsubscribe


gfjia
Email: gfjia_t...@163.com

Re: How to resolve stalls caused by packets not being sent in Flink CDC

2020-11-24 Post by Jark Wu
Btw, may I ask why you are using the Stream API rather than Flink SQL directly?

On Wed, 25 Nov 2020 at 00:21, Jark Wu  wrote:

> See the docs:
> https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts
>
> On Tue, 24 Nov 2020 at 23:54, yujianbo <15205029...@163.com> wrote:
>
>> 1. Environment:
>> 1) Version: 1.11.2
>> 2) Flink CDC syncing from MySQL to Kudu with the Stream API
>>
>> 2. Observed problem:
>> 1) Several MySQL tables, each on the order of 30 million rows, have already been synced to Kudu in production.
>>  But one MySQL table hits the same problem every time it is synced; as far as I can tell it is still in the full snapshot phase and has not reached the incremental phase.
>>
>>
>> The error log is below. I am thinking of trying "autoReconnect=true" to avoid it, but I am not sure where to add it, and from the log it feels like this parameter only treats the symptom; the key question is why no packet is sent, causing the stall.
>>
>>  Here is the exact error:
>> ==
>> 2020-11-24 20:00:37,547 *ERROR io.debezium.connector.mysql.SnapshotReader
>> *
>> [] - Failed due to error: Aborting snapshot due to error when last running
>> 'SELECT * FROM `uchome`.`forums_post_12`': *The last packet successfully
>> received from the server was 39 milliseconds ago.  The last packet sent
>> successfully to the server was 6,772,615 milliseconds ago. is longer than
>> the server configured value of 'wait_timeout'. You should consider either
>> expiring and/or testing connection validity before use in your
>> application,
>> increasing the server configured values for client timeouts, or using the
>> Connector/J connection property 'autoReconnect=true' to avoid this
>> problem.*
>> org.apache.kafka.connect.errors.ConnectException: The last packet
>> successfully received from the server was 39 milliseconds ago.  The last
>> packet sent successfully to the server was 6,772,615 milliseconds ago. is
>> longer than the server configured value of 'wait_timeout'. You should
>> consider either expiring and/or testing connection validity before use in
>> your application, increasing the server configured values for client
>> timeouts, or using the Connector/J connection property
>> 'autoReconnect=true'
>> to avoid this problem. Error code: 0; SQLSTATE: 08S01.
>> at
>> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>>
>> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> [?:1.8.0_231]
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> [?:1.8.0_231]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
>> *Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
>> packet successfully received from the server was 39 milliseconds ago.  The
>> last packet sent successfully to the server was 6,772,615 milliseconds
>> ago.
>> is longer than the server configured value of 'wait_timeout'. *You should
>> consider either expiring and/or testing connection validity before use in
>> your application, increasing the server configured values for client
>> timeouts, or using the Connector/J connection property
>> 'autoReconnect=true'
>> to avoid this problem.
>> at
>>
>> com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> at
>>
>> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
>>
>> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
>> ===
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>
>


Re: How to resolve stalls caused by packets not being sent in Flink CDC

2020-11-24 Post by Jark Wu
See the docs:
https://github.com/ververica/flink-cdc-connectors/wiki/MySQL-CDC-Connector#setting-up-mysql-session-timeouts

On Tue, 24 Nov 2020 at 23:54, yujianbo <15205029...@163.com> wrote:

> 1. Environment:
> 1) Version: 1.11.2
> 2) Flink CDC syncing from MySQL to Kudu with the Stream API
>
> 2. Observed problem:
> 1) Several MySQL tables, each on the order of 30 million rows, have already been synced to Kudu in production.
>  But one MySQL table hits the same problem every time it is synced; as far as I can tell it is still in the full snapshot phase and has not reached the incremental phase.
>
>
> The error log is below. I am thinking of trying "autoReconnect=true" to avoid it, but I am not sure where to add it, and from the log it feels like this parameter only treats the symptom; the key question is why no packet is sent, causing the stall.
>
>  Here is the exact error:
> ==
> 2020-11-24 20:00:37,547 *ERROR io.debezium.connector.mysql.SnapshotReader
> *
> [] - Failed due to error: Aborting snapshot due to error when last running
> 'SELECT * FROM `uchome`.`forums_post_12`': *The last packet successfully
> received from the server was 39 milliseconds ago.  The last packet sent
> successfully to the server was 6,772,615 milliseconds ago. is longer than
> the server configured value of 'wait_timeout'. You should consider either
> expiring and/or testing connection validity before use in your application,
> increasing the server configured values for client timeouts, or using the
> Connector/J connection property 'autoReconnect=true' to avoid this
> problem.*
> org.apache.kafka.connect.errors.ConnectException: The last packet
> successfully received from the server was 39 milliseconds ago.  The last
> packet sent successfully to the server was 6,772,615 milliseconds ago. is
> longer than the server configured value of 'wait_timeout'. You should
> consider either expiring and/or testing connection validity before use in
> your application, increasing the server configured values for client
> timeouts, or using the Connector/J connection property 'autoReconnect=true'
> to avoid this problem. Error code: 0; SQLSTATE: 08S01.
> at
> io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
> io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
> io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_231]
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_231]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
> *Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
> packet successfully received from the server was 39 milliseconds ago.  The
> last packet sent successfully to the server was 6,772,615 milliseconds ago.
> is longer than the server configured value of 'wait_timeout'. *You should
> consider either expiring and/or testing connection validity before use in
> your application, increasing the server configured values for client
> timeouts, or using the Connector/J connection property 'autoReconnect=true'
> to avoid this problem.
> at
>
> com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> at
>
> com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
>
> ~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
> ===
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


How to resolve stalls caused by packets not being sent in Flink CDC

2020-11-24 Post by yujianbo
1. Environment:
1) Version: 1.11.2
2) Flink CDC syncing from MySQL to Kudu with the Stream API

2. Observed problem:
1) Several MySQL tables, each on the order of 30 million rows, have already been synced to Kudu in production.
 But one MySQL table hits the same problem every time it is synced; as far as I can tell it is still in the full snapshot phase and has not reached the incremental phase.

The error log is below. I am thinking of trying "autoReconnect=true" to avoid it, but I am not sure where to add it, and from the log it feels like this parameter only treats the symptom; the key question is why no packet is sent, causing the stall.

 Here is the exact error:
==
2020-11-24 20:00:37,547 *ERROR io.debezium.connector.mysql.SnapshotReader * 

[] - Failed due to error: Aborting snapshot due to error when last running
'SELECT * FROM `uchome`.`forums_post_12`': *The last packet successfully
received from the server was 39 milliseconds ago.  The last packet sent
successfully to the server was 6,772,615 milliseconds ago. is longer than
the server configured value of 'wait_timeout'. You should consider either
expiring and/or testing connection validity before use in your application,
increasing the server configured values for client timeouts, or using the
Connector/J connection property 'autoReconnect=true' to avoid this problem.*
org.apache.kafka.connect.errors.ConnectException: The last packet
successfully received from the server was 39 milliseconds ago.  The last
packet sent successfully to the server was 6,772,615 milliseconds ago. is
longer than the server configured value of 'wait_timeout'. You should
consider either expiring and/or testing connection validity before use in
your application, increasing the server configured values for client
timeouts, or using the Connector/J connection property 'autoReconnect=true'
to avoid this problem. Error code: 0; SQLSTATE: 08S01.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:831)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_231]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_231]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
*Caused by: com.mysql.cj.jdbc.exceptions.CommunicationsException: The last
packet successfully received from the server was 39 milliseconds ago.  The
last packet sent successfully to the server was 6,772,615 milliseconds ago.
is longer than the server configured value of 'wait_timeout'. *You should
consider either expiring and/or testing connection validity before use in
your application, increasing the server configured values for client
timeouts, or using the Connector/J connection property 'autoReconnect=true'
to avoid this problem.
at
com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
at
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
~[blob_p-b339a2f89b058d1dab7e01f8c235b6bcc0c26d10-90c2b905e5c1a69c13cf6a9259bd7be8:?]
===



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Test case debugging question

2020-11-24 Post by zilong xiao
When running test cases locally, a bunch of Scala files sometimes report errors, yet the whole project compiles fine. Could someone advise what to do in this situation? Can the Scala files be ignored?


How can Flink recognize a HyperLogLog object in a custom AggregateFunction?

2020-11-24 Post by kandy.wang
My custom AggregateFunction implements approximate UV counting with HLL. The problem is that HyperLogLog comes from a third-party package; how do I make Flink recognize it?
I don't know how to write the TypeInformation for this.


The code is as follows:
import io.airlift.slice.Slices;
import io.airlift.stats.cardinality.HyperLogLog;
import org.apache.flink.table.functions.AggregateFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.Iterator;




public class FlinkUDAFCardinalityEstimationFunction extends 
AggregateFunction<Long, HyperLogLog> {


private static final Logger LOG = 
LoggerFactory.getLogger(JsonArrayParseUDTF.class);


private static final int NUMBER_OF_BUCKETS = 4096;


@Override
public HyperLogLog createAccumulator() {
return HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
}


@Override
public Long getValue(HyperLogLog acc) {
if(acc == null){
return 0L;
}
return acc.cardinality();
}


public void accumulate(HyperLogLog acc, String element) {
if(element == null){
return;
}
acc.add(Slices.utf8Slice(element));
}


public void retract(HyperLogLog acc, byte[] element) {
// do nothing
LOG.info("-- retract:" + new String(element));
}


public void merge(HyperLogLog acc, Iterable<HyperLogLog> it) {
Iterator<HyperLogLog> iter = it.iterator();
while (iter.hasNext()) {
HyperLogLog a = iter.next();
if(a != null) {
acc.mergeWith(a);
}
}
}


public void resetAccumulator(HyperLogLog acc) {
acc = HyperLogLog.newInstance(NUMBER_OF_BUCKETS);
}
}
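One possible direction (a sketch only, assuming the legacy type-inference hooks of org.apache.flink.table.functions.AggregateFunction are honored by the planner in use): expose the accumulator and result types explicitly so that HyperLogLog is treated as a generic, Kryo-serialized type.

import io.airlift.stats.cardinality.HyperLogLog;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Sketch: same UDAF as above, with the type hooks overridden so the planner does not
// have to reflectively analyze the third-party HyperLogLog class.
public class FlinkUDAFCardinalityEstimationFunctionWithTypes
        extends FlinkUDAFCardinalityEstimationFunction {

    @Override
    public TypeInformation<HyperLogLog> getAccumulatorType() {
        // HyperLogLog has no POJO/Tuple structure Flink understands, so it is
        // handled as a generic type (serialized with Kryo by default).
        return TypeInformation.of(HyperLogLog.class);
    }

    @Override
    public TypeInformation<Long> getResultType() {
        return Types.LONG;
    }
}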

Re: How should an Array nested inside a Row be defined with DDL in Flink SQL?

2020-11-24 Post by zilong xiao
OK, thanks for the explanation, Benchao~

Benchao Li wrote on Tue, Nov 24, 2020 at 7:49 PM:

> You can see it from this line of code:
>
> https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107
>
> The community does not officially support a ProtoBuf format yet, but there is already a related issue and discussion [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-18202
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 4:46 PM:
> >
> > Where can you see that? Please advise. Also, what should I do if I want to use the schema written in the DDL?
> >
> > Benchao Li wrote on Tue, Nov 24, 2020 at 4:33 PM:
> >
> > > It looks like this format uses an automatically derived schema rather than the schema written in the DDL.
> > >
> > > zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
> > >
> > > > I'm using Flink 1.11, but with a format written by someone else; I guess the bug is in there:
> > > > https://github.com/yangyichao-mango/flink-protobuf
> > > >
> > > > Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
> > > >
> > > > > Your DDL looks fine.
> > > > >
> > > > > Which Flink version are you using?
> > > > > Also, could you share a more complete exception stack trace?
> > > > >
> > > > > zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
> > > > >
> > > > > > Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~
> > > > > >
> > > > > > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> > > > > >
> > > > > > > Your image did not come through; you can upload it to a third-party image host and resend it, or just send the text directly.
> > > > > > >
> > > > > > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > > > > > >
> > > > > > > > [image: image.png]
> > > > > > > > As the subject says, I hit an exception when trying to define it this way; could someone from the community point me to the correct approach?
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > >
> > > > > > > Best,
> > > > > > > Benchao Li
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: How should an Array nested inside a Row be defined with DDL in Flink SQL?

2020-11-24 Post by Benchao Li
You can see it from this line of code:
https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107

The community does not officially support a ProtoBuf format yet, but there is already a related issue and discussion [1].

[1] https://issues.apache.org/jira/browse/FLINK-18202

zilong xiao wrote on Tue, Nov 24, 2020 at 4:46 PM:

> Where can you see that? Please advise. Also, what should I do if I want to use the schema written in the DDL?
>
> Benchao Li wrote on Tue, Nov 24, 2020 at 4:33 PM:
>
> > It looks like this format uses an automatically derived schema rather than the schema written in the DDL.
> >
> > zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
> >
> > > I'm using Flink 1.11, but with a format written by someone else; I guess the bug is in there:
> > > https://github.com/yangyichao-mango/flink-protobuf
> > >
> > > Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
> > >
> > > > Your DDL looks fine.
> > > >
> > > > Which Flink version are you using?
> > > > Also, could you share a more complete exception stack trace?
> > > >
> > > > zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
> > > >
> > > > > Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~
> > > > >
> > > > > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> > > > >
> > > > > > Your image did not come through; you can upload it to a third-party image host and resend it, or just send the text directly.
> > > > > >
> > > > > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > > > > >
> > > > > > > [image: image.png]
> > > > > > > As the subject says, I hit an exception when trying to define it this way; could someone from the community point me to the correct approach?
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Best,
> > > > > > Benchao Li
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: DeserializationSchemaFactory not found in the SQL CLI

2020-11-24 Post by Jark Wu
Sources and sinks defined in a YAML file are discovered through the old factory, while the debezium format
only implements the new interface, so it cannot be found.
There is also no plan to support the new interface in YAML, because the YAML approach has been deprecated.
You can take a look at this issue: https://issues.apache.org/jira/browse/FLINK-20260

Best,
Jark
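A minimal sketch of the DDL route instead of the deprecated YAML tables (an illustration only, reusing the connector options from the YAML quoted below; the hostnames remain the question's placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // CREATE TABLE goes through the new factory stack, which the
        // debezium-avro-confluent format implements.
        tEnv.executeSql(
            "CREATE TABLE t_users (\n"
          + "  userId STRING,\n"
          + "  province STRING,\n"
          + "  city STRING,\n"
          + "  age INT,\n"
          + "  education STRING\n"
          + ") WITH (\n"
          + "  'connector' = 'kafka',\n"
          + "  'topic' = 'ods.userAnalysis.user_profile',\n"
          + "  'properties.bootstrap.servers' = 'hostname:9092',\n"
          + "  'properties.group.id' = 'flink-analysis',\n"
          + "  'scan.startup.mode' = 'latest-offset',\n"
          + "  'format' = 'debezium-avro-confluent',\n"
          + "  'debezium-avro-confluent.schema-registry.url' = 'http://hostname:8081'\n"
          + ")");
    }
}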

On Tue, 24 Nov 2020 at 18:52, jy l  wrote:

> Hi:
> Flink version 1.12.0:
>
> I want to configure a table in sql-client-defaults.yaml, configured as follows:
>
> tables:
>
>   - name: t_users
>
> type: source-table
>
> connector:
>
> property-version: 1
>
> type: kafka
>
> version: universal
>
> topic: ods.userAnalysis.user_profile
>
> startup-mode: latest-offset
>
> properties:
>
> bootstrap.servers: hostname:9092
>
> group.id: flink-analysis
>
> format:
>
> type: debezium-avro-confluent
>
> property-version: 1
>
> debezium-avro-confluent.schema-registry.url: http://hostname:8081
>
> #schema-registry.url: http://hostname:8081
>
> schema:
>
> - name: userId
>
>   data-type: STRING
>
> - name: province
>
>   data-type: STRING
>
> - name: city
>
>   data-type: STRING
>
> - name: age
>
>   data-type: INT
>
> - name: education
>
>   data-type: STRING
>
> - name: jobType
>
>   data-type: STRING
>
> - name: marriage
>
>   data-type: STRING
>
> - name: sex
>
>   data-type: STRING
>
> - name: interest
>
>   data-type: STRING
>
>
>
>
> I have already put the relevant jars into the lib directory; when starting the SQL CLI it fails as follows:
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
>
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
>
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
>
> at
>
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
>
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
>
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> Could not find a suitable table factory for
> 'org.apache.flink.table.factories.DeserializationSchemaFactory' in
>
> the classpath.
>
>
> Reason: Required context properties mismatch.
>
>
> The following properties are requested:
>
> connector.properties.bootstrap.servers=henghe66:9092
>
> connector.properties.group.id=flink-analysis
>
> connector.property-version=1
>
> connector.startup-mode=latest-offset
>
> connector.topic=ods.userAnalysis.user_profile
>
> connector.type=kafka
>
> connector.version=universal
>
> format.debezium-avro-confluent.schema-registry.url=
> http://192.168.101.43:8081
>
> format.property-version=1
>
> format.type=debezium-avro-confluent
>
> schema.0.data-type=VARCHAR(2147483647)
>
> schema.0.name=userId
>
> schema.1.data-type=VARCHAR(2147483647)
>
> schema.1.name=province
>
> schema.2.data-type=VARCHAR(2147483647)
>
> schema.2.name=city
>
> schema.3.data-type=INT
>
> schema.3.name=age
>
> schema.4.data-type=VARCHAR(2147483647)
>
> schema.4.name=education
>
> schema.5.data-type=VARCHAR(2147483647)
>
> schema.5.name=jobType
>
> schema.6.data-type=VARCHAR(2147483647)
>
> schema.6.name=marriage
>
> schema.7.data-type=VARCHAR(2147483647)
>
> schema.7.name=sex
>
> schema.8.data-type=VARCHAR(2147483647)
>
> schema.8.name=interest
>
>
> The following factories have been considered:
>
> org.apache.flink.formats.avro.AvroRowFormatFactory
>
> org.apache.flink.formats.csv.CsvRowFormatFactory
>
> org.apache.flink.formats.json.JsonRowFormatFactory
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>
> at
>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>
> at
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)
>
> at
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)
>
> at
>
> org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)
>
> at
>
> org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)
>
> at
>
> org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)
>
> at
>
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)
>
> at
>
> 

DeserializationSchemaFactory not found in the SQL CLI

2020-11-24 Post by jy l
Hi:
Flink version 1.12.0:

I want to configure a table in sql-client-defaults.yaml, configured as follows:

tables:

  - name: t_users

type: source-table

connector:

property-version: 1

type: kafka

version: universal

topic: ods.userAnalysis.user_profile

startup-mode: latest-offset

properties:

bootstrap.servers: hostname:9092

group.id: flink-analysis

format:

type: debezium-avro-confluent

property-version: 1

debezium-avro-confluent.schema-registry.url: http://hostname:8081

#schema-registry.url: http://hostname:8081

schema:

- name: userId

  data-type: STRING

- name: province

  data-type: STRING

- name: city

  data-type: STRING

- name: age

  data-type: INT

- name: education

  data-type: STRING

- name: jobType

  data-type: STRING

- name: marriage

  data-type: STRING

- name: sex

  data-type: STRING

- name: interest

  data-type: STRING




I have already put the relevant jars into the lib directory; when starting the SQL CLI it fails as follows:

Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)

Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.

at
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)

at
org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)

at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)

at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in

the classpath.


Reason: Required context properties mismatch.


The following properties are requested:

connector.properties.bootstrap.servers=henghe66:9092

connector.properties.group.id=flink-analysis

connector.property-version=1

connector.startup-mode=latest-offset

connector.topic=ods.userAnalysis.user_profile

connector.type=kafka

connector.version=universal

format.debezium-avro-confluent.schema-registry.url=
http://192.168.101.43:8081

format.property-version=1

format.type=debezium-avro-confluent

schema.0.data-type=VARCHAR(2147483647)

schema.0.name=userId

schema.1.data-type=VARCHAR(2147483647)

schema.1.name=province

schema.2.data-type=VARCHAR(2147483647)

schema.2.name=city

schema.3.data-type=INT

schema.3.name=age

schema.4.data-type=VARCHAR(2147483647)

schema.4.name=education

schema.5.data-type=VARCHAR(2147483647)

schema.5.name=jobType

schema.6.data-type=VARCHAR(2147483647)

schema.6.name=marriage

schema.7.data-type=VARCHAR(2147483647)

schema.7.name=sex

schema.8.data-type=VARCHAR(2147483647)

schema.8.name=interest


The following factories have been considered:

org.apache.flink.formats.avro.AvroRowFormatFactory

org.apache.flink.formats.csv.CsvRowFormatFactory

org.apache.flink.formats.json.JsonRowFormatFactory

at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)

at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)

at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)

at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)

at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:289)

at
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:171)

at
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:61)

at
org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:63)

at
org.apache.flink.table.factories.TableSourceFactory.createTableSource(TableSourceFactory.java:74)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:391)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$7(ExecutionContext.java:646)

at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:644)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:185)

at
org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:138)

at

Unsubscribe

2020-11-24 Post by 刘超



Re: How should an Array nested inside a Row be defined with DDL in Flink SQL?

2020-11-24 Post by zilong xiao
Where can you see that? Please advise. Also, what should I do if I want to use the schema written in the DDL?

Benchao Li wrote on Tue, Nov 24, 2020 at 4:33 PM:

> It looks like this format uses an automatically derived schema rather than the schema written in the DDL.
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:
>
> > I'm using Flink 1.11, but with a format written by someone else; I guess the bug is in there:
> > https://github.com/yangyichao-mango/flink-protobuf
> >
> > Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
> >
> > > Your DDL looks fine.
> > >
> > > Which Flink version are you using?
> > > Also, could you share a more complete exception stack trace?
> > >
> > > zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
> > >
> > > > Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~
> > > >
> > > > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> > > >
> > > > > Your image did not come through; you can upload it to a third-party image host and resend it, or just send the text directly.
> > > > >
> > > > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > > > >
> > > > > > [image: image.png]
> > > > > > As the subject says, I hit an exception when trying to define it this way; could someone from the community point me to the correct approach?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: How should an Array nested inside a Row be defined with DDL in Flink SQL?

2020-11-24 Post by Benchao Li
It looks like this format uses an automatically derived schema rather than the schema written in the DDL.

zilong xiao wrote on Tue, Nov 24, 2020 at 4:13 PM:

> I'm using Flink 1.11, but with a format written by someone else; I guess the bug is in there:
> https://github.com/yangyichao-mango/flink-protobuf
>
> Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:
>
> > Your DDL looks fine.
> >
> > Which Flink version are you using?
> > Also, could you share a more complete exception stack trace?
> >
> > zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
> >
> > > Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~
> > >
> > > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> > >
> > > > Your image did not come through; you can upload it to a third-party image host and resend it, or just send the text directly.
> > > >
> > > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > > >
> > > > > [image: image.png]
> > > > > As the subject says, I hit an exception when trying to define it this way; could someone from the community point me to the correct approach?
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: flink on native k8s deploy issue

2020-11-24 Post by 吴松
Sorry, that error turned out to be a memory problem. What I actually meant to ask about is the error below.






2020-11-24 16:19:33,569 ERROR 
org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient [] - A Kubernetes 
exception occurred.
java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or service not 
known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) 
~[?:1.8.0_252]
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) 
~[?:1.8.0_252]
at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) 
~[?:1.8.0_252]
at java.net.InetAddress.getAllByName0(InetAddress.java:1277) 
~[?:1.8.0_252]
at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
~[?:1.8.0_252]
at java.net.InetAddress.getAllByName(InetAddress.java:1127) 
~[?:1.8.0_252]
at java.net.InetAddress.getByName(InetAddress.java:1077) ~[?:1.8.0_252]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 [flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188)
 [flink-dist_2.12-1.11.2.jar:1.11.2]
2020-11-24 16:19:33,606 ERROR 
org.apache.flink.kubernetes.cli.KubernetesSessionCli
[] - Error while running the Flink session.
java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not create 
the RestClusterClient.
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:117)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:188)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:188)
 [flink-dist_2.12-1.11.2.jar:1.11.2]
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could 
not create the RestClusterClient.
... 6 more
Caused by: java.net.UnknownHostException: tuiwen-flink-rest.flink: Name or 
service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) 
~[?:1.8.0_252]
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929) 
~[?:1.8.0_252]
at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324) 
~[?:1.8.0_252]
at java.net.InetAddress.getAllByName0(InetAddress.java:1277) 
~[?:1.8.0_252]
at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
~[?:1.8.0_252]
at java.net.InetAddress.getAllByName(InetAddress.java:1127) 
~[?:1.8.0_252]
at java.net.InetAddress.getByName(InetAddress.java:1077) ~[?:1.8.0_252]
at 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:193)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:113)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
... 5 more



The program finished with the following exception:


java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not create 
the RestClusterClient.
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:117)
at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:142)
at 
org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:109)
at 

Re: How should an Array nested inside a Row be defined with DDL in Flink SQL?

2020-11-24 Post by zilong xiao
I'm using Flink 1.11, but with a format written by someone else; I guess the bug is in there:
https://github.com/yangyichao-mango/flink-protobuf

Benchao Li wrote on Tue, Nov 24, 2020 at 3:43 PM:

> Your DDL looks fine.
>
> Which Flink version are you using?
> Also, could you share a more complete exception stack trace?
>
> zilong xiao wrote on Tue, Nov 24, 2020 at 2:54 PM:
>
> > Hi Benchao, the image can be viewed at https://imgchr.com/i/DtoGge, looking forward to your answer~
> >
> > Benchao Li wrote on Tue, Nov 24, 2020 at 2:49 PM:
> >
> > > Your image did not come through; you can upload it to a third-party image host and resend it, or just send the text directly.
> > >
> > > zilong xiao wrote on Tue, Nov 24, 2020 at 10:49 AM:
> > >
> > > > [image: image.png]
> > > > As the subject says, I hit an exception when trying to define it this way; could someone from the community point me to the correct approach?
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Using Hive UDF functions in Flink

2020-11-24 Post by Rui Li
Hi,

This is a known issue [1][2]. In newer versions we simply disabled these functions in the hive module [3]; for now I suggest working around it with Flink's own functions.

[1] https://issues.apache.org/jira/browse/FLINK-16688
[2] https://issues.apache.org/jira/browse/FLINK-16618
[3] https://issues.apache.org/jira/browse/FLINK-18995

On Tue, Nov 24, 2020 at 11:54 AM 酷酷的浑蛋  wrote:

> Flink-1.11.1,  hive-2.2.0
> When using the current_timestamp or current_date function, the following is thrown:
> Caused by: java.lang.NullPointerException
> at
> org.apache.hadoop.hive.ql.udf.generic.GenericUDFCurrentTimestamp.initialize(GenericUDFCurrentTimestamp.java:51)
> at
> org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:141)
>
>
>
>

-- 
Best regards!
Rui Li


flink on native k8s deploy issue

2020-11-24 Post by 吴松
Starting Flink with the -Dkubernetes.rest-service.exposed.type=ClusterIP configuration fails with an error.


The error is as follows:


2020-11-24 15:49:19,796 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: jobmanager.rpc.address, 
0.0.0.0
2020-11-24 15:49:19,800 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: jobmanager.rpc.port, 6123
2020-11-24 15:49:19,801 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: 
jobmanager.memory.process.size, 1600m
2020-11-24 15:49:19,801 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: 
taskmanager.memory.process.size, 1800m
2020-11-24 15:49:19,801 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: 
taskmanager.numberOfTaskSlots, 1
2020-11-24 15:49:19,802 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: parallelism.default, 1
2020-11-24 15:49:19,802 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: high-availability, zookeeper
2020-11-24 15:49:19,803 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: high-availability.cluster-id, 
/tuiwen-flink
2020-11-24 15:49:19,803 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: high-availability.storageDir, 
file:/usr/flink/tuiwen-flink
2020-11-24 15:49:19,804 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: 
high-availability.zookeeper.quorum, 
data-kafka-zookeeper-headless.tuiwen-public:2181
2020-11-24 15:49:19,804 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: state.backend, rocksdb
2020-11-24 15:49:19,805 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: state.checkpoints.dir, 
file:/usr/flink/flink-checkpoints
2020-11-24 15:49:19,805 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: 
state.checkpoints.num-retained, 100
2020-11-24 15:49:19,805 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: state.savepoints.dir, 
file:/usr/flink/flink-savepoints
2020-11-24 15:49:19,806 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: 
jobmanager.execution.failover-strategy, region
2020-11-24 15:49:19,806 INFO 
org.apache.flink.configuration.GlobalConfiguration
 [] - Loading configuration property: web.upload.dir, /usr/flink
2020-11-24 15:49:19,990 INFO 
org.apache.flink.client.deployment.DefaultClusterClientServiceLoader [] - Could 
not load factory due to missing dependencies.
2020-11-24 15:49:22,366 INFO 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived 
from fraction jvm overhead memory (160.000mb (167772162 bytes)) is less than 
its min value 192.000mb (201326592 bytes), min value will be used instead
2020-11-24 15:49:22,399 INFO 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived 
from fraction jvm overhead memory (70.000mb (73400321 bytes)) is less than its 
min value 192.000mb (201326592 bytes), min value will be used instead
2020-11-24 15:49:22,401 INFO 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The derived 
from fraction network memory (25.200mb (26424115 bytes)) is less than its min 
value 64.000mb (67108864 bytes), min value will be used instead
2020-11-24 15:49:22,405 ERROR 
org.apache.flink.kubernetes.cli.KubernetesSessionCli
[] - Error while running the Flink session.
org.apache.flink.configuration.IllegalConfigurationException: Sum of configured 
Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory 
(128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory 
(100.800mb (105696462 bytes)) and Network Memory (64.000mb (67108864 bytes)) 
exceed configured Total Flink Memory (252.000mb (264241152 bytes)).
at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:136)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils.deriveFromTotalFlinkMemory(TaskExecutorFlinkMemoryUtils.java:42)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.deriveProcessSpecWithTotalProcessMemory(ProcessMemoryUtils.java:105)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils.memoryProcessSpecFromConfig(ProcessMemoryUtils.java:79)
 ~[flink-dist_2.12-1.11.2.jar:1.11.2]
at