Re: Re: Flink SQL parallelism configuration question

2020-12-27 by hailongwang
Given how key groups are implemented, the parallelism is best set to a power of two.
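
(Not from the thread: a quick, self-contained sketch of why powers of two help here. The formula mirrors Flink's key-group-to-subtask assignment; the real implementation additionally murmur-hashes key.hashCode(), and all names below are illustrative.)

object KeyGroupSketch {
  // keyGroup -> subtask index, simplified from Flink's key-group assignment logic
  def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128
    for (parallelism <- Seq(30, 32)) {
      // how many of the 128 key groups does each subtask receive?
      val perSubtask = (0 until maxParallelism)
        .groupBy(kg => operatorIndexFor(kg, maxParallelism, parallelism))
        .map { case (_, groups) => groups.size }
      println(s"parallelism=$parallelism: key groups per subtask " +
        s"min=${perSubtask.min}, max=${perSubtask.max}")
      // parallelism=30 -> min=4, max=5 (uneven by construction); parallelism=32 -> exactly 4 each
    }
  }
}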


At 2020-12-28 10:38:23, "赵一旦" wrote:
>What I mean is: is it necessary to set the parallelism to a divisor of 128?
>
>Shengkai Fang wrote on Mon, Dec 28, 2020 at 10:38 AM:
>
>> Hi, if the hotspot is caused by a single key carrying a large share of the data, re-partitioning still won't solve it.
>> Personally I think the best fix is window-based mini-batch together with local-global aggregation; the community is working on this class of problem, see [1].
>>
>> [1]https://issues.apache.org/jira/browse/FLINK-19604
>>
>> 赵一旦 wrote on Mon, Dec 28, 2020 at 10:31 AM:
>>
>> > One more question. For window operators, the max parallelism of the keyBy partitioning is set to the max parallelism of the downstream operator.
>> >
>> > Now suppose my window parallelism is 30; by default the window's max parallelism is 128. Looking at the averages, doesn't that mechanism alone already make data skew quite likely? Only 32 divides 128 evenly and balances the load, right?
>> >
>> > Shengkai Fang wrote on Sun, Dec 27, 2020 at 3:46 PM:
>> >
>> > > It can be set via this configuration option [1].
>> > >
>> > > [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-resource-default-parallelism
>> > >
>> > > 赵一旦 wrote on Sun, Dec 27, 2020 at 12:44 PM:
>> > >
>> > > > How much data are we talking about? A parallelism of 128 already feels quite high.
>> > > >
>> > > > guaishushu1...@163.com wrote on Sat, Dec 26, 2020 at 5:39 PM:
>> > > >
>> > > > > In Flink SQL, the parallelism of Source and Sink can be configured through connector options, while the parallelism of every other operator follows the Source, so it is capped at 128. For operators doing aggregations and similar work, 128 is clearly not enough. How can this be solved? Is it possible to configure the parallelism of the other operators?
>> > > > >
>> > > > >
>> > > > >
>> > > > > guaishushu1...@163.com
>> > > > >
>> > > >
>> > >
>> >
>>


Re: flink 1.12 error OSError: Expected IPC message of type schema but got record batch

2020-12-27 by 小学生
My pyarrow version is 0.17.1.

Re: Problem writing to ClickHouse with flink-connector-clickhouse

2020-12-27 by 张锴
Try a different third-party client instead, e.g. https://github.com/blynkkk/clickhouse4j


<dependency>
    <groupId>cc.blynk.clickhouse</groupId>
    <artifactId>clickhouse4j</artifactId>
    <version>1.4.4</version>
</dependency>



DanielGu <610493...@qq.com> wrote on Mon, Dec 28, 2020 at 12:22 AM:

> I used Alibaba's connector to write to ClickHouse:
> Alibaba Cloud flink-connector-clickhouse writing to ClickHouse
> <
> https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso>
>
>
> A test write to ClickHouse returns the following: no error, but nothing is actually written. I don't know where to start troubleshooting; any advice would be appreciated.
> +-+
> | default_catalog.default_database.sink_table |
> +-+
> |  -1 |
> +-+
>
>
> The code is as follows:
> package com.daniel
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.sources._
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.api._
> import org.apache.flink.types.Row
> import org.apache.flink.table.api.{
>   TableEnvironment,
>   TableSchema,
>   Types,
>   ValidationException
> }
>
> object StreamingJob {
>   def main(args: Array[String]) {
> val SourceCsvPath =
>   "/Users/flink-sql-demo/flink-sql-source.csv"
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.getConfig.disableClosureCleaner
>
> val tEnv = StreamTableEnvironment.create(env)
>
> val csvTableSource = CsvTableSource
>   .builder()
>   .path(SourceCsvPath)
>   .ignoreFirstLine()
>   .fieldDelimiter(",")
>   .field("name", DataTypes.STRING())
>   .field("age", DataTypes.BIGINT())
> //  .field("sex", DataTypes.STRING())
> //  .field("grade", DataTypes.INT())
>   .field("rate", DataTypes.FLOAT())
>   .build()
>
> tEnv.registerTableSource("source", csvTableSource)
>
> val create_sql =
>   s"""
>  | CREATE TABLE sink_table (
>  |name VARCHAR
>  |) WITH (
>  |'connector' = 'clickhouse',
>  |'url' = 'clickhouse://*:8080',
>  |'username' = '',
>  |'password' = '',
>  |'database-name' = '***',
>  |'table-name' = 'live.d_sink_table',
>  |'sink.batch-size' = '1',
>  |'sink.partition-strategy' = 'hash',
>  |'sink.partition-key' = 'name'
>  |)
>  |""".stripMargin
>
>
>
> tEnv.executeSql(create_sql);
>
> val result = tEnv.executeSql(
>   "INSERT INTO sink_table SELECT name FROM source"
> )
> result.print()
>   }
>
> }
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Unsubscribe

2020-12-27 by Xingbo Huang
Hi,

To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

See [1] for details.

[1] https://flink.apache.org/zh/community.html#section-1

Best,
Xingbo

㊣ 俊 猫 ㊣ <877144...@qq.com> wrote on Sun, Dec 27, 2020 at 11:15 AM:

> Hello, please unsubscribe me.


Re: flink 1.12 error OSError: Expected IPC message of type schema but got record batch

2020-12-27 by Xingbo Huang
Hi,

This error comes from pyarrow while deserializing data. Could you share which pyarrow version you are using?
You can check with: pip list | grep pyarrow

Best,
Xingbo


小学生 <201782...@qq.com> wrote on Mon, Dec 28, 2020 at 10:37 AM:

> Could anyone take a look? After I use a vectorized (Pandas) UDF in PyFlink, the job runs for a while and then fails with the error below. I couldn't find any similar issue; what is going on here?
> Caused by: java.lang.RuntimeException: Error received from SDK harness for
> instruction 8: Traceback (most recent call last):
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
>     response = task()
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in      lambda: self.create_worker().do_instruction(request),
> request)
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 978, in process_bundle
>     element.data)
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71,
> in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73,
> in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 625, in decode_from_stream
>     yield self._decode_one_batch_from_stream(in_stream,
> in_stream.read_var_int64())
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 636, in _decode_one_batch_from_stream
>     return arrow_to_pandas(self._timezone, self._field_types,
> [next(self._batch_reader)])
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 629, in _load_from_stream
>     reader = pa.ipc.open_stream(stream)
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py",
> line 146, in open_stream
>     return RecordBatchStreamReader(source)
>   File
> "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py",
> line 62, in __init__
>     self._open(source)
>   File "pyarrow/ipc.pxi", line 360, in
> pyarrow.lib._RecordBatchStreamReader._open
>   File "pyarrow/error.pxi", line 123, in
> pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch


Re: Choosing the right Flink state for a business requirement

2020-12-27 by news_...@163.com
This only works if the records enter Kafka in order, which real-world scenarios cannot guarantee: an earlier heartbeat record may arrive in the Kafka queue later than a newer one.



news_...@163.com

From: 张锴
Sent: 2020-12-28 13:35
To: user-zh
Subject: Choosing the right Flink state for a business requirement
Could someone help me work out how to implement the following requirement?

Requirement:
Our company runs a live-streaming business, and we need to compute users' online duration in real time. The source is Kafka: a live-stream heartbeat record is produced every minute and contains an eventTime field. For example, if user A produces heartbeat records in minutes 1, 2 and 3, that stay lasts 3 minutes; if the user shows up again in minutes 5 and 6, that stay lasts 2 minutes. Consecutive heartbeat records should be merged into one record, per user and per live room.

My idea:
My current idea is to key by live-room ID and user ID and use a process function with state: take the minute from each record's eventTime and subtract the previous record's minute. If the difference is 1, the records are consecutive, so keep going; if it is not 1, the run is broken, so emit the record and clear the state for the current key.

Any suggestions on this idea, or is there a better way to handle it?

Flink version: 1.10.1
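
(Not from the thread: one hedged way around the ordering concern above is to let event time handle it, using event-time session windows plus a bounded-out-of-orderness watermark. The sketch below uses the pre-1.11 watermark API that still exists in Flink 1.10.1; the record fields, the 90-second gap and the 2-minute lateness bound are assumptions to adapt.)

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// heartbeat with event time in epoch millis (field names are made up)
case class Beat(roomId: String, userId: String, eventTime: Long)
case class Stay(roomId: String, userId: String, startMs: Long, endMs: Long)

object OnlineDurationSessionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // in practice this would be the Kafka source; a few literals keep the sketch runnable
    val beats = env.fromElements(
      Beat("room1", "userA", 60000L), Beat("room1", "userA", 120000L),
      Beat("room1", "userA", 180000L), Beat("room1", "userA", 300000L))

    beats
      .assignTimestampsAndWatermarks(
        // tolerate heartbeats that reach Kafka up to 2 minutes late
        new BoundedOutOfOrdernessTimestampExtractor[Beat](Time.minutes(2)) {
          override def extractTimestamp(b: Beat): Long = b.eventTime
        })
      .keyBy(b => (b.roomId, b.userId))
      // consecutive per-minute heartbeats fall into one session; a gap a bit over one
      // minute closes it (90 s avoids edge effects for exactly-one-minute spacing)
      .window(EventTimeSessionWindows.withGap(Time.seconds(90)))
      .process(new ProcessWindowFunction[Beat, Stay, (String, String), TimeWindow] {
        override def process(key: (String, String), context: Context,
                             elements: Iterable[Beat], out: Collector[Stay]): Unit =
          out.collect(Stay(key._1, key._2, context.window.getStart, context.window.getEnd))
      })
      .print()

    env.execute("online-duration session sketch")
  }
}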


Choosing the right Flink state for a business requirement

2020-12-27 by 张锴
Could someone help me work out how to implement the following requirement?

Requirement:
Our company runs a live-streaming business, and we need to compute users' online duration in real time. The source is Kafka: a live-stream heartbeat record is produced every minute and contains an eventTime field. For example, if user A produces heartbeat records in minutes 1, 2 and 3, that stay lasts 3 minutes; if the user shows up again in minutes 5 and 6, that stay lasts 2 minutes. Consecutive heartbeat records should be merged into one record, per user and per live room.

My idea:
My current idea is to key by live-room ID and user ID and use a process function with state: take the minute from each record's eventTime and subtract the previous record's minute. If the difference is 1, the records are consecutive, so keep going; if it is not 1, the run is broken, so emit the record and clear the state for the current key.

Any suggestions on this idea, or is there a better way to handle it?

Flink version: 1.10.1
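
(Not from the thread: a minimal sketch of the state-based idea described above, i.e. key by room and user, keep the last seen minute and the start of the current run in ValueState, and emit plus restart on a gap. Class and field names are made up, and it deliberately ignores the out-of-order caveat raised in the reply.)

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// eventMinute = eventTime truncated to minutes (field names are made up)
case class Beat(roomId: String, userId: String, eventMinute: Long)
case class Stay(roomId: String, userId: String, startMinute: Long, endMinute: Long)

class StayDurationFunction
    extends KeyedProcessFunction[(String, String), Beat, Stay] {

  private var lastMinute: ValueState[java.lang.Long] = _
  private var runStart: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastMinute = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("lastMinute", classOf[java.lang.Long]))
    runStart = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("runStart", classOf[java.lang.Long]))
  }

  override def processElement(
      beat: Beat,
      ctx: KeyedProcessFunction[(String, String), Beat, Stay]#Context,
      out: Collector[Stay]): Unit = {
    val last = lastMinute.value()
    if (last == null) {
      // first heartbeat for this key: start a new run
      runStart.update(beat.eventMinute)
    } else if (beat.eventMinute - last > 1) {
      // gap detected: emit the finished run, then start a new one
      out.collect(Stay(beat.roomId, beat.userId, runStart.value(), last))
      runStart.update(beat.eventMinute)
    }
    lastMinute.update(beat.eventMinute)
  }
}

Wiring it up would look like beats.keyBy(b => (b.roomId, b.userId)).process(new StayDurationFunction).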


Re: Flink SQL parallelism configuration question

2020-12-27 by 赵一旦
What I mean is: is it necessary to set the parallelism to a divisor of 128?

Shengkai Fang wrote on Mon, Dec 28, 2020 at 10:38 AM:

> Hi, if the hotspot is caused by a single key carrying a large share of the data, re-partitioning still won't solve it.
> Personally I think the best fix is window-based mini-batch together with local-global aggregation; the community is working on this class of problem, see [1].
>
> [1]https://issues.apache.org/jira/browse/FLINK-19604
>
> 赵一旦 wrote on Mon, Dec 28, 2020 at 10:31 AM:
>
> > One more question. For window operators, the max parallelism of the keyBy partitioning is set to the max parallelism of the downstream operator.
> >
> > Now suppose my window parallelism is 30; by default the window's max parallelism is 128. Looking at the averages, doesn't that mechanism alone already make data skew quite likely? Only 32 divides 128 evenly and balances the load, right?
> >
> > Shengkai Fang wrote on Sun, Dec 27, 2020 at 3:46 PM:
> >
> > > It can be set via this configuration option [1].
> > >
> > > [1]
> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-resource-default-parallelism
> > >
> > > 赵一旦 wrote on Sun, Dec 27, 2020 at 12:44 PM:
> > >
> > > > How much data are we talking about? A parallelism of 128 already feels quite high.
> > > >
> > > > guaishushu1...@163.com wrote on Sat, Dec 26, 2020 at 5:39 PM:
> > > >
> > > > > In Flink SQL, the parallelism of Source and Sink can be configured through connector options, while the parallelism of every other operator follows the Source, so it is capped at 128. For operators doing aggregations and similar work, 128 is clearly not enough. How can this be solved? Is it possible to configure the parallelism of the other operators?
> > > > >
> > > > >
> > > > >
> > > > > guaishushu1...@163.com
> > > > >
> > > >
> > >
> >
>


Re: flink 1.12.0 HA on native K8s: jobmanager-leader creates a large number of ConfigMaps after running for a while

2020-12-27 by Yang Wang
Thanks for trying the K8s HA mode. Are you running in Session mode or Application mode?

* In Application mode, all HA-related ConfigMaps are cleaned up automatically once the Flink job reaches a terminal state (FAILED, CANCELED, SUCCEEDED). You can cancel the job from the web UI or with flink cancel and check; there should be nothing left over.
* In Session mode, every submitted job gets its own ConfigMap. Its contents are cleaned up when the job finishes, but the ConfigMap itself remains. There is already a ticket [1] tracking improvements to the cleanup in Session mode. For now, once you are sure the session is no longer needed, you can clean up with: kubectl delete cm --selector='app=<cluster-id>,configmap-type=high-availability'

[1]. https://issues.apache.org/jira/browse/FLINK-20219

Best,
Yang

tao7 <99727...@qq.com> wrote on Mon, Dec 28, 2020 at 10:26 AM:

> Hi all, I deployed Flink 1.12 with the native K8s integration.
>
> After running HA on K8s for a while, the jobmanager-leader has produced a large number of ConfigMaps. Are all of these ConfigMaps actually needed? How do you clean them up and maintain them?
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.12.0 kubernetes-session deployment problem

2020-12-27 by Yang Wang
There are two problems in your workflow:

1. Image not found
This is most likely related to minikube's driver setting. If you use hyperkit or another VM driver, you need to "minikube ssh" into the VM and check whether the image actually exists there.

2. JobManager link not reachable
2020-12-27 22:08:12,387 INFO
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
flink session cluster session001 successfully, JobManager Web Interface:
http://192.168.99.100:8081

I suspect the log line above was not printed by the command you pasted: your command uses the NodePort type, so the printed JobManager address should not be on port 8081.
As long as the job you submit on minikube carries kubernetes.rest-service.exposed.type=NodePort and the JobManager comes up, the JobManager address it prints will be reachable.

Of course you can also assemble the URL by hand: get the API server address with "minikube ip", then use "kubectl get svc" to look up the NodePort of the rest service belonging to the Flink session cluster you created, and combine the two.


Best,
Yang

陈帅 wrote on Sun, Dec 27, 2020 at 10:51 PM:

>
> This is my first attempt at deploying Flink on K8s. Versions: Flink 1.12.0, JDK 1.8.0_275, Scala 2.12.12. I have a single-node minikube setup on my Mac. Here are the steps:
>
>
> git clone https://github.com/apache/flink-docker
> cd flink-docker/1.12/scala_2.12-java8-debian
> docker build --tag flink:1.12.0-scala_2.12-java8 .
>
>
> cd flink-1.12.0
> ./bin/kubernetes-session.sh \
> -Dkubernetes.container.image=flink:1.12.0-scala_2.12-java8 \
> -Dkubernetes.rest-service.exposed.type=NodePort \
> -Dtaskmanager.numberOfTaskSlots=2 \
> -Dkubernetes.cluster-id=flink-session-cluster
>
>
> It says the JobManager started, but I cannot reach it through the web UI:
>
> 2020-12-27 22:08:12,387 INFO
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create
> flink session cluster session001 successfully, JobManager Web Interface:
> http://192.168.99.100:8081
>
>
>
>
> `kubectl get pods` shows the pod stuck in ContainerCreating status
>
> NAME   READY   STATUS
> RESTARTS   AGE
>
> flink-session-cluster-858bd55dff-bzjk2 0/1
>  ContainerCreating   0  5m59s
>
> kubernetes-dashboard-1608509744-6bc8455756-mp47w   1/1 Running
>  0  6d14h
>
>
>
>
> So I ran `kubectl describe pod
> flink-session-cluster-858bd55dff-bzjk2` for details; the output is:
>
>
>
>
> Name: flink-session-cluster-858bd55dff-bzjk2
>
> Namespace:default
>
> Priority: 0
>
> Node: minikube/192.168.99.100
>
> Start Time:   Sun, 27 Dec 2020 22:21:56 +0800
>
> Labels:   app=flink-session-cluster
>
>   component=jobmanager
>
>   pod-template-hash=858bd55dff
>
>   type=flink-native-kubernetes
>
> Annotations:  
>
> Status:   Pending
>
> IP:   172.17.0.4
>
> IPs:
>
>   IP:   172.17.0.4
>
> Controlled By:  ReplicaSet/flink-session-cluster-858bd55dff
>
> Containers:
>
>   flink-job-manager:
>
> Container ID:
>
> Image: flink:1.12.0-scala_2.12-java8
>
> Image ID:
>
> Ports: 8081/TCP, 6123/TCP, 6124/TCP
>
> Host Ports:0/TCP, 0/TCP, 0/TCP
>
> Command:
>
>   /docker-entrypoint.sh
>
> Args:
>
>   native-k8s
>
>   $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824
> -Xms1073741824 -XX:MaxMetaspaceSize=268435456
> -Dlog.file=/opt/flink/log/jobmanager.log
> -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
> -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
> -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties
> org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
> -D jobmanager.memory.off-heap.size=134217728b -D
> jobmanager.memory.jvm-overhead.min=201326592b -D
> jobmanager.memory.jvm-metaspace.size=268435456b -D
> jobmanager.memory.heap.size=1073741824b -D
> jobmanager.memory.jvm-overhead.max=201326592b
>
> State:  Waiting
>
>   Reason:   ImagePullBackOff
>
> Ready:  False
>
> Restart Count:  0
>
> Limits:
>
>   cpu: 1
>
>   memory:  1600Mi
>
> Requests:
>
>   cpu: 1
>
>   memory:  1600Mi
>
> Environment:
>
>   _POD_IP_ADDRESS:   (v1:status.podIP)
>
>   HADOOP_CONF_DIR:  /opt/hadoop/conf
>
> Mounts:
>
>   /opt/flink/conf from flink-config-volume (rw)
>
>   /opt/hadoop/conf from hadoop-config-volume (rw)
>
>   /var/run/secrets/kubernetes.io/serviceaccount from
> default-token-s47ht (ro)
>
> Conditions:
>
>   Type  Status
>
>   Initialized   True
>
>   Ready False
>
>   ContainersReady   False
>
>   PodScheduled  True
>
> Volumes:
>
>   hadoop-config-volume:
>
> Type:  ConfigMap (a volume populated by a ConfigMap)
>
> Name:  hadoop-config-flink-session-cluster
>
> Optional:  false
>
>   flink-config-volume:
>
> Type:  ConfigMap (a volume populated by a ConfigMap)
>
> Name:  flink-config-flink-session-cluster
>
> Optional:  false
>
>   default-token-s47ht:
>
> Type:Secret (a volume populated by a Secret)
>
> SecretName:  default-token-s47ht
>
> Optional:false
>
> QoS Class:   Guaranteed
>
> Node-Selectors:  
>
> Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
>
>  node.kubernetes.io/unreachable:NoExecute op=Exists for
> 300s
>
> Events:
>
>   Type Reason   Age  From   Message
>
>    --   ---

Re: Flink SQL parallelism configuration question

2020-12-27 by Shengkai Fang
Hi, if the hotspot is caused by a single key carrying a large share of the data, re-partitioning still won't solve it.
Personally I think the best fix is window-based mini-batch together with local-global aggregation; the community is working on this class of problem, see [1].

[1]https://issues.apache.org/jira/browse/FLINK-19604
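
(Not from the thread: a hedged sketch of how the mini-batch and local-global aggregation mentioned above, together with the default-parallelism option from the linked page, can be switched on through TableConfig. The keys are documented Flink options; the latency, size and parallelism values are placeholders to tune.)

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object MiniBatchConfigSketch {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = TableEnvironment.create(settings)
    val conf = tEnv.getConfig.getConfiguration

    // buffer input records and fire aggregations in small batches
    conf.setString("table.exec.mini-batch.enabled", "true")
    conf.setString("table.exec.mini-batch.allow-latency", "2 s")
    conf.setString("table.exec.mini-batch.size", "5000")
    // split hot-key aggregations into a local pre-aggregation plus a global one
    conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
    // parallelism applied to all Table/SQL operators (the option linked in [1] above)
    conf.setString("table.exec.resource.default-parallelism", "256")
  }
}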

赵一旦 wrote on Mon, Dec 28, 2020 at 10:31 AM:

> One more question. For window operators, the max parallelism of the keyBy partitioning is set to the max parallelism of the downstream operator.
>
> Now suppose my window parallelism is 30; by default the window's max parallelism is 128. Looking at the averages, doesn't that mechanism alone already make data skew quite likely? Only 32 divides 128 evenly and balances the load, right?
>
> Shengkai Fang wrote on Sun, Dec 27, 2020 at 3:46 PM:
>
> > It can be set via this configuration option [1].
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-resource-default-parallelism
> >
> > 赵一旦 wrote on Sun, Dec 27, 2020 at 12:44 PM:
> >
> > > How much data are we talking about? A parallelism of 128 already feels quite high.
> > >
> > > guaishushu1...@163.com wrote on Sat, Dec 26, 2020 at 5:39 PM:
> > >
> > > > In Flink SQL, the parallelism of Source and Sink can be configured through connector options, while the parallelism of every other operator follows the Source, so it is capped at 128. For operators doing aggregations and similar work, 128 is clearly not enough. How can this be solved? Is it possible to configure the parallelism of the other operators?
> > > >
> > > >
> > > >
> > > > guaishushu1...@163.com
> > > >
> > >
> >
>


flink 1.12 error OSError: Expected IPC message of type schema but got record batch

2020-12-27 by 小学生
After using a vectorized (Pandas) UDF in PyFlink, the job runs for a while and then fails with the error below. I couldn't find any similar issue; what is going on here?
Caused by: java.lang.RuntimeException: Error received from SDK harness for 
instruction 8: Traceback (most recent call last):
  File 
"/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 253, in _execute
    response = task()
  File 
"/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 310, in 

flink 1.12: .staging_xxxx file not found when inserting into a Hive table

2020-12-27 by macdoor
On a Flink 1.12 standalone cluster, a scheduled batch-mode insert overwrite into a Hive table randomly fails with a ".staging_ file not found" error. The complete error message is:
org.apache.flink.runtime.JobException: Recovery
is suppressed by NoRestartBackoffTimeStrategy   at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:89)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:240)
at
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:65)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.failGlobal(ExecutionGraph.java:1055)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1305)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:849)
at
org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:1127)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateStateInternal(ExecutionGraph.java:1512)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1485)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:604)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
at sun.reflect.GeneratedMethodAccessor56.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)   at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)  at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at
akka.actor.Actor$class.aroundReceive(Actor.scala:517)   at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)at
akka.actor.ActorCell.invoke(ActorCell.scala:561)at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
akka.dispatch.Mailbox.run(Mailbox.scala:225)at
akka.dispatch.Mailbox.exec(Mailbox.scala:235)   at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)   at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)Caused
by: java.lang.Exception: Failed to finalize execution on master ... 33
moreCaused by: org.apache.flink.table.api.TableException: Exception in
finalizeGlobal  at
org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:97)
at
org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:131)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1299)
... 32 moreCaused by: java.io.FileNotFoundException: File
hdfs://service1/user/hive/warehouse/snmpprobe.db/p_port_packet_loss_5m/.staging_1609040810292
does not exist. at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:901)
at
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:112)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:961)
at
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:958)
at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at
org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:958)
at
org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:165)
at
org.apache.flink.table.filesystem.PartitionTempFileManager.headCheckpoints(PartitionTempFileManager.java:140)
at
org.apache.flink.table.filesystem.FileSystemCommitter.commitUpToCheckpoint(FileSystemCommitter.java:98)
at
org.apache.flink.table.filesystem.FileSystemOutpu

flink 1.12.0 HA on native K8s: jobmanager-leader creates a large number of ConfigMaps after running for a while

2020-12-27 by tao7
Hi all, I deployed Flink 1.12 with the native K8s integration.
After running HA on K8s for a while, the jobmanager-leader has produced a large number of ConfigMaps. Are all of these ConfigMaps actually needed? How do you clean them up and maintain them?




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink SQL parallelism configuration question

2020-12-27 by 赵一旦
One more question. For window operators, the max parallelism of the keyBy partitioning is set to the max parallelism of the downstream operator.
Now suppose my window parallelism is 30; by default the window's max parallelism is 128. Looking at the averages, doesn't that mechanism alone already make data skew quite likely? Only 32 divides 128 evenly and balances the load, right?
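
(Not from the thread: an illustration of the 32-vs-128 point. Pinning both values so that the parallelism divides the max parallelism gives every subtask the same number of key groups; the numbers are only examples, and both can also be set per operator.)

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ParallelismSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(128) // number of key groups
    env.setParallelism(32)     // 128 / 32 = 4 key groups per subtask, evenly balanced
  }
}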

Shengkai Fang wrote on Sun, Dec 27, 2020 at 3:46 PM:

> It can be set via this configuration option [1].
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-exec-resource-default-parallelism
>
> 赵一旦 wrote on Sun, Dec 27, 2020 at 12:44 PM:
>
> > How much data are we talking about? A parallelism of 128 already feels quite high.
> >
> > guaishushu1...@163.com wrote on Sat, Dec 26, 2020 at 5:39 PM:
> >
> > > In Flink SQL, the parallelism of Source and Sink can be configured through connector options, while the parallelism of every other operator follows the Source, so it is capped at 128. For operators doing aggregations and similar work, 128 is clearly not enough. How can this be solved? Is it possible to configure the parallelism of the other operators?
> > >
> > >
> > >
> > > guaishushu1...@163.com
> > >
> >
>


Re: Checkpoint persistence question

2020-12-27 by 赵一旦
First, make sure the checkpoint is still within the retained-checkpoint count.
Second, the job must not have been stopped: Flink deletes all checkpoints when a job is stopped.
They are not deleted on cancel.
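
(Not from the thread: a DataStream-level sketch equivalent to the flink-conf settings quoted below. The interval and HDFS path are placeholders, and FsStateBackend stands in for the RocksDB backend to keep the sketch dependency-free.)

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RetainedCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000L) // checkpoint every 60 s
    env.setStateBackend(new FsStateBackend("hdfs:///tmp/flink/checkpoint"))
    // keep externalized checkpoints when the job is cancelled
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}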

Yun Tang wrote on Sun, Dec 27, 2020 at 5:55 PM:

> Hi
>
> Since the UI already shows it as successful, the checkpoint definitely completed and was saved to HDFS. Take a look at the parent directory; the chk-x directories may be deleted as newer checkpoints complete.
>
> Best,
> Yun Tang
> 
> From: chen310 <1...@163.com>
> Sent: Friday, December 25, 2020 16:01
> To: user-zh@flink.apache.org 
> Subject: Checkpoint persistence question
>
> Problem:
> In Flink SQL I configured checkpoints to be retained after the job goes down:
>
> execution.checkpointing.externalized-checkpoint-retention
> RETAIN_ON_CANCELLATION
>
> and configured checkpoints to be stored on HDFS:
>
> state.backend rocksdb
>
> #incremental checkpoints
> #state.backend.incremental true
> state.checkpoints.dir hdfs:///tmp/flink/checkpoint
>
>
> Flink does take checkpoints, but when I look up this path on HDFS, the corresponding directory does not exist. It seems that not every checkpoint gets persisted to HDFS. Is some configuration needed to make every checkpoint be saved to disk?
>
> <
> http://apache-flink.147419.n8.nabble.com/file/t572/4C10EE51C7A01D45B9278BE2D2CD6C10.jpg
> >
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Command: Flink savepoint -d reported an error.

2020-12-27 by 赢峰


Has this been solved? I also get: Missing required argument: savepoint path.
Usage: bin/flink savepoint -d <savepointPath>

赢峰
si_ji_f...@163.com


On 09/30/2019 15:06,pengchengl...@163.com wrote:
Hi, thanks for the reply. My command follows the official docs: https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#trigger-a-savepoint
$ bin/flink savepoint -d :savepointPath
That command disposes a savepoint; what you described is triggering a savepoint, which works fine for me.


pengchengl...@163.com

From: 狄玉坤
Sent: 2019-09-30 14:59
To: user-zh@flink.apache.org
Subject: Re: Re: Command: Flink savepoint -d reported an error.
The jobId argument is missing:
./bin/flink savepoint <jobId> [savepointDirectory]
This will trigger a savepoint for the job with ID jobId, and returns the path 
of the created savepoint. You need this path to restore and dispose savepoints.
diyukun2019
diyukun2...@163.com
On 9/30/2019 14:56,pengchengl...@163.com wrote:
Thanks for the reply. I tried it, but it still says a required argument is missing.
As follows:
$ bin/flink savepoint -d 
file://home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at 
org.apache.flink.client.cli.CliFrontend.disposeSavepoint(CliFrontend.java:721)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$8(CliFrontend.java:657)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:945)
at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:654)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)




pengchengl...@163.com

From: 星沉
Sent: 2019-09-30 14:47
To: user-zh
Subject: Re: Command: Flink savepoint -d reported an error.
The savepoint path argument is wrong; isn't there an extra / after "file"?


pengchengl...@163.com wrote:
$ bin/flink savepoint 
-dfile:///home/pengchenglin/flinkresult/savepoint/savepoint-ffbc58-9daf3085de9a/


The program finished with the following exception:

java.lang.NullPointerException: Missing required argument: savepoint path. 
Usage: bin/flink savepoint -d <savepointPath>


Re: test

2020-12-27 by liang zhao
Please do not send these meaningless test emails.

> On Dec 27, 2020, at 23:19, 蒋德祥 wrote:
> 
> 



Problem writing to ClickHouse with flink-connector-clickhouse

2020-12-27 by DanielGu
I used Alibaba's connector to write to ClickHouse:
Alibaba Cloud flink-connector-clickhouse writing to ClickHouse
https://help.aliyun.com/document_detail/185696.html?spm=5176.12901015.0.i12901015.2b41525cECNyYW&accounttraceid=1ac9126237284ef9b0a25f666c3030dfxaso

A test write to ClickHouse returns the following: no error, but nothing is actually written. I don't know where to start troubleshooting; any advice would be appreciated.
+-+
| default_catalog.default_database.sink_table |
+-+
|  -1 |
+-+


The code is as follows:
package com.daniel
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sources._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api._
import org.apache.flink.types.Row
import org.apache.flink.table.api.{
  TableEnvironment,
  TableSchema,
  Types,
  ValidationException
}

object StreamingJob {
  def main(args: Array[String]) {
val SourceCsvPath =
  "/Users/flink-sql-demo/flink-sql-source.csv"

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.getConfig.disableClosureCleaner

val tEnv = StreamTableEnvironment.create(env)

val csvTableSource = CsvTableSource
  .builder()
  .path(SourceCsvPath)
  .ignoreFirstLine()
  .fieldDelimiter(",")
  .field("name", DataTypes.STRING())
  .field("age", DataTypes.BIGINT())
//  .field("sex", DataTypes.STRING())
//  .field("grade", DataTypes.INT())
  .field("rate", DataTypes.FLOAT())
  .build()

tEnv.registerTableSource("source", csvTableSource)

val create_sql =
  s"""
 | CREATE TABLE sink_table (
 |name VARCHAR
 |) WITH (
 |'connector' = 'clickhouse',
 |'url' = 'clickhouse://*:8080',
 |'username' = '',
 |'password' = '',
 |'database-name' = '***',
 |'table-name' = 'live.d_sink_table',
 |'sink.batch-size' = '1',
 |'sink.partition-strategy' = 'hash',
 |'sink.partition-key' = 'name'
 |)
 |""".stripMargin



tEnv.executeSql(create_sql);

val result = tEnv.executeSql(
  "INSERT INTO sink_table SELECT name FROM source"
)
result.print()
  }

}






--
Sent from: http://apache-flink.147419.n8.nabble.com/
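
(Not from the thread: one place to start is that in Flink 1.12 executeSql submits the INSERT job asynchronously, and the printed -1 only means the affected row count is unknown, not that the write succeeded. A hedged sketch, with the table registration elided, that blocks on the TableResult so sink-side exceptions surface in the client:)

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object InsertDebugSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)
    // ... register "source" and run the CREATE TABLE sink_table DDL as in the mail above ...

    val result = tEnv.executeSql("INSERT INTO sink_table SELECT name FROM source")
    // block until the insert job finishes so any ClickHouse write error is thrown here
    result.await()
    // or inspect the asynchronous job explicitly
    result.getJobClient.ifPresent(client => println(client.getJobStatus.get()))
  }
}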


test

2020-12-27 by 蒋德祥



Flink 1.12.0 kubernetes-session deployment problem

2020-12-27 by 陈帅
This is my first attempt at deploying Flink on K8s. Versions: Flink 1.12.0, JDK 1.8.0_275, Scala 2.12.12. I have a single-node minikube setup on my Mac. Here are the steps:


git clone https://github.com/apache/flink-docker
cd flink-docker/1.12/scala_2.12-java8-debian
docker build --tag flink:1.12.0-scala_2.12-java8 .


cd flink-1.12.0
./bin/kubernetes-session.sh \ 
-Dkubernetes.container.image=flink:1.12.0-scala_2.12-java8 \ 
-Dkubernetes.rest-service.exposed.type=NodePort \ 
-Dtaskmanager.numberOfTaskSlots=2 \ 
-Dkubernetes.cluster-id=flink-session-cluster


It says the JobManager started, but I cannot reach it through the web UI:

2020-12-27 22:08:12,387 INFO  
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create flink 
session cluster session001 successfully, JobManager Web Interface: 
http://192.168.99.100:8081




`kubectl get pods` shows the pod stuck in ContainerCreating status

NAME   READY   STATUS  
RESTARTS   AGE

flink-session-cluster-858bd55dff-bzjk2 0/1 ContainerCreating   
0  5m59s

kubernetes-dashboard-1608509744-6bc8455756-mp47w   1/1 Running 
0  6d14h




So I ran `kubectl describe pod flink-session-cluster-858bd55dff-bzjk2` for details; the output is:




Name: flink-session-cluster-858bd55dff-bzjk2

Namespace:default

Priority: 0

Node: minikube/192.168.99.100

Start Time:   Sun, 27 Dec 2020 22:21:56 +0800

Labels:   app=flink-session-cluster

  component=jobmanager

  pod-template-hash=858bd55dff

  type=flink-native-kubernetes

Annotations:  

Status:   Pending

IP:   172.17.0.4

IPs:

  IP:   172.17.0.4

Controlled By:  ReplicaSet/flink-session-cluster-858bd55dff

Containers:

  flink-job-manager:

Container ID:  

Image: flink:1.12.0-scala_2.12-java8

Image ID:  

Ports: 8081/TCP, 6123/TCP, 6124/TCP

Host Ports:0/TCP, 0/TCP, 0/TCP

Command:

  /docker-entrypoint.sh

Args:

  native-k8s

  $JAVA_HOME/bin/java -classpath $FLINK_CLASSPATH -Xmx1073741824 
-Xms1073741824 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/opt/flink/log/jobmanager.log 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties 
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties 
org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint -D 
jobmanager.memory.off-heap.size=134217728b -D 
jobmanager.memory.jvm-overhead.min=201326592b -D 
jobmanager.memory.jvm-metaspace.size=268435456b -D 
jobmanager.memory.heap.size=1073741824b -D 
jobmanager.memory.jvm-overhead.max=201326592b

State:  Waiting

  Reason:   ImagePullBackOff

Ready:  False

Restart Count:  0

Limits:

  cpu: 1

  memory:  1600Mi

Requests:

  cpu: 1

  memory:  1600Mi

Environment:

  _POD_IP_ADDRESS:   (v1:status.podIP)

  HADOOP_CONF_DIR:  /opt/hadoop/conf

Mounts:

  /opt/flink/conf from flink-config-volume (rw)

  /opt/hadoop/conf from hadoop-config-volume (rw)

  /var/run/secrets/kubernetes.io/serviceaccount from default-token-s47ht 
(ro)

Conditions:

  Type  Status

  Initialized   True 

  Ready False 

  ContainersReady   False 

  PodScheduled  True 

Volumes:

  hadoop-config-volume:

Type:  ConfigMap (a volume populated by a ConfigMap)

Name:  hadoop-config-flink-session-cluster

Optional:  false

  flink-config-volume:

Type:  ConfigMap (a volume populated by a ConfigMap)

Name:  flink-config-flink-session-cluster

Optional:  false

  default-token-s47ht:

Type:Secret (a volume populated by a Secret)

SecretName:  default-token-s47ht

Optional:false

QoS Class:   Guaranteed

Node-Selectors:  

Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s

 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s

Events:

  Type Reason   Age  From   Message

   --       ---

  Normal   Scheduled21m  default-scheduler  Successfully 
assigned default/flink-session-cluster-858bd55dff-bzjk2 to minikube

  Warning  FailedMount  21m (x2 over 21m)kubelet
MountVolume.SetUp failed for volume "flink-config-volume" : configmap 
"flink-config-flink-session-cluster" not found

  Warning  FailedMount  21m (x2 over 21m)kubelet
MountVolume.SetUp failed for volume "hadoop-config-volume" : configmap 
"hadoop-config-flink-session-cluster" not found

  Normal   Pulling  13m (x4 over 21m)kubeletPulling image 
"flink:1.12.0-scala_2.12-java8"

  Warning  Failed   13m (x4 over 15m)kubeletFailed to pull 
image "flink:1.12.0-scala_2.12-java8": rpc error: code = Unknown desc = Error 
response from daemon: manifest for flink:1.12.0-scala_2.12-j

Re: Checkpoint persistence question

2020-12-27 by Yun Tang
Hi

Since the UI already shows it as successful, the checkpoint definitely completed and was saved to HDFS. Take a look at the parent directory; the chk-x directories may be deleted as newer checkpoints complete.

Best,
Yun Tang

From: chen310 <1...@163.com>
Sent: Friday, December 25, 2020 16:01
To: user-zh@flink.apache.org 
Subject: Checkpoint persistence question

Problem:
In Flink SQL I configured checkpoints to be retained after the job goes down:

execution.checkpointing.externalized-checkpoint-retention
RETAIN_ON_CANCELLATION

and configured checkpoints to be stored on HDFS:

state.backend rocksdb

#incremental checkpoints
#state.backend.incremental true
state.checkpoints.dir hdfs:///tmp/flink/checkpoint

Flink does take checkpoints, but when I look up this path on HDFS, the corresponding directory does not exist. It seems that not every checkpoint gets persisted to HDFS. Is some configuration needed to make every checkpoint be saved to disk?





--
Sent from: http://apache-flink.147419.n8.nabble.com/