Re: Deployment on k8s via API

2022-05-17 Thread Yang Wang
Maybe you could have a try on the flink-kubernetes-operator[1]. It is
designed for using Kubernetes CRD to manage the Flink applications.


[1].
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-0.1/

Best,
Yang

Devin Bost  wrote on Wed, May 18, 2022 at 08:29:

> Hi,
>
> I'm looking at my options for automating the deployment of Flink jobs on
> k8s (ideally using application mode), and I noticed that most of the
> examples of deploying Flink jobs in the docs use calls to the Flink binary,
> such as:
>
> $ ./bin/flink run-application --target kubernetes-application \
> -Dkubernetes.cluster-id=my-first-application-cluster \
> -Dkubernetes.container.image=custom-image-name \
> local:///opt/flink/usrlib/my-flink-job.jar
>
> However, my automation function won't be running in the same container as
> Flink, so I'm trying to determine what my options are here. Does Flink have
> an API available for submitting jobs?
> If not, how hard would it be to use the Kubernetes API to construct the
> deployment configs for new Flink applications? Is there a better way?
>
> Thanks,
>
> Devin G. Bost
>


Upgrade help

2022-05-17 Thread yinghua_zh
 I see that Flink 1.15.0 supports Kafka 0.10, and the Kafka used in our 
production cluster is 0.10.0.1. However, when we upgrade Flink from 1.11.3 to 
1.15.0, we get errors that many Kafka classes cannot be 
found (e.g. ProducerFencedException). These classes are only available in 
higher versions of the Kafka client. If Flink 1.15.0 supports Kafka 0.10.0.1, 
what should we do?
  
The official website describes it as follows:
Apache Flink ships with a universal Kafka connector which attempts to track the 
latest version of the Kafka client. The version of the client it uses may 
change between Flink releases. Modern Kafka clients are backwards compatible 
with broker versions 0.10.0 or later. For details on Kafka compatibility, 
please refer to the official Kafka documentation.
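
For context, a sketch of how the universal connector described above is typically used from the DataStream 
API (broker address and topic are placeholders; this only illustrates the connector the documentation refers 
to, it does not by itself fix the missing classes when a 0.10 client is on the classpath):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UniversalKafkaConnectorSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The universal connector ships in the flink-connector-kafka artifact and bundles
            // a modern kafka-clients version, which requires brokers 0.10.0 or later.
            KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("upgrade-test")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
            env.execute("Universal Kafka connector sketch");
        }
    }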



Will a repeatedly restarting job cause a TaskManager metaspace OOM?

2022-05-17 Thread (sender name garbled in the archive)
Hi,

Environment: Flink 1.14, standalone cluster. The Flink job's JM and TM processes are managed with 
supervisorctl/systemd.

What happened:
The job is configured with restart-strategy.fixed-delay.attempts: 2. After the job kept failing and being 
restarted (via supervisorctl), one TaskManager eventually ran out of memory; the job could no longer run on 
that TM, and the TM process on the standalone cluster had to be brought back externally (systemd).

Symptoms:
The TaskManager failed with a metaspace OOM.
The JM logged: Association with remote system [akka.tcp://flink@localhost:43583] has 
failed, address is now gated for [50] ms. Reason: [Association failed with 
[akka.tcp://flink@localhost:43583]] Caused by: [java.net.ConnectException: 
Connection refused: localhost/127.0.0.1:43583]
The JM then reported the TM as unreachable.

Observations:
The TM metaspace size is 512M.
Across the job restarts the TM metaspace usage grew: 179MB 183MB 207MB 232MB 
256MB 280MB 352MB 372MB
until the TM hit the metaspace OOM.

Questions:
1. Can a repeatedly failing/restarting job cause a TaskManager metaspace OOM?
2. After a TM OOMs, will the JM bring the TM back, or must a standalone TM be restarted externally?

Unsubscribe

2022-05-17 Thread 孙洪龙
Unsubscribe


Re: flink on native k8s: after enabling HA, starting the job from a savepoint fails with "could not find job id 0000"

2022-05-17 Thread shimin huang
Flink version: 1.13.0
/home/hdfs/flink-1.13.0/bin/flink run-application \
-t kubernetes-application \
-s spPath \
-p 32 \
-Dresourcemanager.taskmanager-timeout=6 \
-Dkubernetes.namespace=xxx \
-Dkubernetes.service-account=xxx \
-Dkubernetes.taskmanager.service-account=xxx \
-Dkubernetes.cluster-id= \
-Dkubernetes.container.image.pull-secrets= \
-Dkubernetes.rest-service.exposed.type=NodePort  \
-Dkubernetes.config.file=/cce.conf \
-Denv.java.opts="-DHADOOP_USER_NAME=hdfs" \
-Dkubernetes.pod-template-file=/home/hdfs/jars/flink-pod.yaml \
-Dkubernetes.taskmanager.cpu=1 \
-Dkubernetes.jobmanager.cpu=0.5 \
-Dtaskmanager.numberOfTaskSlots=16 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.memory.managed.fraction=0.1 \
-Dtaskmanager.memory.network.fraction=0.1 \
-Dtaskmanager.memory.network.max=2048m \
-Dtaskmanager.memory.network.min=512m \
-Dstate.checkpoints.num-retained=20 \
-Dstate.backend.rocksdb.memory.managed=true \
-Dstate.backend.rocksdb.checkpoint.transfer.thread.num=5 \
-Dstate.backend.rocksdb.localdir=/tmp/rocksdb \
-Dstate.backend.incremental=true \
-Dclassloader.resolve-order=parent-first \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=hdfs://bmr-cluster/flink/kubernetes/ha/recovery \
-c  \

Weihua Hu  wrote on Tue, May 17, 2022 at 21:54:

> Hi, shimin
> Which version of Flink are you using? And what is the submit command?
>
>
> Best,
> Weihua
>
> > On May 17, 2022 at 1:48 PM, shimin huang  wrote:
> >
> > On native k8s, after stopping the job with a savepoint and then restarting it from that savepoint, it fails with "could not find job".
> > The error stack trace is as follows:
> > java.util.concurrent.ExecutionException:
> > org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not
> find
> > Flink job ()
> > at
> >
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> > at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> > at
> >
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
> > at
> >
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> > at
> >
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
> > at
> >
> com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> > at
> >
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> > at
> >
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> > at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at
> >
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > at
> >
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > at
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException:
> > Could not find Flink job ()
> > at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
> > at java.util.Optional.orElseGet(Optional.java:267)
> > at
> >
> org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590)
> > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498)
> > at
> >
> 

Re: Question of Flink Operator Application Cluster Deployment

2022-05-17 Thread Xiao Ma
Hi John,

No such deployment or services in the K8S cluster. Same issue happens to
the flink native kubernetes deployment. We have the podsecuritypolicy
defined, but I have added flink service account into the psp.

*Xiao Ma*
*Geotab*
Software Developer, Data Engineering | B.Sc, M.Sc
Direct  +1 (416) 836 - 3541
Toll-free   +1 (877) 436 - 8221
Visit   www.geotab.com
Twitter  | Facebook
 | YouTube
 | LinkedIn



On Tue, May 17, 2022 at 9:50 PM John Gerassimou 
wrote:

> Hi Xiao,
>
> Is istio or something similar deployed to the K8S cluster?
>
> John
>
> On Tue, May 17, 2022 at 4:26 PM Xiao Ma  wrote:
>
>> loop in
>> *Xiao Ma*
>> *Geotab*
>> Software Developer, Data Engineering | B.Sc, M.Sc
>> Direct  +1 (416) 836 - 3541
>> Toll-free   +1 (877) 436 - 8221
>> Visit   www.geotab.com
>> Twitter  | Facebook
>>  | YouTube
>>  | LinkedIn
>> 
>>
>>
>> -- Forwarded message -
>> From: Xiao Ma 
>> Date: Tue, May 17, 2022 at 4:18 PM
>> Subject: Re: Question of Flink Operator Application Cluster Deployment
>> To: Őrhidi Mátyás 
>>
>>
>> Fyi, I didn't manually mount the service account token into the job pod.
>> It is automatically mounted into the pod, with the "bound service account
>> token volume". I also found that the fabric8 cannot read the service
>> account token if it is the "bound service account token volume". link:
>> https://github.com/fabric8io/kubernetes-client/issues/2271
>>
>> Thank you very much.
>>
>> Best,
>> *Xiao Ma*
>> *Geotab*
>> Software Developer, Data Engineering | B.Sc, M.Sc
>> Direct  +1 (416) 836 - 3541
>> Toll-free   +1 (877) 436 - 8221
>> Visit   www.geotab.com
>> Twitter  | Facebook
>>  | YouTube
>>  | LinkedIn
>> 
>>
>>
>> On Tue, May 17, 2022 at 10:55 AM Xiao Ma  wrote:
>>
>>> Hi Őrhidi,
>>>
>>> Thank you very much for the help.
>>>
>>> The attached are flink-operator yaml files and the application job yaml
>>> file.
>>>
>>> Best,
>>> *Xiao Ma*
>>> *Geotab*
>>> Software Developer, Data Engineering | B.Sc, M.Sc
>>> Direct  +1 (416) 836 - 3541
>>> Toll-free   +1 (877) 436 - 8221
>>> Visit   www.geotab.com
>>> Twitter  | Facebook
>>>  | YouTube
>>>  | LinkedIn
>>> 
>>>
>>>
>>> On Tue, May 17, 2022 at 12:22 AM Őrhidi Mátyás 
>>> wrote:
>>>
 You don't have to mount the service account explicitly, this should
 be auto-mounted for you. Please share your (redacted) yamls for the RBAC
 configs (
 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/rbac/#cluster-scoped-flink-operator-with-jobs-running-in-other-namespaces)
 and your deployment yaml, we could probably spot what's missing.

 Best,
 Matyas

 On Tue, May 17, 2022 at 5:37 AM Xiao Ma  wrote:

> Hi Flink Community,
>
> First of all, I would like to express my great thankfulness about the
> flink operator on Kubernetes. It is a new door to help us deploy the Flink
> application on top of the K8s.
>
> Our team is currently doing the Application cluster deployment through
> the operator. We have set up the service account as "flink-operator" and
> "flink", with the roles and rolebindings. However, after the job yaml is
> submitted to the api-server and the pod is created, the resources manager
> cannot be created because this error log:
> 
> 2022-05-17 02:37:22,293 WARN  io.fabric8.kubernetes.client.Config
>  [] - Error reading service account token from:
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 2022-05-17 02:37:22,308 WARN  io.fabric8.kubernetes.client.Config
>  [] - Error reading service account token from:
> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
> 2022-05-17 02:37:25,699 INFO
>  org.apache.flink.runtime.jobmaster.JobMaster [] -
> Connecting to ResourceManager akka.tcp://fl...@flink-application-job.bip
> :6123/user/rpc/resourcemanager_*()
> 2022-05-17 02:37:26,094 WARN
>  io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener [] -
> Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User
> "system:anonymous" cannot watch resource "pods" in API group "" in the
> namespace "x"
> 
>
> It looks like the jobmanager pod cannot fetch the "flink" service
> account 

Re: Question of Flink Operator Application Cluster Deployment

2022-05-17 Thread John Gerassimou
Hi Xiao,

Is istio or something similar deployed to the K8S cluster?

John

On Tue, May 17, 2022 at 4:26 PM Xiao Ma  wrote:

> loop in
> *Xiao Ma*
> *Geotab*
> Software Developer, Data Engineering | B.Sc, M.Sc
> Direct  +1 (416) 836 - 3541
> Toll-free   +1 (877) 436 - 8221
> Visit   www.geotab.com
> Twitter  | Facebook
>  | YouTube
>  | LinkedIn
> 
>
>
> -- Forwarded message -
> From: Xiao Ma 
> Date: Tue, May 17, 2022 at 4:18 PM
> Subject: Re: Question of Flink Operator Application Cluster Deployment
> To: Őrhidi Mátyás 
>
>
> Fyi, I didn't manually mount the service account token into the job pod.
> It is automatically mounted into the pod, with the "bound service account
> token volume". I also found that the fabric8 cannot read the service
> account token if it is the "bound service account token volume". link:
> https://github.com/fabric8io/kubernetes-client/issues/2271
>
> Thank you very much.
>
> Best,
> *Xiao Ma*
> *Geotab*
> Software Developer, Data Engineering | B.Sc, M.Sc
> Direct  +1 (416) 836 - 3541
> Toll-free   +1 (877) 436 - 8221
> Visit   www.geotab.com
> Twitter  | Facebook
>  | YouTube
>  | LinkedIn
> 
>
>
> On Tue, May 17, 2022 at 10:55 AM Xiao Ma  wrote:
>
>> Hi Őrhidi,
>>
>> Thank you very much for the help.
>>
>> The attached are flink-operator yaml files and the application job yaml
>> file.
>>
>> Best,
>> *Xiao Ma*
>> *Geotab*
>> Software Developer, Data Engineering | B.Sc, M.Sc
>> Direct  +1 (416) 836 - 3541
>> Toll-free   +1 (877) 436 - 8221
>> Visit   www.geotab.com
>> Twitter  | Facebook
>>  | YouTube
>>  | LinkedIn
>> 
>>
>>
>> On Tue, May 17, 2022 at 12:22 AM Őrhidi Mátyás 
>> wrote:
>>
>>> You don't have to mount the service account explicitly, this should
>>> be auto-mounted for you. Please share your (redacted) yamls for the RBAC
>>> configs (
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/rbac/#cluster-scoped-flink-operator-with-jobs-running-in-other-namespaces)
>>> and your deployment yaml, we could probably spot what's missing.
>>>
>>> Best,
>>> Matyas
>>>
>>> On Tue, May 17, 2022 at 5:37 AM Xiao Ma  wrote:
>>>
 Hi Flink Community,

 First of all, I would like to express my great thankfulness about the
 flink operator on Kubernetes. It is a new door to help us deploy the Flink
 application on top of the K8s.

 Our team is currently doing the Application cluster deployment through
 the operator. We have set up the service account as "flink-operator" and
 "flink", with the roles and rolebindings. However, after the job yaml is
 submitted to the api-server and the pod is created, the resources manager
 cannot be created because this error log:
 
 2022-05-17 02:37:22,293 WARN  io.fabric8.kubernetes.client.Config
[] - Error reading service account token from:
 [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
 2022-05-17 02:37:22,308 WARN  io.fabric8.kubernetes.client.Config
[] - Error reading service account token from:
 [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
 2022-05-17 02:37:25,699 INFO
  org.apache.flink.runtime.jobmaster.JobMaster [] -
 Connecting to ResourceManager akka.tcp://fl...@flink-application-job.bip
 :6123/user/rpc/resourcemanager_*()
 2022-05-17 02:37:26,094 WARN
  io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener [] -
 Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User
 "system:anonymous" cannot watch resource "pods" in API group "" in the
 namespace "x"
 

 It looks like the jobmanager pod cannot fetch the "flink" service
 account token and cannot communicate with api-server, though I have created
 the "flink" service account and set up "serviceAccount" config in the job
 template.
 

 apiVersion: flink.apache.org/v1beta1
 kind: FlinkDeployment
 metadata:
   name: flink-application-job
 spec:
   image: flink:1.15.0-scala_2.12-java11
   flinkVersion: v1_15
   flinkConfiguration:
 taskmanager.numberOfTaskSlots: "2"
 jobmanager.rpc.address: flink-jobmanager
   serviceAccount: flink

 

 The below shows the volumeMounts in the pod. The service account is
 mounted through the "bound service account token volume". Is it desirable?
 
   Mounts:
   

Deployment on k8s via API

2022-05-17 Thread Devin Bost
Hi,

I'm looking at my options for automating the deployment of Flink jobs on
k8s (ideally using application mode), and I noticed that most of the
examples of deploying Flink jobs in the docs use calls to the Flink binary,
such as:

$ ./bin/flink run-application --target kubernetes-application \
 -Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.container.image=custom-image-name \
local:///opt/flink/usrlib/my-flink-job.jar

However, my automation function won't be running in the same container as
Flink, so I'm trying to determine what my options are here. Does Flink have
an API available for submitting jobs?
If not, how hard would it be to use the Kubernetes API to construct the
deployment configs for new Flink applications? Is there a better way?
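
For reference, a long-running session cluster's JobManager exposes REST endpoints for running an uploaded 
jar, which can be called from outside the cluster; a rough sketch only (the REST address, jar id, and entry 
class are placeholders, and this does not cover application mode):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RestJobSubmitSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder address of a session cluster's REST endpoint, e.g. a Kubernetes Service.
            String restAddress = "http://flink-jobmanager.example:8081";
            // Placeholder jar id as returned by an earlier POST to /jars/upload.
            String jarId = "my-flink-job.jar";

            // Run the uploaded jar; entry class and parallelism are passed as query parameters.
            HttpRequest run = HttpRequest.newBuilder()
                .uri(URI.create(restAddress + "/jars/" + jarId
                    + "/run?entry-class=com.example.MyJob&parallelism=2"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

            HttpResponse<String> response =
                HttpClient.newHttpClient().send(run, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }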

Thanks,

Devin G. Bost


Re: Flink MiniCluster: Multiple Task Managers

2022-05-17 Thread Κωνσταντίνος Αγαπίδης
Hi Roman,

Just used it. Thanks for your help.

On May 14, 2022, 10:06 PM, "Roman Grebennikov"  wrote:

> Hey Κωνσταντίνος,
> 
> check out this sample code we use for testing 
> https://github.com/metarank/metarank . It is in
> scala, but should be quite straightforward to port to java:
> 
> val cluster = new MiniClusterWithClientResource(
> new MiniClusterResourceConfiguration.Builder()
> .setNumberTaskManagers(1) // here it is
> .setNumberSlotsPerTaskManager(1)
> .setConfiguration(conf)
> .build()
> )
> cluster.before()
> val client = cluster.getClusterClient
> val env = new StreamExecutionEnvironment(new 
> TestStreamEnvironment(cluster.getMiniCluster, 1))
> // do some stuff with env
> val graph = env.getStreamGraph.getJobGraph
> client.submitJob(graph)
> // here you need to wait until the job is finished
> // you may use cluster client instance here to poll for completion
> client.close()
> cluster.after()
> 
> For a full working example, check these files:
> *
> https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/AsyncFlinkJob.scala
> *
> https://github.com/metarank/metarank/blob/master/src/main/scala/ai/metarank/mode/inference/FlinkMini
> luster.scala
> 
> with best regards,
> Roman Grebennikov | g...@dfdx.me
> 
> On Sat, May 14, 2022, at 18:45, Αγαπίδης wrote:
> 
>> Hi list,
>> 
>> I am using Java 8, Flink 1.15, and IntelliJ.
>> 
>> I wonder if it is possible to open an additional TaskManager in a Stream
>> Job in the Java code, to run it in IntelliJ Local Cluster (MiniCluster)
>> Debug mode. I found this method in the code reference, but I don't know
>> how to call it.
>> 
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html#startTaskManager--
>> 
>> I thought that I need to get the current MiniCluster somehow, before
>> starting the Job Execution. But how?
>> 
>> Thank you in advance!
>> 
>> --
>> Best Regards,
>> Kostas


--
Best regards,
Κωνσταντίνος Αγαπίδης


Re: Kinesis Sink - Data being received with intermittent breaks

2022-05-17 Thread Danny Cranmer
Hello Zain,

Thanks for providing the additional information. Going back to the original
issue:
- You are seeing bursty throughput, but the job is keeping up? There is no
backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages, let's take this example:

LogInputStreamReader ... Stage 1 Triggers ...  { stream:
'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
UserRecords: 6, KinesisRecords: 3 }

Flush trigger reason:
- manual: the flush was manually triggered
- count: flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the predicate was matched
- timed: the flush is triggered by elapsed timer

Input/Output:
- UserRecords: Number of input records KPL flushed (this can be higher than
KinesisRecords when aggregation is enabled)
- KinesisRecords: Number of records shipped to Kinesis Data Streams

Stage 2 triggers tells us the number of API invocations via the PutRecords
field.

I can see from your logs that the majority of flushes are due to the timer,
and it does not look overly bursty. Seems to sit at around 3 records per 15
seconds, or 1 record every 5 seconds. This seems very low, is it expected?

Thanks,
Danny Cranmer

On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati 
wrote:

> Hey Danny,
> Thanks for having a look at the issue.
> I am using a custom flink operator to segregate the data into a consistent
> format of length 100 which is no more than 1 MB. The configurations I
> shared were after I was exploring tweaking some of them to see if it
> improves the throughput.
>
> Regarding your queries :
> - Which Flink version is this? -- > *Version 1.13*
> - Can you see any errors in the Flink logs?  -->* No, I'm attaching flink
> logs after I have set all the configurations to default*
> - Do you see any errors/throttles in the Kinesis Data Stream metrics?  --> *I
> was before segregating into smaller chunks not anymore*
> - How many shards does your stream have? --> *It has 4 shards*
> - What is your sink operator parallelism? --> *1*
> - What is the general health of your job graph? --> *This is the only job
> running at the moment, it isn't unhealthy*
>   - Are the operators upstream of the sink backpressured? --> *No*
>   - Are you sure the sink is actually the issue here? --> *I have used
> the .print() as a sink and I'm seeing all the records in real time; it chokes
> when paired with sink*
>   - Are there any other potential bottlenecks? --> *So data is coming in
> from source correctly, I have a flatmap transformation enabled which reads
> and segments it into chunks of <=1MB which is also tested using the
> .print() sink*
> - When you say you are trying to achieve "1MB chunks", I assume this is
> per Kinesis record, not per PutRecords batch? --> *Correct*
>
> Attaching a small chunk of the log file from when the job is started [It
> goes down to 0 records for some periods of time as well, in the log file it
> shows mostly between 3-6 records]
>
> Really appreciate your response on this, since I have not been able to
> gather much help from other resources online. Would be great if you can let
> me know what the issue here could be, let me know if you need to know
> anything else as well !
>
> Cheers
>
>
> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer 
> wrote:
>
>> Hello Zain,
>>
>> When you say "converting them to chunks of <= 1MB " does this mean you
>> are creating these chunks in a custom Flink operator, or you are relying on
>> the connector to do so? If you are generating your own chunks you can
>> potentially disable Aggregation at the sink.
>>
>> Your throughput is incredibly bursty, I have a few questions:
>> - Which Flink version is this?
>> - Can you see any errors in the Flink logs?
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>> - How many shards does your stream have?
>> - What is your sink operator parallelism?
>> - What is the general health of your job graph?
>>   - Are the operators upstream of the sink backpressured?
>>   - Are you sure the sink is actually the issue here?
>>   - Are there any other potential bottlenecks?
>> - When you say you are trying to achieve "1MB chunks", I assume this is
>> per Kinesis record, not per PutRecords batch?
>>
>> Some comments on your configuration:
>>
>> As previously mentioned, if you are generating the chunks you can
>> potentially remove the aggregation config and disable it.
>> - producerConfig.put("AggregationMaxCount", "3");
>> - producerConfig.put("AggregationMaxSize", "256");
>> + producerConfig.put("AggregationEnabled", "false");
>>
>> This is very low, and could conflict with your chunk size. These
>> configurations are regarding the PutRecords request, which has a quota of
>> 500 records and 5MiB. You are setting the max size to 100kB, which is less
>> than your largest chunk. 

Memory configuration for Queue

2022-05-17 Thread Zain Haider Nemati
Hi,
I am using a Kafka source with a Kinesis sink, and the speed of data coming
in is not the same as the speed of data flowing out, hence the need to configure a
relatively large queue to hold the data before backpressuring. Which
memory configuration corresponds to this, i.e. which one do I need to tune?


Fwd: Question of Flink Operator Application Cluster Deployment

2022-05-17 Thread Xiao Ma
loop in
*Xiao Ma*
*Geotab*
Software Developer, Data Engineering | B.Sc, M.Sc
Direct  +1 (416) 836 - 3541
Toll-free   +1 (877) 436 - 8221
Visit   www.geotab.com
Twitter  | Facebook
 | YouTube
 | LinkedIn



-- Forwarded message -
From: Xiao Ma 
Date: Tue, May 17, 2022 at 4:18 PM
Subject: Re: Question of Flink Operator Application Cluster Deployment
To: Őrhidi Mátyás 


Fyi, I didn't manually mount the service account token into the job pod. It
is automatically mounted into the pod, with the "bound service account
token volume". I also found that the fabric8 cannot read the service
account token if it is the "bound service account token volume". link:
https://github.com/fabric8io/kubernetes-client/issues/2271

Thank you very much.

Best,
*Xiao Ma*
*Geotab*
Software Developer, Data Engineering | B.Sc, M.Sc
Direct  +1 (416) 836 - 3541
Toll-free   +1 (877) 436 - 8221
Visit   www.geotab.com
Twitter  | Facebook
 | YouTube
 | LinkedIn



On Tue, May 17, 2022 at 10:55 AM Xiao Ma  wrote:

> Hi Őrhidi,
>
> Thank you very much for the help.
>
> The attached are flink-operator yaml files and the application job yaml
> file.
>
> Best,
> *Xiao Ma*
> *Geotab*
> Software Developer, Data Engineering | B.Sc, M.Sc
> Direct  +1 (416) 836 - 3541
> Toll-free   +1 (877) 436 - 8221
> Visit   www.geotab.com
> Twitter  | Facebook
>  | YouTube
>  | LinkedIn
> 
>
>
> On Tue, May 17, 2022 at 12:22 AM Őrhidi Mátyás 
> wrote:
>
>> You don't have to mount the service account explicitly, this should
>> be auto-mounted for you. Please share your (redacted) yamls for the RBAC
>> configs (
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/rbac/#cluster-scoped-flink-operator-with-jobs-running-in-other-namespaces)
>> and your deployment yaml, we could probably spot what's missing.
>>
>> Best,
>> Matyas
>>
>> On Tue, May 17, 2022 at 5:37 AM Xiao Ma  wrote:
>>
>>> Hi Flink Community,
>>>
>>> First of all, I would like to express my great thankfulness about the
>>> flink operator on Kubernetes. It is a new door to help us deploy the Flink
>>> application on top of the K8s.
>>>
>>> Our team is currently doing the Application cluster deployment through
>>> the operator. We have set up the service account as "flink-operator" and
>>> "flink", with the roles and rolebindings. However, after the job yaml is
>>> submitted to the api-server and the pod is created, the resources manager
>>> cannot be created because this error log:
>>> 
>>> 2022-05-17 02:37:22,293 WARN  io.fabric8.kubernetes.client.Config
>>>[] - Error reading service account token from:
>>> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
>>> 2022-05-17 02:37:22,308 WARN  io.fabric8.kubernetes.client.Config
>>>[] - Error reading service account token from:
>>> [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
>>> 2022-05-17 02:37:25,699 INFO
>>>  org.apache.flink.runtime.jobmaster.JobMaster [] -
>>> Connecting to ResourceManager akka.tcp://fl...@flink-application-job.bip
>>> :6123/user/rpc/resourcemanager_*()
>>> 2022-05-17 02:37:26,094 WARN
>>>  io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener [] -
>>> Exec Failure: HTTP 403, Status: 403 - pods is forbidden: User
>>> "system:anonymous" cannot watch resource "pods" in API group "" in the
>>> namespace "x"
>>> 
>>>
>>> It looks like the jobmanager pod cannot fetch the "flink" service
>>> account token and cannot communicate with api-server, though I have created
>>> the "flink" service account and set up "serviceAccount" config in the job
>>> template.
>>> 
>>>
>>> apiVersion: flink.apache.org/v1beta1
>>> kind: FlinkDeployment
>>> metadata:
>>>   name: flink-application-job
>>> spec:
>>>   image: flink:1.15.0-scala_2.12-java11
>>>   flinkVersion: v1_15
>>>   flinkConfiguration:
>>> taskmanager.numberOfTaskSlots: "2"
>>> jobmanager.rpc.address: flink-jobmanager
>>>   serviceAccount: flink
>>>
>>> 
>>>
>>> The below shows the volumeMounts in the pod. The service account is
>>> mounted through the "bound service account token volume". Is it desirable?
>>> 
>>>   Mounts:
>>>   /opt/flink/conf from flink-config-volume (rw)
>>>   /opt/flink/log from flink-logs (rw)
>>>   /opt/flink/pod-template from pod-template-volume (rw)
>>>   /var/run/secrets/kubernetes.io/serviceaccount from
>>> kube-api-access-f69zl (ro)
>>> 
>>>
>>> This issue has blocked our progress 

Metrics in Flink UI

2022-05-17 Thread Zain Haider Nemati
Hi,
I'm running a job on a local flink cluster but the metrics (bytes received,
records received, bytes sent, backpressure) all show 0 in the Flink UI,
even though I'm receiving data in the sink.
Do I need to additionally configure something to see these metrics work in
real time?


RE: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Schwalbe Matthias
Hi James,

From reading the thread … I assume your file:/tmp/Flink/State folder is not 
shared across all machines, right?

In this case it cannot work:
- checkpoints and savepoints need to go to a path that can be commonly accessed 
by the jobmanager and all taskmanagers in order to work (see the sketch below)
- as your jobmanager cannot access the checkpoint files of the taskmanagers, it 
also cannot clean up those files
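
For illustration, a minimal sketch of pointing the checkpoint storage at a commonly accessible location 
(hypothetical class name and HDFS URI, not the actual setup from this thread):

    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SharedCheckpointStorageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5 * 1000);
            env.setStateBackend(new HashMapStateBackend());
            // The storage path must be reachable by the jobmanager and every taskmanager,
            // e.g. HDFS, S3, or a shared NFS mount - not a per-container local /tmp.
            env.getCheckpointConfig().setCheckpointStorage("hdfs://namenode:8020/flink/checkpoints");
        }
    }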

Hope that helps

Regards

Thias

From: James Sandys-Lumsdaine 
Sent: Tuesday, May 17, 2022 3:55 PM
To: Hangxiang Yu ; user@flink.apache.org
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) are reading and writing to their own 
/tmp. i.e. there is no shared access like if it was NFS or HDFS.
So when the checkpointing happens the directories are created and populated but 
only the JM's old checkpoint directories are cleaned up. Each of the TM 
/tmp/Flink/State old "chk-x" directories remain and are not cleared up.

From your email I don't know if you think I am writing to a "shared" path or 
not?

I started looking at the in-memory checkpoint storage but this has a max size 
with an int so it can't hold 5GB of state. I need the checkpointing to trigger 
my sinks to persist (GenericWriteAheadSink), so it seems I have to create a 
proper shared file path all my containers can access.

James.

From: Hangxiang Yu mailto:master...@gmail.com>>
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine mailto:jas...@hotmail.com>>; 
user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not get what the problem is.
All checkpoints will store in the address as you set.
IIUC, TMs will write some checkpoint info in their local dir and then upload 
them to the address and then delete local one.
JM will write some metas of checkpoint to the address and also do the entire 
deletion for checkpoints.
Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
mailto:jas...@hotmail.com>> wrote:
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,



I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.



When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.



However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retain every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.



Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.



My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.

Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche 

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Thanks for your reply.

To be clear on my setup with the problem:

  *   4 taskmanagers running across different containers and machines. Each 
container has its own filesystem including / and /tmp.
  *   1 jobmanager also running in its own container and machine. Also has its 
own filesystem.
  *   I have configured the FS checkpoint address to be "file:/tmp/Flink/State" 
- therefore each process (JM and TMs) are reading and writing to their own 
/tmp. i.e. there is no shared access like if it was NFS or HDFS.

So when the checkpointing happens the directories are created and populated but 
only the JM's old checkpoint directories are cleaned up. Each of the TM 
/tmp/Flink/State old "chk-x" directories remain and are not cleared up.

From your email I don't know if you think I am writing to a "shared" path or 
not?

I started looking at the in-memory checkpoint storage but this has a max size 
with an int so it can't hold 5GB of state. I need the checkpointing to trigger 
my sinks to persist (GenericWriteAheadSink), so it seems I have to create a 
proper shared file path all my containers can access.

James.

From: Hangxiang Yu 
Sent: 17 May 2022 14:38
To: James Sandys-Lumsdaine ; user@flink.apache.org 

Subject: Re: Checkpoint directories not cleared as TaskManagers run

Hi, James.
I may not get what the problem is.
All checkpoints will store in the address as you set.
IIUC, TMs will write some checkpoint info in their local dir and then upload 
them to the address and then delete local one.
JM will write some metas of checkpoint to the address and also do the entire 
deletion for checkpoints.

Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
mailto:jas...@hotmail.com>> wrote:
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retain every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Re: flink on native k8s: after enabling HA, starting the job from a savepoint fails with "could not find job id 0000"

2022-05-17 Thread Weihua Hu
Hi, shimin
Which version of Flink are you using? And what is the submit command?


Best,
Weihua

> On May 17, 2022 at 1:48 PM, shimin huang  wrote:
> 
> On native k8s, after stopping the job with a savepoint and then restarting it from that savepoint, it fails with "could not find job".
> The error stack trace is as follows:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job ()
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:123)
> at
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:80)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)
> at
> com.xxx.xxx..streaming.job.segment.xx.xxx.main(ProfileConditionJudgmentJob.java:150)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:212)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException:
> Could not find Flink job ()
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$requestJobStatus$14(Dispatcher.java:596)
> at java.util.Optional.orElseGet(Optional.java:267)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.requestJobStatus(Dispatcher.java:590)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> ... 4 common frames omitted
> 2022-05-17 13:43:28.676 [flink-akka.actor.default-dispatcher-4] WARN
> o.a.f.c.d.application.ApplicationDispatcherBootstrap  - Application failed
> unexpectedly:
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job ()
> at
> 

Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Hangxiang Yu
Hi, James.
I may not get what the problem is.
All checkpoints will store in the address as you set.
IIUC, TMs will write some checkpoint info in their local dir and then
upload them to the address and then delete local one.
JM will write some metas of checkpoint to the address and also do the
entire deletion for checkpoints.

Best,
Hangxiang.

On Tue, May 17, 2022 at 9:09 PM James Sandys-Lumsdaine 
wrote:

> Some further Googling says on a StackOverflow posting it is the jobmanager
> that does the deletion and not the taskmanagers.
>
> Currently my taskmanagers are writing their checkpoints to their own
> private disks (/tmp) rather than a share - so my suspicion is the
> jobmanager can't access the folders on the other machines. I thought the
> taskmanagers could clear up their own state when instructed to by the
> jobmanager.
>
> I can not yet use an nfs mount in my deployment so I may have to switch to
> heap checkpoint state instead of using the file storage checkpoint system.
> Now I understand what's going on a bit better it seems pointless for me to
> have file checkpoints that can't be read by the jobmanager for failover.
>
> If anyone can clarify/correct me I would appreciate.
>
> James.
> --
> *From:* James Sandys-Lumsdaine
> *Sent:* 16 May 2022 18:52
> *To:* user@flink.apache.org 
> *Subject:* Checkpoint directories not cleared as TaskManagers run
>
>
> Hello,
>
>
> I'm seeing my Flink deployment's checkpoint storage directories build up
> and never clear down.
>
>
> When I run from my own IDE, I see only the *latest* "chk-x" directory
> under the job id folder. So the first checkpoint is "chk-1", which is then
> replaced with "chk-2" etc.
>
>
> However, when I run as a proper application mode deployment, each of the 4
> taskmanagers running in their own containers retain every one of the
> "chk-x" directories meaning they eat a lot of disk space after as time
> progresses. Interestingly, the jobmanager itself is fine.
>
>
> Does anyone have any suggestion on how to debug this? Anything obvious
> that would cause such behaviour? I'm currently using Flink 1.14.0.
>
>
> My set up is essentially below (trimmed for simplicity):
>
>Configuration conf = new Configuration();
>
> conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>
>
> conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
> true);
>
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
>
>
> env.enableCheckpointing(5 * 1000);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);
>
>
>
> env.setStateBackend(new HashMapStateBackend());
>
>
> env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");
>
>
> Thanks in advance,
>
> James.
>
>


Re: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread James Sandys-Lumsdaine
Some further Googling says on a StackOverflow posting it is the jobmanager that 
does the deletion and not the taskmanagers.

Currently my taskmanagers are writing their checkpoints to their own private 
disks (/tmp) rather than a share - so my suspicion is the jobmanager can't 
access the folders on the other machines. I thought the taskmanagers could clear up 
their own state when instructed to by the jobmanager.

I can not yet use an nfs mount in my deployment so I may have to switch to heap 
checkpoint state instead of using the file storage checkpoint system. Now I 
understand what's going on a bit better it seems pointless for me to have file 
checkpoints that can't be read by the jobmanager for failover.

If anyone can clarify/correct me I would appreciate.

James.

From: James Sandys-Lumsdaine
Sent: 16 May 2022 18:52
To: user@flink.apache.org 
Subject: Checkpoint directories not cleared as TaskManagers run


Hello,


I'm seeing my Flink deployment's checkpoint storage directories build up and 
never clear down.


When I run from my own IDE, I see only the latest "chk-x" directory under 
the job id folder. So the first checkpoint is "chk-1", which is then replaced 
with "chk-2" etc.


However, when I run as a proper application mode deployment, each of the 4 
taskmanagers running in their own containers retain every one of the "chk-x" 
directories, meaning they eat a lot of disk space as time progresses. 
Interestingly, the jobmanager itself is fine.


Does anyone have any suggestion on how to debug this? Anything obvious that 
would cause such behaviour? I'm currently using Flink 1.14.0.


My set up is essentially below (trimmed for simplicity):

    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

    env.enableCheckpointing(5 * 1000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 1000);

    env.setStateBackend(new HashMapStateBackend());
    env.getCheckpointConfig().setCheckpointStorage("file:/tmp/Flink/State");


Thanks in advance,

James.



Re: Channel became inactive while submitting job

2022-05-17 Thread Weihua Hu
Hi,

Which version of Flink are you using?  And what is the start cmd?

Best,
Weihua

> On May 17, 2022 at 6:33 PM, Zain Haider Nemati  wrote:
> 
>  main method caused an error: Failed to execute job 'Tracer Processor'.
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
> at 
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)



Re: Does kafka key is supported in kafka sink table

2022-05-17 Thread Dhavan Vaidya
Hey wang!

Perhaps this is what you want:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-format
&
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#key-fields
?

Note that the fields *have* to be one of the "top" level columns of your
sink table (i.e., fields inside Row are not supported, at least in PyFlink).
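
For illustration, a sketch of such a sink DDL wrapped in the Java Table API ('key.format' / 'key.fields' are 
the documented connector options; the topic, brokers, and column names below are made up, with the top-level 
user_id column becoming the Kafka message key):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaKeyedSinkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // 'key.fields' must reference top-level columns; 'value.fields-include' = 'EXCEPT_KEY'
            // keeps the key column out of the serialized message value.
            tEnv.executeSql(
                "CREATE TABLE keyed_sink (" +
                "  user_id STRING," +
                "  event_type STRING," +
                "  amount DOUBLE" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'my-output-topic'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'key.format' = 'raw'," +
                "  'key.fields' = 'user_id'," +
                "  'value.format' = 'json'," +
                "  'value.fields-include' = 'EXCEPT_KEY'" +
                ")");
        }
    }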

Thanks!

On Mon, 16 May 2022 at 19:33, wang <24248...@163.com> wrote:

> Hi dear engineer,
>
> Flink sql supports kafka sink table, not sure whether it supports kafka
> key in kafka sink table? As I want to specify kafka key when inserting
> data into kafka sink table.
> Thanks for your answer in advance.
>
>
>
> Thanks && Regards,
> Hunk
>
>
>
>


Channel became inactive while submitting job

2022-05-17 Thread Zain Haider Nemati
Hi,
I am trying to run a job in my local cluster and facing this issue.


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute job 'Tracer Processor'.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
*Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'Tracer Processor'.*
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1970)
at
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at
*org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1834)*
at basicpackage.StreamingJob.main(StreamingJob.java:284)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 11 more
*Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
to submit JobGraph.*
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$9(RestClusterClient.java:405)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelInactive(RestClient.java:588)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler.channelInactive(ChunkedWriteHandler.java:138)
at

Re: How to avoid multiple reading from same topic

2022-05-17 Thread Caizhi Weng
Hi!

In that case just put these two SQL queries into one job and use the same
table as their source. The source task will be automatically reused
(actually controlled by a configuration [1] but it is enabled by default)
and the records will be only read once from Kafka and will be replicated
inside Flink if necessary.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-optimizer-reuse-source-enabled
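
For illustration, a rough sketch of the single-read pattern in the Java Table API (topic, schema, and sink 
tables are made up; the 4h/6h windows mirror the example in this thread):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class SharedKafkaSourceSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // One Kafka source table, read once and shared by both aggregations.
            tEnv.executeSql(
                "CREATE TABLE events (" +
                "  user_id STRING, amount DOUBLE, ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'kafka', 'topic' = 'topic-a'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'properties.group.id' = 'shared-reader'," +
                "  'scan.startup.mode' = 'latest-offset', 'format' = 'json')");
            tEnv.executeSql(
                "CREATE TABLE agg_4h (window_end TIMESTAMP(3), total DOUBLE) WITH ('connector' = 'print')");
            tEnv.executeSql(
                "CREATE TABLE agg_6h (window_end TIMESTAMP(3), total DOUBLE) WITH ('connector' = 'print')");

            // Submitting both INSERTs as one statement set lets the planner reuse the single
            // Kafka scan (table.optimizer.reuse-source-enabled is on by default).
            StatementSet set = tEnv.createStatementSet();
            set.addInsertSql(
                "INSERT INTO agg_4h SELECT TUMBLE_END(ts, INTERVAL '4' HOUR), SUM(amount)" +
                " FROM events GROUP BY TUMBLE(ts, INTERVAL '4' HOUR)");
            set.addInsertSql(
                "INSERT INTO agg_6h SELECT TUMBLE_END(ts, INTERVAL '6' HOUR), SUM(amount)" +
                " FROM events GROUP BY TUMBLE(ts, INTERVAL '6' HOUR)");
            set.execute();
        }
    }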

Surendra Lalwani  wrote on Tue, May 17, 2022 at 15:52:

> So let's assume if we are reading data from a topic named A and performing
> aggregation on 4 hour window and a different pipeline which does same
> aggregation for 6 hour window, so in this case we are reading same data
> couple of times and processing it for different intervals, can we do it
> using just a single read.
>
> Thanks and Regards ,
> Surendra Lalwani
>
>
> On Tue, May 17, 2022 at 1:19 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> I wanted to explore if we can avoid reading multiple times and read only
>>> once
>>>
>>
>> Could you elaborate more on the reason you need this? If both of your
>> queries need full data from that topic you'll have to read the whole topic
>> many times. If each query only need to consume a specific portion of that
>> topic I would suggest splitting it into multiple topics.
>>
>>
>> Surendra Lalwani  wrote on Tue, May 17, 2022 at 15:44:
>>
>>> Hi Team,
>>>
>>> We have various SQL queries where we are querying the same kafka topic
>>> but both of the queries are completely different. I wanted to explore if we
>>> can avoid reading multiple times and read only once and perform different
>>> processing according to the query and dump data accordingly.
>>>
>>> Thanks and Regards ,
>>> Surendra Lalwani
>>>
>>>


Re: How to avoid multiple reads from the same topic

2022-05-17 Thread Surendra Lalwani
Let's assume we are reading data from a topic named A and performing an
aggregation over a 4-hour window, while a different pipeline performs the
same aggregation over a 6-hour window. In this case we read the same data
a couple of times and process it for different intervals. Can we do this
with just a single read?

Thanks and Regards ,
Surendra Lalwani


On Tue, May 17, 2022 at 1:19 PM Caizhi Weng  wrote:

> Hi!
>
> I wanted to explore if we can avoid reading multiple times and read only
>> once
>>
>
> Could you elaborate more on the reason you need this? If both of your
> queries need full data from that topic you'll have to read the whole topic
> many times. If each query only need to consume a specific portion of that
> topic I would suggest splitting it into multiple topics.
>
>
> Surendra Lalwani  wrote on Tue, May 17, 2022 at 15:44:
>
>> Hi Team,
>>
>> We have various SQL queries where we are querying the same kafka topic
>> but both of the queries are completely different. I wanted to explore if we
>> can avoid reading multiple times and read only once and perform different
>> processing according to the query and dump data accordingly.
>>
>> Thanks and Regards ,
>> Surendra Lalwani
>>
>>


Re: How to avoid multiple reads from the same topic

2022-05-17 Thread Caizhi Weng
Hi!

I wanted to explore if we can avoid reading multiple times and read only
> once
>

Could you elaborate on why you need this? If both of your queries need the
full data from that topic, you'll have to read the whole topic multiple
times. If each query only needs to consume a specific portion of the
topic, I would suggest splitting it into multiple topics.


Surendra Lalwani  wrote on Tue, May 17, 2022 at 15:44:

> Hi Team,
>
> We have various SQL queries where we are querying the same kafka topic but
> both of the queries are completely different. I wanted to explore if we can
> avoid reading multiple times and read only once and perform different
> processing according to the query and dump data accordingly.
>
> Thanks and Regards ,
> Surendra Lalwani
>
>


How to avoid multiple reads from the same topic

2022-05-17 Thread Surendra Lalwani
Hi Team,

We have several SQL queries that read from the same Kafka topic, but the
queries are completely different. I want to explore whether we can avoid
reading the topic multiple times and instead read it only once, apply the
different processing for each query, and write out the results accordingly.

Thanks and Regards ,
Surendra Lalwani



Re: How to update the Chinese documentation?

2022-05-17 Thread Xuyang
Hi, you can refer to [1] for contributing to the documentation.
1. Generally speaking, once you find a problem you can open an issue directly to fix the documentation bug; just set the issue's component to the documentation and the corresponding module.
2. Regarding the discussion area you mentioned: since the developers are spread across China and abroad, important decisions are currently discussed on the dev mailing list [2]. The community is also trying to set up a Slack workspace for more efficient discussion; see [3]. Helpful contributors will review your PR, and if there are translation mistakes they will be discussed directly under the issue or PR you open.


PS: Your image did not come through; next time you can paste a link from an image host.




[1] https://flink.apache.org/contributing/contribute-documentation.html
[2] https://flink.apache.org/community.html#mailing-lists
[3] https://lists.apache.org/thread/n43r4qmwprhdmzrj494dbbwr9w7bbdcv



--

Best!
Xuyang




On 2022-05-17 11:13:34, "z y xing"  wrote:

Hi all,
  I've recently been planning to study Flink systematically by comparing the Chinese and English docs. I've noticed that some of the Chinese translations may be wrong and that some pages only exist in English, so I'd like to ask the following two questions:
1. What is the usual granularity of a fix?
For example, here is a problem I found recently: Flink removed ingestion time starting from version 1.11 (at least the English version dropped this attribute), but the translation of the time attributes page is wrong across several versions.
I have also read the Chinese translation guide at https://flink.apache.org/zh/contributing/contribute-documentation.html#section-3, but it is not clear to me how such a small change should be handled. Do I also need to open an issue in JIRA and go through a PR, or is there an umbrella issue under which I can just submit a patch directly?

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/concepts/time_attributes/



2. Is there a separate discussion area?
I will probably have technical questions as well as translation questions later (my English is not great either). Are there experienced contributors in this area who could help newcomers get started? I have already joined the DingTalk group.


Thanks!
振宇