Re: taskmanager.out配置滚动

2020-12-21 文章 李杰
Hi,
这个功能我们之前做过,可以看下这里。
https://issues.apache.org/jira/browse/FLINK-20713

zilong xiao  于2020年12月3日周四 下午7:50写道:

> 想问下社区的大佬,标准输出文件taskmanager.out可以配置成滚动的吗?
>


flink 1.11.2 创建hive表的问题

2020-12-21 文章 曹武
大佬好,我在使用create table if not
exists创建hive表时,对于已存在的hive表,在hive的日志中会抛出AlreadyExistsException(message:Table
bm_tsk_001 already exists异常,查看源码发现if not
exists貌似只是用于判断捕获异常后是否抛出,对于这个问题有建议的解决方案嘛?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 1.11.2 创建hive表的问题

2020-12-21 文章 曹武
大佬好,我在使用create table if not
exists创建hive表时,对于已存在的hive表,在hive的日志中会抛出AlreadyExistsException(message:Table
bm_tsk_001 already exists异常,查看源码发现if not
exists貌似只是用于判断捕获异常后是否抛出,对于这个问题有建议的解决方案嘛?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

?????? flink-shaded-hadoop-2-uber????????????

2020-12-21 文章 liujian
Thanks,flink-confhistory 
server,??hdfs??,??web ui??,




----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

 Best,
 Yang

 liujian <13597820...@qq.comgt; ??2020??12??21?? 1:35??

 gt; 
??history-server,,,??,
 gt;
 gt;
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:
 
gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 "user-zh"
 
gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 <
 gt; danrtsey...@gmail.comamp;gt;;
 gt; :amp;nbsp;2020??12??21??(??) 10:15
 gt; 
??:amp;nbsp;"user-zh"https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
 gt
 


flink1.10 广播流更新卡住

2020-12-21 文章 洪雪芬
Hi!
在使用flink广播流实现配置定时更新的过程中,出现下游算子并行度大于1时,下游算子获取更新到的广播流卡住的情况,即广播流算子持续发送数据,但下游算子只接收到前一小部分数据,然后就没有接收到新数据的情况,但无报错日志。
但该问题在本地IDEA运行时无法复现,提交到集群上以yarn-cluster模式运行时则会出现。
大家有没有遇到过类似的情况?是什么原因导致这样的问题,有什么解决方案吗?


java.lang.IllegalStateException: Trying to access closed classloader.

2020-12-21 文章 jy l
Hi:
我在Idea里面运行我的flink程序,报了如下异常:
Exception in thread "Thread-22" java.lang.IllegalStateException: Trying to
access closed classloader. Please check if you store classloaders directly
or indirectly in static fields. If the stacktrace suggests that the leak
occurs in a third party library and cannot be fixed immediately, you can
disable this check with the configuration
'classloader.check-leaked-classloader'.
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:161)
at
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:179)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2780)
at
org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3036)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2995)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2968)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2848)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1200)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1812)
at
org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1789)
at
org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at
org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at
org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at
org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

是什么原因导致这样的异常,出现这样的异常,我该怎么处理?

望知道的人告知一下,感谢。


pyflink1.12 进行多表关联后的结果类型是TableResult,如何转为Table类型

2020-12-21 文章 肖越
通过sql进行左连接查询,sql语句为:
sql = ''' Insert into print_sink select a.id, a.pf_id, b.symbol_id from  a \
 left join b on b.day_id = a.biz_date where a.ccy_type = 'AC' and \
 a.pf_id = '1030100122' and b.symbol_id = '2030004042' and a.biz_date 
between '20160701' and '20170307' '''


table_result = env.execute_sql(sql)
通过env.execute_sql()执行后的结果是 TableResult , 如何转成Table类型?
或者有哪些其他的方式,可以直接执行表的连接操作,返回结果是Table类型?



Re: yarn application模式提交任务失败

2020-12-21 文章 Yang Wang
silence的回答是对的
如果用-t参数,搭配的都是-D来引导的,不需要prefix,文档里面也是[1]
这个和之前-m yarn-cluster是不一样的,以前的方式需要-yD来引导

[1].
https://ci.apache.org/projects/flink/flink-docs-master/deployment/resource-providers/yarn.html#application-mode

Best,
Yang

silence  于2020年12月21日周一 上午10:53写道:

> 应该是-D不是-yD
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink1.11.1版本Application Mode job on K8S集群,too old resource version问题

2020-12-21 文章 Yang Wang
我之前在另一个邮件里面回复过,我再拷贝过来。

目前我已经建了一个JIRA来跟进too old resource version的问题[1]

在Flink里面采用了Watcher来监控Pod的状态变化,当Watcher被异常close的时候就会触发fatal
error进而导致JobManager的重启

我这边做过一些具体的测试,在minikube、自建的K8s集群、阿里云ACK集群,稳定运行一周以上都是正常的。这个问题复现是通过重启
K8s的APIServer来做到的。所以我怀疑你那边Pod和APIServer之间的网络是不是不稳定,从而导致这个问题经常出现。


[1]. https://issues.apache.org/jira/browse/FLINK-20417

Best,
Yang

lichunguang  于2020年12月21日周一 下午3:51写道:

> Flink1.11.1版本job以Application Mode在K8S集群上运行,jobmanager每个小时会重启一次,报错【Fatal
> error
> occurred in
> ResourceManager.io.fabric8.kubernetes.client.KubernetesClientException: too
> old resource version】
>
> pod重启:
> 
>
> 重启原因:
> 2020-12-10 07:21:19,290 ERROR
> org.apache.flink.kubernetes.KubernetesResourceManager[] - Fatal
> error occurred in ResourceManager.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 247468999 (248117930)
>   at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>   at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
> 2020-12-10 07:21:19,291 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> io.fabric8.kubernetes.client.KubernetesClientException: too old resource
> version: 247468999 (248117930)
>   at
>
> io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager$1.onMessage(WatchConnectionManager.java:259)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.onReadMessage(RealWebSocket.java:323)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.readMessageFrame(WebSocketReader.java:219)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .WebSocketReader.processNextFrame(WebSocketReader.java:105)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket.loopReader(RealWebSocket.java:274)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.ws
> .RealWebSocket$2.onResponse(RealWebSocket.java:214)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall$AsyncCall.execute(RealCall.java:206)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> org.apache.flink.kubernetes.shaded.okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
>   at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_202]
>   at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_202]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
>
>
> 网上查的原因是因为:
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient类中212行
>
> @Override
> public KubernetesWatch watchPodsAndDoCallback(Map labels,
> PodCallbackHandler podCallbackHandler) {
> return new KubernetesWatch(
> this.internalClient.pods()
> .withLabels(labels)
> .watch(new
> KubernetesPodsWatcher(podCallbackHandler)));
> }
>
> 而ETCD中只会保留一段时间的version信息
> 【 I think it's standard behavior of Kubernetes to give 410 after some time
> during watch. It's usually client's responsibility to handle it. In the
> context of a watch, it will return HTTP_GONE when you ask to see changes
> for
> a resourceVersion that is too old - i.e. when it can no longer tell you
> what
> 

Re: flink-shaded-hadoop-2-uber版本如何选择

2020-12-21 文章 Yang Wang
history-server和native
k8s没有关系的,如果你想使用,就需要用一个deployment单独部署history-server在K8s集群内

native k8s覆盖的场景是Flink任务如何原生地提交到K8s集群内

Best,
yang

liujian <13597820...@qq.com> 于2020年12月21日周一 下午8:16写道:

> Thanks, 使用你下面的docker方式我测试确实可以,但是不知道Native K8s如何来操作,可以详细说一下
> 我现在是Dockerfile如下两种情况都试过
>
>
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh"]
> EXPOSE 6123 8081 8082
> CMD ["help","history-server"]
>
> 
> COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
> EXPOSE 6123 8081 8082
> CMD ["help"]
>
>
>
> 这两种都尝试过...,请帮忙指教一下
>
>
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> danrtsey...@gmail.com;
> 发送时间:2020年12月21日(星期一) 下午3:08
> 收件人:"user-zh"
> 主题:Re: flink-shaded-hadoop-2-uber版本如何选择
>
>
>
> 是的,理解没有错,history-server会启动后listen一个端口
>
> 我这边尝试是没有问题的,你可以通过如下命令启动
> docker run -p 8082:8082 --env
> FLINK_PROPERTIES="historyserver.archive.fs.dir: file:///tmp/flink-jobs"
> flink:latest history-server
>
> 更多配置你参考如下文档
>
> https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html
>
> Best,
> Yang
>
> liujian <13597820...@qq.com 于2020年12月21日周一 下午1:35写道:
>
>  我理解的是启动一个history-server,会有一个进程,然后会暴露指定的端口,但是我好像并没有看到这样的效果,是我的理解有错吗
> 
> 
> 
> 
>  --nbsp;原始邮件nbsp;--
>  发件人:
> 
> "user-zh"
> 
> <
>  danrtsey...@gmail.comgt;;
>  发送时间:nbsp;2020年12月21日(星期一) 上午10:15
>  收件人:nbsp;"user-zh" 
>  主题:nbsp;Re: flink-shaded-hadoop-2-uber版本如何选择
> 
> 
> 
> 
> 
> 你不需要修改CMD,entrypoint默认是docker-entrypoint.sh[1],是支持history-server的,只要传一个history-server的参数就可以了
> 
>  [1].
> 
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> 
> 
> ;
>  Best,
>  Yang
> 
> 
>  liujian <13597820...@qq.comgt; 于2020年12月20日周日 下午12:45写道:
> 
>  gt; Thanks,amp;nbsp;
>  gt; amp;nbsp; amp;nbsp;
> amp;nbsp;但是我需要访问historyServer,那么应该需要如何操作我将flink
>  gt; 1.12.0的Dockerfile 修改成CMD ["history-server"]amp;nbsp;
>  并暴露8082端口,但是好像达不到这个效果
>  gt;
>  gt;
>  gt;
>  gt;
>  gt;
> --amp;nbsp;原始邮件amp;nbsp;--
>  gt; 发件人:
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  "user-zh"
> 
> gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
>  <
>  gt; danrtsey...@gmail.comamp;gt;;
>  gt; 发送时间:amp;nbsp;2020年12月19日(星期六) 晚上9:35
>  gt; 收件人:amp;nbsp;"user-zh" amp;gt;;
>  gt;
>  gt; 主题:amp;nbsp;Re: flink-shaded-hadoop-2-uber版本如何选择
>  gt;
>  gt;
>  gt;
>  gt; 你只需要在Flink Client端设置HADOOP_CONF_DIR的环境就可以了
>  gt; Flink
>  gt;
>  gt;
> 
> Client会自动把hdfs-site.xml、core-site.xml文件通过创建一个单独ConfigMap,然后挂载给JobManager和TaskManager的
>  gt;
> 
> 同时这两个配置也会自动加载到classpath下,只需要lib下放了flink-shaded-hadoop,就不需要做其他事情,可以直接访问hdfs的
>  gt;
>  gt;
>  gt; Best,
>  gt; Yang
>  gt;
>  gt; liujian <13597820...@qq.comamp;gt; 于2020年12月19日周六
> 下午8:29写道:
>  gt;
>  gt; amp;gt;
>  gt; amp;gt;
>  gt;
> 
> HDFS是Ha模式,需要指定hdfs-site.xml,这该怎么处理,使用configMap还是将hdfs-site.xml放入到$FLINK_HOME/conf目录下
>  gt; amp;gt;
>  gt; amp;gt;
>  gt; amp;gt;
> 
> --amp;amp;nbsp;原始邮件amp;amp;nbsp;--
>  gt; amp;gt; 发件人:
>  gt;
> 
> amp;gt;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;amp;nbsp;
>  gt; "user-zh"
>  gt;
> 
> 

Re: Native Kubernetes 需要访问HDFS

2020-12-21 文章 Yang Wang
对,是的,自动ship hadoop配置是从1.11开始支持的

在1.10的版本你需要把配置打到镜像里面

Best,
Yang

Akisaya  于2020年12月21日周一 下午5:02写道:

> 1.10 版本好像还没有支持,看了下 1.10 代码里创建 cm 的时候没有去读取 hadoop 配置
>
> Yang Wang  于2020年12月19日周六 上午12:18写道:
>
> > 你可以在Flink client端设置HADOOP_CONF_DIR环境变量即可,这样会自动ship
> > hadoop的配置并且挂载给JobManager和TaskManager的
> >
> > Best,
> > Yang
> >
> > liujian <13597820...@qq.com> 于2020年12月18日周五 下午5:26写道:
> >
> > > Hi:
> > >  在使用Native Kubernetes
> > > 需要访问HDFS,已经将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib目录
> > >  但是hdfs是HA,那么就需要hdfs-site.xml等文件了,那么是如何指定这个文件呢 
> >
>


Re: Application Mode job on K8S集群,无法缩容问题

2020-12-21 文章 Yang Wang
是的,如果CA不能直接释放Pod的话,那是会导致它没有办法被驱逐

Flink TaskManager Pod的生命周期都是交给JobManager管理的,并且不会重启,挂了之后就会申请新的
和CA结合起来,是会有你所说的限制。不过如果能够带上annotation标识pod可以被驱逐的话,不清楚CA是否可以work

Best,
Yang

lichunguang  于2020年12月21日周一 下午4:16写道:

> Yang Wang你好:
> 我想表达的意思是:
>   Native Flink on K8s采用单Pod的申请资源的方式,和K8s自动伸缩机制有些冲突。
>
> 原因:
> 比如job比较多时,各node负载都比较高;而剩余job比较少时,每个node只有少量pod,但因为【Pods that are not backed
> by a controller object】,没法驱逐资源利用率最低的node,导致整体利用率较低。
>
> What types of pods can prevent CA from removing a node?
>
> https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/FAQ.md#what-types-of-pods-can-prevent-ca-from-removing-a-node
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11.2写hive分区表,hive识别不到分区

2020-12-21 文章 Rui Li
具体是怎么写hive的呢?

On Mon, Dec 21, 2020 at 11:28 PM 赵一旦  wrote:

> 即使不是flink写入,其他方式写入也需要这样做的哈。
>
> r pp  于2020年12月21日周一 下午9:28写道:
>
> > 程序中,创建表后,执行命令。
> >
> > kingdomad  于2020年12月21日周一 下午4:55写道:
> >
> > >
> >
> flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
> > > 需要执行msck repair table修复分区表后,hive才能读取到数据。
> > > 求助大佬,要如何解决。
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > kingdomad
> > >
> > >
> >
>


-- 
Best regards!
Rui Li


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 xiao cai
Hi
可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上


 Original Message 
Sender: r pp
Recipient: user-zh
Date: Monday, Dec 21, 2020 21:25
Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点


嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大? 
 于2020年12月21日周一 下午5:48写道: > 通过yarn label可以实现 > > 
-邮件原件- > 发件人: user-zh-return-10095-afweijian=163@flink.apache.org > 
 代表 yujianbo > 发送时间: 
2020年12月21日 16:44 > 收件人: user-zh@flink.apache.org > 主题: Flink on yarn 
如何指定固定几台yarn节点当做flink任务的运行节点 > > 各位大佬好: > 请问Flink on yarn 
如何指定固定几台yarn节点当做flink任务的运行节点? > > > > -- > Sent from: 
http://apache-flink.147419.n8.nabble.com/ >

Re: 请教关于KeyedState的恢复机制

2020-12-21 文章 赵一旦
目前来说,按照我讲的方式去实现应该不难。我怕的是flink在恢复keyedState的时候,无法适应我的这种partition机制。

现有的机制,restore的时候实际是 keyGroup 到window并行实例之间的一个重分配。

换成我的partition机制后,能否还正常restore呢?

赵一旦  于2020年12月22日周二 上午12:03写道:

> 如题,目前对于OperatorState来说,API层面有2个接口,即CheckpointedFunction和ListCheckpointed
> 。即UDF中有入口对restore做自定义。
>
> 问题(1)KeyedState的恢复则相对黑盒。想知道相关实现在哪。
>
> 引申问题(2),我的原始目的为。我期望实现
> keyBy(...).timwWindow(x).xxx()这种统计。在保留keyBy的keySelector机制前提下(即window算子部分仍然会按照key分窗口统计),通过重写部分flink的api层代码方式,强制去除keyBy中加入的
> KeyGroupStreamPartitioner
> ,换成使用可传入的自定义Partitioner。目的呢是希望解决“数据倾斜”,但我不想通过双层keyBy解决,因为本身key数量很少(假设100),即使是双层,那么第一层需要将key起码扩大1000倍我感觉才能足够均衡。如果能仅仅扩大比如30倍(这个倍数可以考虑和下游window算子并发一致),然后在partition中实现类似rebalance的分发机制。
> 当然,更高级的可能还可以做智能的,比如部分key扩大,部分key不扩大。
>
>
> 描述比较乱,换言之,我就直接非KeyedStream情况下,使用dataStream.flatMap,然后flatMap中使用MapState统计。类似这种效果。当然我还是希望通过改造window实现,因为window部分还有watermark以及分窗机制,flatMap需要自己实现分窗。
>
>


Re: flink1.11.2写hive分区表,hive识别不到分区

2020-12-21 文章 赵一旦
即使不是flink写入,其他方式写入也需要这样做的哈。

r pp  于2020年12月21日周一 下午9:28写道:

> 程序中,创建表后,执行命令。
>
> kingdomad  于2020年12月21日周一 下午4:55写道:
>
> >
> flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
> > 需要执行msck repair table修复分区表后,hive才能读取到数据。
> > 求助大佬,要如何解决。
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > --
> >
> > kingdomad
> >
> >
>


Re: Re: 如何通过现实时间控制事件时间的窗口

2020-12-21 文章 赵一旦
大概懂了,但还不够清晰。
因为tumble window中,如果是5s窗口,则是按照0-5,5-10,10-15这样的。不是基于第一条数据的。

如果你要按照第一条数据到达开始开窗,那就不要使用flink的window机制。
直接基于process function提供的底层功能,自己通过timeservice实现。
再或者如果需要使用window,则使用global
window,自己定义window的trigger触发机制即可(比如进来第一条数据,就设置定时器,定时器到了则触发窗口计算然后清理窗口状态)。

guoliubi...@foxmail.com  于2020年12月21日周一 上午9:15写道:

> 因为表格样式被吃掉了,所以看不清,用图片说明下。
> https://i.bmp.ovh/imgs/2020/12/78d7dee70d88ebc9.png
>
> 定义了3秒的滚动窗口
> 第一条消息的eventTime是9:00:01,是在系统实际时间9:00:01收到的。
> 第二条消息的eventTime是9:00:02,但是是在系统实际时间9:00:11分收到的。
>
> 想要达成的目标是在系统时间9:00:05时把这一窗口关闭掉进行运算,忽略迟到的第二条消息,更不必要等到第3条消息触发到下一个窗口的时间了再关闭这个窗口。
> 找了下用ProcessingTimeoutTrigger可以达到目的,不过不知道有没有更详细的文档说明trigger怎么用的。
>
>
> guoliubi...@foxmail.com
>
> 发件人: 赵一旦
> 发送时间: 2020-12-20 23:30
> 收件人: user-zh
> 主题: Re: 如何通过现实时间控制事件时间的窗口
> 描述比较乱,看不懂。
>
> guoliubi...@foxmail.com  于2020年12月17日周四 下午2:16写道:
>
> > Hi,
> > 我们现在以eventTime作为时间标准,每3秒做一次TumbleWindow,数据假设如下
> >  系统时间
> >  与上一条间隔
> >  事件时间
> >  与上一条间隔
> >  9:00:01
> >
> >  9:00:01
> >
> >  9:00:11
> >  10s
> >  9:00:02
> >  1s
> >  9:00:12
> >  1s
> >  9:00:12
> >  10s
> > 从事件时间上看,第一条和第二条数据是归集到同一窗口的。
> > 不过现在的处理需要在窗口有第一条数据后系统时间超时4s以上就强制关闭该窗口,即在系统时间9:00:05时关闭窗口,忽略掉第二条数据。
> > 请问这种情况需要怎么生成watermark?
> > 使用过
> > WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(4L))
> > 或者
> >
> >
> WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofSeconds(4L))
> > 结果都把第一条和第二条数据归集到同一个窗口中了,
> > 都没有达到预想的结果。
> > 要如何设置才能在窗口中仅有一条数据而忽略第二条数据?
> >
> >
> > guoliubi...@foxmail.com
> >
>


Re: 请教一下flink1.12可以指定时间清除state吗?

2020-12-21 文章 赵一旦
窗口不会重复?重叠?是否重叠取决于你使用什么窗口。tumble窗口是不重叠的。

三色堇 <25977...@qq.com> 于2020年12月21日周一 上午8:47写道:

> 大佬,按天开窗滑动窗口会重复吗?滚动好像不行。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> hinobl...@gmail.com;
> 发送时间:2020年12月18日(星期五) 晚上9:50
> 收件人:"user-zh"
> 主题:Re: 请教一下flink1.12可以指定时间清除state吗?
>
>
>
> 你这个直接按照天分窗就可以呀。
>
> 三色堇 <25977...@qq.com 于2020年12月18日周五 下午3:20写道:
>
> 
> Hi,社区的各位大家好:nbsp;请教一下,我目前生产上使用的flink1.12,根据公司需求,统计每天的日报,每天出一组结果。已经做了group
>  by current_date,userIdnbsp; 过程中我发现隔天的flink
> state未清理,还是在前一天的结果上累加,自己也测试了
>  1、Stream的TTLnbsp;
> 
> 2、tabEnv.getConfig().setIdleStateRetention(Duration.ofDays(1))这两种方式并不能满足我的需求,请教一下有其他方法可以实现这种日报需求吗?


Re: 执行mvn构建错误 编译flink1.9遇到了相同的问题 请问解决了吗?我编译最新代码没这个问题

2020-12-21 文章 r pp
编译问题,大多包没下好,多来几次

mvn clean install -DskipTests -Drat.skip=true

亲测有效


shaoshuai <762290...@qq.com> 于2020年12月21日周一 下午4:53写道:

> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
> (default-testCompile) on project flink-parquet_2.11: Compilation failure:
> Compilation failure:
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[390,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[416,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[418,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[458,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[461,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[533,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[559,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[561,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[576,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[579,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[318,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[344,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[346,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[367,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[370,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[317,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[343,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[345,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[354,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[357,51]
> 找不到符号
> [ERROR]   符号:   方法 readFieldOrderIfDiff()
> [ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[246,31]
> 找不到符号
> [ERROR]   符号:   类 AvroMissingFieldException
> [ERROR]   位置: 程序包 org.apache.avro
> [ERROR]
>
> /Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[272,3]
> 方法不会覆盖或实现超类型的方法
> [ERROR]
>
> 

Re: flink1.11.2写hive分区表,hive识别不到分区

2020-12-21 文章 r pp
程序中,创建表后,执行命令。

kingdomad  于2020年12月21日周一 下午4:55写道:

> flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
> 需要执行msck repair table修复分区表后,hive才能读取到数据。
> 求助大佬,要如何解决。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> --
>
> kingdomad
>
>


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 r pp
嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大?

 于2020年12月21日周一 下午5:48写道:

> 通过yarn label可以实现
>
> -邮件原件-
> 发件人: user-zh-return-10095-afweijian=163@flink.apache.org
>  代表 yujianbo
> 发送时间: 2020年12月21日 16:44
> 收件人: user-zh@flink.apache.org
> 主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
>
> 各位大佬好:
>  请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: SQL执行模式

2020-12-21 文章 r pp
sql 的本质其实是 让用户不用关心 是流处理 还是 批处理,比如 ,计算  当天某个视频的点击总数。是一个累加结果,可以实时查询出变化。
但flink 不是一个存储系统,就会存在一个问题,使用sql 状态值 怎么办?
官博 都有说明,也说了哪些算子背后 适用于 Streaming or Batch or both。以及存在的使用注意事项
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/


jiangjiguang719  于2020年12月21日周一 下午7:44写道:

> flink1.12版本中,streamAPI 通过 -Dexecution.runtime-mode 指定是批还是流
> 的执行模式,那么在SQL中如何指定呢


Re: 请教个Flink sql问题

2020-12-21 文章 占英华
学习了,感谢回复!

> 在 2020年12月21日,20:39,hailongwang <18868816...@163.com> 写道:
> 
> 
> 
> 
> 不是的。在提交运行之后,如果那两个 insert 是从同一张表 select 出来的话,是会分流发送到 table1 和 table2,并没有先后顺序。
>> 在 2020-12-21 10:45:25,"占英华"  写道:
>> 这样是不是第一条select和第二条的select出来的结果会有差异,因为执行第一条有耗时,第二条执行时查询的结果是在耗时后查询得到的
>> 
 在 2020年12月21日,11:14,hailongwang <18868816...@163.com> 写道:
>>> 
>>> 
>>> 
>>> 可以的,比如将结果写入table1,table2 ……
>>> Insert into table1 ……;
>>> Insert into table2 ……;
>>> 
>>> 
>>> 
>>> Best,
>>> Hailong
 在 2020-12-19 08:30:23,"占英华"  写道:
 Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?


Re:Re: 请教个Flink sql问题

2020-12-21 文章 hailongwang



不是的。在提交运行之后,如果那两个 insert 是从同一张表 select 出来的话,是会分流发送到 table1 和 table2,并没有先后顺序。
在 2020-12-21 10:45:25,"占英华"  写道:
>这样是不是第一条select和第二条的select出来的结果会有差异,因为执行第一条有耗时,第二条执行时查询的结果是在耗时后查询得到的
>
>> 在 2020年12月21日,11:14,hailongwang <18868816...@163.com> 写道:
>> 
>> 
>> 
>> 可以的,比如将结果写入table1,table2 ……
>> Insert into table1 ……;
>> Insert into table2 ……;
>> 
>> 
>> 
>> Best,
>> Hailong
>>> 在 2020-12-19 08:30:23,"占英华"  写道:
>>> Flink sql的dml语句可以将结果写入不同的sink表中吗?如果可以可以怎么处理?


?????? flink-shaded-hadoop-2-uber????????????

2020-12-21 文章 liujian
Thanks, docker??,??Native 
K8s??,??
Dockerfile??


COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 
/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh"]
EXPOSE 6123 8081 8082
CMD ["help","history-server"]


COPY ./jar/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 
/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
ENTRYPOINT ["/docker-entrypoint.sh","history-server"]
EXPOSE 6123 8081 8082
CMD ["help"]



??...,??






----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/deployment/advanced/historyserver.html

Best,
Yang

liujian <13597820...@qq.com ??2020??12??21?? 1:35??

 
??history-server,,,??,




 --nbsp;nbsp;--
 ??:

 "user-zh"

 <
 danrtsey...@gmail.comgt;;
 :nbsp;2020??12??21??(??) 10:15
 ??:nbsp;"user-zh"https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh

 Best,
 Yang


 liujian <13597820...@qq.comgt; ??2020??12??20?? 12:45??

 gt; Thanks,amp;nbsp;
 gt; amp;nbsp; amp;nbsp; 
amp;nbsp;??historyServer,flink
 gt; 1.12.0??Dockerfile ??CMD ["history-server"]amp;nbsp;
 ??8082,??
 gt;
 gt;
 gt;
 gt;
 gt; 
--amp;nbsp;amp;nbsp;--
 gt; ??:
 
gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 "user-zh"
 
gt;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;nbsp;
 <
 gt; danrtsey...@gmail.comamp;gt;;
 gt; :amp;nbsp;2020??12??19??(??) 9:35
 gt; 
??:amp;nbsp;"user-zh"https://github.com/apache/flink-shaded
 gt 

SQL执行模式

2020-12-21 文章 jiangjiguang719
flink1.12版本中,streamAPI 通过 -Dexecution.runtime-mode 指定是批还是流 的执行模式,那么在SQL中如何指定呢

Re: pyflink1.12 连接Mysql报错 : Missing required options

2020-12-21 文章 Wei Zhong
Hi,

正如报错中提示的,with参数里需要的是"url"参数,你可以尝试将connector.url改成url试试看会不会报错了。

> 在 2020年12月21日,13:44,肖越 <18242988...@163.com> 写道:
> 
> 在脚本中定义了两个源数据 ddl,但是第二就总会报缺option的问题,pyflink小白,求大神解答?
> #DDL定义
> source_ddl2 = """CREATE TABLE ts_pf_sec_yldrate (id DECIMAL,pf_id VARCHAR,\
> 
>symbol_id VARCHAR,biz_date VARCHAR,\
> 
>ccy_type VARCHAR,cur_id_d VARCHAR,yldrate DECIMAL,\
> 
>is_valid DECIMAL,time_mark TIMESTAMP) WITH (
> 
>'connector' = 'jdbc',
> 
>'connector.url' = 'jdbc:mysql://ip:port/db_base',
> 
>'connector.table' = 'ts_pf_sec_yldrate',
> 
>'table-name' = 'ts_pf_sec_yldrate',
> 
>'connector.driver' = 'com.mysql.jdbc.Driver',
> 
>'connector.username' = 'xxx',
> 
>'connector.password' = 'xxx')
> 
> """
> 错误信息:
> Traceback (most recent call last):
>  File 
> "C:/projects/dataService-calculate-code-python/src/test/test_mysql_connector.py",
>  line 67, in 
>print(join.to_pandas().head(6))
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table.py",
>  line 807, in to_pandas
>.collectAsPandasDataFrame(self._j_table, max_arrow_batch_size)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py",
>  line 1286, in __call__
>answer, self.gateway_client, self.target_id, self.name)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
>  line 147, in deco
>return f(*a, **kw)
>  File 
> "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
>  line 328, in get_return_value
>format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> z:org.apache.flink.table.runtime.arrow.ArrowUtils.collectAsPandasDataFrame.
> : org.apache.flink.table.api.ValidationException: Unable to create a source 
> for reading table 'default_catalog.default_database.ts_pf_sec_yldrate'.
> 
> 
> Table options are:
> 
> 
> 'connector'='jdbc'
> 'connector.driver'='com.mysql.jdbc.Driver'
> 'connector.password'='xxx'
> 'connector.table'='ts_pf_sec_yldrate'
> 'connector.url'='jdbc:mysql://ip:port/db_base'
> 'connector.username'='xxx'
> 'table-name'='ts_pf_sec_yldrate'
> at 
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:265)
> at 
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:100)
> at 
> org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:339)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
> at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
> at 
> org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
> at java.util.Collections$SingletonList.forEach(Collections.java:4824)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
> at 
> org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:145)
> at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:145)
> at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
> at 
> org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:62)
> at 
> org.apache.flink.table.operations.JoinQueryOperation.accept(JoinQueryOperation.java:128)
> at 
> 

回复:请教一个flink消费多kafka topic如何进行数据分配的问题

2020-12-21 文章 Shuai Xia
Hi,可以看下KafkaTopicPartitionAssigner类的assign方式
是根据Topic名称哈希之后对并行度取余,加上分区值再次对并行度取余
最终的结果分配是存在不均匀



--
发件人:bradyMk 
发送时间:2020年12月21日(星期一) 17:40
收件人:user-zh 
主 题:请教一个flink消费多kafka topic如何进行数据分配的问题

Hi~想请教一下大家:

现在我用flink消费5个不同的kafka topic,每个topic都有12个分区,所以我设置了60个并行度;

env.setParallelism(60)

我认为程序设置的并行度是和topic的总分区一一对应的;

但是,程序运行后,我发现只有14个task有从topic消费到数据,其余消费数据量都为0,且有几个是每秒几千条,有几个是每秒几百条。所以现在很疑惑,flink消费多kafka
topic到底是如何进行数据分配的呢?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

答复: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 afweijian
通过yarn label可以实现

-邮件原件-
发件人: user-zh-return-10095-afweijian=163@flink.apache.org 
 代表 yujianbo
发送时间: 2020年12月21日 16:44
收件人: user-zh@flink.apache.org
主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

各位大佬好:
 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


请教一个flink消费多kafka topic如何进行数据分配的问题

2020-12-21 文章 bradyMk
Hi~想请教一下大家:

现在我用flink消费5个不同的kafka topic,每个topic都有12个分区,所以我设置了60个并行度;

env.setParallelism(60)

我认为程序设置的并行度是和topic的总分区一一对应的;

但是,程序运行后,我发现只有14个task有从topic消费到数据,其余消费数据量都为0,且有几个是每秒几千条,有几个是每秒几百条。所以现在很疑惑,flink消费多kafka
topic到底是如何进行数据分配的呢?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Native Kubernetes 需要访问HDFS

2020-12-21 文章 Akisaya
1.10 版本好像还没有支持,看了下 1.10 代码里创建 cm 的时候没有去读取 hadoop 配置

Yang Wang  于2020年12月19日周六 上午12:18写道:

> 你可以在Flink client端设置HADOOP_CONF_DIR环境变量即可,这样会自动ship
> hadoop的配置并且挂载给JobManager和TaskManager的
>
> Best,
> Yang
>
> liujian <13597820...@qq.com> 于2020年12月18日周五 下午5:26写道:
>
> > Hi:
> >  在使用Native Kubernetes
> > 需要访问HDFS,已经将flink-shaded-hadoop-2-uber-2.8.3-10.0.jar放到lib目录
> >  但是hdfs是HA,那么就需要hdfs-site.xml等文件了,那么是如何指定这个文件呢 
>


flink1.11.2写hive分区表,hive识别不到分区

2020-12-21 文章 kingdomad
flink1.11.2写hive3.12的分区表,flink新创建的分区数据hive无法识别,在hdfs上能看到写入了文件,但是hive读取不了分区。
需要执行msck repair table修复分区表后,hive才能读取到数据。
求助大佬,要如何解决。
















--

kingdomad



Re: 执行mvn构建错误 编译flink1.9遇到了相同的问题 请问解决了吗?我编译最新代码没这个问题

2020-12-21 文章 shaoshuai
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
(default-testCompile) on project flink-parquet_2.11: Compilation failure:
Compilation failure: 
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[390,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[416,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[418,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[458,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/SimpleRecord.java:[461,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[533,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[559,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[561,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[576,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java:[579,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[318,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[344,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[346,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[367,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/MapItem.java:[370,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[317,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[343,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[345,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[354,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[357,51]
找不到符号
[ERROR]   符号:   方法 readFieldOrderIfDiff()
[ERROR]   位置: 类型为org.apache.avro.io.ResolvingDecoder的变量 in
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[246,31]
找不到符号
[ERROR]   符号:   类 AvroMissingFieldException
[ERROR]   位置: 程序包 org.apache.avro
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[272,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[274,3]
方法不会覆盖或实现超类型的方法
[ERROR]
/Users/goushaoshuai/code/flink/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Bar.java:[287,3]
方法不会覆盖或实现超类型的方法
[ERROR]

Job结束blob目录及文件没有删除

2020-12-21 文章 Luna Wong
https://issues.apache.org/jira/browse/FLINK-20696

有一定概率发生,提交很多Job,Job结束后,会有个别Blob目录没有清理。我还没Debug出原因。


Re:Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 felixzh
这个没办法做到吧。想做资源隔离的话,应该只需要分队列就行

















在 2020-12-21 16:43:35,"yujianbo" <15205029...@163.com> 写道:
>各位大佬好:
> 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 amen...@163.com
这个问题应该问yarn吧。。。



 
发件人: yujianbo
发送时间: 2020-12-21 16:43
收件人: user-zh
主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
各位大佬好:
 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?
 
 
 
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 yujianbo
各位大佬好:
 请问Flink on  yarn 如何指定固定几台yarn节点当做flink任务的运行节点?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Application Mode job on K8S集群,无法缩容问题

2020-12-21 文章 lichunguang
Yang Wang你好:
我想表达的意思是:
  Native Flink on K8s采用单Pod的申请资源的方式,和K8s自动伸缩机制有些冲突。

原因:
比如job比较多时,各node负载都比较高;而剩余job比较少时,每个node只有少量pod,但因为【Pods that are not backed
by a controller object】,没法驱逐资源利用率最低的node,导致整体利用率较低。

What types of pods can prevent CA from removing a node?
https://github.com/kubernetes/autoscaler/blob/master/cluster-autoscaler/FAQ.md#what-types-of-pods-can-prevent-ca-from-removing-a-node



--
Sent from: http://apache-flink.147419.n8.nabble.com/