Re: [E] Re: Kubernetes operator expose UI rest service as NodePort instead of default clusterIP

2022-09-02 Thread Vignesh Kumar Kathiresan via user
Jacob,
Thanks, I checked it out and it didn't work. That is the config override to
ClusterIP we were talking about, so it looks like it is always being set to
ClusterIP now.

Yang,
Having the alb target type as ip works with a ClusterIP type service.
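
(For later readers: a minimal sketch of that setup using the operator's
ingress support. The hostname, scheme, and annotations are illustrative
assumptions, not from this thread; the spec.ingress fields follow the
operator 1.1 ingress docs.)

    # FlinkDeployment fragment (sketch): expose the UI through an AWS ALB
    # ingress with target-type "ip", which works against the ClusterIP
    # rest service the operator creates.
    spec:
      ingress:
        template: "{{name}}.flink.example.com"
        className: "alb"
        annotations:
          alb.ingress.kubernetes.io/scheme: internal
          alb.ingress.kubernetes.io/target-type: ip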

On Fri, Sep 2, 2022 at 8:18 AM Jeesmon Jacob  wrote:

> I remember testing the operator with the rest service exposed as NodePort.
> NodePort requires rbac.nodesRule.create: true (default is false) in
> values.yaml. Maybe you missed that?
>
>
> https://github.com/apache/flink-kubernetes-operator/blob/release-1.1/helm/flink-kubernetes-operator/values.yaml#L34-L38
> 
>
> On Thu, Sep 1, 2022 at 11:45 PM Vignesh Kumar Kathiresan via user <
> user@flink.apache.org> wrote:
>
>> Hi Yang,
>>
>> Yeah, I gathered that from the operator code soon after posting. I am
>> using the aws alb ingress class [1]. There, under "Considerations", it is
>> mentioned that if the alb target type is "instance", which is the default
>> traffic mode, the kubernetes service type has to be NodePort or LoadBalancer.
>>
>> Changing the alb target type to "ip" might also work; let me try that. I
>> believe there should be a reason the operator always overrides
>> "REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
>>
>> [1] https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
>> 
>>
>> On Thu, Sep 1, 2022 at 7:01 PM Yang Wang  wrote:
>>
>>> I am afraid the current flink-kubernetes-operator always overrides the
>>> "REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
>>> Could you please share why the ingress [1] could not meet your
>>> requirements? Compared with NodePort, I think it is a more graceful
>>> implementation.
>>>
>>> [1].
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/operations/ingress/
>>> 
>>>
>>> Best,
>>> Yang
>>>
>>> Vignesh Kumar Kathiresan via user wrote on Fri, Sep 2, 2022 at 04:57:
>>>
 Hello Flink community,

 Need some help with "flink kubernetes operator" based cluster setup.

 My flink cluster is set up using the flink-kubernetes-operator in AWS
 EKS. The required resources (deployments, pods, services, configmaps, etc.)
 are created as expected. But the "*-rest" service is created with type
 "ClusterIP", and I want it created as a NodePort type.

 I want to expose the UI for external viewing via ingress using the aws
 alb class. The aws-load-balancer-controller requires my service to be of
 type NodePort.

 I have tried a few options but the service is always created as
 ClusterIP.
 1) In the FlinkDeployment CRD, under spec.flinkConfiguration, I added
 kubernetes.rest-service.exposed.type: "NodePort"
 2) In the operator helm values.yaml

 defaultConfiguration:
   create: true
   # Set append to false to replace configuration files
   append: true
   flink-conf.yaml: |+
     # Flink Config Overrides
     kubernetes.rest-service.exposed.type: NodePort

 Neither option gives me a NodePort type service for the UI.
 Any suggestions?
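
 (For clarity, option 1 as a FlinkDeployment fragment. This is a sketch: the
 name is a placeholder, while spec.flinkConfiguration is the operator's
 free-form config map.)

   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: my-deployment
   spec:
     flinkConfiguration:
       kubernetes.rest-service.exposed.type: "NodePort"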










What is the recommended solution for this error of too many files open during a checkpoint?

2022-09-02 Thread Marco Villalobos
What is the recommended solution for this error of too many files open during a 
checkpoint?

2022-09-02 10:04:56
java.io.IOException: Could not perform checkpoint 119366 for operator tag enrichment (3/4)#104.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:968)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:115)
    at org.apache.flink.streaming.runtime.io.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:156)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:178)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:179)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 119366 for operator tag enrichment (3/4)#104. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:685)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:606)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:571)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1003)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:993)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:951)
    ... 13 more
Caused by: org.rocksdb.RocksDBException: While open a file for appending: /mnt/yarn/usercache/hadoop/appcache/application_1631124824249_0061/flink-io-7f392e48-d086-492b-960b-1c56d0f864a0/job_a5b70dea0d3c27b2798c53df49065433_op_KeyedProcessOperator_a91e7e58fb0d0cb4a427ff0c6489016c__3_4__uuid_252bcc06-8857-4153-a866-2e6b3f50c4bb/chk-119366.tmp/MANIFEST-423131: Too many open files
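
(Not an answer recorded in this thread, but for context: the two commonly
suggested mitigations are raising the OS open-file limit (ulimit -n) for the
TaskManager / YARN container processes, and capping how many files RocksDB
may keep open through Flink configuration. The option below exists in Flink
1.x; the value is an illustrative assumption, so verify both against your
version.)

    # flink-conf.yaml sketch: bound RocksDB's open file handles per
    # stateful operator (default -1 means unlimited), trading some read
    # performance for fewer file descriptors.
    state.backend.rocksdb.files.open: 4096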

RE: Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-09-02 Thread Ryan van Huuksloot via user
Hi Mason,

First off, thanks for putting this FLIP together! Sorry for the delay. Full
disclosure: Mason and I chatted a bit at Flink Forward 2022, but I have
tried to capture the questions I had for him then.

I'll start the conversation with a few questions:

1. The concept of streamIds is not clear to me in the proposal and could
use some more information. If I understand correctly, they will be used by
the MetadataService to determine which Kafka clusters to read from? If you
assign stream ids using `setStreamIds`, how can you dynamically increase
the number of clusters you consume from if the list of StreamIds is static?
I am basing this off of your example .setStreamIds(List.of("my-stream-1",
"my-stream-2")) (sketched after these questions), so I could be off base
with my assumption. If you don't mind clearing up the intention, that would
be great!

2. How would offsets work if you wanted to use this MultiClusterKafkaSource
with a file-based backfill? In the case I am thinking of, you have a
bucket-backed archive of Kafka data per cluster, and you want to pick up
from the last offset in the archived system. Could you set
OffsetInitializers "per cluster", potentially as a function, or are you
limited to setting one OffsetInitializer for the entire source?

3. Just to make sure: because this system will layer on top of FLIP-27
and use KafkaSource for some aspects under the hood, the watermark
alignment that was introduced in FLIP-182 / Flink 1.15 would be possible
across multiple clusters if you assign them to the same alignment group?
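
(To make question 1 concrete, a hypothetical sketch of the builder usage:
MultiClusterKafkaSource and setStreamIds are proposed API from the talk and
this thread, not a released interface, so every name below may change.)

    // Hypothetical: a source subscribed to two logical streams. A
    // MetadataService would resolve each stream id to concrete
    // (cluster, topic) pairs at runtime, so clusters can be added or
    // swapped without touching this code.
    MultiClusterKafkaSource<String> source =
        MultiClusterKafkaSource.<String>builder()
            .setStreamIds(List.of("my-stream-1", "my-stream-2"))
            .setDeserializer(
                KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
            .build();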

Thanks!
Ryan

On 2022/06/28 07:21:15 Mason Chen wrote:
> Hi all,
>
> Thanks for the feedback! I'm adding the users who responded in the user
> mailing list to this thread.
>
> @Qingsheng - Yes, I would prefer to reuse the existing Kafka connector
> module. It makes a lot of sense since the dependencies are the same and
> the implementation can also extend and improve some of the test utilities
> you have been working on for the FLIP-27 Kafka Source. I will enumerate
> the migration steps in the FLIP template.
>
> @Ryan - I don't have a public branch available yet, but I would appreciate
> your review on the FLIP design! When the FLIP design is approved by devs
> and the community, I can start to commit our implementation to a fork.
>
> @Andrew - Yup, one of the requirements of the connector is to read
> multiple clusters within a single source, so it should be able to work
> well with your use case.
>
> @Devs - what do I need to get started on the FLIP design? I see the FLIP
> template and I have an account (mason6345), but I don't have access to
> create a page.
>
> Best,
> Mason
>
>
>
>
> On Sun, Jun 26, 2022 at 8:08 PM Qingsheng Ren  wrote:
>
> > Hi Mason,
> >
> > It sounds like an exciting enhancement to the Kafka source and will
> > benefit a lot of users I believe.
> >
> > Would you prefer to reuse the existing flink-connector-kafka module or
> > create a new one for the new multi-cluster feature? Personally I prefer
> > the former one because users won’t need to introduce another dependency
> > module to their projects in order to use the feature.
> >
> > Thanks for the effort on this and looking forward to your FLIP!
> >
> > Best,
> > Qingsheng
> >
> > > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> > >
> > > Hi community,
> > >
> > > We have been working on a Multi Cluster Kafka Source and are looking
> > > to contribute it upstream. I've given a talk about the features and
> > > design at a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> > >
> > > The main features it provides are:
> > > 1. Reading multiple Kafka clusters within a single source.
> > > 2. Adjusting the clusters and topics the source consumes from
> > > dynamically, without a Flink job restart.
> > >
> > > Some of the challenging use cases that these features solve are:
> > > 1. Transparent Kafka cluster migration without Flink job restart.
> > > 2. Transparent Kafka topic migration without Flink job restart.
> > > 3. Direct integration with Hybrid Source.
> > >
> > > In addition, this is designed to wrap and manage the existing
> > > KafkaSource components to enable these features, so it can continue to
> > > benefit from KafkaSource improvements and bug fixes. It can be
> > > considered a form of composite source.
> > >
> > > I think the contribution of this source could benefit a lot of users
> > > who have asked in the mailing list about Flink handling Kafka
> > > migrations and removing topics in the past. I would love to hear and
> > > address your thoughts and feedback, and if possible drive a FLIP!
> > >
> > > Best,
> > > Mason
> >
> >
>


Re: [E] Re: Kubernetes operator expose UI rest service as NodePort instead of default clusterIP

2022-09-02 Thread Jeesmon Jacob
I remember testing the operator with the rest service exposed as NodePort.
NodePort requires rbac.nodesRule.create: true (default is false) in
values.yaml. Maybe you missed that?

https://github.com/apache/flink-kubernetes-operator/blob/release-1.1/helm/flink-kubernetes-operator/values.yaml#L34-L38
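
(A sketch of the corresponding values.yaml override, assuming the rbac block
in the linked file; verify key names and defaults against your chart version.)

    # helm values.yaml fragment (sketch): enable the node rule so the
    # operator's service account can read node information, which the
    # NodePort setup needs.
    rbac:
      create: true
      nodesRule:
        create: true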

On Thu, Sep 1, 2022 at 11:45 PM Vignesh Kumar Kathiresan via user <
user@flink.apache.org> wrote:

> Hi Yang,
>
> Yeah, I gathered that from the operator code soon after posting. I am
> using the aws alb ingress class [1]. There, under "Considerations", it is
> mentioned that if the alb target type is "instance", which is the default
> traffic mode, the kubernetes service type has to be NodePort or LoadBalancer.
>
> Changing the alb target type to "ip" might also work; let me try that. I
> believe there should be a reason the operator always overrides
> "REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
>
> [1] https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
>
> On Thu, Sep 1, 2022 at 7:01 PM Yang Wang  wrote:
>
>> I am afraid the current flink-kubernetes-operator always overrides the
>> "REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
>> Could you please share why the ingress [1] could not meet your
>> requirements? Compared with NodePort, I think it is a more graceful
>> implementation.
>>
>> [1].
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/operations/ingress/
>> 
>>
>> Best,
>> Yang
>>
>> Vignesh Kumar Kathiresan via user wrote on Fri, Sep 2, 2022 at 04:57:
>>
>>> Hello Flink community,
>>>
>>> Need some help with "flink kubernetes operator" based cluster setup.
>>>
>>> My flink cluster is set up using the flink-kubernetes-operator in AWS
>>> EKS. The required resources (deployments, pods, services, configmaps, etc.)
>>> are created as expected. But the "*-rest" service is created with type
>>> "ClusterIP", and I want it created as a NodePort type.
>>>
>>> I want to expose the UI for external viewing via ingress using the aws
>>> alb class. The aws-load-balancer-controller requires my service to be of
>>> type NodePort.
>>>
>>> I have tried a few options but the service is always created as
>>> ClusterIP.
>>> 1) In the FlinkDeployment CRD, under spec.flinkConfiguration, I added
>>> kubernetes.rest-service.exposed.type: "NodePort"
>>> 2) In the operator helm values.yaml
>>>
>>> defaultConfiguration:
>>>   create: true
>>>   # Set append to false to replace configuration files
>>>   append: true
>>>   flink-conf.yaml: |+
>>>     # Flink Config Overrides
>>>     kubernetes.rest-service.exposed.type: NodePort
>>>
>>> Neither option gives me a NodePort type service for the UI.
>>> Any suggestions?
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: flink ci build run longer than the maximum time of 310 minutes.

2022-09-02 Thread Matthias Pohl via user
Not sure whether that applies to your case, but there was a recent issue
[1] where the e2e_1_ci job ran into a timeout. If that's what you were
observing, rebasing your branch might help.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-29161

On Fri, Sep 2, 2022 at 10:51 AM Martijn Visser 
wrote:

> You can ask the Flinkbot to run again by typing as comment
>
> @flinkbot run azure
>
> Best regards,
>
> Martijn
>
On Fri, Sep 2, 2022 at 08:40, hjw <1010445...@qq.com> wrote:
>
>> I committed a PR to the Flink GitHub repo.
>> An error happened in the CI build:
>> [error]The job running on agent Azure Pipelines 6 ran longer than the
>> maximum time of 310 minutes. For more information, see
>> https://go.microsoft.com/fwlink/?linkid=2077134
>>
>> How do I solve this problem?
>> How do I trigger the CI build again?
>> thx.
>>
>


Re: Kafka source stops consuming messages from topic after some minutes

2022-09-02 Thread Martijn Visser
My initial thought is that there's something in your business logic. You're
reading from one Kafka topic, then you're mentioning that it's "connected"
to another Kafka topic. What type of business logic are you executing? Are
you joining data, looking things up, etc.? My suspicion would be that in this
process there's an issue that keeps that operator from progressing quickly,
causing the source to pause/stop reading.

On Thu, Sep 1, 2022 at 22:40, alfredo.vasquez.spglobal.com via user <
user@flink.apache.org> wrote:

> Hello,
>
>
>
> I'm using flink-connector-kafka version 1.15.2 to consume messages from a
> kafka topic which has 3 partitions; the stream is later connected to another
> kafka source and then processed in a BroadcastProcessFunction.
>
>
>
> The Kafka source is created as follows:
>
>
>
> Properties properties = new Properties();
> properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60");
> properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
> properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
> properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "90");
>
> KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
>   .setBootstrapServers("localhost:9092")
>   .setTopics("mytopic")
>   .setGroupId("group-id")
>   .setClientIdPrefix("client-id")
>   .setStartingOffsets(OffsetsInitializer.latest())
>   .setProperty("security.protocol", "SSL")
>   .setProperty("partition.discovery.interval.ms", "30")
>   .setProperties(properties)
>   .setDeserializer(new StringDeserializationSchema())
>   .build();
>
> DataStreamSource<String> myStreamSource =
>   env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "myStreamSource");
>
>
>
>
>
> Then I start sending 10 messages per second to the topic and notice that
> the consumer starts reading messages, but after some minutes it stops
> reading from the topic; for example, if I send 3000 messages to the topic,
> only around 1200 to 2000 are consumed.
>
> I do not get any exception or error message in the task manager logs, the
> job does not restart, and the backpressure is around 15 to 20% when it is
> reading messages and then drops to 0%.
>
>
>
> Please let me know any suggestion or additional information required to
> fix this issue.
>
>
>
> Best.
>
>


Re: flink ci build run longer than the maximum time of 310 minutes.

2022-09-02 Thread Martijn Visser
You can ask the Flinkbot to run again by typing as comment

@flinkbot run azure

Best regards,

Martijn

On Fri, Sep 2, 2022 at 08:40, hjw <1010445...@qq.com> wrote:

> I committed a PR to the Flink GitHub repo.
> An error happened in the CI build:
> [error]The job running on agent Azure Pipelines 6 ran longer than the
> maximum time of 310 minutes. For more information, see
> https://go.microsoft.com/fwlink/?linkid=2077134
>
> How do I solve this problem?
> How do I trigger the CI build again?
> thx.
>