Tracing and Flink
Hello Flink Friends! This is a long shot, but I'm wondering if anyone is thinking or working on applying tracing to Streaming systems and in particular Flink. As far as I understand this is a fairly open problem and so I'm curious how folks are thinking about it and if anyone has considered how they might apply tracing to Flink systems. Some patterns in Streaming systems fit into tracing fairly easily (consumer fan-out, for example) but many patterns do not. For example, how do you trace when there is batching or aggregation? Nevertheless, I'm sure some folks have thought about this or even tried to implement solutions, and so I'd love to hear about this. Especially if there is any standards work in this direction (for example, within the OpenTracing project). If you've thought about this, implemented something, or are working on standards related to this, I'd love to hear from you! Thank you! Best, Aaron Levin
Re: map JSON to scala case class & off-heap optimization
Hi Georg, you can try using the circe library for this which has a way to automatically generate JSON decoders for scala case classes. As it was mentioned earlier, Flink does not come packaged with JSON-decoding generators for Scala like spark does. On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler wrote: > Great. Thanks. > But would it be possible to automate this i.e. to have this work > automatically for the case class / product? > > Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala < > taher...@gmail.com>: > >> The performant way would be to apply a map function over the stream and >> then use the Jackson ObjectMapper to convert to scala objects. In flink >> there is no API like Spark to automatically get all fields. >> >> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler >> wrote: >> >>> How can I use it with a scala case class? >>> If I understand it correctly for better performance the Object Mapper is >>> already initialized in each KafkaConsumer and returning ObjectNodes. So >>> probably I should rephrase to: how can I then map these to case classes >>> without handcoding it? https://github.com/json4s/json4s or >>> https://github.com/FasterXML/jackson-module-scala both only seem to >>> consume strings. >>> >>> Best, >>> Georg >>> >>> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala < >>> taher...@gmail.com>: >>> You can try the Jackson ObjectMapper library and that will get you from json to object. Regards, Taher Koitawala On Thu, Jul 9, 2020, 9:54 PM Georg Heiler wrote: > Hi, > > I want to map a stream of JSON documents from Kafka to a scala > case-class. How can this be accomplished using the > JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes > required? > > I have a Spark background. There, such manual mappings usually are > discouraged. Instead, they offer a nice API (dataset API) to perform such > a > type of assignment. 
> 1) this is concise > 2) it operates on sparks off-heap memory representations (tungsten) to > be faster > > In Flink, instead, such off-heap optimizations seem not to be talked > much about (sorry if I miss something, I am a Flink newbie). Is there a > reason why these optimizations are not necessary in Flink? > > > How could I get the following example: > val serializer = new JSONKeyValueDeserializationSchema(false) > val stream = senv.addSource( > new FlinkKafkaConsumer( > "tweets-raw-json", > serializer, > properties > ).setStartFromEarliest() // TODO experiment with different start > values > ) > > to map to this Tweet class concisely, i.e. without manually iterating > through all the attribute fields and parsing the keys from the object node > tree. > > final case class Tweet(tweet_id: Option[String], text: Option[String], > source: Option[String], geo: Option[String], place: Option[String], lang: > Option[String], created_at: Option[String], timestamp_ms: Option[String], > coordinates: Option[String], user_id: Option[Long], user_name: > Option[String], screen_name: Option[String], user_created_at: > Option[String], followers_count: Option[Long], friends_count: > Option[Long], > user_lang: Option[String], user_location: Option[String], hashtags: > Option[Seq[String]]) > > Best, > Georg >
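[Editor's note: a sketch of the mapping Taher describes above — decode each record inside a map function into a typed value. Python's json and dataclasses stand in here for Jackson/circe and the Scala case class, purely to show the shape of the per-record conversion; the fields are a hypothetical subset of Georg's Tweet class.]

```python
import json
from dataclasses import dataclass, fields
from typing import Optional

# A small subset of Georg's Tweet case class; every field is optional,
# mirroring the Option[...] fields in the Scala version.
@dataclass
class Tweet:
    tweet_id: Optional[str] = None
    text: Optional[str] = None
    lang: Optional[str] = None
    user_id: Optional[int] = None

def to_tweet(raw: str) -> Tweet:
    """The body of the map function: decode JSON, keep only known fields."""
    node = json.loads(raw)
    known = {f.name for f in fields(Tweet)}
    return Tweet(**{k: v for k, v in node.items() if k in known})

record = '{"tweet_id": "1", "text": "hi", "lang": "en", "user_id": 42, "extra": true}'
tweet = to_tweet(record)
print(tweet.text, tweet.user_id)  # hi 42
```

In Flink this logic would live in a `MapFunction` applied to the raw string (or `ObjectNode`) stream; circe's automatic derivation removes even the field-filtering boilerplate on the Scala side.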
Re: Does anyone have an example of Bazel working with Flink?
Hi Austin, In our experience, `rules_scala` and `rules_java` are enough for us at this point. It's entirely possible I'm not thinking far enough into the future, though, so don't take our lack of investment as a sign it's not worth investing in :) Best, Aaron Levin On Thu, Jun 18, 2020 at 10:27 AM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Great to hear Dan! > > @Aaron - would you/ your team be interested in a `rules_flink` project? > I'm still fairly new to Bazel and know enough to contribute, but could > definitely use guidance on design as well. > > Best, > Austin > > On Mon, Jun 15, 2020 at 11:07 PM Dan Hill wrote: > >> Thanks for the replies! I was able to use the provided answers to get a >> setup working (maybe not the most efficiently). The main change I made was >> to switch to including the deploy jar in the image (rather than the default >> one). >> >> I'm open to contributing to a "rules_flink" project. I don't know enough >> yet to help design it. >> >> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann >> wrote: >> >>> Hi Dan, >>> >>> if you want to run a Flink job without specifying the main class via >>> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF >>> file to your jar under META-INF and this file needs to contain `Main-Class: >>> org.a.b.Foobar`. >>> >>> Cheers, >>> Till >>> >>> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards < >>> austin.caw...@gmail.com> wrote: >>> >>>> Hey all, >>>> >>>> Adding to Aaron's response, we use Bazel to build our Flink apps. We've >>>> open-sourced some of our setup here[1] though a bit outdated. There are >>>> definitely rough edges/ probably needs a good deal of work to fit other >>>> setups. We have written a wrapper around the `java_library` and >>>> `java_binary` and could do the same for `rules_scala`, though we just >>>> started using Bazel last November and have a lot to learn in terms of best >>>> practices there. 
>>>> >>>> If you're interested in contributing to a `rules_flink` project, I >>>> would be as well! >>>> >>>> Best, >>>> Austin >>>> >>>> [1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020 >>>> >>>> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin >>>> wrote: >>>> >>>>> Hi Dan, >>>>> >>>>> We use Bazel to compile our Flink applications. We're using >>>>> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage >>>>> the dependencies and produce jars. We haven't had any issues. However, I >>>>> have found that sometimes it's difficult to figure out exactly what Flink >>>>> target or dependency my application needs. >>>>> >>>>> Unfortunately I'm not sure what issue you're seeing here. I would >>>>> guess either your flink application wasn't compiled into the jar >>>>> you're executing. If you can paste the bazel target used to generate your >>>>> jar and how you're launching the application, that will be helpful >>>>> for diagnosis. >>>>> >>>>> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill >>>>> wrote: >>>>> >>>>>> I took the Flink playground and I'm trying to swap out Maven for >>>>>> Bazel. I got to the point where I'm hitting the following error. I want >>>>>> to diff my code with an existing, working setup. >>>>>> >>>>>> Thanks! - Dan >>>>>> >>>>>> >>>>>> client_1| >>>>>> org.apache.flink.client.program.ProgramInvocationException: >>>>>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar >>>>>> file. 
>>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:190) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) >>>>>> >>>>>> client_1| at >>>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) >>>>>
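[Editor's note: the ProgramInvocationException above is Flink failing to find a `Main-Class` entry in the jar's manifest, which is what Till's `MANIFEST.MF` fix addresses. As a hedged illustration (a jar is just a zip archive), this Python sketch builds a minimal in-memory jar with and without the entry and reads the attribute back; `org.a.b.Foobar` follows Till's example.]

```python
import io
import zipfile
from typing import Optional

def main_class(jar_bytes: bytes) -> Optional[str]:
    """Return the Main-Class manifest attribute, or None if absent."""
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        try:
            manifest = jar.read("META-INF/MANIFEST.MF").decode("utf-8")
        except KeyError:  # no manifest at all
            return None
        for line in manifest.splitlines():
            if line.startswith("Main-Class:"):
                return line.split(":", 1)[1].strip()
    return None

def make_jar(manifest: str) -> bytes:
    """Build a minimal in-memory 'jar' containing only a manifest."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as jar:
        jar.writestr("META-INF/MANIFEST.MF", manifest)
    return buf.getvalue()

good = make_jar("Manifest-Version: 1.0\nMain-Class: org.a.b.Foobar\n")
bad = make_jar("Manifest-Version: 1.0\n")
print(main_class(good))  # org.a.b.Foobar
print(main_class(bad))   # None
```

A Bazel `java_binary` with `main_class = "org.a.b.Foobar"` produces a deploy jar with this entry set, which is why Dan's switch to the deploy jar resolved the error.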
Re: Does anyone have an example of Bazel working with Flink?
Hi Dan, We use Bazel to compile our Flink applications. We're using "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage the dependencies and produce jars. We haven't had any issues. However, I have found that sometimes it's difficult to figure out exactly what Flink target or dependency my application needs. Unfortunately I'm not sure what issue you're seeing here. I would guess your Flink application wasn't compiled into the jar you're executing. If you can paste the bazel target used to generate your jar and how you're launching the application, that will be helpful for diagnosis. On Thu, Jun 11, 2020 at 5:21 PM Dan Hill wrote: > I took the Flink playground and I'm trying to swap out Maven for Bazel. I > got to the point where I'm hitting the following error. I want to diff my > code with an existing, working setup. > > Thanks! - Dan > > > client_1| > org.apache.flink.client.program.ProgramInvocationException: > Neither a 'Main-Class', nor a 'program-class' entry was found in the jar > file. > > client_1| at > org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596) > > client_1| at > org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:190) > > client_1| at > org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128) > > client_1| at > org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862) > > client_1| at > org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204) > > client_1| at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050) > > client_1| at > org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126) > > client_1| at > org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > > client_1| at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126) >
ListState with millions of elements
Hello friendly Flink community! I'm curious if anyone has operational experience with jobs that store ListState where occasionally, due to skew, some small number of lists stored in ListState (stored in RocksDB) will have millions of elements. Here are the stats:

* millions of keys
* p95 size of list in ListState is ~2
* some small number of keys (less than 100) may have lists whose size is on the order of tens of thousands and up to millions
* state is stored in RocksDB

Are there any known issues or limitations with storing or fetching that much list state out of RocksDB? I realize fetching from RocksDB and deserializing will be costly when hitting a key with a list of a million elements, but is there anything else we should consider? Thanks! Best, Aaron Levin
Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint
Hi Piotr, Thanks for your response! I understand that checkpoints and savepoints may be diverging (for unaligned checkpoints) but parts also seem to be converging per FLIP-47[0]. Specifically, in FLIP-47 they state that rescaling is "Supported but not in all cases" for checkpoints. What I'm hoping to find is guidance or documentation on when rescaling is supported for checkpoints, and, more importantly, if the cases where it's not supported will result in hard or silent failures. The context here is that we rely on the exactly-once semantics for our Flink jobs in some important systems. In some cases when a job is in a bad state it may not be able to take a checkpoint, but changing the job's parallelism may resolve the issue. Therefore it's important for us to know if deploying from a checkpoint, on purpose or by operator error, will break the semantic guarantees of our job. Hard failure in the cases where you cannot change parallelism would be the desired outcome imo. Thank you! [0] https://cwiki.apache.org/confluence/display/FLINK/FLIP-47%3A+Checkpoints+vs.+Savepoints Best, Aaron Levin On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski wrote: > Hi, > > Generally speaking changes of parallelism are supported between checkpoints > and savepoints. Other changes to the job’s topology, like > adding/changing/removing operators, changing types in the job graph are > only officially supported via savepoints. > > But in reality, as for now, there is no difference between checkpoints and > savepoints, but that’s subject to change, so it’s better not to rely on this > behaviour. For example with unaligned checkpoints [1] (hopefully in 1.11), > there will be a difference between those two concepts. 
> > Piotrek > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints > > On 12 Mar 2020, at 12:16, Aaron Levin wrote: > > Hi, > > What's the expected behaviour of: > > * changing an operator's parallelism > * deploying this change from an incremental (RocksDB) checkpoint instead > of a savepoint > > The flink docs[0][1] are a little unclear on what the expected behaviour > is here. I understand that the key-space is being changed because > parallelism is changed. I've seen instances where this happens and a job > does not fail. But how does it treat potentially missing state for a given > key? > > I know I can test this, but I'm curious what the _expected_ behaviour is? > I.e. what behaviour can I rely on, which won't change between versions or > releases? Do we expect the job to fail? Do we expect missing keys to just > be considered empty? > > Thanks! > > [0] > https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html > > Aaron Levin > > >
Expected behaviour when changing operator parallelism but starting from an incremental checkpoint
Hi, What's the expected behaviour of: * changing an operator's parallelism * deploying this change from an incremental (RocksDB) checkpoint instead of a savepoint The flink docs[0][1] are a little unclear on what the expected behaviour is here. I understand that the key-space is being changed because parallelism is changed. I've seen instances where this happens and a job does not fail. But how does it treat potentially missing state for a given key? I know I can test this, but I'm curious what the _expected_ behaviour is? I.e. what behaviour can I rely on, which won't change between versions or releases? Do we expect the job to fail? Do we expect missing keys to just be considered empty? Thanks! [0] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html Aaron Levin
Re: [DISCUSS] Change default for RocksDB timers: Java Heap => in RocksDB
+1. I personally found it a little confusing when I discovered I had to configure this after already choosing RocksDB as a backend. Also very strongly in favour of "safe and scalable" as the default. Best, Aaron Levin On Fri, Jan 17, 2020 at 4:41 AM Piotr Nowojski wrote: > +1 for making it consistent. When using X state backend, timers should be > stored in X by default. > > Also I think any configuration option controlling that needs to be well > documented in some performance tuning section of the docs. > > Piotrek > > On 17 Jan 2020, at 09:16, Congxian Qiu wrote: > > +1 to store timers in RocksDB by default. > > Storing timers in Heap can encounter OOM problems and make the checkpoint > much slower; storing timers in RocksDB gets rid of both. > > Best, > Congxian > > > Biao Liu 于2020年1月17日周五 下午3:10写道: > >> +1 >> >> I think that's how it should be. Timers should align with other regular >> state. >> >> If users want better performance without memory concerns, the memory or FS >> statebackend might be considered. Or maybe we could optimize the >> performance by introducing a specific column family for timers. It could >> have its own tuned options. >> >> Thanks, >> Biao /'bɪ.aʊ/ >> >> >> >> On Fri, 17 Jan 2020 at 10:11, Jingsong Li wrote: >> >>> Hi Stephan, >>> >>> Thanks for starting this discussion. >>> +1 for storing timers in RocksDB by default. >>> In the past, when Flink didn't save timers in RocksDB, I had a >>> headache. I always adjusted parameters carefully to ensure that there was >>> no risk of Out of Memory. >>> >>> Just curious, how much impact does heap vs. RocksDB for timers have on >>> performance? >>> - if there is no order of magnitude difference between heap and RocksDB, >>> there is no problem in using RocksDB. >>> - if there is, maybe we should improve our documentation to let users >>> know about this option. 
(Looks like a lot of users didn't know) >>> >>> Best, >>> Jingsong Lee >>> >>> On Fri, Jan 17, 2020 at 3:18 AM Yun Tang wrote: >>> >>>> Hi Stephan, >>>> >>>> I am +1 for the change which stores timers in RocksDB by default. >>>> >>>> Some users hope the checkpoint could be completed as fast as possible, >>>> which also need the timer stored in RocksDB to not affect the sync part of >>>> checkpoint. >>>> >>>> Best >>>> Yun Tang >>>> -- >>>> *From:* Andrey Zagrebin >>>> *Sent:* Friday, January 17, 2020 0:07 >>>> *To:* Stephan Ewen >>>> *Cc:* dev ; user >>>> *Subject:* Re: [DISCUSS] Change default for RocksDB timers: Java Heap >>>> => in RocksDB >>>> >>>> Hi Stephan, >>>> >>>> Thanks for starting this discussion. I am +1 for this change. >>>> In general, number of timer state keys can have the same order as >>>> number of main state keys. >>>> So if RocksDB is used for main state for scalability, it makes sense to >>>> have timers there as well >>>> unless timers are used for only very limited subset of keys which fits >>>> into memory. >>>> >>>> Best, >>>> Andrey >>>> >>>> On Thu, Jan 16, 2020 at 4:27 PM Stephan Ewen wrote: >>>> >>>> Hi all! >>>> >>>> I would suggest a change of the current default for timers. A bit of >>>> background: >>>> >>>> - Timers (for windows, process functions, etc.) are state that is >>>> managed and checkpointed as well. >>>> - When using the MemoryStateBackend and the FsStateBackend, timers >>>> are kept on the JVM heap, like regular state. >>>> - When using the RocksDBStateBackend, timers can be kept in RocksDB >>>> (like other state) or on the JVM heap. The JVM heap is the default though! >>>> >>>> I find this a bit un-intuitive and would propose to change this to let >>>> the RocksDBStateBackend store all state in RocksDB by default. >>>> The rationale being that if there is a tradeoff (like here), safe and >>>> scalable should be the default and unsafe performance be an explicit >>>> choice. 
>>>> >>>> This sentiment seems to be shared by various users as well, see >>>> https://twitter.com/StephanEwen/status/1214590846168903680 and >>>> https://twitter.com/StephanEwen/status/1214594273565388801 >>>> We would of course keep the switch and mention in the performance >>>> tuning section that this is an option. >>>> >>>> # RocksDB State Backend Timers on Heap >>>> - Pro: faster >>>> - Con: not memory safe, GC overhead, longer synchronous checkpoint >>>> time, no incremental checkpoints >>>> >>>> # RocksDB State Backend Timers in RocksDB >>>> - Pro: safe and scalable, asynchronously and >>>> incrementally checkpointed >>>> - Con: performance overhead. >>>> >>>> Please chime in and let me know what you think. >>>> >>>> Best, >>>> Stephan >>>> >>>> >>> >>> -- >>> Best, Jingsong Lee >>> >> >
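[Editor's note: the switch discussed in this thread is a RocksDB state-backend option; a hedged flink-conf.yaml sketch follows — verify the exact key name and accepted values against the configuration docs for your Flink version.]

```yaml
# flink-conf.yaml — assuming the RocksDB state backend is in use.
state.backend: rocksdb

# Where the RocksDB backend keeps timer state:
#   rocksdb — safe and scalable, asynchronously and incrementally checkpointed
#   heap    — faster, but not memory safe and checkpointed synchronously
state.backend.rocksdb.timer-service.factory: rocksdb
```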
Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?
Thanks for the clarification. I'll try to find some time to write a reproducible test case and submit a ticket. While it may not be able to delete the non-referenced ones, I'm surprised it's exponentially replicating them, and so it's probably worth documenting in a ticket. On Wed, Nov 27, 2019 at 12:15 PM Gyula Fóra wrote: > > You are right Aaron. > > I would say this is like this by design as Flink doesn't require you to > initialize state in the open method so it has no safe way to delete the > non-referenced ones. > > What you can do is restore the state and clear it on all operators and not > reference it again. I know this feels like a workaround but I have no better > idea at the moment. > > Cheers, > Gyula > > On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin wrote: >> >> Hi, >> >> Yes, we're using UNION state. I would assume, though, that if you are >> not reading the UNION state it would either stick around as a >> constant factor in your state size, or get cleared. >> >> Looks like I should try to recreate a small example and submit a bug >> if this is true. Otherwise it's impossible to remove union state from >> your operators. >> >> On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu wrote: >> > >> > Hi >> > >> > Do you use UNION state in your scenario, when using UNION state, then JM >> > may encounter OOM because each TDD will contain all the state of all >> > subtasks[1] >> > >> > [1] >> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state >> > Best, >> > Congxian >> > >> > >> > Aaron Levin 于2019年11月27日周三 上午3:55写道: >> >> >> >> Hi, >> >> >> >> Some context: after a refactoring, we were unable to start our jobs. >> >> They started fine and checkpointed fine, but once the job restarted >> >> owing to a transient failure, the application was unable to start. The >> >> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The >> >> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). 
Inside >> >> the `_metadata` file we saw `- 1402496 offsets: >> >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened >> >> to be the operator state we were no longer initializing or >> >> snapshotting after the refactoring. >> >> >> >> Before I dig further into this and try to find a smaller reproducible >> >> test case I thought I would ask if someone knows what the expected >> >> behaviour is for the following scenario: >> >> >> >> suppose you have an operator (in this case a Source) which has some >> >> operator ListState. Suppose you run your flink job for some time and >> >> then later refactor your job such that you no longer use that state >> >> (so after the refactoring you're no longer initializing this operator >> >> state in initializeState, nor are you snapshotting the operator state >> >> in snapshotState). If you launch your new code from a recent >> >> savepoint, what do we expect to happen to the state? Do we anticipate >> >> the behaviour I explained above? >> >> >> >> My assumption would be that Flink would not read this state and so it >> >> would be removed from the next checkpoint or savepoint. Alternatively, >> >> I might assume it would not be read but would linger around every >> >> future checkpoint or savepoint. However, it feels like what is >> >> happening is it's not read and then possibly replicated by every >> >> instance of the task every time a checkpoint happens (hence the >> >> accidentally exponential behaviour). >> >> >> >> Thoughts? >> >> >> >> PS - in case someone asks: I was sure that we were calling `.clear()` >> >> appropriately in `snapshotState` (we, uh, already learned that lesson >> >> :D) >> >> >> >> Best, >> >> >> >> Aaron Levin
Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?
Hi, Yes, we're using UNION state. I would assume, though, that if you are not reading the UNION state it would either stick around as a constant factor in your state size, or get cleared. Looks like I should try to recreate a small example and submit a bug if this is true. Otherwise it's impossible to remove union state from your operators. On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu wrote: > > Hi > > Do you use UNION state in your scenario, when using UNION state, then JM may > encounter OOM because each TDD will contain all the state of all subtasks[1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state > Best, > Congxian > > > Aaron Levin 于2019年11月27日周三 上午3:55写道: >> >> Hi, >> >> Some context: after a refactoring, we were unable to start our jobs. >> They started fine and checkpointed fine, but once the job restarted >> owing to a transient failure, the application was unable to start. The >> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The >> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside >> the `_metadata` file we saw `- 1402496 offsets: >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened >> to be the operator state we were no longer initializing or >> snapshotting after the refactoring. >> >> Before I dig further into this and try to find a smaller reproducible >> test case I thought I would ask if someone knows what the expected >> behaviour is for the following scenario: >> >> suppose you have an operator (in this case a Source) which has some >> operator ListState. Suppose you run your flink job for some time and >> then later refactor your job such that you no longer use that state >> (so after the refactoring you're no longer initializing this operator >> state in initializeState, nor are you snapshotting the operator state >> in snapshotState). 
If you launch your new code from a recent >> savepoint, what do we expect to happen to the state? Do we anticipate >> the behaviour I explained above? >> >> My assumption would be that Flink would not read this state and so it >> would be removed from the next checkpoint or savepoint. Alternatively, >> I might assume it would not be read but would linger around every >> future checkpoint or savepoint. However, it feels like what is >> happening is it's not read and then possibly replicated by every >> instance of the task every time a checkpoint happens (hence the >> accidentally exponential behaviour). >> >> Thoughts? >> >> PS - in case someone asks: I was sure that we were calling `.clear()` >> appropriately in `snapshotState` (we, uh, already learned that lesson >> :D) >> >> Best, >> >> Aaron Levin
What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?
Hi, Some context: after a refactoring, we were unable to start our jobs. They started fine and checkpointed fine, but once the job restarted owing to a transient failure, the application was unable to start. The Job Manager was OOM'ing (even when I gave them 256GB of ram!). The `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside the `_metadata` file we saw `- 1402496 offsets: com.stripe.flink.backfill.kafka-archive-file-progress`. This happened to be the operator state we were no longer initializing or snapshotting after the refactoring. Before I dig further into this and try to find a smaller reproducible test case I thought I would ask if someone knows what the expected behaviour is for the following scenario: suppose you have an operator (in this case a Source) which has some operator ListState. Suppose you run your flink job for some time and then later refactor your job such that you no longer use that state (so after the refactoring you're no longer initializing this operator state in initializeState, nor are you snapshotting the operator state in snapshotState). If you launch your new code from a recent savepoint, what do we expect to happen to the state? Do we anticipate the behaviour I explained above? My assumption would be that Flink would not read this state and so it would be removed from the next checkpoint or savepoint. Alternatively, I might assume it would not be read but would linger around every future checkpoint or savepoint. However, it feels like what is happening is it's not read and then possibly replicated by every instance of the task every time a checkpoint happens (hence the accidentally exponential behaviour). Thoughts? PS - in case someone asks: I was sure that we were calling `.clear()` appropriately in `snapshotState` (we, uh, already learned that lesson :D) Best, Aaron Levin
Re: Property based testing
Hey, I've used ScalaCheck to test flink applications. Basic idea is:

* use ScalaCheck to generate some kind of collection
* use `fromCollection` in `StreamExecutionEnvironment` to create a `DataStream`
* use `DataStreamUtils.collect` as a sink
* plug my flink logic between the collection source and the collection sink
* make a ScalaCheck property assertion based on the input collection and output collection.

Possible to wrap all that in a single method in Scala. LMK if you have any more questions or any of this was not clear! (note: not sure how to do this in Java). Best, Aaron Levin On Wed, Sep 18, 2019 at 8:36 AM Indraneel R wrote: > Hi All, > > Is there any property based testing framework for flink like > 'SparkTestingBase' for spark? > > Would also be useful to know what are some of the standard testing > practices for data testing for flink pipelines. > > regards > -Indraneel >
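[Editor's note: Aaron's recipe can be sketched end-to-end. In this hedged illustration, Python's random module stands in for ScalaCheck's generators and a plain function stands in for the DataStream pipeline wired between `fromCollection` and `DataStreamUtils.collect`; the "keep and double even values" logic is a made-up stand-in for real job logic.]

```python
import random

def pipeline(events):
    """Stand-in for the Flink logic plugged between the collection source
    and the collection sink: keep even values and double them."""
    return [2 * x for x in events if x % 2 == 0]

def check_pipeline_properties(trials: int = 200) -> None:
    rng = random.Random(0)  # fixed seed, so the "generator" is reproducible
    for _ in range(trials):
        # generate some kind of collection (ScalaCheck's Gen.listOf role)
        events = [rng.randint(-1000, 1000) for _ in range(rng.randint(0, 50))]
        out = pipeline(events)
        # property assertions relating the input and output collections
        assert len(out) == sum(1 for x in events if x % 2 == 0)
        assert all(v % 2 == 0 for v in out)

check_pipeline_properties()
print("all properties held")
```

The Scala version replaces `pipeline` with the real job between `senv.fromCollection(events)` and `DataStreamUtils.collect`, and the loop with a ScalaCheck `forAll` property.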
Re: Update Checkpoint and/or Savepoint Timeout of Running Job without restart?
Thanks for the answer, Congxian! On Sun, Aug 18, 2019 at 10:43 PM Congxian Qiu wrote: > Hi > > Currently, we can't change a running job's checkpoint timeout, but there > is an issue[1] which wants to set a separate timeout for savepoint. > > [1] https://issues.apache.org/jira/browse/FLINK-9465 > Best, > Congxian > > > Aaron Levin 于2019年8月17日周六 上午12:37写道: > >> Hello, >> >> Question: Is it possible to update the checkpoint and/or savepoint >> timeout of a running job without restarting it? If not, is this something >> that would be a welcomed contribution (not sure how easy this would be)? >> >> Context: sometimes we have jobs who are making progress but get into a >> state where checkpoints are timing out, though we believe they would be >> successful if we could increase the checkpoint timeout. Unfortunately we >> currently need to restart the job to change this, and we would like to >> avoid this if possible. Ideally we could make this change temporarily, >> allow a checkpoint or savepoint to succeed, and then change the settings >> back. >> >> Best, >> >> Aaron Levin >> >
Update Checkpoint and/or Savepoint Timeout of Running Job without restart?
Hello, Question: Is it possible to update the checkpoint and/or savepoint timeout of a running job without restarting it? If not, is this something that would be a welcome contribution (not sure how easy this would be)? Context: sometimes we have jobs that are making progress but get into a state where checkpoints are timing out, though we believe they would be successful if we could increase the checkpoint timeout. Unfortunately we currently need to restart the job to change this, and we would like to avoid this if possible. Ideally we could make this change temporarily, allow a checkpoint or savepoint to succeed, and then change the settings back. Best, Aaron Levin
Re: Graceful Task Manager Termination and Replacement
I was on vacation but wanted to thank Biao for summarizing the current state! Thanks! On Mon, Jul 15, 2019 at 2:00 AM Biao Liu wrote: > Hi Aaron, > > From my understanding, you want to shut down a Task Manager without > restarting the job that has tasks running on it? > > Based on the current implementation, if a Task Manager goes down, the > tasks on it are treated as failed. The behavior of task failure is > defined via `FailoverStrategy`, which is `RestartAllStrategy` by default. > That's why the whole job restarts when a Task Manager goes away. > As Paul said, you could try the "region restart failover strategy" when 1.9 is > released. It might be helpful; however, it depends on your job topology. > > The deeper reason for this issue is the consistency semantics of Flink, > AT_LEAST_ONCE or EXACTLY_ONCE. Flink must respect these semantics, so there > is not much choice of `FailoverStrategy`. > It might be improved in the future. There are some discussions on the > mailing list about providing some weaker consistency semantics to improve > the `FailoverStrategy`. We are pushing this improvement forward. I hope it > can be included in 1.10. > > Regarding your question, I guess the answer is no for now. A more frequent > checkpoint or a manually triggered savepoint might help by enabling a quicker > recovery. > > > Paul Lam 于2019年7月12日周五 上午10:25写道: >> >> Hi, >> >> Maybe the region restart strategy can help. It restarts the minimum required >> tasks. Note that it’s recommended to use it only after the 1.9 release, see [1], >> unless you’re running a stateless job. >> >> [1] https://issues.apache.org/jira/browse/FLINK-10712 >> >> Best, >> Paul Lam >> >> 在 2019年7月12日,03:38,Aaron Levin 写道: >> >> Hello, >> >> Is there a way to gracefully terminate a Task Manager beyond just killing >> it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm >> interested in a way to replace a Task Manager that has currently-running >> tasks. 
It would be great if it was possible to terminate a Task Manager >> without restarting the job, though I'm not sure if this is possible. >> >> Context: at my work we regularly cycle our hosts for maintenance and >> security. Each time we do this we stop the task manager running on the host >> being cycled. This causes the entire job to restart, resulting in downtime >> for the job. I'd love to decrease this downtime if at all possible. >> >> Thanks! Any insight is appreciated! >> >> Best, >> >> Aaron Levin >> >> >>
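For readers landing on this thread later: the "region restart failover strategy" discussed above is enabled through a `flink-conf.yaml` setting along these lines (key name per the Flink 1.9 configuration docs; verify against your version):

```yaml
# flink-conf.yaml -- restart only the affected pipelined region, rather than
# the whole job, when a task (or the Task Manager hosting it) fails (Flink 1.9+)
jobmanager.execution.failover-strategy: region
```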
Graceful Task Manager Termination and Replacement
Hello, Is there a way to gracefully terminate a Task Manager beyond just killing it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm interested in a way to replace a Task Manager that has currently-running tasks. It would be great if it was possible to terminate a Task Manager without restarting the job, though I'm not sure if this is possible. Context: at my work we regularly cycle our hosts for maintenance and security. Each time we do this we stop the task manager running on the host being cycled. This causes the entire job to restart, resulting in downtime for the job. I'd love to decrease this downtime if at all possible. Thanks! Any insight is appreciated! Best, Aaron Levin
Re: `env.java.opts` not persisting after job canceled or failed and then restarted
Hi Ufuk, I'll answer your question, but first I'll give you an update on how we resolved the issue: * adding `org.apache.hadoop.io.compress.SnappyCodec` to `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked) * putting a jar with `hadoop-common` + its transitive dependencies, then using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its transitive dependencies). So we end up with a jar that has `SnappyCodec` and whatever it needs to call transitively. We put this jar on the task manager classpath. I believe `SnappyCodec` was being called via our code. This worked the first time, but deploying a second time caused `libhadoop.so` to be loaded in a second class loader. By putting a jar with `SnappyCodec` and its transitive dependencies on the task manager classpath and specifying that `SnappyCodec` needs to be loaded from the parent classloader, we ensure that only one classloader loads `libhadoop.so`. I don't think this is the best way to achieve what we want, but it works for now. Next steps: if no one is on it, I can take a stab at updating the documentation to clarify how to debug and resolve native library loading. This was a nice learning experience and I think it'll be helpful to have this in the docs for those who aren't well-versed in how classloading on the JVM works! To answer your questions: 1. We install hadoop on our machines and tell flink task managers to access it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in `flink-conf.yaml`. 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job manager classpath (I believe this is only used by the Job Managers when they interact with S3 for checkpoints etc. I don't believe any user code is using this). 3. Our flink applications consist of a "fat jar" that has some `org.apache.hadoop` dependencies bundled with it. 
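The first bullet above corresponds to a `flink-conf.yaml` entry along these lines (a sketch based on the fix described in the email; the pattern list is semicolon-separated per Flink's configuration docs):

```yaml
# flink-conf.yaml -- load SnappyCodec from the parent classloader, so that
# libhadoop.so is only ever loaded by one classloader across job restarts
classloader.parent-first-patterns.additional: org.apache.hadoop.io.compress.SnappyCodec
```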
I believe this is the reason we're loading `SnappyCodec` twice and triggering this issue. 4. For example code: we have a small wrapper around `org.apache.flink.api.common.io.FileInputFormat` which does the work with sequence files. It looks like this (after removing some stuff to make it clearer):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
    typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
    with ResultTypeQueryable[T] {

  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true

  // *
  // This is where we'd see exceptions.
  // *
  override def open(fileSplit: FileInputSplit): Unit = {
    super.open(fileSplit)
    val config = new Configuration()
    // `stream` is the input stream opened by FileInputFormat.open
    hadoopStream = WrappedHadoopInputStream.wrap(stream)
    sequenceFileReader = new SequenceFile.Reader(config, SequenceFile.Reader.stream(hadoopStream))
    bufferNextRecord()
  }
  ...
}

// AND

class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
    extends InputStream
    with Seekable
    with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}
...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to debug and resolve this if it wasn't for you filing the ticket. Thank you so much! [0] https://github.com/pantsbuild/jarjar Aaron Levin On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi wrote: > Hey Aaron, > > sorry for the late reply (again). > > (1) I think that your final result is in line with what I have > reproduced in https://issues.apache.org/jira/browse/FLINK-11402. > > (2) I think renaming the file would not help, as it will still be > loaded multiple times when the job restarts (as it happens in > FLINK-11402). > > (3) I'll try to check whether Flink's shading of Hadoop is related to > this. 
I don't think so though. @Chesnay (cc'd): What do you think? > > (4) @Aaron: Can you tell me which Hadoop libraries you use and share > some code so I can try to reproduce this exactly on my side? Judging > from the earlier stack traces you have shared, I'm assuming you are > trying to read Snappy-compressed sequence files. > > – Ufuk > > On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin wrote: > > > > I don't control the code calling `System.loadLibrary("hadoop")`, so > that's not an option for me, unfortunately. > > > > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma wrote: > >> > >> This may be caused by the fact that a JVM process can only load a given .so once. So a tricky workaround is to rename it. > >> > >> Sent from my iPhone > >> > >> On Jan 25, 2019, at 7:12 AM
Re: `env.java.opts` not persisting after job canceled or failed and then restarted
Hi Ufuk, I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error. I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff? Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
    val library = args(0)

    System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
    System.out.println(s"Attempting to load $library")
    System.out.flush()
    System.loadLibrary(library)
    System.out.println(s"Successfully loaded ")
    System.out.flush()
  }
}
```

I then tried running that on one of the task managers with `hadoop` as an argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
        at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
        at java.lang.Runtime.loadLibrary0(Runtime.java:870)
        at java.lang.System.loadLibrary(System.java:1122)
        at com.stripe.flink.LibTest$.main(LibTest.scala:11)
        at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas? 
On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin wrote: > Hi Ufuk, > > One more update: I tried copying all the hadoop native `.so` files (mainly > `libhadoop.so`) into `/lib` and I am still experiencing the issue I > reported. I also tried naively adding the `.so` files to the jar with the > flink application and am still experiencing the issue I reported (however, > I'm going to investigate this further as I might not have done it > correctly). > > Best, > > Aaron Levin > > On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin wrote: > >> Hi Ufuk, >> >> Two updates: >> >> 1. As suggested in the ticket, I naively copied every `.so` in >> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My >> knowledge of how shared libs get picked up is hazy, so I'm not sure if >> blindly copying them like that should work. I did check what >> `System.getProperty("java.library.path")` returns at the call-site and >> it's: >> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib >> 2. The exception I see comes from >> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). >> This uses `System.loadLibrary("hadoop")`. 
>> >> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: >> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z >> [2019-01-23 19:52:33.081376] at >> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method) >> [2019-01-23 19:52:33.081406] at >> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63) >> [2019-01-23 19:52:33.081429] at >> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195) >> [2019-01-23 19:52:33.081457] at >> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181) >> [2019-01-23 19:52:33.081494] at >> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037) >> [2019-01-23 19:52:33.081517] at >> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923) >> [2019-01-23 19:52:33.081549] at >> org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872) >> ... (redacted) ... >> [2019-01-23 19:52:33.081728] at >> scala.collection.immutable.List.foreach(List.scala:392) >> ... (redacted) ... >> [2019-01-23 19:52:33.081832] at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) >> [2019-01-23 19:52:33.081854] at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) >> [2019-01-23 19:52:33.081882] at >> org.apache.flink.s
Re: `env.java.opts` not persisting after job canceled or failed and then restarted
Hi Ufuk, One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue I reported (however, I'm going to investigate this further as I might not have done it correctly). Best, Aaron Levin On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin wrote: > Hi Ufuk, > > Two updates: > > 1. As suggested in the ticket, I naively copied every `.so` in > `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My > knowledge of how shared libs get picked up is hazy, so I'm not sure if > blindly copying them like that should work. I did check what > `System.getProperty("java.library.path")` returns at the call-site and > it's: > java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib > 2. The exception I see comes from > `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). > This uses `System.loadLibrary("hadoop")`. 
> > [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: > org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z > [2019-01-23 19:52:33.081376] at > org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method) > [2019-01-23 19:52:33.081406] at > org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63) > [2019-01-23 19:52:33.081429] at > org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195) > [2019-01-23 19:52:33.081457] at > org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181) > [2019-01-23 19:52:33.081494] at > org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037) > [2019-01-23 19:52:33.081517] at > org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923) > [2019-01-23 19:52:33.081549] at > org.apache.hadoop.io.SequenceFile$Reader.(SequenceFile.java:1872) > ... (redacted) ... > [2019-01-23 19:52:33.081728] at > scala.collection.immutable.List.foreach(List.scala:392) > ... (redacted) ... > [2019-01-23 19:52:33.081832] at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) > [2019-01-23 19:52:33.081854] at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > [2019-01-23 19:52:33.081882] at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) > [2019-01-23 19:52:33.081904] at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > [2019-01-23 19:52:33.081946] at > org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > [2019-01-23 19:52:33.081967] at java.lang.Thread.run(Thread.java:748) > > On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin wrote: > >> Hey Ufuk, >> >> So, I looked into this a little bit: >> >> 1. clarification: my issues are with the hadoop-related snappy libraries >> and not libsnappy itself (this is my bad for not being clearer, sorry!). 
I >> already have `libsnappy` on my classpath, but I am looking into including >> the hadoop snappy libraries. >> 2. exception: I don't see the class loading error. I'm going to try to >> put some more instrumentation and see if I can get a clearer stacktrace >> (right now I get an NPE on closing a sequence file in a finalizer - when I >> last logged the exception it was something deep in hadoop's snappy libs - >> I'll get clarification soon). >> 3. I'm looking into including hadoop's snappy libs in my jar and we'll >> see if that resolves the problem. >> >> Thanks again for your help! >> >> Best, >> >> Aaron Levin >> >> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin >> wrote: >> >>> Hey, >>> >>> Thanks so much for the help! This is awesome. I'll start looking into >>> all of this right away and report back. >>> >>> Best, >>> >>> Aaron Levin >>> >>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi wrote: >>> >>>> Hey Aaron, >>>> >>>> sorry for the late reply. >>>> >>>> (1) I think I was able to reproduce this issue using snappy-java. I've >>>> filed a ticket here: >>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the >>>> ticket description whether it's in line with what you are >>>> experiencing? Most importantly, do you see the same Exception being >>>> reported after cancelling a
Re: `env.java.opts` not persisting after job canceled or failed and then restarted
Hi Ufuk, Two updates: 1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib 2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`. [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z [2019-01-23 19:52:33.081376] at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method) [2019-01-23 19:52:33.081406] at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63) [2019-01-23 19:52:33.081429] at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195) [2019-01-23 19:52:33.081457] at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181) [2019-01-23 19:52:33.081494] at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037) [2019-01-23 19:52:33.081517] at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923) [2019-01-23 19:52:33.081549] at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872) ... (redacted) ... [2019-01-23 19:52:33.081728] at scala.collection.immutable.List.foreach(List.scala:392) ... (redacted) ... 
[2019-01-23 19:52:33.081832] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) [2019-01-23 19:52:33.081854] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) [2019-01-23 19:52:33.081882] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) [2019-01-23 19:52:33.081904] at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) [2019-01-23 19:52:33.081946] at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) [2019-01-23 19:52:33.081967] at java.lang.Thread.run(Thread.java:748) On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin wrote: > Hey Ufuk, > > So, I looked into this a little bit: > > 1. clarification: my issues are with the hadoop-related snappy libraries > and not libsnappy itself (this is my bad for not being clearer, sorry!). I > already have `libsnappy` on my classpath, but I am looking into including > the hadoop snappy libraries. > 2. exception: I don't see the class loading error. I'm going to try to put > some more instrumentation and see if I can get a clearer stacktrace (right > now I get an NPE on closing a sequence file in a finalizer - when I last > logged the exception it was something deep in hadoop's snappy libs - I'll > get clarification soon). > 3. I'm looking into including hadoop's snappy libs in my jar and we'll see > if that resolves the problem. > > Thanks again for your help! > > Best, > > Aaron Levin > > On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin > wrote: > >> Hey, >> >> Thanks so much for the help! This is awesome. I'll start looking into all >> of this right away and report back. >> >> Best, >> >> Aaron Levin >> >> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi wrote: >> >>> Hey Aaron, >>> >>> sorry for the late reply. >>> >>> (1) I think I was able to reproduce this issue using snappy-java. I've >>> filed a ticket here: >>> https://issues.apache.org/jira/browse/FLINK-11402. 
Can you check the >>> ticket description whether it's in line with what you are >>> experiencing? Most importantly, do you see the same Exception being >>> reported after cancelling and re-starting the job? >>> >>> (2) I don't think it's caused by the environment options not being >>> picked up. You can check the head of the log files of the JobManager >>> or TaskManager to verify that your provided option is picked up as >>> expected. You should see something similar to this: >>> >>> 2019-01-21 22:53:49,863 INFO >>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >>> >>> >>> 2019-01-21 22:53:49,864 INFO >>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0, >>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC) >>> ... >>> 2019-01-21 22:53:4
Re: `env.java.opts` not persisting after job canceled or failed and then restarted
Hey Ufuk, So, I looked into this a little bit: 1. clarification: my issues are with the hadoop-related snappy libraries and not libsnappy itself (this is my bad for not being clearer, sorry!). I already have `libsnappy` on my classpath, but I am looking into including the hadoop snappy libraries. 2. exception: I don't see the class loading error. I'm going to try to put some more instrumentation and see if I can get a clearer stacktrace (right now I get an NPE on closing a sequence file in a finalizer - when I last logged the exception it was something deep in hadoop's snappy libs - I'll get clarification soon). 3. I'm looking into including hadoop's snappy libs in my jar and we'll see if that resolves the problem. Thanks again for your help! Best, Aaron Levin On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin wrote: > Hey, > > Thanks so much for the help! This is awesome. I'll start looking into all > of this right away and report back. > > Best, > > Aaron Levin > > On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi wrote: > >> Hey Aaron, >> >> sorry for the late reply. >> >> (1) I think I was able to reproduce this issue using snappy-java. I've >> filed a ticket here: >> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the >> ticket description whether it's in line with what you are >> experiencing? Most importantly, do you see the same Exception being >> reported after cancelling and re-starting the job? >> >> (2) I don't think it's caused by the environment options not being >> picked up. You can check the head of the log files of the JobManager >> or TaskManager to verify that your provided option is picked up as >> expected. 
You should see something similar to this: >> >> 2019-01-21 22:53:49,863 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> >> >> 2019-01-21 22:53:49,864 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0, >> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC) >> ... >> 2019-01-21 22:53:49,865 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM >> Options: >> 2019-01-21 22:53:49,865 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> -Xms1024m >> 2019-01-21 22:53:49,865 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> -Xmx1024m >> You are looking for this line > 2019-01-21 22:53:49,865 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ < >> 2019-01-21 22:53:49,865 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log >> ... >> 2019-01-21 22:53:49,866 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> Program Arguments: >> 2019-01-21 22:53:49,866 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> --configDir >> 2019-01-21 22:53:49,866 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> /.../flink-1.7.0/conf >> 2019-01-21 22:53:49,866 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> --executionMode >> 2019-01-21 22:53:49,866 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> cluster >> ... >> 2019-01-21 22:53:49,866 INFO >> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - >> >> ---- >> >> Can you verify that you see the log messages as expected? >> >> (3) As noted FLINK-11402, is it possible to package the snappy library >> as part of your user code instead of loading the library via >> java.library.path? In my example, that seems to work fine. 
>> >> – Ufuk >> >> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin >> wrote: >> > >> > Hello! >> > >> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a >> job is canceled or fails and then is restarted (with or without >> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` >> seem to start having impact again and our job will run without failure. >> More below. >> > >> > We use consume Snappy-compressed sequence files in our flink job. This >> requires access to the hadoop native libraries. In our `
Re: `env.java.opts` not persisting after job canceled or failed and then restarted
Hey, Thanks so much for the help! This is awesome. I'll start looking into all of this right away and report back. Best, Aaron Levin On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi wrote: > Hey Aaron, > > sorry for the late reply. > > (1) I think I was able to reproduce this issue using snappy-java. I've > filed a ticket here: > https://issues.apache.org/jira/browse/FLINK-11402. Can you check the > ticket description whether it's in line with what you are > experiencing? Most importantly, do you see the same Exception being > reported after cancelling and re-starting the job? > > (2) I don't think it's caused by the environment options not being > picked up. You can check the head of the log files of the JobManager > or TaskManager to verify that your provided option is picked up as > expected. You should see something similar to this: > > 2019-01-21 22:53:49,863 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > > 2019-01-21 22:53:49,864 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0, > Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC) > ... > 2019-01-21 22:53:49,865 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - JVM > Options: > 2019-01-21 22:53:49,865 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > -Xms1024m > 2019-01-21 22:53:49,865 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > -Xmx1024m > You are looking for this line > 2019-01-21 22:53:49,865 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ < > 2019-01-21 22:53:49,865 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log > ... 
> 2019-01-21 22:53:49,866 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > Program Arguments: > 2019-01-21 22:53:49,866 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > --configDir > 2019-01-21 22:53:49,866 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > /.../flink-1.7.0/conf > 2019-01-21 22:53:49,866 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > --executionMode > 2019-01-21 22:53:49,866 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > cluster > ... > 2019-01-21 22:53:49,866 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - > > > > Can you verify that you see the log messages as expected? > > (3) As noted FLINK-11402, is it possible to package the snappy library > as part of your user code instead of loading the library via > java.library.path? In my example, that seems to work fine. > > – Ufuk > > On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin wrote: > > > > Hello! > > > > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a > job is canceled or fails and then is restarted (with or without > savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` > seem to start having impact again and our job will run without failure. > More below. > > > > We use consume Snappy-compressed sequence files in our flink job. This > requires access to the hadoop native libraries. In our `flink-conf.yaml` > for both the task manager and the job manager, we put: > > > > ``` > > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native > > ``` > > > > If I launch our job on freshly-restarted task managers, the job operates > fine. If at some point I cancel the job or if the job restarts for some > other reason, the job will begin to crashloop because it tries to open a > Snappy-compressed file but doesn't have access to the codec from the native > hadoop libraries in `/usr/local/hadoop/lib/native`. 
If I then restart the > task manager while the job is crashlooping, the job starts running > without any codec failures. > > The only reason I can conjure that would cause the Snappy compression to > fail is if the `env.java.opts` were not being passed through to the job on > restart for some reason. > > Does anyone know what's going on? Am I missing some additional > configuration? I really appreciate any help! > > About our setup: > > - Flink Version: 1.7.0 > > - Deployment: Standalone in HA > > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s > shaded jars to access our files in S3. We do not use the > `bundled-with-hadoop` distribution of Flink. > > Best, > > Aaron Levin >
`env.java.opts` not persisting after job canceled or failed and then restarted
Hello! *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below. We consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put: ``` env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native ``` If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job starts running without any codec failures. The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason. Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help! About our setup: - Flink Version: 1.7.0 - Deployment: Standalone in HA - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink. Best, Aaron Levin
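One way to confirm whether a restarted task still sees the native library is to probe `System.loadLibrary` directly from user code (e.g. logging from an operator's `open()`). This is a minimal, hypothetical sketch; the class name `NativeLibCheck` and the probe approach are illustrative, not from the thread:

```java
// Probes whether a native library is loadable in the current JVM, and prints
// the java.library.path the JVM actually resolved at startup.
public class NativeLibCheck {

    // Returns true if System.loadLibrary succeeds for the given library name.
    static boolean tryLoad(String lib) {
        try {
            System.loadLibrary(lib);
            return true;
        } catch (UnsatisfiedLinkError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("java.library.path=" + System.getProperty("java.library.path"));
        String lib = args.length > 0 ? args[0] : "hadoop";
        System.out.println(lib + " loadable: " + tryLoad(lib));
    }
}
```

Logging this on every restart would show whether the `-Djava.library.path` from `env.java.opts` survived the restart, separating a configuration problem from a classloading one.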
Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST
We are also experiencing this! Thanks for speaking up! It's relieving to know we're not alone :) We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which did not seem to have any effect. I tried adding every other related akka, rpc, etc. timeout and still continue to encounter these errors. I believe they may also impact our ability to deploy (as we get a timeout when submitting the job programmatically). I'd love to see a solution to this if one exists! Best, Aaron Levin On Thu, Jan 10, 2019 at 2:58 PM Steven Wu wrote: > We are trying out Flink 1.7.0. We always get this exception when > submitting a job with an external checkpoint via REST. Job parallelism is > 1,600. State size is probably in the range of 1-5 TBs. The job is actually > started; just the REST API returns this failure. > > If we submit the job without an external checkpoint, everything works > fine. > > Anyone else see such a problem with 1.7? Appreciate your help! > > Thanks, > Steven > > org.apache.flink.runtime.rest.handler.RestHandlerException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". 
> at > org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114) > at > java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870) > at > java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at 
java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.CompletionException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". > at > java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) > at > java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) > at > java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) > at > java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) > ... 21 more > Caused by: akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > ... 9 more >
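For reference, the timeout knobs discussed in this thread look like the following in `flink-conf.yaml`. The values are illustrative and this is not a confirmed fix: as noted above, raising `akka.ask.timeout` alone had no effect for us. `web.timeout` (the REST handlers' timeout, in milliseconds) may be the more relevant setting for a failure that only surfaces through the REST API:

```yaml
# Illustrative values only -- not a confirmed fix for the AskTimeoutException above.
akka.ask.timeout: 1 min   # RPC ask timeout (the setting tried above, without effect)
web.timeout: 600000       # REST handler timeout in milliseconds (default 10000)
```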
Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)
Hi Aljoscha, Thanks! I will look into this. Best, Aaron Levin On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek wrote: > Hi, > > I think for this case a model that is similar to how the Streaming File > Source works should be good. You can have a look at > ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The > idea is that the first emits splits that should be processed and the second > is responsible for reading those splits. A generic version of that is what > I'm proposing for the refactoring of our source interface [1] that also > comes with a prototype implementation [2]. > > I think something like this should be adaptable to your case. The split > enumerator would at first only emit file splits downstream; after that, it > would emit Kafka partitions that should be read. The split reader would > understand both file splits and Kafka partitions and can read from both. > This still has some kinks to be worked out when it comes to watermarks; > FLIP-27 is not finished. > > What do you think? > > Best, > Aljoscha > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP- > 27%3A+Refactor+Source+Interface > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface> > [2] https://github.com/aljoscha/flink/commits/refactor-source-interface > > > On 1. Nov 2018, at 16:50, Aaron Levin wrote: > > Hey, > > Thanks for reaching out! I'd love to take a step back and find a better > solution, so I'll try to be succinct in what I'm trying to accomplish: > > We're trying to write a SourceFunction which: > * reads some Sequence files from S3 in a particular order (each task gets > files in a specific order). > * sends a watermark between each sequence file > * when that's complete, starts reading from Kafka topics. 
> * (This is similar to the bootstrap problem which Lyft has talked about > (see: https://www.slideshare.net/FlinkForward/flink-forward- > san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)) > > The current solution I have involves a custom InputFormat, InputSplit, and > SplitAssignor. It achieves most of these requirements, except I have to > extend InputFormatSourceFunction. I have a class that looks like: > > class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: > KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...} > > There's a lot I don't like about the existing solution: > * I have to extend InputFormatSourceFunction to ensure the graph is > initialized properly (the bug I wrote about) > * I had to replicate most of the implementation of > InputFormatSourceFunction so I could insert Watermarks between splits. > > I'd love any suggestions around improving this! > > Best, > > Aaron Levin > > On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek > wrote: > >> Hi Aaron, >> >> I'd like to take a step back and understand why you're trying to wrap an >> InputFormatSourceFunction. >> >> In my opinion, InputFormatSourceFunction should not be used because it >> has some shortcomings, the most prominent among them being that it does not >> support checkpointing, i.e. in case of failure all data will (probably) be >> read again. I'm saying probably because the interaction of >> InputFormatSourceFunction with how InputSplits are generated (which relates >> to that code snippet with the cast you found) could be somewhat "spooky" >> and lead to weird results in some cases. >> >> The interface is a remnant of a very early version of the streaming API >> and should probably be removed soon. I hope we can find a better solution >> for your problem that fits better with Flink. >> >> Best, >> Aljoscha >> >> On 1. Nov 2018, at 15:30, Aaron Levin wrote: >> >> Hey Friends! Last ping and I'll move this over to a ticket. 
If anyone can >> provide any insight or advice, that would be helpful! >> >> Thanks again. >> >> Best, >> >> Aaron Levin >> >> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin >> wrote: >> >>> Hey, >>> >>> Not sure how convo threading works on this list, so in case the folks >>> CC'd missed my other response, here's some more info: >>> >>> First, I appreciate everyone's help! Thank you! >>> >>> I wrote several wrappers to try and debug this, including one which is >>> an exact copy of `InputFormatSourceFunction` which also failed. They all >>> failed with the same error I detail above. I'll post two of them below. >>> They all extended `RichParallelSourceFunction` and, as far
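To make Aljoscha's suggestion concrete, here is a minimal sketch of the split ordering he describes: the enumerator hands out every file split before any Kafka partition, so each reader finishes bootstrapping from S3 before it switches to streaming. All names here (`BootstrapSplit`, `SplitEnumerator`, and so on) are invented for illustration; this is not Flink's FLIP-27 API.

```scala
// Hypothetical model of the enumerator ordering Aljoscha describes; not Flink API.
sealed trait BootstrapSplit
final case class FileSplit(path: String) extends BootstrapSplit
final case class KafkaPartitionSplit(topic: String, partition: Int) extends BootstrapSplit

final class SplitEnumerator(
    files: Seq[FileSplit],
    partitions: Seq[KafkaPartitionSplit]
) {
  // File (bootstrap) splits are queued strictly ahead of Kafka (streaming) splits.
  private val queue =
    scala.collection.mutable.Queue[BootstrapSplit]((files ++ partitions): _*)

  // Called by a reader requesting work; None means this reader is done.
  def nextSplit(): Option[BootstrapSplit] =
    if (queue.isEmpty) None else Some(queue.dequeue())
}
```

A real implementation would also have to checkpoint the remaining splits and handle the watermark hand-off between the file phase and the Kafka phase, which is exactly the part Aljoscha flags as unfinished in FLIP-27.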
Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)
Hey, Thanks for reaching out! I'd love to take a step back and find a better solution, so I'll try to be succinct in what I'm trying to accomplish: We're trying to write a SourceFunction which: * reads some Sequence files from S3 in a particular order (each task gets files in a specific order). * sends a watermark between each sequence file * when that's complete, starts reading from Kafka topics. * (This is similar to the bootstrap problem which Lyft has talked about (see: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink )) The current solution I have involves a custom InputFormat, InputSplit, and SplitAssignor. It achieves most of these requirements, except I have to extend InputFormatSourceFunction. I have a class that looks like: class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...} There's a lot I don't like about the existing solution: * I have to extend InputFormatSourceFunction to ensure the graph is initialized properly (the bug I wrote about) * I had to replicate most of the implementation of InputFormatSourceFunction so I could insert Watermarks between splits. I'd love any suggestions around improving this! Best, Aaron Levin On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek wrote: > Hi Aaron, > > I'd like to take a step back and understand why you're trying to wrap an > InputFormatSourceFunction. > > In my opinion, InputFormatSourceFunction should not be used because it has > some shortcomings, the most prominent among them being that it does not support > checkpointing, i.e. in case of failure all data will (probably) be read > again. I'm saying probably because the interaction of > InputFormatSourceFunction with how InputSplits are generated (which relates > to that code snippet with the cast you found) could be somewhat "spooky" > and lead to weird results in some cases. 
> > The interface is a remnant of a very early version of the streaming API > and should probably be removed soon. I hope we can find a better solution > for your problem that fits better with Flink. > > Best, > Aljoscha > > On 1. Nov 2018, at 15:30, Aaron Levin wrote: > > Hey Friends! Last ping and I'll move this over to a ticket. If anyone can > provide any insight or advice, that would be helpful! > > Thanks again. > > Best, > > Aaron Levin > > On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin > wrote: > >> Hey, >> >> Not sure how convo threading works on this list, so in case the folks >> CC'd missed my other response, here's some more info: >> >> First, I appreciate everyone's help! Thank you! >> >> I wrote several wrappers to try and debug this, including one which is an >> exact copy of `InputFormatSourceFunction` which also failed. They all >> failed with the same error I detail above. I'll post two of them below. >> They all extended `RichParallelSourceFunction` and, as far as I could tell, >> were properly initialized (though I may have missed something!). >> Additionally, for the two below, if I change `extends >> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, >> I no longer receive the exception. This is what led me to believe the >> source of the issue was casting and how I found the line of code where the >> stream graph is given the input format. >> >> Quick explanation of the wrappers: >> 1. `WrappedInputFormat` does a basic wrap around >> `InputFormatSourceFunction` and delegates all methods to the underlying >> `InputFormatSourceFunction` >> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the >> `InputFormatSourceFunction` source. >> 3. 
They're being used in a test which looks vaguely like: >> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new >> InputFormatSourceFunction[String](source, implicitly[TypeInformation[Str >> ing]]))).javaStream).asScala.toSeq` >> >> class WrappedInputFormat[A]( >> inputFormat: InputFormatSourceFunction[A] >> )( >> implicit typeInfo: TypeInformation[A] >> ) extends RichParallelSourceFunction[A] { >> >> override def run(sourceContext: SourceFunction.SourceContext[A]): Unit >> = { >> inputFormat.run(sourceContext) >> } >> override def setRuntimeContext(t: RuntimeContext): Unit = { >> inputFormat.setRuntimeContext(t) >> } >> override def equals(obj: scala.Any) = { >> inputFormat.equals(obj) >> } >> override def hashCode() = { inputFormat.hashCode() } >> override def toString = { inputFormat.toString } >> override def get
Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)
Hey Friends! Last ping and I'll move this over to a ticket. If anyone can provide any insight or advice, that would be helpful! Thanks again. Best, Aaron Levin On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin wrote: > Hey, > > Not sure how convo threading works on this list, so in case the folks CC'd > missed my other response, here's some more info: > > First, I appreciate everyone's help! Thank you! > > I wrote several wrappers to try and debug this, including one which is an > exact copy of `InputFormatSourceFunction` which also failed. They all > failed with the same error I detail above. I'll post two of them below. > They all extended `RichParallelSourceFunction` and, as far as I could tell, > were properly initialized (though I may have missed something!). > Additionally, for the two below, if I change `extends > RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, > I no longer receive the exception. This is what led me to believe the > source of the issue was casting and how I found the line of code where the > stream graph is given the input format. > > Quick explanation of the wrappers: > 1. `WrappedInputFormat` does a basic wrap around > `InputFormatSourceFunction` and delegates all methods to the underlying > `InputFormatSourceFunction` > 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the > `InputFormatSourceFunction` source. > 3. 
They're being used in a test which looks vaguely like: > `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new > InputFormatSourceFunction[String](source, implicitly[TypeInformation[Str > ing]]))).javaStream).asScala.toSeq` > > class WrappedInputFormat[A]( > inputFormat: InputFormatSourceFunction[A] > )( > implicit typeInfo: TypeInformation[A] > ) extends RichParallelSourceFunction[A] { > > override def run(sourceContext: SourceFunction.SourceContext[A]): Unit > = { > inputFormat.run(sourceContext) > } > override def setRuntimeContext(t: RuntimeContext): Unit = { > inputFormat.setRuntimeContext(t) > } > override def equals(obj: scala.Any) = { > inputFormat.equals(obj) > } > override def hashCode() = { inputFormat.hashCode() } > override def toString = { inputFormat.toString } > override def getRuntimeContext(): RuntimeContext = { > inputFormat.getRuntimeContext } > override def getIterationRuntimeContext = { > inputFormat.getIterationRuntimeContext } > override def open(parameters: Configuration): Unit = { > inputFormat.open(parameters) > } > override def cancel(): Unit = { > inputFormat.cancel() > } > override def close(): Unit = { > inputFormat.close() > } > } > > And the other one: > > class ClonedInputFormatSourceFunction[A](val format: InputFormat[A, > InputSplit], val typeInfo: TypeInformation[A]) extends > RichParallelSourceFunction[A] { > > @transient private var provider: InputSplitProvider = _ > @transient private var serializer: TypeSerializer[A] = _ > @transient private var splitIterator: Iterator[InputSplit] = _ > private var isRunning: Boolean = _ > > override def open(parameters: Configuration): Unit = { > val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext] > if(format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context) > } > format.configure(parameters) > > provider = context.getInputSplitProvider > serializer = typeInfo.createSerializer(getR > 
untimeContext.getExecutionConfig) > splitIterator = getInputSplits() > isRunning = splitIterator.hasNext > } > > override def run(sourceContext: SourceFunction.SourceContext[A]): Unit > = { > if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].openInputFormat() > } > > var nextElement: A = serializer.createInstance() > try { > while (isRunning) { > format.open(splitIterator.next()) > while (isRunning && !format.reachedEnd()) { > nextElement = format.nextRecord(nextElement) > if (nextElement != null) { > sourceContext.collect(nextElement) > } else { > break > } > format.close() > if (isRunning) { > isRunning = splitIterator.hasNext > } > } > } > } finally { > > format.close() > if (format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat() > } > isRunning = false > } > } > > override def
Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)
} if(split != null) { nextSplit = split true } else { exhausted = true false } } override def next(): InputSplit = { if(nextSplit == null && !hasNext) { throw new NoSuchElementException() } val tmp: InputSplit = nextSplit nextSplit = null tmp } } } } On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz wrote: > Hi Aaron, > > Could you share the code of your custom function? > > I am also adding Aljoscha and Kostas to cc, who should be more helpful on > that topic. > > Best, > > Dawid > On 19/10/2018 20:06, Aaron Levin wrote: > > Hi, > > I'm writing a custom `SourceFunction` which wraps an underlying > `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a > stream (via `env.addSource` and a subsequent sink) I get errors related to > the `InputSplitAssigner` not being initialized for a particular vertex ID. > Full error here[1]. > > I believe the underlying error is related to this[0] call to `instanceof > InputFormatSourceFunction`. > > *My questions*: > > 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids this error? > Am I missing a chunk of the API covering this? > 2. is the error I'm experiencing related to that casting call? If so, would > y'all be open to a PR which adds an interface one can extend which will set > the input format in the stream graph? Or is there a preferred way of > achieving this? > > Thanks! > > Aaron Levin > > [0] https://github.com/apache/flink/blob/release-1.6/flink- > streaming-java/src/main/java/org/apache/flink/streaming/api/graph/ > StreamGraphGenerator.java#L480 > [1] > java.lang.RuntimeException: Could not retrieve next input split. > at org.apache.flink.streaming.api.functions.source. > InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157) > at org.apache.flink.streaming.api.functions.source. > InputFormatSourceFunction.open(InputFormatSourceFunction.java:71) > at REDACTED > at org.apache.flink.api.common.functions.util.FunctionUtils. 
> openFunction(FunctionUtils.java:36) > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator. > open(AbstractUdfStreamOperator.java:102) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > openAllOperators(StreamTask.java:424) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: > Requesting the next input split failed. > at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider. > getNextInputSplit(RpcInputSplitProvider.java:69) > at org.apache.flink.streaming.api.functions.source. > InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155) > ... 8 more > Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: > No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2 > at java.util.concurrent.CompletableFuture.reportGet( > CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get( > CompletableFuture.java:1915) > at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider. > getNextInputSplit(RpcInputSplitProvider.java:61) > ... 9 more > Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID > cbc357ccb763df2852fee8c4fc7d55f2 > at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit( > JobMaster.java:575) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( > AkkaRpcActor.java:247) > ... > >
Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)
austed = true false } } override def next(): InputSplit = { if(nextSplit == null && !hasNext) { throw new NoSuchElementException() } val tmp: InputSplit = nextSplit nextSplit = null tmp } } } } Best, Aaron Levin On Wed, Oct 24, 2018 at 8:00 AM, Kien Truong wrote: > Hi, > > Since InputFormatSourceFunction is a subclass of > RichParallelSourceFunction, your wrapper should also extend this class. > > In addition, remember to override the methods defined in the > AbstractRichFunction interface and > > proxy the call to the underlying InputFormatSourceFunction, in order to > initialize the underlying source correctly. > > > Best regards, > > Kien > > > On 10/20/2018 1:06 AM, Aaron Levin wrote: > > Hi, > > I'm writing a custom `SourceFunction` which wraps an underlying > `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a > stream (via `env.addSource` and a subsequent sink) I get errors related to > the `InputSplitAssigner` not being initialized for a particular vertex ID. > Full error here[1]. > > I believe the underlying error is related to this[0] call to `instanceof > InputFormatSourceFunction`. > > *My questions*: > > 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids this error? > Am I missing a chunk of the API covering this? > 2. is the error I'm experiencing related to that casting call? If so, would > y'all be open to a PR which adds an interface one can extend which will set > the input format in the stream graph? Or is there a preferred way of > achieving this? > > Thanks! > > Aaron Levin > > [0] https://github.com/apache/flink/blob/release-1.6/flink- > streaming-java/src/main/java/org/apache/flink/streaming/api/graph/ > StreamGraphGenerator.java#L480 > [1] > java.lang.RuntimeException: Could not retrieve next input split. > at org.apache.flink.streaming.api.functions.source. > InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157) > at org.apache.flink.streaming.api.functions.source. 
> InputFormatSourceFunction.open(InputFormatSourceFunction.java:71) > at REDACTED > at org.apache.flink.api.common.functions.util.FunctionUtils. > openFunction(FunctionUtils.java:36) > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator. > open(AbstractUdfStreamOperator.java:102) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > openAllOperators(StreamTask.java:424) > at org.apache.flink.streaming.runtime.tasks.StreamTask. > invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:748) > Caused by: > org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: > Requesting the next input split failed. > at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider. > getNextInputSplit(RpcInputSplitProvider.java:69) > at org.apache.flink.streaming.api.functions.source. > InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155) > ... 8 more > Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: > No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2 > at java.util.concurrent.CompletableFuture.reportGet( > CompletableFuture.java:357) > at java.util.concurrent.CompletableFuture.get( > CompletableFuture.java:1915) > at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider. > getNextInputSplit(RpcInputSplitProvider.java:61) > ... 
9 more > Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID > cbc357ccb763df2852fee8c4fc7d55f2 > at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit( > JobMaster.java:575) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation( > AkkaRpcActor.java:247) > ... > >
Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)
Hi, I'm writing a custom `SourceFunction` which wraps an underlying `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a stream (via `env.addSource` and a subsequent sink) I get errors related to the `InputSplitAssigner` not being initialized for a particular vertex ID. Full error here[1]. I believe the underlying error is related to this[0] call to `instanceof InputFormatSourceFunction`. *My questions*: 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids this error? Am I missing a chunk of the API covering this? 2. is the error I'm experiencing related to that casting call? If so, would y'all be open to a PR which adds an interface one can extend which will set the input format in the stream graph? Or is there a preferred way of achieving this? Thanks! Aaron Levin [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480 [1] java.lang.RuntimeException: Could not retrieve next input split. at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71) at REDACTED at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed. 
at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69) at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155) ... 8 more Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61) ... 9 more Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2 at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) ...
Re: Scala 2.12 Support
Hi Piotr, Thanks for the update. Glad to hear it's high on the priority list! I'm looking forward to the 1.7 update! It may be worth having someone more official from the Flink team give an update on that ticket. It wasn't clear if the 1.7 comment from that user was just a reference to the fact that 1.6 had come out (or where they got that information). I know a few people have cited the ticket and concluded "not clear what's going on with Scala 2.12 support." If you have the bandwidth, a note from you or anyone else would be helpful! Thanks again! Best, Aaron Levin On Thu, Aug 16, 2018 at 6:04 AM, Piotr Nowojski wrote: > Hi, > > Scala 2.12 support is high on our priority list and we hope to have it > included for the 1.7 release (as you can see in the ticket itself), which > should happen later this year. > > Piotrek > > > On 15 Aug 2018, at 17:59, Aaron Levin wrote: > > Hello! > > I'm wondering if there is anywhere I can see Flink's roadmap for Scala > 2.12 support. The last email I can find on the list for this was back in > January, and FLINK-7811[0], the ticket asking for Scala 2.12 support, > hasn't been updated in a few months. > > Recently Spark fixed the ClosureCleaner code to support Scala 2.12[1], and > from what I can gather this was one of the main barriers to Flink > supporting Scala 2.12. Given this has been fixed, is there work in progress > to support Scala 2.12? Any updates on FLINK-7811? > > Thanks for your help! > > [0] https://issues.apache.org/jira/browse/FLINK-7811 > [1] https://issues.apache.org/jira/browse/SPARK-14540 > > Best, > > Aaron Levin > > >
Scala 2.12 Support
Hello! I'm wondering if there is anywhere I can see Flink's roadmap for Scala 2.12 support. The last email I can find on the list for this was back in January, and FLINK-7811[0], the ticket asking for Scala 2.12 support, hasn't been updated in a few months. Recently Spark fixed the ClosureCleaner code to support Scala 2.12[1], and from what I can gather this was one of the main barriers to Flink supporting Scala 2.12. Given this has been fixed, is there work in progress to support Scala 2.12? Any updates on FLINK-7811? Thanks for your help! [0] https://issues.apache.org/jira/browse/FLINK-7811 [1] https://issues.apache.org/jira/browse/SPARK-14540 Best, Aaron Levin