Re: 回复: flink 1.12 Cancel Job内存未释放(问)

2021-01-04 文章 Yun Tang
Hi 徐州州

请查看一下checkpoint UI部分的overview,观察restored部分的是否为空,也就是没有从checkpoint恢复,同样可以观察job 
manager 部分日志,看是否从checkpoint resume。
如果没有从checkpoint/savepoint恢复,作业其实相当于是从头重新跑,除非作业有其他的外部访问,否则不应该有任何历史数据能看到。

祝好
唐云

From: 徐州州 <25977...@qq.com>
Sent: Tuesday, January 5, 2021 10:34
To: user-zh@flink.apache.org 
Subject: 回复: flink 1.12 Cancel Job内存未释放(问)

这是我完整的配置文件,并没有设置任何状态后端,和保存点,任务kill执行的命令是/opt/module/hadoop3.2.1/bin/yarn 
application -kill  jobid,启动命令执行的是,/opt/module/flink1.12/bin/flink run -d -m 
yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm 
App_Bs_Drainage_Launch_200105,我猜想会不会是因为队列的问题,我集群中只有一个queue队列。

-- 原始邮件 --
发件人: "user-zh" ;
发送时间: 2021年1月5日(星期二) 上午10:03
收件人: "user-zh@flink.apache.org";
主题: 回复: flink 1.12 Cancel Job内存未释放(问)

这种情况貌似和检查点、保存点还有状态后端有关,可以排查排查,重新启动任务在昨天的基础上累加这个逻辑是正确的(如果配置了检查点、保存点还有状态后端),只是现在昨天你杀死正在执行的job的时候最后保存的状态结果和你实际的结果不一致


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
在2021年1月5日 09:04,徐州州<25977...@qq.com> 写道:
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert
 into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND 
dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON 
dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name 
IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND 
left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status 
< 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




--原始邮件--
发件人: "赵一旦"

FlinkSQL 1.10.0 where条件包含关键字列名的过滤条件不能使用=判断

2021-01-04 文章 Robin Zhang


测试代码如下:

create view sink_test as
select
id
,type
,student_id
,kefu_id
,action_time
,action_user
,distribute_status
,unbind_type
,`comment`
,time_created
,pull_from
from distribute_new_log
where `comment` ='娃娃鱼';

print table sink_test;

当使用关键字列做过滤条件时,能过滤出符合的数据,但是关键字列comment的值输出为unicode码(\u5B9A\u5411\u5206\u914DTMK\u5C0F\u7EC4),并不是中文。正常来说SQL中字符串判断用=是支持的吧。测试下来发现使用`comment`
like '娃娃鱼',输出就没问题,写如结果表数据显示正常,不知道这是不是10版本的sql限制?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


退订

2021-01-04 文章 fuzs
退订

Re: Flink SQL>查询的hive表数据全部为NULL

2021-01-04 文章 Rui Li
Hello,

Flink和Hive版本是什么呢?ORC表的数据是否有过schema evolution?

On Mon, Jan 4, 2021 at 9:19 AM Jacob <17691150...@163.com> wrote:

> Dear All,
>
> Flink SQL>select * from table1;
>
>
> 在Flink客户端查询hive数据,结果每个字段全为NULL,但数据条数是对的,select
> count是正确的,查具体数据就是NULL,不知何故?同样的查询在hive客户端是可以查到数据的
>
>
> hive表时orc文件load的数据。
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


?????? apache flink

2021-01-04 文章 Waldeinsamkeit.
??




----
??: 
   "user-zh"

<573693...@qq.com;
:2021??1??5??(??) 11:19
??:"user-zh"

关于flink rest api的监控接口问题

2021-01-04 文章 赵一旦
当前vertex的结点监控,有个获取全部指标的接口,和基于get参数逗号分割获取指标值的接口。

现在问题是我的采集脚本在获取监控值时候,因为是get导致超长,于是我5个5个的获取,但这导致我每30s一次采集,每次采集上百次请求,耗时达到几十秒。

是否可以搞个post接口;或者在metrics那个获取全部metric指标id的接口中就直接返回全部value呢?


Re: apache flink

2021-01-04 文章 kcz
我的理解,flink是一个任务执行引擎,你需要的功能应该是任务调度器吧,比如airflow等。





-- Original --
From: Waldeinsamkeit. <1214316...@qq.com
Date: Tue,Jan 5,2021 11:13 AM
To: user-zh 

apache flink

2021-01-04 文章 Waldeinsamkeit.
??   
??flinkflink??
   ??

?????? flink 1.12 Cancel Job??????????(??)

2021-01-04 文章 ??????
kill/opt/module/hadoop3.2.1/bin/yarn
 application -kill 
jobid/opt/module/flink1.12/bin/flink run -d -m yarn-cluster 
-yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm 
App_Bs_Drainage_Launch_200105queue??


----
??: 
   "user-zh"



?????? flink 1.12 Cancel Job??????????(??)

2021-01-04 文章 ????
job


| |

|
|
liuha...@163.com
|
??
??2021??1??5?? 09:04<25977...@qq.com> ??
??flink-sqljarMemoryStateBackend??23:57-killjob00:30azkaban00:30??kill??NodeManager??00:301000??900??|insert
 into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND 
dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON 
dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name 
IN ('', '??')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND 
left(do.GrouporgName, 2) = ''
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status 
< 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




----
??: "??"

Re: pyflink-udaf

2021-01-04 文章 Xingbo Huang
Hi,

我这边没有看到你提供的附件。关于这个报错,我在你上封邮件回复你了,你可以看下是不是你的weighted_avg
没有进行注册(可以通过create_temporary_system_function或者register_function来注册,这样就可以通过字符串的方式进行使用)。当然你要是直接使用DSL的方式(文档中的例子),是不用你注册的。

Best,
Xingbo

hepeitan  于2021年1月4日周一 下午8:48写道:

> 您好:
>   我们项目组计划使用pyflink的udaf对数据进行聚合处理,拜读社区文章得知,对于批处理方式或窗口式聚合,建议使用Vectorized
> Aggregate Functions
> 
> 。但此case提供的代码不完全,不是完整的case,
> 自己编写测试程序报错:"org.apache.flink.table.api.ValidationException: Undefined
> function: weighted_avg",附件为测试代码
>   请问能否提供一个完整的pyflink的udaf对数据进行聚合处理示例,多谢!!!
>
>
>
>


Re: flink 1.12 Cancel Job内存未释放(问)

2021-01-04 文章 赵一旦
具体SQL。其实我没特别明白你表达的问题。
什么叫做释放内存,还有在之前的结果上累加。这2是什么跟什么没啥关系的东西,没听懂你表达啥。
前者是内存,后者反映的状态。如果是基于检查点/保存点重启任务,当然会保留状态,就是继续累加。

徐州州 <25977...@qq.com> 于2021年1月4日周一 上午8:45写道:

> 即使我切换了,yarn-cluster模式,我23:50,通过/opt/module/hadoop3.2.1/bin/yarn
> application -kill
> application_1609656886263_0043,kill掉job,第二天1:30重启,结果还是在昨天的结果上累加的,执行的kill-job好像并不能释放state,这个真的一点办法都没有了吗?
>
>
>
> --原始邮件--
> 发件人: "赵一旦" 发送时间: 2020年12月29日(星期二) 晚上9:35
> 收件人: "user-zh" 主题: Re: flink 1.12 Cancel Job内存未释放(问)
>
>
>
> 不可以吧。任务是任务。taskManager是taskManager。 taskManager是提前启动好的一个进程,任务提交的时候会由
> taskManager 帮你执行。cancel后taskManager继续它自己的事情(比如等新的任务)。
> 或者考虑yarn方式,per-job模式啥的。
>
> 徐州州 <25977...@qq.com 于2020年12月29日周二 上午9:00写道:
>
>  请教一下,我flink
> 
> sql任务Cancel之后,隔一个小时后重启,还是接着Cancel的点进行累加计算的。我在IDEA中开发,代码中没有设置任何Checkpoints,请问我该如何在任务Cancel的时候同时释放掉job所使用的TaskManager内存?


Re: 如何优雅的开发Flink SQL作业

2021-01-04 文章 赵一旦
1 kafka table和group id是啥意思。group id随意写一个就可以了。
2 本身就可以复用。
3 听不懂表达啥。


HideOnBushKi <1405977...@qq.com> 于2021年1月4日周一 下午3:43写道:

> 大佬们好,现在业务需求多,生产作业开始变得繁重,请教3个生产业务场景,主要是想看下各位有什么优雅的思路
>
> 1.kakfa table和group_ID应该怎么去维护呢?一个业务一张表吗?
> 2.如何做到复用表的效果?
> 3.如果新增一个业务需求,用到之前的一张kafka table,似乎要在一个程序里。执行多次 executeSql("sql
> 1")才不会乱序,但是这样的话,程序的耦合度会很高,请问该怎么优雅的处理这些场景呢?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on k8s 1.11.3版本,使用 hdfs 的任务, taskmanager 无法启动的问题

2021-01-04 文章 Yang Wang
1.11版本以后可以直接在Flink Client的机器上export HADOOP_CONF_DIR
然后运行flink run-application或者kubernetes_session.sh启动Flink任务,这样Flink
Client会自动通过ConfigMap将Hadoop配置ship到JobManager和TaskManager pod
并且加到classpath的

Best,
Yang

龙逸尘  于2021年1月4日周一 下午4:39写道:

> 各位下午好,目前我正在使用 Flink on k8s application-mode 构建构建一个消费 kafka 写入 hive 的
> demo,使用 hdfs 作为 statebackend,遇到了一些问题,希望能得到帮助。这里我描述一下问题与 debug 的过程。
>
> Dockerfile 如下
>
> FROM flink:1.11.3-scala_2.11
> RUN mkdir -p $FLINK_HOME/usrlib
> RUN mkdir -p /opt/hadoop/conf
> COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> $FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> COPY flink-on-k8s-1.0-SNAPSHOT.jar
> $FLINK_HOME/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
> COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
> ENV HADOOP_CONF_DIR /opt/hadoop/conf
> ENV YARN_CONF_DIR /opt/hadoop/conf
> COPY yarn-site.xml /opt/hadoop/conf/yarn-site.xml
> COPY hdfs-site.xml /opt/hadoop/conf/hdfs-site.xml
> COPY core-site.xml /opt/hadoop/conf/core-site.xml
>
> 启动命令如下
>
> flink-1.11.3/bin/flink run-application -p 1 -t kubernetes-application
> -Dkubernetes.cluster-id=my-first-application-cluster-demo7-4
> -Dkubernetes.jobmanager.service-account=flink
> -Dtaskmanager.memory.process.size=1024m   -Dkubernetes.taskmanager.cpu=1
> -Dtaskmanager.numberOfTaskSlots=1
> -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
> %jvmopts% %logging% %class% %args%"
> -Dkubernetes.container.image=flink:demo7-4
> -Dkubernetes.rest-service.exposed.type=NodePort
> local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
>
> flink-on-k8s-1.0-SNAPSHOT.jar 这个 jar 包仅仅是消费 kafka,使用 hdfs 作为 statebackend
> 记录状态。
>
> 一开始尝试只把 yarn-site.xml 等三个文件放在 usrlib 目录下,JobManager 无法启动,报错是
> UnknownHost。参考邮件列表中的信息,设置HADOOP_CONF_DIR之后,JobManager 成功启动且没有报错日志,但是
> TaskManager 一直处于 ContainerCreating 状态,7-8 分钟后 deployment 自动退出。使用 describe
> pod 获取 tm 报错信息如下:
>
> Events:
>   Type Reason   Age   From
>  Message
>    --     
>  ---
>   Normal   Scheduled default-scheduler
> Successfully assigned
> default/my-first-application-cluster-demo7-4-taskmanager-1-1 to
> k8s-node0002
>   Warning  FailedMount  37s (x10 over 4m46s)  kubelet,
> k8s-ci-dcn-bigdata-node0002  MountVolume.SetUp failed for volume
> "hadoop-config-volume" : configmap
> "hadoop-config-my-first-application-cluster-demo7-4" not found
>   Warning  FailedMount  29s (x2 over 2m44s)   kubelet, k8s-node0002  Unable
> to attach or mount volumes: unmounted volumes=[hadoop-config-volume],
> unattached volumes=[hadoop-config-volume flink-config-volume
> default-token-fhkhf]: timed out waiting for the condition
>
> 请问我是否配置有误,还是需要别的配置来启用 hdfs。
> 期待您的回复~
>
> ---
> Best Regards!
>
> Yichen
>


Re: flink on k8s application mode指定运行作业jar包路径问题

2021-01-04 文章 Yang Wang
目前native的方式只能支持local,也就是用户jar需要打到镜像里面,暂时不能支持hdfs或oss

是可以通过init container来下载,目前pod template[1]这个功能还没有支持,你可以跟进进度

[1]. https://issues.apache.org/jira/browse/FLINK-15656

Best,
Yang

陈帅  于2021年1月2日周六 下午8:08写道:

> 官网给的示例命令如下
> ./bin/flink run-application -p 8 -t kubernetes-application \
>
>   -Dkubernetes.cluster-id=flink-k8s-application-cluster \
>
>   -Dtaskmanager.memory.process.size=4096m \
>
>   -Dkubernetes.taskmanager.cpu=2 \
>
>   -Dtaskmanager.numberOfTaskSlots=4 \
>
>   -Dkubernetes.container.image=flink:latest \
>
>   local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
>
>
>
> 这最后一行参数指定了作业jar包路径,请问只支持local模式吗?那这样的话就只能将作业打包进镜像了吧?有没有可能访问外部文件系统,例如hdfs或oss地址?
> 如果当前暂时不支持的话,还有别的workaround办法吗?我听说有一个init container,具体要如何操作呢?


Re: flink on k8s作业监控问题

2021-01-04 文章 Yang Wang
可以通过配置PrometheusPushGateway[1]将Metrics导入到Prometheus,然后对接Grafana进行查看

日志除了sidecar,也可以通过log4j2自定义appender的方式,来直接将Log写到分布式存储(阿里云Oss,ElasticSearch)等

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/metric_reporters.html#prometheuspushgateway-orgapacheflinkmetricsprometheusprometheuspushgatewayreporter

Best,
Yang

陈帅  于2021年1月2日周六 下午7:54写道:

> 请问运行在k8s per
> job上的flink作业要如何正确监控?一方面通过sidecar的方式收集日志,另一方面要怎么收集那些flink作业metrics?
> 这方面有什么资料参考吗?


pyflink-udaf

2021-01-04 文章 hepeitan
您好:

  我们项目组计划使用pyflink的udaf对数据进行聚合处理,拜读社区文章得知,对于批处理方式或窗口式聚合,建议使用Vectorized 
Aggregate Functions。但此case提供的代码不完全,不是完整的case,
自己编写测试程序报错:"org.apache.flink.table.api.ValidationException: Undefined function: 
weighted_avg",附件为测试代码
  请问能否提供一个完整的pyflink的udaf对数据进行聚合处理示例,多谢!!!

Re: Re: flink 1.12.0 kubernetes-session部署问题

2021-01-04 文章 Yang Wang
native方式默认使用的是LoadBalancer的方式来暴露,所以会打印出来一个你无法访问的地址
你可以加一个-Dkubernetes.rest-service.exposed.type=NodePort的方式来使用NodePort来暴露
这样Flink Client端打印出来的地址就是正确的了

另外你可以可以使用minikube ip来查看ip地址,同时用kubectl get svc获取你创建的Flink cluster
svc的NodePort,拼起来就可以


至于你说的NoResourceAvailableException,你可以看下是不是TaskManager的Pod已经创建出来了,但是pending状态
如果是,那就是你minikube资源不够了,可以把minikube资源调大或者把JobManager、TaskManager的Pod资源调小
如果不是,你可以把完整的JobManager日志发一下,这样方便查问题


Best,
Yang

陈帅  于2021年1月2日周六 上午10:43写道:

> 环境:MacBook Pro 单机安装了 minkube v1.15.1 和 kubernetes v1.19.4
> 我在flink v1.11.3发行版下执行如下命令
> kubectl create namespace flink-session-cluster
>
>
> kubectl create serviceaccount flink -n flink-session-cluster
>
>
> kubectl create clusterrolebinding flink-role-binding-flink \
> --clusterrole=edit \ --serviceaccount=flink-session-cluster:flink
>
>
> ./bin/kubernetes-session.sh \ -Dkubernetes.namespace=flink-session-cluster
> \ -Dkubernetes.jobmanager.service-account=flink \
> -Dkubernetes.cluster-id=session001 \
> -Dtaskmanager.memory.process.size=8192m \ -Dkubernetes.taskmanager.cpu=1 \
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dresourcemanager.taskmanager-timeout=360
>
>
> 屏幕打印的结果显示flink web UI启在了 http://192.168.64.2:8081 而不是类似于
> http://192.168.50.135:31753 这样的5位数端口,是哪里有问题?这里的host ip应该是minikube
> ip吗?我本地浏览器访问不了http://192.168.64.2:8081
>
>
>
> 2021-01-02 10:28:04,177 INFO
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The
> derived from fraction jvm overhead memory (160.000mb (167772162 bytes)) is
> less than its min value 192.000mb (201326592 bytes), min value will be used
> instead
>
> 2021-01-02 10:28:04,907 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.64.2:8081
>
>
>
>
> 查看了pods, service, deployment都正常启动好了,显示全绿色的
>
>
> 接下来提交任务
> ./bin/flink run -d \ -e kubernetes-session \
> -Dkubernetes.namespace=flink-session-cluster \
> -Dkubernetes.cluster-id=session001 \ examples/streaming/WindowJoin.jar
>
>
>
> Using windowSize=2000, data rate=3
>
> To customize example, use: WindowJoin [--windowSize
> ] [--rate ]
>
> 2021-01-02 10:21:48,658 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve
> flink cluster session001 successfully, JobManager Web Interface:
> http://10.106.136.236:8081
>
>
>
>
> 这里显示的 http://10.106.136.236:8081 我是能够通过浏览器访问到的,打开显示作业正在运行,而且available
> slots一项显示的是 0,查看JM日志有如下error
>
>
>
>
> Causedby:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Couldnot allocate the required slot within slot request timeout. Please
> make sure that the cluster has enough resources.
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
> ~[flink-dist_2.12-1.11.3.jar:1.11.3]
> ... 47 more
> Causedby: java.util.concurrent.CompletionException:
> java.util.concurrent.TimeoutException
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
> ~[?:1.8.0_275]
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> ~[?:1.8.0_275]
> ... 27 more
> Causedby: java.util.concurrent.TimeoutException
> ... 25 more
>
>
> 为什么会报这个资源配置不足的错?谢谢解答!
>
>
>
>
>
>
>
>
> 在 2020-12-29 09:53:48,"Yang Wang"  写道:
> >ConfigMap不需要提前创建,那个Warning信息可以忽略,是正常的,主要原因是先创建的deployment,再创建的ConfigMap
> >你可以参考社区的文档[1]把Jm的log打到console看一下
> >
> >我怀疑是你没有创建service account导致的[2]
> >
> >[1].
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files
> >[2].
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac
> >
> >Best,
> >Yang
> >
> >陈帅  于2020年12月28日周一 下午5:54写道:
> >
> >> 今天改用官方最新发布的flink镜像版本1.11.3也启不起来
> >> 这是我的命令
> >> ./bin/kubernetes-session.sh \
> >>   -Dkubernetes.cluster-id=rtdp \
> >>   -Dtaskmanager.memory.process.size=4096m \
> >>   -Dkubernetes.taskmanager.cpu=2 \
> >>   -Dtaskmanager.numberOfTaskSlots=4 \
> >>   -Dresourcemanager.taskmanager-timeout=360 \
> >>   -Dkubernetes.container.image=flink:1.11.3-scala_2.12-java8 \
> >>   -Dkubernetes.namespace=rtdp
> >>
> >>
> >>
> >> Events:
> >>
> >>   Type Reason  AgeFrom   Message
> >>
> >>    --        ---
> >>
> >>   Normal   Scheduled   88sdefault-scheduler
> >> Successfully assigned rtdp/rtdp-6d7794d65d-g6mb5 to
> >> cn-shanghai.192.168.16.130
> >>
> >>   Warning  FailedMount 88skubelet
> >> MountVolume.SetUp failed for volume "flink-config-volume" : configmap
> >> "flink-config-rtdp" not found
> >>
> >>   

Re: pyflink-udaf

2021-01-04 文章 Xingbo Huang
Hi,

你好,事例已经提供了UDF的注册和使用,只剩下数据源的读取和输出没有提供(这有单独的部分来讲)。
关于你的报错,因为你没有提供具体咋使用的,只能猜测你没有按照示例使用DSL的方式,而是使用的字符串的方式,但却没有register函数导致报了这个错

Best,
Xingbo

消息室  于2021年1月4日周一 下午8:10写道:

> 您好:  
> 我们项目组计划使用pyflink的udaf对数据进行聚合处理,拜读社区文章得知,对于批处理方式或窗口式聚合,建议使用Vectorized
> Aggregate Functions。但此case提供的代码不完全,不是完整的case,
> 自己编写测试程序报错:"org.apache.flink.table.api.ValidationException: Undefined
> function: weighted_avg"
>请问能否提供一个完整的pyflink的udaf对数据进行聚合处理示例,多谢!!!
>
>
>
>
>   


pyflink-udaf

2021-01-04 文章 ??????
??   
??pyflink??udaf??Vectorized
 Aggregate Functions??casecase??
??"org.apache.flink.table.api.ValidationException: 
Undefined function: weighted_avg"
   
??pyflink??udaf??




  

Re:Re: pyflink-1.12.0 stream api任务执行失败

2021-01-04 文章 ゛无邪
Hi,您好,非常感谢您的回复!



我刚刚检查了下pyflink的依赖,发现里面用的apache-flink模块确实是flink-1.11的,替换成1.12后问题就解决了,再次感谢您的帮助,谢谢!!














在 2021-01-04 16:36:22,"Xingbo Huang"  写道:
>Hi,
>
>看报错应该是你集群上使用的pyflink的版本是1.11的(那个报错No logging endpoint
>provided.是1.11才有的)。你可以把版本升级到1.12试试
>
>Best,
>Xingbo
>
>゛无邪 <17379152...@163.com> 于2021年1月4日周一 下午4:28写道:
>
>> Hi,您好!
>> 我们参考Flink官网上提供的Python API中的DataStream API用户指南文档编写了一份python脚本,文档地址:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.html
>> flink运行方式是 on yarn,通过-py参数指定了脚本,能成功提交到yarn上,但是会遇到如下错误
>> Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264
>> Traceback (most recent call last):
>>   File "official_example_2blk.py", line 44, in 
>> env.execute("tutorial_job")
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
>> line 623, in execute
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
>> line 1286, in __call__
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
>> line 147, in deco
>>   File
>> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
>> line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
>> : java.util.concurrent.ExecutionException:
>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>> (JobID: ee9e3a89eae69f457b81d1ebf4a45264)
>> 完整的堆栈报错可以参考附件中,求助!!
>>


回复: FlinkSQL 下推的值类型与字段类型不对应

2021-01-04 文章 automths
非常感谢你的指导,我将按照你的建议,实现一个TypeVisitor来对下推的类型转化成预期的类型。


祝好!
| |
automths
|
|
autom...@163.com
|
在2021年01月4日 16:49,Sebastian Liu 写道:
Hi automths,

RexNode中的Literal type,在calcite convert to relNode的过程中,以col1 > 10为例,
10从calcite parse出来首先是SqlNumericLiteral, 其中类型会是Decimal(prec: 2, scale: 0).
在创建其对应的RelDataType时,如果其值域在Interger.MIN ~ Interger.Max之间,那就是Interger type。
如果不在就是decimal, 这里没有类似Hive的auto cast功能,而是calcite进行了隐式类型转换。
这里具体隐式转换的规则可以参考:
https://calcite.apache.org/docs/reference.html#implicit-type-conversion

对于Function中,参数的类型,Flink也有一套规则进行推导。

select * from shortRow1 where col1 > CAST(10 AS SMALLINT) and col1 <=
CAST(15 AS SMALLINT) 可以保证
在applyPredicates时看到的expression中,literal是预期的type,但不是特别通用,建议在相关connector中实现
一个TypeVisitor, 把literal转成预期的type。

Just my thoughts

automths  于2021年1月4日周一 上午9:36写道:



谢谢你的回答。

但是我的col1,col2就已经是SMALLINT类型的了,我的问题是where条件中值下推过程中是Integer类型的,我希望值也是SMALLINT的。



祝好!
| |
automths
|
|
autom...@163.com
|
在2020年12月31日 18:17,whirly 写道:
Hi.

查询语句中可以使用 cast 内置函数将值强制转换为指定的类型,如 select CAST(A.`col1` AS SMALLINT) as
col1 from table


参考:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#type-conversion-functions

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/types.html#data-types




best 2021.


在 2020-12-31 17:13:20,"automths"  写道:
Hi:
我自定义connect并实现了FilterableTableSource接口,可是发现flink planner 下推的Filter中,
Literal类型与字段类型不匹配。
比如:下面的SQL:
select * from shortRow1 where key in (1, 2, 3) and col1 > 10 and col1 <= 15
其中DDL定义时, key、col1、col1都是SMALLINT类型
在下推的Filter中, GreaterThan中的Literal是Integer类型,这样是合理的吗?或者我的查询语句中要做什么处理?


祝好!
| |
automths
|
|
autom...@163.com
|



--

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Re: FlinkSQL 下推的值类型与字段类型不对应

2021-01-04 文章 Sebastian Liu
Hi automths,

RexNode中的Literal type,在calcite convert to relNode的过程中,以col1 > 10为例,
10从calcite parse出来首先是SqlNumericLiteral, 其中类型会是Decimal(prec: 2, scale: 0).
在创建其对应的RelDataType时,如果其值域在Interger.MIN ~ Interger.Max之间,那就是Interger type。
如果不在就是decimal, 这里没有类似Hive的auto cast功能,而是calcite进行了隐式类型转换。
这里具体隐式转换的规则可以参考:
https://calcite.apache.org/docs/reference.html#implicit-type-conversion

对于Function中,参数的类型,Flink也有一套规则进行推导。

select * from shortRow1 where col1 > CAST(10 AS SMALLINT) and col1 <=
CAST(15 AS SMALLINT) 可以保证
在applyPredicates时看到的expression中,literal是预期的type,但不是特别通用,建议在相关connector中实现
一个TypeVisitor, 把literal转成预期的type。

Just my thoughts

automths  于2021年1月4日周一 上午9:36写道:

>
>
> 谢谢你的回答。
>
> 但是我的col1,col2就已经是SMALLINT类型的了,我的问题是where条件中值下推过程中是Integer类型的,我希望值也是SMALLINT的。
>
>
>
> 祝好!
> | |
> automths
> |
> |
> autom...@163.com
> |
> 在2020年12月31日 18:17,whirly 写道:
> Hi.
>
> 查询语句中可以使用 cast 内置函数将值强制转换为指定的类型,如 select CAST(A.`col1` AS SMALLINT) as
> col1 from table
>
>
> 参考:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#type-conversion-functions
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/types.html#data-types
>
>
>
>
> best 2021.
>
>
> 在 2020-12-31 17:13:20,"automths"  写道:
> Hi:
> 我自定义connect并实现了FilterableTableSource接口,可是发现flink planner 下推的Filter中,
> Literal类型与字段类型不匹配。
> 比如:下面的SQL:
> select * from shortRow1 where key in (1, 2, 3) and col1 > 10 and col1 <= 15
> 其中DDL定义时, key、col1、col1都是SMALLINT类型
> 在下推的Filter中, GreaterThan中的Literal是Integer类型,这样是合理的吗?或者我的查询语句中要做什么处理?
>
>
> 祝好!
> | |
> automths
> |
> |
> autom...@163.com
> |
>
>

-- 

*With kind regards

Sebastian Liu 刘洋
Institute of Computing Technology, Chinese Academy of Science
Mobile\WeChat: +86—15201613655
E-mail: liuyang0...@gmail.com 
QQ: 3239559*


Flink on k8s 1.11.3版本,使用 hdfs 的任务, taskmanager 无法启动的问题

2021-01-04 文章 龙逸尘
各位下午好,目前我正在使用 Flink on k8s application-mode 构建构建一个消费 kafka 写入 hive 的
demo,使用 hdfs 作为 statebackend,遇到了一些问题,希望能得到帮助。这里我描述一下问题与 debug 的过程。

Dockerfile 如下

FROM flink:1.11.3-scala_2.11
RUN mkdir -p $FLINK_HOME/usrlib
RUN mkdir -p /opt/hadoop/conf
COPY flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
$FLINK_HOME/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
COPY flink-on-k8s-1.0-SNAPSHOT.jar
$FLINK_HOME/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar
COPY TopSpeedWindowing.jar $FLINK_HOME/usrlib/TopSpeedWindowing.jar
ENV HADOOP_CONF_DIR /opt/hadoop/conf
ENV YARN_CONF_DIR /opt/hadoop/conf
COPY yarn-site.xml /opt/hadoop/conf/yarn-site.xml
COPY hdfs-site.xml /opt/hadoop/conf/hdfs-site.xml
COPY core-site.xml /opt/hadoop/conf/core-site.xml

启动命令如下

flink-1.11.3/bin/flink run-application -p 1 -t kubernetes-application
-Dkubernetes.cluster-id=my-first-application-cluster-demo7-4
-Dkubernetes.jobmanager.service-account=flink
-Dtaskmanager.memory.process.size=1024m   -Dkubernetes.taskmanager.cpu=1
-Dtaskmanager.numberOfTaskSlots=1
-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem%
%jvmopts% %logging% %class% %args%"
-Dkubernetes.container.image=flink:demo7-4
-Dkubernetes.rest-service.exposed.type=NodePort
local:///opt/flink/usrlib/flink-on-k8s-1.0-SNAPSHOT.jar

flink-on-k8s-1.0-SNAPSHOT.jar 这个 jar 包仅仅是消费 kafka,使用 hdfs 作为 statebackend
记录状态。

一开始尝试只把 yarn-site.xml 等三个文件放在 usrlib 目录下,JobManager 无法启动,报错是
UnknownHost。参考邮件列表中的信息,设置HADOOP_CONF_DIR之后,JobManager 成功启动且没有报错日志,但是
TaskManager 一直处于 ContainerCreating 状态,7-8 分钟后 deployment 自动退出。使用 describe
pod 获取 tm 报错信息如下:

Events:
  Type Reason   Age   From
 Message
   --     
 ---
  Normal   Scheduled default-scheduler
Successfully assigned
default/my-first-application-cluster-demo7-4-taskmanager-1-1 to k8s-node0002
  Warning  FailedMount  37s (x10 over 4m46s)  kubelet,
k8s-ci-dcn-bigdata-node0002  MountVolume.SetUp failed for volume
"hadoop-config-volume" : configmap
"hadoop-config-my-first-application-cluster-demo7-4" not found
  Warning  FailedMount  29s (x2 over 2m44s)   kubelet, k8s-node0002  Unable
to attach or mount volumes: unmounted volumes=[hadoop-config-volume],
unattached volumes=[hadoop-config-volume flink-config-volume
default-token-fhkhf]: timed out waiting for the condition

请问我是否配置有误,还是需要别的配置来启用 hdfs。
期待您的回复~

---
Best Regards!

Yichen


Re: pyflink-1.12.0 stream api任务执行失败

2021-01-04 文章 Xingbo Huang
Hi,

看报错应该是你集群上使用的pyflink的版本是1.11的(那个报错No logging endpoint
provided.是1.11才有的)。你可以把版本升级到1.12试试

Best,
Xingbo

゛无邪 <17379152...@163.com> 于2021年1月4日周一 下午4:28写道:

> Hi,您好!
> 我们参考Flink官网上提供的Python API中的DataStream API用户指南文档编写了一份python脚本,文档地址:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.html
> flink运行方式是 on yarn,通过-py参数指定了脚本,能成功提交到yarn上,但是会遇到如下错误
> Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264
> Traceback (most recent call last):
>   File "official_example_2blk.py", line 44, in 
> env.execute("tutorial_job")
>   File
> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
> line 623, in execute
>   File
> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
> "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
> "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException:
> org.apache.flink.client.program.ProgramInvocationException: Job failed
> (JobID: ee9e3a89eae69f457b81d1ebf4a45264)
> 完整的堆栈报错可以参考附件中,求助!!
>


pyflink-1.12.0 stream api任务执行失败

2021-01-04 文章 ゛无邪
Hi,您好!我们参考Flink官网上提供的Python API中的DataStream API用户指南文档编写了一份python脚本,文档地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/python/datastream-api-users-guide/operators.htmlflink运行方式是 on yarn,通过-py参数指定了脚本,能成功提交到yarn上,但是会遇到如下错误Job has been submitted with JobID ee9e3a89eae69f457b81d1ebf4a45264Traceback (most recent call last):  File "official_example_2blk.py", line 44, in     env.execute("tutorial_job")  File "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 623, in execute  File "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__  File "/usr/local/service/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco  File "/usr/local/service/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_valuepy4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: ee9e3a89eae69f457b81d1ebf4a45264)完整的堆栈报错可以参考附件中,还请帮忙看下具体原因!

flink.log
Description: Binary data


Flink SQL查询ORC表结果全部为NULL

2021-01-04 文章 Jacob
Flink SQL> select * from table1 where dt='1609739880002';



table1是张orc表,有分区(dt是分区),在flink sql客户端查询表的结果全部为NULL,但select
count是可以查出数据条数。找了好几天的原因,实在不知道是什么原因了,求教!!!

Flink SQL> select * from table1 where dt='1609739880002';


 





Flink SQL> select count(*) from `table1` where  dt='1609739880002';


 







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/