Re: [External] : Re: Need help with executing Flink CLI for native Kubernetes deployment

2021-04-04 Thread Fuyao Li
Hello Yang,

I am just following up on the previous email to see if you have had time to
reply. I also took a deeper look into the Lyft K8s operator recently. It seems
it doesn't support HA natively; it still needs the help of ZooKeeper. In that
regard, native K8s is better. Any other ideas? Thanks for your help.

Best,
Fuyao

From: Fuyao Li 
Date: Thursday, April 1, 2021 at 12:22
To: Yang Wang 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Yang,

Thanks for sharing the insights.

For problem 1:
I can't run telnet in the container. I tried curl 144.25.13.78:8081 and I could
see the HTML of the Flink dashboard UI, which proves that the public IP is
reachable inside the cluster. Just as you mentioned, there might still be some
network issue with the cluster; I will do some further checking.

For problem 2:
I created a new K8s cluster with a bastion server that has a public IP assigned
to it. Finally, I can see something valid from my browser. (There are still
some problems connecting to certain databases, but I think those network
problems are not directly related to Flink; I can investigate them later.)

For problem 3:
Thanks for sharing the repo you created. I am not sure how much work it would
take to develop a deployer. I understand it depends on proficiency, but could
you give a rough estimate? If it is too complicated, and the other options are
not significantly inferior to native Kubernetes, I might prefer one of the
other options. I am currently comparing different options for deploying on
Kubernetes:

  1.  Standalone K8S
  2.  Native Kubernetes
  3.  Flink operator (Google Cloud Platform/ Lyft) [1][2]

I also watched the demo video you presented. [3] I noticed you mentioned that
native K8s is not going to replace the other two options, but I still don't
fully get the idea from the limited explanation in the demo. Could you compare
the trade-offs a little? Thanks!
[1] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
[2] https://github.com/lyft/flinkk8soperator
[3] https://youtu.be/pdFPr_VOWTU

Best,
Fuyao


From: Yang Wang 
Date: Tuesday, March 30, 2021 at 19:15
To: Fuyao Li 
Cc: user 
Subject: Re: [External] : Re: Need help with executing Flink CLI for native 
Kubernetes deployment
Hi Fuyao,

Thanks for sharing the progress.

1. The Flink client is able to list/cancel jobs; based on the logs shared
above, I should be able to ping 144.25.13.78. Why can I still NOT ping that
address?

I think this is an environment problem. Actually, not every IP address can be
tested with the "ping" command. I suggest checking the network connectivity
with "telnet 144.25.13.78 8081" (or "nc -vz 144.25.13.78 8081" if telnet is not
available in the container).

2. Why is 144.25.13.78:8081 not accessible from outside, i.e. from my laptop's
browser? I am within the company's VPN, and such a public load balancer should
expose the Flink Web UI, right? I tried to debug the network configuration but
failed to find a reason. Could you give me some hints?

Just like my answer above, I think you need to check the network connectivity
via "telnet 144.25.13.78 8081". Maybe the firewall does not allow connections
from your local machine (e.g. your local IP is not in the whitelist of the
LoadBalancer IP).

In production, what is the suggested approach to list and cancel jobs? The
current manual process of "kubectl exec" into the pods is not very reliable.
How can this process be automated and integrated with CI/CD? Please share some
blogs if there are any, thanks.

I think in a production environment you should have your own deployer, which
takes care of submitting, listing, and cancelling jobs. The deployer could even
help with triggering savepoints and manage the whole lifecycle of Flink
applications. I once developed a PoC of a native-flink-k8s-operator[1]. It
could be a starting point for your own deployer if you want to develop it in
Java.

[1]. 
https://github.com/wangyang0918/flink-native-k8s-operator
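
To make the idea concrete, such a deployer mostly just talks to the JobManager
REST API. Below is a minimal, untested sketch using Java 11's built-in HTTP
client (the REST address is a placeholder; GET /jobs and PATCH /jobs/<jobId>
are the standard Flink REST endpoints for listing and cancelling):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MiniDeployer {

    // Placeholder: point this at your JobManager REST endpoint.
    private static final String REST_BASE = "http://localhost:8081";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // List jobs: GET /jobs returns the job ids and their current states.
        HttpRequest list = HttpRequest.newBuilder(URI.create(REST_BASE + "/jobs"))
                .GET()
                .build();
        System.out.println(client.send(list, HttpResponse.BodyHandlers.ofString()).body());

        // Cancel a job: PATCH /jobs/<jobId> asks the cluster to cancel it.
        if (args.length > 0) {
            HttpRequest cancel = HttpRequest
                    .newBuilder(URI.create(REST_BASE + "/jobs/" + args[0]))
                    .method("PATCH", HttpRequest.BodyPublishers.noBody())
                    .build();
            System.out.println(
                    client.send(cancel, HttpResponse.BodyHandlers.ofString()).statusCode());
        }
    }
}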

Re: [External] : Union of more than two streams

2021-04-04 Thread Fuyao Li
Hello BB,

Just want to share some of my rough ideas. Maybe some experts can give you
better solutions and advice.

  1.  DataStream-based solution:
 *   To do a union, as you already know, the datastreams must be of the same
type; otherwise you can't union them. There is a workaround for your problem:
ingest each stream with a DeserializationSchema and map the different code
book streams to the same Java type, with one field holding the foreign key
value (the codebook_fk1, codebook_fk2 values are all stored here) and another
field holding the name of the foreign key (e.g. codebook_fk1). All other
fields should also be generalized into that Java type. After that, you can
union these different code book streams and join the result with the main
stream (a minimal sketch follows at the end of this list).
 *   Cascading connected streams is, I guess, not a recommended approach; in
addition to memory, I think it will also make the watermarks hard to
coordinate.
  2.  Flink SQL approach:

You can try a Flink temporal table join for the enrichment here. [1][2] With
this approach you cascade the joins to enrich the main stream. It seems to fit
your use case, since your enrichment streams don't change often and contain
something like currency data. For such joins there should be some internal
optimization that might avoid some of the memory consumption issues, I guess?
Maybe I am wrong, but it is worth a look.
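
To make option 1 concrete, here is a minimal sketch of the generalized type
and the union (class, field, and key names are invented for illustration, and
the sources are stubbed with fromElements; treat it as a starting point, not a
complete solution):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CodebookUnionSketch {

    // Common type that every code book stream is mapped into.
    public static class CodebookEvent {
        public String foreignKeyName;   // which key this is, e.g. "codebook_fk1"
        public String foreignKeyValue;  // the key value that joins with the main stream
        public Map<String, String> payload = new HashMap<>(); // remaining fields, generalized
        public CodebookEvent() {}       // no-arg constructor so Flink treats it as a POJO
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the real deserialized Kafka code book streams.
        DataStream<CodebookEvent> codebook1 = env.fromElements(event("codebook_fk1", "42"));
        DataStream<CodebookEvent> codebook2 = env.fromElements(event("codebook_fk2", "7"));

        // The union is possible because both streams now share one type. The
        // unioned stream can then be keyed by foreignKeyValue and connected
        // with the main stream for the actual enrichment join.
        DataStream<CodebookEvent> codebooks = codebook1.union(codebook2);
        codebooks.print();

        env.execute("codebook-union-sketch");
    }

    private static CodebookEvent event(String keyName, String keyValue) {
        CodebookEvent e = new CodebookEvent();
        e.foreignKeyName = keyName;
        e.foreignKeyValue = keyValue;
        return e;
    }
}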




Reference:
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#event-time-temporal-join

Best,
Fuyao



From: B.B. 
Date: Friday, April 2, 2021 at 01:41
To: user@flink.apache.org 
Subject: [External] : Union of more than two streams
Hi,

I have an architecture question regarding the union of more than two streams in 
Apache Flink.

We have three, and sometimes more, streams that are a kind of code book with
which we have to enrich the main stream.
The code book streams are compacted Kafka topics. Code books are data that
don't change very often, e.g. currencies. The main stream is a fast event stream.

The idea is to make a union of all code books, then join it with the main
stream and store the enrichment data as managed, keyed state (so when the
compacted events from Kafka expire, I still have the code books saved in state).

The problem is that the foreign key of every code book is different.
E.g. codebook_1 has foreign key codebook_fk1, codebook_2 has foreign key
codebook_fk2, … that connect with the main stream.
This means I cannot use keyBy with a CoProcessFunction.

Is this doable with union, or should I cascade a series of connected streams
with the main stream, e.g. mainstream.connect(codebook_1) ->
mainstreamWithCodebook1.connect(codebook_2) ->
mainstreamWithCodebook1AndCodebook2.connect(codebook_3) -> …?
I read somewhere that this latter approach is not memory friendly.

Thx.

BB.


Re: Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Kai Fu
Thank you for the clarification Yun, it helps.

*-- Best wishes*
*Kai*

On Mon, Apr 5, 2021 at 12:03 PM Yun Gao  wrote:

> Hi Kai,
>
> Yes, you are basically right; one minor point is that the start time is
> taken as the time the checkpoint was initiated on the JM side.
>
> Best,
>  Yun
>
>
> --Original Mail --
> *Sender:*Kai Fu 
> *Send Date:*Mon Apr 5 09:31:58 2021
> *Recipients:*user 
> *Subject:*Re: Meaning of checkpointStartDelayNanos
>
>> I found its meaning in the code. It means the delay of the checkpoint
>> action: how long after the checkpoint is initiated at the source the
>> checkpoint barrier arrives at the current operator.
>>
>>
>> On Mon, Apr 5, 2021 at 9:21 AM Kai Fu  wrote:
>>
>>> Hi team,
>>>
>>> I'm a little confused by the meaning of *checkpointStartDelayNanos*. I
>>> do not understand what time it exactly means, but it seems to be a quite
>>> important indicator for checkpointing/backpressure. The explanation of it
>>> on the metrics page does not help much. Can someone explain it more
>>> clearly?
>>>
>>> --
>>> *Best regards,*
>>> *- Kai*
>>>
>>
>>
>> --
>> *Best regards,*
>> *- Kai*
>>
>

-- 
*Best regards,*
*- Kai*


Re: Questions about checkpointAlignmentTime in unaligned checkpoint

2021-04-04 Thread Kai Fu
Hi Yun,

Thank you for the explanation, it clarifies a lot.

*-- Best wishes*
*Kai*

On Mon, Apr 5, 2021 at 12:13 PM Yun Gao  wrote:

> Hi Kai,
>
> Under the unaligned checkpoint setting there is still an alignment process.
> Although the task can snapshot the state of the operators upon receiving
> the first barrier and emit barriers to the following tasks, it still needs
> to wait until all the barriers have been received before finalizing the
> checkpoint. During this process it needs to snapshot the buffers that are
> skipped by the barrier, and the final snapshot is composed of both the
> operator snapshots and the snapshots of the skipped buffers.
>
> Therefore, the *checkpointAlignmentTime* metric still exists.
>
> Best,
> Yun
>
>
> --Original Mail --
> *Sender:*Kai Fu 
> *Send Date:*Mon Apr 5 09:18:39 2021
> *Recipients:*user 
> *Subject:*Questions about checkpointAlignmentTime in unaligned checkpoint
>
>> Hi team,
>>
>> I'm observing that the metrics reporter still emits the
>> *checkpointAlignmentTime* metric in the unaligned checkpoint setting, as
>> shown in the figure below. Is it meaningful in unaligned checkpoints, since
>> I suppose the alignment operation only happens in aligned checkpoints?
>>
>> [image: image.png]
>>
>> --
>> *Best regards,*
>> *- Kai*
>>
>

-- 
*Best regards,*
*- Kai*


Re: Union of more than two streams

2021-04-04 Thread Yun Gao
Hi, 

With a.connect(b).coprocess(xx).connect(c).coprocess(xx), two operators would
be created: the first operator would join a and b and output the enriched
data, and then .connect(c).coprocess(xx) would pass through the already
enriched data and enrich the records from c. Since the two operators cannot be
chained, the performance would likely be affected.

Another method is to first label each input with a tag, e.g. ("a", a record),
("b", b record), ..., and then use

a.union(b).union(c).union(d).process(xx)

In the process operator, different logic can then be chosen according to the
tag (see the sketch below).

If adding a tag is hard, it might be necessary to use the new multiple-inputs
operator, which requires the low-level API of Flink, so I would recommend the
tag + union method first.
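
Here is a minimal sketch of the tag + union idea (the records are plain
strings just for illustration; the .returns(...) hints are needed because of
Java type erasure with the Tuple2 lambdas):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TagUnionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-ins for the real inputs; each record is tagged with its
        // origin so that all the streams share one type and can be unioned.
        DataStream<Tuple2<String, String>> a = env.fromElements("a-record")
                .map(r -> Tuple2.of("a", r))
                .returns(Types.TUPLE(Types.STRING, Types.STRING));
        DataStream<Tuple2<String, String>> b = env.fromElements("b-record")
                .map(r -> Tuple2.of("b", r))
                .returns(Types.TUPLE(Types.STRING, Types.STRING));

        a.union(b).process(new ProcessFunction<Tuple2<String, String>, String>() {
            @Override
            public void processElement(Tuple2<String, String> record, Context ctx,
                                       Collector<String> out) {
                // Choose the handling logic according to the tag.
                switch (record.f0) {
                    case "a": out.collect("handled a: " + record.f1); break;
                    case "b": out.collect("handled b: " + record.f1); break;
                    default:  /* ignore unknown tags */ break;
                }
            }
        }).print();

        env.execute("tag-union-sketch");
    }
}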

Best, 
Yun
 --Original Mail --
Sender:B.B. 
Send Date:Fri Apr 2 16:41:16 2021
Recipients:flink_user 
Subject:Union of more than two streams

Hi,

I have an architecture question regarding the union of more than two streams in 
Apache Flink.

We have three, and sometimes more, streams that are a kind of code book with
which we have to enrich the main stream.
The code book streams are compacted Kafka topics. Code books are data that
don't change very often, e.g. currencies. The main stream is a fast event stream.

The idea is to make a union of all code books, then join it with the main
stream and store the enrichment data as managed, keyed state (so when the
compacted events from Kafka expire, I still have the code books saved in state).

The problem is that the foreign key of every code book is different.
E.g. codebook_1 has foreign key codebook_fk1, codebook_2 has foreign key
codebook_fk2, … that connect with the main stream.
This means I cannot use keyBy with a CoProcessFunction.

Is this doable with union, or should I cascade a series of connected streams
with the main stream, e.g. mainstream.connect(codebook_1) ->
mainstreamWithCodebook1.connect(codebook_2) ->
mainstreamWithCodebook1AndCodebook2.connect(codebook_3) -> …?
I read somewhere that this latter approach is not memory friendly.

Thx.

BB.

Re: Questions about checkpointAlignmentTime in unaligned checkpoint

2021-04-04 Thread Yun Gao
Hi Kai,

Under the unaligned checkpoint setting there is still an alignment process.
Although the task can snapshot the state of the operators upon receiving the
first barrier and emit barriers to the following tasks, it still needs to wait
until all the barriers have been received before finalizing the checkpoint.
During this process it needs to snapshot the buffers that are skipped by the
barrier, and the final snapshot is composed of both the operator snapshots and
the snapshots of the skipped buffers.

Therefore, the checkpointAlignmentTime metric still exists.
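
(Roughly speaking, as I read the code: alignmentTime ≈ t(last barrier received
by the task) - t(first barrier received by the task), and that span is still
well defined when the checkpoint is unaligned.)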

Best,
Yun



 --Original Mail --
Sender:Kai Fu 
Send Date:Mon Apr 5 09:18:39 2021
Recipients:user 
Subject:Questions about checkpointAlignmentTime in unaligned checkpoint

Hi team,

I'm observing that the metrics reporter still emits the checkpointAlignmentTime
metric in the unaligned checkpoint setting, as shown in the figure below. Is it
meaningful in unaligned checkpoints, since I suppose the alignment operation
only happens in aligned checkpoints?



-- 
Best regards,
- Kai

Re: Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Yun Gao
Hi Kai,

Yes, you are basically right; one minor point is that the start time is
taken as the time the checkpoint was initiated on the JM side.
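
(So, roughly: checkpointStartDelay ≈ t(first barrier received by the task) -
t(checkpoint triggered on the JM side).)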

Best,
 Yun



 --Original Mail --
Sender:Kai Fu 
Send Date:Mon Apr 5 09:31:58 2021
Recipients:user 
Subject:Re: Meaning of checkpointStartDelayNanos

I found its meaning in the code. It means the delay of the checkpoint action:
how long after the checkpoint is initiated at the source the checkpoint
barrier arrives at the current operator.


On Mon, Apr 5, 2021 at 9:21 AM Kai Fu  wrote:

Hi team,

I'm a little confused by the meaning of checkpointStartDelayNanos. I do not
understand what time it exactly means, but it seems to be a quite important
indicator for checkpointing/backpressure. The explanation of it on the metrics
page does not help much. Can someone explain it more clearly?

-- 
Best regards,
- Kai

-- 
Best regards,
- Kai

Re: Meaning of checkpointStartDelayNanos

2021-04-04 Thread Kai Fu
I found its meaning in the code. It means the delay of the checkpoint action:
how long after the checkpoint is initiated at the source the checkpoint
barrier arrives at the current operator.


On Mon, Apr 5, 2021 at 9:21 AM Kai Fu  wrote:

> Hi team,
>
> I'm a little confused by the meaning of *checkpointStartDelayNanos*. I do
> not understand what time it exactly means, but it seems to be a quite
> important indicator for checkpointing/backpressure. The explanation of it
> on the metrics page does not help much. Can someone explain it more clearly?
>
> --
> *Best regards,*
> *- Kai*
>


-- 
*Best regards,*
*- Kai*


Meaning of checkpointStartDelayNanos

2021-04-04 Thread Kai Fu
Hi team,

I'm a little confused by the meaning of *checkpointStartDelayNanos*. I do
not understand what time it exactly means, but it seems to be a quite
important indicator for checkpointing/backpressure. The explanation of it
on the metrics page does not help much. Can someone explain it more clearly?

-- 
*Best regards,*
*- Kai*


Re: DataStream from kafka topic

2021-04-04 Thread Maminspapin
Thank you all very much!

The problem is solved using the
ConfluentRegistryAvroDeserializationSchema.forGeneric(schema,
"http://xxx.xx.xxx.xx:8081") method.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html
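
In case it helps others, here is a minimal sketch of how it fits together (the
topic, bootstrap servers, and schema path are placeholders; the registry URL
is the one from above):

import java.io.File;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GenericRecordJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema, parsed from the .avsc file copied from the registry.
        Schema schema = new Schema.Parser().parse(new File("path_to_schema/schema.avsc"));

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "generic-record-job");  // placeholder

        // forGeneric takes the reader schema plus the Schema Registry URL;
        // the writer schema is fetched from the registry per record.
        FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
                "my-topic",                                    // placeholder
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        schema, "http://xxx.xx.xxx.xx:8081"),
                props);

        DataStream<GenericRecord> stream = env.addSource(consumer);
        stream.print();
        env.execute("generic-record-job");
    }
}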

  

But I want to explore your notes. So many new things for me ))

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Proper way to get DataStream&lt;GenericRecord&gt;

2021-04-04 Thread Maminspapin
Hi, @Arvid Heise-4, @Matthias

I really appreciate your attention, guys. And sorry for my late reply.

Yes, Arvid, you are right: the second way does in fact work. I copied the
schema from the Schema Registry using its API and created the .avsc file.
And thanks again for explaining why the first way is not compatible.

So, my code to define the schema is (I don't know whether it is a good
solution...):

import java.nio.file.*;
import org.apache.avro.Schema;

Path path = Paths.get("path_to_schema/schema.avsc");
String content = new String(Files.readAllBytes(path));
Schema schema = new Schema.Parser().parse(content);

And it really works.

But I don't understand why I should use two schemas:
1. the schema I created (reader schema)
2. the schema fetched via the SR url (writer schema)

I have some experience with the Kafka Streams library, and when using it there
is no need to provide a reader schema. There is one service to communicate
with schemas: the Schema Registry. Why not use a single source to get the
schema in Flink?


Again, the second way is correct, and I can go further with my program.

Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ARM support

2021-04-04 Thread Rex Fenley
Thanks for all of this info. Highly appreciated!

On Thu, Apr 1, 2021 at 1:17 AM Guowei Ma  wrote:

> Hi, Rex
>
> I think that Flink does not have an official release that supports the ARM
> architecture. There are some efforts and discussions [1][2][3] about
> supporting the architecture. I think you can find some builds at
> openlabtesting. [4]
> But AFAIK there is no clear timeline for that (correct me if I missed
> something). There is a discussion [5], and I think you might find some
> insight there.
>
> [1] https://issues.apache.org/jira/browse/FLINK-13448
> [2]
> https://lists.apache.org/thread.html/a564836a3c7cc5300bec7729c2af1ad9d611d526bb59dd6cca72cc7b%40%3Cdev.flink.apache.org%3E
> [3]
> https://lists.apache.org/thread.html/2399c8a701bced2266f9658719807b98a2e593a99b949f50e9a1ab1a%40%3Cdev.flink.apache.org%3E
> [4] http://status.openlabtesting.org/builds?project=apache%2Fflink
> [5]
> https://lists.apache.org/thread.html/5c4c75a2de979ed7ef1c661c15dd252569e598a374c27042b38d078b%40%3Cdev.flink.apache.org%3E
>
> Best,
> Guowei
>
>
> On Thu, Apr 1, 2021 at 3:55 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> We would like to run Flink on ARM but haven't found any resources
>> indicating that this is possible yet. We are wondering what the timeline is
>> for Flink supporting ARM. Given that all MacBooks are moving to ARM and
>> that AWS is enthusiastically supporting ARM, it seems important that Flink
>> also supports running on ARM.
>>
>> Thank you
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com | BLOG | FOLLOW US | LIKE US
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com | BLOG | FOLLOW US | LIKE US



Re: Flink - Pod Identity

2021-04-04 Thread Swagat Mishra
Austin -

In my case the set up is such that services are deployed on Kubernetes with
Docker, running on EKS. There is also an istio service mesh. So all the
services communicate and access AWS resources like S3 using the service
account. Service account is associated with IAM roles. I have verified that
the service account has access to S3, by running a program that connects to
S3 to read a file also aws client when packaged into the pod is able to
access S3. So that means the roles and policies are good.

When running Flink, I am following the same configuration for the job manager
and task manager as provided here:

https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/kubernetes.html

The exception we are getting is:
org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint.

This happens in the EC2CredentialsFetcher class, method fetchCredentials
(line 66), when it tries to read the resource, effectively executing
curl 169.254.170.2/$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI

I am not setting the variable AWS_CONTAINER_CREDENTIALS_RELATIVE_URI because
it is not the right approach for us; we are on EKS. Similarly, any
~/.aws/credentials file approach will also not work for us.


At the moment, I haven't tried the Kubernetes service account property you
mentioned above. I will try it and let you know how it goes.

Question: do I need to provide any parameters while building the Docker image,
or any configuration in the Flink config, to tell Flink that for all purposes
it should use the service account and not go through the EC2CredentialsFetcher
class?

One more thing: we were trying this on Flink version 1.6, not 1.12.

Regards,
Swagat

On Sun, Apr 4, 2021 at 8:56 AM Sameer Wadkar  wrote:

> Kube2iam needs to modify iptables to proxy calls to the EC2 metadata
> endpoint to a daemonset that runs privileged pods, which map a pod's IP
> address and its associated service account, make STS calls, and return
> temporary AWS credentials. Your pod "thinks" the EC2 metadata URL works
> locally, like in an EC2 instance.
>
> I have found that mutating webhooks are easier to deploy (when you have no
> control over the Kubernetes environment, say you cannot change iptables or
> run privileged pods). These can configure the ~/.aws/credentials file. The
> webhook can make the STS call for the service-account-to-role mapping. A
> sidecar container, to which the main container has no access, can even renew
> the credentials, because STS returns temporary credentials.
>
> Sent from my iPhone
>
> On Apr 3, 2021, at 10:29 PM, Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
> 
> If you’re just looking to attach a service account to a pod using the
> native AWS EKS IAM mapping[1], you should be able to attach the service
> account to the pod via the `kubernetes.service-account` configuration
> option[2].
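>
> (For a native Kubernetes deployment that would look something like
> `kubernetes.service-account: flink-service-account` in flink-conf.yaml, or
> `-Dkubernetes.service-account=flink-service-account` on the command line;
> the account name here is just a placeholder.)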
>
> Let me know if that works for you!
>
> Best,
> Austin
>
> [1]:
> https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html
> [2]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#kubernetes-service-account
>
> On Sat, Apr 3, 2021 at 10:18 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Can you describe your setup a little bit more? And perhaps how you use
>> this setup to grant access to other non-Flink pods?
>>
>> On Sat, Apr 3, 2021 at 2:29 PM Swagat Mishra  wrote:
>>
>>> Yes, I looked at kube2iam; I haven't experimented with it.
>>>
>>> Given that the service account has access to S3, shouldn't there be a
>>> simpler mechanism to connect to the underlying resources based on the
>>> service account's authorization?
>>>
>>> On Sat, Apr 3, 2021, 10:10 PM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
 Hi Swagat,

 I’ve used kube2iam[1] for granting AWS access to Flink pods in the past
 with good results. It’s all based on mapping pod annotations to AWS IAM
 roles. Is this something that might work for you?

 Best,
 Austin

 [1]: https://github.com/jtblin/kube2iam

 On Sat, Apr 3, 2021 at 10:40 AM Swagat Mishra 
 wrote:

> No, we are running on AWS. The mechanisms supported by Flink to connect to
> resources like S3 require us to make changes that would impact all services,
> something that we don't want to do. So providing the AWS secret key ID and
> passcode upfront, or IAM roles where it connects by executing curl/HTTP
> calls to S3, don't work for me.
>
> I want to be able to connect to S3 using the AWS APIs, and if that
> connection can be leveraged by the Presto library, that is what I am
> looking for.
>
> Regards,
> Swagat
>
>
> On Sat, Apr 3, 2021, 7:37 PM Israel Ekpo  wrote:
>
>> Are you running on Azure Kubernetes Service?
>>
>> You should be able to do it becaus