Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Yik San Chan
Hi Piotr, Yes, thanks a lot for your help. For future reference, what I did was simply: 1. Copy the whole BufferingSink from the docs https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#checkpointedfunction 2. In its `invoke` method, I batch write e
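
The approach described above (buffer records in the sink's `invoke` method and write them out in batches) might look roughly like the following sketch. It is plain Python rather than Flink code: `BatchingSink`, `write_batch`, and `batch_size` are hypothetical names, the Redis write is replaced by a callback, and the checkpointing part of the `BufferingSink` example in the docs is left out.

```python
from typing import Callable, List


class BatchingSink:
    """Toy model of a sink that buffers records and writes them in batches.

    Hypothetical illustration only; this is not the Flink BufferingSink API.
    """

    def __init__(self, write_batch: Callable[[List[str]], None], batch_size: int) -> None:
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._write_batch = write_batch  # stand-in for a batched Redis write
        self._batch_size = batch_size
        self._buffer: List[str] = []

    def invoke(self, record: str) -> None:
        # Buffer the record; write out only once a full batch has accumulated.
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Hand a copy of the buffered records to the writer, then reset the buffer.
        if self._buffer:
            self._write_batch(list(self._buffer))
            self._buffer.clear()

    def close(self) -> None:
        # Write whatever is left when the input ends.
        self.flush()


# Usage: seven records with a batch size of three.
written: List[List[str]] = []
sink = BatchingSink(write_batch=written.append, batch_size=3)
for i in range(7):
    sink.invoke(f"record-{i}")
sink.close()
```

The final `close()` call matters in a batch job: without it, the last partial batch would stay in the buffer and never be written.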

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Gen Luo
@刘建刚 Welcome to the discussion, and thanks for sharing your experience. I have a minor question. In my experience, network failures in a given cluster usually take some time to recover from, and that time can be measured as a p99 to guide configuration. So I suppose it would be better to use time than attempt co

Re: Flink cep checkpoint size

2021-07-08 Thread Li Jim
Hi, Dawid. Thanks for replying, happy to know you are working on this. On 2021/07/08 12:14:21, Dawid Wysakowicz wrote: > Hi, > > Sorry for the late reply. > > Indeed I found a couple of problems with clearing the state for short > lived keys. I created a JIRA[1] issue to track it and opened a

Apache Flink - Reading Avro messages from Kafka with schema in schema registry

2021-07-08 Thread M Singh
Hi: I am trying to read avro encoded messages from Kafka with schema registered in schema registry. I am using the class (https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.html) using the me

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Lu Niu
Thanks everyone! This is a great discussion! 1. Restarting takes 30s when exceptions are thrown from application code because the restart delay is 30s in the config. Previously, many related config values were also 30s, which led to the confusion. I redid the test with this config: FixedDelayRestartBackoffTimeStrategy(maxN

Error when trying to setup and run wordcount example on dataproc

2021-07-08 Thread Joey Tran
Hi! I'm just trying to get my bearings with Dataproc and Flink/Beam. I get an error when trying to run the wordcount example with a long-running YARN session.

Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Piotr Nowojski
Great, thanks for coming back and I'm glad that it works for you! Piotrek On Thu, 8 Jul 2021 at 13:34, Yik San Chan wrote: > Hi Piotr, > > Thanks! I end up doing option 1, and that works great. > > Best, > Yik San > > On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski > wrote: > >> Hi, >> >> You

Re: My batch source doesn't emit MAX_WATERMARK when it finishes - why?

2021-07-08 Thread Dawid Wysakowicz
Hi, Your example does not show what watermarks are flowing through the program. It prints the watermark at the point a record is being emitted. As the cited text states, the final watermark is emitted after all records are emitted. You can test it e.g. with the newly added writeWatermark method in
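
The point above can be illustrated with a toy model, sketched here in plain Python rather than with the Flink API. It assumes a simplified source that emits one watermark right after each record and a final watermark of `Long.MAX_VALUE` once the input is exhausted; the function names `run_bounded_source` and `watermark_seen_per_record` are hypothetical.

```python
from typing import List, Tuple

MAX_WATERMARK = 2**63 - 1   # Long.MAX_VALUE, the "end of time" watermark
MIN_WATERMARK = -(2**63)    # Long.MIN_VALUE, the watermark before any was received

Event = Tuple[str, int]


def run_bounded_source(timestamps: List[int]) -> List[Event]:
    """Emit each record followed by a watermark, then the final watermark.

    Assumption for illustration: one watermark per record, equal to its timestamp.
    """
    events: List[Event] = []
    for ts in timestamps:
        events.append(("record", ts))
        events.append(("watermark", ts))
    # The final watermark comes only after all records have been emitted.
    events.append(("watermark", MAX_WATERMARK))
    return events


def watermark_seen_per_record(events: List[Event]) -> Tuple[List[int], int]:
    """Return the current watermark observed at each record, and the final one."""
    current = MIN_WATERMARK
    seen: List[int] = []
    for kind, value in events:
        if kind == "watermark":
            current = value
        else:
            # This is what a sink that prints the watermark per record would show.
            seen.append(current)
    return seen, current


seen, final_watermark = watermark_seen_per_record(run_bounded_source([1, 2, 3]))
```

In this model, `seen` never contains `MAX_WATERMARK` because no record arrives after the final watermark, while `final_watermark` does equal it. Printing the watermark only when a record is processed therefore cannot show the final watermark.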

Re: Lot of java.util.zip.ZipFile$Source in JobManager's Heap

2021-07-08 Thread Chesnay Schepler
Those are normal and point to the JDK and Flink classes which remain loaded for the lifetime of the JVM. On 08/07/2021 14:31, Pranjul Ahuja wrote: Hi, On analyzing the heap dump of the JobManager process, I am seeing a lot of instances of java.util.zip.ZipFile$Source which includes open file

My batch source doesn't emit MAX_WATERMARK when it finishes - why?

2021-07-08 Thread Yik San Chan
Hi, According to the docs [1] When a source reaches the end of the input, it emits a final watermark with timestamp Long.MAX_VALUE, indicating the "end of time". However, in my small experiment [2], the Flink job reads from a local csv file, and prints a watermark for each record in the SinkFun

Lot of java.util.zip.ZipFile$Source in JobManager's Heap

2021-07-08 Thread Pranjul Ahuja
Hi, On analyzing the heap dump of the JobManager process, I am seeing a lot of instances of java.util.zip.ZipFile$Source which includes open file handles to jar files. These instances are never garbage collected. I am also observing that the Resident Memory of the process never goes down. It co

Re: Flink cep checkpoint size

2021-07-08 Thread Dawid Wysakowicz
Hi, Sorry for the late reply. Indeed I found a couple of problems with clearing the state for short lived keys. I created a JIRA[1] issue to track it and opened a PR (which needs test coverage before it can be merged) with fixes for those. Best, Dawid [1] https://issues.apache.org/jira/browse/

How to register customized serializer for flink kafka format type

2021-07-08 Thread Chenzhiyuan(HR)
I created a table as below, with data coming from Kafka. I want to deserialize the JSON message to a POJO object, but the message format is neither Avro nor simple JSON. So I need to know how to register a customized serializer and use it for the 'format.type' property. By the way, my Flink version is 1.10.0.

Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-07-08 Thread Yik San Chan
Hi Piotr, Thanks! I end up doing option 1, and that works great. Best, Yik San On Tue, May 25, 2021 at 11:43 PM Piotr Nowojski wrote: > Hi, > > You could always buffer records in your sink function/operator, until a > large enough batch is accumulated and upload the whole batch at once. Note >

Re: Apache Flink - How to use/invoke LookupTableSource/Function

2021-07-08 Thread M Singh
Thanks Jing for such a great explanation and references. I will follow up on your references to understand the concepts you have explained so well. Mans On Wednesday, July 7, 2021, 11:21:39 PM EDT, JING ZHANG wrote: Hi Mans, Before coming to the next part, we may need some backgrounds abo

Re: Flink Metric Reporting from Job Manager

2021-07-08 Thread Dawid Wysakowicz
Hi, I think that is not directly supported. After all, the main method can also be executed outside of a JobManager and there you don't have any Flink context/connections/components set up. Best, Dawid On 08/07/2021 00:12, Mason Chen wrote: > Hi all, > > Does Flink support reporting metrics fro

Re: Question about POJO rules - why fields should be public or have public setter/getter?

2021-07-08 Thread Dawid Wysakowicz
Hi Naehee, The short answer is: for historical and compatibility reasons. It was implemented that way back in the day, and we don't want to change the default type extraction logic. Otherwise, user jobs that rely on the default type extraction logic for storing state would end up with a stat

??????????????????????

2021-07-08 Thread hbdrawn
flink newer 1.5??gelly?? ??4128G50??core flink standalone cluster tm??slots 12??100Gslots~=9G12*

Re: More detail information in sql validate exception

2021-07-08 Thread JING ZHANG
Hi Houyin, You are welcome to contribute to the community! Before coding, we need to create a JIRA ticket or start a mailing list discussion and reach consensus. You could describe the background in detail and give your proposal if you already have one. More information can be found in the document [1]. [1] https://

Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter, The JIRA is https://issues.apache.org/jira/browse/FLINK-23309. `bundle time` should be considered from the perspective of your e2e latency. As for `bundle size`, a larger value generally provides better throughput, but it should not be set too large, as that may cause no output to be seen downstre

Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Xingbo, all, That is good to know, thank you. Is there any Jira issue I can track? I'm curious to follow this progress! Do you have any recommendations with regard to these two configuration values, to get somewhat reasonable performance? Thanks a lot! Wouter On Thu, 8 Jul 2021 at 10:26, Xing

Re: PyFlink performance and deployment issues

2021-07-08 Thread Xingbo Huang
Hi Wouter, In fact, our users have encountered the same problem. Whenever the `bundle size` or `bundle time` is reached, the data in the buffer needs to be sent from the JVM to the PVM, and the operator then waits for the PVM to process it and send the results back to the JVM, which sends all the results to the downstream op
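
The flush rule described above (send the buffered data once either the `bundle size` or the `bundle time` is reached) can be sketched as follows. This is a simplified stand-alone model, not PyFlink's implementation: `BundleBuffer`, `max_size`, `max_time_ms`, and `clock_ms` are hypothetical names, the clock is injected so the example is deterministic, and the time limit is only checked when an element arrives, whereas a real runtime would presumably use a timer.

```python
from typing import Callable, List


class BundleBuffer:
    """Toy buffer that flushes when a size limit or a time limit is reached."""

    def __init__(self, max_size: int, max_time_ms: int, clock_ms: Callable[[], int]) -> None:
        self._max_size = max_size
        self._max_time_ms = max_time_ms
        self._clock_ms = clock_ms
        self._items: List[int] = []
        self._bundle_started_ms = clock_ms()
        self.flushed: List[List[int]] = []  # bundles handed over for processing

    def add(self, item: int) -> None:
        if not self._items:
            # The first element of a new bundle starts the bundle timer.
            self._bundle_started_ms = self._clock_ms()
        self._items.append(item)
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        size_reached = len(self._items) >= self._max_size
        time_reached = self._clock_ms() - self._bundle_started_ms >= self._max_time_ms
        if size_reached or time_reached:
            self.flushed.append(self._items)
            self._items = []


# Usage with a manually advanced clock (milliseconds).
now = [0]
buf = BundleBuffer(max_size=3, max_time_ms=1000, clock_ms=lambda: now[0])
for value in (1, 2, 3):
    buf.add(value)      # third element reaches the size limit
now[0] = 500
buf.add(4)              # starts a new bundle at t=500
now[0] = 1600
buf.add(5)              # 1100 ms since the bundle started: time limit reached
```

With a low event rate, elements wait in the buffer until the time limit expires, which is one way a large bundle time can show up as end-to-end latency.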

Re: PyFlink performance and deployment issues

2021-07-08 Thread Wouter Zorgdrager
Hi Dian, all, I will come back to the other points asap. However, I’m still confused about this performance. Is this what I can expect in PyFlink in terms of performance? ~ 1000ms latency for single events? I also had a very simple setup where I send 1000 events to Kafka per second and response t

RE: State Processor API and existing state

2021-07-08 Thread Tan, Min
Many thanks for your prompt reply. Yes, they are the same operators. What I did was just modify the content of the POJO, e.g., doubling the amount fields. I am not able to send the production code, but I will create a separate mocked project to reproduce the issue. I will send you the mocked code later. Regards