Tracing and Flink

2020-08-14 Thread Aaron Levin
Hello Flink Friends!

This is a long-shot, but I'm wondering if anyone is thinking or working on
applying tracing to Streaming systems and in particular Flink. As far as I
understand this is a fairly open problem and so I'm curious how folks are
thinking about it and if anyone has considered how they might apply tracing
to Flink systems.

Some patterns in Streaming systems fit into tracing fairly easily (consumer
fan-out, for example), but many patterns do not. For example, how do you
trace when there is batching or aggregations? Nevertheless, I'm sure some
folks have thought about this or even tried to implement solutions, and so
I'd love to hear about it, especially if there is any standards work in
this direction (for example, within the OpenTracing project).

If you've thought about this, implemented something, or are working on
standards related to this, I'd love to hear from you! Thank you!

Best,

Aaron Levin


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Aaron Levin
Hi Georg, you can try the circe library for this, which can automatically
derive JSON decoders for Scala case classes.

As was mentioned earlier, Flink does not come packaged with JSON-decoding
derivation for Scala case classes the way Spark does.
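
A rough sketch of what that can look like (a minimal example assuming circe's
generic derivation and a trimmed-down version of the Tweet case class; the
names here are illustrative):

```
import io.circe.generic.auto._ // automatic Decoder derivation for case classes
import io.circe.parser.decode

// Trimmed-down stand-in for the full Tweet case class from the original question.
final case class Tweet(tweet_id: Option[String], text: Option[String])

// Apply this in a map/flatMap right after a String-producing Kafka source.
// decode returns an Either, so failures can be dropped or routed to a side
// output instead of throwing.
def parseTweet(json: String): Either[io.circe.Error, Tweet] =
  decode[Tweet](json)
```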

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler 
wrote:

> Great. Thanks.
> But would it be possible to automate this i.e. to have this work
> automatically for the case class / product?
>
> On Thu, Jul 9, 2020 at 8:21 PM Taher Koitawala <
> taher...@gmail.com>:
>
>> The performant way would be to apply a map function over the stream and
>> then use the Jackson ObjectMapper to convert to Scala objects. In Flink
>> there is no API like Spark's to automatically get all fields.
>>
>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
>> wrote:
>>
>>> How can I use it with a scala case class?
>>> If I understand it correctly, for better performance the ObjectMapper is
>>> already initialized in each KafkaConsumer and returns ObjectNodes. So
>>> I should probably rephrase: how can I then map these to case classes
>>> without handcoding it?  https://github.com/json4s/json4s or
>>> https://github.com/FasterXML/jackson-module-scala both only seem to
>>> consume strings.
>>>
>>> Best,
>>> Georg
>>>
>>> On Thu, Jul 9, 2020 at 7:17 PM Taher Koitawala <
>>> taher...@gmail.com>:
>>>
 You can try the Jackson ObjectMapper library and that will get you from
 JSON to objects.

 Regards,
 Taher Koitawala

 On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
 wrote:

> Hi,
>
> I want to map a stream of JSON documents from Kafka to a scala
> case-class. How can this be accomplished using the
> JSONKeyValueDeserializationSchema? Is a manual mapping of object nodes
> required?
>
> I have a Spark background. There, such manual mappings usually are
> discouraged. Instead, Spark offers a nice API (the Dataset API) to perform
> this kind of assignment:
> 1) it is concise
> 2) it operates on Spark's off-heap memory representation (Tungsten) to
> be faster
>
> In Flink, by contrast, such off-heap optimizations do not seem to be talked
> about much (sorry if I am missing something, I am a Flink newbie). Is there a
> reason why these optimizations are not necessary in Flink?
>
>
> How could I get the following example:
> val serializer = new JSONKeyValueDeserializationSchema(false)
> val stream = senv.addSource(
>   new FlinkKafkaConsumer(
>     "tweets-raw-json",
>     serializer,
>     properties
>   ).setStartFromEarliest() // TODO experiment with different start values
> )
>
> to map to this Tweet class concisely, i.e. without manually iterating
> through all the attribute fields and parsing the keys from the object node
> tree.
>
> final case class Tweet(tweet_id: Option[String], text: Option[String],
> source: Option[String], geo: Option[String], place: Option[String], lang:
> Option[String], created_at: Option[String], timestamp_ms: Option[String],
> coordinates: Option[String], user_id: Option[Long], user_name:
> Option[String], screen_name: Option[String], user_created_at:
> Option[String], followers_count: Option[Long], friends_count: 
> Option[Long],
> user_lang: Option[String], user_location: Option[String], hashtags:
> Option[Seq[String]])
>
> Best,
> Georg
>



Re: Does anyone have an example of Bazel working with Flink?

2020-06-18 Thread Aaron Levin
Hi Austin,

In our experience, `rules_scala` and `rules_java` are enough for us at this
point.

It's entirely possible I'm not thinking far enough into the future, though,
so don't take our lack of investment as a sign it's not worth investing in
:)

Best,

Aaron Levin

On Thu, Jun 18, 2020 at 10:27 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Great to hear Dan!
>
> @Aaron - would you/ your team be interested in a `rules_flink` project?
> I'm still fairly new to Bazel and know enough to contribute, but could
> definitely use guidance on design as well.
>
> Best,
> Austin
>
> On Mon, Jun 15, 2020 at 11:07 PM Dan Hill  wrote:
>
>> Thanks for the replies!  I was able to use the provided answers to get a
>> setup working (maybe not in the most efficient way).  The main change I made was
>> to switch to including the deploy jar in the image (rather than the default
>> one).
>>
>> I'm open to contributing to a "rules_flink" project.  I don't know enough
>> yet to help design it.
>>
>> On Sat, Jun 13, 2020 at 4:39 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> if you want to run a Flink job without specifying the main class via
>>> `bin/flink run --class org.a.b.Foobar` then you have to add a MANIFEST.MF
>>> file to your jar under META-INF and this file needs to contain `Main-Class:
>>> org.a.b.Foobar`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Jun 12, 2020 at 12:30 AM Austin Cawley-Edwards <
>>> austin.caw...@gmail.com> wrote:
>>>
>>>> Hey all,
>>>>
>>>> Adding to Aaron's response, we use Bazel to build our Flink apps. We've
>>>> open-sourced some of our setup here[1], though it's a bit outdated. There are
>>>> definitely rough edges / it probably needs a good deal of work to fit other
>>>> setups. We have written a wrapper around the `java_library` and
>>>> `java_binary` and could do the same for `rules_scala`, though we just
>>>> started using Bazel last November and have a lot to learn in terms of best
>>>> practices there.
>>>>
>>>> If you're interested in contributing to a `rules_flink` project, I
>>>> would be as well!
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> [1]: https://github.com/fintechstudios/vp-operator-demo-ff-virtual-2020
>>>>
>>>> On Thu, Jun 11, 2020 at 6:14 PM Aaron Levin 
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>>
>>>>> We use Bazel to compile our Flink applications. We're using
>>>>> "rules_scala" (https://github.com/bazelbuild/rules_scala) to manage
>>>>> the dependencies and produce jars. We haven't had any issues. However, I
>>>>> have found that sometimes it's difficult to figure out exactly what Flink
>>>>> target or dependency my application needs.
>>>>>
>>>>> Unfortunately I'm not sure what issue you're seeing here. My guess is
>>>>> that your Flink application wasn't compiled into the jar you're
>>>>> executing. If you can paste the Bazel target used to generate your
>>>>> jar and how you're launching the application, that will be helpful
>>>>> for diagnosis.
>>>>>
>>>>> On Thu, Jun 11, 2020 at 5:21 PM Dan Hill 
>>>>> wrote:
>>>>>
>>>>>> I took the Flink playground and I'm trying to swap out Maven for
>>>>>> Bazel.  I got to the point where I'm hitting the following error.  I want
>>>>>> to diff my code with an existing, working setup.
>>>>>>
>>>>>> Thanks! - Dan
>>>>>>
>>>>>>
>>>>>> client_1| 
>>>>>> org.apache.flink.client.program.ProgramInvocationException:
>>>>>> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
>>>>>> file.
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:190)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>>>>>>
>>>>>> client_1| at
>>>>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>>>>>>
>>>>>


Re: Does anyone have an example of Bazel working with Flink?

2020-06-11 Thread Aaron Levin
Hi Dan,

We use Bazel to compile our Flink applications. We're using "rules_scala" (
https://github.com/bazelbuild/rules_scala) to manage the dependencies and
produce jars. We haven't had any issues. However, I have found that
sometimes it's difficult to figure out exactly what Flink target or
dependency my application needs.

Unfortunately I'm not sure what issue you're seeing here. My guess is that
your Flink application wasn't compiled into the jar you're executing. If you
can paste the Bazel target used to generate your jar and how you're launching
the application, that will be helpful for diagnosis.

On Thu, Jun 11, 2020 at 5:21 PM Dan Hill  wrote:

> I took the Flink playground and I'm trying to swap out Maven for Bazel.  I
> got to the point where I'm hitting the following error.  I want to diff my
> code with an existing, working setup.
>
> Thanks! - Dan
>
>
> client_1| 
> org.apache.flink.client.program.ProgramInvocationException:
> Neither a 'Main-Class', nor a 'program-class' entry was found in the jar
> file.
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:596)
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:190)
>
> client_1| at
> org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:128)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:862)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:204)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>
> client_1| at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> client_1| at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
>


ListState with millions of elements

2020-04-08 Thread Aaron Levin
Hello friendly Flink community!

I'm curious if anyone has operational experience with jobs that store
ListState where occasionally, due to skew, a small number of the lists
(stored in RocksDB) will have millions of elements.
Here are the stats:

* millions of keys
* p95 size of list in ListState is ~2.
* some small number of keys (less than 100) may have lists whose size is on
the order of tens of thousands and up to millions.
* state is stored in RocksDB

Are there any known issues or limitations with storing or fetching that
much list state out of RocksDB? I realize fetching from RocksDB and
deserializing will be costly when hitting a key with a list of a million
elements, but is there anything else we should consider?
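
For concreteness, the access pattern I'm describing is just plain keyed
ListState, roughly like the sketch below (illustrative names, not our actual
job):

```
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Keyed by some id; a handful of skewed keys accumulate very long lists.
class AccumulatingFunction extends RichFlatMapFunction[(String, String), (String, Long)] {
  @transient private var elements: ListState[String] = _

  override def open(parameters: Configuration): Unit = {
    elements = getRuntimeContext.getListState(
      new ListStateDescriptor[String]("elements", classOf[String]))
  }

  override def flatMap(in: (String, String), out: Collector[(String, Long)]): Unit = {
    // add() appends a single element without reading the list back.
    elements.add(in._2)
    // get() deserializes the entire list for the current key; this is the
    // call I expect to be expensive for the keys with millions of elements.
    val size = Option(elements.get()).map(_.asScala.size).getOrElse(0)
    out.collect((in._1, size.toLong))
  }
}
```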

Thanks!

Best,

Aaron Levin


Re: Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-13 Thread Aaron Levin
Hi Piotr,

Thanks for your response! I understand that checkpoints and savepoints may
be diverging (for unaligned checkpoints) but parts also seem to be
converging per FLIP-47[0]. Specifically, in FLIP-47 they state that
rescaling is "Supported but not in all cases" for checkpoints. What I'm
hoping to find is guidance or documentation on when rescaling is supported
for checkpoints, and, more importantly, if the cases where it's not
supported will result in hard or silent failures.

The context here is that we rely on the exactly-once semantics for our
Flink jobs in some important systems. In some cases when a job is in a bad
state it may not be able to take a checkpoint, but changing the job's
parallelism may resolve the issue. Therefore it's important for us to know
if deploying from a checkpoint, on purpose or by operator error, will break
the semantic guarantees of our job.

Hard failure in the cases where you cannot change parallelism would be the
desired outcome imo.

Thank you!

[0]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-47%3A+Checkpoints+vs.+Savepoints

Best,

Aaron Levin

On Fri, Mar 13, 2020 at 9:08 AM Piotr Nowojski  wrote:

> Hi,
>
> Generally speaking, changes of parallelism are supported for both checkpoints
> and savepoints. Other changes to the job’s topology, like
> adding/changing/removing operators, changing types in the job graph are
> only officially supported via savepoints.
>
> But in reality, as of now, there is no difference between checkpoints and
> savepoints, but that’s subject to change, so it’s better not to rely on this
> behaviour. For example, with unaligned checkpoints [1] (hopefully in 1.11),
> there will be a difference between those two concepts.
>
> Piotrek
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>
> On 12 Mar 2020, at 12:16, Aaron Levin  wrote:
>
> Hi,
>
> What's the expected behaviour of:
>
> * changing an operator's parallelism
> * deploying this change from an incremental (RocksDB) checkpoint instead
> of a savepoint
>
> The flink docs[0][1] are a little unclear on what the expected behaviour
> is here. I understand that the key-space is being changed because
> parallelism is changed. I've seen instances where this happens and a job
> does not fail. But how does it treat potentially missing state for a given
> key?
>
> I know I can test this, but I'm curious what the _expected_ behaviour is?
> I.e. what behaviour can I rely on, which won't change between versions or
> releases? Do we expect the job to fail? Do we expect missing keys to just
> be considered empty?
>
> Thanks!
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html
>
> Aaron Levin
>
>
>


Expected behaviour when changing operator parallelism but starting from an incremental checkpoint

2020-03-12 Thread Aaron Levin
Hi,

What's the expected behaviour of:

* changing an operator's parallelism
* deploying this change from an incremental (RocksDB) checkpoint instead of
a savepoint

The Flink docs[0][1] are a little unclear on what the expected behaviour is
here. I understand that the key-space is being changed because parallelism
is changed. I've seen instances where this happens and a job does not fail.
But how does it treat potentially missing state for a given key?

I know I can test this, but I'm curious what the _expected_ behaviour is,
i.e. what behaviour I can rely on that won't change between versions or
releases. Do we expect the job to fail? Do we expect missing keys to just
be considered empty?

Thanks!

[0]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-is-a-savepoint-how-is-a-savepoint-different-from-a-checkpoint
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html

Aaron Levin


Re: [DISCUSS] Change default for RocksDB timers: Java Heap => in RocksDB

2020-01-17 Thread Aaron Levin
+1. I personally found it a little confusing when I discovered I had to
configure this after already choosing RocksDB as a backend. I'm also very
strongly in favour of "safe and scalable" as the default.
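
For anyone following along, this is the switch I mean, a minimal sketch of the
explicit opt-in (I'm assuming the `setPriorityQueueStateType` setter on
`RocksDBStateBackend` and the `state.backend.rocksdb.timer-service.factory`
config key here; please double-check the exact names against the docs for your
version):

```
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Explicitly opt in to RocksDB-backed timers; today the default is the JVM
// heap even after choosing RocksDB as the state backend.
val backend = new RocksDBStateBackend("s3://my-bucket/checkpoints", true)
backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.ROCKSDB)
env.setStateBackend(backend)

// Assumed flink-conf.yaml equivalent:
//   state.backend.rocksdb.timer-service.factory: ROCKSDB
```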

Best,

Aaron Levin

On Fri, Jan 17, 2020 at 4:41 AM Piotr Nowojski  wrote:

> +1 for making it consistent. When using X state backend, timers should be
> stored in X by default.
>
> Also I think any configuration option controlling that needs to be well
> documented in some performance tuning section of the docs.
>
> Piotrek
>
> On 17 Jan 2020, at 09:16, Congxian Qiu  wrote:
>
> +1 to storing timers in RocksDB by default.
>
> Storing timers on the heap can cause OOM problems and make checkpoints
> much slower; storing timers in RocksDB gets rid of both.
>
> Best,
> Congxian
>
>
> Biao Liu  wrote on Fri, Jan 17, 2020 at 3:10 PM:
>
>> +1
>>
>> I think that's how it should be. Timers should align with other regular
>> state.
>>
>> If a user wants better performance and has no memory concerns, the memory or
>> FS state backend might be considered. Or maybe we could optimize the
>> performance by introducing a specific column family for timers. It could
>> have its own tuned options.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Fri, 17 Jan 2020 at 10:11, Jingsong Li  wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for starting this discussion.
>>> +1 for storing timers in RocksDB by default.
>>> In the past, when Flink didn't store the timers in RocksDB, I had a
>>> headache. I always adjusted parameters carefully to ensure that there was
>>> no risk of OutOfMemory.
>>>
>>> Just curious, how big is the performance impact of keeping timers on the
>>> heap vs. in RocksDB?
>>> - if there is no order-of-magnitude difference between heap and RocksDB,
>>> there is no problem in using RocksDB.
>>> - if there is, maybe we should improve our documentation to let users
>>> know about this option. (It looks like a lot of users didn't know.)
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Jan 17, 2020 at 3:18 AM Yun Tang  wrote:
>>>
>>>> Hi Stephan,
>>>>
>>>> I am +1 for the change which stores timers in RocksDB by default.
>>>>
>>>> Some users hope the checkpoint can be completed as fast as possible,
>>>> which also requires the timers to be stored in RocksDB so they do not affect
>>>> the synchronous part of the checkpoint.
>>>>
>>>> Best
>>>> Yun Tang
>>>> --
>>>> *From:* Andrey Zagrebin 
>>>> *Sent:* Friday, January 17, 2020 0:07
>>>> *To:* Stephan Ewen 
>>>> *Cc:* dev ; user 
>>>> *Subject:* Re: [DISCUSS] Change default for RocksDB timers: Java Heap
>>>> => in RocksDB
>>>>
>>>> Hi Stephan,
>>>>
>>>> Thanks for starting this discussion. I am +1 for this change.
>>>> In general, the number of timer state keys can be of the same order as
>>>> the number of main state keys.
>>>> So if RocksDB is used for the main state for scalability, it makes sense to
>>>> have timers there as well,
>>>> unless timers are used for only a very limited subset of keys which fits
>>>> into memory.
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Thu, Jan 16, 2020 at 4:27 PM Stephan Ewen  wrote:
>>>>
>>>> Hi all!
>>>>
>>>> I would suggest a change of the current default for timers. A bit of
>>>> background:
>>>>
>>>>   - Timers (for windows, process functions, etc.) are state that is
>>>> managed and checkpointed as well.
>>>>   - When using the MemoryStateBackend and the FsStateBackend, timers
>>>> are kept on the JVM heap, like regular state.
>>>>   - When using the RocksDBStateBackend, timers can be kept in RocksDB
>>>> (like other state) or on the JVM heap. The JVM heap is the default though!
>>>>
>>>> I find this a bit un-intuitive and would propose to change this to let
>>>> the RocksDBStateBackend store all state in RocksDB by default.
>>>> The rationale being that if there is a tradeoff (like here), safe and
>>>> scalable should be the default and unsafe performance be an explicit 
>>>> choice.
>>>>
>>>> This sentiment seems to be shared by various users as well, see
>>>> https://twitter.com/StephanEwen/status/1214590846168903680 and
>>>> https://twitter.com/StephanEwen/status/1214594273565388801
>>>> We would of course keep the switch and mention in the performance
>>>> tuning section that this is an option.
>>>>
>>>> # RocksDB State Backend Timers on Heap
>>>>   - Pro: faster
>>>>   - Con: not memory safe, GC overhead, longer synchronous checkpoint
>>>> time, no incremental checkpoints
>>>>
>>>> # RocksDB State Backend Timers in RocksDB
>>>>   - Pro: safe and scalable, asynchronously and
>>>> incrementally checkpointed
>>>>   - Con: performance overhead.
>>>>
>>>> Please chime in and let me know what you think.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-12-04 Thread Aaron Levin
Thanks for the clarification. I'll try to find some time to write a
reproducible test case and submit a ticket. While it may not be able
to delete the non-referenced ones, I'm surprised it's exponentially
replicating them, and so it's probably worth documenting in a ticket.

On Wed, Nov 27, 2019 at 12:15 PM Gyula Fóra  wrote:
>
> You are right Aaron.
>
> I would say this is by design: Flink doesn't require you to
> initialize state in the open method, so it has no safe way to delete the
> non-referenced state.
>
> What you can do is restore the state and clear it on all operators and not 
> reference it again. I know this feels like a workaround but I have no better 
> idea at the moment.
>
> Cheers,
> Gyula
>
> On Wed, Nov 27, 2019 at 6:08 PM Aaron Levin  wrote:
>>
>> Hi,
>>
>> Yes, we're using UNION state. I would assume, though, that if you are
>> not reading the UNION state it would either stick around as a
>> constant factor in your state size, or get cleared.
>>
>> Looks like I should try to recreate a small example and submit a bug
>> if this is true. Otherwise it's impossible to remove union state from
>> your operators.
>>
>> On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu  wrote:
>> >
>> > Hi
>> >
>> > Do you use UNION state in your scenario? When using UNION state, the JM
>> > may encounter OOM because each TDD will contain all the state of all
>> > subtasks [1].
>> >
>> > [1] 
>> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
>> > Best,
>> > Congxian
>> >
>> >
>> >> Aaron Levin  wrote on Wed, Nov 27, 2019 at 3:55 AM:
>> >>
>> >> Hi,
>> >>
>> >> Some context: after a refactoring, we were unable to start our jobs.
>> >> They started fine and checkpointed fine, but once the job restarted
>> >> owing to a transient failure, the application was unable to start. The
>> >> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
>> >> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
>> >> the `_metadata` file we saw `- 1402496 offsets:
>> >> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> >> to be the operator state we were no longer initializing or
>> >> snapshotting after the refactoring.
>> >>
>> >> Before I dig further into this and try to find a smaller reproducible
>> >> test case I thought I would ask if someone knows what the expected
>> >> behaviour is for the following scenario:
>> >>
>> >> suppose you have an operator (in this case a Source) which has some
>> >> operator ListState. Suppose you run your flink job for some time and
>> >> then later refactor your job such that you no longer use that state
>> >> (so after the refactoring you're no longer initializing this operator
>> >> state in initializeState, nor are you snapshotting the operator state
>> >> in snapshotState). If you launch your new code from a recent
>> >> savepoint, what do we expect to happen to the state? Do we anticipate
>> >> the behaviour I explained above?
>> >>
>> >> My assumption would be that Flink would not read this state and so it
>> >> would be removed from the next checkpoint or savepoint. Alternatively,
>> >> I might assume it would not be read but would linger around every
>> >> future checkpoint or savepoint. However, it feels like what is
>> >> happening is it's not read and then possibly replicated by every
>> >> instance of the task every time a checkpoint happens (hence the
>> >> accidentally exponential behaviour).
>> >>
>> >> Thoughts?
>> >>
>> >> PS - in case someone asks: I was sure that we were calling `.clear()`
>> >> appropriately in `snapshotState` (we, uh, already learned that lesson
>> >> :D)
>> >>
>> >> Best,
>> >>
>> >> Aaron Levin


Re: What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-27 Thread Aaron Levin
Hi,

Yes, we're using UNION state. I would assume, though, that if you are
not reading the UNION state it would either stick around as a
constant factor in your state size, or get cleared.

Looks like I should try to recreate a small example and submit a bug
if this is true. Otherwise it's impossible to remove union state from
your operators.

On Wed, Nov 27, 2019 at 6:50 AM Congxian Qiu  wrote:
>
> Hi
>
> Do you use UNION state in your scenario? When using UNION state, the JM may
> encounter OOM because each TDD will contain all the state of all subtasks [1].
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state
> Best,
> Congxian
>
>
> Aaron Levin  wrote on Wed, Nov 27, 2019 at 3:55 AM:
>>
>> Hi,
>>
>> Some context: after a refactoring, we were unable to start our jobs.
>> They started fine and checkpointed fine, but once the job restarted
>> owing to a transient failure, the application was unable to start. The
>> Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
>> `_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
>> the `_metadata` file we saw `- 1402496 offsets:
>> com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
>> to be the operator state we were no longer initializing or
>> snapshotting after the refactoring.
>>
>> Before I dig further into this and try to find a smaller reproducible
>> test case I thought I would ask if someone knows what the expected
>> behaviour is for the following scenario:
>>
>> suppose you have an operator (in this case a Source) which has some
>> operator ListState. Suppose you run your flink job for some time and
>> then later refactor your job such that you no longer use that state
>> (so after the refactoring you're no longer initializing this operator
>> state in initializeState, nor are you snapshotting the operator state
>> in snapshotState). If you launch your new code from a recent
>> savepoint, what do we expect to happen to the state? Do we anticipate
>> the behaviour I explained above?
>>
>> My assumption would be that Flink would not read this state and so it
>> would be removed from the next checkpoint or savepoint. Alternatively,
>> I might assume it would not be read but would linger around every
>> future checkpoint or savepoint. However, it feels like what is
>> happening is it's not read and then possibly replicated by every
>> instance of the task every time a checkpoint happens (hence the
>> accidentally exponential behaviour).
>>
>> Thoughts?
>>
>> PS - in case someone asks: I was sure that we were calling `.clear()`
>> appropriately in `snapshotState` (we, uh, already learned that lesson
>> :D)
>>
>> Best,
>>
>> Aaron Levin


What happens to a Source's Operator State if it stops being initialized and snapshotted? Accidentally exponential?

2019-11-26 Thread Aaron Levin
Hi,

Some context: after a refactoring, we were unable to start our jobs.
They started fine and checkpointed fine, but once the job restarted
owing to a transient failure, the application was unable to start. The
Job Manager was OOM'ing (even when I gave them 256GB of ram!). The
`_metadata` file for the checkpoint was 1.3GB (usually 11MB). Inside
the `_metadata` file we saw `- 1402496 offsets:
com.stripe.flink.backfill.kafka-archive-file-progress`. This happened
to be the operator state we were no longer initializing or
snapshotting after the refactoring.

Before I dig further into this and try to find a smaller reproducible
test case I thought I would ask if someone knows what the expected
behaviour is for the following scenario:

suppose you have an operator (in this case a Source) which has some
operator ListState. Suppose you run your flink job for some time and
then later refactor your job such that you no longer use that state
(so after the refactoring you're no longer initializing this operator
state in initializeState, nor are you snapshotting the operator state
in snapshotState). If you launch your new code from a recent
savepoint, what do we expect to happen to the state? Do we anticipate
the behaviour I explained above?
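
Concretely, the shape of the source I'm describing looks roughly like the
sketch below (simplified, with illustrative names; not our actual code):

```
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.collection.JavaConverters._

class ProgressTrackingSource extends SourceFunction[String] with CheckpointedFunction {
  @transient private var progress: ListState[java.lang.Long] = _
  @volatile private var running = true
  private var offsets: Seq[Long] = Seq.empty

  // Before the refactoring: the operator state is registered (union state in
  // our case) and read back on restore...
  override def initializeState(context: FunctionInitializationContext): Unit = {
    progress = context.getOperatorStateStore.getUnionListState(
      new ListStateDescriptor[java.lang.Long]("progress", classOf[java.lang.Long]))
    offsets = progress.get().asScala.map(Long2long).toSeq
  }

  // ...and rewritten on every checkpoint.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    progress.clear()
    offsets.foreach(o => progress.add(o))
  }

  // After the refactoring, the two methods above effectively disappear:
  // nothing registers, reads, or rewrites "progress", but the old
  // checkpoint/savepoint still contains it.

  override def run(ctx: SourceFunction.SourceContext[String]): Unit =
    while (running) { Thread.sleep(1000) } // emit records, track offsets, etc.

  override def cancel(): Unit = running = false
}
```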

My assumption would be that Flink would not read this state and so it
would be removed from the next checkpoint or savepoint. Alternatively,
I might assume it would not be read but would linger around every
future checkpoint or savepoint. However, it feels like what is
happening is it's not read and then possibly replicated by every
instance of the task every time a checkpoint happens (hence the
accidentally exponential behaviour).

Thoughts?

PS - in case someone asks: I was sure that we were calling `.clear()`
appropriately in `snapshotState` (we, uh, already learned that lesson
:D)

Best,

Aaron Levin


Re: Property based testing

2019-09-18 Thread Aaron Levin
Hey,

I've used ScalaCheck to test Flink applications. The basic idea is:

* use ScalaCheck to generate some kind of collection
* use `fromCollection` in `StreamExecutionEnvironment` to create a
`DataStream`
* use `DataStreamUtils.collect` as a sink
* plug my Flink logic between the collection source and the collection sink
* make a ScalaCheck property assertion based on the input collection and
output collection.

It's possible to wrap all that in a single method in Scala (rough sketch
below). LMK if you have any more questions or if any of this was not clear!

(note: not sure how to do this in Java).
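
Here's a rough sketch of what that wrapped-up version can look like
(illustrative code; I'm assuming the Scala DataStream API, ScalaCheck's
`Properties`/`forAll`, and that `DataStreamUtils.collect` lives under
`org.apache.flink.streaming.api.datastream` in your Flink version):

```
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._
import org.scalacheck.Prop.forAll
import org.scalacheck.Properties
import scala.collection.JavaConverters._

object UppercaseSpec extends Properties("uppercase") {

  // The Flink logic under test: a simple map, standing in for real pipeline logic.
  def pipeline(in: DataStream[String]): DataStream[String] = in.map(_.toUpperCase)

  property("uppercases every element") = forAll { (xs: List[String]) =>
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // collection source -> logic under test -> collection sink
    val input  = env.fromCollection(xs)
    val output = DataStreamUtils.collect(pipeline(input).javaStream).asScala.toList

    // Property: the pipeline agrees with the plain-Scala equivalent.
    output == xs.map(_.toUpperCase)
  }
}
```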

Best,

Aaron Levin

On Wed, Sep 18, 2019 at 8:36 AM Indraneel R  wrote:

> Hi All,
>
> Is there any property-based testing framework for Flink like
> 'SparkTestingBase' for Spark?
>
> It would also be useful to know some of the standard testing
> practices for data testing of Flink pipelines.
>
> regards
> -Indraneel
>


Re: Update Checkpoint and/or Savepoint Timeout of Running Job without restart?

2019-08-19 Thread Aaron Levin
Thanks for the answer, Congxian!

On Sun, Aug 18, 2019 at 10:43 PM Congxian Qiu 
wrote:

> Hi
>
> Currently, we can't change a running job's checkpoint timeout, but there
> is an issue[1] which wants to set a separate timeout for savepoint.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9465
> Best,
> Congxian
>
>
> Aaron Levin  wrote on Sat, Aug 17, 2019 at 12:37 AM:
>
>> Hello,
>>
>> Question: Is it possible to update the checkpoint and/or savepoint
>> timeout of a running job without restarting it? If not, is this something
>> that would be a welcomed contribution (not sure how easy this would be)?
>>
>> Context: sometimes we have jobs who are making progress but get into a
>> state where checkpoints are timing out, though we believe they would be
>> successful if we could increase the checkpoint timeout. Unfortunately we
>> currently need to restart the job to change this, and we would like to
>> avoid this if possible. Ideally we could make this change temporarily,
>> allow a checkpoint or savepoint to succeed, and then change the settings
>> back.
>>
>> Best,
>>
>> Aaron Levin
>>
>


Update Checkpoint and/or Savepoint Timeout of Running Job without restart?

2019-08-16 Thread Aaron Levin
Hello,

Question: Is it possible to update the checkpoint and/or savepoint timeout
of a running job without restarting it? If not, is this something that
would be a welcome contribution (not sure how easy this would be)?

Context: sometimes we have jobs that are making progress but get into a
state where checkpoints are timing out, though we believe they would be
successful if we could increase the checkpoint timeout. Unfortunately we
currently need to restart the job to change this, and we would like to
avoid this if possible. Ideally we could make this change temporarily,
allow a checkpoint or savepoint to succeed, and then change the settings
back.

Best,

Aaron Levin


Re: Graceful Task Manager Termination and Replacement

2019-07-24 Thread Aaron Levin
I was on vacation but wanted to thank Biao for summarizing the current
state! Thanks!

On Mon, Jul 15, 2019 at 2:00 AM Biao Liu  wrote:

> Hi Aaron,
>
> From my understanding, you want to shut down a Task Manager without
> restarting the job which has tasks running on this Task Manager?
>
> Based on the current implementation, if a Task Manager goes down, the
> tasks on it are treated as failed. The behavior of a task failure is
> defined via `FailoverStrategy`, which is `RestartAllStrategy` by default.
> That's the reason why the whole job restarts when a Task Manager has gone.
> As Paul said, you could try the "region restart failover strategy" when 1.9 is
> released. It might be helpful, however it depends on your job topology.
>
> The deeper reason for this issue is the consistency semantics of Flink,
> AT_LEAST_ONCE or EXACTLY_ONCE. Flink must respect these semantics, so there
> is not much choice of `FailoverStrategy`.
> It might be improved in the future. There are some discussions on the
> mailing list about providing weaker consistency semantics to improve
> the `FailoverStrategy`. We are pushing this improvement forward. I hope it
> can be included in 1.10.
>
> Regarding your question, I guess the answer is no for now. More frequent
> checkpoints or a manually triggered savepoint might help by enabling a quicker
> recovery.
>
>
> Paul Lam  wrote on Fri, Jul 12, 2019 at 10:25 AM:
>
>> Hi,
>>
>> Maybe the region restart strategy can help. It restarts the minimum required
>> tasks. Note that it’s recommended to use it only after the 1.9 release, see [1],
>> unless you’re running a stateless job.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10712
>>
>> Best,
>> Paul Lam
>>
>> On Jul 12, 2019, at 03:38, Aaron Levin  wrote:
>>
>> Hello,
>>
>> Is there a way to gracefully terminate a Task Manager beyond just killing
>> it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm
>> interested in a way to replace a Task Manager that has currently-running
>> tasks. It would be great if it was possible to terminate a Task Manager
>> without restarting the job, though I'm not sure if this is possible.
>>
>> Context: at my work we regularly cycle our hosts for maintenance and
>> security. Each time we do this we stop the task manager running on the host
>> being cycled. This causes the entire job to restart, resulting in downtime
>> for the job. I'd love to decrease this downtime if at all possible.
>>
>> Thanks! Any insight is appreciated!
>>
>> Best,
>>
>> Aaron Levin
>>
>>
>>


Graceful Task Manager Termination and Replacement

2019-07-11 Thread Aaron Levin
Hello,

Is there a way to gracefully terminate a Task Manager beyond just killing
it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm
interested in a way to replace a Task Manager that has currently-running
tasks. It would be great if it was possible to terminate a Task Manager
without restarting the job, though I'm not sure if this is possible.

Context: at my work we regularly cycle our hosts for maintenance and
security. Each time we do this we stop the task manager running on the host
being cycled. This causes the entire job to restart, resulting in downtime
for the job. I'd love to decrease this downtime if at all possible.

Thanks! Any insight is appreciated!

Best,

Aaron Levin


Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-29 Thread Aaron Levin
Hi Ufuk,

I'll answer your question, but first I'll give you an update on how we
resolved the issue:

* adding `org.apache.hadoop.io.compress.SnappyCodec` to
`classloader.parent-first-patterns.additional` in `flink-conf.yaml`
(though, putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
* putting a jar with `hadoop-common` + its transitive dependencies, then
using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and
its transitive dependencies). So we end up with a jar that has `SnappyCodec`
and whatever it needs to call transitively. We put this jar on the task
manager classpath.

I believe `SnappyCodec` was being called via our code. This worked the
first time but deploying a second time caused `libhadoop.so` to be loaded
in a second class loader. By putting a jar with `SnappyCodec` and its
transitive dependencies on the task manager classpath and specifying that
`SnappyCodec` needs to be loaded from the parent classloader, we ensure
that only one classloader loads `libhadoop.so`. I don't think this is the
best way to achieve what we want, but it works for now.

Next steps: if no one is on it, I can take a stab at updating the
documentation to clarify how to debug and resolve native library loading issues.
This was a nice learning experience and I think it'll be helpful to have
this in the docs for those who aren't well-versed in how classloading on
the JVM works!

To answer your questions:

1. We install Hadoop on our machines and tell Flink task managers to access
it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in
`flink-conf.yaml`
2. We put Flink's shaded hadoop-fs-s3 on both the task manager and job
manager classpath (I believe this is only used by the Job Managers when
they interact with S3 for checkpoints etc. I don't believe any user code is
using this).
3. Our Flink applications consist of a "fat jar" that has some
`org.apache.hadoop` dependencies bundled with it. I believe this is the
source of why we're loading `SnappyCodec` twice and triggering this issue.
4. For example code: we have a small wrapper around
`org.apache.flink.api.common.io.FileInputFormat` which does the work with
sequence files. It looks like (after removing some stuff to make it more
clear):

```
abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <:
Writable](
typeInformation: TypeInformation[T]
) extends FileInputFormat[T]
with ResultTypeQueryable[T] {
  @transient private var bufferedNextRecord: T = _
  @transient private var hadoopStream: HadoopFSDataInputStream = _
  @transient private var sequenceFileReader: SequenceFile.Reader = _

  unsplittable = true
  enumerateNestedFiles = true

  // *
  // This is where we'd see exceptions.
  // *
  override def open(fileSplit: FileInputSplit): Unit = {
super.open(fileSplit)
val config = new Configuration()
hadoopStream = WrappedHadoopInputStream.wrap(stream)
sequenceFileReader = new SequenceFile.Reader(config,
SequenceFile.Reader.stream(hadoopStream))
bufferNextRecord()
  }
...
}

// AND

class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
extends InputStream
with Seekable
with PositionedReadable {

  def read(): Int = underlying.read()
  def seek(pos: Long): Unit = underlying.seek(pos)
  def getPos: Long = underlying.getPos
}
...
```

Thanks for all your help, I appreciate it! I wouldn't have been able to
debug and resolve this if it wasn't for you filing the ticket. Thank you so
much!

[0] https://github.com/pantsbuild/jarjar

Aaron Levin

On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi  wrote:

> Hey Aaron,
>
> sorry for the late reply (again).
>
> (1) I think that your final result is in line with what I have
> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>
> (2) I think renaming the file would not help as it will still be
> loaded multiple times when the jobs restarts (as it happens in
> FLINK-11402).
>
> (3) I'll try to check whether Flink's shading of Hadoop is related to
> this. I don't think so though. @Chesnay (cc'd): What do you think?
>
> (4) @Aaron: Can you tell me which Hadoop libraries you use and share
> some code so I can try to reproduce this exactly on my side? Judging
> from the earlier stack traces you have shared, I'm assuming you are
> trying to read Snappy-compressed sequence files.
>
> – Ufuk
>
> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin  wrote:
> >
> > I don't control the code calling `System.loadLibrary("hadoop")` so
> that's not an option for me, unfortunately.
> >
> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma  wrote:
> >>
> >> This may be caused by the fact that a JVM process can only load a .so
> >> once. So a tricky way is to rename it.
> >>
> >> Sent from my iPhone
> >>
> >> On Jan 25, 2019, at 7:12 AM

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-24 Thread Aaron Levin
Hi Ufuk,

I'm starting to believe the bug is much deeper than the originally reported
error because putting the libraries in `/usr/lib` or `/lib` does not work.
This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't
work, despite that being in the `java.library.path` at the call site of the
error. I wrote a small program to test the loading of native libraries, and
it was able to successfully load `libhadoop.so`. I'm very perplexed. Could
this be related to the way Flink shades Hadoop stuff?

Here is my program and its output:

```
$ cat LibTest.scala
package com.redacted.flink

object LibTest {
  def main(args: Array[String]): Unit = {
    val library = args(0)

    System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
    System.out.println(s"Attempting to load $library")
    System.out.flush()
    System.loadLibrary(library)
    System.out.println(s"Successfully loaded ")
    System.out.flush()
  }
}
```

I then tried running that on one of the task managers with `hadoop` as an
argument:

```
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in
java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at com.stripe.flink.LibTest$.main(LibTest.scala:11)
at com.stripe.flink.LibTest.main(LibTest.scala)
```

I then copied the native libraries into `/usr/lib/` and ran it again:

```
$ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
$ java -jar lib_test_deploy.jar hadoop
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
Attempting to load hadoop
Successfully loaded
```

Any ideas?

On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin  wrote:

> Hi Ufuk,
>
> One more update: I tried copying all the hadoop native `.so` files (mainly
> `libhadoop.so`) into `/lib` and I am still experiencing the issue I
> reported. I also tried naively adding the `.so` files to the jar with the
> flink application and am still experiencing the issue I reported (however,
> I'm going to investigate this further as I might not have done it
> correctly).
>
> Best,
>
> Aaron Levin
>
> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:
>
>> Hi Ufuk,
>>
>> Two updates:
>>
>> 1. As suggested in the ticket, I naively copied every `.so` in
>> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
>> knowledge of how shared libs get picked up is hazy, so I'm not sure if
>> blindly copying them like that should work. I did check what
>> `System.getProperty("java.library.path")` returns at the call-site and
>> it's: 
>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> 2. The exception I see comes from
>> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
>> This uses `System.loadLibrary("hadoop")`.
>>
>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>> [2019-01-23 19:52:33.081376]  at
>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>> [2019-01-23 19:52:33.081406]  at
>> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>> [2019-01-23 19:52:33.081429]  at
>> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>> [2019-01-23 19:52:33.081457]  at
>> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>> [2019-01-23 19:52:33.081494]  at
>> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>> [2019-01-23 19:52:33.081517]  at
>> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>> [2019-01-23 19:52:33.081549]  at
>> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>> ... (redacted) ...
>> [2019-01-23 19:52:33.081728]  at
>> scala.collection.immutable.List.foreach(List.scala:392)
>> ... (redacted) ...
>> [2019-01-23 19:52:33.081832]  at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>> [2019-01-23 19:52:33.081854]  at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> [2019-01-23 19:52:33.081882]  at
>> org.apache.flink.s

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-23 Thread Aaron Levin
Hi Ufuk,

One more update: I tried copying all the hadoop native `.so` files (mainly
`libhadoop.so`) into `/lib` and I am still experiencing the issue I
reported. I also tried naively adding the `.so` files to the jar with the
flink application and am still experiencing the issue I reported (however,
I'm going to investigate this further as I might not have done it
correctly).

Best,

Aaron Levin

On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin  wrote:

> Hi Ufuk,
>
> Two updates:
>
> 1. As suggested in the ticket, I naively copied every `.so` in
> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
> knowledge of how shared libs get picked up is hazy, so I'm not sure if
> blindly copying them like that should work. I did check what
> `System.getProperty("java.library.path")` returns at the call-site and
> it's: 
> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
> 2. The exception I see comes from
> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
> This uses `System.loadLibrary("hadoop")`.
>
> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
> [2019-01-23 19:52:33.081376]  at
> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
> [2019-01-23 19:52:33.081406]  at
> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
> [2019-01-23 19:52:33.081429]  at
> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
> [2019-01-23 19:52:33.081457]  at
> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
> [2019-01-23 19:52:33.081494]  at
> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
> [2019-01-23 19:52:33.081517]  at
> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
> [2019-01-23 19:52:33.081549]  at
> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
> ... (redacted) ...
> [2019-01-23 19:52:33.081728]  at
> scala.collection.immutable.List.foreach(List.scala:392)
> ... (redacted) ...
> [2019-01-23 19:52:33.081832]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
> [2019-01-23 19:52:33.081854]  at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
> [2019-01-23 19:52:33.081882]  at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
> [2019-01-23 19:52:33.081904]  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> [2019-01-23 19:52:33.081946]  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>
> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:
>
>> Hey Ufuk,
>>
>> So, I looked into this a little bit:
>>
>> 1. clarification: my issues are with the hadoop-related snappy libraries
>> and not libsnappy itself (this is my bad for not being clearer, sorry!). I
>> already have `libsnappy` on my classpath, but I am looking into including
>> the hadoop snappy libraries.
>> 2. exception: I don't see the class loading error. I'm going to try to
>> put some more instrumentation and see if I can get a clearer stacktrace
>> (right now I get an NPE on closing a sequence file in a finalizer - when I
>> last logged the exception it was something deep in hadoop's snappy libs -
>> I'll get clarification soon).
>> 3. I'm looking into including hadoop's snappy libs in my jar and we'll
>> see if that resolves the problem.
>>
>> Thanks again for your help!
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin 
>> wrote:
>>
>>> Hey,
>>>
>>> Thanks so much for the help! This is awesome. I'll start looking into
>>> all of this right away and report back.
>>>
>>> Best,
>>>
>>> Aaron Levin
>>>
>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>>>
>>>> Hey Aaron,
>>>>
>>>> sorry for the late reply.
>>>>
>>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>>>> filed a ticket here:
>>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>>>> ticket description whether it's in line with what you are
>>>> experiencing? Most importantly, do you see the same Exception being
>>>> reported after cancelling a

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-23 Thread Aaron Levin
Hi Ufuk,

Two updates:

1. As suggested in the ticket, I naively copied every `.so` in
`hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My
knowledge of how shared libs get picked up is hazy, so I'm not sure if
blindly copying them like that should work. I did check what
`System.getProperty("java.library.path")` returns at the call-site and
it's: 
java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2. The exception I see comes from
`hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below).
This uses `System.loadLibrary("hadoop")`.

[2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
[2019-01-23 19:52:33.081376]  at
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
[2019-01-23 19:52:33.081406]  at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
[2019-01-23 19:52:33.081429]  at
org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
[2019-01-23 19:52:33.081457]  at
org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
[2019-01-23 19:52:33.081494]  at
org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
[2019-01-23 19:52:33.081517]  at
org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
[2019-01-23 19:52:33.081549]  at
org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
... (redacted) ...
[2019-01-23 19:52:33.081728]  at
scala.collection.immutable.List.foreach(List.scala:392)
... (redacted) ...
[2019-01-23 19:52:33.081832]  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
[2019-01-23 19:52:33.081854]  at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
[2019-01-23 19:52:33.081882]  at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
[2019-01-23 19:52:33.081904]  at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
[2019-01-23 19:52:33.081946]  at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
[2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)

On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin  wrote:

> Hey Ufuk,
>
> So, I looked into this a little bit:
>
> 1. clarification: my issues are with the hadoop-related snappy libraries
> and not libsnappy itself (this is my bad for not being clearer, sorry!). I
> already have `libsnappy` on my classpath, but I am looking into including
> the hadoop snappy libraries.
> 2. exception: I don't see the class loading error. I'm going to try to put
> some more instrumentation and see if I can get a clearer stacktrace (right
> now I get an NPE on closing a sequence file in a finalizer - when I last
> logged the exception it was something deep in hadoop's snappy libs - I'll
> get clarification soon).
> 3. I'm looking into including hadoop's snappy libs in my jar and we'll see
> if that resolves the problem.
>
> Thanks again for your help!
>
> Best,
>
> Aaron Levin
>
> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin 
> wrote:
>
>> Hey,
>>
>> Thanks so much for the help! This is awesome. I'll start looking into all
>> of this right away and report back.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>>
>>> Hey Aaron,
>>>
>>> sorry for the late reply.
>>>
>>> (1) I think I was able to reproduce this issue using snappy-java. I've
>>> filed a ticket here:
>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>>> ticket description whether it's in line with what you are
>>> experiencing? Most importantly, do you see the same Exception being
>>> reported after cancelling and re-starting the job?
>>>
>>> (2) I don't think it's caused by the environment options not being
>>> picked up. You can check the head of the log files of the JobManager
>>> or TaskManager to verify that your provided option is picked up as
>>> expected. You should see something similar to this:
>>>
>>> 2019-01-21 22:53:49,863 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>>
>>> 
>>> 2019-01-21 22:53:49,864 INFO
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>>> ...
>>> 2019-01-21 22:53:4

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-22 Thread Aaron Levin
Hey Ufuk,

So, I looked into this a little bit:

1. clarification: my issues are with the hadoop-related snappy libraries
and not libsnappy itself (this is my bad for not being clearer, sorry!). I
already have `libsnappy` on my classpath, but I am looking into including
the hadoop snappy libraries.
2. exception: I don't see the class loading error. I'm going to try to put
some more instrumentation and see if I can get a clearer stacktrace (right
now I get an NPE on closing a sequence file in a finalizer - when I last
logged the exception it was something deep in hadoop's snappy libs - I'll
get clarification soon).
3. I'm looking into including hadoop's snappy libs in my jar and we'll see
if that resolves the problem.

Thanks again for your help!

Best,

Aaron Levin

On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin  wrote:

> Hey,
>
> Thanks so much for the help! This is awesome. I'll start looking into all
> of this right away and report back.
>
> Best,
>
> Aaron Levin
>
> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:
>
>> Hey Aaron,
>>
>> sorry for the late reply.
>>
>> (1) I think I was able to reproduce this issue using snappy-java. I've
>> filed a ticket here:
>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>> ticket description whether it's in line with what you are
>> experiencing? Most importantly, do you see the same Exception being
>> reported after cancelling and re-starting the job?
>>
>> (2) I don't think it's caused by the environment options not being
>> picked up. You can check the head of the log files of the JobManager
>> or TaskManager to verify that your provided option is picked up as
>> expected. You should see something similar to this:
>>
>> 2019-01-21 22:53:49,863 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>
>> 
>> 2019-01-21 22:53:49,864 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>> ...
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
>> Options:
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Xms1024m
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Xmx1024m
>> You are looking for this line > 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
>> 2019-01-21 22:53:49,865 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>> ...
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> Program Arguments:
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> --configDir
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> /.../flink-1.7.0/conf
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> --executionMode
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>> cluster
>> ...
>> 2019-01-21 22:53:49,866 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>>
>> ----
>>
>> Can you verify that you see the log messages as expected?
>>
>> (3) As noted in FLINK-11402, is it possible to package the snappy library
>> as part of your user code instead of loading the library via
>> java.library.path? In my example, that seems to work fine.
>>
>> – Ufuk
>>
>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin 
>> wrote:
>> >
>> > Hello!
>> >
>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a
>> job is canceled or fails and then is restarted (with or without
>> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts`
>> seem to start having impact again and our job will run without failure.
>> More below.
>> >
>> > We consume Snappy-compressed sequence files in our flink job. This
>> requires access to the hadoop native libraries. In our `

Re: `env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-22 Thread Aaron Levin
Hey,

Thanks so much for the help! This is awesome. I'll start looking into all
of this right away and report back.

Best,

Aaron Levin

On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi  wrote:

> Hey Aaron,
>
> sorry for the late reply.
>
> (1) I think I was able to reproduce this issue using snappy-java. I've
> filed a ticket here:
> https://issues.apache.org/jira/browse/FLINK-11402. Can you check whether
> the ticket description is in line with what you are
> experiencing? Most importantly, do you see the same Exception being
> reported after cancelling and re-starting the job?
>
> (2) I don't think it's caused by the environment options not being
> picked up. You can check the head of the log files of the JobManager
> or TaskManager to verify that your provided option is picked up as
> expected. You should see something similar to this:
>
> 2019-01-21 22:53:49,863 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>
> 
> 2019-01-21 22:53:49,864 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
> ...
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -  JVM
> Options:
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xms1024m
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Xmx1024m
> You are looking for this line > 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <
> 2019-01-21 22:53:49,865 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
> ...
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> Program Arguments:
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> --configDir
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> /.../flink-1.7.0/conf
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> --executionMode
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
> cluster
> ...
> 2019-01-21 22:53:49,866 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint -
>
> 
>
> Can you verify that you see the log messages as expected?
>
> (3) As noted in FLINK-11402, is it possible to package the snappy library
> as part of your user code instead of loading the library via
> java.library.path? In my example, that seems to work fine.
>
> – Ufuk
>
> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin  wrote:
> >
> > Hello!
> >
> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a
> job is canceled or fails and then is restarted (with or without
> savepoint/checkpoints). If I restart the task-managers, the `env.java.opts`
> seem to start having impact again and our job will run without failure.
> More below.
> >
> > We consume Snappy-compressed sequence files in our flink job. This
> requires access to the hadoop native libraries. In our `flink-conf.yaml`
> for both the task manager and the job manager, we put:
> >
> > ```
> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
> > ```
> >
> > If I launch our job on freshly-restarted task managers, the job operates
> fine. If at some point I cancel the job or if the job restarts for some
> other reason, the job will begin to crashloop because it tries to open a
> Snappy-compressed file but doesn't have access to the codec from the native
> hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the
> task manager while the job is crashlooping, the job is start running
> without any codec failures.
> >
> > The only reason I can conjure that would cause the Snappy compression to
> fail is if the `env.java.opts` were not being passed through to the job on
> restart for some reason.
> >
> > Does anyone know what's going on? Am I missing some additional
> configuration? I really appreciate any help!
> >
> > About our setup:
> >
> > - Flink Version: 1.7.0
> > - Deployment: Standalone in HA
> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s
> shaded jars to access our files in S3. We do not use the
> `bundled-with-hadoop` distribution of Flink.
> >
> > Best,
> >
> > Aaron Levin
>


`env.java.opts` not persisting after job canceled or failed and then restarted

2019-01-17 Thread Aaron Levin
Hello!

*tl;dr*: settings in `env.java.opts` seem to stop having impact when a job
is canceled or fails and then is restarted (with or without
savepoint/checkpoints). If I restart the task-managers, the `env.java.opts`
seem to start having impact again and our job will run without failure.
More below.

We consume Snappy-compressed sequence files in our flink job. This
requires access to the hadoop native libraries. In our `flink-conf.yaml`
for both the task manager and the job manager, we put:

```
env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
```

If I launch our job on freshly-restarted task managers, the job operates
fine. If at some point I cancel the job or if the job restarts for some
other reason, the job will begin to crashloop because it tries to open a
Snappy-compressed file but doesn't have access to the codec from the native
hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the
task manager while the job is crashlooping, the job starts running
without any codec failures.

The only reason I can conjure that would cause the Snappy compression to
fail is if the `env.java.opts` were not being passed through to the job on
restart for some reason.
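
One way to check this from inside the job itself is to log the property from a
rich function's `open()`; a minimal sketch with illustrative names (not the
actual job code):

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Pass-through map that prints the JVM's java.library.path to the task
// manager's stdout every time the task is (re)started, so the value before
// and after a restart can be compared.
class LibraryPathLogger[T] extends RichMapFunction[T, T] {
  override def open(parameters: Configuration): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    println(s"subtask $subtask java.library.path=" +
      System.getProperty("java.library.path"))
  }
  override def map(value: T): T = value
}
```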

Does anyone know what's going on? Am I missing some additional
configuration? I really appreciate any help!

About our setup:

- Flink Version: 1.7.0
- Deployment: Standalone in HA
- Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s
shaded jars to access our files in S3. We do not use the
`bundled-with-hadoop` distribution of Flink.

Best,

Aaron Levin


Re: [Flink 1.7.0] always got 10s ask timeout exception when submitting job with checkpoint via REST

2019-01-10 Thread Aaron Levin
We are also experiencing this! Thanks for speaking up! It's relieving to
know we're not alone :)

We tried adding `akka.ask.timeout: 1 min` to our `flink-conf.yaml`, which
did not seem to have any effect. I tried adding every other related akka,
rpc, etc. timeout and still continue to encounter these errors. I believe
they may also impact our ability to deploy (as we get a timeout when
submitting the job programmatically). I'd love to see a solution to this if
one exists!
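
A flink-conf.yaml sketch of the knobs discussed in this thread (whether the
REST job-run handler honors `web.timeout`, in milliseconds, for this
particular call in 1.7 is an assumption worth verifying):

```
akka.ask.timeout: 1 min
web.timeout: 60000
```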

Best,

Aaron Levin

On Thu, Jan 10, 2019 at 2:58 PM Steven Wu  wrote:

> We are trying out Flink 1.7.0. We always get this exception when
> submitting a job with an external checkpoint via REST. Job parallelism is
> 1,600. State size is probably in the range of 1-5 TB. The job actually
> starts; only the REST API returns this failure.
>
> If we submit the job without an external checkpoint, everything works
> fine.
>
> Anyone else see such problem with 1.7? Appreciate your help!
>
> Thanks,
> Steven
>
> org.apache.flink.runtime.rest.handler.RestHandlerException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$4(JarRunHandler.java:114)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
> at
> java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
> at
> java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
> at
> java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
> ... 21 more
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#-641142843]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> ... 9 more
>


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-12 Thread Aaron Levin
Hi Aljoscha,

Thanks! I will look into this.

Best,

Aaron Levin
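
A minimal, self-contained sketch of the split-enumerator / split-reader shape
described in the quoted message below (all names are hypothetical;
checkpointing and watermark handling are left out):

```
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Work items handed from the "enumerator" to the downstream "reader".
sealed trait WorkItem
case class FileSplit(path: String) extends WorkItem
case class KafkaPartition(topic: String, partition: Int) extends WorkItem

// Enumerator: decides *what* to read and in which order (files first, then
// Kafka partitions) but does no reading itself.
class WorkItemEnumerator(files: Seq[String], partitions: Seq[(String, Int)])
    extends SourceFunction[WorkItem] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[WorkItem]): Unit = {
    files.foreach(f => if (running) ctx.collect(FileSplit(f)))
    partitions.foreach { case (t, p) =>
      if (running) ctx.collect(KafkaPartition(t, p))
    }
  }

  override def cancel(): Unit = running = false
}

// Reader: knows *how* to read both kinds of work item. The bodies below only
// emit a description instead of actually touching S3 or Kafka.
class WorkItemReader extends RichFlatMapFunction[WorkItem, String] {
  override def flatMap(item: WorkItem, out: Collector[String]): Unit =
    item match {
      case FileSplit(path)             => out.collect(s"would read archive $path")
      case KafkaPartition(topic, part) => out.collect(s"would read $topic/$part")
    }
}

object EnumeratorReaderSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new WorkItemEnumerator(
        Seq("s3://bucket/a.seq", "s3://bucket/b.seq"), Seq(("events", 0))))
      .flatMap(new WorkItemReader)
      .print()
    env.execute("enumerator-reader-sketch")
  }
}
```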

On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> I think for this case a model that is similar to how the Streaming File
> Source works should be good. You can have a look at
> ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The
> idea is that the first emits splits that should be processed and the second
> is responsible for reading those splits. A generic version of that is what
> I'm proposing for the refactoring of our source interface [1] that also
> comes with a prototype implementation [2].
>
> I think something like this should be adaptable to your case. The split
> enumerator would at first only emit file splits downstream, after that it
> would emit Kafka partitions that should be read. The split reader would
> understand both file splits and kafka partitions and can read from both.
> This still has some kinks to be worked out when it comes to watermarks;
> FLIP-27 is not finished.
>
> What do you think?
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2] https://github.com/aljoscha/flink/commits/refactor-source-interface
>
>
> On 1. Nov 2018, at 16:50, Aaron Levin  wrote:
>
> Hey,
>
> Thanks for reaching out! I'd love to take a step back and find a better
> solution, so I'll try to be succinct in what I'm trying to accomplish:
>
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets
> files in a specific order).
> * sends a watermark between each sequence file
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about
> (see: https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink))
>
> The current solution I have involves a custom InputFormat, InputSplit, and
> SplitAssignor. It achieves most of these requirements, except I have to
> extend InputFormatSourceFunction. I have a class that looks like:
>
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
> KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
>
> There's a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is
> initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of
> InputFormatSourceFunction so I could insert Watermarks between splits.
>
> I'd love any suggestions around improving this!
>
> Best,
>
> Aaron Levin
>
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek 
> wrote:
>
>> Hi Aaron,
>>
>> I'd like to take a step back and understand why you're trying to wrap an
>> InputFormatSourceFunction.
>>
>> In my opinion, InputFormatSourceFunction should not be used because it
>> has some shortcomings, the most prominent being that it does not
>> support checkpointing, i.e. in case of failure all data will (probably) be
>> read again. I'm saying probably because the interaction of
>> InputFormatSourceFunction with how InputSplits are generated (which relates
>> to that code snippet with the cast you found) could be somewhat "spooky"
>> and lead to weird results in some cases.
>>
>> The interface is a remnant of a very early version of the streaming API
>> and should probably be removed soon. I hope we can find a better solution
>> for your problem that fits better with Flink.
>>
>> Best,
>> Aljoscha
>>
>> On 1. Nov 2018, at 15:30, Aaron Levin  wrote:
>>
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
>> provide any insight or advice, that would be helpful!
>>
>> Thanks again.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin 
>> wrote:
>>
>>> Hey,
>>>
>>> Not sure how convo threading works on this list, so in case the folks
>>> CC'd missed my other response, here's some more info:
>>>
>>> First, I appreciate everyone's help! Thank you!
>>>
>>> I wrote several wrappers to try and debug this, including one which is
>>> an exact copy of `InputFormatSourceFunction` which also failed. They all
>>> failed with the same error I detail above. I'll post two of them below.
>>> They all extended `RichParallelSourceFunction` and, as far 

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aaron Levin
Hey,

Thanks for reaching out! I'd love to take a step back and find a better
solution, so I'll try to be succinct in what I'm trying to accomplish:

We're trying to write a SourceFunction which:
* reads some Sequence files from S3 in a particular order (each task gets
files in a specific order).
* sends a watermark between each sequence file
* when that's complete, starts reading from Kafka topics.
* (This is similar to the bootstrap problem which Lyft has talked about
(see:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink
))
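
A stripped-down, self-contained sketch of that shape, mainly to show where the
per-file watermark would go (the archive read and the Kafka hand-off are faked
here; the real job wraps a custom InputFormat and a Kafka source):

```
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

class ArchiveThenLiveSource(archiveFiles: Seq[String])
    extends SourceFunction[String] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Phase 1: replay the archive files in order, one watermark per file.
    archiveFiles.foreach { file =>
      if (running) {
        // ...open `file` and ctx.collect(...) each record here...
        ctx.collect(s"records of $file")
        // Advance event time once the whole file has been emitted. A real
        // job would derive this timestamp from the data, not the wall clock.
        ctx.emitWatermark(new Watermark(System.currentTimeMillis()))
      }
    }
    // Phase 2: switch to the live source; in the real job this is where the
    // wrapped Kafka consumer's run(ctx) would take over.
  }

  override def cancel(): Unit = running = false
}
```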

The current solution I have involves a custom InputFormat, InputSplit, and
SplitAssignor. It achieves most of these requirements, except I have to
extend InputFormatSourceFunction. I have a class that looks like:

class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}

There's a lot I don't like about the existing solution:
* I have to extend InputFormatSourceFunction to ensure the graph is
initialized properly (the bug I wrote about)
* I had to replicate most of the implementation of
InputFormatSourceFunction so I could insert Watermarks between splits.

I'd love any suggestions around improving this!

Best,

Aaron Levin

On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek 
wrote:

> Hi Aaron,
>
> I'd like to take a step back and understand why you're trying to wrap an
> InputFormatSourceFunction.
>
> In my opinion, InputFormatSourceFunction should not be used because it has
> some shortcomings, the most prominent being that it does not support
> checkpointing, i.e. in case of failure all data will (probably) be read
> again. I'm saying probably because the interaction of
> InputFormatSourceFunction with how InputSplits are generated (which relates
> to that code snippet with the cast you found) could be somewhat "spooky"
> and lead to weird results in some cases.
>
> The interface is a remnant of a very early version of the streaming API
> and should probably be removed soon. I hope we can find a better solution
> for your problem that fits better with Flink.
>
> Best,
> Aljoscha
>
> On 1. Nov 2018, at 15:30, Aaron Levin  wrote:
>
> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
> provide any insight or advice, that would be helpful!
>
> Thanks again.
>
> Best,
>
> Aaron Levin
>
> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin 
> wrote:
>
>> Hey,
>>
>> Not sure how convo threading works on this list, so in case the folks
>> CC'd missed my other response, here's some more info:
>>
>> First, I appreciate everyone's help! Thank you!
>>
>> I wrote several wrappers to try and debug this, including one which is an
>> exact copy of `InputFormatSourceFunction` which also failed. They all
>> failed with the same error I detail above. I'll post two of them below.
>> They all extended `RichParallelSourceFunction` and, as far as I could tell,
>> were properly initialized (though I may have missed something!).
>> Additionally, for the two below, if I change `extends
>> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
>> I no longer receive the exception. This is what led me to believe the
>> source of the issue was casting and how I found the line of code where the
>> stream graph is given the input format.
>>
>> Quick explanation of the wrappers:
>> 1. `WrappedInputFormat` does a basic wrap around
>> `InputFormatSourceFunction` and delegates all methods to the underlying
>> `InputFormatSourceFunction`
>> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
>> `InputFormatSourceFunction` source.
>> 3. They're being used in a test which looks vaguely like:
>> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>> InputFormatSourceFunction[String](source,
>> implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>
>> class WrappedInputFormat[A](
>>   inputFormat: InputFormatSourceFunction[A]
>> )(
>>   implicit typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit
>> = {
>> inputFormat.run(sourceContext)
>>   }
>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>> inputFormat.setRuntimeContext(t)
>>   }
>>   override def equals(obj: scala.Any) = {
>> inputFormat.equals(obj)
>>   }
>>   override def hashCode() = { inputFormat.hashCode() }
>>   override def toString = { inputFormat.toString }
>>   override def get

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-11-01 Thread Aaron Levin
Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
provide any insight or advice, that would be helpful!

Thanks again.

Best,

Aaron Levin

On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin  wrote:

> Hey,
>
> Not sure how convo threading works on this list, so in case the folks CC'd
> missed my other response, here's some more info:
>
> First, I appreciate everyone's help! Thank you!
>
> I wrote several wrappers to try and debug this, including one which is an
> exact copy of `InputFormatSourceFunction` which also failed. They all
> failed with the same error I detail above. I'll post two of them below.
> They all extended `RichParallelSourceFunction` and, as far as I could tell,
> were properly initialized (though I may have missed something!).
> Additionally, for the two below, if I change `extends
> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
> I no longer receive the exception. This is what led me to believe the
> source of the issue was casting and how I found the line of code where the
> stream graph is given the input format.
>
> Quick explanation of the wrappers:
> 1. `WrappedInputFormat` does a basic wrap around
> `InputFormatSourceFunction` and delegates all methods to the underlying
> `InputFormatSourceFunction`
> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
> `InputFormatSourceFunction` source.
> 3. They're being used in a test which looks vaguely like:
> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
> InputFormatSourceFunction[String](source,
> implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>
> class WrappedInputFormat[A](
>   inputFormat: InputFormatSourceFunction[A]
> )(
>   implicit typeInfo: TypeInformation[A]
> ) extends RichParallelSourceFunction[A] {
>
>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit
> = {
> inputFormat.run(sourceContext)
>   }
>   override def setRuntimeContext(t: RuntimeContext): Unit = {
> inputFormat.setRuntimeContext(t)
>   }
>   override def equals(obj: scala.Any) = {
> inputFormat.equals(obj)
>   }
>   override def hashCode() = { inputFormat.hashCode() }
>   override def toString = { inputFormat.toString }
>   override def getRuntimeContext(): RuntimeContext = {
> inputFormat.getRuntimeContext }
>   override def getIterationRuntimeContext = {
> inputFormat.getIterationRuntimeContext }
>   override def open(parameters: Configuration): Unit = {
> inputFormat.open(parameters)
>   }
>   override def cancel(): Unit = {
> inputFormat.cancel()
>   }
>   override def close(): Unit = {
> inputFormat.close()
>   }
> }
>
> And the other one:
>
> class ClonedInputFormatSourceFunction[A](val format: InputFormat[A,
> InputSplit], val typeInfo: TypeInformation[A]) extends
> RichParallelSourceFunction[A] {
>
>   @transient private var provider: InputSplitProvider = _
>   @transient private var serializer: TypeSerializer[A] = _
>   @transient private var splitIterator: Iterator[InputSplit] = _
>   private var isRunning: Boolean = _
>
>   override def open(parameters: Configuration): Unit = {
> val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
> if(format.isInstanceOf[RichInputFormat[_,_]]) {
>   format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
> }
> format.configure(parameters)
>
> provider = context.getInputSplitProvider
> serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
> splitIterator = getInputSplits()
> isRunning = splitIterator.hasNext
>   }
>
>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit
> = {
> if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
>   format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
> }
>
> var nextElement: A = serializer.createInstance()
> try {
>   while (isRunning) {
> format.open(splitIterator.next())
> var splitDone = false
> while (isRunning && !splitDone && !format.reachedEnd()) {
>   nextElement = format.nextRecord(nextElement)
>   if (nextElement != null) {
>     sourceContext.collect(nextElement)
>   } else {
>     splitDone = true // Scala has no bare `break`; flag the split as done
>   }
> }
> format.close()
> if (isRunning) {
>   isRunning = splitIterator.hasNext
> }
>   }
> } finally {
>
>   format.close()
>   if (format.isInstanceOf[RichInputFormat[_,_]]) {
> format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>   }
>   isRunning = false
> }
>   }
>
>   override def 

Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-26 Thread Aaron Levin
  }

if(split != null) {
  nextSplit = split
  true
} else {
  exhausted = true
  false
}
  }

  override def next(): InputSplit = {
if(nextSplit == null && !hasNext) {
  throw new NoSuchElementException()
}
val tmp: InputSplit = nextSplit
nextSplit = null
tmp
  }

}
  }
}


On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz 
wrote:

> Hi Aaron,
>
> Could you share the code of you custom function?
>
> I am also adding Aljosha and Kostas to cc, who should be more helpful on
> that topic.
>
> Best,
>
> Dawid
> On 19/10/2018 20:06, Aaron Levin wrote:
>
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
> stream (via `env.addSource` and a subsequent sink) I get errors related to
> the `InputSplitAssigner` not being initialized for a particular vertex ID.
> Full error here[1].
>
> I believe the underlying error is related to this[0] call to `instanceof
> InputFormatSourceFunction`.
>
> *My questions*:
>
> 1. how can I wrap an `InputFormatSourceFunction` and avoid this error?
> Am I missing a chunk of the API covering this?
> 2. is the error I'm experiencing related to that casting call? If so, would
> y'all be open to a PR which adds an interface one can extend which will set
> the input format in the stream graph? Or is there a preferred way of
> achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
> at REDACTED
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.
> getNextInputSplit(RpcInputSplitProvider.java:69)
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
> at java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(
> CompletableFuture.java:1915)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.
> getNextInputSplit(RpcInputSplitProvider.java:61)
> ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
> at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(
> JobMaster.java:575)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:247)
> ...
>
>


Re: Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-24 Thread Aaron Levin
austed = true
  false
}
  }

  override def next(): InputSplit = {
if(nextSplit == null && !hasNext) {
  throw new NoSuchElementException()
}
val tmp: InputSplit = nextSplit
nextSplit = null
tmp
      }

}
  }
}

Best,

Aaron Levin

On Wed, Oct 24, 2018 at 8:00 AM, Kien Truong 
wrote:

> Hi,
>
> Since InputFormatSourceFunction is a subclass of
> RichParallelSourceFunction, your wrapper should also extend this class.
>
> In addition, remember to override the methods defined in the
> AbstractRichFunction interface and proxy the calls to the underlying
> InputFormatSourceFunction, in order to initialize the underlying source
> correctly.
>
>
> Best regards,
>
> Kien
>
>
> On 10/20/2018 1:06 AM, Aaron Levin wrote:
>
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
> stream (via `env.addSource` and a subsequent sink) I get errors related to
> the `InputSplitAssigner` not being initialized for a particular vertex ID.
> Full error here[1].
>
> I believe the underlying error is related to this[0] call to `instanceof
> InputFormatSourceFunction`.
>
> *My questions*:
>
> 1. how can I wrap an `InputFormatSourceFunction` and avoid this error?
> Am I missing a chunk of the API covering this?
> 2. is the error I'm experiencing related to that casting call? If so, would
> y'all be open to a PR which adds an interface one can extend which will set
> the input format in the stream graph? Or is there a preferred way of
> achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
> at REDACTED
> at org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.
> open(AbstractUdfStreamOperator.java:102)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:424)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
> Requesting the next input split failed.
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.
> getNextInputSplit(RpcInputSplitProvider.java:69)
> at org.apache.flink.streaming.api.functions.source.
> InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
> ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
> No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
> at java.util.concurrent.CompletableFuture.reportGet(
> CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(
> CompletableFuture.java:1915)
> at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.
> getNextInputSplit(RpcInputSplitProvider.java:61)
> ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
> cbc357ccb763df2852fee8c4fc7d55f2
> at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(
> JobMaster.java:575)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(
> AkkaRpcActor.java:247)
> ...
>
>


Error with custom `SourceFunction` wrapping `InputFormatSourceFunction` (Flink 1.6)

2018-10-19 Thread Aaron Levin
Hi,

I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
stream (via `env.addSource` and a subsequent sink) I get errors related to
the `InputSplitAssigner` not being initialized for a particular vertex ID.
Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof
InputFormatSourceFunction`.

*My questions*:

1. how can I wrap an `InputFormatSourceFunction` and avoid this error? Am
I missing a chunk of the API covering this?
2. is the error I'm experiencing related to that casting call? If so, would
y'all be open to a PR which adds an interface one can extend which will set
the input format in the stream graph? Or is there a preferred way of
achieving this?

Thanks!

Aaron Levin

[0]
https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
[1]
java.lang.RuntimeException: Could not retrieve next input split.
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
at REDACTED
at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
Requesting the next input split failed.
at
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No
InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
cbc357ccb763df2852fee8c4fc7d55f2
at
org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
...


Re: Scala 2.12 Support

2018-08-16 Thread Aaron Levin
Hi Piotr,

Thanks for the update. Glad to hear it's high on the priority list! I'm
looking forward to the 1.7 update!

It may be worth having someone more official from the Flink team give an
update on that ticket. It wasn't clear if the 1.7 comment from that user
was just a reference to the fact that 1.6 had come out (or where they got
that information). I know a few people have cited the ticket and concluded
"not clear what's going on with Scala 2.12 support." If you have the
bandwidth, a note from you or anyone else would be helpful!

Thanks again!

Best,

Aaron Levin

On Thu, Aug 16, 2018 at 6:04 AM, Piotr Nowojski 
wrote:

> Hi,
>
> Scala 2.12 support is high on our priority list and we hope to have it
> included for the 1.7 release (as you can see in the ticket itself), which
> should happen later this year.
>
> Piotrek
>
>
> On 15 Aug 2018, at 17:59, Aaron Levin  wrote:
>
> Hello!
>
> I'm wondering if there is anywhere I can see Flink's roadmap for Scala
> 2.12 support. The last email I can find on the list for this was back in
> January, and the FLINK-7811[0], the ticket asking for Scala 2.12 support,
> hasn't been updated in a few months.
>
> Recently Spark fixed the ClosureCleaner code to support Scala 2.12[1], and
> from what I can gather this was one of the main barriers for Flink
> supporting Scala 2.12. Given this has been fixed, is there work in progress
> to support Scala 2.12? Any updates on FLINK-7811?
>
> Thanks for your help!
>
> [0] https://issues.apache.org/jira/browse/FLINK-7811
> [1] https://issues.apache.org/jira/browse/SPARK-14540
>
> Best,
>
> Aaron Levin
>
>
>


Scala 2.12 Support

2018-08-15 Thread Aaron Levin
Hello!

I'm wondering if there is anywhere I can see Flink's roadmap for Scala 2.12
support. The last email I can find on the list for this was back in
January, and the FLINK-7811[0], the ticket asking for Scala 2.12 support,
hasn't been updated in a few months.

Recently Spark fixed the ClosureCleaner code to support Scala 2.12[1], and
from what I can gather this was one of the main barriers for Flink
supporting Scala 2.12. Given this has been fixed, is there work in progress
to support Scala 2.12? Any updates on FLINK-7811?

Thanks for your help!

[0] https://issues.apache.org/jira/browse/FLINK-7811
[1] https://issues.apache.org/jira/browse/SPARK-14540

Best,

Aaron Levin