'Custom' mapping function on keyed WindowedStream
I would like to create a custom aggregator function for a windowed KeyedStream which I have complete control over - i.e. instead of implementing an AggregateFunction, I would like to control the lifecycle of the Flink state by implementing the CheckpointedFunction interface, though I still want this state to be per-key, per-window. I am not sure which function I should be calling on the WindowedStream in order to invoke this custom functionality. I see from the documentation that CheckpointedFunction is for non-keyed state - which I guess eliminates this option.

A little background - I have logic that needs to hold a very large state in the operator - lots of counts by sub-key. Since only a subset of these aggregations are updated, I was interested in trying out incremental checkpointing in RocksDB. However, I don't want to be hitting RocksDB I/O on every update of state since we need very low latency, and instead wanted to hold the state on the Java heap and only update the Flink state on checkpoint - i.e. something like CheckpointedFunction. My assumption is that any update I make to RocksDB-backed state will hit the local disk - if this is wrong then I'll be happy.

What other options do I have?

Thanks,
Hayden Marchant
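P.S. For concreteness, the closest per-key, per-window hook I've found so far is ProcessWindowFunction with its windowState() - a rough sketch is below (the element/key types and the window assigner are just illustrative assumptions), although I'm still not sure it gives me the checkpoint-time control I'm after:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Map;

// Assumed shapes for this sketch: elements are (subKey, delta) pairs, keyed by a String key.
public class SubKeyCounts
        extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow> {

    private final MapStateDescriptor<String, Long> countsDesc =
            new MapStateDescriptor<>("subKeyCounts", String.class, Long.class);

    @Override
    public void process(String key, Context ctx, Iterable<Tuple2<String, Long>> elements,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        // windowState() is scoped per key *and* per window; globalState() is per key across windows.
        MapState<String, Long> counts = ctx.windowState().getMapState(countsDesc);
        for (Tuple2<String, Long> e : elements) {
            Long current = counts.get(e.f0);
            counts.put(e.f0, (current == null ? 0L : current) + e.f1);
        }
        for (Map.Entry<String, Long> entry : counts.entries()) {
            out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
        }
    }
}

// usage (window assigner chosen only for illustration):
// input.keyBy(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(1))).process(new SubKeyCounts());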
RE: Kafka as source for batch job
Forgot to mention that my target Kafka version is 0.11.x, with the aim to upgrade to 1.0 when a 1.0.x fixpack is released.

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 8:05 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT]
Subject: RE: Kafka as source for batch job

Hi Marchant,

Yes, I agree. In general, the isEndOfStream method has a very ill-defined semantic, with actually different behaviors across different Kafka connector versions. This method will definitely need to be revisited in the future (we are thinking about a rework of the connector).

What is your target Kafka version? And do you know the ending offsets of _all_ partitions which you want to only consume a range of? I can probably double check for you if your specific case is possible, given the above information.

Cheers,
Gordon
RE: Kafka as source for batch job
Hi Gordon,

Actually, our use case is that we have a start/end timestamp, and we plan on calling KafkaConsumer.offsetsForTimes to get the offsets for each partition. So I guess our logic is different in that we have an 'and' predicate between the partitions reaching their end offsets, as opposed to the current 'or' predicate - i.e. any partition that fulfills the condition is enough to stop the job. Either way, I'd still need to figure out when to stop the job.

Would it make more sense to implement an InputFormat that could wrap this 'bounded' Kafka source, and use the DataSet / Batch Table API?

Thanks,
Hayden
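P.S. For reference, roughly how I was planning to turn the start/end timestamps into per-partition offset bounds, sketched against the plain Kafka 0.11 consumer API (topic name and timestamps are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class OffsetBounds {

    /** For each partition of the topic, the earliest offset whose timestamp is >= timestampMs. */
    static Map<TopicPartition, Long> offsetsAt(KafkaConsumer<?, ?> consumer, String topic, long timestampMs) {
        Map<TopicPartition, Long> query = new HashMap<>();
        for (PartitionInfo p : consumer.partitionsFor(topic)) {
            query.put(new TopicPartition(p.topic(), p.partition()), timestampMs);
        }
        Map<TopicPartition, Long> bounds = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : consumer.offsetsForTimes(query).entrySet()) {
            if (e.getValue() != null) {   // null => no record at or after this timestamp in the partition
                bounds.put(e.getKey(), e.getValue().offset());
            }
        }
        return bounds;
    }
}

// startOffsets = offsetsAt(consumer, "my-topic", startTs);
// endOffsets   = offsetsAt(consumer, "my-topic", endTs);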
RE: Kafka as source for batch job
Gordon,

Thanks for the pointer. I did some searches for usages of isEndOfStream and it's a little confusing. I see that all implementors of DeserializationSchema must implement this method, but it's not called from anywhere central in the Flink streaming engine; rather, each source can decide to use it in its own implementation - for example, the Kafka source stops processing the topic when isEndOfStream returns true. This is nice, but it localizes the treatment to just that operator, and even though it goes a long way towards ensuring that I get just my bounded data, it still does not give me the ability to stop my job when I have finished consuming the elements.

Also, in my case I need to ensure that I have reached a certain offset for each of the Kafka partitions that are assigned to the instance of the source function. It seems from the code that I would need a different implementation of KafkaFetcher.runFetchLoop with slightly different logic for setting running to false.

What would you recommend in this case?

From: Tzu-Li (Gordon) Tai [mailto:tzuli...@apache.org]
Sent: Thursday, February 08, 2018 12:24 PM
To: user@flink.apache.org; Marchant, Hayden [ICG-IT]
Subject: Re: Kafka as source for batch job

Hi Hayden,

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]? I think that could be what you are looking for. It signals the end of the stream when consuming from Kafka.

Cheers,
Gordon
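P.S. For concreteness, the rough shape of schema I have been experimenting with is sketched below - the end-offset map is a placeholder for however the bounds are obtained, and note the caveat: the stock connector stops the whole subtask's fetch loop on the first 'true', so with several partitions per subtask this is still effectively 'or' semantics, and records past a partition's end offset may keep arriving until the last partition finishes.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class BoundedStringSchema implements KeyedDeserializationSchema<String> {

    private final Map<Integer, Long> endOffsets;             // partition -> last offset to read (placeholder bounds)
    private final Set<Integer> seenPartitions = new HashSet<>();
    private final Set<Integer> finishedPartitions = new HashSet<>();

    public BoundedStringSchema(Map<Integer, Long> endOffsets) {
        this.endOffsets = endOffsets;
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        seenPartitions.add(partition);
        if (offset >= endOffsets.getOrDefault(partition, Long.MAX_VALUE)) {
            finishedPartitions.add(partition);
        }
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // 'and' across the partitions this subtask has seen so far.
        return !seenPartitions.isEmpty() && finishedPartitions.containsAll(seenPartitions);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}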
Kafka as source for batch job
I know that traditionally Kafka is used as a source for a streaming job. In our particular case, we are looking at extracting records from a Kafka topic from a particular well-defined offset range (per partition) - i.e. from offset X to offset Y. In this case, we'd somehow want the application to know that it has finished when it gets to offset Y. This basically turns the Kafka stream into bounded data, as opposed to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, though there might be more:

1. Use a regular streaming job, and have some external service that monitors the current offsets of the consumer group of the topic and manually stops the job when the consumer group has finished. Pros - simple w.r.t. Flink. Cons - hacky.
2. Create a batch job, and a new InputFormat based on Kafka that reads the specified subset of the Kafka topic into the source. Pros - represents bounded data from a Kafka topic as a batch source. Cons - requires implementation of a source.
3. Dump the subset of Kafka into a file and then trigger a more 'traditional' Flink batch job that reads from the file. Pros - simple. Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this before?

Thanks,
Hayden Marchant
RE: S3 for state backend in Flink 1.4.0
We actually got it working. Essentially, it's an implementation of HadoopFileSystem, and it was written with the idea that it can be used with Spark (since Spark has broader adoption than Flink as of now). We managed to get it configured, and found the latency to be much lower than with the S3 connector. There are a lot fewer copy operations etc. happening under the hood when using this native API, which explains the better performance. Happy to provide assistance offline if you're interested.

Thanks,
Hayden

-----Original Message-----
From: Edward Rojas [mailto:edward.roja...@gmail.com]
Sent: Thursday, February 01, 2018 6:09 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi Hayden,

It seems like a good alternative, but I see it's intended to work with Spark - did you manage to get it working with Flink? I did some tests but I get several errors when trying to create a file, either for checkpointing or saving data.

Thanks in advance,
Regards,
Edward
RE: Latest version of Kafka
Thanks for the info!

-----Original Message-----
From: Piotr Nowojski [mailto:pi...@data-artisans.com]
Sent: Friday, February 02, 2018 4:37 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Latest version of Kafka

Hi,

Flink for now provides only a connector for Kafka 0.11, which uses the KafkaClient in version 0.11.x. However, you should be able to use it for reading from/writing to Kafka 1.0 - Kafka claims (and as far as I know it's true) that Kafka 1.0 is backward compatible with 0.11.

Piotrek

> On 1 Feb 2018, at 14:46, Marchant, Hayden wrote:
>
> What is the newest version of Kafka that is compatible with Flink 1.4.0? I see from the documentation that the last version of Kafka supported is 0.11, but has any testing been done with Kafka 1.0?
>
> Hayden Marchant
RE: Joining data in Streaming
Thanks for all the ideas!!

From: Steven Wu [mailto:stevenz...@gmail.com]
Sent: Tuesday, February 06, 2018 3:46 AM
To: Stefan Richter
Cc: Marchant, Hayden [ICG-IT]; user@flink.apache.org; Aljoscha Krettek
Subject: Re: Joining data in Streaming

There is also a discussion of side inputs: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

I would load the smaller data set as a static reference data set. Then you can just do single-source streaming of the larger data set.

On Wed, Jan 31, 2018 at 1:09 AM, Stefan Richter wrote:

Hi,

if the workarounds that Xingcan and me mentioned are no options for your use-case, then I think this might currently be the better option. But I would expect some better support for stream joins in the near future.

Best,
Stefan
Latest version of Kafka
What is the newest version of Kafka that is compatible with Flink 1.4.0? I see from the documentation that the last version of Kafka supported is 0.11, but has any testing been done with Kafka 1.0?

Hayden Marchant
Reading bounded data from Kafka in Flink job
I have 2 datasets that I need to join together in a Flink batch job. One of the datasets needs to be created dynamically by completely 'draining' a Kafka topic over an offset range (start and end), and creating a file containing all messages in that range. I know that in Flink streaming I can specify the start offset, but not the end offset. In my case, this preparation of the file from the Kafka topic is really working on a finite, bounded set of data, even though it comes from Kafka. Is there a way that I can do this in Flink (either streaming or batch)?

Thanks,
Hayden
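P.S. A sketch of the kind of 'drain' step I have in mind, done with the plain Kafka 0.11 consumer outside of Flink - the offset maps, properties and target path are placeholders, and it assumes the end offsets already exist in the topic:

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class TopicSliceDumper {

    /** Writes every record between startOffsets (inclusive) and endOffsets (inclusive), one per line. */
    static void drainToFile(Properties kafkaProps, Map<TopicPartition, Long> startOffsets,
                            Map<TopicPartition, Long> endOffsets, Path target) throws IOException {
        try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(kafkaProps, new ByteArrayDeserializer(), new ByteArrayDeserializer());
             BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {

            consumer.assign(new ArrayList<>(startOffsets.keySet()));
            for (Map.Entry<TopicPartition, Long> e : startOffsets.entrySet()) {
                consumer.seek(e.getKey(), e.getValue());
            }

            Set<TopicPartition> remaining = new HashSet<>(endOffsets.keySet());
            while (!remaining.isEmpty()) {
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(1000)) {
                    TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
                    if (!remaining.contains(tp) || rec.offset() > endOffsets.get(tp)) {
                        continue;   // outside the requested range for this partition
                    }
                    out.write(new String(rec.value(), StandardCharsets.UTF_8));
                    out.newLine();
                    if (rec.offset() >= endOffsets.get(tp)) {
                        remaining.remove(tp);
                    }
                }
            }
        }
    }
}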
RE: S3 for state backend in Flink 1.4.0
Edward,

We are using Object Storage for checkpointing. I'd like to point out that we were seeing performance problems using the S3 protocol. Btw, we had quite a few problems using the flink-s3-fs-hadoop jar with Object Storage and had to do some ugly hacking to get it working at all. We recently discovered an alternative connector developed by IBM Research called stocator. It's a streaming writer and performs better than using the S3 protocol. Here is a link to the library - https://github.com/SparkTC/stocator - and a blog explaining it - http://www.spark.tc/stocator-the-fast-lane-connecting-object-stores-to-spark/

Good luck!!

-----Original Message-----
From: Edward Rojas [mailto:edward.roja...@gmail.com]
Sent: Wednesday, January 31, 2018 3:02 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi,

We are having a similar problem when trying to use Flink 1.4.0 with IBM Object Storage for reading and writing data. We followed https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.

We put the flink-s3-fs-hadoop jar from the opt/ folder into the lib/ folder and we added the configuration in flink-conf.yaml:

s3.access-key:
s3.secret-key:
s3.endpoint: s3.us-south.objectstorage.softlayer.net

With this we can read from IBM Object Storage without any problem when using env.readTextFile("s3://flink-test/flink-test.txt");

But we are having problems when trying to write. We are using a Kafka consumer to read from the bus, we do some processing and afterwards save some data to Object Storage. When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1); the file is created, but only when the job finishes (or we stop it). We need to save the data without stopping the job, so we are trying to use a sink. But when using a BucketingSink, we get the error:

java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)

Do you have any idea how we could make it work using a sink?

Thanks,
Regards,
Edward
RE: Joining data in Streaming
Stefan,

So are we essentially saying that in this case, for now, I should stick to the DataSet / Batch Table API?

Thanks,
Hayden

-----Original Message-----
From: Stefan Richter [mailto:s.rich...@data-artisans.com]
Sent: Tuesday, January 30, 2018 4:18 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org; Aljoscha Krettek
Subject: Re: Joining data in Streaming

Hi,

as far as I know, this is not easily possible. What would be required is something like a CoFlatMap function, where one input stream is blocking until the second stream is fully consumed to build up the state to join against. Maybe Aljoscha (in CC) can comment on future plans to support this.

Best,
Stefan
Joining data in Streaming
We have a use case where we have 2 data sets - one reasonably large data set (a few million entities), and a smaller set of data. We want to do a join between these data sets. We will be doing this join after both data sets are available. In the world of batch processing this is pretty straightforward - we'd load both data sets into an application and execute a join operator on them through a common key.

Is it possible to do such a join using the DataStream API? I would assume that I'd use the connect operator, though I'm not sure exactly how I should do the join - do I need one 'smaller' set to be completely loaded into state before I start flowing the large set? My concern is that if I read both data sets from streaming sources, since I can't be guaranteed of the order that the data is loaded, I may lose lots of potential joined entities since their pairs might not have been read yet.

Thanks,
Hayden Marchant
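P.S. In case it helps the discussion, this is roughly the shape I had imagined for the connect-based variant - a sketch only, with made-up Event/RefData/Joined types, where the large stream is buffered per key until its reference record arrives:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Event, RefData and Joined are placeholder types for this sketch.
// Both inputs are keyed by the same join key before connect(), so the state below is per key.
public class JoinOnRef extends RichCoFlatMapFunction<Event, RefData, Joined> {

    private transient ValueState<RefData> ref;
    private transient ListState<Event> pending;

    @Override
    public void open(Configuration parameters) {
        ref = getRuntimeContext().getState(new ValueStateDescriptor<>("ref", RefData.class));
        pending = getRuntimeContext().getListState(new ListStateDescriptor<>("pending", Event.class));
    }

    @Override
    public void flatMap1(Event event, Collector<Joined> out) throws Exception {
        RefData r = ref.value();
        if (r != null) {
            out.collect(new Joined(event, r));
        } else {
            pending.add(event);   // reference side not seen yet for this key; buffer the event
        }
    }

    @Override
    public void flatMap2(RefData refData, Collector<Joined> out) throws Exception {
        ref.update(refData);
        for (Event event : pending.get()) {
            out.collect(new Joined(event, refData));
        }
        pending.clear();
    }
}

// usage: largeStream.keyBy(Event::getKey).connect(smallStream.keyBy(RefData::getKey)).flatMap(new JoinOnRef());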
RE: S3 for state backend in Flink 1.4.0
I see that we can still use the other implementation, but we were hoping that we'd benefit from the bug fix done in Flink 1.4.0 around the 'repeated' load of configuration. I'll check with the old implementation and see if it still works. We have also seen discussions of a more native connector that interfaces directly with IBM Object Storage, called stocator, that can be configured through hdfs-site.xml and might speed things up.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Thursday, January 25, 2018 6:30 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: S3 for state backend in Flink 1.4.0

Hi,

Did you try overriding that config and it didn't work? That dependency is in fact still using the Hadoop S3 FS implementation but is shading everything to our own namespace so that there can't be any version conflicts. If that doesn't work then we need to look into this further.

The way you usually use this is by putting the flink-s3-fs-hadoop jar from the opt/ folder into the lib/ folder. I'm not sure including it as a dependency will work, but it might. You also don't have to use the flink-s3-fs-hadoop dependency if using the regular Hadoop S3 support worked for you before. It's only an additional option.

Best,
Aljoscha
S3 for state backend in Flink 1.4.0
Hi,

We have a Flink Streaming application that uses S3 for storing checkpoints. We are not using 'regular' S3, but rather IBM Object Storage, which has an S3-compatible connector. We had quite some challenges in overriding the endpoint from the default s3.amazonaws.com to our internal IBM Object Storage endpoint. In 1.3.2, we managed to get this working by providing our own jets3t.properties file that overrode s3service.s3-endpoint (https://jets3t.s3.amazonaws.com/toolkit/configuration.html).

When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop artifact. It seems that our overriding with jets3t.properties is no longer relevant, since it does not use the Hadoop implementation anymore.

Is there a way to override this default endpoint, or can we do this with the presto variant? Please note that if we provide the endpoint in the URL for the state backend, it simply appends s3.amazonaws.com to the URL - for example s3://myobjectstorageendpoint.s3.amazonaws.com.

Are there any other solutions, such as 'rolling back' to the Hadoop implementation of S3?

Thanks,
Hayden
Hardware Reference Architecture
Hi,

I'm looking for guidelines for a hardware reference architecture for a small/medium Flink cluster - we'll be installing on in-house bare-metal servers. I'm looking for guidance for:

1. Number and spec of CPUs
2. RAM
3. Disks
4. Network
5. Proximity of servers to each other

(Most likely, we will choose YARN as a cluster manager for Flink.)

If someone can share a document or link with relevant information, I will be very grateful.

Thanks,
Hayden Marchant
TaskManager HA on YARN
Hi,

We are currently starting to test Flink running on YARN; until now we have been testing on a standalone cluster. One thing lacking in standalone mode is that we have to manually restart a Task Manager if it dies. I looked at https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#yarn-cluster-high-availability and see that YARN deals with HA for the Job Manager. How does it deal with a Task Manager if it dies? I would like the Task Manager to be dealt with similarly to the Job Manager on failure.

For example, let's say I have a cluster with two Task Managers, and one Task Manager dies. Will YARN restart the dead Task Manager, or would that need to be a manual restart? What actually would happen in the above case?

Thanks,
Hayden
Garbage collection concerns with Task Manager memory
I read in the Flink documentation that the TaskManager runs all tasks within its own JVM, and that the recommendation is to set taskmanager.heap.mb to as much as is available on the server. I have a very large server with 192GB, so I am thinking of giving most of it to the Task Manager. I recall that there are concerns about long stop-the-world garbage collection pauses when allocating too much memory to a JVM - is this still a concern with G1?

Thanks,
Hayden Marchant
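P.S. The knobs I'm looking at are just these two flink-conf.yaml entries (the values below are illustrative, not a recommendation):

taskmanager.heap.mb: 102400
env.java.opts: "-XX:+UseG1GC -XX:MaxGCPauseMillis=200"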
start-cluster.sh not working in HA mode
I am attempting to run Flink 1.3.2 in HA mode with ZooKeeper. When I run start-cluster.sh, the job manager is not started, even though the task manager is started. When I delved into this, I saw that the command:

ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start cluster ${master} ${webuiport} &"

is not actually running anything on the host, i.e. I do not see "Starting jobmanager daemon on host .". Only when I remove ALL the quotes do I see it working, i.e. if I run:

ssh -n $FLINK_SSH_OPTS $master -- nohup /bin/bash -l ${FLINK_BIN_DIR}/jobmanager.sh start cluster ${master} ${webuiport} &

I see that it manages to run the job manager - I see "Starting jobmanager daemon on host .".

Did anyone else experience a similar problem? Any elegant workarounds without having to change the source code?

Thanks,
Hayden Marchant
RE: In-memory cache
Nice idea. Actually, we are looking at connect for other parts of our solution in which the latency is less critical. A few considerations for not using 'connect' in this case were:

1. To isolate the two streams from each other to reduce complexity, simplify debugging, etc. Since we are newbies at Flink, I was thinking that it is beneficial to keep each stream as simple as possible, and if need be, we can interface between them to 'exchange data'.
2. The reference data, even though quite small, is updated every 100 ms. Since we would need this reference data on each 'consuming' operator instance, we would essentially nearly double the number of tuples coming through the operator. Since low latency is key here, this was a concern (the assumption being that the two sides of the 'connect' share the same resources, whereas using a background thread to update a 'map' would not be competing with the incoming tuples).

I realize that structurally, connect is a neater solution. If I can be convinced that my above concerns are unfounded, I'll be happy to try that direction.

Thanks,
Hayden

From: Stavros Kontopoulos [mailto:st.kontopou...@gmail.com]
Sent: Monday, October 02, 2017 2:24 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: In-memory cache

How about connecting two streams of data, one from the reference data and one from the main data (I assume using keyed streams, as you mention QueryableState), and keeping state locally within the operator? The idea is to have a local sub-copy of the reference data within the operator that is updated from the source of the reference data. The reference data is still updated externally from that low-latency Flink app.

Here is a relevant question: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Accessing-state-in-connected-streams-td8727.html

Would that help?

Stavros
In-memory cache
We have an operator in our streaming application that needs to access 'reference data' that is updated by another Flink streaming application. This reference data has about ~10,000 entries and has a small footprint. The reference data needs to be updated ~every 100 ms. The required latency for this application is extremely low (a couple of milliseconds), and we are therefore cautious about paying the cost of I/O to access the reference data remotely.

We are currently examining 3 different options for accessing this reference data:

1. Expose the reference data as queryable state and access it directly from the 'client' streaming operator using the QueryableState API.
2. Same as #1, but create an in-memory Java cache of the reference data within the operator that is asynchronously updated at a scheduled frequency using the QueryableState API.
3. Output the reference data to Redis, and create an in-memory Java cache of the reference data within the operator that is asynchronously updated at a scheduled frequency using the Redis API.

My understanding is that one of the cons of using queryable state is that if the Flink application that generates the reference data is unavailable, the queryable state will not exist - is that correct?

If we were to use an asynchronously scheduled 'read' from the distributed cache, where should it be done? I was thinking of using a ScheduledExecutorService from within the open method of the Flink operator. What is the best way to get this done?

Regards,
Hayden Marchant
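P.S. To make options 2/3 concrete, this is roughly what I had in mind for the operator-side cache - a sketch with made-up Event/EnrichedEvent/RefEntry types and a placeholder RefLoader standing in for whatever actually fetches the snapshot (QueryableState client, Redis, etc.):

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Event, EnrichedEvent, RefEntry and RefLoader are placeholders for this sketch.
public class EnrichWithRefData extends RichMapFunction<Event, EnrichedEvent> {

    private transient ScheduledExecutorService refresher;
    private transient volatile Map<String, RefEntry> cache;

    @Override
    public void open(Configuration parameters) {
        cache = Collections.emptyMap();
        refresher = Executors.newSingleThreadScheduledExecutor();
        // Swap in a fresh immutable snapshot every 100 ms; reads on the hot path never block on I/O.
        refresher.scheduleAtFixedRate(() -> cache = RefLoader.loadSnapshot(), 0, 100, TimeUnit.MILLISECONDS);
    }

    @Override
    public EnrichedEvent map(Event event) {
        return new EnrichedEvent(event, cache.get(event.getRefKey()));
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdownNow();
        }
    }
}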
Testing recoverable job state
I'm a newbie to Flink and am trying to understand how recovery works using state backends. I've read the documentation and am now trying to run a simple test to demonstrate the capabilities - I'd like to test the recovery of a Flink job and how the state is recovered from where it left off when 'disaster hit'. Please note that this whole test is being done on a Windows workstation through my IDE.

I am running a LocalFlinkMiniCluster and have enabled checkpointing using the FsStateBackend. I am using Kafka as a source. When running this Flink job, I see that a new directory is created within the FsStateBackend base directory with a randomly generated JobID. I assume that if a task fails within the job, the state stored in the backend will be used to restart the relevant operator instances from the most recent checkpoint. I have tried simulating this by throwing an exception in one of the operators, though I'm not sure what the expected behaviour is - will the task be killed, or will just the 'bad' tuple be ignored?

Also, and more importantly, I would like to simulate a more 'drastic' failure - that of my whole Flink cluster going down. In my test I would do this simply by killing my single LocalFlinkMiniCluster process. In that case, I would like my job to resume when I restart the Flink cluster. However, when I do that, my code launches a new job, with the same code, but running with a new JobID. How do I get it to run with the same JobID so that it can use the stored state to recover?

Am I approaching this test in the right way? If not, please give me some pointers to better simulate a real system. (Note that in a real system, we would like to run on a single-node cluster.)

Thanks,
Hayden Marchant
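P.S. In case the setup matters, this is roughly what I have, plus the externalized-checkpoint piece I am now trying (paths are placeholders, and I'm assuming the retained checkpoint can then be passed back with -s when resubmitting):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///C:/flink/checkpoints"));
env.enableCheckpointing(5000);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// (externalized checkpoints may also need state.checkpoints.dir set in flink-conf.yaml)

// After killing and restarting the cluster, resume from the retained checkpoint:
//   bin/flink run -s <path-to-retained-checkpoint> my-job.jar
// (the resumed job gets a new JobID; what matters is the -s path, not reusing the old ID)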
RE: Queryable State
I can see the job running in the Flink UI for the job, and I specifically specified the port for the Job Manager. When I provided a different port, I got an Akka exception; here, it seems that the code is getting further. I think that it might be connected with how I am creating the StateDescriptor. What exactly does it mean when the KvStateLocation can't be found?

-----Original Message-----
From: Biplob Biswas [mailto:revolutioni...@gmail.com]
Sent: Wednesday, September 13, 2017 2:20 PM
To: user@flink.apache.org
Subject: Re: Queryable State

Hi,

are you sure your jobmanager is running and is accessible from the supplied hostname and port? If you can start up the Flink UI of the job which creates your queryable state, it should have the details of the job manager and the port to be used in this queryable client job.
QueryableState - No KvStateLocation found for KvState instance
I am trying to use queryable state, and am encountering issues when querying the state from the client. I get the following exception:

Exception in thread "main" org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'word_sums'.
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleKvStateMessage(JobManager.scala:1532)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:777)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)

In my flow, I am creating the queryable state in the following way (the stream values are Tuple2<String, Long> word sums):

final TypeSerializer<Tuple2<String, Long>> valueSerializer =
        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
                .createSerializer(new ExecutionConfig());

ValueStateDescriptor<Tuple2<String, Long>> vsd =
        new ValueStateDescriptor<>(WORD_SUMS_STATE, valueSerializer);

QueryableStateStream<Tuple, Tuple2<String, Long>> tupleTuple2QueryableStateStream =
        wordsSummedStream.asQueryableState(WORD_SUMS_STATE, vsd);

I am using LocalFlinkMiniCluster and have enabled QueryableStateOptions.SERVER_ENABLE in the configuration. From the logs at the startup of the flow, I see that the queryable state operator is running. I also see the queryable state operation from the web console. Is there anything else that I am missing?

Thanks,
Hayden Marchant
Shuffling between map and keyBy operator
I have a streaming application that has a keyBy operator followed by an operator working on the keyed values (a custom sum operator). If the map operator and the aggregate operator are running on the same Task Manager, will Flink always serialize and deserialize the tuples, or is there an optimization in this case due to 'locality'? (I was planning on deploying my Flink Streaming application to a single 'big' node in the hope that I can reduce latency by saving on both network and serde.)

Thanks,
Hayden Marchant
Very low-latency - is it possible?
We're about to get started on a 9-person-month PoC using Flink Streaming. Before we get started, I am interested to know how low a latency I can expect for my end-to-end flow for a single event (from source to sink).

Here is a very high-level description of our Flink design: we need at-least-once semantics, and the main flow of the application is parsing a message (< 50 microseconds) from Kafka, then doing a keyBy on the parsed event (< 1 KB) and updating a very small user state in the KeyedStream, and then doing another keyBy and another operator on that KeyedStream. Each of the operators is a very simple operation - very little calculation and no I/O.

Our requirement is to get close to 1 ms (99th percentile) or lower for end-to-end processing (the timer starts once we get the message from Kafka). Is this at all realistic if our flow contains 2 aggregations? If so, what optimizations might we need to get there regarding cluster configuration (both Flink and hardware)? Our throughput is possibly small enough (40,000 events per second) that we could run on one node - which might eliminate some network latency.

I did read in https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html, in Exactly Once vs. At Least Once, that a few milliseconds is considered super low latency - wondering if we can get lower.

Any advice or 'war stories' are very welcome.

Thanks,
Hayden Marchant
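P.S. For what it's worth, the knobs we were planning to start from - just a sketch of the direction, not a tuned setup:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(0);                                            // flush network buffers per record instead of waiting for them to fill
env.enableCheckpointing(10000, CheckpointingMode.AT_LEAST_ONCE);    // at-least-once avoids barrier alignment delays
env.getConfig().enableObjectReuse();                                // saves some copies inside operator chains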
RE: Using local FS for checkpoint
I didn't think about NFS. That would save me the hassle of installing an HDFS cluster just for that, especially if my organization already has an NFS 'handy'.

Thanks,
Hayden

From: Tony Wei [mailto:tony19920...@gmail.com]
Sent: Thursday, August 31, 2017 12:12 PM
To: Marchant, Hayden [ICG-IT]
Cc: user@flink.apache.org
Subject: Re: Using local FS for checkpoint

Hi Marchant,

HDFS is not a must for storing checkpoints. S3 or NFS are both acceptable, as long as the location is accessible from the job manager and task managers. For AWS S3 configuration, you can refer to this page: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html

Best,
Tony Wei
Using local FS for checkpoint
Whether I use the RocksDB or FS state backend, if my requirements are to have fault tolerance and the ability to recover with 'at-least-once' semantics for my Flink job, is there still a valid case for using a local FS for storing state? I.e., if a Flink node is invalidated, I would have thought that the only way it could recover (by restarting the task on a different node) is if the state is stored in a shared file system such as HDFS, S3, etc.

I am asking since I want to know if HDFS is a must-have for my deployment.

Thanks,
Hayden
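P.S. To make the question concrete, by a shared file system I mean something like the following, where /mnt/flink is the same NFS (or other shared) mount on every node - the path is made up:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///mnt/flink/checkpoints"));   // must be visible to the JobManager and all TaskManagers
env.enableCheckpointing(10000);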