Re: FlinkSQL: how to control the number of files when writing Hive ORC data with the streaming sink

2021-11-04 Post by Caizhi Weng
Hi!

1. In other words, for each checkpoint, the files produced by multiple parallel subtasks are merged, right?

Correct.

2. Beyond that, there is no way to merge files across different checkpoints, right?

Correct.

Some of the operators are really doing background I/O work; doesn't that fail to show up in the busy metric?

Yes. The busy metric is computed by sampling how many threads are currently working. For a node like the sink, whose threads are all waiting on background I/O, the busy value will indeed not be high.
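
For reference, a minimal sketch of how these compaction options can be set on a streaming filesystem sink from the Table API; the table name, path and sizes are placeholders, and for a Hive catalog table the same keys would go into the table's properties instead:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactionOptionsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        // Placeholder ORC sink table. 'auto-compaction' merges the small files produced
        // by the parallel subtasks within each checkpoint; 'compaction.file-size' caps
        // the size of the compacted target files.
        tEnv.executeSql(
                "CREATE TABLE fs_sink (id BIGINT, name STRING, dt STRING) PARTITIONED BY (dt) WITH ("
                        + " 'connector' = 'filesystem',"
                        + " 'path' = 'hdfs:///tmp/fs_sink',"
                        + " 'format' = 'orc',"
                        + " 'auto-compaction' = 'true',"
                        + " 'compaction.file-size' = '128MB',"
                        + " 'sink.rolling-policy.file-size' = '128MB'"
                        + ")");
    }
}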

On Thu, Nov 4, 2021 at 5:57 PM yidan zhao wrote:

> Hi, I'd like to follow up. According to the documentation, the compaction mechanism works as follows:
> Whether to enable automatic compaction in streaming sink or not. The data
> will be written to temporary files. After the checkpoint is completed, the
> temporary files generated by a checkpoint will be compacted. The temporary
> files are invisible before compaction.
> Reading the docs, this means that after each checkpoint completes, the files produced by that single checkpoint are compacted. In other words, only the files produced within a single checkpoint get merged.
> 1. In other words, for each checkpoint, the files produced by multiple parallel subtasks are merged, right?
>
> 2. Beyond that, there is no way to merge files across different checkpoints, right?
>
> 3. A separate question: I'm looking at Flink SQL writing to Hive in streaming mode. In the web
>
> UI, with compaction disabled almost every node is blue and the data volume is small. With compaction enabled almost everything is still blue and the data volume is still small, but the compact node alone stays red.
>
> As I understand it, when writing to Hive some of the operators are really doing background I/O work; doesn't that fail to show up in the busy metric? For example, busy only accounts for processing the received elements, and however much background work an element triggers in the operator is not reflected. Is that right?
> So even if everything looks blue, I shouldn't lower the parallelism, but should instead pick a roughly suitable parallelism myself based on the data volume.
>


Re: Tumbling Windows: the smallest window size that can be used

2021-11-04 Post by Caizhi Weng
Hi!

There is no limit; even 1 ms works. In theory a small tumble window does not have much impact on performance.

Could you explain in more detail why you need such a small tumble window? Small tumble windows are indeed quite rare; another operator might be a better fit.
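
For what it's worth, a minimal sketch of a 100 ms tumbling window in the DataStream API; processing time is used only to keep the example self-contained, and the elements and key are placeholders:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SmallTumblingWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
                .keyBy(t -> t.f0)
                // The window assigner accepts any Time granularity down to milliseconds.
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(100)))
                .sum(1)
                .print();
        env.execute("small-tumbling-window-sketch");
    }
}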

On Fri, Nov 5, 2021 at 12:32 PM 李航飞 wrote:

> How small can a tumbling window be? 100 ms?
> Does that have any impact on performance?


Tumbling Windows: the smallest window size that can be used

2021-11-04 Post by 李航飞
How small can a tumbling window be? 100 ms?
Does that have any impact on performance?

Re: Flink 1.12.1: how to support upsert when reading data from Kafka and writing it to ClickHouse

2021-11-04 Post by Caizhi Weng
Hi!

You need to add a hash shuffle on uuid before the sink node so that records with the same uuid go to the same parallel instance. If processData is a DataStream, just key it by uuid with the keyBy method and then write it to the sink.
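
A minimal sketch of that keyBy-before-sink wiring, assuming processData carries records with a getUuid() accessor (the accessor name is only a placeholder; MSKUpsertClickHouseSink is the sink class from the original mail):

// Hash-partition by uuid so that all records with the same uuid are handled
// by the same parallel sink instance, avoiding the concurrent read-then-write race.
processData
        .keyBy(record -> record.getUuid())   // placeholder uuid accessor
        .addSink(new MSKUpsertClickHouseSink())
        .name("clickhouse-upsert-sink");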

Also: I remember I have already replied to two identical emails. Did the earlier replies get lost?

On Fri, Nov 5, 2021 at 10:50 AM 扯 wrote:

>
> Hello! Thank you for taking time out of your busy schedule to read my email. I have recently been studying how to write data into ClickHouse with Flink in a way that meets our business requirements, but I ran into problems when using Flink 1.12.1 to read data from Kafka and write it to ClickHouse in an upsert fashion. The details are as follows:
>
> The ClickHouse table is created as follows:
> CREATE TABLE test_local.tzling_tb3(
> uuid String,
> product String,
> platform String,
> batchId String,
> id String,
> account String,
> customerId String,
> reportName String,
> dt String,
> campaign String,
> adGroup String,
> generalField String,
> currency String,
> impressions String,
> cost String,
> clicks String,
> conversions String,
> createDateTime String,
> createTime BIGINT,
> key String,
> pdate String
> )engine = MergeTree PARTITION BY pdate order by createTime;
> uuid serves as the primary key: if the key already exists the row should be updated, otherwise the record is simply appended.
>
> processData.addSink(new MSKUpsertClickHouseSink());
> The attached file MSKUpsertClickHouseSink.java is my sink class for writing to ClickHouse. Its logic is:
> first query whether the uuid of the incoming record already exists in the table; if it does, run a conditional delete and then append, otherwise append directly. Written this way I ran into a concurrency problem: with a parallelism greater than 1, duplicate uuids show up in ClickHouse.
>
> Given the situation described above, is there a good, proven approach you could recommend?
>


Flink 1.12.1: how to support upsert when reading data from Kafka and writing it to ClickHouse

2021-11-04 Post by 扯
Hello! Thank you for taking time out of your busy schedule to read my email. I have recently been studying how to write data into ClickHouse with Flink in a way that meets our business requirements, but I ran into problems when using Flink 1.12.1 to read data from Kafka and write it to ClickHouse in an upsert fashion. The details are as follows:


The ClickHouse table is created as follows:
CREATE TABLE test_local.tzling_tb3(
  uuid String,
  product String,
  platform String,
batchId String,
id String,
account String,
customerId String,
reportName String,
dt String,
campaign String,
adGroup String,
generalField String,
currency String,
impressions String,
cost String,
clicks String,
conversions String,
createDateTime String,
createTime BIGINT,
key String,
pdate String
)engine = MergeTree PARTITION BY pdate order by createTime;


uuid serves as the primary key: if the key already exists the row should be updated, otherwise the record is simply appended.


processData.addSink(new MSKUpsertClickHouseSink());
The attached file MSKUpsertClickHouseSink.java is my sink class for writing to ClickHouse. Its logic is: first query whether the uuid of the incoming record already exists in the table; if it does, run a conditional delete and then append, otherwise append directly. Written this way I ran into a concurrency problem: with a parallelism greater than 1, duplicate uuids show up in ClickHouse.




Re: Flink 1.14 Hybrid Source: when to switch sources

2021-11-04 Post by Caizhi Weng
Hi!

Flink already has the corresponding interface [1], but no source implements this capability yet. You can track progress on this at
https://issues.apache.org/jira/browse/FLINK-23633.

Of course, if your Hive table is partitioned by day, you can pick a fixed switch point and have the Hive side read only the partitions before it.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/#dynamic-start-position-at-switch-time
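
For the fixed-switch-point approach, a minimal sketch roughly following the HybridSource documentation; it uses a FileSource over the table's underlying files rather than a real Hive source, the path, topic, brokers and switch timestamp are placeholders, and exact class names such as the text format may differ slightly between Flink versions:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class HybridSourceSketch {
    public static HybridSource<String> build() {
        // Fixed switch point, e.g. the first millisecond still retained by Kafka.
        long switchTimestamp = 1636041600000L;

        // Bounded "historical" source over the Hive table's underlying files.
        FileSource<String> historical = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///warehouse/db/table"))
                .build();

        // Unbounded "real-time" source starting right after the switch point.
        KafkaSource<String> realtime = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("topic")
                .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        return HybridSource.builder(historical)
                .addSource(realtime)
                .build();
    }
}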

On Fri, Nov 5, 2021 at 8:39 AM casel.chen wrote:

> I have a question about when a Hybrid Source should switch:
> In a Hive + Kafka setup, suppose Kafka's retention is 1 day. To get a seamless handover I need to consume the historical data from Hive until I am less than 1 day away from the current time, and only then switch to the Kafka
> source. Assuming there is a field in Hive that represents the processing time, does the current Flink Hybrid Source support this usage? If so, how should the program be written? Thanks!


Flink 1.14 Hybrid Source: when to switch sources

2021-11-04 Post by casel.chen
I have a question about when a Hybrid Source should switch:
In a Hive + Kafka setup, suppose Kafka's retention is 1 day. To get a seamless handover I need to consume the historical data from Hive until I am less than 1 day away from the current time, and only then switch to the Kafka
source. Assuming there is a field in Hive that represents the processing time, does the current Flink Hybrid Source support this usage? If so, how should the program be written? Thanks!

Re: Re: Flink fails to start a yarn-session

2021-11-04 Post by casel.chen
The contents of flink-conf.yaml are as follows:



#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.





#==============================================================================
# Common
#==============================================================================


# The external address of the host on which the JobManager runs and can be
# reached by the TaskManagers and any clients which want to connect. This setting
# is only used in Standalone mode and may be overwritten on the JobManager side
# by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
# In high availability mode, if you use the bin/start-cluster.sh script and setup
# the conf/masters file, this will be taken care of automatically. Yarn/Mesos
# automatically configure the host name based on the hostname of the node where the
# JobManager runs.


jobmanager.rpc.address: master


# The RPC port where the JobManager is reachable.


jobmanager.rpc.port: 6123




# The total process memory size for the JobManager.
#
# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.


jobmanager.memory.process.size: 1600m




# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.


taskmanager.memory.process.size: 1728m


# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
# taskmanager.memory.flink.size: 1280m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.


taskmanager.numberOfTaskSlots: 8


# The parallelism used for programs that did not specify and other parallelism.


parallelism.default: 1


# The default file system scheme and authority.
# 
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme


#==============================================================================
# High Availability
#==============================================================================


# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper


# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/


# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
#
# high-availability.zookeeper.quorum: localhost:2181




# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open


#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================


execution.checkpointing.interval: 60s


execution.checkpointing.unaligned: true


execution.checkpointing.timeout: 1200s


# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
state.backend: filesystem


# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#

Re: Flink fails to start a yarn-session

2021-11-04 Post by Caizhi Weng
Hi!

I couldn't find any attachment in the email. Consider pasting the contents of flink-conf.yaml directly into the email, or into an external pastebin.

On Thu, Nov 4, 2021 at 6:42 PM casel.chen wrote:

> flink 1.13.2 + hadoop 3.2.1
> Hive and Spark jobs already run successfully on this YARN cluster.
> When starting a YARN session cluster with bin/yarn-session.sh, Flink keeps printing the INFO logs below, and the YARN web
> console shows no Flink session cluster was started. My flink-conf.yaml is attached. The Hadoop cluster has no authentication/SSL enabled, and in standalone mode I can start a 3-node cluster just fine. What could be causing this, and how do I fix it? Thanks!
>
>
>
> 2021-11-04 16:51:39,964 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:39,986 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,004 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,020 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,041 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,059 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,078 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,097 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,114 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,134 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,155 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,175 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,193 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
> 2021-11-04 16:51:40,212 INFO
> org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []
> - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted
> = false
>
>
>
>
>
>


Reply: flink on yarn pre_job submission fails, but session mode succeeds

2021-11-04 Post by JasonLee
hi


This looks like a jar conflict: check whether the job jar bundles Flink dependencies from a different Flink version.


Best
JasonLee
On 2021-11-04 18:41, 陈卓宇 <2572805...@qq.com.INVALID> wrote:
The YARN error log:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
 Caused by: java.lang.VerifyError: class 
org.apache.flink.yarn.YarnResourceManager overrides final method 
onStop.()Ljava/util/concurrent/CompletableFuture;   at 
java.lang.ClassLoader.defineClass1(Native Method)   at 
java.lang.ClassLoader.defineClass(ClassLoader.java:763)   at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)   at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:467)   at 
java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at 
java.security.AccessController.doPrivileged(Native Method)   at 
java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424)   at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357)   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
   at java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
   at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
   ... 2 common frames omitted

Diagnostics:
  Application application_1635998548270_0028 failed 1  times (global limit 
=2; local limit is =1) due to AM Container for  
appattempt_1635998548270_0028_01 exited with  exitCode: 1   
For more detailed output, check the application  tracking page:  
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028  Then 
click on links to logs of each attempt.   
Diagnostics: Exception from container-launch.   
Container id: container_e391_1635998548270_0028_01_01   
Exit code: 1   
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:944)   
at org.apache.hadoop.util.Shell.run(Shell.java:848)   
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)   

at  
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
   
at java.util.concurrent.FutureTask.run(FutureTask.java:266)   
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
  
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
  
at java.lang.Thread.run(Thread.java:748)   


Container exited with a non-zero exit code 1   
Failing this attempt. Failing the application.   

陈




Flink fails to start a yarn-session

2021-11-04 Post by casel.chen
flink 1.13.2 + hadoop 3.2.1
Hive and Spark jobs already run successfully on this YARN cluster.
When starting a YARN session cluster with bin/yarn-session.sh, Flink keeps printing the INFO logs below, and the YARN web
console shows no Flink session cluster was started. My flink-conf.yaml is attached. The Hadoop cluster has no authentication/SSL enabled, and in standalone mode I can start a 3-node cluster just fine. What could be causing this, and how do I fix it? Thanks!



2021-11-04 16:51:39,964 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:39,986 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,004 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,020 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,041 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,059 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,078 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,097 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,114 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,134 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,155 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,175 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,193 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false

2021-11-04 16:51:40,212 INFO  
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient [] - 
SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false







Re: flink on yarn pre_job submission fails, but session mode succeeds

2021-11-04 Post by 刘建刚
It's impossible to tell from the information above; you can open the link inside it to check the detailed logs:
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028

On Thu, Nov 4, 2021 at 6:29 PM 陈卓宇 <2572805...@qq.com.invalid> wrote:

> The YARN error log:
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.   at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
>  at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
> Caused by: java.lang.VerifyError: class
> org.apache.flink.yarn.YarnResourceManager overrides final method
> onStop.()Ljava/util/concurrent/CompletableFuture;  at
> java.lang.ClassLoader.defineClass1(Native Method)at
> java.lang.ClassLoader.defineClass(ClassLoader.java:763)  at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at
> java.security.AccessController.doPrivileged(Native Method)   at
> java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at
> java.lang.ClassLoader.loadClass(ClassLoader.java:424)at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)at
> java.lang.ClassLoader.loadClass(ClassLoader.java:357)at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
>at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> at java.security.AccessController.doPrivileged(Native Method)   at
> javax.security.auth.Subject.doAs(Subject.java:422)   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
>at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
>  ... 2 common frames omitted
>
> Diagnostics:
> Application application_1635998548270_0028 failed 1  times
> (global limit =2; local limit is =1) due to AM Container for
> appattempt_1635998548270_0028_01 exited with  exitCode: 1
>
> For more detailed output, check
> the application  tracking page:
> http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028
> Then click on links to logs of each attempt.
> Diagnostics: Exception from
> container-launch.
> Container id:
> container_e391_1635998548270_0028_01_01
> Exit code: 1
> Stack trace: ExitCodeException
> exitCode=1:
> at
> org.apache.hadoop.util.Shell.runCommand(Shell.java:944)
> at
> org.apache.hadoop.util.Shell.run(Shell.java:848)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
>
> at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at
> java.lang.Thread.run(Thread.java:748)
>
>
> Container exited with a non-zero
> exit code 1
> Failing this attempt. Failing the
> application.
>
> 陈
>
>
> 


flink on yarn pre_job submission fails, but session mode succeeds

2021-11-04 Post by 陈卓宇
The YARN error log:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
 Caused by: java.lang.VerifyError: class 
org.apache.flink.yarn.YarnResourceManager overrides final method 
onStop.()Ljava/util/concurrent/CompletableFuture;  at 
java.lang.ClassLoader.defineClass1(Native Method)at 
java.lang.ClassLoader.defineClass(ClassLoader.java:763)  at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)  at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at 
java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at 
java.security.AccessController.doPrivileged(Native Method)   at 
java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424)at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357)at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
  at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
  at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
 at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
  at java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
 at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
   ... 2 common frames omitted

Diagnostics:
  Application application_1635998548270_0028 failed 1  times (global limit 
=2; local limit is =1) due to AM Container for  
appattempt_1635998548270_0028_01 exited with  exitCode: 1   
For more detailed output, check the 
application  tracking page:  
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028  Then 
click on links to logs of each attempt.   
Diagnostics: Exception from 
container-launch.   
Container id: 
container_e391_1635998548270_0028_01_01   
Exit code: 1   
Stack trace: ExitCodeException 
exitCode=1:
at 
org.apache.hadoop.util.Shell.runCommand(Shell.java:944)   
at 
org.apache.hadoop.util.Shell.run(Shell.java:848)   
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)  
 
at  
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
   
at 
java.util.concurrent.FutureTask.run(FutureTask.java:266)   
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
  
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
  
at 
java.lang.Thread.run(Thread.java:748)   
   
   

Re: FlinkSQL: how to control the number of files when writing Hive ORC data with the streaming sink

2021-11-04 Post by yidan zhao
Hi, I'd like to follow up. According to the documentation, the compaction mechanism works as follows:
Whether to enable automatic compaction in streaming sink or not. The data
will be written to temporary files. After the checkpoint is completed, the
temporary files generated by a checkpoint will be compacted. The temporary
files are invisible before compaction.
Reading the docs, this means that after each checkpoint completes, the files produced by that single checkpoint are compacted. In other words, only the files produced within a single checkpoint get merged.
1. In other words, for each checkpoint, the files produced by multiple parallel subtasks are merged, right?

2. Beyond that, there is no way to merge files across different checkpoints, right?

3. A separate question: I'm looking at Flink SQL writing to Hive in streaming mode. In the web UI, with compaction disabled almost every node is blue and the data volume is small. With compaction enabled almost everything is still blue and the data volume is still small, but the compact node alone stays red.
As I understand it, when writing to Hive some of the operators are really doing background I/O work; doesn't that fail to show up in the busy metric? For example, busy only accounts for processing the received elements, and however much background work an element triggers in the operator is not reflected. Is that right?
So even if everything looks blue, I shouldn't lower the parallelism, but should instead pick a roughly suitable parallelism myself based on the data volume.


Re: Flink 1.13.1: a batch job run via yarn-application moving 100 million MySQL rows to Hive needs 16 GB+ of TaskManager memory

2021-11-04 Post by RS
Check the job's parallelism; the memory usage may be caused by too high a degree of parallelism.


On 2021-11-04 15:52:14, "Asahi Lee" <978466...@qq.com.INVALID> wrote:
>hi!
>I use Flink SQL to transfer 100 million rows from MySQL into a Hive database, running in yarn-application mode. Even with 16 GB of memory configured, the job fails!


Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-04 Post by zhisheng


On Wed, Nov 3, 2021 at 9:36 AM Daisy Tsang wrote:

> Hey everyone, we have a new two-part post published on the Apache Flink
> blog about the sort-based blocking shuffle implementation in Flink.  It
> covers benchmark results, design and implementation details, and more!  We
> hope you like it and welcome any sort of feedback on it. :)
>
>
> https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
> https://flink.apache.org/2021/10/26/sort-shuffle-part2.html
>


Re: Re: Reply: Reply: Re: How to make offsets commit automatically for easier monitoring when checkpointing is enabled

2021-11-04 Post by zhisheng
Consider using currentOffsets instead.

On Wed, Oct 27, 2021 at 5:40 PM 杨浩 wrote:

> I understand that logic, but it means the consumer lag value cannot reflect the real situation,
> which makes it hard to monitor system latency. One scenario: the job state is large and is checkpointed every 5 minutes, and QPS fluctuates between 1 and 100, so the lag alert threshold has to be set above 5*60*100 to monitor the system, which makes the monitoring very inaccurate.
> On 2021-10-27 17:34:13, "Qingsheng Ren" wrote:
> >Hi!
> >
> >If you are using the FLIP-27 based KafkaSource, you can set enable.auto.commit = true and
> >auto.commit.interval.ms = {commit_interval} so that KafkaSource commits offsets automatically at
> >the specified interval. The SourceFunction-based FlinkKafkaConsumer does not support automatic
> >commits when checkpointing is enabled; it can only commit offsets at checkpoints.
> >
> >--
> >Best Regards,
> >
> >Qingsheng Ren
> >Email: renqs...@gmail.com
> >On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
> >> Is there a way to stay compatible with the existing monitoring, i.e. keep the consumer group's offsets updated in real time while checkpointing is enabled?
> >> On 2021-10-25 21:58:28, "杨浩" wrote:
> >> > currentOffsets would work in theory, but the Kafka unconsumed-message metric in our cloud monitoring system is based on committedOffsets.
> >> > On 2021-10-25 10:31:12, "Caizhi Weng" wrote:
> >> > > Hi!
> >> > >
> >> > > Is the offset here the Kafka source's offset? There is actually no need to read the
> >> > > offset through checkpoints; you can read it through metrics instead, see [1].
> >> > >
> >> > > [1]
> >> > >
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
> >> > >
> >> > > On Mon, Oct 25, 2021 at 10:20 AM 杨浩 wrote:
> >> > >
> >> > > >
> May I ask: with checkpointing enabled, the checkpoint interval is set quite long because the state is large. How can offsets be committed more promptly, so that it is easier to monitor the job's progress?
>
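
As a reference for the configuration described in the quoted reply above, a minimal sketch of a FLIP-27 KafkaSource that commits offsets on its own interval, independent of checkpoints; the brokers, topic and group id are placeholders:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class AutoCommitKafkaSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Commit offsets every 5 s regardless of the checkpoint interval, so the
                // consumer group's committed offsets stay fresh for external lag monitoring.
                .setProperty("enable.auto.commit", "true")
                .setProperty("auto.commit.interval.ms", "5000")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}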


Re: flink-yarn pre-job mode

2021-11-04 Post by zhisheng
Two things to check:

1. Whether the YARN queue has enough resources; if not, this may simply be a resource problem.

2. Whether the job itself has any dependency (jar) conflicts.

On Wed, Oct 27, 2021 at 10:04 AM Shuiqiang Chen wrote:

> Hello,
>
> The uploaded images cannot be loaded. This situation means YARN cannot provide the resources to launch TaskManagers; check whether the YARN resources are sufficient.
>
> On Tue, Oct 26, 2021 at 7:50 PM 王健 <13166339...@163.com> wrote:
>
> > Hello:
> >   I deployed Flink on YARN in pre-job mode and it fails when running. Could you please take a look at the cause? Many thanks.
> >
> >  1. Run command: /usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c
> > com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar
> >  The submission itself is fine, as shown in the screenshot:
> >
> >  2. YARN screenshot
> >
> >
> > 3. Flink screenshot:
> >   Symptom: the numbers of task slots and TaskManagers are both 0, and it keeps requesting them.
> >
> >
> >  4. Final result: the error below
> > 2021-10-25 16:17:49
> > java.util.concurrent.CompletionException:
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Slot request bulk is not fulfillable! Could not allocate the required
> slot
> > within slot request timeout
> > at
> >
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> > at
> >
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> > at
> >
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> > at
> >
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> > at
> >
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> > at
> >
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> > at
> >
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)
> > at
> >
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)
> > at
> >
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
> > at
> >
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
> > at
> >
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> > at
> >
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> > at
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > at akka.actor.Actor.aroundReceive(Actor.scala:517)
> > at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by:
> >
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> > Slot request bulk is not fulfillable! Could not allocate the required
> slot
> > within slot request timeout
> > at
> >
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> > ... 26 more
> > Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> > 30 ms
> > ... 27 more
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
>


Re: Flink has no operator-level data volume metrics

2021-11-04 Post by zhisheng
The web UI does have operator-level metrics; take a closer look.

On Tue, Oct 26, 2021 at 4:08 PM Ada Luna wrote:

> What the web UI shows are just Flink's normal built-in metrics, and they are all at the task level.
>
> On Tue, Oct 26, 2021 at 2:31 PM xiazhl wrote:
> >
> > There are metrics in the web UI.
> >
> >
> >
> >
> > -- Original message --
> > From: "user-zh" <gfen...@gmail.com>;
> > Sent: Tuesday, October 26, 2021, 1:55 PM
> > To: "user-zh"
> > Subject: Flink has no operator-level data volume metrics
> >
> >
> >
> >
> Flink only shows inflow/outflow data volumes at the task level, not at the operator level. Is this for performance reasons? Will a switch be added in the future to expose operator-level numbers and make debugging easier?
>


Flink generates backpressure even though the downstream operators are idle

2021-11-04 Post by carryxyh
Flink version 1.13.0, all source operators are mysql-cdc 1.4, using Flink SQL.


They are all blocked on acquiring memory segments (LocalBufferPool.requestMemorySegmentBlocking).


As for the outPoolUsage metric, some of the backpressured operators stay at 0 and others stay at 1.


The downstream operators are always idle, waiting for upstream data, blocked on the mailbox.


mysql-cdc threw some exceptions while running:


ERROR io.debezium.connector.mysql.SnapshotReader:219 - Failed due to error: Aborting snapshot due to error when last running 'SELECT * FROM `db`.`table1`': Streaming result set com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@45edf631 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
org.apache.kafka.connect.errors.ConnectException: Streaming result set com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@45edf631 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries. Error code: 0; SQLSTATE: S1000.
at 
io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241)
at 
io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:857)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Streaming result set com.mysql.cj.protocol.a.result.ResultsetRowsStreaming@45edf631 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at 
com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at 
com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:814)
at 
io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:773)
... 3 common frames omitted


However, the job did not restart; it just kept running.


It feels like some kind of leak has occurred. Does anyone have ideas on how to troubleshoot this?

Re: Flink 1.13.1: a batch job run via yarn-application moving 100 million MySQL rows to Hive needs 16 GB+ of TaskManager memory

2021-11-04 Post by Yangze Guo
What is the cause of the failure? Do you have the error stack trace and logs?

Best,
Yangze Guo

On Thu, Nov 4, 2021 at 4:01 PM Asahi Lee <978466...@qq.com.invalid> wrote:
>
> hi!
> I use Flink SQL to transfer 100 million rows from MySQL into a Hive database, running in yarn-application mode. Even with 16 GB of memory configured, the job fails!


Flink 1.13.1: a batch job run via yarn-application moving 100 million MySQL rows to Hive needs 16 GB+ of TaskManager memory

2021-11-04 Post by Asahi Lee
hi!
I use Flink SQL to transfer 100 million rows from MySQL into a Hive database, running in yarn-application mode. Even with 16 GB of memory configured, the job fails!