Re: Interesting window behavior with savepoints

2016-05-16 Thread Andrew Whitaker
Thanks for explaining, Ufuk. The reasons the savepoint restores successfully
kind of make sense, but it seems like the window type (count vs. time) should
be taken into account when restoring savepoints. I doubt anyone would actually
change window types like this on purpose, but I would expect Flink to complain
about changing windowing semantics between program versions.

On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> For a WindowedStream the uid would be set on the result of the
> apply/reduce/fold call. The WindowedStream itself does not represent an
> operation.
>
> On Fri, 13 May 2016 at 00:20 Ufuk Celebi <u...@apache.org> wrote:
>
>> On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
>> <andrew.whita...@braintreepayments.com> wrote:
>> > From what I've observed, most of the time when Flink can't successfully
>> > restore a checkpoint it throws an exception saying as much. I was
>> expecting
>> > to see that behavior here. Could someone explain why this "works" (as
>> in,
>> > flink accepts the program with the savepoint from the first version of
>> the
>> > program), and if this is a bug?
>>
>> Hey Andrew! Thanks for reporting this.
>>
>> Flink generates operator IDs and uses these to map the state back to
>> the same operator when restoring from a savepoint. We want these IDs
>> to stay the same as long as the program does not change.
>>
>> The ID can either be generated automatically by Flink or assigned
>> manually by the user.
>>
>> The automatically generated ID is based on certain topology attributes
>> like parallelism, operator placement, etc. If any of these attributes
>> change, the operator ID changes and the savepoint state can't be mapped
>> back. If they stay the same, we assume that the program has not changed.
>>
>> The problem in your example is that to Flink both programs look the
>> same with respect to how the IDs are generated: the topology didn't
>> change and both the time and count window are executed by the
>> WindowOperator with an InternalWindowFunction.
>>
>> The recommended way to work with savepoints is to skip the automatic
>> IDs altogether and assign the IDs manually instead. You can do this
>> via the "uid(String)" method of each operator, which gives you
>> fine-grained control over the "versioning" of state:
>>
>> env.addSource(..).uid("my-source")
>>
>> vs.
>>
>> env.addSource(..).uid("my-source-2")
>>
>> The problem I've just noticed is that you can't specify this on
>> WindowedStreams, but only on DataStreams, which is clearly a bug.
>> Furthermore, it might be a good idea to special-case windows when
>> automatically generating the IDs.
>>
>> I hope this helps a little with understanding the core problem. If you
>> have further questions, feel free to ask. I will make sure to fix this
>> soon.
>>
>> – Ufuk
>>
>
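
For reference, a minimal Java sketch of what end-to-end manual ID assignment
looks like against the 1.x DataStream API. The data, key, and window size are
made up for illustration, and (per Aljoscha's note above) the window's uid
goes on the DataStream returned by the window transformation, not on the
WindowedStream itself:

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2))
                .uid("my-source")    // stable ID for this operator in savepoints
                .keyBy(0)
                .timeWindow(Time.minutes(1))
                .sum(1)              // the window operator is created here...
                .uid("my-window")    // ...so its ID goes on the resulting stream
                .print();

        env.execute("uid-example");
    }
}
```

With explicit uids in place, renaming or restructuring other parts of the job
no longer silently changes which savepoint state maps to which operator.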


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Interesting window behavior with savepoints

2016-05-12 Thread Andrew Whitaker
"Flink can't successfully restore a checkpoint" should be "Flink can't
successfully restore a savepoint".

On Thu, May 12, 2016 at 3:44 PM, Andrew Whitaker <
andrew.whita...@braintreepayments.com> wrote:

> Hi,
>
> I was recently experimenting with savepoints and various situations in
> which they succeed or fail. I expected this example to fail:
>
> https://gist.github.com/AndrewWhitaker/fa46db04066ea673fe0eda232f0a5ce1
>
> Basically, the first program runs with a count window. The second program
> is identical except that it uses a time window instead of a count window.
>
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?
>
> Thanks,
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
>



-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Interesting window behavior with savepoints

2016-05-12 Thread Andrew Whitaker
Hi,

I was recently experimenting with savepoints and various situations in
which they succeed or fail. I expected this example to fail:

https://gist.github.com/AndrewWhitaker/fa46db04066ea673fe0eda232f0a5ce1

Basically, the first program runs with a count window. The second program
is identical except that it uses a time window instead of a count window.

From what I've observed, most of the time when Flink can't successfully
restore a checkpoint it throws an exception saying as much. I was expecting
to see that behavior here. Could someone explain why this "works" (as in,
flink accepts the program with the savepoint from the first version of the
program), and if this is a bug?

Thanks,

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: FromIteratorFunction problems

2016-04-08 Thread Andrew Whitaker
Thanks, that example is helpful. It seems that to use `fromCollection` with
an iterator, the iterator must implement Serializable, and the iterators
built into Java's collections don't, unfortunately.
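
For anyone who lands on this thread later, here is a minimal sketch of a
hand-rolled iterator that works with `fromCollection`, since it explicitly
implements Serializable (the RangeIterator class and its bounds are made up
for illustration):

```
import java.io.Serializable;
import java.util.Iterator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromIteratorExample {

    // Unlike java.util.AbstractList$Itr from the stack trace below, this
    // iterator implements Serializable, so Flink can ship it to the cluster.
    public static class RangeIterator implements Iterator<Integer>, Serializable {
        private int current;
        private final int end;

        public RangeIterator(int start, int end) {
            this.current = start;
            this.end = end;
        }

        @Override
        public boolean hasNext() {
            return current < end;
        }

        @Override
        public Integer next() {
            return current++;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Same fromCollection overload as in the original example, but with
        // a Serializable iterator.
        env.fromCollection(new RangeIterator(1, 3), Integer.class).print();

        env.execute("from-iterator-example");
    }
}
```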

On Thu, Apr 7, 2016 at 6:11 PM, Chesnay Schepler <ches...@apache.org> wrote:

> Hmm, maybe I was too quick with linking to the JIRA.
>
> As for an example: you can look at the streaming WindowJoin example. The
> sample data uses an Iterator (ThrottledIterator).
> Note that the iterator implementation used is part of Flink and also
> implements Serializable.
>
> On 07.04.2016 22:18, Andrew Whitaker wrote:
>
> Hi,
>
> I'm trying to get a simple example of a source backed by an iterator
> working. Here's the code I've got:
>
> ```
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> List<Integer> list = Arrays.asList(1, 2);
>
> env.fromCollection(list.iterator(), Integer.class).print();
> ```
>
> I get the following exception:
>
> ```
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Object
> org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
> not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
> at braintree.demo.FromIterator.main(FromIterator.java:14)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> ... 11 more
> ```
>
> This kind of makes sense. The root issue seems to be that the list's
> iterator is not serializable. In fact, java.util.Iterator doesn't implement
> Serializable.
>
> I can't seem to find any examples of `FromIteratorFunction` being used in
> the Flink codebase. Am I using it wrong?
>
> Thanks!
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


FromIteratorFunction problems

2016-04-07 Thread Andrew Whitaker
Hi,

I'm trying to get a simple example of a source backed by an iterator
working. Here's the code I've got:

```
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: Object
org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
not serializable
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
... 11 more
```

This kind of makes sense. The root issue seems to be that the list's
iterator is not serializable. In fact, java.util.Iterator doesn't implement
Serializable.

I can't seem to find any examples of `FromIteratorFunction` being used in
the Flink codebase. Am I using it wrong?

Thanks!

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: asm IllegalArgumentException with 1.0.0

2016-03-14 Thread Andrew Whitaker
We're having the same issue (we also have a dependency on
flink-connector-elasticsearch). It's only happening for us in IntelliJ,
though. Is this the case for you as well?

On Thu, Mar 10, 2016 at 3:20 PM, Zach Cox <zcox...@gmail.com> wrote:

> After some poking around I noticed
> that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm
> classes. If I remove that dependency from my project then I do not get the
> IllegalArgumentException.
>
>
> On Thu, Mar 10, 2016 at 11:51 AM Zach Cox <zcox...@gmail.com> wrote:
>
>> Here are the jars on the classpath when I try to run our Flink job in a
>> local environment (via `sbt run`):
>>
>>
>> https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt
>>
>> There are many transitive dependencies pulled in from internal library
>> projects that probably need to be cleaned out. Maybe we are including
>> something that conflicts? Or maybe something important is being excluded?
>>
>> Are the asm classes included in Flink jars in some shaded form?
>>
>> Thanks,
>> Zach
>>
>>
>> On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen <se...@apache.org> wrote:
>>
>>> Dependency shading changed a bit between RC4 and RC5 - maybe a different
>>> minor ASM version is now included in the "test" scope.
>>>
>>> Can you share the dependencies of the problematic project?
>>>
>>> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox <zcox...@gmail.com> wrote:
>>>
>>>> I also noticed when I try to run this application in a local
>>>> environment, I get the same IllegalArgumentException.
>>>>
>>>> When I assemble this application into a fat jar and run it on a Flink
>>>> cluster using the CLI tools, it seems to run fine.
>>>>
>>>> Maybe my local classpath is missing something that is provided on the
>>>> Flink task managers?
>>>>
>>>> -Zach
>>>>
>>>>
>>>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox <zcox...@gmail.com> wrote:
>>>>
>>>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a
>>>>> unit test:
>>>>>
>>>>> IllegalArgumentException:   (null:-1)
>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>>> Source)
>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>>> Source)
>>>>>
>>>>> org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>>>>>
>>>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
>>>>>
>>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>>>>>
>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>>>>>
>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>>>>>
>>>>> The line that causes that exception is just adding
>>>>> a FlinkKafkaConsumer08 source.
>>>>>
>>>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it
>>>>> is not given a valid api version number, but InnerClosureFinder [2] looks
>>>>> fine to me.
>>>>>
>>>>> Any idea what might be causing this? This unit test worked fine with
>>>>> 1.0.0-rc0 jars.
>>>>>
>>>>> Thanks,
>>>>> Zach
>>>>>
>>>>> [1]
>>>>> http://websvn.ow2.org/filedetails.php?repname=asm=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>>>>>
>>>>>
>>>>>
>>>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Compilation error with Scala case class with private constructor

2016-03-03 Thread Andrew Whitaker
Hi,

I've run up against a compilation error involving a case class with a
private constructor:

[error]
/Users/anwhitaker/code/flink-fold-issue/src/main/scala/TestApp.scala:18:
could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[List[scala.util.Try[TestApp.Wrapper]]]
[error]   .fold(List[Try[Wrapper]](), new FoldFunction[Tuple2[Int,
Int], List[Try[Wrapper]]] {
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

If I make the constructor public again, the error goes away. I've set up a
simple example that demonstrates the problem here:
https://github.com/AndrewWhitaker/flink-case-class-private-ctor

I've read this article on Flink's website:
https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-
but I think my issue is slightly different.

I'm just trying to understand this behavior and if there's a way I can work
around it.

Thanks!

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Error starting job manager in 1.0-SNAPSHOT

2016-01-20 Thread Andrew Whitaker
Stephan,

Thanks so much for the quick response. That worked for me!

On Wed, Jan 20, 2016 at 11:34 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> As of a few weeks ago, there is no "streaming" or "batch" mode any more.
> There is only one mode that handles both.
>
> I think the argument "streaming" passed to the script is then incorrectly
> interpreted as the hostname to bind the JobManager network interface to.
> Then you get the "UnknownHostException".
>
> Simply drop "streaming" from the command line arguments (call
> "./bin/jobmanager.sh start cluster"). That should solve it.
>
> Best,
> Stephan
>
>
> On Wed, Jan 20, 2016 at 6:23 PM, Andrew Whitaker <
> andrew.whita...@braintreepayments.com> wrote:
>
>> Hi,
>>
>> I'm getting the following error when attempting to start the job manager:
>>
>> ```
>> ./bin/jobmanager.sh start cluster streaming
>> ```
>>
>> ```
>> 10:51:27,824 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>- Registered UNIX signal handlers for [TERM, HUP, INT]
>> 10:51:27,914 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>- Loading configuration from
>> /Users/anwhitaker/Downloads/flink-1.0-SNAPSHOT 3/conf
>> 10:51:27,922 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>- Starting JobManager without high-availability
>> 10:51:28,034 ERROR org.apache.flink.runtime.jobmanager.JobManager
>>- streaming: unknown error
>> java.net.UnknownHostException: streaming: unknown error
>> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
>> at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
>> at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1192)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1126)
>> at java.net.InetAddress.getByName(InetAddress.java:1076)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$.parseArgs(JobManager.scala:1955)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$.liftedTree2$1(JobManager.scala:1517)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1516)
>> at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
>> ```
>>
>> I don't think my config has changed. Are there changes from the last few
>> days that could be causing this?
>>
>> Thanks,
>>
>> Andrew Whitaker | andrew.whita...@braintreepayments.com
>>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Error starting job manager in 1.0-SNAPSHOT

2016-01-20 Thread Andrew Whitaker
Hi,

I'm getting the following error when attempting to start the job manager:

```
./bin/jobmanager.sh start cluster streaming
```

```
10:51:27,824 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Registered UNIX signal handlers for [TERM, HUP, INT]
10:51:27,914 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Loading configuration from
/Users/anwhitaker/Downloads/flink-1.0-SNAPSHOT 3/conf
10:51:27,922 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManager without high-availability
10:51:28,034 ERROR org.apache.flink.runtime.jobmanager.JobManager
 - streaming: unknown error
java.net.UnknownHostException: streaming: unknown error
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at
org.apache.flink.runtime.jobmanager.JobManager$.parseArgs(JobManager.scala:1955)
at
org.apache.flink.runtime.jobmanager.JobManager$.liftedTree2$1(JobManager.scala:1517)
at
org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1516)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
```

I don't think my config has changed. Are there changes from the last few
days that could be causing this?

Thanks,

Andrew Whitaker | andrew.whita...@braintreepayments.com


Flink+avro integration

2015-10-19 Thread Andrew Whitaker
I'm doing some research on Flink + Avro integration, and I've come across
"org.apache.flink.api.java.io.AvroInputFormat" as a way to create a stream
of Avro objects from a file. I had the following questions:

1. Is this the extent of Flink's integration with Avro? If I wanted to read
Avro-serialized objects from a Kafka stream, would I have to write something
myself, or does this functionality already exist somewhere?

2. Is there an analogous InputFormat in Flink's Scala API? If not, what's
the recommended way to work with Avro objects in Scala using Flink?
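
For reference, a minimal sketch of what reading Avro records from a file with
this input format could look like in the batch Java API (User is assumed to be
an Avro-generated specific-record class, and the path is made up for
illustration):

```
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;

public class AvroReadExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // AvroInputFormat takes the file path and the Avro-generated class.
        env.createInput(new AvroInputFormat<>(new Path("hdfs:///data/users.avro"), User.class))
                .print(); // print() triggers execution for DataSet programs
    }
}
```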

Thanks,

-- 
Andrew Whitaker
aawhita...@gmail.com | 540-521-5299 | @andrewwhitaker