Re: Deploying Jobmanager on k8s as a Deployment

2022-09-07 Thread Yang Wang
For native K8s integration, the Flink ResourceManager will delete the
JobManager K8s deployment as well as the HA data once the job reaches a
globally terminal state.

However, it is indeed a problem for standalone mode, since the JobManager
will be restarted even after the job has finished. I think the
flink-kubernetes-operator could handle this situation by doing the cleanup.


Best,
Yang

Austin Cawley-Edwards wrote on Thu, Sep 8, 2022 at 06:01:

> Hey Gil,
>
> I'm referring to when a pod exits on its own, not when being deleted.
> Deployments only support the "Always" restart policy [1].
>
> In my understanding, the JM only cleans up HA data when it is shut down [2],
> after which the process will exit which leads to the problem with k8s
> Deployment restart policies.
>
> Best,
> Austin
>
> [1]:
> https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template
> [2]:
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/rest_api/#cluster
>
> On Wed, Sep 7, 2022 at 4:43 PM Gil De Grove 
> wrote:
>
>> Hello Austin,
>>
>> I'm not aware of any limitation of Deployments not letting pods exit
>> (correctly or incorrectly). What do you mean by that exactly? Would it be
>> possible for you to point me to the piece of documentation that makes you
>> think that?
>>
>> A pod, if correctly set up, will exit when receiving its SIGTERM or
>> SIGKILL from the orchestrator.
>> So when "deleting" the deployment, the pods are terminated correctly. If
>> Flink triggered a savepoint beforehand, you can then restart from that
>> savepoint.
>> Usually, when a pod is not being terminated, it means that the signal is
>> not forwarded to the correct process.
>>
>> Hope this helps.
>>
>> Regards,
>> Gil
>>
>>
>> On Wed, Sep 7, 2022, 21:16 Austin Cawley-Edwards
>> wrote:
>>
>>> Cool, thanks! How does it clean up the HA data, if the cluster is never
>>> able to shut down (due to the k8s Deployment restriction)?
>>>
>>> Best,
>>> Austin
>>>
>>> On Mon, Sep 5, 2022 at 6:51 PM Gyula Fóra  wrote:
>>>
 Hi!

 The operator supports both Flink native and standalone deployment modes
 and in both cases the JM is deployed as k8s Deployment.

 During upgrade Flink/operator deletes the deployment after savepoint
 and waits for termination before it creates a new one with the updated
 spec.

 Cheers,
 Gyula

 On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards <
 austin.caw...@gmail.com> wrote:

> Hey Marco,
>
> Unfortunately there is no built-in k8s API that models an application
> mode JM exactly, but Deployments should be fine, in general. As Gyula
> notes, where they can be difficult is during application upgrades, as
> Deployments never let their pods exit, even if successful, so there is no
> way to stop the cluster gracefully.
>
> Is stopping your application with a savepoint and redeploying a
> workable solution for image upgrades? In this way a Job could still be
> used.
>
>
> @Gyula, how are JMs handled in the operator? Job, Deployment, or
> something custom?
>
>
> Best,
> Austin
>
>
>
> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra 
> wrote:
>
>> You can use Deployments, of course; the operator and native k8s
>> integration do exactly that.
>>
>> Even then, job updates can be tricky, so I believe you are much better
>> off with the operator.
>>
>> Gyula
>>
>> On Sun, 4 Sep 2022 at 11:11, marco andreas 
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the response, I will take a look at it.
>>>
>>> But if we aren't able to use the flink operator due to technical
>>> constraints, is it possible to deploy the JM as a Deployment without any
>>> consequences that I am not aware of?
>>>
>>> Sincerely,
>>>
>>> On Sat, Sep 3, 2022 at 23:27, Gyula Fóra wrote:
>>>
 Hi!
 You should check out the Flink Kubernetes Operator. I think that
 covers all your needs .


 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

 Cheers,
 Gyula

 On Sat, 3 Sep 2022 at 13:45, marco andreas <
 marcoandreas...@gmail.com> wrote:

>
> We are deploying a Flink application cluster on K8s. Following the
> official documentation, the JM is deployed as a Job resource; however, we
> are deploying a long-running Flink job that is not supposed to be
> terminated, and we also need to update the image of the Flink job.

> The problem is that the Job is an immutable resource; we
> can't update it.
>
> So I'm wondering if it's possible to use a deployment resource for
> the jobmanager and if there will be any side effects or repercussions.
>
> Thanks,
>

Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception occurred while acquiring lock 'ConfigMapLock

2022-09-07 Thread Yang Wang
Given that you are running multiple JobManagers, the "already exists"
exception during leader election does not matter.

BTW, I think running multiple JobManagers does not bring much advantage
when deploying Flink on Kubernetes, because a new JobManager will be
started immediately once the old one crashes.
And the Flink JobManager always needs to recover the job from the latest
checkpoint no matter how many JobManagers are running.

Best,
Yang
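
For reference, a minimal sketch of the single-JobManager HA setup this implies
(the storage path and values below are illustrative assumptions, and these
options would normally live in flink-conf.yaml rather than code):

import org.apache.flink.configuration.Configuration;

public class KubernetesHaConfigSketch {
  public static void main(String[] args) {
    // Hedged sketch: Kubernetes HA with a single JobManager replica,
    // relying on K8s to restart the JM pod after a crash.
    Configuration conf = new Configuration();
    conf.setString("high-availability", "kubernetes");                         // K8s HA services
    conf.setString("high-availability.storageDir", "s3://my-bucket/flink/ha"); // durable HA metadata
    conf.setString("kubernetes.jobmanager.replicas", "1");                     // no standby JMs needed
    System.out.println(conf);
  }
}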

Tamir Sagi wrote on Mon, Sep 5, 2022 at 21:48:

> Hey Yang,
>
> The flink-conf.yaml submitted to the cluster does not contain 
> "kubernetes.config.file"
> at all.
> In addition, I verified flink config maps under cluster's namespace do not
> contain "kubernetes.config.file".
>
> We also noticed the following exception (it appears to happen
> sporadically):
>
> 2022-09-04T21:06:35,231][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception
> occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs -
> data-agg-events-insertion-cluster-config-map
> (fa3dbbc5-1753-46cd-afaf-0baf8ff0947f)'
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
> Unable to create ConfigMapLock
>
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure
> executing: POST at:
> https://172.20.0.1/api/v1/namespaces/dev-0-flink-jobs/configmaps.
> Message: configmaps "data-agg-events-insertion-cluster-config-map" already
> exists.
>
> Log file is enclosed.
>
> Thanks,
> Tamir.
>
> --
> *From:* Yang Wang 
> *Sent:* Monday, September 5, 2022 3:03 PM
> *To:* Tamir Sagi 
> *Cc:* user@flink.apache.org ; Lihi Peretz <
> lihi.per...@niceactimize.com>
> *Subject:* Re: [Flink 1.15.1 - Application mode native k8s Exception] -
> Exception occurred while acquiring lock 'ConfigMapLock
>
>
> *EXTERNAL EMAIL*
>
>
> Could you please check whether the "kubernetes.config.file" is configured
> to /opt/flink/.kube/config in the Flink configmap?
> It should be removed before creating the Flink configmap.
>
> Best,
> Yang
>
> Tamir Sagi wrote on Sun, Sep 4, 2022 at 18:08:
>
> Hey All,
>
> We recently updated to Flink 1.15.1. We deploy a streaming cluster in
> Application mode on native K8s (deployed on Amazon EKS). The cluster is
> configured with the Kubernetes HA service, a minimum of 3 JobManager
> replicas, and a pod template configured with topologySpreadConstraints to
> enable distribution across different availability zones.
> The HA storage directory is on S3.
>
> The cluster is deployed and running properly; however, after a while we
> noticed the following exception in a JobManager instance (the log file is
> enclosed):
>
> 2022-09-04T02:05:33,097][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception
> occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs -
> data-agg-events-insertion-cluster-config-map
> (b6da2ae2-ad2b-471c-801e-ea460a348fab)'
> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
>  for kind: [ConfigMap]  with name:
> [data-agg-events-insertion-cluster-config-map]  in namespace:
> [dev-0-flink-jobs]  failed.
> Caused by: java.io.FileNotFoundException: /opt/flink/.kube/config (No such
> file or directory)
> at java.io.FileInputStream.open0(Native Method) ~[?:?]
> at java.io.FileInputStream.open(Unknown Source) ~[?:?]
> at java.io.FileInputStream.<init>(Unknown Source) ~[?:?]
> at
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:354)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:15)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3494)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.internal.KubeConfigUtils.parseConfig(KubeConfigUtils.java:42)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.utils.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:44)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createApplicableInterceptors$6(HttpClientUtils.java:290)
> 

Re: Where will the state be stored in the taskmanager when using RocksDBStateBackend?

2022-09-07 Thread Yuan Mei
Hey Hjw,

Under the current Flink architecture (i.e., task states are stored locally
and periodically uploaded to remote durable storage during checkpointing),
there is no way other than scaling out your application to solve the
problem. This is equivalent to making the state size in each task smaller
so that it can fit into a single container.

We have seen similar issues from other users/customers, and have plans to
solve this problem in a more fundamental way by supporting remote state as
well (when the local quota is used up, the state can also be written
directly to remote storage).

For now, I would suggest increasing the parallelism of your job to solve
this problem.

Best
Yuan
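
As a rough illustration of the scale-out approach (all names and values below
are assumptions for the sketch, not from this thread): raising the job's
parallelism spreads the keyed RocksDB state over more task slots, and
spreading the RocksDB working directories over several local volumes can also
relieve a single disk.

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ScaleOutSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // More parallel subtasks => each task holds a smaller share of the keyed
    // state, so it is more likely to fit a single container's disk quota.
    env.setParallelism(48);

    EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
    // Spread RocksDB working directories across local volumes (illustrative paths).
    backend.setDbStoragePaths("/data1/flink/rocksdb", "/data2/flink/rocksdb");
    env.setStateBackend(backend);

    // ... define sources/operators/sinks and call env.execute() as usual
  }
}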

On Tue, Sep 6, 2022 at 7:59 PM Alexander Fedulov 
wrote:

> Well, in that case, it is similar to the situation of hitting the limits
> of vertical scaling - you'll have to scale out horizontally.
> You could consider sizing down the CPU and RAM you allocate to each task
> manager, and instead increase their count (and your job's parallelism).
> It might come with its own downsides, so measure as you go. This might
> also be problematic if you have significant key skew for some of your key
> ranges.
>
> Best,
> Alex
>
> On Tue, Sep 6, 2022 at 8:09 AM hjw <1010445...@qq.com> wrote:
>
>> Hi,Alexander
>>
>> When a Flink job is deployed on native K8s, the TaskManager is a Pod. The
>> data directory size of a single container is limited in our company. Are
>> there any ideas for dealing with this?
>>
>> --
>> Best,
>> Hjw
>>
>>
>>
>> -- Original Message --
>> *From:* "Alexander Fedulov" ;
>> *Sent:* Tuesday, Sep 6, 2022, 3:19 AM
>> *To:* "hjw"<1010445...@qq.com>;
>> *Cc:* "user";
>> *Subject:* Re: Where will the state be stored in the taskmanager when using
>> RocksDBStateBackend?
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#state-backend-rocksdb-localdir
>> Make sure to use a local SSD disk (not NFS/EBS).
>>
>> Best,
>> Alexander Fedulov
>>
>> On Mon, Sep 5, 2022 at 7:24 PM hjw <1010445...@qq.com> wrote:
>>
>>> The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB
>>> database that is (per default) stored in the
>>> TaskManager local data directories.
>>> Which operating-system path do the TaskManager local data directories
>>> that store the RocksDB database point to?
>>> If the job state is very large, I think I should take some measures to
>>> deal with it (e.g., mount a volume for the local data directories that
>>> store the RocksDB database).
>>>
>>> thx.
>>>
>>> --
>>> Best,
>>> Hjw
>>>
>>


Re: A question about ListView in UDAF

2022-09-07 Thread Zhiwen Sun
hi,

Thanks for your reply.

The error occurs in getValue:

at GroupAggsHandler$439.getValue(Unknown Source)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146)



My question is why wrapping the ListView in a class works fine, while using a ListView directly causes the error.

For example, an AggregateFunction whose accumulator is the wrapper class works
fine, while an AggregateFunction whose accumulator is a bare ListView hits the NPE.


I suspect that when using a ListView directly, the TypeInference cannot be derived correctly.


Zhiwen Sun



On Wed, Sep 7, 2022 at 11:46 PM Xuyang  wrote:

> Hi,
> In theory, in your case, a ListView is first created as the acc via the
> createAccumulator method; then every input row triggers the accumulate
> method, which updates the data into that acc; finally, the current acc's
> value is obtained via the getValue method.
>
>
>
>
> Does the NPE in your actual test happen while updating the acc or in
> getValue? You can set breakpoints at these three stages and check whether
> the acc held at each stage is the same object.
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>


flink 1.13: downstream busy is 0 while upstream backpressure is 100%

2022-09-07 Thread 周虓岗




Observing the metrics, the downstream input queues are all 0, while the upstream queues are full.




From the monitoring, when the rate drops to zero, the downstream input queue is 0:
max by 
(exported_task_name)(flink_taskmanager_job_task_Network_Input_0_maxQueueLen{job_name=~'$job_name'})


At the same time, the upstream queue is still full:
(flink_taskmanager_job_task_Network_Output_0_maxQueueLen{job_name=~'$job_name'})








Are there any thoughts on this problem?

A question about Flink Table Store

2022-09-07 Thread Kyle Zhang
Hi all,
  From its introduction, Table Store is also about data lake storage and
real-time streaming reads. How does its positioning differ from projects like
Iceberg and Hudi, and why develop another project?

Best.


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-07 Thread Austin Cawley-Edwards
Hey Gil,

I'm referring to when a pod exits on its own, not when being deleted.
Deployments only support the "Always" restart policy [1].

In my understanding, the JM only cleans up HA data when it is shut down [2],
after which the process will exit which leads to the problem with k8s
Deployment restart policies.

Best,
Austin

[1]:
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template
[2]:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/rest_api/#cluster
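
For context, the cluster shutdown referenced in [2] is the REST endpoint
DELETE /cluster; a minimal sketch of invoking it (the JobManager address below
is an assumption):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ClusterShutdownSketch {
  public static void main(String[] args) throws Exception {
    // Hedged sketch: ask the JobManager to shut the whole cluster down via
    // the REST API [2]. Replace localhost:8081 with your JM's REST address.
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest shutdown = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/cluster"))
        .DELETE()
        .build();
    HttpResponse<Void> response = client.send(shutdown, HttpResponse.BodyHandlers.discarding());
    System.out.println("Shutdown requested, HTTP status: " + response.statusCode());
  }
}

It is this shutdown path that a k8s Deployment never reaches cleanly, since
the restarted pod simply brings the JobManager back.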

On Wed, Sep 7, 2022 at 4:43 PM Gil De Grove  wrote:

> Hello Austin,
>
> I'm not aware of any limitation of Deployments not letting pods exit
> (correctly or incorrectly). What do you mean by that exactly? Would it be
> possible for you to point me to the piece of documentation that makes you
> think that?
>
> A pod, if correctly set up, will exit when receiving its SIGTERM or
> SIGKILL from the orchestrator.
> So when "deleting" the deployment, the pods are terminated correctly. If
> Flink triggered a savepoint beforehand, you can then restart from that
> savepoint.
> Usually, when a pod is not being terminated, it means that the signal is
> not forwarded to the correct process.
>
> Hope this helps.
>
> Regards,
> Gil
>
>
> On Wed, Sep 7, 2022, 21:16 Austin Cawley-Edwards
> wrote:
>
>> Cool, thanks! How does it clean up the HA data, if the cluster is never
>> able to shut down (due to the k8s Deployment restriction)?
>>
>> Best,
>> Austin
>>
>> On Mon, Sep 5, 2022 at 6:51 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> The operator supports both Flink native and standalone deployment modes
>>> and in both cases the JM is deployed as k8s Deployment.
>>>
>>> During upgrade Flink/operator deletes the deployment after savepoint and
>>> waits for termination before it creates a new one with the updated spec.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hey Marco,

 Unfortunately there is no built-in k8s API that models an application
 mode JM exactly, but Deployments should be fine, in general. As Gyula notes,
 where they can be difficult is during application upgrades, as Deployments
 never let their pods exit, even if successful, so there is no way to stop
 the cluster gracefully.

 Is stopping your application with a savepoint and redeploying a
 workable solution for image upgrades? In this way a Job could still be
 used.


 @Gyula, how are JMs handled in the operator? Job, Deployment, or
 something custom?


 Best,
 Austin



 On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:

> You can use Deployments, of course; the operator and native k8s
> integration do exactly that.
>
> Even then, job updates can be tricky, so I believe you are much better
> off with the operator.
>
> Gyula
>
> On Sun, 4 Sep 2022 at 11:11, marco andreas 
> wrote:
>
>> Hello,
>>
>> Thanks for the response, I will take a look at it.
>>
>> But if we aren't able to use the flink operator due to technical
>> constraints, is it possible to deploy the JM as a Deployment without any
>> consequences that I am not aware of?
>>
>> Sincerely,
>>
>> On Sat, Sep 3, 2022 at 23:27, Gyula Fóra wrote:
>>
>>> Hi!
>>> You should check out the Flink Kubernetes Operator. I think that
>>> covers all your needs .
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Sat, 3 Sep 2022 at 13:45, marco andreas <
>>> marcoandreas...@gmail.com> wrote:
>>>

 We are deploying a Flink application cluster on K8s. Following the
 official documentation, the JM is deployed as a Job resource; however, we
 are deploying a long-running Flink job that is not supposed to be
 terminated, and we also need to update the image of the Flink job.

 The problem is that the Job is an immutable resource; we
 can't update it.

 So I'm wondering if it's possible to use a deployment resource for
 the jobmanager and if there will be any side effects or repercussions.

 Thanks,

>>> Gil De Grove
>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-07 Thread Gil De Grove
Hello Austin,

I'm not aware of any limitation of Deployments not letting pods exit
(correctly or incorrectly). What do you mean by that exactly? Would it be
possible for you to point me to the piece of documentation that makes you
think that?

A pod, if correctly set up, will exit when receiving its SIGTERM or
SIGKILL from the orchestrator.
So when "deleting" the deployment, the pods are terminated correctly. If
Flink triggered a savepoint beforehand, you can then restart from that
savepoint.
Usually, when a pod is not being terminated, it means that the signal is
not forwarded to the correct process.

Hope this helps.

Regards,
Gil


On Wed, Sep 7, 2022, 21:16 Austin Cawley-Edwards
wrote:

> Cool, thanks! How does it clean up the HA data, if the cluster is never
> able to shut down (due to the k8s Deployment restriction)?
>
> Best,
> Austin
>
> On Mon, Sep 5, 2022 at 6:51 PM Gyula Fóra  wrote:
>
>> Hi!
>>
>> The operator supports both Flink native and standalone deployment modes
>> and in both cases the JM is deployed as k8s Deployment.
>>
>> During upgrade Flink/operator deletes the deployment after savepoint and
>> waits for termination before it creates a new one with the updated spec.
>>
>> Cheers,
>> Gyula
>>
>> On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey Marco,
>>>
>>> Unfortunately there is no built-in k8s API that models an application
>>> mode JM exactly, but Deployments should be fine, in general. As Gyula notes,
>>> where they can be difficult is during application upgrades, as Deployments
>>> never let their pods exit, even if successful, so there is no way to stop
>>> the cluster gracefully.
>>>
>>> Is stopping your application with a savepoint and redeploying a workable
>>> solution for image upgrades? In this way a Job could still be used.
>>>
>>>
>>> @Gyula, how are JMs handled in the operator? Job, Deployment, or
>>> something custom?
>>>
>>>
>>> Best,
>>> Austin
>>>
>>>
>>>
>>> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:
>>>
 You can use Deployments, of course; the operator and native k8s
 integration do exactly that.

 Even then, job updates can be tricky, so I believe you are much better
 off with the operator.

 Gyula

 On Sun, 4 Sep 2022 at 11:11, marco andreas 
 wrote:

> Hello,
>
> Thanks for the response, I will take a look at it.
>
> But if we aren't able to use the flink operator due to technical
> constraints, is it possible to deploy the JM as a Deployment without any
> consequences that I am not aware of?
>
> Sincerely,
>
> On Sat, Sep 3, 2022 at 23:27, Gyula Fóra wrote:
>
>> Hi!
>> You should check out the Flink Kubernetes Operator. I think that
>> covers all your needs .
>>
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>
>> Cheers,
>> Gyula
>>
>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>> wrote:
>>
>>>
>>> We are deploying a Flink application cluster on K8s. Following the
>>> official documentation, the JM is deployed as a Job resource; however, we
>>> are deploying a long-running Flink job that is not supposed to be
>>> terminated, and we also need to update the image of the Flink job.
>>>
>>> The problem is that the Job is an immutable resource; we
>>> can't update it.
>>>
>>> So I'm wondering if it's possible to use a deployment resource for
>>> the jobmanager and if there will be any side effects or repercussions.
>>>
>>> Thanks,
>>>
>> Gil De Grove


Re: Mixed up session aggregations for same key

2022-09-07 Thread David Anderson
The way that Flink handles session windows is that every new event is
initially assigned to its own session window, and then overlapping sessions
are merged. I imagine this is why you are seeing so many calls
to createAccumulator.

This implementation choice is deeply embedded in the code; I don't think it
can be avoided.

If you can afford to wait until a session ends to emit the session start
event, then you will only be reporting once for each session. Another
solution might be to implement your own windowing using a process function
-- but if you are using event time logic, and if the events can be
processed out of order, I suspect it would be difficult to do much better.

David
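
For illustration, a sketch of the kind of trigger involved (class name and
bodies are assumptions, not Kristinn's actual code): it fires once when the
first element of a session arrives and again when the event-time window
closes, and it declares its state mergeable because Flink builds sessions by
merging per-event windows. Note that if two windows that have each already
emitted a start event are merged later, duplicates like the ones reported can
still occur.

import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FirstElementTrigger extends Trigger<Object, TimeWindow> {

  // Mergeable trigger state: the element count must survive window merges,
  // otherwise "have we fired the start event yet" is lost.
  private final ReducingStateDescriptor<Long> countDesc =
      new ReducingStateDescriptor<>("count", Long::sum, Types.LONG);

  @Override
  public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                 TriggerContext ctx) throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(countDesc);
    boolean first = count.get() == null;
    count.add(1L);
    ctx.registerEventTimeTimer(window.maxTimestamp()); // end-of-session timer
    return first ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public boolean canMerge() {
    return true;
  }

  @Override
  public void onMerge(TimeWindow window, OnMergeContext ctx) {
    ctx.mergePartitionedState(countDesc); // carry the count into the merged window
    ctx.registerEventTimeTimer(window.maxTimestamp());
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(countDesc).clear();
    ctx.deleteEventTimeTimer(window.maxTimestamp());
  }
}

A pipeline would attach it with something like
keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(4))).trigger(new FirstElementTrigger()).aggregate(...).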

On Mon, Sep 5, 2022 at 9:45 PM Kristinn Danielsson via user <
user@flink.apache.org> wrote:

> Hi,
>
>
>
> I'm trying to migrate a KafkaStreams application to Flink (using the
> DataStream API). The application consumes a high-traffic (millions of
> events per second) Kafka topic and collects events into sessions keyed by
> id. To reduce the load on subsequent processing steps I want to output one
> event on session start and one event on session end. So, I set up a
> pipeline which keys the stream by id and aggregates the events over an
> event-time session window with a gap of 4 seconds. I also implemented a
> custom trigger that fires when the first event arrives in a window.
>
>
>
> When I run this pipeline I sometimes observe that I get multiple calls to
> the aggregator's "createAccumulator" method for a given session id, and
> therefore I also get duplicate session start and session end events for the
> session id. So it looks to me like Flink is collecting the events into
> multiple sessions even if they have the same session id.
>
>
>
> Examples:
>
> Input events:
> Event timestamp       Id
> 2022-09-06 08:00:00   ABC
> 2022-09-06 08:00:01   ABC
> 2022-09-06 08:00:02   ABC
> 2022-09-06 08:00:03   ABC
> 2022-09-06 08:00:04   ABC
> 2022-09-06 08:00:05   ABC
>
> Problem 1:
> Output events:
> Event time            Id   Type
> 2022-09-06 08:00:00   ABC  Start
> 2022-09-06 08:00:03   ABC  End
> 2022-09-06 08:00:04   ABC  Start
> 2022-09-06 08:00:05   ABC  End
>
> Problem 2:
> Output events:
> Event time            Id   Type
> 2022-09-06 08:00:00   ABC  Start
> 2022-09-06 08:00:03   ABC  Start
> 2022-09-06 08:00:04   ABC  End
> 2022-09-06 08:00:05   ABC  End
>
> Expected output:
> Event time            Id   Type
> 2022-09-06 08:00:00   ABC  Start
> 2022-09-06 08:00:05   ABC  End
>
>
>
>
>
> Is this expected behaviour? How can I avoid getting duplicate session
> windows?
>
>
>
> Thanks for your help
>
> Kristinn
>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-07 Thread Austin Cawley-Edwards
Cool, thanks! How does it clean up the HA data, if the cluster is never
able to shut down (due to the k8s Deployment restriction)?

Best,
Austin

On Mon, Sep 5, 2022 at 6:51 PM Gyula Fóra  wrote:

> Hi!
>
> The operator supports both Flink native and standalone deployment modes
> and in both cases the JM is deployed as k8s Deployment.
>
> During upgrade Flink/operator deletes the deployment after savepoint and
> waits for termination before it creates a new one with the updated spec.
>
> Cheers,
> Gyula
>
> On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hey Marco,
>>
>> Unfortunately there is no built-in k8s API that models an application
>> mode JM exactly, but Deployments should be fine, in general. As Gyula notes,
>> where they can be difficult is during application upgrades, as Deployments
>> never let their pods exit, even if successful, so there is no way to stop
>> the cluster gracefully.
>>
>> Is stopping your application with a savepoint and redeploying a workable
>> solution for image upgrades? In this way a Job could still be used.
>>
>>
>> @Gyula, how are JMs handled in the operator? Job, Deployment, or
>> something custom?
>>
>>
>> Best,
>> Austin
>>
>>
>>
>> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:
>>
>>> You can use Deployments, of course; the operator and native k8s
>>> integration do exactly that.
>>>
>>> Even then, job updates can be tricky, so I believe you are much better off
>>> with the operator.
>>>
>>> Gyula
>>>
>>> On Sun, 4 Sep 2022 at 11:11, marco andreas 
>>> wrote:
>>>
 Hello,

 Thanks for the response, I will take a look at it.

 But if we aren't able to use the flink operator due to technical
 constraints, is it possible to deploy the JM as a Deployment without any
 consequences that I am not aware of?

 Sincerely,

 On Sat, Sep 3, 2022 at 23:27, Gyula Fóra wrote:

> Hi!
> You should check out the Flink Kubernetes Operator. I think that
> covers all your needs .
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>
> Cheers,
> Gyula
>
> On Sat, 3 Sep 2022 at 13:45, marco andreas 
> wrote:
>
>>
>> We are deploying a Flink application cluster on K8s. Following the
>> official documentation, the JM is deployed as a Job resource; however, we
>> are deploying a long-running Flink job that is not supposed to be
>> terminated, and we also need to update the image of the Flink job.
>>
>> The problem is that the Job is an immutable resource; we can't update
>> it.
>>
>> So I'm wondering if it's possible to use a deployment resource for
>> the jobmanager and if there will be any side effects or repercussions.
>>
>> Thanks,
>>
>


Re: A question about ListView in UDAF

2022-09-07 Thread Xuyang
Hi,
In theory, in your case, a ListView is first created as the acc via the
createAccumulator method; then every input row triggers the accumulate
method, which updates the data into that acc; finally, the current acc's
value is obtained via the getValue method.




Does the NPE in your actual test happen while updating the acc or in
getValue? You can set breakpoints at these three stages and check whether the
acc held at each stage is the same object.




--

Best!
Xuyang
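
To make the lifecycle above concrete, a minimal sketch of the
wrapper-accumulator pattern from the ListView javadoc (the function and field
names here are hypothetical, not the poster's code):

import org.apache.flink.table.api.dataview.ListView;
import org.apache.flink.table.functions.AggregateFunction;

public class CollectAgg extends AggregateFunction<String, CollectAgg.MyAccumulator> {

  // The accumulator is a POJO wrapping the ListView, as in the javadoc example.
  public static class MyAccumulator {
    public ListView<String> list = new ListView<>();
    public long count = 0L;
  }

  @Override
  public MyAccumulator createAccumulator() {
    return new MyAccumulator(); // created once per group
  }

  // invoked for every input row of the group
  public void accumulate(MyAccumulator acc, String value) throws Exception {
    acc.list.add(value);
    acc.count++;
  }

  @Override
  public String getValue(MyAccumulator acc) {
    try {
      // ListView#get returns an Iterable over the backing state
      return acc.count + ":" + String.join(",", acc.list.get());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Setting breakpoints in createAccumulator, accumulate, and getValue of such a
function is the check suggested above.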






Re: New licensing for Akka

2022-09-07 Thread Robin Cassan via user
Thanks a lot for your answers, this is reassuring!

Cheers

On Wed, Sep 7, 2022 at 13:12, Chesnay Schepler wrote:

> Just to squash concerns, we will make sure this license change will not
> affect Flink users in any way.
>
> On 07/09/2022 11:14, Robin Cassan via user wrote:
> > Hi all!
> > It seems Akka have announced a licensing change
> > https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
> > If I understand correctly, this could end up increasing costs a lot for
> > companies using Flink in production. Do you know if the Flink
> > developers have any initial reaction as to how this could be handled
> > (using a Fork? moving out of akka, even though it's probably
> > incredibly complex?)? Are we right to assume that this license applies
> > when using akka through Flink?
> >
> > Thanks a lot!
> > Robin
>
>
>


Re: Slow Tests in Flink 1.15

2022-09-07 Thread Alexey Trenikhun
The class contains a single test method, which runs a single job (the job is
quite complex), then waits for the job to be running, and after that waits for
data to be populated in the output topic, which doesn't happen within the
5-minute test timeout. I tried under a debugger with a breakpoint set in the
Kafka record deserializer: it is hit, but very slowly, roughly 3 records per 5
minutes (the topic was pre-populated).

No table/sql API, only stream API
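
For reference, a minimal sketch of a MiniClusterWithClientResource-based test
of the kind discussed in this thread (cluster sizing and the toy job are
assumptions, not the actual tests):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class JobIT {

  @ClassRule
  public static final MiniClusterWithClientResource FLINK =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberTaskManagers(1)
              .setNumberSlotsPerTaskManager(4)
              .build());

  @Test
  public void runsJobAgainstMiniCluster() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3).map(i -> i * 2).print();
    env.execute(); // runs against the shared mini cluster
  }
}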

From: Chesnay Schepler 
Sent: Wednesday, September 7, 2022 5:20 AM
To: Alexey Trenikhun ; David Jost ; 
Matthias Pohl 
Cc: user@flink.apache.org 
Subject: Re: Slow Tests in Flink 1.15

The test that has gotten slow: how many test cases does it actually contain /
how many jobs does it actually run?
Are these tests using the table/sql API?

On 07/09/2022 14:15, Alexey Trenikhun wrote:
We are also observing an extreme slowdown (5+ minutes vs 15 seconds) in 1 of 2
integration tests. Both tests use Kafka. The slow test uses
org.apache.flink.runtime.minicluster.TestingMiniCluster; this test tests a
complete job, which consumes and produces Kafka messages. The unaffected test
extends org.apache.flink.test.util.AbstractTestBase, which uses
MiniClusterWithClientResource; this test is simpler and only produces Kafka
messages.

Thanks,
Alexey

From: Matthias Pohl via user 

Sent: Tuesday, September 6, 2022 6:36 AM
To: David Jost 
Cc: user@flink.apache.org 

Subject: Re: Slow Tests in Flink 1.15

Hi David,
I guess, you're referring to [1]. But as Chesnay already pointed out in the 
previous thread: It would be helpful to get more insights into what exactly 
your tests are executing (logs, code, ...). That would help identifying the 
cause.
> Can you give us a more complete stacktrace so we can see what call in
> Flink is waiting for something?
>
> Does this happen to all of your tests?
> Can you provide us with an example that we can try ourselves? If not,
> can you describe the test structure (e.g., is it using a
> MiniClusterResource).

Matthias

[1] https://lists.apache.org/thread/yhhprwyf29kgypzzqdmjgft4qs25yyhk

On Mon, Sep 5, 2022 at 4:59 PM David Jost 
mailto:david.j...@uniberg.com>> wrote:
Hi,

we were going to upgrade our application from Flink 1.14.4 to Flink 1.15.2
when we noticed that all our job tests, using a MiniClusterWithClientResource,
are multiple times slower in 1.15 than before in 1.14. I, unfortunately, have
not found mentions in that regard in the changelog or documentation. The
slowdown is rather extreme; I hope to find a solution to this. I saw it
mentioned once in the mailing list, but there was no (public) outcome to it.

I would appreciate any help on this. Thank you in advance.

Best
 David



Re: Cassandra sink with Flink 1.15

2022-09-07 Thread Chesnay Schepler
Are you running into this in the IDE, or when submitting the job to a 
Flink cluster?


If it is the first, then you're probably affected by the Scala-free 
Flink efforts. Either add an explicit dependency on 
flink-streaming-scala or migrate to Flink tuples.
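
A minimal sketch of the tuple-based variant in Java (the POJO, keyspace, and
host are assumptions; the anonymous MapFunction plus explicit type information
avoids the generic-type fallback shown in the errors quoted below):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public class CassandraTupleSinkSketch {

  // Assumed event type for the sketch.
  public static class MyEvent {
    public String key;
    public long someLongValue;
  }

  public static void attachSink(DataStream<MyEvent> mystream) throws Exception {
    DataStream<Tuple2<String, Long>> tuples = mystream
        .map(new MapFunction<MyEvent, Tuple2<String, Long>>() {
          @Override
          public Tuple2<String, Long> map(MyEvent v) {
            return Tuple2.of(v.key, v.someLongValue);
          }
        })
        // explicit type info, so Flink does not fall back to a GenericType
        .returns(Types.TUPLE(Types.STRING, Types.LONG));

    CassandraSink.addSink(tuples)
        .setQuery("INSERT INTO my_ks.my_table (key, value) VALUES (?, ?);")
        .setHost("127.0.0.1")
        .build();
  }
}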


On 07/09/2022 14:17, Lars Skjærven wrote:

Hello,

When upgrading from 1.14 to 1.15 we bumped into a type issue when 
attempting to sink to Cassandra (scala 2.12.13). This was working 
nicely in 1.14. Any tip is highly appreciated.


Using a MapFunction() to generate the stream of tuples:

CassandraSink
  .addSink(
    mystream.map(new ToTupleMapper)
  )...

Exception: No support for the type of the given DataStream: 
GenericType


Or with a lambda function:

CassandraSink
  .addSink(
    mystream.map((v: MyCaseClass) => (v.key, v.someLongValue))
  )...

Caused by: 
org.apache.flink.api.common.functions.InvalidTypesException: The 
generic type parameters of 'Tuple2' are missing. In many cases lambda 
methods don't provide enough information for automatic type extraction 
when Java generics are involved. An easy workaround is to use an 
(anonymous) class instead that implements the 
'org.apache.flink.api.common.functions.MapFunction' interface. 
Otherwise the type has to be specified explicitly using type information.






Re: Slow Tests in Flink 1.15

2022-09-07 Thread Chesnay Schepler
The test that has gotten slow: how many test cases does it actually contain
/ how many jobs does it actually run?

Are these tests using the table/sql API?

On 07/09/2022 14:15, Alexey Trenikhun wrote:
We are also observing an extreme slowdown (5+ minutes vs 15 seconds) in
1 of 2 integration tests. Both tests use Kafka. The slow test
uses org.apache.flink.runtime.minicluster.TestingMiniCluster; this
test tests a complete job, which consumes and produces Kafka messages.
The unaffected test extends org.apache.flink.test.util.AbstractTestBase,
which uses MiniClusterWithClientResource; this test is simpler
and only produces Kafka messages.


Thanks,
Alexey

*From:* Matthias Pohl via user 
*Sent:* Tuesday, September 6, 2022 6:36 AM
*To:* David Jost 
*Cc:* user@flink.apache.org 
*Subject:* Re: Slow Tests in Flink 1.15
Hi David,
I guess, you're referring to [1]. But as Chesnay already pointed out 
in the previous thread: It would be helpful to get more insights into 
what exactly your tests are executing (logs, code, ...). That would 
help identifying the cause.

> Can you give us a more complete stacktrace so we can see what call in
> Flink is waiting for something?
>
> Does this happen to all of your tests?
> Can you provide us with an example that we can try ourselves? If not,
> can you describe the test structure (e.g., is it using a
> MiniClusterResource).

Matthias

[1] https://lists.apache.org/thread/yhhprwyf29kgypzzqdmjgft4qs25yyhk

On Mon, Sep 5, 2022 at 4:59 PM David Jost  wrote:

Hi,

we were going to upgrade our application from Flink 1.14.4 to
Flink 1.15.2 when we noticed that all our job tests, using a
MiniClusterWithClientResource, are multiple times slower in 1.15
than before in 1.14. I, unfortunately, have not found mentions in
that regard in the changelog or documentation. The slowdown is
rather extreme; I hope to find a solution to this. I saw it
mentioned once in the mailing list, but there was no (public)
outcome to it.

I would appreciate any help on this. Thank you in advance.

Best
 David



Cassandra sink with Flink 1.15

2022-09-07 Thread Lars Skjærven
Hello,

When upgrading from 1.14 to 1.15 we bumped into a type issue when
attempting to sink to Cassandra (scala 2.12.13). This was working nicely in
1.14. Any tip is highly appreciated.

Using a MapFunction() to generate the stream of tuples:

CassandraSink
  .addSink(
    mystream.map(new ToTupleMapper)
  )...

Exception: No support for the type of the given DataStream:
GenericType

Or with a lambda function:

CassandraSink
  .addSink(
    mystream.map((v: MyCaseClass) => (v.key, v.someLongValue))
  )...

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The
generic type parameters of 'Tuple2' are missing. In many cases lambda
methods don't provide enough information for automatic type extraction when
Java generics are involved. An easy workaround is to use an (anonymous)
class instead that implements the
'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise
the type has to be specified explicitly using type information.


Re: Slow Tests in Flink 1.15

2022-09-07 Thread Alexey Trenikhun
We are also observing an extreme slowdown (5+ minutes vs 15 seconds) in 1 of 2
integration tests. Both tests use Kafka. The slow test uses
org.apache.flink.runtime.minicluster.TestingMiniCluster; this test tests a
complete job, which consumes and produces Kafka messages. The unaffected test
extends org.apache.flink.test.util.AbstractTestBase, which uses
MiniClusterWithClientResource; this test is simpler and only produces Kafka
messages.

Thanks,
Alexey

From: Matthias Pohl via user 
Sent: Tuesday, September 6, 2022 6:36 AM
To: David Jost 
Cc: user@flink.apache.org 
Subject: Re: Slow Tests in Flink 1.15

Hi David,
I guess, you're referring to [1]. But as Chesnay already pointed out in the 
previous thread: It would be helpful to get more insights into what exactly 
your tests are executing (logs, code, ...). That would help identifying the 
cause.
> Can you give us a more complete stacktrace so we can see what call in
> Flink is waiting for something?
>
> Does this happen to all of your tests?
> Can you provide us with an example that we can try ourselves? If not,
> can you describe the test structure (e.g., is it using a
> MiniClusterResource).

Matthias

[1] https://lists.apache.org/thread/yhhprwyf29kgypzzqdmjgft4qs25yyhk

On Mon, Sep 5, 2022 at 4:59 PM David Jost 
mailto:david.j...@uniberg.com>> wrote:
Hi,

we were going to upgrade our application from Flink 1.14.4 to Flink 1.15.2
when we noticed that all our job tests, using a MiniClusterWithClientResource,
are multiple times slower in 1.15 than before in 1.14. I, unfortunately, have
not found mentions in that regard in the changelog or documentation. The
slowdown is rather extreme; I hope to find a solution to this. I saw it
mentioned once in the mailing list, but there was no (public) outcome to it.

I would appreciate any help on this. Thank you in advance.

Best
 David


Re: New licensing for Akka

2022-09-07 Thread Chesnay Schepler
Just to squash concerns, we will make sure this license change will not 
affect Flink users in any way.


On 07/09/2022 11:14, Robin Cassan via user wrote:

Hi all!
It seems Akka have announced a licensing change 
https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
If I understand correctly, this could end up increasing costs a lot for
companies using Flink in production. Do you know if the Flink 
developers have any initial reaction as to how this could be handled 
(using a Fork? moving out of akka, even though it's probably 
incredibly complex?)? Are we right to assume that this license applies 
when using akka through Flink?


Thanks a lot!
Robin





Re: Flink upgrade path

2022-09-07 Thread Jing Ge
Hi,

I would recommend checking the release notes of 1.14 [1] and 1.15 [2]. If
your Flink jobs use Flink features that saw big improvements in these two
releases, it would be better to upgrade step by step without skipping 1.14.x.

In general, depending on how complicated your jobs are, it is always a big
challenge to upgrade Flink while skipping version(s); it is recommended to
upgrade continually, following the Flink release cadence.

[1]
https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/

Best regards,
Jing

On Wed, Sep 7, 2022 at 11:14 AM Congxian Qiu  wrote:

> In addition to the state compatibility mentioned above, the interfaces
> provided by Flink are stable if they carry the Public annotation [1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java
>
> Best,
> Congxian
>
>
> Hangxiang Yu wrote on Wed, Sep 7, 2022 at 10:31:
>
>> Hi, Alexey.
>> You could check the state compatibility in the compatibility table.
>> The page includes how to upgrade and whether it is compatible among
>> different versions.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/upgrading/#compatibility-table
>>
>> On Wed, Sep 7, 2022 at 7:04 AM Alexey Trenikhun  wrote:
>>
>>> Hello,
>>> Can I upgrade from 1.13.6 directly to 1.15.2, skipping 1.14.x?
>>>
>>> Thanks,
>>> Alexey
>>>
>>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Re: New licensing for Akka

2022-09-07 Thread Matthias Pohl via user
There is some more discussion going on in the related PR [1]. Based on the
current state of the discussion, akka 2.6.20 will be the last version under
Apache 2.0 license. But, I guess, we'll have to see where this discussion
is heading considering that it's kind of fresh.

[1] https://github.com/akka/akka/pull/31561

On Wed, Sep 7, 2022 at 11:30 AM Chesnay Schepler  wrote:

> We'll have to look into it.
>
> The license would apply to usages of Flink.
> That said, I'm not sure if we'd even be allowed to use Akka under that
> license since it puts significant restrictions on the use of the software.
> If that is the case, then it's either use a fork created by another
> party or switch to a different library.
>
> On 07/09/2022 11:14, Robin Cassan via user wrote:
> > Hi all!
> > It seems Akka have announced a licensing change
> > https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
> > If I understand correctly, this could end up increasing costs a lot for
> > companies using Flink in production. Do you know if the Flink
> > developers have any initial reaction as to how this could be handled
> > (using a Fork? moving out of akka, even though it's probably
> > incredibly complex?)? Are we right to assume that this license applies
> > when using akka through Flink?
> >
> > Thanks a lot!
> > Robin
>
>
>


Re: New licensing for Akka

2022-09-07 Thread Chesnay Schepler

We'll have to look into it.

The license would apply to usages of Flink.
That said, I'm not sure if we'd even be allowed to use Akka under that 
license since it puts significant restrictions on the use of the software.
If that is the case, then it's either use a fork created by another 
party or switch to a different library.


On 07/09/2022 11:14, Robin Cassan via user wrote:

Hi all!
It seems Akka have announced a licensing change 
https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
If I understand correctly, this could end up increasing costs a lot for 
companies using Flink in production. Do you know if the Flink 
developers have any initial reaction as to how this could be handled 
(using a Fork? moving out of akka, even though it's probably 
incredibly complex?)? Are we right to assume that this license applies 
when using akka through Flink?


Thanks a lot!
Robin





Re: Flink job fails to create a savepoint

2022-09-07 Thread Congxian Qiu
Hi

If you have the specific JobManager log from when the savepoint/checkpoint
failed, and the TaskManager log of the corresponding failed tasks, please
share them so everyone can help take a look.

Best,
Congxian


Xuyang wrote on Tue, Aug 30, 2022 at 23:18:

>
> Hi, it looks like this error means the file used for outputting information
> cannot be found. You can try adding the configuration "taskmanager.log.path"
> and retrying, and look for the root cause of the task timeouts.
> You can also use a flame graph or jstack to check which method those tasks
> are stuck in when they time out.
>
>
>
>
>
>
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
>
> On 2022-08-29 16:19:15, "casel.chen" wrote:
>
> >An online Flink job failed when a savepoint was manually triggered. The job
> >has two operators, reading data from Kafka and writing to MongoDB, both with
> >parallelism 48. After the error, we saw the MongoDB sink operator had 48
> >tasks in total: 45 completed and 3 timed out (the timeout is set to 3
> >minutes). Normally a checkpoint completes in 4 seconds and the state size is
> >only 23.7 KB. The job log after the error is shown below. After the
> >savepoint failed, the job's periodic checkpoints also all failed (3 tasks
> >timed out in each operator). We use FileStateBackend, and the DFS is Alibaba
> >Cloud OSS. What could be the cause of this error?
> >
> >
> >2022-08-29 15:38:32,617 ERROR
> >org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> >[] - Failed to transfer file from TaskExecutor
> >sqrc-session-prod-taskmanager-1-30.
> >java.util.concurrent.CompletionException:
> >org.apache.flink.util.FlinkException: The file STDOUT does not exist on the
> >TaskExecutor.
> >at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064)
> >~[flink-dist_2.12-1.13.2.jar:1.13.2]
> >at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> >~[?:1.8.0_312]
> >at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >~[?:1.8.0_312]
> >at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >~[?:1.8.0_312]
> >at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
> >Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not
> >exist on the TaskExecutor.
> >... 5 more
> >2022-08-29 15:38:32,617 ERROR
> >org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
> >[] - Unhandled exception.
> >org.apache.flink.util.FlinkException: The file STDOUT does not exist on
> >the TaskExecutor.
> >at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064)
> >~[flink-dist_2.12-1.13.2.jar:1.13.2]
> >at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> >~[?:1.8.0_312]
> >at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >~[?:1.8.0_312]
> >at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >~[?:1.8.0_312]
> >at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
>


New licensing for Akka

2022-09-07 Thread Robin Cassan via user
Hi all!
It seems Akka have announced a licensing change
https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka
If I understand correctly, this could end up increasing costs a lot for
companies using Flink in production. Do you know if the Flink developers
have any initial reaction as to how this could be handled (using a Fork?
moving out of akka, even though it's probably incredibly complex?)? Are we
right to assume that this license applies when using akka through Flink?

Thanks a lot!
Robin


Re: Flink upgrade path

2022-09-07 Thread Congxian Qiu
In addition to the state compatibility mentioned above, the interfaces
provided by Flink are stable if they carry the Public annotation [1].

[1]
https://github.com/apache/flink/blob/master/flink-annotations/src/main/java/org/apache/flink/annotation/Public.java

Best,
Congxian


Hangxiang Yu wrote on Wed, Sep 7, 2022 at 10:31:

> Hi, Alexey.
> You could check the state compatibility in the compatibility table.
> The page includes how to upgrade and whether it is compatible among
> different versions.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/upgrading/#compatibility-table
>
> On Wed, Sep 7, 2022 at 7:04 AM Alexey Trenikhun  wrote:
>
>> Hello,
>> Can I upgrade from 1.13.6 directly to 1.15.2, skipping 1.14.x?
>>
>> Thanks,
>> Alexey
>>
>>
>
> --
> Best,
> Hangxiang.
>


A question about ListView in UDAF

2022-09-07 Thread Zhiwen Sun
Hello all,

Looking at how ListView is used, I see the following example:

 public class MyAccumulator {

   public ListView<String> list = new ListView<>();

   // or explicit:
   // {@literal @}DataTypeHint("ARRAY<STRING>")
   // public ListView<String> list = new ListView<>();

   public long count = 0L;
 }

 public class MyAggregateFunction extends AggregateFunction<String, MyAccumulator> {
   // ...
 }

I'd like to ask everyone: why do we need to wrap the ListView in an outer
MyAccumulator class? In my actual tests, using a ListView directly as the
accumulator type of the AggregateFunction throws a NullPointerException in
getValue.

Flink version: 1.13.1

Thanks.


Zhiwen Sun