Re: [Question] Basic Python examples.wordcount on local FlinkRunner

2021-09-02 Thread Dian Fu
This seems more like a Beam issue, although it uses the Flink runner. It would be 
helpful to also send it to the Beam user mailing list.

Regarding this issue itself, could you check whether input.txt is accessible in the 
Docker container?
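
(For example — the container name and file path below are only illustrative and 
depend on how the SDK harness was started — something like this can be used to 
inspect the running harness container:

docker ps | grep beam_python3.8_sdk
docker exec -it <container-id> ls -l /path/to/input.txt

A relative path such as input.txt is resolved inside the harness container, so a 
file that only exists on the host will usually not be visible there.)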

Regards,
Dian

> On Sep 3, 2021, at 5:19 AM, Adam Pearce  wrote:
> 
> Hello all,
>  
> I’m attempting to run a simple, minimally viable example of a Beam pipeline 
> on Flink. I have installed flink-1.13.2 following the setup instructions, and 
> have successfully run the server and navigated to localhost:8081 to view the 
> Web UI. I can see the job successfully submitted and running. It runs, and 
> completes, but the output is not appropriate and the final output never 
> occurs. I have enabled DEBUG logging for further output, but I don’t really 
> see anything that would indicate issues other than what I am showing below. 
> I’ve attached the complete log.
>  
> I am running the following from a Python 3.8.12 virtualenv with apache-beam 
> 2.31.0 installed via pip:
>  
> python -m apache_beam.examples.wordcount --input input.txt --output counts 
> --runner FlinkRunner --flink_master="localhost:8081" [--flink_submit_uber_jar]
>  
> I have tried with and without “--flink_submit_uber_jar” without any change. 
> The local embedded run of this pipeline works (same command as above, 
> omitting “flink_master”). I understand that running this through the Flink 
> server will use docker to stand up containers to perform the work. It appears 
> to be successfully pulling the image: “apache/beam_python3.8_sdk:2.31.0”. I’m 
> curious if there is some issue with the Python container, because in the 
> logs, I am seeing:
>  
> 2021/09/02 20:57:54 Initializing python harness: /opt/apache/beam/boot 
> --id=5-1 --provision_endpoint=host.docker.internal:55847
> 2021/09/02 20:57:54 Downloaded: /tmp/staged/pickled_main_session 
> (sha256:4e9a1199bade55ad73ae6872c8f156c69227ef23d8155f19dead745264999084, 
> size: 3029)
> 2021/09/02 20:57:54 Found artifact: pickled_main_session
> 2021/09/02 20:57:54 Installing setup packages ...
> 2021/09/02 20:57:54 Executing: python -m 
> apache_beam.runners.worker.sdk_worker_main
> 2021/09/02 20:57:55 Python exited: 
>  
> I’ve searched high and low for that “Python exited: ” but have found 
> very little.
>  
> Further context:
> Operating system: macOS 11.5.2
> Docker: Docker version 20.10.8, build 3967b7d
> Python: 3.8.12 (virtualenv)
> Beam: 2.31.0
> Flink: 1.13.2
> This communication is the property of XOR Security and may contain 
> confidential or privileged information. Unauthorized use of this 
> communication is strictly prohibited and may be unlawful. If you have 
> received this communication in error, please immediately notify the sender by 
> reply e-mail and destroy all copies of the communication and any attachments. 
> 



Re: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread Yang Wang
Hi Alexis

Thanks for your valuable inputs.

First, I want to share why Flink has to overwrite the resources which are
defined in the pod template. You could find the fields that will be
overwritten by Flink here[1]. I think the major reason is that Flink needs
to ensure consistency between the Flink configuration
(taskmanager.memory.process.size, kubernetes.taskmanager.cpu)
and the pod template resource settings. Since users could specify the total
process memory or detailed memory[2], Flink calculates the
pod resources internally. If we allowed users to specify the resources via the
pod template, then the users would have to guarantee the configuration
consistency, especially when they specify the detailed memory (e.g. heap,
managed, off-heap, etc.). I believe that would be a new burden for them.

For the limit-factor, you are right that factors aren’t linear. But I think
the factor is more flexible than an absolute value. A bigger pod can usually
use more burst resources. Moreover, I do not suggest setting a
limit-factor for memory since it does not bring much benefit. As a
comparison, burst CPU resources can help a lot with performance.
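
As a purely illustrative example (the option names follow the proposal discussed
in this thread and the numbers are made up): a TaskManager configured with
kubernetes.taskmanager.cpu: 2 and a CPU limit-factor of 1.5 would end up with pod
resources roughly like

resources:
  requests:
    cpu: "2"
    memory: "4096Mi"
  limits:
    cpu: "3"          # 2 * limit-factor 1.5
    memory: "4096Mi"  # memory factor left at 1, as suggested above

where the request is what the scheduler reserves and the limit only caps how far
the pod may burst.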

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#pod-template
[2].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#detailed-memory-model


@spoon_lz  You are right. The limit-factor should be
greater than or equal to 1. And the default value is 1.


Best,
Yang

Alexis Sarda-Espinosa  wrote on Thu, Sep 2, 2021 at 8:20 PM:

> Just to provide my opinion, I find the idea of factors unintuitive for
> this specific case. When I’m working with Kubernetes resources and sizing,
> I have to think in absolute terms for all pods and define requests and
> limits with concrete values. Using factors for Flink means that I have to
> think differently for my Flink resources, and if I’m using pod templates,
> it makes this switch more jarring because I define what is essentially
> another Kubernetes resources that I’m familiar with, but some of the values
> in my template are ignored. Additionally, if I understand correctly,
> factors aren’t linear, right? If someone specifies a 1GiB request with a
> factor of 1.5, they only get 500MiB on top, but if they specify 10GiB,
> suddenly the limit goes all the way up to 15GiB.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* spoon_lz 
> *Sent:* Donnerstag, 2. September 2021 14:12
> *To:* Yang Wang 
> *Cc:* Denis Cosmin NUTIU ; Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com>; matth...@ververica.com;
> user@flink.apache.org
> *Subject:* Re: Deploying Flink on Kubernetes with fractional CPU and
> different limits and requests
>
>
>
> Hi Yang,
>
> I agree with you, but I think the limit-factor should be greater than or
> equal to 1, and default to 1 is a better choice.
>
> If the default value is 1.5, the memory limit will exceed the actual
> physical memory of a node, which may result in OOM, machine downtime, or
> random pod death if the node runs full.
>
> For some required jobs, increase this value appropriately.
>
>
>
> Best,
>
> Zhuo
>
>
>
>
>
> On 09/2/2021 11:50, Yang Wang wrote:
>
> Given that the limit-factor should be greater than 1, then using the
> limit-factor could also work for memory.
>
>
>
> > Why do we need a larger memory resource limit than request?
>
> A typical use case I could imagine is the page cache. Having more page
> cache might improve the performance.
>
> And they could be reclaimed when the Kubernetes node does not have enough
> memory.
>
>
>
> I still believe that it is the user responsibility to configure a proper
> resource(memory and cpu), not too big. And
>
> using the limit-factor to allow the Flink job could benefit from the burst
> resources.
>
>
>
>
>
> Best,
>
> Yang
>
>
>
spoon_lz  wrote on Wed, Sep 1, 2021 at 8:12 PM:
>
> Yes, shrinking the requested memory will result in OOM. We do this because
> the user-created job provides an initial value (for example, 2 cpus and
> 4096MB of memory for TaskManager). In most cases, the user will not change
> this value unless the task fails or there is an exception such as data
> delay. This results in a waste of memory for most simple ETL tasks. These
> isolated situations may not apply to more Flink users. We can adjust
> Kubernetes instead of Flink to solve the resource waste problem.
>
> Just adjusting the CPU value might be a more robust choice, and there are
> probably some scenarios for both decreasing the CPU request and increasing
> the CPU limit
>
>
>
> Best,
>
> Zhuo
>
>
>
> On 09/1/2021 19:39, Yang Wang wrote:
>
> Hi Lz,
>
> Thanks for sharing your ideas.
>
>
> I have to admit that I prefer the limit factor to set the resource limit,
> not the percentage to set the resource request.
>
> Because usually the resource request is configured or calculated by Flink,
> and it indicates user required resources.
>
> It has the same semantic for all deployments(e.g. Yarn, K8s). Especia

Re: Triggers for windowed aggregations in Table API

2021-09-02 Thread Guowei Ma
Hi, John

I agree with Caizhi that you might need to customize the window trigger. One
small addition: you need to convert the Table to a DataStream first, and then
you can customize the trigger of the window, because as far as I know the
Table API does not support custom windows yet. For details on how to
convert, you can refer to [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#datastream-api-integration
Best,
Guowei
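
For reference, a rough and untested sketch of the kind of combined trigger Caizhi
describes below — event-time firing at the end of the window plus an early firing
once a per-window element count exceeds a threshold — could look like this once
the Table has been converted to a keyed, windowed DataStream (the class name and
threshold parameter are made up):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountOrEventTimeTrigger extends Trigger<Object, TimeWindow> {

    private final long maxCount;
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    public CountOrEventTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // Like EventTimeTrigger: fire when the watermark passes the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // Like CountTrigger: count elements and fire early above the threshold.
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        if (count.get() > maxCount) {
            // FIRE keeps the window contents, so later elements fire again with the
            // updated count; use FIRE_AND_PURGE to emit each early result only once.
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }

    // Counts elements by summing 1L per element.
    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

It would then be attached on the keyed stream, e.g.
.window(TumblingEventTimeWindows.of(Time.minutes(10))).trigger(new CountOrEventTimeTrigger(5)).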


On Fri, Sep 3, 2021 at 10:28 AM Caizhi Weng  wrote:

> Hi!
>
> You might want to use your custom trigger to achieve this.
>
> Tumble windows are using EventTimeTrigger by default. Flink has another
> built-in trigger called CountTrigger but it only fires for every X records,
> ignoring the event time completely. You might want to create your own
> trigger to combine the two, or more specifically, combine
> EventTimeTrigger#onEventTime and CountTrigger#onElement.
>
> For more about custom triggers see
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers
>
> John Smith  wrote on Fri, Sep 3, 2021 at 2:00 AM:
>
>> Hi,
>>
>> Sorry if this has been answered previously but I couldn't find any answer
>> for the question and would appreciate any help.
>> Context:
>> Let's say I have a log stream in Kafka where message values have an *id*
>> field along with a few other fields. I want to count the number of messages
>> for each id for a tumbling window of* ten minutes *and if the count for
>> any id in the given window is higher than 5, I want to write the message
>> into the sink topic. However, I don't want to wait until the end of the 10
>> minute window to emit the result and want to immediately emit the result
>> when the count is more than 5 for an id in the window. For example, if I
>> see 6 messages in the first minute for an id, I want to trigger a write
>> with the count of 6 in the sink topic immediately and not wait the whole 10
>> minutes.
>> The following code does the aggregation but emits the results at the end
>> of the window. How can I trigger the emitting result earlier?
>>
>> final Table resultTable = sourceTable
>> .select( $("id")
>> , $("key")
>> 
>> .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w")   )
>> .groupBy($("w"), $("id"))
>> .select($("w").start().as("WindowStart"), $("id"), 
>> $("key").count().as("count"))
>> ;
>>
>> resultTable.execute().print();
>>
>>
>> Thanks in advance!
>>
>>


Re: Checkpointing failure, subtasks get stuck

2021-09-02 Thread JING ZHANG
Hi Xiangyu Su,
Since there is not much detailed information, I can only give some general
troubleshooting ideas. I hope they are helpful to you.
1. Find out which checkpoint expired. You can find that in the Web UI [1] or
in `jobmanager.log`.
2. Find out which operators had not yet finished the checkpoint when it
expired. You can find that in the detailed checkpoint information in the Web UI [1].
3. Find out which stage of the expired operator is slow: alignment duration,
sync duration, or async duration [1].
If an operator spent a long time in the alignment phase, please check whether
the job is experiencing backpressure. You can find that in the BackPressure
part of the Web UI. You can enable unaligned checkpoints [2] to greatly reduce
checkpointing times under backpressure.
If an operator spent a long time in the async phase, you could check
whether there is any network problem.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/unaligned_checkpoints/
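
As a small illustration of [2] — assuming checkpointing is configured in code
rather than in flink-conf.yaml; the interval below is only an example — enabling
unaligned checkpoints can be as simple as:

// uses org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
// and org.apache.flink.streaming.api.CheckpointingMode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableUnalignedCheckpoints();

or, equivalently, setting execution.checkpointing.unaligned: true in the
configuration.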

Best,
JING ZHANG

Xiangyu Su  wrote on Wed, Sep 1, 2021 at 3:52 PM:

> Hello Everyone,
> We were facing checkpointing failure issue since version 1.9, currently we
> are using  version 1.13.2
>
> We are using filesystem(s3) as statebackend, 10 mins checkpoint timeout,
> usually the checkpoint takes 10-30 seconds.
> But sometimes I have seen Job failed and restarted due to checkpoint
> timeout without huge increasing of incoming data... and also seen the
> checkpointing progress of some subtasks get stuck by e.g 7% for 10 mins.
> My guess would be somehow the thread for doing checkpointing get blocked...
>
> Any suggestions? idea will be helpful, thanks
>
>
> Best Regards,
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
> Smaato Inc.
> San Francisco - New York - Hamburg - Singapore
> www.smaato.com
>
> Germany:
>
> Barcastraße 5
>
> 22087 Hamburg
>
> Germany
> M 0049(176)43330282
>
> The information contained in this communication may be CONFIDENTIAL and is
> intended only for the use of the recipient(s) named above. If you are not
> the intended recipient, you are hereby notified that any dissemination,
> distribution, or copying of this communication, or any of its contents, is
> strictly prohibited. If you have received this communication in error,
> please notify the sender and delete/destroy the original message and any
> copy of it from your computer or paper files.
>


Re: Flink on Kubernetes

2021-09-02 Thread Guowei Ma
Hi, Julian

I notice that your configuration
includes "restart-strategy.fixed-delay.attempts: 10". It means that the job
will fail permanently after 10 restart attempts, so maybe that is why the job
does not restart again, and you could increase this value.
But I am not sure whether this is the root cause. If this does not help, I
think you could share the logs from that time and the Flink version you use.
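
For illustration only — the exact values depend on your needs — the relevant part
of flink-conf.yaml could be changed to something like:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647   # effectively unlimited
restart-strategy.fixed-delay.delay: 10 s

or to restart-strategy: exponential-delay, which keeps retrying with an
increasing back-off instead of giving up after a fixed number of attempts.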

Best,
Guowei


On Fri, Sep 3, 2021 at 2:00 AM Julian Cardarelli  wrote:

> Hello –
>
>
>
> We have implemented Flink on Kubernetes with Google Cloud Storage in high
> availability configuration as per the below configmap. Everything appears
> to be working normally, state is being saved to GCS.
>
>
>
> However, every now and then – perhaps weekly or every other week, all of
> the submitted jobs are lost and the cluster appears completely reset.
> Perhaps GKE is doing maintenance or something of this nature, but the point
> being that the cluster does not resume from this activity in an operational
> state with all jobs placed into running status.
>
>
>
> Is there something we are missing? Thanks!
>
> -jc
>
>
>
>
>
> apiVersion: v1
>
> kind: ConfigMap
>
> metadata:
>
>   name: flink-config
>
>   labels:
>
> app: flink
>
> data:
>
>   flink-conf.yaml: |+
>
> jobmanager.rpc.address: flink-jobmanager
>
> taskmanager.numberOfTaskSlots: 1
>
> blob.server.port: 6124
>
> jobmanager.rpc.port: 6123
>
> taskmanager.rpc.port: 6122
>
> jobmanager.heap.size: 1024m
>
> taskmanager.memory.process.size: 1024m
>
> kubernetes.cluster-id: cluster1
>
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>
> high-availability.storageDir: gs://
> storage-uswest.yyy.com/kubernetes-flink
>
> state.backend: filesystem
>
> state.checkpoints.dir: gs://
> storage-uswest.yyy.com/kubernetes-checkpoint
>
> state.savepoints.dir: gs://storage-uswest.yyy.com/kubernetes-savepoint
>
> execution.checkpointing.interval: 3min
>
> execution.checkpointing.externalized-checkpoint-retention:
> DELETE_ON_CANCELLATION
>
> execution.checkpointing.max-concurrent-checkpoints: 1
>
> execution.checkpointing.min-pause: 0
>
> execution.checkpointing.mode: EXACTLY_ONCE
>
> execution.checkpointing.timeout: 10min
>
> execution.checkpointing.tolerable-failed-checkpoints: 0
>
> execution.checkpointing.unaligned: false
>
> restart-strategy: fixed-delay
>
> restart-strategy.fixed-delay.attempts: 10
>
> restart-strategy.fixed-delay.delay 10s
>
>
>
>   log4j.properties: |+
>
> log4j.rootLogger=INFO, file
>
> log4j.logger.akka=INFO
>
> log4j.logger.org.apache.kafka=INFO
>
> log4j.logger.org.apache.hadoop=INFO
>
> log4j.logger.org.apache.zookeeper=INFO
>
> log4j.appender.file=org.apache.log4j.FileAppender
>
> log4j.appender.file.file=${log.file}
>
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
>
> log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd
> HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>
>
> log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
> file
>
>
>
>
> ___​
> Julian   Cardarelli
> CEO
> T  *(800) 961-1549* <(800)%20961-1549>
> E *jul...@thentia.com* 
> *LinkedIn* 
> [image: Thentia Website]
> 
> DISCLAIMER
> ​
> ​Neither Thentia Corporation, nor its directors, officers, shareholders,
> representatives, employees, non-arms length companies, subsidiaries,
> parent, affiliated brands and/or agencies are licensed to provide legal
> advice. This e-mail may contain among other things legal information. We
> disclaim any and all responsibility for the content of this e-mail. YOU
> MUST NOT rely on any of our communications as legal advice. Only a licensed
> legal professional may give you advice. Our communications are never
> provided as legal advice, because we are not licensed to provide legal
> advice nor do we possess the knowledge, skills or capacity to provide legal
> advice. We disclaim any and all responsibility related to any action you
> might take based upon our communications and emphasize the need for you to
> never rely on our communications as the basis of any claim or proceeding.
>
> CONFIDENTIALITY
> ​
> ​This email and any files transmitted with it are confidential and
> intended solely for the use of the individual or entity to whom they are
> addressed. If you have received this email in error please notify the
> system manager. This message contains confidential information and is
> intended only for the individual(s) named. If you are not the named
> addressee(s) you should not disseminate, distribute or copy this e-mail.
> Please notify the sender immediately by e-mail if you have received this
> e-mail by mistake and delete this e-mail from your system. If you are not
> the intended recipient y

Re: Verifying max-parallelism value

2021-09-02 Thread Guowei Ma
Hi, Niklas

As far as I know, the maximum parallelism is not currently displayed in the
web UI. Maximum parallelism is a per-operator concept, so I understand that
it is a little difficult to show. However, each job can have its own default
value; if none is set, there is a calculation method, see [1] for details.

But if you only want to know the maximum parallelism of the operator after
the keyBy, you can use the REST API [2], which returns the
"maxParallelism" of each job vertex (actually the maximum parallelism of
the first "keyed" operator in the job vertex).
For example: http://localhost:8081/jobs/1eaf5e2ff65e199e4d8e8875882de7db

Hope this is helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/production_ready/#set-an-explicit-max-parallelism
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid
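
As an illustration (the numbers and operator names are only examples), the value
can also be set explicitly in the job so that it is easy to verify afterwards:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(128);   // job-wide default number of key groups
// or per operator, e.g.:
// stream.keyBy(r -> r.getKey()).map(new MyMapper()).setMaxParallelism(128);

The explicit value should then show up as "maxParallelism" for the corresponding
job vertex in the REST response of [2].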

Best,
Guowei


On Fri, Sep 3, 2021 at 10:22 AM Caizhi Weng  wrote:

> Hi!
>
> Do you mean pipeline.max-parallelism or any other config options? If yes
> you should see it in the "Job Manager > Configuration" page of Flink UI.
>
> Which config option are you setting and how do you set that?
>
Niklas Wilcke  wrote on Fri, Sep 3, 2021 at 12:53 AM:
>
>> Hi Flink community,
>>
>> most likely I'm missing something but I failed to verify the setting of
>> the max-parallelism (# key groups).
>> Is there a way to check the value for a job? I checked the following
>> places without finding it.
>>
>> 1. Flink UI: Job Configuration
>> 2. Flink UI: SubTasks of a Job
>> 3. Taskmanager Logs
>> 4. Exported Metrics (prom)
>>
>> I'm using Flink 1.12.2.
>>
>> It would be very nice to check that the value is properly set in
>> different environments. Maybe someone can help me out how I could do that.
>>
>> Thank you very much!
>>
>> Regards,
>> Niklas
>
>


Re: Triggers for windowed aggregations in Table API

2021-09-02 Thread Caizhi Weng
Hi!

You might want to use a custom trigger to achieve this.

Tumble windows use EventTimeTrigger by default. Flink has another
built-in trigger called CountTrigger, but it only fires every X records,
ignoring event time completely. You might want to create your own
trigger to combine the two, or more specifically, combine
EventTimeTrigger#onEventTime and CountTrigger#onElement.

For more about custom triggers see
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/windows/#triggers

John Smith  wrote on Fri, Sep 3, 2021 at 2:00 AM:

> Hi,
>
> Sorry if this has been answered previously but I couldn't find any answer
> for the question and would appreciate any help.
> Context:
> Let's say I have a log stream in Kafka where message values have an *id*
> field along with a few other fields. I want to count the number of messages
> for each id for a tumbling window of* ten minutes *and if the count for
> any id in the given window is higher than 5, I want to write the message
> into the sink topic. However, I don't want to wait until the end of the 10
> minute window to emit the result and want to immediately emit the result
> when the count is more than 5 for an id in the window. For example, if I
> see 6 messages in the first minute for an id, I want to trigger a write
> with the count of 6 in the sink topic immediately and not wait the whole 10
> minutes.
> The following code does the aggregation but emits the results at the end
> of the window. How can I trigger the emitting result earlier?
>
> final Table resultTable = sourceTable
> .select( $("id")
> , $("key")
> 
> .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w")   )
> .groupBy($("w"), $("id"))
> .select($("w").start().as("WindowStart"), $("id"), 
> $("key").count().as("count"))
> ;
>
> resultTable.execute().print();
>
>
> Thanks in advance!
>
>


Re: Verifying max-parallelism value

2021-09-02 Thread Caizhi Weng
Hi!

Do you mean pipeline.max-parallelism or any other config options? If yes
you should see it in the "Job Manager > Configuration" page of Flink UI.

Which config option are you setting and how do you set that?

Niklas Wilcke  wrote on Fri, Sep 3, 2021 at 12:53 AM:

> Hi Flink community,
>
> most likely I'm missing something but I failed to verify the setting of
> the max-parallelism (# key groups).
> Is there a way to check the value for a job? I checked the following
> places without finding it.
>
> 1. Flink UI: Job Configuration
> 2. Flink UI: SubTasks of a Job
> 3. Taskmanager Logs
> 4. Exported Metrics (prom)
>
> I'm using Flink 1.12.2.
>
> It would be very nice to check that the value is properly set in different
> environments. Maybe someone can help me out how I could do that.
>
> Thank you very much!
>
> Regards,
> Niklas


Re: Reuse in Blink execution plan

2021-09-02 Thread Caizhi Weng
Hi!

Reusing common sub-plans is an optimization in Flink. Flink really does reuse
them at runtime, and the results of the reused tasks are calculated
only once.
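
For completeness, this behaviour is controlled by optimizer options on the table
config. A small sketch, assuming a Blink-planner TableEnvironment named tEnv:

// both options default to true in the Blink planner
tEnv.getConfig().getConfiguration()
        .setBoolean("table.optimizer.reuse-sub-plan-enabled", true);
tEnv.getConfig().getConfiguration()
        .setBoolean("table.optimizer.reuse-source-enabled", true);

Disabling them makes the optimizer duplicate the common sub-plans instead of
reusing them, which you can verify in the printed plan.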

Vasily Melnik  wrote on Thu, Sep 2, 2021 at 6:32 PM:

>
> Hi all.
>
> Using SQL with blink planner for batch calculations, i see *Reused*
> nodes in Optimized Execution Plan while making self join operations:
>
>
> == Optimized Execution Plan ==
> Union(all=[true], union=[id, v, v0, w0$o0])
> :- OverAggregate(orderBy=[id DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, v, v0,
> w0$o0])(reuse_id=[2])
> :  +- Sort(orderBy=[id DESC])
> : +- Exchange(distribution=[single])
> :+- Calc(select=[id, v, v0])
> :   +- HashJoin(joinType=[LeftOuterJoin], where=[($f2 = id0)],
> select=[id, v, $f2, id0, v0], build=[right])
> :  :- Exchange(distribution=[hash[$f2]])
> :  :  +- Calc(select=[id, v, (id + 1) AS $f2])
> :  : +- TableSourceScan(table=[[default_catalog,
> default_database, t1]], fields=[id, v])(reuse_id=[1])
> :  +- Exchange(distribution=[hash[id]])
> : +- *Reused*(reference_id=[1])
> +- *Reused*(reference_id=[2])
>
>
> The question is: are these steps (scans, intermediate calculations) really
> calculated only once, or is it just a print shortcut?
>


Re: Use FlinkKafkaConsumer to synchronize multiple Kafka topics

2021-09-02 Thread Arvid Heise
Hi Yan,

Afaik this is not directly supported and would be surprising to other users
since it's a rather specific requirement.
In fact, Flink delegates reading the topics to the Kafka consumer API, and I
suspect that the warning you received is also coming from the Kafka consumer (I
have not found a respective warning in Flink's code base, but you could also
show the exact log statement so I can recheck).

What you could do is try to configure the Kafka consumer to fail hard, with a
small timeout, when topic metadata cannot be retrieved.
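
A sketch of that idea — the property values are only examples, and how the client
reacts in detail depends on the Kafka client version:

// uses java.util.Properties, org.apache.flink.api.common.serialization.SimpleStringSchema
// and org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
props.setProperty("group.id", "my-group");
// surface metadata problems quickly instead of retrying them silently
props.setProperty("default.api.timeout.ms", "15000");
props.setProperty("request.timeout.ms", "15000");
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>(kafkaTopicsList, new SimpleStringSchema(), props);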

Note that I'm a bit confused by the terms "dead" topic and "rebooted"
topic. Afaik you can only have dead brokers and rebooted brokers and maybe
deleted topics. But I have yet to understand a use case where you would
delete a topic while the consumer is running.

On Thu, Sep 2, 2021 at 4:58 AM Yan Wang  wrote:

> Hi,
>
>
>
> We are currently using a single FlinkKafkaConsumer to consume multiple
> Kafka topics, however, we find that if one of the Kafka topics goes down at
> run time(like rebooting one of the topics), the FlinkKafkaConsumer will
> keep throwing warning message of the dead Kafka topic, and will also
> continue consume other live Kafka topics.
>
> However, what we want is that, if one of the topics goes down, the
> FlinkKafkaConsumer will wait and stop consuming other live topics until the
> dead topic goes live.
>
>
> Code example:
>
> *List kafkaTopicsList = new ArrayList<>( Arrays.asList(
> “KafkaTopic1”,  “KafkaTopic2” ) );*
>
> *FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer<>(
> kafkaTopicsList, new SimpleStringSchema(), properties);*
>
>
>
> As shown in the code example, *kafkaTopicsList* contains two Kafka
> topics, and flinkKafkaConsumer consumes both two topics. We hope that if 
> *KafkaTopic1
> *goes down at run-time(we may reboot *KafkaTopic1 *at run time), the
> flinkKafkaConsumer will wait and stop consuming *KafkaTopic2, *until*
> KafkaTopic1 *goes live again.
>
>
>
> May I ask is it possible to achieve this purpose using current Flink API?
> Do we need to edit configuration somewhere? Or we have to overwrite 
> *FlinkKafkaConsumer
> *Class to achieve this? Thank you very much!
>
>
>
> Thanks, Yan
>


Streaming Patterns and Best Practices - featuring Apache Flink

2021-09-02 Thread Devin Bost
I just released a new video that features Apache Flink in several design
patterns: Streaming Patterns and Best Practices with Apache Pulsar for
Enabling Machine Learning and Analytics 
I thought it might be of interest to the Flink community.

Devin G. Bost


Re: Deploying Stateful functions with an existing Ververica cluster

2021-09-02 Thread Barry Higgins
Hi Igal,
Thank you for getting back so quickly.
All of our applications are currently deployed onto the one Ververica cluster, 
so I would be quite keen to get the DataStream integration option evaluated (I 
am currently hitting an exception where the ObjectMapper in 
DefaultHttpRequestReplyClientSpec does not support Java 8 java.time.Duration). 
While I muddle through that, I would be obliged if you could direct me as to 
how I can deploy the equivalent of the master/worker container on Ververica.
Would it be as easy as creating a new Flink application in Java, porting the 
module.yml configuration with the relevant dependencies into it, and then 
deploying that jar?
This would be a nice middle-ground option where the statefun state could be managed 
outside of the calling application whilst offering the separation you referred 
to, on the same cluster.
I am thinking that that same statefun Flink master/worker could be used to 
route all traffic in the future, assuming the load was tolerable, but that is 
further down the line.
Thanks again, I really appreciate your insights.
Barry

On 2021/09/02 13:09:13, Igal Shilman  wrote: 
> Hi Barry,
> I've forward your email to the user mailing list as it is more suitable
> here :-)
> 
> Your question definitely makes sense, and let me try to provide you with
> some pointers:
> 
> 1. The architecture that you've outlined has many advantages and is
> desirable if you can
> afford that. Some of them are
> - clean separation of concerns
> - better resource isolation.
> - different SLO and fault domains (failure/slowness in your Python
> function, doesn't trigger a failure/back-pressure in your ETL)
> - you can use event time watermarks for your ETL (statefun only works with
> processing time)
> 
> 2. If you would still prefer to merge the two then you can checkout the
> DataStream integration API [1]
> Although it has some rough edges in respect with working with remote
> functions in particular.
> 
> Good luck,
> Igal.
> 
> 
> [1]
> https://nightlies.apache.org/flink/flink-statefun-docs-release-3.1/docs/sdk/flink-datastream/
> 
> 
> On Thu, Sep 2, 2021 at 1:07 PM Barry Higgins 
> wrote:
> 
> > Hi,
> >
> > I have set up a remote stateful function in python which I’ve deployed
> > on an AWS EC2 box. I am interacting with this from a separate statefun
> > docker container running 2 flink-statefun images with roles master and
> > worker (on a separate EC2 instance). The ingress and egress points for
> > this function are Kafka.
> >
> > I then have a separate Java application using Flink, deployed on a
> > Ververica cluster. From this application I am communicating with the
> > statefun function by adding a sink/source pointing at the
> > ingress/egress above.
> >
> > I have a couple of questions on this setup.
> >
> > I am unsure if there is a better way to communicate with the function
> > from the Flink application
> > I am wondering if there is anyway that I can use the existing deployed
> > application to maintain the state of my remote function, meaning that
> > I can discard the statefun master/worker elements?
> > Failing that, do I just need to create a new Flink application,
> > translate the equivalent of the module.yml that is passed to the
> > existing master/worker to Java, add the dependencies and deploy that
> > jar?
> >
> > I hope that makes sense?
> > Kindest Regards,
> >
> > Barry
> >
> 


Triggers for windowed aggregations in Table API

2021-09-02 Thread John Smith
Hi,

Sorry if this has been answered previously but I couldn't find any answer
for the question and would appreciate any help.
Context:
Let's say I have a log stream in Kafka where message values have an *id*
field along with a few other fields. I want to count the number of messages
for each id for a tumbling window of* ten minutes *and if the count for any
id in the given window is higher than 5, I want to write the message into
the sink topic. However, I don't want to wait until the end of the 10
minute window to emit the result and want to immediately emit the result
when the count is more than 5 for an id in the window. For example, if I
see 6 messages in the first minute for an id, I want to trigger a write
with the count of 6 in the sink topic immediately and not wait the whole 10
minutes.
The following code does the aggregation but emits the results at the end of
the window. How can I trigger the emitting result earlier?

final Table resultTable = sourceTable
    .select($("id"), $("key"), $("timestamp"))
    .window(Tumble.over(lit(10).minutes()).on($("timestamp")).as("w"))
    .groupBy($("w"), $("id"))
    .select($("w").start().as("WindowStart"), $("id"),
            $("key").count().as("count"));

resultTable.execute().print();


Thanks in advance!


Flink on Kubernetes

2021-09-02 Thread Julian Cardarelli
Hello -

We have implemented Flink on Kubernetes with Google Cloud Storage in high 
availability configuration as per the below configmap. Everything appears to be 
working normally, state is being saved to GCS.

However, every now and then - perhaps weekly or every other week, all of the 
submitted jobs are lost and the cluster appears completely reset. Perhaps GKE 
is doing maintenance or something of this nature, but the point being that the 
cluster does not resume from this activity in an operational state with all 
jobs placed into running status.

Is there something we are missing? Thanks!
-jc


apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
app: flink
data:
  flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
kubernetes.cluster-id: cluster1
high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: gs://storage-uswest.yyy.com/kubernetes-flink
state.backend: filesystem
state.checkpoints.dir: gs://storage-uswest.yyy.com/kubernetes-checkpoint
state.savepoints.dir: gs://storage-uswest.yyy.com/kubernetes-savepoint
execution.checkpointing.interval: 3min
execution.checkpointing.externalized-checkpoint-retention: 
DELETE_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
execution.checkpointing.unaligned: false
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay 10s

  log4j.properties: |+
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} 
%-5p %-60c %x - %m%n

log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
 file



___
Julian Cardarelli
CEO
T (800) 961-1549
ejul...@thentia.com
LinkedIn
DISCLAIMER
​
​Neither Thentia Corporation, nor its directors, officers, shareholders, 
representatives, employees, non-arms length companies, subsidiaries, parent, 
affiliated brands and/or agencies are licensed to provide legal advice. This 
e-mail may contain among other things legal information. We disclaim any and 
all responsibility for the content of this e-mail. YOU MUST NOT rely on any of 
our communications as legal advice. Only a licensed legal professional may give 
you advice. Our communications are never provided as legal advice, because we 
are not licensed to provide legal advice nor do we possess the knowledge, 
skills or capacity to provide legal advice. We disclaim any and all 
responsibility related to any action you might take based upon our 
communications and emphasize the need for you to never rely on our 
communications as the basis of any claim or proceeding.
CONFIDENTIALITY
​
​This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they are addressed. If 
you have received this email in error please notify the system manager. This 
message contains confidential information and is intended only for the 
individual(s) named. If you are not the named addressee(s) you should not 
disseminate, distribute or copy this e-mail. Please notify the sender 
immediately by e-mail if you have received this e-mail by mistake and delete 
this e-mail from your system. If you are not the intended recipient you are 
notified that disclosing, copying, distributing or taking any action in 
reliance on the contents of this information is strictly prohibited.

Verifying max-parallelism value

2021-09-02 Thread Niklas Wilcke
Hi Flink community,

most likely I'm missing something but I failed to verify the setting of the 
max-parallelism (# key groups).
Is there a way to check the value for a job? I checked the following places 
without finding it.

1. Flink UI: Job Configuration
2. Flink UI: SubTasks of a Job
3. Taskmanager Logs
4. Exported Metrics (prom)

I'm using Flink 1.12.2.

It would be very nice to check that the value is properly set in different 
environments. Maybe someone can help me out how I could do that.

Thank you very much!

Regards,
Niklas



Re: FLINK-14316 happens on version 1.13.2

2021-09-02 Thread Yun Gao
Hi Xiangyu,

There might be different reasons for the "Job Leader... lost leadership" 
problem. Do you see the errors
in the TM log? If so, the root cause might be that the connection between the 
TM and ZK was lost or
timed out. Have you checked the GC status on the TM side? If the GC is ok, could 
you provide a more detailed
exception stack?
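
If the ZK connection turns out to be the problem, one knob that is sometimes
tuned — the value below is only an example — is the session timeout in
flink-conf.yaml:

high-availability.zookeeper.client.session-timeout: 120000   # ms; the default is 60000

together with fixing the long GC pauses themselves.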

Best,
Yun



 --Original Mail --
Sender:Xiangyu Su 
Send Date:Wed Sep 1 15:31:03 2021
Recipients:user 
Subject:FLINK-14316 happens on version 1.13.2

Hello Everyone,
We upgraded Flink to 1.13.2, and we are randomly facing the "Job leader ... 
lost leadership" error; the job keeps restarting and failing...
It behaves like this ticket: https://issues.apache.org/jira/browse/FLINK-14316

Did anybody had same issue or any suggestions?

Best Regards,


-- 
Xiangyu Su
Java Developer
xian...@smaato.com

Smaato Inc.
San Francisco - New York - Hamburg - Singapore
www.smaato.com

Germany:
Barcastraße 5
22087 Hamburg
GermanyM 0049(176)43330282

The information contained in this communication may be CONFIDENTIAL and is 
intended only for the use of the recipient(s) named above. If you are not the 
intended recipient, you are hereby notified that any dissemination, 
distribution, or copying of this communication, or any of its contents, is 
strictly prohibited. If you have received this communication in error, please 
notify the sender and delete/destroy the original message and any copy of it 
from your computer or paper files.


Re: Seeing Exception ClassNotFoundException: __wrapper while running in Kubernetes Cluster

2021-09-02 Thread Nicolaus Weidner
Hi Praneeth,

It does look like a failure constructing the serializer. Can you share the
serialization config you use for the Kafka producer? In particular, are you
using a custom serializer?
Do you use any custom classloading configuration?
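
For reference, the relevant part is the serialization schema passed to the
producer, i.e. something roughly like the following (names here are only
placeholders):

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",
        new SimpleStringSchema(),   // or your custom serialization schema - this is the interesting part
        producerProps);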

Best regards,
Nico

On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh 
wrote:

> Hi All
>
> I am trying to run a flink scala application which reads from kafka apply
> some lookup transformations and then writes to kafka.
>
> I am using Flink Version 1.12.1
>
> I tested it in local and it works fine. But when I try to run it on
> cluster using native kubernetes integration I see weird errors like below.
>
> The cluster also looks fine, because I tried to run a wordcount
> application on the cluster and it worked fine.
>
> The exception is not clear and also the stacktrace shows the taskmanager
> stack trace and hence no idea where in the application the problem could
> be. Could this be a serialization issue? Is there a way to debug such
> issues and find the actual point in application code where there is a
> problem?
>
> ```org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
> instantiate serializer.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:160)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.io.IOException: unexpected exception type
> at java.io.ObjectStreamClass.throwMiscException(Unknown Source)
> ~[?:?]
> at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
> ~[?:?]
> at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
> at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.ClassNotFoundException:
> __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
> ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> ~[flink-dist_2.12-1

Re: Deploying Stateful functions with an existing Ververica cluster

2021-09-02 Thread Igal Shilman
Hi Barry,
I've forwarded your email to the user mailing list as it is more suitable
here :-)

Your question definitely makes sense, and let me try to provide you with
some pointers:

1. The architecture that you've outlined has many advantages and is
desirable if you can
afford that. Some of them are
- clean separation of concerns
- better resource isolation.
- different SLO and fault domains (failure/slowness in your Python
function, doesn't trigger a failure/back-pressure in your ETL)
- you can use event time watermarks for your ETL (statefun only works with
processing time)

2. If you would still prefer to merge the two then you can checkout the
DataStream integration API [1]
Although it has some rough edges with respect to working with remote
functions in particular.

Good luck,
Igal.


[1]
https://nightlies.apache.org/flink/flink-statefun-docs-release-3.1/docs/sdk/flink-datastream/


On Thu, Sep 2, 2021 at 1:07 PM Barry Higgins 
wrote:

> Hi,
>
> I have set up a remote stateful function in python which I’ve deployed
> on an AWS EC2 box. I am interacting with this from a separate statefun
> docker container running 2 flink-statefun images with roles master and
> worker (on a separate EC2 instance). The ingress and egress points for
> this function are Kafka.
>
> I then have a separate Java application using Flink, deployed on a
> Ververica cluster. From this application I am communicating with the
> statefun function by adding a sink/source pointing at the
> ingress/egress above.
>
> I have a couple of questions on this setup.
>
> I am unsure if there is a better way to communicate with the function
> from the Flink application
> I am wondering if there is anyway that I can use the existing deployed
> application to maintain the state of my remote function, meaning that
> I can discard the statefun master/worker elements?
> Failing that, do I just need to create a new Flink application,
> translate the equivalent of the module.yml that is passed to the
> existing master/worker to Java, add the dependencies and deploy that
> jar?
>
> I hope that makes sense?
> Kindest Regards,
>
> Barry
>


RE: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread Alexis Sarda-Espinosa
Just to provide my opinion, I find the idea of factors unintuitive for this 
specific case. When I’m working with Kubernetes resources and sizing, I have to 
think in absolute terms for all pods and define requests and limits with 
concrete values. Using factors for Flink means that I have to think differently 
for my Flink resources, and if I’m using pod templates, it makes this switch 
more jarring because I define what is essentially another Kubernetes resource 
that I’m familiar with, but some of the values in my template are ignored. 
Additionally, if I understand correctly, factors aren’t linear, right? If 
someone specifies a 1GiB request with a factor of 1.5, they only get 500MiB on 
top, but if they specify 10GiB, suddenly the limit goes all the way up to 15GiB.

Regards,
Alexis.

From: spoon_lz 
Sent: Donnerstag, 2. September 2021 14:12
To: Yang Wang 
Cc: Denis Cosmin NUTIU ; Alexis Sarda-Espinosa 
; matth...@ververica.com; 
user@flink.apache.org
Subject: Re: Deploying Flink on Kubernetes with fractional CPU and different 
limits and requests

Hi Yang,
I agree with you, but I think the limit-factor should be greater than or equal 
to 1, and default to 1 is a better choice.
If the default value is 1.5, the memory limit will exceed the actual physical 
memory of a node, which may result in OOM, machine downtime, or random pod 
death if the node runs full.
For some required jobs, increase this value appropriately.

Best,
Zhuo


On 09/2/2021 11:50, Yang Wang wrote:
Given that the limit-factor should be greater than 1, then using the 
limit-factor could also work for memory.

> Why do we need a larger memory resource limit than request?
A typical use case I could imagine is the page cache. Having more page cache 
might improve the performance.
And they could be reclaimed when the Kubernetes node does not have enough 
memory.

I still believe that it is the user responsibility to configure a proper 
resource(memory and cpu), not too big. And
using the limit-factor to allow the Flink job could benefit from the burst 
resources.


Best,
Yang

spoon_lz <spoon...@126.com> wrote on Wed, Sep 1, 2021 at 8:12 PM:
Yes, shrinking the requested memory will result in OOM. We do this because the 
user-created job provides an initial value (for example, 2 cpus and 4096MB of 
memory for TaskManager). In most cases, the user will not change this value 
unless the task fails or there is an exception such as data delay. This results 
in a waste of memory for most simple ETL tasks. These isolated situations may 
not apply to more Flink users. We can adjust Kubernetes instead of Flink to 
solve the resource waste problem.
Just adjusting the CPU value might be a more robust choice, and there are 
probably some scenarios for both decreasing the CPU request and increasing the 
CPU limit

Best,
Zhuo

On 09/1/2021 19:39, Yang Wang wrote:
Hi Lz,

Thanks for sharing your ideas.

I have to admit that I prefer the limit factor to set the resource limit, not 
the percentage to set the resource request.
Because usually the resource request is configured or calculated by Flink, and 
it indicates user required resources.
It has the same semantic for all deployments(e.g. Yarn, K8s). Especially for 
the memory resource, giving a discount
for the resource request may cause OOM.
BTW, I am wondering why the users do not allocate fewer resources if they do 
not need.

@Denis Cosmin NUTIU I really appreciate that 
you want to work on this feature. Let's first reach a consensus
about the implementation, and then opening a PR is welcome.


Best,
Yang


spoon_lz <spoon...@126.com> wrote on Wed, Sep 1, 2021 at 4:36 PM:

Hi,everyone
I have some other ideas for kubernetes resource Settings, as described by 
WangYang in [flink-15648], which increase the CPU limit by a certain percentage 
to provide more computational performance for jobs. Should we consider the 
alternative of shrinking the request to start more jobs, which would improve 
cluster resource utilization? For example, for some low-traffic tasks, we can 
even set the CPU request to 0 in extreme cases. Both limit enlargement and 
Request shrinkage may be required

Best,
Lz
On 09/1/2021 16:06, Denis Cosmin NUTIU wrote:
Hi Yang,

I have limited Flink internals knowledge, but I can try to implement 
FLINK-15648 and open up a PR on GitHub or send the patch via email. How does 
that sound?
I'll sign the ICLA and switch to my personal address.

Sincerely,
Denis

On Wed, 2021-09-01 at 13:48 +0800, Yang Wang wrote:
Great. If no one wants to work on this ticket FLINK-15648, I will try to get 
this done in the next major release cycle(1.15).

Best,
Yang

Denis Cosmin NUTIU <dnu...@bitdefender.com> wrote on Tue, Aug 31, 2021 at 4:59 PM:
Hi everyone,

Thanks for getting back to me!

>  I think it would be nice if the task manager pods get their values from the 
> configuration file only if the 

Re: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-09-02 Thread spoon_lz
Hi Yang,
I agree with you, but I think the limit-factor should be greater than or equal 
to 1, and a default of 1 is the better choice.
If the default value were 1.5, the memory limit could exceed the actual physical 
memory of a node, which may result in OOM, machine downtime, or random pod 
death if the node runs full. 
For jobs that need it, this value can be increased appropriately.


Best,
Zhuo




On 09/2/2021 11:50, Yang Wang wrote:
Given that the limit-factor should be greater than 1, then using the 
limit-factor could also work for memory.


> Why do we need a larger memory resource limit than request?
A typical use case I could imagine is the page cache. Having more page cache 
might improve the performance.
And they could be reclaimed when the Kubernetes node does not have enough 
memory.


I still believe that it is the user responsibility to configure a proper 
resource(memory and cpu), not too big. And
using the limit-factor to allow the Flink job could benefit from the burst 
resources.




Best,
Yang


spoon_lz  wrote on Wed, Sep 1, 2021 at 8:12 PM:

Yes, shrinking the requested memory will result in OOM. We do this because the 
user-created job provides an initial value (for example, 2 cpus and 4096MB of 
memory for TaskManager). In most cases, the user will not change this value 
unless the task fails or there is an exception such as data delay. This results 
in a waste of memory for most simple ETL tasks. These isolated situations may 
not apply to more Flink users. We can adjust Kubernetes instead of Flink to 
solve the resource waste problem.
Just adjusting the CPU value might be a more robust choice, and there are 
probably some scenarios for both decreasing the CPU request and increasing the 
CPU limit


Best,
Zhuo


On 09/1/2021 19:39, Yang Wang wrote:
Hi Lz,

Thanks for sharing your ideas.

I have to admit that I prefer the limit factor to set the resource limit, not 
the percentage to set the resource request.
Because usually the resource request is configured or calculated by Flink, and 
it indicates user required resources.
It has the same semantic for all deployments(e.g. Yarn, K8s). Especially for 
the memory resource, giving a discount
for the resource request may cause OOM.
BTW, I am wondering why the users do not allocate fewer resources if they do 
not need.


@Denis Cosmin NUTIU I really appreciate that you want to work on this 
feature. Let's first reach a consensus
about the implementation, and then opening a PR is welcome.




Best,
Yang




spoon_lz  wrote on Wed, Sep 1, 2021 at 4:36 PM:



Hi,everyone
I have some other ideas for kubernetes resource Settings, as described by 
WangYang in [flink-15648], which increase the CPU limit by a certain percentage 
to provide more computational performance for jobs. Should we consider the 
alternative of shrinking the request to start more jobs, which would improve 
cluster resource utilization? For example, for some low-traffic tasks, we can 
even set the CPU request to 0 in extreme cases. Both limit enlargement and 
Request shrinkage may be required


Best,
Lz
On 09/1/2021 16:06, Denis Cosmin NUTIU wrote:
Hi Yang,


I have limited Flink internals knowledge, but I can try to implement 
FLINK-15648 and open up a PR on GitHub or send the patch via email. How does 
that sound?
I'll sign the ICLA and switch to my personal address.


Sincerely,
Denis


On Wed, 2021-09-01 at 13:48 +0800, Yang Wang wrote:
Great. If no one wants to work on this ticket FLINK-15648, I will try to get 
this done in the next major release cycle(1.15).


Best,
Yang


Denis Cosmin NUTIU  wrote on Tue, Aug 31, 2021 at 4:59 PM:

Hi everyone,


Thanks for getting back to me! 


>  I think it would be nice if the task manager pods get their values from the 
> configuration file only if the pod templates don’t specify any resources. 
> That was the goal of supporting pod templates, right? Allowing more custom 
> scenarios without letting the configuration options get bloated.


I think that's correct. In the current behavior Flink will override the 
resources settings "The memory and cpu resources(including requests and limits) 
will be overwritten by Flink configuration options. All other resources(e.g. 
ephemeral-storage) will be retained."[1]. After reading the comments from 
FLINK-15648[2], I'm not sure that it can be done in a clean manner with pod 
templates.


> I think it is a good improvement to support different resource requests and 
> limits. And it is very useful especially for the CPU resource since it 
> heavily depends on the upstream workloads.


I agree with you! I have limited knowledge of Flink internals but the 
kubernetes.jobmanager.limit-factor and kubernetes.taskmanager.limit-factor 
seems to be the right way to do it.


[1] Native Kubernetes | Apache Flink
[2] [FLINK-15648] Support to configure limit for CPU and memory requirement - 
ASF JIRA (apache.org) 


From: Yang Wang 
Sent: Tuesday, August 31, 2021 6:04 AM
To: Alexis Sarda-Espinosa 
Cc: Denis Cosmin NUTIU ; matth...@ververica.com 
; 

Re: Flink Performance Issue

2021-09-02 Thread Mohammed Kamaal
Hi Fabian,

Just an update,

Problem 2:-

Caused by: org.apache.kafka.common.errors.NetworkException
It is resolved. It was because we exceeded the number of allowed
partitions for the Kafka cluster (AWS MSK cluster). We have deleted
unused topics and partitions to resolve the issue.

Problem 1:-

I increased the Kafka partitions and Flink parallelism to 45, and the
throughput has improved from 20 minutes to 14 minutes (20K records).
Can you check the Flink graph and let me know if there is anything
else that can be done here to improve the throughput further.

Thanks

On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
 wrote:
>
> Hi Fabian,
>
> Problem 1:-
> -
> I have removed the print out sink's and ran the test again. This time
> the throughput is 17 minutes for 20K records (200 records every
> second). Earlier it was 20 minutes for 20K records. (parallelism 15
> and kafka partition of 15)
>
> Please find the attached application graph. Can you suggest what else
> is required further to improve the throughput.
>
> Problem 2:-
> -
> Also, I tried to increase the parallelism from 15 to 45 (also
> increasing the Kafka partitions from 15 to 45) to see if this helps in
> getting better throughput.
>
> After increasing the partitions, I am facing a network issue with the
> Kafka cluster (AWS MSK). I am not getting this issue
> with 15 partitions for the Kafka topic. Could this be an issue with
> the Kafka cluster?
>
> Kafka Cluster Configuration:-
> ---
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=3
> zookeeper.session.timeout.ms=18000
> log.retention.ms=17280
> log.cleanup.policy=delete
> group.max.session.timeout.ms=120
>
> Exception:-
> 
>  "locationInformation":
> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
> "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> "message": "Error during disposal of stream operator.",
> "throwableInformation": [
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Failed to send data to Kafka: The server
> disconnected
>
> "Caused by: org.apache.kafka.common.errors.NetworkException: The
> server disconnected before a response was received."
>
>
> Thanks
>
>
> On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul  wrote:
> >
> > Hi Mohammed,
> >
> > 200 records per second should definitely be doable. The first thing you can do is remove the 
> > print-out sinks, because they increase the load on your cluster due to the additional I/O 
> > operations and, secondly, prevent Flink from fusing operators.
> > I am interested to see the updated job graph after the removal of the print 
> > sinks.
> >
> > Best,
> > Fabian


Reuse in Blink execution plan

2021-09-02 Thread Vasily Melnik
Hi all.

Using SQL with the Blink planner for batch calculations, I see *Reused* nodes
in the Optimized Execution Plan when performing self-join operations:


== Optimized Execution Plan ==
Union(all=[true], union=[id, v, v0, w0$o0])
:- OverAggregate(orderBy=[id DESC], window#0=[ROW_NUMBER(*) AS w0$o0 ROWS
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[id, v, v0,
w0$o0])(reuse_id=[2])
:  +- Sort(orderBy=[id DESC])
: +- Exchange(distribution=[single])
:+- Calc(select=[id, v, v0])
:   +- HashJoin(joinType=[LeftOuterJoin], where=[($f2 = id0)],
select=[id, v, $f2, id0, v0], build=[right])
:  :- Exchange(distribution=[hash[$f2]])
:  :  +- Calc(select=[id, v, (id + 1) AS $f2])
:  : +- TableSourceScan(table=[[default_catalog,
default_database, t1]], fields=[id, v])(reuse_id=[1])
:  +- Exchange(distribution=[hash[id]])
: +- *Reused*(reference_id=[1])
+- *Reused*(reference_id=[2])


The question is: are these steps (scans, intermediate calculations) really
calculated only once, or is this just a shorthand in the printed plan?
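
If it helps the discussion, this is a minimal sketch of how one could toggle the
reuse behaviour and compare the printed plans (the table and query are made up, and
I am assuming table.optimizer.reuse-sub-plan-enabled / table.optimizer.reuse-source-enabled
are the relevant options):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReusePlanCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Made-up bounded source table, just to have something to plan against.
        tEnv.executeSql(
                "CREATE TABLE t1 (id INT, v STRING) WITH ("
                        + "'connector' = 'datagen', 'number-of-rows' = '10')");

        // Disable sub-plan/source reuse and compare with the default (reuse enabled).
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-sub-plan-enabled", false);
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.optimizer.reuse-source-enabled", false);

        System.out.println(tEnv.explainSql(
                "SELECT a.id, a.v, b.v FROM t1 a LEFT JOIN t1 b ON a.id + 1 = b.id"));
    }
}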


Re: Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Till Rohrmann
Forwarding the discussion back to the user mailing list.

On Thu, Sep 2, 2021 at 12:25 PM Till Rohrmann  wrote:

> The stack trace looks ok. This happens whenever the leader loses
> leadership and this can have different reasons. What's more interesting is
> what happens before and after and what's happening on the system you use
> for HA (probably ZooKeeper). Maybe the connection to ZooKeeper is unstable
> or there is some other problem.
>
> Cheers,
> Till
>
> On Thu, Sep 2, 2021 at 12:20 PM Xiangyu Su  wrote:
>
>> Hi Till,
>> thank you very much for this fast reply!
>> This issue happens very randomly; I made several attempts to reproduce it, but
>> it is not easy...
>> and here is the exception stacktrace from JM logs and TM logs:
>>
>> java.lang.Exception: Job leader for job id
>> 6fd38dedbca7bf65bfa57cb306930fa9 lost leadership.
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2189)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at
>> org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2187)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> On Thu, 2 Sept 2021 at 12:14, Till Rohrmann  wrote:
>>
>>> Hi Xiangyu,
>>>
>>> Do you have the logs of the problematic test run available? Ideally, we
>>> can enable the DEBUG log level to get some more information. I think this
>>> information would be needed to figure out the problem.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 2, 2021 at 11:47 AM Xiangyu Su  wrote:
>>>
 Hello Everyone,
 Hello Till,
 We upgraded Flink to 1.13.2, and we are randomly facing the "Job leader
 ... lost leadership" error; the job keeps restarting and failing...
 It behaves like this ticket:
 https://issues.apache.org/jira/browse/FLINK-14316

 Did anybody have the same issue, or any suggestions?

 Best Regards,

 --
 Xiangyu Su
 Java Developer
 xian...@smaato.com


>>>
>>
>> --
>> Xiangyu Su
>> Java Developer
>> xian...@smaato.com
>>
>>
>


Re: Checkpointing failure, subtasks get stuck

2021-09-02 Thread Till Rohrmann
Hi Xiangyu,

Can you provide us with more information about your job, which state
backend you are using and how you've configured the checkpointing? Can you
also provide some information about the problematic checkpoints (e.g.
alignment time, async/sync duration) that you find on the checkpoint
details page? If you have access to the logs, then this could also help
better understand what is going on.

In general, such a problem can be caused by backpressure and long alignment
times. Backpressure can come from skewed data or if the user code is
performing very lengthy operations. What you could try is to enable
unaligned checkpoints if the problem is long alignment times caused by
backpressure.
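
For reference, enabling it programmatically looks roughly like the sketch below (the
interval and timeout values are placeholders only):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder values: checkpoint every 60 s, keep a 10 min checkpoint timeout.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);

        // Let checkpoint barriers overtake buffered in-flight records so that
        // backpressure does not stall the alignment phase.
        env.getCheckpointConfig().enableUnalignedCheckpoints(true);
    }
}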

Cheers,
Till

On Thu, Sep 2, 2021 at 11:48 AM Xiangyu Su  wrote:

> Hello Everyone,
> Hello Till,
> We have been facing checkpointing failure issues since version 1.9; currently
> we are using version 1.13.2.
>
> We are using the filesystem (S3) state backend with a 10-minute checkpoint
> timeout; usually a checkpoint takes 10-30 seconds.
> But sometimes I have seen the job fail and restart due to a checkpoint
> timeout without a large increase in incoming data... and I have also seen the
> checkpointing progress of some subtasks get stuck at e.g. 7% for 10 minutes.
> My guess would be that somehow the thread doing the checkpointing gets blocked...
>
> Any suggestions or ideas would be helpful, thanks
>
>
> Best Regards,
>
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
>


Re: Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Till Rohrmann
Hi Xiangyu,

Do you have the logs of the problematic test run available? Ideally, we can
enable the DEBUG log level to get some more information. I think this
information would be needed to figure out the problem.

Cheers,
Till

On Thu, Sep 2, 2021 at 11:47 AM Xiangyu Su  wrote:

> Hello Everyone,
> Hello Till,
> We upgraded Flink to 1.13.2, and we are randomly facing the "Job leader
> ... lost leadership" error; the job keeps restarting and failing...
> It behaves like this ticket:
> https://issues.apache.org/jira/browse/FLINK-14316
>
> Did anybody have the same issue, or any suggestions?
>
> Best Regards,
>
> --
> Xiangyu Su
> Java Developer
> xian...@smaato.com
>
>


Checkpointing failure, subtasks get stuck

2021-09-02 Thread Xiangyu Su
Hello Everyone,
Hello Till,
We have been facing checkpointing failure issues since version 1.9; currently we
are using version 1.13.2.

We are using the filesystem (S3) state backend with a 10-minute checkpoint timeout;
usually a checkpoint takes 10-30 seconds.
But sometimes I have seen the job fail and restart due to a checkpoint
timeout without a large increase in incoming data... and I have also seen the
checkpointing progress of some subtasks get stuck at e.g. 7% for 10 minutes.
My guess would be that somehow the thread doing the checkpointing gets blocked...

Any suggestions or ideas would be helpful, thanks


Best Regards,

-- 
Xiangyu Su
Java Developer
xian...@smaato.com



Job leader ... lost leadership with version 1.13.2

2021-09-02 Thread Xiangyu Su
Hello Everyone,
Hello Till,
We upgraded Flink to 1.13.2, and we are randomly facing the "Job leader ...
lost leadership" error; the job keeps restarting and failing...
It behaves like this ticket:
https://issues.apache.org/jira/browse/FLINK-14316

Did anybody have the same issue, or any suggestions?

Best Regards,

-- 
Xiangyu Su
Java Developer
xian...@smaato.com



Re: De/Serialization API to tear-down user code

2021-09-02 Thread Dawid Wysakowicz
Hi Sergio,

You can find the explanation of why we haven't added the close method in
the corresponding JIRA ticket [1]:

When adding a close() method to both DeserializationSchema and
SerializationSchema with a default implementation, it breaks source
compatibility if a user's class implements both interfaces at the
same time. The problem is that Java does not know which default
implementation to use from those two interfaces, as the close()
signature is the same in both. In the Flink code base we have three
such implementations: SimpleStringSchema,
TypeInformationSerializationSchema, and one in tests. It is not a
problem for open, as we have a parameter there that differentiates
the two methods.

We decided to skip the close for now, until we have a first use case
for it. We do not need to close the schema registry client because
it communicates over REST. Moreover, no other schema needs a close
for now. For the Table API we also need only the open for generating
the code of the serializer.
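
To illustrate the clash with a self-contained sketch (these are simplified stand-in
interfaces for the sake of the example, not the real Flink ones):

// Two stand-in interfaces that both declare a default close(), as was proposed.
interface SerSchemaWithClose<T> {
    byte[] serialize(T element);
    default void close() throws Exception {}
}

interface DeserSchemaWithClose<T> {
    T deserialize(byte[] message);
    default void close() throws Exception {}
}

// Without an explicit override, this class does not compile: Java reports that it
// inherits unrelated defaults for close() from the two interfaces.
class StringSchemaLike implements SerSchemaWithClose<String>, DeserSchemaWithClose<String> {
    public byte[] serialize(String element) { return element.getBytes(); }
    public String deserialize(byte[] message) { return new String(message); }

    // Mandatory override to resolve the ambiguity -- this is the source-compatibility break.
    public void close() throws Exception {}
}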

Now that you're reaching out with such a requirement we might revisit
it. WDYT Arvid?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-17306

On 02/09/2021 10:37, Sergio Morales wrote:
>
> Thank you for the answer. I’m using the (De)SerializationSchema in
> such way that it has a reference to a custom class that manages some
> resources. In the open() method I’m able to init the resources
> accordingly, but it is really strange that despite providing an
> “open()” there is no counter-part “close()” to release properly the
> same resources. We have in the project a memory leak that could be
> resolved using the close() API approach and now we have to do some
> extra hop by subclassing the SinkFunction to `@Override` the close
> method in order to close itself and the resources managed by the
> De/SerializationSchema too.
>
>  
>
> I was considering to add the close() API method by following a
> previous PR: https://github.com/apache/flink/pull/12006, would it be something
> that the team is willing to accept or should I avoid any effort on
> that part because the previous design document is not valid anymore?
>
>  
>
> Regards,
>
> Sergio.
>
>  
>
> From: Caizhi Weng
> Date: Thursday, 2 September 2021 at 04:18
> To: Sergio Morales
> Cc: user
> Subject: Re: De/Serialization API to tear-down user code
>
>  
>
> Hi!
>
>  
>
> The (De)serializationSchema is only a helper for changing the data
> object to another format. What's your use case? If you're creating a
> (De)serializationSchema for a source / sink you might want to open and
> close the resources in the open / close methods of the source / sink,
> not in the (De)serializationSchema.
>
>  
>
> Sergio Morales (sdmorale...@gmail.com) wrote on Wed, Sep 1, 2021 at 6:44 PM:
>
> Hi,
>
> I’m currently working on closing some resources while using the
> SerializationSchema and DeserializationSchema (flink-core
> v1.12.1). However, after reviewing the document outlining the API
> methods
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988)
> the close() operations are missing, and in the master branch I
> could not find any new version including them:
>
> * https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
> * https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
>
> Thank you for any help.
>
>  
>
> Regards,
>
> Sergio.
>
>  
>




Re: De/Serialization API to tear-down user code

2021-09-02 Thread Sergio Morales
Thank you for the answer. I’m using the (De)SerializationSchema in such a way 
that it has a reference to a custom class that manages some resources. In the 
open() method I’m able to initialize the resources accordingly, but it is really 
strange that despite providing an “open()” there is no counterpart “close()” to 
properly release the same resources. We have a memory leak in the project that 
could be resolved with the close() API approach, and for now we have to do an 
extra hop by subclassing the SinkFunction to `@Override` the close method so that 
it closes both itself and the resources managed by the De/SerializationSchema.
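
For context, the workaround looks roughly like the sketch below (class and helper
names are made up for illustration; this is not our actual code):

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.nio.charset.StandardCharsets;

// Stand-in for the resource-holding helper mentioned above.
class PooledEncoder implements AutoCloseable {
    byte[] encode(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    @Override public void close() { /* release pools, connections, ... */ }
}

// Schema that owns a resource but has no close() hook of its own.
class ResourceAwareSchema implements SerializationSchema<String> {
    private transient PooledEncoder encoder;

    @Override
    public void open(InitializationContext context) {
        encoder = new PooledEncoder();
    }

    @Override
    public byte[] serialize(String element) {
        return encoder.encode(element);
    }

    // Not part of SerializationSchema; called explicitly by the sink below.
    public void shutdown() {
        if (encoder != null) {
            encoder.close();
        }
    }
}

// Sink subclass whose close() also tears down the schema's resources.
class SchemaClosingSink extends RichSinkFunction<String> {
    private final ResourceAwareSchema schema;

    SchemaClosingSink(ResourceAwareSchema schema) {
        this.schema = schema;
    }

    @Override
    public void invoke(String value, Context context) {
        byte[] bytes = schema.serialize(value);
        // write bytes to the external system here
    }

    @Override
    public void close() throws Exception {
        schema.shutdown();
        super.close();
    }
}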

I was considering adding the close() API method by following a previous PR: 
https://github.com/apache/flink/pull/12006. Would this be something the team is 
willing to accept, or should I avoid any effort on that part because the previous 
design document is no longer valid?

Regards,
Sergio.

From: Caizhi Weng 
Date: Thursday, 2 September 2021 at 04:18
To: Sergio Morales 
Cc: user 
Subject: Re: De/Serialization API to tear-down user code

Hi!

The (De)serializationSchema is only a helper for changing the data object to 
another format. What's your use case? If you're creating a 
(De)serializationSchema for a source / sink you might want to open and close 
the resources in the open / close methods of the source / sink, not in the 
(De)serializationSchema.

Sergio Morales (sdmorale...@gmail.com) wrote on Wed, Sep 1, 2021 at 6:44 PM:

Hi,

I’m currently working on closing some resources while using the 
SerializationSchema and DeserializationSchema (flink-core v1.12.1). However, 
after reviewing the document outlining the API methods 
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988), 
the close() operations are missing, and in the master branch I could not find 
any newer version including them:

* https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
* https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java

Thank you for any help.



Regards,

Sergio.