Async code inside Flink Sink
Hello,

I have a use case where I need to delete a cache file after a successful sink operation (writing to the DB). My Flink pipeline is built in Java. I am contemplating using Java's CompletableFuture.runAsync() to perform the file deletion, and I am wondering what issues this might cause in terms of thread management and processing of the next event.

Also related to the same use case: in another Flink pipeline I might need to do this cache file deletion on a timer. For example, every five minutes I have to check for cache files that are older than the currently open cache file serving data into the sink function; all old cache files in closed status need to be deleted in a timely manner. All this deletion has to happen asynchronously, without blocking the flow of events from the Flink source to the sink. Also, per our requirements, I cannot make the whole sink operation async; only the file-based cache deletion inside the sink function can be async. Does Flink support timers or async blocks for this? Any inputs will be highly helpful.

Thanks,
JB
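A minimal sketch of the approach described above, using a dedicated executor for CompletableFuture.runAsync() plus a ScheduledExecutorService for the five-minute sweep. The class and method names (CacheFileSink, writeToDb, deleteClosedCacheFiles) and the choice of the cache-file path as the record type are illustrative assumptions, not any Flink API. Note also that Flink's own TimerService is only exposed in (keyed) process functions, not in SinkFunction, which is why the sweep below falls back on a plain JDK scheduler:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical sink: each record is assumed to carry the path of its cache file.
public class CacheFileSink extends RichSinkFunction<String> {

    private transient ExecutorService deleteExecutor;        // per-record async deletes
    private transient ScheduledExecutorService cleanupTimer; // periodic sweep

    @Override
    public void open(Configuration parameters) {
        // Small dedicated pools, so async work never runs on (or starves) the
        // task thread; this also avoids the JVM-wide ForkJoinPool.commonPool()
        // that runAsync() would use by default.
        deleteExecutor = Executors.newSingleThreadExecutor();
        cleanupTimer = Executors.newSingleThreadScheduledExecutor();
        cleanupTimer.scheduleAtFixedRate(
                this::deleteClosedCacheFiles, 5, 5, TimeUnit.MINUTES);
    }

    @Override
    public void invoke(String cacheFilePath, Context context) {
        writeToDb(cacheFilePath); // the DB write itself stays synchronous, as required

        // Fire-and-forget: invoke() returns immediately, so the task thread
        // moves on to the next event while the deletion runs elsewhere.
        CompletableFuture.runAsync(() -> {
            try {
                Files.deleteIfExists(Paths.get(cacheFilePath));
            } catch (IOException e) {
                // Log and move on; a missed delete is caught by the sweep.
            }
        }, deleteExecutor);
    }

    @Override
    public void close() {
        if (deleteExecutor != null) deleteExecutor.shutdown();
        if (cleanupTimer != null) cleanupTimer.shutdownNow();
    }

    private void deleteClosedCacheFiles() {
        // Application-specific: scan the cache directory and delete closed
        // files older than the currently open one.
    }

    private void writeToDb(String record) {
        // Application-specific DB write (omitted).
    }
}

One caveat: work on these extra threads is invisible to Flink's checkpointing, so a restart between the DB write and the delete can leave a stale file behind; the periodic sweep doubles as the safety net for exactly that case.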
Re: Parallelism for auto-scaling, memory for auto-tuning - Flink operator
If you have some prior experience, I'd recommend setting a good parallelism and TM resource spec first, to give the autotuner a good starting point. Usually, the autoscaler can tune your jobs well within a few attempts (<=3). As for `pekko.ask.timeout`, the default value should be sufficient in most cases.

Best,
Zhanghao Chen

From: Maxim Senin via user
Sent: Thursday, April 18, 2024 5:56
To: user@flink.apache.org
Subject: Parallelism for auto-scaling, memory for auto-tuning - Flink operator

Hi. Does it make sense to specify `parallelism` for task managers or the `job`, and, similarly, to specify the memory amount for the task managers, or is it better to leave it to the autoscaler and autotuner to pick the best values? How many times would the autoscaler need to restart task managers before it picks the right values? Does `pekko.ask.timeout` need to be sufficient for task managers to get into running state with all the restarts?

Cheers,
Maxim
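For reference, a minimal sketch of what such a starting point could look like in a FlinkDeployment spec. The job.autoscaler.* keys below are from the Flink Kubernetes operator's autoscaler/autotuner configuration, but exact key names and defaults vary by operator version, so treat this as an assumption to verify against the docs for your release:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-job                                    # hypothetical name
spec:
  flinkConfiguration:
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: "5m"   # pause between rescale decisions
    job.autoscaler.target.utilization: "0.7"      # aim for ~70% busy
    job.autoscaler.memory.tuning.enabled: "true"  # memory autotuner (newer operator versions)
  taskManager:
    resource:
      memory: "4096m"   # explicit starting spec, per the advice above
      cpu: 2
  job:
    parallelism: 4      # starting parallelism; the autoscaler adjusts from here
    upgradeMode: savepoint

The idea matches the advice above: give the autoscaler a sane initial parallelism and TM size rather than extreme values, so its first few rescale attempts converge quickly.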
Parallelism for auto-scaling, memory for auto-tuning - Flink operator
Hi. Does it make sense to specify `parallelism` for task managers or the `job`, and, similarly, to specify the memory amount for the task managers, or is it better to leave it to the autoscaler and autotuner to pick the best values? How many times would the autoscaler need to restart task managers before it picks the right values? Does `pekko.ask.timeout` need to be sufficient for task managers to get into running state with all the restarts?

Cheers,
Maxim
Re: Understanding default firings in case of allowed lateness
Hi Xuyang,

So if I take the side output route, my pipeline would be something like this:

final OutputTag lateOutputTag = new OutputTag("late-data"){};

SingleOutputStreamOperator reducedDataStream = dataStream
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(180))
    .sideOutputLateData(lateOutputTag)
    .reduce(new MyDataReducer());

DataStream lateStream = reducedDataStream.getSideOutput(lateOutputTag);

lateStream
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(480))
    .reduce(new MyDataReducer());

So basically I collect all the late data into another stream and apply the same transformations to it again, to get reduced data from the late data. Is this the correct way of getting reduced data from late data only?

I also have a few more queries:

1. For this late-records stream not to drop records, would I have to set an allowed lateness larger than the one in the first stream transformation? Basically, do I need to set any allowed lateness on the window operation of the late data stream at all if I want to reduce the late records the same way as the on-time records?

2. Also, when I collect late data as a side output, would the reduce function now only contain data reduced from on-time records, with no late records included in the subsequent reduced data? Basically, after this, the output of reduced data would only contain [ reduceData(d1, d2, d3) ] and not anything like reducedData(d1, d2, d3, late d4, late d5) or reducedData(d1, d2, d3, late d4, late d5, late d6), while the transformation of the late data stream would now contain reduced data from [ reducedData(late d4, late d5, late d6) ]?

Thanks
Sachin

On Wed, Apr 17, 2024 at 4:05 PM Xuyang wrote:

> Hi, Sachin.
>
> IIUC, it is in the second situation you listed, that is:
> [ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ].
> However, because of `table.exec.emit.late-fire.delay`, it could also be such as:
> [ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5), reducedData(d1, d2, d3, late d4, late d5, late d6) ]
>
> Actually, allow-lateness (`table.exec.emit.allow-lateness`) is used to control when the framework decides to stop changing the value of the window output, allowing it to automatically clear the corresponding state.
>
> > Also if I want the reduced data from late records to not include the data emitted within the window bounds, how can I do the same? Or is this handled as the default case?
>
> Maybe side output [1] can help you to collect the late data and re-compute them.
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
>
> --
> Best!
> Xuyang
>
> At 2024-04-17 16:56:54, "Sachin Mittal" wrote:
>
> Hi,
>
> Suppose my pipeline is:
>
> data
>     .keyBy(new MyKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>     .allowedLateness(Time.seconds(180))
>     .reduce(new MyDataReducer())
>
> So I wanted to know if the final output stream would contain reduced data at the end of the window mark and also another reduced data at the end of the allowed lateness? If that is the case, would the reduced data at the end of the allowed lateness also include the data from non-late records, or would it only include reduced data from late records?
>
> Example: if I have data in sequence:
>
> [window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end of allowed lateness]
>
> the resultant stream after the window and reduce operations would be:
>
> [ reduceData(d1, d2, d3), reducedData(late d4, late d5, late d6) ]
>
> or
>
> [ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ]
>
> or something else?
>
> Also, if I want the reduced data from late records to not include the data emitted within the window bounds, how can I do the same? Or is this handled as the default case?
>
> Thanks
> Sachin
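A consolidated sketch of the pattern this thread converges on, reusing the thread's own (unshown) placeholder classes; "Data" stands in for the element type, and MyKeySelector / MyDataReducer are assumed. One subtlety worth flagging: sideOutputLateData() only captures records that arrive after the allowed lateness has expired, i.e. records that would otherwise be dropped; late records within the lateness window still trigger cumulative re-firings of the main window, which is the behavior Xuyang describes.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// OutputTag must be created as an anonymous subclass so the element type
// is captured for serialization.
final OutputTag<Data> lateOutputTag = new OutputTag<Data>("late-data") {};

SingleOutputStreamOperator<Data> reduced = dataStream
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .allowedLateness(Time.seconds(180))
        // Only records later than window end + 180s land here; anything
        // inside the lateness window re-fires the main window instead.
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyDataReducer());

DataStream<Data> droppedLate = reduced.getSideOutput(lateOutputTag);

// Caveat for query 1 above: by the time these records arrive, the watermark
// has already passed their event-time windows, so this second window would
// drop them again unless its own allowed lateness is large enough to cover
// the extra delay; hence the larger value below.
DataStream<Data> reducedLate = droppedLate
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .allowedLateness(Time.seconds(480))
        .reduce(new MyDataReducer());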
Re: Understanding default firings in case of allowed lateness
Hi, Sachin.

IIUC, it is in the second situation you listed, that is:
[ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ].
However, because of `table.exec.emit.late-fire.delay`, it could also be such as:
[ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5), reducedData(d1, d2, d3, late d4, late d5, late d6) ]

Actually, allow-lateness (`table.exec.emit.allow-lateness`) is used to control when the framework decides to stop changing the value of the window output, allowing it to automatically clear the corresponding state.

> Also if I want the reduced data from late records to not include the data emitted within the window bounds, how can I do the same? Or is this handled as the default case?

Maybe side output [1] can help you to collect the late data and re-compute them.
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/

--
Best!
Xuyang

At 2024-04-17 16:56:54, "Sachin Mittal" wrote:

Hi,

Suppose my pipeline is:

data
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(180))
    .reduce(new MyDataReducer())

So I wanted to know if the final output stream would contain reduced data at the end of the window mark and also another reduced data at the end of the allowed lateness? If that is the case, would the reduced data at the end of the allowed lateness also include the data from non-late records, or would it only include reduced data from late records?

Example: if I have data in sequence:

[window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end of allowed lateness]

the resultant stream after the window and reduce operations would be:

[ reduceData(d1, d2, d3), reducedData(late d4, late d5, late d6) ]

or

[ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ]

or something else?

Also, if I want the reduced data from late records to not include the data emitted within the window bounds, how can I do the same? Or is this handled as the default case?

Thanks
Sachin
Understanding default firings in case of allowed lateness
Hi,

Suppose my pipeline is:

data
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(180))
    .reduce(new MyDataReducer())

So I wanted to know if the final output stream would contain reduced data at the end of the window mark and also another reduced data at the end of the allowed lateness? If that is the case, would the reduced data at the end of the allowed lateness also include the data from non-late records, or would it only include reduced data from late records?

Example: if I have data in sequence:

[window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end of allowed lateness]

the resultant stream after the window and reduce operations would be:

[ reduceData(d1, d2, d3), reducedData(late d4, late d5, late d6) ]

or

[ reduceData(d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late d6) ]

or something else?

Also, if I want the reduced data from late records to not include the data emitted within the window bounds, how can I do the same? Or is this handled as the default case?

Thanks
Sachin
Re: Elasticsearch8 example
Hi Tauseef.

I see that support for Elasticsearch 8 [1] will be released in version 3.1.0 of the Elasticsearch connector, so there are no docs for the elasticsearch8 connector yet. We can learn how to use it from the tests [2] until the docs are ready.

Best,
Hang

[1] https://issues.apache.org/jira/browse/FLINK-26088
[2] https://github.com/apache/flink-connector-elasticsearch/blob/main/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkITCase.java

Tauseef Janvekar wrote on Wednesday, April 17, 2024 at 01:12:
> Dear Team,
>
> Can anyone please share an example for flink-connector-elasticsearch8?
> I found this connector added on GitHub, but no proper documentation is present around it.
>
> It would be of great help if sample code were provided for the above connector.
>
> Thanks,
> Tauseef
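Until the official docs land, here is a rough sketch pieced together from the ITCase linked above. Every identifier below (the builder, its setters, the converter lambda, and the MyDoc POJO) is an assumption inferred from that test, not a documented API; verify against the released connector before relying on it:

import org.apache.flink.connector.elasticsearch.sink.Elasticsearch8AsyncSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;

import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;

// Hypothetical: MyDoc is any serializable POJO with a getId() accessor.
DataStream<MyDoc> stream = ...;

stream.sinkTo(
        new Elasticsearch8AsyncSinkBuilder<MyDoc>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                .setMaxBatchSize(500)
                // Convert each record into an ES8 bulk index operation.
                .setElementConverter(
                        (element, ctx) ->
                                new IndexOperation.Builder<MyDoc>()
                                        .index("my-index")
                                        .id(element.getId())
                                        .document(element)
                                        .build())
                .build());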
Re: Table Source from Parquet Bug
Hi, David.

Have you added the parquet format [1] dependency to your dependencies? It seems that the class ParquetColumnarRowInputFormat cannot be found.

Best,
Hang

[1] https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/parquet/

Sohil Shah wrote on Wednesday, April 17, 2024 at 04:42:
> Hello David,
>
> Since this is a ClassNotFoundException, you may be missing a dependency. Could you share your pom.xml?
>
> Thanks
> -Sohil
> Project: Braineous https://bugsbunnyshah.github.io/braineous/
>
> On Tue, Apr 16, 2024 at 11:25 AM David Silva via user <user@flink.apache.org> wrote:
>
>> Hi,
>>
>> Our team would like to leverage Flink but we're running into some issues with reading from a parquet file source. I *think* it's an issue with the Flink API.
>>
>> Could someone please help take a look?
>>
>> We're using *Scala 2.12* and *Flink 1.18.1*.
>>
>> I attached a copy of the code, the terminal output, and the Flink logs.
>>
>> The issue is at *MacFlapAggregator.scala:324*; it errors because of:
>> *Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat*
>>
>> - *MacFlapAggregator.scala:206* creates and queries the same exact table successfully, though
>> - *MacFlapAggregator.scala:318*: if I create the table using a CSV source, it works
>>
>> I also posted in the Slack server here: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1713209215085589
>>
>> Any help with this would be immensely helpful; our team has been struggling with this for a couple of days now.
>>
>> Thanks!
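For what it's worth, the missing class lives in the flink-parquet module, so in a Maven build the fix Hang suggests would look roughly like this (version chosen to match the Flink 1.18.1 mentioned in the thread; adjust to your Flink version):

<!-- Parquet format for the Table API; provides
     org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parquet</artifactId>
    <version>1.18.1</version>
</dependency>

If the job runs on a cluster, the dependency also needs to end up in the job jar (e.g. via the shade plugin) rather than only on the compile classpath, or the class will again be missing at runtime.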