async io parallelism
Hello,

Let's say my elements are simple key-value pairs. Elements come from Kafka, where they were partitioned by "key". I then process them with a KeyedProcessFunction (keyed by the same "key"), enrich them with an ordered RichAsyncFunction, feed the result into another KeyedProcessFunction (keyed by the same "key"), and finally write to another Kafka topic, again partitioned by the same "key". Something like this:

FlinkKafkaConsumer -> keyBy("key") -> Intake(KeyedProcessFunction) -> AsyncDataStream.orderedWait() -> keyBy("key") -> Output(KeyedProcessFunction) -> FlinkKafkaProducer

Will it preserve the order of events with the same "key"?
* Will the Output function receive elements with the same "key" in the same order as they were originally in Kafka?
* Will FlinkKafkaProducer write elements with the same "key" in the same order as they were originally in Kafka?
* Does it depend on the parallelism of the async I/O?

The documentation says "the stream order is preserved", but if there are multiple parallel instances of the async function, does that mean order relative to each single instance, or total stream order?

Thanks,
Alexey
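For reference, a minimal sketch of the topology described above. The Event type, the IntakeFunction/OutputFunction/EnrichFunction classes and the Kafka source/sink variables are hypothetical placeholders, not taken from the original message:

DataStream<Event> source = env.addSource(kafkaConsumer);      // topic partitioned by "key"

DataStream<Event> intake = source
    .keyBy(e -> e.getKey())
    .process(new IntakeFunction());                           // first KeyedProcessFunction

DataStream<Event> enriched = AsyncDataStream.orderedWait(
    intake,
    new EnrichFunction(),                                     // ordered RichAsyncFunction
    30, TimeUnit.SECONDS,                                     // timeout per async request
    100);                                                     // max in-flight requests per subtask

enriched
    .keyBy(e -> e.getKey())
    .process(new OutputFunction())                            // second KeyedProcessFunction
    .addSink(kafkaProducer);                                  // write back, partitioned by "key"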
StreamingFileSink Not Flushing All Data
Hi there,

Using Flink 1.9.1, trying to write .tgz files with the StreamingFileSink#BulkWriter. It seems like flushing the output stream doesn't flush all the data written. I've verified I can create valid files using the same APIs and data on their own, so I'm thinking it must be something I'm doing wrong with the bulk format. I'm writing to the local filesystem, with the `file://` protocol. For tar/gzipping, I'm using the Apache Commons Compress library, version 1.20.

Here's a runnable example of the issue:

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class Scratch {

  public static class Record implements Serializable {
    private static final long serialVersionUID = 1L;

    String id;

    public Record() {}

    public Record(String id) {
      this.id = id;
    }

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }
  }

  public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    TarArchiveOutputStream taos = new TarArchiveOutputStream(
        new GzipCompressorOutputStream(new FileOutputStream("/home/austin/Downloads/test.tgz")));
    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", "test"));
    String fullText = "hey\nyou\nwork";
    byte[] fullTextData = fullText.getBytes();
    fileEntry.setSize(fullTextData.length);
    taos.putArchiveEntry(fileEntry);
    taos.write(fullTextData, 0, fullTextData.length);
    taos.closeArchiveEntry();
    taos.flush();
    taos.close();

    StreamingFileSink<Record> textSink = StreamingFileSink
        .forBulkFormat(new Path("file:///home/austin/Downloads/text-output"),
            new BulkWriter.Factory<Record>() {
              @Override
              public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
                final TarArchiveOutputStream compressedOutputStream =
                    new TarArchiveOutputStream(new GzipCompressorOutputStream(out));

                return new BulkWriter<Record>() {
                  @Override
                  public void addElement(Record record) throws IOException {
                    TarArchiveEntry fileEntry = new TarArchiveEntry(String.format("%s.txt", record.id));
                    byte[] fullTextData = "hey\nyou\nplease\nwork".getBytes(StandardCharsets.UTF_8);
                    fileEntry.setSize(fullTextData.length);
                    compressedOutputStream.putArchiveEntry(fileEntry);
                    compressedOutputStream.write(fullTextData, 0, fullTextData.length);
                    compressedOutputStream.closeArchiveEntry();
                  }

                  @Override
                  public void flush() throws IOException {
                    compressedOutputStream.flush();
                  }

                  @Override
                  public void finish() throws IOException {
                    this.flush();
                  }
                };
              }
            })
        .withBucketCheckInterval(1000)
        .build();

    env
        .fromElements(new Record("1"), new Record("2"))
        .addSink(textSink)
        .name("Streaming File Sink")
        .uid("streaming-file-sink");

    env.execute("streaming file sink test");
  }
}

From the stat/hex dumps, you can see that the first bits are there, but are then cut off:

~/Downloads » stat test.tgz
  File: test.tgz
  Size: 114         Blocks: 8          IO Block: 4096   regular file
Device: 801h/2049d   Inode: 30041077   Links: 1
Access: (0664/-rw-rw-r--)  Uid: ( 1000/  austin)   Gid: ( 1000/  austin)
Access: 2020-02-21 19:30:06.009028283 -0500
Modify: 2020-02-21 19:30:44.509424406 -0500
Change: 2020-02-21 19:30:44.509424406 -0500
 Birth: -

~/Downloads » tar -tvf test.tgz
-rw-r--r-- 0/0              12 2020-02-21 19:35 test.txt

~/Downloads » hd test.tgz
1f 8b 08 00 00 00 00 00  00 ff ed cf 31 0e 80 20  |1.. |
0010  0c 85 61 66 4f c1 09 cc  2b 14 3c 8f 83 89 89 03  |..afO...+.<.|
0020  09 94 a8 b7 77 30 2e ae  8a 2e fd 96 37 f6 af 4c  |w0..7..L|
0030  45 7a d9 c4 34 04 02 22  b3 c5 e9 be 00 b1 25 1f  |Ez..4.."..%.|
0040  1d 63 f0 81 82 05 91 77  d1 58 b4 8c ba d4 22 63  |.c.w.X"c|
0050  36 78 7c eb fe dc 0b 69  5f 98 a7 bd db 53 ed d6  |6x|i_S..|
0060  94 97 bf 5b 94 52 4a 7d
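One thing worth noting about the writer above (my own guess, not a reply from the thread): BulkWriter#finish() only flushes, but both TarArchiveOutputStream and GzipCompressorOutputStream hold back trailer bytes (the tar end-of-archive records and the gzip trailer) that are only written by their own finish()/close() methods, which could explain an archive that is cut off at the end. A hedged sketch of what the factory could look like with an explicit finish of both streams, assuming the rest of the writer stays the same:

public BulkWriter<Record> create(FSDataOutputStream out) throws IOException {
    final GzipCompressorOutputStream gzipStream = new GzipCompressorOutputStream(out);
    final TarArchiveOutputStream tarStream = new TarArchiveOutputStream(gzipStream);
    return new BulkWriter<Record>() {
        @Override
        public void addElement(Record record) throws IOException {
            // ... put/write/close the archive entry exactly as before ...
        }

        @Override
        public void flush() throws IOException {
            tarStream.flush();
        }

        @Override
        public void finish() throws IOException {
            tarStream.finish();   // writes the tar end-of-archive records
            gzipStream.finish();  // flushes the deflater and writes the gzip trailer
            out.flush();          // closing the Flink stream itself stays with the sink
        }
    };
}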
Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?
Hey, you are right. I'm also seeing this exception now. It was hidden in other log output. The solution to all this confusion is simple: DataStreamUtils.collect() Is like an execute(). The stream graph is cleared on each execute(). That's why collect() and then execute() lead to the "no operators defined" error. However, if you have collect(), print(), execute(), then the print() is filling the stream graph again, and you are executing two Flink jobs: the collect job and the execute job. I hope I got it right this time :) Best, Robert On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes wrote: > I tried this in Flink 1.10.0 : > > @Test > public void experimentalTest() throws Exception { > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > DataStream input = env.fromElements("One", "Two"); > //DataStream input = env.addSource(new > StringSourceFunction()); > List result = new ArrayList<>(5); > DataStreamUtils.collect(input).forEachRemaining(result::add); > env.execute("Flink Streaming Java API Skeleton"); > } > > > Results in > > > java.lang.IllegalStateException: No operators defined in streaming topology. > Cannot execute. > > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) > at > nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > ... > > > > On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger > wrote: > >> Hey Niels, >> >> This minimal Flink job executes in Flink 1.10: >> >> public static void main(String[] args) throws Exception { >>final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >>DataStream input = env.addSource(new StringSourceFunction()); >>List result = new ArrayList<>(5); >>DataStreamUtils.collect(input).forEachRemaining(result::add); >>env.execute("Flink Streaming Java API Skeleton"); >> } >> >> Maybe the TestUserAgentAnalysisMapperInline class is doing some magic >> that breaks with the StreamGraphGenerator? >> >> Best, >> Robert >> >> On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes wrote: >> >>> Hi Gordon, >>> >>> Thanks. This works for me. 
>>> >>> I find it strange that when I do this it works (I made the differences >>> bold) >>> >>> List result = new ArrayList<>(5); >>> >>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); >>> >>> *resultDataStream.print();* >>> >>> environment.execute(); >>> >>> >>> how ever this does not work >>> >>> List result = new ArrayList<>(5); >>> >>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); >>> >>> environment.execute(); >>> >>> >>> and this also does not work >>> >>> *resultDataStream.print();* >>> >>> List result = new ArrayList<>(5); >>> >>> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); >>> >>> environment.execute(); >>> >>> >>> In both these cases it fails with >>> >>> >>> java.lang.IllegalStateException: *No operators defined in streaming >>> topology. Cannot execute.* >>> >>> at >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792) >>> at >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783) >>> at >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768) >>> at >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) >>> at >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602) >>> at >>> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144) >>> >>> >>> >>> Did I do something wrong? >>> Is this a bug in the DataStreamUtils ? >>> >>> Niels Basjes >>> >>> >>> >>> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai wrote: >>> Hi, To collect the elements of a DataStream (usually only meant for testing purposes), you can take a look at `DataStreamUtils#collect(DataStream)`. Cheers, Gordon -- Sent f
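To make Robert's explanation at the top of this message concrete, here is a minimal sketch (my own illustration, not code from the thread) of the variant that works and why it ends up running two jobs:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> input = env.fromElements("One", "Two");

List<String> result = new ArrayList<>(5);

// collect() behaves like an execute(): it runs a job and the stream graph
// is cleared afterwards.
DataStreamUtils.collect(input).forEachRemaining(result::add);

// Without this line the graph is now empty and execute() fails with
// "No operators defined in streaming topology".
input.print();

// Runs a second job, containing only the print() sink added above.
env.execute("second job");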
Re: JDBC source running continuously
Jark, Thank you for the reply. By running continuously, I meant the source operator does not finish after all the data is read. Similar to ContinuousFileMonitoringFunction, i'm thinking of a continuously database monitoring function. The reason for doing this is to enable savepoint for my pipeline (savepoint does not work for finished operators). The following code shows that the format will close once it reads all data: while (isRunning && !format.reachedEnd()) { nextElement = format.nextRecord(nextElement); if (nextElement != null) { ctx.collect(nextElement); } else { break; } } format.close(); completedSplitsCounter.inc(); if (isRunning) { isRunning = splitIterator.hasNext(); } Is there any way to keep the operator running but not reading any data and also enable proper savepoint? Thanks, Fanbin On Fri, Feb 21, 2020 at 12:32 AM Jark Wu wrote: > Hi Fanbin, > > .iterate() is not available on Table API, it's an API of DataStream. > Currently, the JDBC source is a bounded source (a snapshot of table at the > execution time), so the job will finish when it processes all the data. > > Regarding to your requirement, "running continuously with JDBC source", we > should make it clear what do you want the source to read after the full > snapshot: > 1) read a full snapshot again > 2) read new inserted rows > 3) read new inserted rows and updated rows and deleted rows. > > For (1), you can create your own jdbc input format based on > JDBCInputFormat, trying to re-execute the SQL query while reading the last > row from DB in nextRecord. (this is the answer in the stackoverflow [1]). > For (2), in the nextRecord(), you need to execute a SQL query with a > filter to fetch rows which are greater than the last max ID or max created > time. > For (3), this is a changelog support, which will be supported natively in > 1.11 in Flink SQL. > > Best, > Jark > > > On Fri, 21 Feb 2020 at 02:35, Fanbin Bu wrote: > >> >> https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server >> >> On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler >> wrote: >> >>> Can you show us where you found the suggestion to use iterate()? >>> >>> On 20/02/2020 02:08, Fanbin Bu wrote: >>> > Hi, >>> > >>> > My app creates the source from JDBC inputformat and running some sql >>> > and print out. But the source terminates itself after the query is >>> > done. Is there anyway to keep the source running? >>> > samle code: >>> > val env = StreamExecutionEnvironment.getExecutionEnvironment >>> > val settings = EnvironmentSettings.newInstance() >>> > .useBlinkPlanner() >>> > .inStreamingMode() >>> > .build() >>> > val tEnv = StreamTableEnvironment.create(env, settings) >>> > val inputFormat >>> > = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from >>> > table")... .finish() >>> > val source = env.createInput(inputFormat) >>> > tEnv.registerTableSource(source) >>> > val queryResult = tEnv.sqlQuery("select * from awesomeSource") >>> > queryResult.insertInto(mySink) >>> > >>> > >>> > I searched around and its suggested to use .iterate(). can somebody >>> > give more examples on how to use it in this case? >>> > >>> > Thanks, >>> > Fanbin >>> >>> >>>
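For what it's worth, a rough sketch of option (2) from Jark's list: a source that never finishes and keeps polling for rows above the last seen ID, so the operator stays alive and savepoints keep working. It uses plain JDBC inside a SourceFunction rather than extending JDBCInputFormat, and every name (URL, table, columns, poll interval) is made up for illustration:

public class PollingJdbcSource extends RichSourceFunction<Tuple2<Long, String>> {

    private volatile boolean running = true;
    private transient Connection connection;
    private long lastSeenId = 0L;  // for exact restores this should also go into checkpointed state

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password");
    }

    @Override
    public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
        while (running) {
            try (PreparedStatement stmt = connection.prepareStatement(
                    "SELECT id, payload FROM my_table WHERE id > ? ORDER BY id")) {
                stmt.setLong(1, lastSeenId);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        synchronized (ctx.getCheckpointLock()) {
                            lastSeenId = rs.getLong("id");
                            ctx.collect(Tuple2.of(lastSeenId, rs.getString("payload")));
                        }
                    }
                }
            }
            // Nothing more to read right now: sleep and poll again instead of returning.
            Thread.sleep(10_000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}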
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congrats Jingsong! Cheers, Fabian Am Fr., 21. Feb. 2020 um 17:49 Uhr schrieb Rong Rong : > Congratulations Jingsong!! > > Cheers, > Rong > > On Fri, Feb 21, 2020 at 8:45 AM Bowen Li wrote: > > > Congrats, Jingsong! > > > > On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann > > wrote: > > > >> Congratulations Jingsong! > >> > >> Cheers, > >> Till > >> > >> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao wrote: > >> > >>> Congratulations Jingsong! > >>> > >>>Best, > >>>Yun > >>> > >>> -- > >>> From:Jingsong Li > >>> Send Time:2020 Feb. 21 (Fri.) 21:42 > >>> To:Hequn Cheng > >>> Cc:Yang Wang ; Zhijiang < > >>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey > >>> he ; dev ; user < > >>> user@flink.apache.org> > >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer > >>> > >>> Thanks everyone~ > >>> > >>> It's my pleasure to be part of the community. I hope I can make a > better > >>> contribution in future. > >>> > >>> Best, > >>> Jingsong Lee > >>> > >>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: > >>> Congratulations Jingsong! Well deserved. > >>> > >>> Best, > >>> Hequn > >>> > >>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang > wrote: > >>> Congratulations!Jingsong. Well deserved. > >>> > >>> > >>> Best, > >>> Yang > >>> > >>> Zhijiang 于2020年2月21日周五 下午1:18写道: > >>> Congrats Jingsong! Welcome on board! > >>> > >>> Best, > >>> Zhijiang > >>> > >>> -- > >>> From:Zhenghua Gao > >>> Send Time:2020 Feb. 21 (Fri.) 12:49 > >>> To:godfrey he > >>> Cc:dev ; user > >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer > >>> > >>> Congrats Jingsong! > >>> > >>> > >>> *Best Regards,* > >>> *Zhenghua Gao* > >>> > >>> > >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he > wrote: > >>> Congrats Jingsong! Well deserved. > >>> > >>> Best, > >>> godfrey > >>> > >>> Jeff Zhang 于2020年2月21日周五 上午11:49写道: > >>> Congratulations!Jingsong. You deserve it > >>> > >>> wenlong.lwl 于2020年2月21日周五 上午11:43写道: > >>> Congrats Jingsong! > >>> > >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: > >>> > >>> > Congrats Jingsong! > >>> > > >>> > > 在 2020年2月21日,上午11:39,Jark Wu 写道: > >>> > > > >>> > > Congratulations Jingsong! Well deserved. > >>> > > > >>> > > Best, > >>> > > Jark > >>> > > > >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: > >>> > > > >>> > >> Congratulations! Jingsong > >>> > >> > >>> > >> > >>> > >> Best, > >>> > >> Dan Zou > >>> > >> > >>> > > >>> > > >>> > >>> > >>> -- > >>> Best Regards > >>> > >>> Jeff Zhang > >>> > >>> > >>> > >>> -- > >>> Best, Jingsong Lee > >>> > >>> > >>> >
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congrats Jingsong! On Fri, Feb 21, 2020 at 8:49 AM Rong Rong wrote: > Congratulations Jingsong!! > > Cheers, > Rong > > On Fri, Feb 21, 2020 at 8:45 AM Bowen Li wrote: > >> Congrats, Jingsong! >> >> On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann >> wrote: >> >>> Congratulations Jingsong! >>> >>> Cheers, >>> Till >>> >>> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao wrote: >>> Congratulations Jingsong! Best, Yun -- From:Jingsong Li Send Time:2020 Feb. 21 (Fri.) 21:42 To:Hequn Cheng Cc:Yang Wang ; Zhijiang < wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey he ; dev ; user < user@flink.apache.org> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer Thanks everyone~ It's my pleasure to be part of the community. I hope I can make a better contribution in future. Best, Jingsong Lee On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: Congratulations Jingsong! Well deserved. Best, Hequn On Fri, Feb 21, 2020 at 2:42 PM Yang Wang wrote: Congratulations!Jingsong. Well deserved. Best, Yang Zhijiang 于2020年2月21日周五 下午1:18写道: Congrats Jingsong! Welcome on board! Best, Zhijiang -- From:Zhenghua Gao Send Time:2020 Feb. 21 (Fri.) 12:49 To:godfrey he Cc:dev ; user Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer Congrats Jingsong! *Best Regards,* *Zhenghua Gao* On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: Congrats Jingsong! Well deserved. Best, godfrey Jeff Zhang 于2020年2月21日周五 上午11:49写道: Congratulations!Jingsong. You deserve it wenlong.lwl 于2020年2月21日周五 上午11:43写道: Congrats Jingsong! On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: > Congrats Jingsong! > > > 在 2020年2月21日,上午11:39,Jark Wu 写道: > > > > Congratulations Jingsong! Well deserved. > > > > Best, > > Jark > > > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: > > > >> Congratulations! Jingsong > >> > >> > >> Best, > >> Dan Zou > >> > > -- Best Regards Jeff Zhang -- Best, Jingsong Lee
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congratulations Jingsong!! Cheers, Rong On Fri, Feb 21, 2020 at 8:45 AM Bowen Li wrote: > Congrats, Jingsong! > > On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann > wrote: > >> Congratulations Jingsong! >> >> Cheers, >> Till >> >> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao wrote: >> >>> Congratulations Jingsong! >>> >>>Best, >>>Yun >>> >>> -- >>> From:Jingsong Li >>> Send Time:2020 Feb. 21 (Fri.) 21:42 >>> To:Hequn Cheng >>> Cc:Yang Wang ; Zhijiang < >>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey >>> he ; dev ; user < >>> user@flink.apache.org> >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer >>> >>> Thanks everyone~ >>> >>> It's my pleasure to be part of the community. I hope I can make a better >>> contribution in future. >>> >>> Best, >>> Jingsong Lee >>> >>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: >>> Congratulations Jingsong! Well deserved. >>> >>> Best, >>> Hequn >>> >>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang wrote: >>> Congratulations!Jingsong. Well deserved. >>> >>> >>> Best, >>> Yang >>> >>> Zhijiang 于2020年2月21日周五 下午1:18写道: >>> Congrats Jingsong! Welcome on board! >>> >>> Best, >>> Zhijiang >>> >>> -- >>> From:Zhenghua Gao >>> Send Time:2020 Feb. 21 (Fri.) 12:49 >>> To:godfrey he >>> Cc:dev ; user >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer >>> >>> Congrats Jingsong! >>> >>> >>> *Best Regards,* >>> *Zhenghua Gao* >>> >>> >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: >>> Congrats Jingsong! Well deserved. >>> >>> Best, >>> godfrey >>> >>> Jeff Zhang 于2020年2月21日周五 上午11:49写道: >>> Congratulations!Jingsong. You deserve it >>> >>> wenlong.lwl 于2020年2月21日周五 上午11:43写道: >>> Congrats Jingsong! >>> >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: >>> >>> > Congrats Jingsong! >>> > >>> > > 在 2020年2月21日,上午11:39,Jark Wu 写道: >>> > > >>> > > Congratulations Jingsong! Well deserved. >>> > > >>> > > Best, >>> > > Jark >>> > > >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: >>> > > >>> > >> Congratulations! Jingsong >>> > >> >>> > >> >>> > >> Best, >>> > >> Dan Zou >>> > >> >>> > >>> > >>> >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >>> >>> >>> >>> -- >>> Best, Jingsong Lee >>> >>> >>>
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congrats, Jingsong! On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann wrote: > Congratulations Jingsong! > > Cheers, > Till > > On Fri, Feb 21, 2020 at 4:03 PM Yun Gao wrote: > >> Congratulations Jingsong! >> >>Best, >>Yun >> >> -- >> From:Jingsong Li >> Send Time:2020 Feb. 21 (Fri.) 21:42 >> To:Hequn Cheng >> Cc:Yang Wang ; Zhijiang < >> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey he >> ; dev ; user < >> user@flink.apache.org> >> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer >> >> Thanks everyone~ >> >> It's my pleasure to be part of the community. I hope I can make a better >> contribution in future. >> >> Best, >> Jingsong Lee >> >> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: >> Congratulations Jingsong! Well deserved. >> >> Best, >> Hequn >> >> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang wrote: >> Congratulations!Jingsong. Well deserved. >> >> >> Best, >> Yang >> >> Zhijiang 于2020年2月21日周五 下午1:18写道: >> Congrats Jingsong! Welcome on board! >> >> Best, >> Zhijiang >> >> -- >> From:Zhenghua Gao >> Send Time:2020 Feb. 21 (Fri.) 12:49 >> To:godfrey he >> Cc:dev ; user >> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer >> >> Congrats Jingsong! >> >> >> *Best Regards,* >> *Zhenghua Gao* >> >> >> On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: >> Congrats Jingsong! Well deserved. >> >> Best, >> godfrey >> >> Jeff Zhang 于2020年2月21日周五 上午11:49写道: >> Congratulations!Jingsong. You deserve it >> >> wenlong.lwl 于2020年2月21日周五 上午11:43写道: >> Congrats Jingsong! >> >> On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: >> >> > Congrats Jingsong! >> > >> > > 在 2020年2月21日,上午11:39,Jark Wu 写道: >> > > >> > > Congratulations Jingsong! Well deserved. >> > > >> > > Best, >> > > Jark >> > > >> > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: >> > > >> > >> Congratulations! Jingsong >> > >> >> > >> >> > >> Best, >> > >> Dan Zou >> > >> >> > >> > >> >> >> -- >> Best Regards >> >> Jeff Zhang >> >> >> >> -- >> Best, Jingsong Lee >> >> >>
Re: FlinkCEP questions - architecture
Amazing content, thanks for asking and answering. On Fri, Feb 21, 2020 at 5:04 AM Juergen Donnerstag < juergen.donners...@gmail.com> wrote: > thanks a lot > Juergen > > On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas > wrote: > >> Hi Juergen, >> >> I will reply to your questions inline. As a general comment I would >> suggest to also have a look at [3] so that you have an idea of some of >> the alternatives. >> With that said, here come the answers :) >> >> 1) We receive files every day, which are exports from some database >> tables, containing ONLY changes from the day. Most tables have >> modify-cols. Even though they are files but because they contain >> changes only, I belief the file records shall be considered events in >> Flink terminology. Is that assumption correct? >> >> -> Yes. I think your assumption is correct. >> >> 2) The records within the DB export files are NOT in chronologically, >> and we can not change the export. Our use case is a "complex event >> processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first >> A, then B, then C within 30 days, then do something". Does that work >> with FlinkCEP despite the events/records are not in chrono order >> within the file? The files are 100MB to 20GB in size. Do I need to >> sort the files first before CEP processing? >> >> -> Flink CEP also works in event time and the re-ordering can be done by >> Flink >> >> 3) Occassionally some crazy people manually "correct" DB records >> within the database and manually trigger a re-export of ALL of the >> changes for that respective day (e.g. last weeks Tuesday). >> Consequently we receive a correction file. Same filename but "_1" >> appended. All filenames include the date (of the original export). >> What are the options to handle that case (besides telling the DB >> admins not to, which we did already). Regular checkpoints and >> re-process all files since then? What happens to the CEP state? Will >> it be checkpointed as well? >> >> -> If you require re-processing, then I would say that your best >> option is what you described. The other option would be to keep >> everything in Flink state until you are sure that no more corrections >> will come. In this case, you have to somehow issue the "correction" in >> a way that the downstream system can understand what to correct and >> how. Keep in mind that this may be an expensive operation because >> everything has to be kept in state for longer. >> >> 4) Our CEP rules span upto 180 days (resp. 6 months). Is that a problem? >> >> -> The only thing to consider is the size of your state. Time is not >> necessarily an issue. If your state for these 180 days is a couple of >> MBs, then you have no problem. If it increases fast, then you have to >> provision your cluster accordingly. >> >> 5) We also have CEP rules that must fire if after a start sequence >> matched, the remaining sequence did NOT within a configured window. >> E.g. If A, then B, but C did not occur within 30 days since A. Is that >> supported by FlinkCEP? I couldn't find a working example. >> >> -> You can have a look at [1] for the supported pattern combinations >> and you can also look at [2] for some tests of different pattern >> combinations. >> >> 6) We expect 30-40 CEP rules. How can we estimate the required storage >> size for the temporary CEP state? Is there some sort of formular >> considering number of rules, number of records per file or day, record >> size, window, number of records matched per sequence, number of keyBy >> grouping keys, ... 
>> >> -> In FlinkCEP, each pattern becomes a single operator. This means >> that you will have 30-40 operators in your job graph, each with each >> own state. This can become heavy but once again it depends on your >> workload. I cannot give an estimate because in CEP, in order to >> guarantee correct ordering of events in an unordered stream, the >> library sometimes has to keep also in state more records than will be >> presented at the end. >> >> Have you considered going with a solution based on processfunction and >> broadcast state? This will also allow you to have a more dynamic >> set-up where patterns can be added at runtime and it will allow you to >> do any optimizations specific to your workload ;) For a discussion on >> this, check [3]. In addition, it will allow you to "multiplex" many >> patterns into a single operator thus potentially minimizing the amount >> of copies of the state you keep. >> >> 7) I can imagine that for debugging reasons it'd be good if we were >> able to query the temporary CEP state. What is the (CEP) schema used >> to persist the CEP state and how can we query it? And does such query >> work on the whole cluster or only per node (e.g. because of shuffle >> and nodes responsible only for a portion of the events). >> >> -> Unfortunatelly the state in CEP is not queryable, thus I am not >> sure if you can inspect it at runtime. >> >> 8) I understand state is stored per node. What happens if
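As a concrete illustration of point 5 above (my own sketch with a hypothetical Event type, not code from this thread): one common way to express "A happened but C did not follow within 30 days" is to model the positive sequence with a window and react to the timed-out partial matches via the side output:

Pattern<Event, ?> aThenC = Pattern.<Event>begin("A")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return "A".equals(e.getType());
        }
    })
    .followedBy("C")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return "C".equals(e.getType());
        }
    })
    .within(Time.days(30));

OutputTag<String> timedOut = new OutputTag<String>("a-without-c") {};

SingleOutputStreamOperator<String> fullMatches = CEP.pattern(keyedStream, aThenC)
    .select(
        timedOut,
        // fired when "A" matched but "C" never arrived within the 30 days
        (PatternTimeoutFunction<Event, String>) (partial, timeoutTs) ->
            "C missing after A: " + partial.get("A").get(0),
        // fired for complete A -> C sequences
        (PatternSelectFunction<Event, String>) match -> "A followed by C");

DataStream<String> aWithoutC = fullMatches.getSideOutput(timedOut);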
Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?
I tried this in Flink 1.10.0 : @Test public void experimentalTest() throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream input = env.fromElements("One", "Two"); //DataStream input = env.addSource(new StringSourceFunction()); List result = new ArrayList<>(5); DataStreamUtils.collect(input).forEachRemaining(result::add); env.execute("Flink Streaming Java API Skeleton"); } Results in java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute. at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) at nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.experimentalTest(TestUserAgentAnalysisMapperInline.java:177) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ... On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger wrote: > Hey Niels, > > This minimal Flink job executes in Flink 1.10: > > public static void main(String[] args) throws Exception { >final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); >DataStream input = env.addSource(new StringSourceFunction()); >List result = new ArrayList<>(5); >DataStreamUtils.collect(input).forEachRemaining(result::add); >env.execute("Flink Streaming Java API Skeleton"); > } > > Maybe the TestUserAgentAnalysisMapperInline class is doing some magic > that breaks with the StreamGraphGenerator? > > Best, > Robert > > On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes wrote: > >> Hi Gordon, >> >> Thanks. This works for me. >> >> I find it strange that when I do this it works (I made the differences >> bold) >> >> List result = new ArrayList<>(5); >> >> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); >> >> *resultDataStream.print();* >> >> environment.execute(); >> >> >> how ever this does not work >> >> List result = new ArrayList<>(5); >> >> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); >> >> environment.execute(); >> >> >> and this also does not work >> >> *resultDataStream.print();* >> >> List result = new ArrayList<>(5); >> >> DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); >> >> environment.execute(); >> >> >> In both these cases it fails with >> >> >> java.lang.IllegalStateException: *No operators defined in streaming >> topology. 
Cannot execute.* >> >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) >> at >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602) >> at >> nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144) >> >> >> >> Did I do something wrong? >> Is this a bug in the DataStreamUtils ? >> >> Niels Basjes >> >> >> >> On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai wrote: >> >>> Hi, >>> >>> To collect the elements of a DataStream (usually only meant for testing >>> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`. >>> >>> Cheers, >>> Gordon >>> >>> >>> >>> -- >>> Sent from: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >> >> >> -- >> Best regards / Met vriendelijke groeten, >> >> Niels Basjes >> > -- Best regards / Met vriendelijke groeten, Niels Basjes
Re: [DISCUSS] Drop Savepoint Compatibility with Flink 1.2
+1 for dropping savepoint compatibility with Flink 1.2. Cheers, Till On Thu, Feb 20, 2020 at 6:55 PM Stephan Ewen wrote: > Thank you for the feedback. > > Here is the JIRA issue with some more explanation also about the > background and implications: > https://jira.apache.org/jira/browse/FLINK-16192 > > Best, > Stephan > > > On Thu, Feb 20, 2020 at 2:26 PM vino yang wrote: > >> +1 for dropping Savepoint compatibility with Flink 1.2 >> >> Flink 1.2 is quite far away from the latest 1.10. Especially after the >> release of Flink 1.9 and 1.10, the code and architecture have undergone >> major changes. >> >> Currently, I am updating state migration tests for Flink 1.10. I can >> still see some binary snapshot files of version 1.2. If we agree on this >> topic, we may be able to alleviate some of the burdens(remove those binary >> files) when the migration tests would be updated later. >> >> Best, >> Vino >> >> Theo Diefenthal 于2020年2月20日周四 >> 下午9:04写道: >> >>> +1 for dropping compatibility. >>> >>> I personally think that it is very important for a project to keep a >>> good pace in developing that old legacy stuff must be dropped from time to >>> time. As long as there is an upgrade routine (via going to another flink >>> release) that's fine. >>> >>> -- >>> *Von: *"Stephan Ewen" >>> *An: *"dev" , "user" >>> *Gesendet: *Donnerstag, 20. Februar 2020 11:11:43 >>> *Betreff: *[DISCUSS] Drop Savepoint Compatibility with Flink 1.2 >>> >>> Hi all! >>> For some cleanup and simplifications, it would be helpful to drop >>> Savepoint compatibility with Flink version 1.2. That version was released >>> almost three years ago. >>> >>> I would expect that no one uses that old version any more in a way that >>> they actively want to upgrade directly to 1.11. >>> >>> Even if, there is still the way to first upgrade to another version >>> (like 1.9) and then upgrade to 1.11 from there. >>> >>> Any concerns to drop that support? >>> >>> Best, >>> Stephan >>> >>> >>> -- >>> SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln >>> Theo Diefenthal >>> >>> T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 >>> theo.diefent...@scoop-software.de - www.scoop-software.de >>> Sitz der Gesellschaft: Köln, Handelsregister: Köln, >>> Handelsregisternummer: HRB 36625 >>> Geschäftsführung: Dr. Oleg Balovnev, Frank Heinen, >>> Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel >>> >>
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congratulations Jingsong! Cheers, Till On Fri, Feb 21, 2020 at 4:03 PM Yun Gao wrote: > Congratulations Jingsong! > >Best, >Yun > > -- > From:Jingsong Li > Send Time:2020 Feb. 21 (Fri.) 21:42 > To:Hequn Cheng > Cc:Yang Wang ; Zhijiang ; > Zhenghua Gao ; godfrey he ; dev < > d...@flink.apache.org>; user > Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer > > Thanks everyone~ > > It's my pleasure to be part of the community. I hope I can make a better > contribution in future. > > Best, > Jingsong Lee > > On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: > Congratulations Jingsong! Well deserved. > > Best, > Hequn > > On Fri, Feb 21, 2020 at 2:42 PM Yang Wang wrote: > Congratulations!Jingsong. Well deserved. > > > Best, > Yang > > Zhijiang 于2020年2月21日周五 下午1:18写道: > Congrats Jingsong! Welcome on board! > > Best, > Zhijiang > > -- > From:Zhenghua Gao > Send Time:2020 Feb. 21 (Fri.) 12:49 > To:godfrey he > Cc:dev ; user > Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer > > Congrats Jingsong! > > > *Best Regards,* > *Zhenghua Gao* > > > On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: > Congrats Jingsong! Well deserved. > > Best, > godfrey > > Jeff Zhang 于2020年2月21日周五 上午11:49写道: > Congratulations!Jingsong. You deserve it > > wenlong.lwl 于2020年2月21日周五 上午11:43写道: > Congrats Jingsong! > > On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: > > > Congrats Jingsong! > > > > > 在 2020年2月21日,上午11:39,Jark Wu 写道: > > > > > > Congratulations Jingsong! Well deserved. > > > > > > Best, > > > Jark > > > > > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: > > > > > >> Congratulations! Jingsong > > >> > > >> > > >> Best, > > >> Dan Zou > > >> > > > > > > > -- > Best Regards > > Jeff Zhang > > > > -- > Best, Jingsong Lee > > >
Re: Emit message at start and end of event time session window
Hi Manas and Rafi, you are right that when using merging windows as event time session windows are, then Flink requires that any state the Trigger keeps is of type MergingState. This constraint allows that the state can be merged whenever two windows get merged. Rafi, you are right. With the current implementation it might happen that you send a wrong started window message. I think it depends on the MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also your watermark. If you want to be on the safe side, then I would recommend to use the ProcessFunction to implement the required logic. The ProcessFunction [1] is Flink's low level API and gives you access to state and timers. In it, you would need to buffer the elements and to sessionize them yourself, though. However, it would give you access to the watermark which in turn would allow you to properly handle your described edge case. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html Cheers, Till Cheers, Till On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch wrote: > I think one "edge" case which is not handled would be that the first event > (by event-time) arrives late, then a wrong "started-window" would be > reported. > > Rafi > > > On Thu, Feb 20, 2020 at 12:36 PM Manas Kale wrote: > >> Is the reason ValueState cannot be use because session windows are always >> formed by merging proto-windows of single elements, therefore a state store >> is needed that can handle merging. ValueState does not provide this >> functionality, but a ReducingState does? >> >> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale wrote: >> >>> Hi Till, >>> Thanks for your answer! You also answered the next question that I was >>> about to ask "Can we share state between a Trigger and a Window?" Currently >>> the only (convoluted) way to share state between two operators is through >>> the broadcast state pattern, right? >>> Also, in your example, why can't we use a ValueStateDescriptor >>> in the Trigger? I tried using it in my own example but it I am not able >>> to call the mergePartitionedState() method on a ValueStateDescriptor. >>> >>> Regards, >>> Manas >>> >>> >>> >>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann >>> wrote: >>> Hi Manas, you can implement something like this with a bit of trigger magic. What you need to do is to define your own trigger implementation which keeps state to remember whether it has triggered the "started window" message or not. In the stateful window function you would need to do something similar. The first call could trigger the output of "window started" and any subsequent call will trigger the evaluation of the window. It would have been a bit easier if the trigger and the window process function could share its internal state. Unfortunately, this is not possible at the moment. I've drafted a potential solution which you can find here [1]. [1] https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef Cheers, Till On Mon, Feb 17, 2020 at 8:09 AM Manas Kale wrote: > Hi, > I want to achieve the following using event time session windows: > >1. When the window.getStart() and last event timestamp in the >window is greater than MIN_WINDOW_SIZE milliseconds, I want to emit a >message "Window started @ timestamp". >2. When the session window ends, i.e. the watermark passes >lasteventTimestamp + inactivityPeriod, I want to emit a message "Window >ended @ timestamp". > > It is guaranteed that all events are on time and no lateness is > allowed. 
I am having difficulty implementing both 1 and 2 simultaneously. > I am able to implement point 1 using a custom trigger, which checks > if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE and > triggers > a customProcessWindowFunction(). > However, with this architecture I can't detect the end of the window. > > Is my approach correct or is there a completely different method to > achieve this? > > Thanks, > Manas Kale > > > >
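For reference, a bare-bones sketch of the ProcessFunction approach Till describes. The names, the inactivity gap and MIN_WINDOW_SIZE are made up, and it assumes events for a key arrive in order, as guaranteed in the original question:

public class SessionMessages extends KeyedProcessFunction<String, Event, String> {

    private static final long GAP = 60_000L;             // assumed inactivity period
    private static final long MIN_WINDOW_SIZE = 5_000L;  // assumed

    private ValueState<Long> windowStart;
    private ValueState<Long> lastSeen;
    private ValueState<Boolean> startedEmitted;

    @Override
    public void open(Configuration parameters) {
        windowStart = getRuntimeContext().getState(new ValueStateDescriptor<>("windowStart", Long.class));
        lastSeen = getRuntimeContext().getState(new ValueStateDescriptor<>("lastSeen", Long.class));
        startedEmitted = getRuntimeContext().getState(new ValueStateDescriptor<>("startedEmitted", Boolean.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
        long ts = ctx.timestamp();
        if (windowStart.value() == null) {
            windowStart.update(ts);
        }
        // Move the "session ended" timer to lastEventTimestamp + GAP.
        Long previous = lastSeen.value();
        if (previous != null) {
            ctx.timerService().deleteEventTimeTimer(previous + GAP);
        }
        lastSeen.update(ts);
        ctx.timerService().registerEventTimeTimer(ts + GAP);

        if (startedEmitted.value() == null && ts - windowStart.value() > MIN_WINDOW_SIZE) {
            out.collect("Window started @ " + windowStart.value());
            startedEmitted.update(true);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // The watermark passed lastEventTimestamp + GAP: the session is over.
        out.collect("Window ended @ " + lastSeen.value());
        windowStart.clear();
        lastSeen.clear();
        startedEmitted.clear();
    }
}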
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Congratulations Jingsong! Best, Yun -- From:Jingsong Li Send Time:2020 Feb. 21 (Fri.) 21:42 To:Hequn Cheng Cc:Yang Wang ; Zhijiang ; Zhenghua Gao ; godfrey he ; dev ; user Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer Thanks everyone~ It's my pleasure to be part of the community. I hope I can make a better contribution in future. Best, Jingsong Lee On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: Congratulations Jingsong! Well deserved. Best, Hequn On Fri, Feb 21, 2020 at 2:42 PM Yang Wang wrote: Congratulations!Jingsong. Well deserved. Best, Yang Zhijiang 于2020年2月21日周五 下午1:18写道: Congrats Jingsong! Welcome on board! Best, Zhijiang -- From:Zhenghua Gao Send Time:2020 Feb. 21 (Fri.) 12:49 To:godfrey he Cc:dev ; user Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer Congrats Jingsong! Best Regards, Zhenghua Gao On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: Congrats Jingsong! Well deserved. Best, godfrey Jeff Zhang 于2020年2月21日周五 上午11:49写道: Congratulations!Jingsong. You deserve it wenlong.lwl 于2020年2月21日周五 上午11:43写道: Congrats Jingsong! On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: > Congrats Jingsong! > > > 在 2020年2月21日,上午11:39,Jark Wu 写道: > > > > Congratulations Jingsong! Well deserved. > > > > Best, > > Jark > > > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: > > > >> Congratulations! Jingsong > >> > >> > >> Best, > >> Dan Zou > >> > > -- Best Regards Jeff Zhang -- Best, Jingsong Lee
Re: Flink's Either type information
Hi Jacopo, Robert, Very sorry for missing the previous email and not response in time. I think exactly as Robert has pointed out with the example: using inline anonymous subclass of KeyedBroadcastProcessFunction should not cause the problem. As far as I know, the possible reason that cause the attached exception might be that the parameter types of Either get erased due to the way to create KeyedBroadcastProcessFunction object. For example, if you first implement a generic subclass of KeyedBroadcastProcessFunction like: public class MyKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction, String, Either> { ... } and create a function object directly when constructing the DataStream job: stream.process(new MyKeyedBroadcastProcessFunction()); Then MyLeftType and MyRightType will be erased and will cause the attached exception when Flink tries to inference the output type. And I totally agree with Robert that attaching the corresponding codes would help debugging the problem. Yours, Yun -- From:Robert Metzger Send Time:2020 Feb. 21 (Fri.) 19:47 To:jacopo.gobbi Cc:yungao.gy ; user Subject:Re: Flink's Either type information Hey Jacopo, can you post an example to reproduce the issue? I've tried it, but it worked in this artificial example: MapStateDescriptor state = new MapStateDescriptor<>("test", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); DataStream> result = input .map((MapFunction>) value -> Tuple2.of(0, value)).returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class)) .keyBy(0).connect(input.broadcast(state)) .process(new KeyedBroadcastProcessFunction, String, Either>() { @Override public void processElement(Tuple2 value, ReadOnlyContext ctx, Collector> out) throws Exception { out.collect(Either.Left(111)); } @Override public void processBroadcastElement(String value, Context ctx, Collector> out) throws Exception { } }); result.print(); On Wed, Feb 19, 2020 at 6:07 PM wrote: Yes, I create it the way you mentioned. From: Yun Gao [mailto:yungao...@aliyun.com] Sent: Dienstag, 18. Februar 2020 10:12 To: Gobbi, Jacopo-XT; user Subject: [External] Re: Flink's Either type information Hi Jacopo, Could you also provide how the KeyedBroadcastProcessFunction is created when constructing datastream API ? For example, are you using something like new KeyedBroadcastProcessFunction() { // Function implementation } or something else? Best, Yun -- From:jacopo.gobbi Send Time:2020 Feb. 17 (Mon.) 18:31 To:user Subject:Flink's Either type information Hi all, How can an Either value be returned by a KeyedBroadcastProcessFunction? We keep getting "InvalidTypesException: Type extraction is not possible on Either type as it does not contain information about the 'left' type." when doing: out.collect(Either.Right(myObject)); Thanks, Jacopo Gobbi
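To make the two usual workarounds concrete, a small sketch (my own illustration; MyLeftType and MyRightType are the placeholder names from Yun's explanation at the top of this message, and connected stands for keyedStream.connect(broadcastStream)):

// Option 1: bind the type parameters at the call site with an anonymous subclass,
// so they survive erasure and can be extracted:
connected.process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>() { });

// Option 2: keep the plain instantiation and tell Flink the result type explicitly:
connected
    .process(new MyKeyedBroadcastProcessFunction<MyLeftType, MyRightType>())
    .returns(new TypeHint<Either<MyLeftType, MyRightType>>() { });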
Re: Running Flink Cluster on PaaS
> I always wonder what do you guys mean by "Standalone Flink session" or "Standalone Cluster" ...

"Standalone Flink session" usually means an empty Flink cluster is started and can accept multiple job submissions from the Flink client or web UI. Even after all the jobs have finished, the session cluster will still be there until you manually stop it.

"Standalone Flink per-job" means a dedicated Flink cluster is started for only one job. It has better isolation. Usually this mode is used in container environments, and users build their jars and dependencies into the image. You can check how to build your custom image and run a per-job cluster here [1].

[1] https://github.com/apache/flink/blob/release-1.10/flink-container/kubernetes/README.md#deploy-flink-job-cluster

Best,
Yang

KristoffSC 于2020年2月21日周五 下午4:03写道:

> Thank you Yang Wang,
>
> Regarding [1] and a sentence from that doc.
> "This page describes deploying a standalone Flink session"
>
> I always wonder what do you guys mean by "Standalone Flink session" or
> "Standalone Cluster" that can be found here [2].
>
> I'm using a Docker with Job Cluster approach, I know that there is also a
> Session Cluster docker images. I understand the differences, but I'm not
> sure what you are referring to using those to terms from [1] and [2].
>
> Thanks,
> Krzysztof
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#cluster_setup.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
Re: Question: Determining Total Recovery Time
Hi Morgan, sorry for the late reply. In general, that should work. You need to ensure that the same task is processing the same record though. Local copy needs to be state or else the last message would be lost upon restart. Performance will take a hit but if that is significant depends on the remaining pipeline. Btw, at least once should be enough for that, since you implicitly deduplicating. Best, Arvid On Tue, Feb 11, 2020 at 11:24 AM Morgan Geldenhuys < morgan.geldenh...@tu-berlin.de> wrote: > Thanks for the advice, i will look into it. > > Had a quick think about another simple solution but we would need a hook > into the checkpoint process from the task/operator perspective, which I > haven't looked into yet. It would work like this: > > - The sink operators (?) would keep a local copy of the last message > processed (or digest?), the current timestamp, and a boolean value > indicating whether or not the system is in recovery or not. > - While not in recovery, update the local copy and timestamp with each new > event processed. > - When a failure is detected and the taskmanagers are notified to > rollback, we use the hook into this process to switch the boolean value to > true. > - While true, it compares each new message with the last one processed > before the recovery process was initiated. > - When a match is found, the difference between the previous and current > timestamp is calculated and outputted as a custom metric and the boolean is > reset to false. > > From here, the mean total recovery time could be calculated across the > operators. Not sure how it would impact on performance, but i doubt it > would be significant. We would need to ensure exactly once so that the > message would be guaranteed to be seen again. thoughts? > > On 11.02.20 08:57, Arvid Heise wrote: > > Hi Morgan, > > as Timo pointed out, there is no general solution, but in your setting, > you could look at the consumer lag of the input topic after a crash. Lag > would spike until all tasks restarted and reprocessing begins. Offsets are > only committed on checkpoints though by default. > > Best, > > Arvid > > On Tue, Feb 4, 2020 at 12:32 PM Timo Walther wrote: > >> Hi Morgan, >> >> as far as I know this is not possible mostly because measuring "till the >> point when the system catches up to the last message" is very >> pipeline/connector dependent. Some sources might need to read from the >> very beginning, some just continue from the latest checkpointed offset. >> >> Measure things like that (e.g. for experiments) might require collecting >> own metrics as part of your pipeline definition. >> >> Regards, >> Timo >> >> >> On 03.02.20 12:20, Morgan Geldenhuys wrote: >> > Community, >> > >> > I am interested in determining the total time to recover for a Flink >> > application after experiencing a partial failure. Let's assume a >> > pipeline consisting of Kafka -> Flink -> Kafka with Exactly-Once >> > guarantees enabled. >> > >> > Taking a look at the documentation >> > ( >> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html), >> >> > one of the metrics which can be gathered is /recoveryTime/. However, as >> > far as I can tell this is only the time taken for the system to go from >> > an inconsistent state back into a consistent state, i.e. restarting the >> > job. Is there any way of measuring the amount of time taken from the >> > point when the failure occurred till the point when the system catches >> > up to the last message that was processed before the outage? 
>> > >> > Thank you very much in advance! >> > >> > Regards, >> > Morgan. >> >> >
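A rough sketch of the idea discussed above (my own illustration; the names are made up, and it measures from the restart rather than from the exact moment of failure). The "hook" into recovery is simply initializeState() reporting a restore, and the last processed record is kept in operator state, as Arvid points out:

public class RecoveryTimingSink extends RichSinkFunction<String> implements CheckpointedFunction {

    private transient ListState<String> checkpointedLast;
    private String lastProcessed;
    private boolean recovering;
    private long restoredAt;
    private volatile long recoveryTimeMs = -1L;

    @Override
    public void open(Configuration parameters) {
        // Expose the measured catch-up time as a custom metric.
        getRuntimeContext().getMetricGroup().gauge("recoveryTimeMs", (Gauge<Long>) () -> recoveryTimeMs);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        if (recovering && value.equals(lastProcessed)) {
            // We caught up to the last record seen before the failure.
            recoveryTimeMs = System.currentTimeMillis() - restoredAt;
            recovering = false;
        }
        lastProcessed = value;
        // ... actual sink logic ...
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        checkpointedLast.clear();
        if (lastProcessed != null) {
            checkpointedLast.add(lastProcessed);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedLast = ctx.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("lastProcessed", String.class));
        if (ctx.isRestored()) {
            for (String s : checkpointedLast.get()) {
                lastProcessed = s;
            }
            recovering = true;
            restoredAt = System.currentTimeMillis();
        }
    }
}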
Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
Thanks everyone~ It's my pleasure to be part of the community. I hope I can make a better contribution in future. Best, Jingsong Lee On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng wrote: > Congratulations Jingsong! Well deserved. > > Best, > Hequn > > On Fri, Feb 21, 2020 at 2:42 PM Yang Wang wrote: > >> Congratulations!Jingsong. Well deserved. >> >> >> Best, >> Yang >> >> Zhijiang 于2020年2月21日周五 下午1:18写道: >> >>> Congrats Jingsong! Welcome on board! >>> >>> Best, >>> Zhijiang >>> >>> -- >>> From:Zhenghua Gao >>> Send Time:2020 Feb. 21 (Fri.) 12:49 >>> To:godfrey he >>> Cc:dev ; user >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer >>> >>> Congrats Jingsong! >>> >>> >>> *Best Regards,* >>> *Zhenghua Gao* >>> >>> >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he wrote: >>> Congrats Jingsong! Well deserved. >>> >>> Best, >>> godfrey >>> >>> Jeff Zhang 于2020年2月21日周五 上午11:49写道: >>> Congratulations!Jingsong. You deserve it >>> >>> wenlong.lwl 于2020年2月21日周五 上午11:43写道: >>> Congrats Jingsong! >>> >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu wrote: >>> >>> > Congrats Jingsong! >>> > >>> > > 在 2020年2月21日,上午11:39,Jark Wu 写道: >>> > > >>> > > Congratulations Jingsong! Well deserved. >>> > > >>> > > Best, >>> > > Jark >>> > > >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan wrote: >>> > > >>> > >> Congratulations! Jingsong >>> > >> >>> > >> >>> > >> Best, >>> > >> Dan Zou >>> > >> >>> > >>> > >>> >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >>> >>> >>> -- Best, Jingsong Lee
Re: FsStateBackend vs RocksDBStateBackend
I would try the FsStateBackend in this scenario, as you have enough memory available. On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang wrote: > Hi Gordon, > > Thanks for your reply! Regarding state size - we are at 200-300gb but we > have 120 parallelism which will make each task handle ~2 - 3 gb state. > (when we submit the job we are setting tm memory to 15g.) In this scenario > what will be the best fit for statebackend? > > Thanks, > Ran > > On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai > wrote: > >> Hi Ran, >> >> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang wrote: >> >>> Hi all, >>> >>> We have a Flink app that uses a KeyedProcessFunction, and in the >>> function it requires a ValueState(of TreeSet) and the processElement method >>> needs to access and update it. We tried to use RocksDB as our stateBackend >>> but the performance is not good, and intuitively we think it was because of >>> the serialization / deserialization on each processElement call. >>> >> >> As you have already pointed out, serialization behaviour is a major >> difference between the 2 state backends, and will directly impact >> performance due to the extra runtime overhead in RocksDB. >> If you plan to continue using the RocksDB state backend, make sure to use >> MapState instead of ValueState where possible, since every access to the >> ValueState in the RocksDB backend requires serializing / deserializing the >> whole value. >> For MapState, de-/serialization happens per K-V access. Whether or not >> this makes sense would of course depend on your state access pattern. >> >> >>> Then we tried to switch to use FsStateBackend (which keeps the in-flight >>> data in the TaskManager’s memory according to doc), and it could resolve >>> the performance issue. *So we want to understand better what are the >>> tradeoffs in choosing between these 2 stateBackend.* Our checkpoint >>> size is 200 - 300 GB in stable state. For now we know one benefits of >>> RocksDB is it supports incremental checkpoint, but would love to know what >>> else we are losing in choosing FsStateBackend. >>> >> >> As of now, feature-wise both backends support asynchronous snapshotting, >> state schema evolution, and access via the State Processor API. >> In the end, the major factor for deciding between the two state backends >> would be your expected state size. >> That being said, it could be possible in the future that savepoint >> formats for the backends are changed to be compatible, meaning that you >> will be able to switch between different backends upon restore [1]. >> >> >>> >>> Thanks a lot! >>> Ran Zhang >>> >> >> Cheers, >> Gordon >> >> [1] >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State >> >
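To make the ValueState-vs-MapState point from the thread concrete, a small sketch (my own illustration with placeholder types) of the two declarations inside a KeyedProcessFunction's open(); whether MapState can replace the TreeSet of course depends on whether the sorted-set semantics are actually needed:

// ValueState holding a whole collection: on RocksDB every read or update
// serializes / deserializes the entire TreeSet.
ValueState<TreeSet<Long>> setState = getRuntimeContext().getState(
    new ValueStateDescriptor<>("events", TypeInformation.of(new TypeHint<TreeSet<Long>>() { })));

// MapState: each put/get/contains only touches a single entry, so RocksDB
// only de-/serializes that one key and value.
MapState<Long, Boolean> mapState = getRuntimeContext().getMapState(
    new MapStateDescriptor<>("events-map", Long.class, Boolean.class));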
Re: [Flink 1.10] How do I use LocalCollectionOutputFormat now that writeUsingOutputFormat is deprecated?
Hey Niels, This minimal Flink job executes in Flink 1.10: public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream input = env.addSource(new StringSourceFunction()); List result = new ArrayList<>(5); DataStreamUtils.collect(input).forEachRemaining(result::add); env.execute("Flink Streaming Java API Skeleton"); } Maybe the TestUserAgentAnalysisMapperInline class is doing some magic that breaks with the StreamGraphGenerator? Best, Robert On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes wrote: > Hi Gordon, > > Thanks. This works for me. > > I find it strange that when I do this it works (I made the differences > bold) > > List result = new ArrayList<>(5); > > DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); > > *resultDataStream.print();* > > environment.execute(); > > > how ever this does not work > > List result = new ArrayList<>(5); > > DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); > > environment.execute(); > > > and this also does not work > > *resultDataStream.print();* > > List result = new ArrayList<>(5); > > DataStreamUtils.collect(resultDataStream).forEachRemaining(result::add); > > environment.execute(); > > > In both these cases it fails with > > > java.lang.IllegalStateException: *No operators defined in streaming > topology. Cannot execute.* > > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1792) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1783) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1768) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602) > at > nl.basjes.parse.useragent.flink.TestUserAgentAnalysisMapperInline.testInlineDefinitionDataStream(TestUserAgentAnalysisMapperInline.java:144) > > > > Did I do something wrong? > Is this a bug in the DataStreamUtils ? > > Niels Basjes > > > > On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai wrote: > >> Hi, >> >> To collect the elements of a DataStream (usually only meant for testing >> purposes), you can take a look at `DataStreamUtils#collect(DataStream)`. >> >> Cheers, >> Gordon >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> > > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes >
Re: Flink's Either type information
Hey Jacopo, can you post an example to reproduce the issue? I've tried it, but it worked in this artificial example: MapStateDescriptor<String, String> state = new MapStateDescriptor<>("test", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); DataStream<Either<Integer, String>> result = input .map((MapFunction<String, Tuple2<Integer, String>>) value -> Tuple2.of(0, value)).returns(TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class)) .keyBy(0).connect(input.broadcast(state)) .process(new KeyedBroadcastProcessFunction<Tuple, Tuple2<Integer, String>, String, Either<Integer, String>>() { @Override public void processElement(Tuple2<Integer, String> value, ReadOnlyContext ctx, Collector<Either<Integer, String>> out) throws Exception { out.collect(Either.Left(111)); } @Override public void processBroadcastElement(String value, Context ctx, Collector<Either<Integer, String>> out) throws Exception { } }); result.print(); On Wed, Feb 19, 2020 at 6:07 PM wrote: > Yes, I create it the way you mentioned. > > > > *From:* Yun Gao [mailto:yungao...@aliyun.com] > *Sent:* Tuesday, 18 February 2020 10:12 > *To:* Gobbi, Jacopo-XT; user > *Subject:* [External] Re: Flink's Either type information > > > > Hi Jacopo, > > > > Could you also provide how the KeyedBroadcastProcessFunction is > created when constructing the DataStream API program? For example, are you using > something like > > > > new KeyedBroadcastProcessFunction<..., Either<..., ...>>() { > >// Function implementation > > } > > > > or something else? > > > > Best, > > Yun > > > > > > -- > > From:jacopo.gobbi > > Send Time:2020 Feb. 17 (Mon.) 18:31 > > To:user > > Subject:Flink's Either type information > > > > Hi all, > > > > How can an Either value be returned by a KeyedBroadcastProcessFunction? > > We keep getting "InvalidTypesException: Type extraction is not possible on > Either type as it does not contain information about the 'left' type." when > doing: out.collect(Either.Right(myObject)); > > > > Thanks, > > > > Jacopo Gobbi > > > > >
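If the extractor still cannot infer the 'left' type, one workaround is to build the Either type information by hand and hand it to the process() overload that accepts an explicit output type (or to .returns() on the resulting stream). This is a sketch under assumptions: MyObject, keyedStream, broadcastStream and MyFunction are placeholders for the existing pipeline, not names from the thread:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.types.Either;

// Build the output type info by hand, since the extractor cannot infer the 'left'
// side from out.collect(Either.Right(myObject)) alone. "MyObject" is a placeholder.
TypeInformation<Either<Integer, MyObject>> eitherInfo =
        new EitherTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO, TypeInformation.of(MyObject.class));

// BroadcastConnectedStream#process has an overload that takes the output type explicitly;
// keyedStream, broadcastStream and MyFunction stand in for the existing job.
keyedStream
        .connect(broadcastStream)
        .process(new MyFunction(), eitherInfo)
        .print();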
Re: Tests in FileUtilsTest while building Flink in local
These tests also fail on my mac. It may be some mac os setup related issue. I create a JIRA ticket for that: https://issues.apache.org/jira/browse/FLINK-16198 > On 20 Feb 2020, at 12:03, Chesnay Schepler wrote: > > Is the stacktrace identical in both tests? > > Did these fail on the command-line or in the IDE? > > Can you check what directory the java.io.tmpdir points to? > > On 19/02/2020 20:42, Arujit Pradhan wrote: >> Hi all, >> >> I was trying to build Flink in my local machine and these two unit tests are >> failing. >> >> [ERROR] Errors: >> [ERROR] >> FileUtilsTest.testCompressionOnRelativePath:261->verifyDirectoryCompression:440 >> » NoSuchFile >> [ERROR] FileUtilsTest.testDeleteDirectoryConcurrently » FileSystem >> /var/folders/x9/tr2... >> >> I am building on these versions >> Java 1.8.0_221 >> maven 3.6.3 >> and OS is Mac Catalina(10.15). >> >> Did anyone face this issue? Am I missing something? >> >> The stack-trace is : >> java.nio.file.NoSuchFileException: >> ../../../../../../../../var/folders/x9/tr2xclq51sx891lbntv7bwy4gn/T/junit3367096668518353289/compressDir/rootDir >> >> at >> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92) >> at >> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111) >> at >> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116) >> at >> java.base/sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:389) >> at java.base/java.nio.file.Files.createDirectory(Files.java:689) >> at >> org.apache.flink.util.FileUtilsTest.verifyDirectoryCompression(FileUtilsTest.java:440) >> at >> org.apache.flink.util.FileUtilsTest.testCompressionOnRelativePath(FileUtilsTest.java:261) >> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >> Method) >> at >> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >> at >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) >> at >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >> at >> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) >> at >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) >> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) >> at org.junit.rules.RunRules.evaluate(RunRules.java:20) >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) >> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) >> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) >> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) >> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) >> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) >> at org.junit.runners.ParentRunner.run(ParentRunner.java:363) >> at org.junit.runner.JUnitCore.run(JUnitCore.java:137) >> at >> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) >> at >> 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) >> at >> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230) >> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58) >> >> Thanks in advance. >
Re: Flink Kafka connector consume from a single kafka partition
Hey Hemant, Are you able to reconstruct the ordering of the events, for example based on time or some sequence number? If so, you could create as many Kafka partitions as you need (for proper load distribution), disregarding any ordering at that point. Then you keyBy your stream in Flink, and order it within a window operator (or with some custom logic in a process function). Flink is able to handle quite large state using the RocksDB state backend. Best, Robert On Wed, Feb 19, 2020 at 6:34 PM hemant singh wrote: > Hi Arvid, > > Thanks for your response. I think I did not word my question properly. > I wanted to confirm that if the data is distributed to more than one > partition then the ordering cannot be maintained (which is documented). > According to your response I understand if I set the parallelism to the number > of partitions, then each consumer will consume from one partition and > ordering can be maintained. > > However, I have a question here: in case my parallelism is less than the number > of partitions, I still believe that if I create a keyed stream, ordering will be > maintained at the operator level for that key. Correct me if I am wrong. > > Second, one issue/challenge which I see with this model is that if one > source pushes data at a very high frequency, then one partition gets > overloaded. Hence the task which processes it will be overloaded too; > however, for maintaining ordering I do not have any other option but to > keep the data in one partition. > > Thanks, > Hemant > > On Wed, Feb 19, 2020 at 5:54 PM Arvid Heise wrote: > >> Hi Hemant, >> >> Flink passes your configurations to the Kafka consumer, so you could >> check if you can subscribe to only one partition there. >> >> However, I would discourage that approach. I don't see the benefit over >> just subscribing to the topic entirely and having dedicated processing for >> the different devices. >> >> If you are concerned about the order, you shouldn't be. Since all events of >> a specific device-id reside in the same source partition, events are >> in-order in Kafka (responsibility of the producer, but I'm assuming that >> because of your mail) and thus they are also in order in non-keyed streams >> in Flink. Any keyBy on device-id or a composite key involving device-id >> would also retain the order. >> >> If you have exactly one partition per device-id, you could even go with >> `DataStreamUtils#reinterpretAsKeyedStream` to avoid any shuffling. >> >> Let me know if I misunderstood your use case or if you have further >> questions. >> >> Best, >> >> Arvid >> >> On Wed, Feb 19, 2020 at 8:39 AM hemant singh >> wrote: >> >>> Hello Flink Users, >>> >>> I have a use case where I am processing metrics from different types of >>> sources (one source will have multiple devices), and for aggregations as well >>> as building alerts the order of messages is important. To maintain customer data >>> segregation I plan to have a single topic for each customer, with each source >>> streaming data to one Kafka partition. >>> To maintain ordering I am planning to push data for a single source type >>> to a single partition. Then I can create a keyed stream so that for each >>> device-id I have a single stream which has ordered data for that device-id. >>> >>> However, with the flink-kafka consumer I don't see that I can read from a >>> specific partition, hence the Flink consumer reads from multiple Kafka >>> partitions. 
So even if I try to create a keyed stream on source type (and >>> then write to a partition for further processing, like a keyed stream on >>> device-id), I think ordering will not be maintained per source type. >>> >>> The only other option I feel I am left with is to have a single partition for the >>> topic so that Flink can subscribe to the topic and this maintains the >>> ordering; the challenge is too many topics (as I have this configuration for >>> multiple customers), which is not advisable for a Kafka cluster. >>> >>> Can anyone shed some light on how to handle this use case? >>> >>> Thanks, >>> Hemant >>> >>
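A minimal sketch of the "order it with custom logic in a process function" idea mentioned above, buffering per key and flushing on event-time timers. It assumes timestamps/watermarks are assigned upstream, a placeholder Event type with getTimestamp(), and unique timestamps per key (otherwise the map value would need to be a list):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffers events per key (e.g. device-id) and emits them in timestamp order
// once the watermark has passed their timestamps.
public class ReorderByEventTime extends KeyedProcessFunction<String, Event, Event> {

    private transient MapState<Long, Event> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("reorder-buffer", Long.class, Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        long ts = event.getTimestamp(); // placeholder accessor
        if (ts > ctx.timerService().currentWatermark()) {
            buffer.put(ts, event);
            // emit it once the watermark reaches the event's timestamp
            ctx.timerService().registerEventTimeTimer(ts);
        }
        // else: the event is late; drop it or route it to a side output
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Event buffered = buffer.get(timestamp);
        if (buffered != null) {
            out.collect(buffered);
            buffer.remove(timestamp);
        }
    }
}

Since timers fire in timestamp order per key, the downstream operator sees each key's events in event-time order regardless of how the Kafka partitions interleaved them.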
Re: AWS Client Builder with default credentials
There are multiple ways of passing configuration parameters to your user defined code in Flink: a) use getRuntimeContext().getUserCodeClassLoader().getResource() to load a config file from your user code jar or the classpath. b) use getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to access a configuration object serialized from the main method. You can pass a custom object to the job parameters, or use Flink's "Configuration" object in your main method: final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Configuration config = new Configuration(); config.setString("foo", "bar"); env.getConfig().setGlobalJobParameters(config); c) Load the flink-conf.yaml: Configuration conf = GlobalConfiguration.loadConfiguration(); I'm not 100% sure if this approach works, as it is not intended to be used in user code (I believe). Let me know if this helps! Best, Robert On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler wrote: > First things first, we do not intend for users to use anything in the S3 > filesystem modules except the filesystems themselves, > meaning that you're somewhat treading on unsupported ground here. > > Nevertheless, the S3 modules contain a large variety of AWS-provided > CredentialsProvider implementations, > that can derive credentials from environment variables, system properties, > files on the classpath and many more. > > Ultimately though, you're kind of asking us how to use AWS APIs, for which > I would direct you to the AWS documentation. > > On 20/02/2020 13:16, David Magalhães wrote: > > I'm using > org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder > to create an S3 client to copy objects and delete objects inside > a TwoPhaseCommitSinkFunction. > > Shouldn't there be another way to set up configurations without > hardcoding them? Something like core-site.xml or flink-conf.yaml? > > Right now I need to have them hardcoded like this. > > AmazonS3ClientBuilder.standard > .withPathStyleAccessEnabled(true) > .withEndpointConfiguration( > new EndpointConfiguration("http://minio:9000", "us-east-1") > ) > .withCredentials( > new AWSStaticCredentialsProvider(new BasicAWSCredentials("minio", > "minio123")) > ) > .build > > Thanks > > >
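To illustrate option b), here is a sketch of how a rich function could read those parameters back at runtime instead of hardcoding them. The sink class, configuration keys and default values are made up for the example:

import java.util.Map;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class S3WritingSink extends RichSinkFunction<String> {

    private transient String endpoint;
    private transient String region;

    @Override
    public void open(Configuration parameters) {
        // Read back what the main method stored via env.getConfig().setGlobalJobParameters(...)
        ExecutionConfig.GlobalJobParameters globalParams =
                getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        Map<String, String> conf = globalParams.toMap();

        // Keys and defaults are placeholders for this sketch
        endpoint = conf.getOrDefault("s3.endpoint", "http://minio:9000");
        region = conf.getOrDefault("s3.region", "us-east-1");
        // ... build the S3 client here from endpoint/region/credentials ...
    }

    @Override
    public void invoke(String value, Context context) {
        // write 'value' using the client built in open()
    }
}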
Re: FlinkCEP questions - architecture
Thanks a lot, Juergen On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas wrote: > Hi Juergen, > > I will reply to your questions inline. As a general comment I would > suggest also having a look at [3] so that you have an idea of some of > the alternatives. > With that said, here come the answers :) > > 1) We receive files every day, which are exports from some database > tables, containing ONLY changes from the day. Most tables have > modify-cols. Even though they are files, because they contain > changes only, I believe the file records should be considered events in > Flink terminology. Is that assumption correct? > > -> Yes. I think your assumption is correct. > > 2) The records within the DB export files are NOT in chronological order, > and we can not change the export. Our use case is a "complex event > processing" case (FlinkCEP) with rules like "KeyBy(someKey) If first > A, then B, then C within 30 days, then do something". Does that work > with FlinkCEP despite the events/records not being in chronological order > within the file? The files are 100MB to 20GB in size. Do I need to > sort the files first before CEP processing? > > -> Flink CEP also works in event time, and the re-ordering can be done by > Flink. > > 3) Occasionally some crazy people manually "correct" DB records > within the database and manually trigger a re-export of ALL of the > changes for that respective day (e.g. last week's Tuesday). > Consequently we receive a correction file. Same filename but "_1" > appended. All filenames include the date (of the original export). > What are the options to handle that case (besides telling the DB > admins not to, which we did already)? Regular checkpoints and > re-process all files since then? What happens to the CEP state? Will > it be checkpointed as well? > > -> If you require re-processing, then I would say that your best > option is what you described. The other option would be to keep > everything in Flink state until you are sure that no more corrections > will come. In this case, you have to somehow issue the "correction" in > a way that the downstream system can understand what to correct and > how. Keep in mind that this may be an expensive operation because > everything has to be kept in state for longer. > > 4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a problem? > > -> The only thing to consider is the size of your state. Time is not > necessarily an issue. If your state for these 180 days is a couple of > MBs, then you have no problem. If it increases fast, then you have to > provision your cluster accordingly. > > 5) We also have CEP rules that must fire if, after a start sequence > has matched, the remaining sequence did NOT occur within a configured window. > E.g. If A, then B, but C did not occur within 30 days since A. Is that > supported by FlinkCEP? I couldn't find a working example. > > -> You can have a look at [1] for the supported pattern combinations > and you can also look at [2] for some tests of different pattern > combinations. > > 6) We expect 30-40 CEP rules. How can we estimate the required storage > size for the temporary CEP state? Is there some sort of formula > considering number of rules, number of records per file or day, record > size, window, number of records matched per sequence, number of keyBy > grouping keys, ... > > -> In FlinkCEP, each pattern becomes a single operator. This means > that you will have 30-40 operators in your job graph, each with its > own state. This can become heavy, but once again it depends on your > workload. 
I cannot give an estimate because in CEP, in order to > guarantee correct ordering of events in an unordered stream, the > library sometimes also has to keep in state more records than will be > presented at the end. > > Have you considered going with a solution based on a ProcessFunction and > broadcast state? This will also allow you to have a more dynamic > set-up where patterns can be added at runtime, and it will allow you to > do any optimizations specific to your workload ;) For a discussion on > this, check [3]. In addition, it will allow you to "multiplex" many > patterns into a single operator, thus potentially minimizing the number > of copies of the state you keep. > > 7) I can imagine that for debugging reasons it'd be good if we were > able to query the temporary CEP state. What is the (CEP) schema used > to persist the CEP state, and how can we query it? And does such a query > work on the whole cluster or only per node (e.g. because of shuffles, > with nodes responsible only for a portion of the events)? > > -> Unfortunately the state in CEP is not queryable, thus I am not > sure if you can inspect it at runtime. > > 8) I understand state is stored per node. What happens if I want to > add or remove nodes? Will the state still be found, despite it being > stored on another node? I read that I need to be equally careful when > changing rules? Or is that a different issue? > > -> Rescaling a Flink job is not done
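For question 5), one way to express "A, then B, but no C within 30 days of A" with FlinkCEP is to model the full A -> B -> C sequence and treat the timed-out partial matches (A and B seen, C never arrived) as the alert, via the timeout side output. This is a sketch under assumptions: a placeholder Event POJO with getKey()/getType(), an `input` DataStream with event-time timestamps already assigned, and the statements living inside a job's main method:

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Model the *complete* sequence and bound it with the 30-day window.
Pattern<Event, ?> pattern = Pattern.<Event>begin("A")
        .where(new SimpleCondition<Event>() {
            @Override public boolean filter(Event e) { return "A".equals(e.getType()); } // placeholder condition
        })
        .followedBy("B")
        .where(new SimpleCondition<Event>() {
            @Override public boolean filter(Event e) { return "B".equals(e.getType()); }
        })
        .followedBy("C")
        .where(new SimpleCondition<Event>() {
            @Override public boolean filter(Event e) { return "C".equals(e.getType()); }
        })
        .within(Time.days(30));

OutputTag<String> noCWithin30Days = new OutputTag<String>("no-C-within-30-days") {};

PatternStream<Event> patternStream = CEP.pattern(input.keyBy(Event::getKey), pattern);

SingleOutputStreamOperator<String> completed = patternStream.flatSelect(
        noCWithin30Days,
        new PatternFlatTimeoutFunction<Event, String>() {
            @Override
            public void timeout(Map<String, List<Event>> partialMatch, long timeoutTs, Collector<String> out) {
                // Only alert if A *and* B matched but C never showed up within the window.
                if (partialMatch.containsKey("B")) {
                    out.collect("C missing within 30 days for key " + partialMatch.get("A").get(0).getKey());
                }
            }
        },
        new PatternFlatSelectFunction<Event, String>() {
            @Override
            public void flatSelect(Map<String, List<Event>> match, Collector<String> out) {
                // Full A -> B -> C sequence matched in time: nothing to alert on here.
            }
        });

DataStream<String> alerts = completed.getSideOutput(noCWithin30Days);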
Re: JDBC source running continuously
Hi Fanbin, .iterate() is not available on the Table API; it's a DataStream API method. Currently, the JDBC source is a bounded source (a snapshot of the table at execution time), so the job will finish when it processes all the data. Regarding your requirement, "running continuously with JDBC source", we should make it clear what you want the source to read after the full snapshot: 1) read a full snapshot again, 2) read newly inserted rows, 3) read newly inserted rows as well as updated and deleted rows. For (1), you can create your own JDBC input format based on JDBCInputFormat, re-executing the SQL query once the last row has been read from the DB in nextRecord (this is the answer in the Stack Overflow post [1]). For (2), in nextRecord(), you need to execute a SQL query with a filter to fetch rows which are greater than the last max ID or max created time. For (3), this is changelog support, which will be supported natively in Flink SQL in 1.11. Best, Jark On Fri, 21 Feb 2020 at 02:35, Fanbin Bu wrote: > > https://stackoverflow.com/questions/48151881/how-to-run-apache-flink-streaming-job-continuously-on-flink-server > > On Thu, Feb 20, 2020 at 3:14 AM Chesnay Schepler > wrote: > >> Can you show us where you found the suggestion to use iterate()? >> >> On 20/02/2020 02:08, Fanbin Bu wrote: >> > Hi, >> > >> > My app creates the source from the JDBC input format, runs some SQL, >> > and prints the output. But the source terminates itself after the query is >> > done. Is there any way to keep the source running? >> > sample code: >> > val env = StreamExecutionEnvironment.getExecutionEnvironment >> > val settings = EnvironmentSettings.newInstance() >> > .useBlinkPlanner() >> > .inStreamingMode() >> > .build() >> > val tEnv = StreamTableEnvironment.create(env, settings) >> > val inputFormat >> > = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from >> > table")... .finish() >> > val source = env.createInput(inputFormat) >> > tEnv.registerTableSource(source) >> > val queryResult = tEnv.sqlQuery("select * from awesomeSource") >> > queryResult.insertInto(mySink) >> > >> > >> > I searched around and it's suggested to use .iterate(). Can somebody >> > give more examples of how to use it in this case? >> > >> > Thanks, >> > Fanbin >> >> >>
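A sketch of option (2): a source that keeps polling the table for rows newer than the last emitted id. The JDBC URL, table and column names, and poll interval are placeholders, and checkpointing of lastId (e.g. via CheckpointedFunction) is omitted for brevity:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

// Repeatedly queries rows with id greater than the last one seen, so the
// source never finishes. Requires the JDBC driver on the classpath.
public class PollingJdbcSource extends RichSourceFunction<Row> {

    private volatile boolean running = true;
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password"); // placeholder
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        long lastId = -1L;
        PreparedStatement stmt = connection.prepareStatement(
                "SELECT id, payload FROM my_table WHERE id > ? ORDER BY id"); // placeholder query
        while (running) {
            stmt.setLong(1, lastId);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    lastId = rs.getLong("id");
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(Row.of(lastId, rs.getString("payload")));
                    }
                }
            }
            Thread.sleep(5_000); // poll interval, placeholder
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}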
Re: Running Flink Cluster on PaaS
Thank you Yang Wang, Regarding [1] and a sentence from that doc, "This page describes deploying a standalone Flink session": I always wonder what you mean by "Standalone Flink session" or "Standalone Cluster", which can also be found here [2]. I'm using the Docker with Job Cluster approach, and I know that there are also Session Cluster docker images. I understand the differences, but I'm not sure what you are referring to with those two terms from [1] and [2]. Thanks, Krzysztof [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#cluster_setup.html [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/