Re: Checking actual config values used by TaskManager

2016-04-28 Thread Timur Fayruzov
If you're talking about parameters that were set at JVM startup, then `ps
aux | grep flink` on an EMR slave node should do the trick; it will give you
the full command line.
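
Another option, if you want to see them from inside the job itself, is a small
identity mapper that logs the JVM arguments the TaskManager was actually
started with. This is only a sketch using the standard java.lang.management
API (the class name is made up); the output ends up in taskmanager.out:

```scala
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

// Identity mapper that prints the TaskManager's actual JVM arguments once per
// parallel subtask, before any records are processed.
class JvmArgsLogger[T] extends RichMapFunction[T, T] {
  override def open(parameters: Configuration): Unit = {
    val args = ManagementFactory.getRuntimeMXBean.getInputArguments.asScala
    println(s"TaskManager JVM args: ${args.mkString(" ")}")
  }
  override def map(value: T): T = value
}
```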

On Thu, Apr 28, 2016 at 9:00 PM, Ken Krugler 
wrote:

> Hi all,
>
> I’m running jobs on EMR via YARN, and wondering how to check exactly what
> configuration settings are actually being used.
>
> This is mostly for the TaskManager.
>
> I know I can modify the conf/flink-conf.yaml file, and (via the CLI) I can
> use -yD param=value.
>
> But my experience with Hadoop makes me want to see the exact values being
> used, versus assuming I know what’s been set :)
>
> Thanks,
>
> — Ken
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
>
>


Re: Command line arguments getting munged with CLI?

2016-04-27 Thread Timur Fayruzov
Hi Ken,

I have built a parameter parser into my jar that uses '--' instead of '-',
and it works fine (on 1.0.0 and on the current master). After a cursory look
at the parameter parser Flink uses (http://commons.apache.org/proper/commons-cli/),
it seems that a double vs. single dash could make a difference, so you could
try that.
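
For reference, a minimal sketch of consuming double-dash arguments in the
job's own main() with ParameterTool (which ships with flink-java); the
argument names mirror the ones from the command line in your message below,
and everything else here is made up for illustration:

```scala
import org.apache.flink.api.java.utils.ParameterTool

object ArgsExample {
  def main(args: Array[String]): Unit = {
    // ParameterTool understands "--key value" pairs, so arguments such as
    // "--planner flink --inputdir s3n://bucket/path" arrive untouched.
    val params   = ParameterTool.fromArgs(args)
    val planner  = params.getRequired("planner")
    val inputDir = params.getRequired("inputdir")
    println(s"planner=$planner, inputdir=$inputDir")
  }
}
```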


On Tue, Apr 26, 2016 at 11:45 AM, Ken Krugler 
wrote:

> Hi all,
>
> I’m running this command, on the master in an EMR cluster:
>
> ./bin/flink run -m yarn-cluster -yn 25 -yjm 1024 -ytm 4096 -c 
>  -planner flink -inputdir xxx
>
> Everything seems to be starting up fine, up to:
>
> All TaskManagers are connected
> Using the parallelism provided by the remote cluster (25). To use another
> parallelism, set it at the ./bin/flink client.
>
> My main class then tries to parse the passed arguments, and fails.
>
> The arguments being passed to my main() class aren’t what I expect. I
> get...
>
> -p
> lanner
> flink
> inputdir
> s3n://su-wikidump/wikidump-20151112/data/
>
> It looks like Flink is trying to process all of my arguments, so -planner
> looks like “-p”, and -inputdir gets the ‘-‘ consumed before it realizes
> that there is no ‘-i’ parameter that it knows about.
>
> I was assuming that everything after the jar parameter would be sent
> through as is.
>
> Any ideas?
>
> Thanks,
>
> — Ken
>
> PS - this is with flink 1.0.2
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>


Re: "No more bytes left" at deserialization

2016-04-27 Thread Timur Fayruzov
Hi Ken,

Good point actually, thanks for pointing this out. In the Flink project I see
a dependency on Kryo 2.24, and then I see a transitive dependency through
twitter.carbonite on 2.21. Also, twitter.chill, which as far as I understand
is used to manipulate Kryo, shows up with versions 0.7.4 (Flink) and 0.3.5
(carbonite again). I wonder if this could cause issues, since if classes are
not deduplicated, the class loading order could be different on different
machines. The Maven project setup is fairly complicated and I'm not a Maven
expert, so I would appreciate a second look at that.
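
One quick way to see which Kryo actually won on a given machine is to print
where the class was loaded from. This is just a diagnostic sketch that could
be dropped into the job (for example in a RichMapFunction's open()); it is not
part of the original discussion:

```scala
object KryoOrigin {
  // Prints the jar the Kryo class was loaded from in this JVM; running it on
  // every TaskManager shows whether they all resolved the same artifact.
  def print(): Unit = {
    val location = classOf[com.esotericsoftware.kryo.Kryo]
      .getProtectionDomain.getCodeSource.getLocation
    println(s"Kryo loaded from: $location")
  }
}
```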

Thanks,
Timur


On Tue, Apr 26, 2016 at 6:51 PM, Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> I don’t know if this is helpful, but I’d run into a similar issue (array
> index out of bounds during Kryo deserialization) due to having a different
> version of Kryo on the classpath.
>
> — Ken
>
> On Apr 26, 2016, at 6:23pm, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
> I built master with scala 2.11 and hadoop 2.7.1, now get a different
> exception (still serialization-related though):
>
> java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
> at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162))
> -> Filter (Filter at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))'
> , caused an error: Error obtaining the sorted input: Thread 'SortMerger
> Reading Thread' terminated due to an exception: Index: 97, Size: 11
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> Thread 'SortMerger Reading Thread' terminated due to an exception: Index:
> 97, Size: 11
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at
> org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> terminated due to an exception: Index: 97, Size: 11
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
> at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> at java.util.ArrayList.get(ArrayList.java:429)
> at
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at
> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
> at
> org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
> at
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
> at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at
> org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> at
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
> at
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
> at
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
>
>
> On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org>
> 

Re: "No more bytes left" at deserialization

2016-04-26 Thread Timur Fayruzov
I built master with scala 2.11 and hadoop 2.7.1, now get a different
exception (still serialization-related though):

java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup
at
com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162))
-> Filter (Filter at
com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))'
, caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: Index: 97, Size: 11
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: Index:
97, Size: 11
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: Index: 97, Size: 11
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75)
at
org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106)
at
org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at
org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
at
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163)
at
org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
at
org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
at
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)



On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Then let's keep finger crossed that we've found the culprit :-)
>
> On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
>> Thank you Till.
>>
>> I will try to run with new binaries today. As I have mentioned, the error
>> is reproducible only on a full dataset, so coming up with sample input data
>> may be problematic (not to mention that the real data can't be shared).
>> I'll see if I can replicate it, but could take a bit longer. Thank you very
>> much for your effort.
>>
>> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Timur,
>>>
>>> I’ve got good and not so good news. Let’s start with the not so good
>>> news. I couldn’t reproduce your problem but the good news is that I found a
>>> bug in the duplication logic of the OptionSerializer. I’ve already
>>> committed a patch to the master to fix it.
>>>
>>> Thus, I wanted to ask you, whether you could try out the latest master
>>> and check whether your problem still persists. If that’s the case, could
>>> you send me your complete code with sample input data which reproduces your
>>> problem?
>>>
>>> Cheers,
>>>

Re: Job hangs

2016-04-26 Thread Timur Fayruzov
Robert, Ufuk: the logs, execution plan, and a screenshot of the console are
in this archive:
https://www.dropbox.com/s/68gyl6f3rdzn7o1/debug-stuck.tar.gz?dl=0
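
(For readers who want to reproduce the plan: env.getExecutionPlan(), which
Ufuk mentions below, dumps the optimizer plan as JSON without running the job.
A minimal, self-contained sketch with a made-up pipeline:)

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.io.DiscardingOutputFormat

object PlanDump {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Stand-in pipeline; in practice you would build the real job here.
    val counts = env.fromElements("a", "b", "a").map(w => (w, 1)).groupBy(0).sum(1)
    counts.output(new DiscardingOutputFormat[(String, Int)])
    // Prints the JSON plan that the Flink plan visualizer can render.
    println(env.getExecutionPlan())
  }
}
```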

Note that when I looked at the backpressure view, I saw back pressure 'high'
on the following paths:

Input->code_line:123,124->map->join
Input->code_line:134,135->map->join
Input->code_line:121->map->join

Unfortunately, I was not able to take thread dumps or heap dumps (neither
kill -3, jstack, nor jmap worked; some Amazon AMI problem, I assume).
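
As a fallback when the usual tools are unavailable, thread dumps can also be
taken from inside the JVM itself; a hedged sketch (the object name is made up)
that could be wired into a map function or a small debugging thread:

```scala
import scala.collection.JavaConverters._

object ThreadDumper {
  // Prints a stack trace for every live thread in the current JVM to stdout,
  // which ends up in taskmanager.out when run on a TaskManager.
  def dump(): Unit =
    Thread.getAllStackTraces.asScala.foreach { case (t, frames) =>
      println(s"Thread [${t.getName}] state=${t.getState}")
      frames.foreach(f => println(s"    at $f"))
    }
}
```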

Hope that helps.

Please let me know if I can assist you in any way. Otherwise, I probably
won't be actively looking at this problem.

Thanks,
Timur


On Tue, Apr 26, 2016 at 8:11 AM, Ufuk Celebi <u...@apache.org> wrote:

> Can you please further provide the execution plan via
>
> env.getExecutionPlan()
>
>
>
> On Tue, Apr 26, 2016 at 4:23 PM, Timur Fayruzov
> <timur.fairu...@gmail.com> wrote:
> > Hello Robert,
> >
> > I observed progress for 2 hours(meaning numbers change on dashboard), and
> > then I waited for 2 hours more. I'm sure it had to spill at some point,
> but
> > I figured 2h is enough time.
> >
> > Thanks,
> > Timur
> >
> > On Apr 26, 2016 1:35 AM, "Robert Metzger" <rmetz...@apache.org> wrote:
> >>
> >> Hi Timur,
> >>
> >> thank you for sharing the source code of your job. That is helpful!
> >> It's a large pipeline with 7 joins and 2 co-groups. Maybe your job is
> much
> >> more IO heavy with the larger input data because all the joins start
> >> spilling?
> >> Our monitoring, in particular for batch jobs is really not very
> advanced..
> >> If we had some monitoring showing the spill status, we would maybe see
> that
> >> the job is still running.
> >>
> >> How long did you wait until you declared the job hanging?
> >>
> >> Regards,
> >> Robert
> >>
> >>
> >> On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi <u...@apache.org> wrote:
> >>>
> >>> No.
> >>>
> >>> If you run on YARN, the YARN logs are the relevant ones for the
> >>> JobManager and TaskManager. The client log submitting the job should
> >>> be found in /log.
> >>>
> >>> – Ufuk
> >>>
> >>> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
> >>> <timur.fairu...@gmail.com> wrote:
> >>> > I will do it my tomorrow. Logs don't show anything unusual. Are there
> >>> > any
> >>> > logs besides what's in flink/log and yarn container logs?
> >>> >
> >>> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi" <u...@apache.org> wrote:
> >>> >
> >>> > Hey Timur,
> >>> >
> >>> > is it possible to connect to the VMs and get stack traces of the
> Flink
> >>> > processes as well?
> >>> >
> >>> > We can first have a look at the logs, but the stack traces will be
> >>> > helpful if we can't figure out what the issue is.
> >>> >
> >>> > – Ufuk
> >>> >
> >>> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann <trohrm...@apache.org
> >
> >>> > wrote:
> >>> >> Could you share the logs with us, Timur? That would be very helpful.
> >>> >>
> >>> >> Cheers,
> >>> >> Till
> >>> >>
> >>> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" <timur.fairu...@gmail.com
> >
> >>> >> wrote:
> >>> >>>
> >>> >>> Hello,
> >>> >>>
> >>> >>> Now I'm at the stage where my job seems to completely hang. Source
> >>> >>> code is
> >>> >>> attached (it won't compile but I think gives a very good idea of
> what
> >>> >>> happens). Unfortunately I can't provide the datasets. Most of them
> >>> >>> are
> >>> >>> about
> >>> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks
> 6GB
> >>> >>> memory
> >>> >>> for each.
> >>> >>>
> >>> >>> It was working for smaller input sizes. Any idea on what I can do
> >>> >>> differently is appreciated.
> >>> >>>
> >>> >>> Thanks,
> >>> >>> Timur
> >>
> >>
> >
>


Re: Job hangs

2016-04-26 Thread Timur Fayruzov
Hello Robert,

I observed progress for 2 hours (meaning the numbers changed on the
dashboard), and then I waited for 2 more hours. I'm sure it had to spill at
some point, but I figured 2 hours is enough time.

Thanks,
Timur
On Apr 26, 2016 1:35 AM, "Robert Metzger" <rmetz...@apache.org> wrote:

> Hi Timur,
>
> thank you for sharing the source code of your job. That is helpful!
> It's a large pipeline with 7 joins and 2 co-groups. Maybe your job is much
> more IO heavy with the larger input data because all the joins start
> spilling?
> Our monitoring, in particular for batch jobs is really not very advanced..
> If we had some monitoring showing the spill status, we would maybe see that
> the job is still running.
>
> How long did you wait until you declared the job hanging?
>
> Regards,
> Robert
>
>
> On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi <u...@apache.org> wrote:
>
>> No.
>>
>> If you run on YARN, the YARN logs are the relevant ones for the
>> JobManager and TaskManager. The client log submitting the job should
>> be found in /log.
>>
>> – Ufuk
>>
>> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov
>> <timur.fairu...@gmail.com> wrote:
>> > I will do it my tomorrow. Logs don't show anything unusual. Are there
>> any
>> > logs besides what's in flink/log and yarn container logs?
>> >
>> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >
>> > Hey Timur,
>> >
>> > is it possible to connect to the VMs and get stack traces of the Flink
>> > processes as well?
>> >
>> > We can first have a look at the logs, but the stack traces will be
>> > helpful if we can't figure out what the issue is.
>> >
>> > – Ufuk
>> >
>> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>> >> Could you share the logs with us, Timur? That would be very helpful.
>> >>
>> >> Cheers,
>> >> Till
>> >>
>> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" <timur.fairu...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Hello,
>> >>>
>> >>> Now I'm at the stage where my job seems to completely hang. Source
>> code is
>> >>> attached (it won't compile but I think gives a very good idea of what
>> >>> happens). Unfortunately I can't provide the datasets. Most of them are
>> >>> about
>> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks 6GB
>> >>> memory
>> >>> for each.
>> >>>
>> >>> It was working for smaller input sizes. Any idea on what I can do
>> >>> differently is appreciated.
>> >>>
>> >>> Thanks,
>> >>> Timur
>>
>
>


Re: convert Json to Tuple

2016-04-25 Thread Timur Fayruzov
Normally, Json4s or Jackson with its Scala module works well for JSON-to-Scala
data structure conversions. However, I would not expect them to support a
special case for tuples, since JSON key-value fields normally convert to case
classes and JSON arrays convert to, well, arrays. That being said,
StackOverflow to the rescue:
http://stackoverflow.com/questions/31909308/is-it-possible-to-parse-json-array-into-a-tuple-with-json4s

I didn't try the approach myself, though.
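
For the more common object-to-case-class route, here is a minimal json4s
sketch (the Event case class and its fields are invented for illustration)
that parses a JSON object into a case class and then turns it into a Scala
tuple, which Flink's Scala API handles natively:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

case class Event(id: String, value: Int)

object JsonToTuple {
  implicit val formats: Formats = DefaultFormats

  // Parse a JSON object into a case class, then map it to a Scala tuple.
  def toTuple(json: String): (String, Int) = {
    val e = parse(json).extract[Event]
    (e.id, e.value)
  }

  def main(args: Array[String]): Unit =
    println(toTuple("""{"id": "a", "value": 42}"""))  // prints (a,42)
}
```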

On Mon, Apr 25, 2016 at 6:50 PM, Alexander Smirnov <
alexander.smirn...@gmail.com> wrote:

> Hello everybody!
>
> my RMQSource function receives string with JSONs in it.
> Because many operations in Flink rely on Tuple operations, I think it is a
> good idea to convert JSON to Tuple.
>
> I believe this task has been solved already :)
> what's the common approach for this conversion?
>
> Thank you,
> Alex
>


Job hangs

2016-04-25 Thread Timur Fayruzov
Hello,

Now I'm at the stage where my job seems to completely hang. Source code is
attached (it won't compile, but I think it gives a very good idea of what
happens). Unfortunately, I can't provide the datasets. Most of them are
about 100-500MM records; I try to process them on an EMR cluster with 40
tasks and 6GB of memory each.

It was working for smaller input sizes. Any idea on what I can do
differently is appreciated.

Thanks,
Timur


FaithResolution.scala
Description: Binary data


Re: YARN terminating TaskNode

2016-04-25 Thread Timur Fayruzov
Great answer, thank you Max for the very detailed explanation! It's
illuminating how the off-heap parameter affects the memory allocation.

I read this post:
https://blogs.oracle.com/jrockit/entry/why_is_my_jvm_process_larger_t

and the thing that jumped out at me is the allocation of memory for JNI libs.
I do use a native library in my application, which is likely the culprit; I
need to account for its memory footprint when doing my memory calculations.

Thanks,
Timur


On Mon, Apr 25, 2016 at 10:28 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Timur,
>
> Shedding some light on the memory calculation:
>
> You have a total memory size of 2500 MB for each TaskManager. The
> default for 'taskmanager.memory.fraction' is 0.7. This is the fraction
> of the memory used by the memory manager. When you have turned on
> off-heap memory, this memory is allocated off-heap. As you pointed
> out, the default Yarn cutoff ratio is 0.25.
>
> Memory cutoff for Yarn: 2500 * 0.25 MB = 625 MB
>
> Java heap size with off-heap disabled: 2500 MB - 625 MB = 1875 MB
>
> Java heap size with off-heap enabled: (2500 MB - 625 MB) * 0.3 = 562,5
> MB (~570 MB in your case)
> Off-heap memory size: (2500 MB - 625 MB) * 0.7 = 1312,5 MB
>
> The heap memory limits in your log seem to be calculated correctly.
> Note that we don't set a strict limit for the off-heap memory because
> the Flink memory manager controls the amount of memory allocated. It
> will preallocate memory when you have 'taskmanager.memory.preallocate'
> set to true. Otherwise it will allocate dynamically. Still, you should
> have about 500 MB memory left with everything allocated. There is some
> more direct (off-heap) memory allocated for the network stack
> adjustable with 'taskmanager.network.numberOfBuffers' which is set to
> 2048 by default and corresponds to 2048 * 32 KB = 64 MB memory. I
> believe this can grow up to twice of that size. Still, should be
> enough memory left.
>
> Are you running a streaming or batch job? Off-heap memory and memory
> preallocation are mostly beneficial for batch jobs which use the
> memory manager a lot for sorting, hashing and caching.
>
> For streaming I'd suggest to use Flink's defaults:
>
> taskmanager.memory.off-heap: false
> taskmanager.memory.preallocate: false
>
> Raising the cutoff ratio should prevent killing of the TaskManagers.
> As Robert mentioned, in practice the JVM tends to allocate more than
> the maximum specified heap size. You can put the following in your
> flink-conf.yaml:
>
> # slightly raise the cut off ratio (might need to be even higher)
> yarn.heap-cutoff-ratio: 0.3
>
> Thanks,
> Max
>
> On Mon, Apr 25, 2016 at 5:52 PM, Timur Fayruzov
> <timur.fairu...@gmail.com> wrote:
> > Hello Maximilian,
> >
> > I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running
> this on
> > EMR. I didn't see any exceptions in other logs. What are the logs you are
> > interested in?
> >
> > Thanks,
> > Timur
> >
> > On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <m...@apache.org>
> wrote:
> >>
> >> Hi Timur,
> >>
> >> Which version of Flink are you using? Could you share the entire logs?
> >>
> >> Thanks,
> >> Max
> >>
> >> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <rmetz...@apache.org>
> >> wrote:
> >> > Hi Timur,
> >> >
> >> > The reason why we only allocate 570mb for the heap is because you are
> >> > allocating most of the memory as off heap (direct byte buffers).
> >> >
> >> > In theory, the memory footprint of the JVM is limited to 570 (heap) +
> >> > 1900
> >> > (direct mem) = 2470 MB (which is below 2500). But in practice the JVM
> >> > is
> >> > allocating more memory, causing these killings by YARN.
> >> >
> >> > I have to check the code of Flink again, because I would expect the
> >> > safety
> >> > boundary to be much larger than 30 mb.
> >> >
> >> > Regards,
> >> > Robert
> >> >
> >> >
> >> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov
> >> > <timur.fairu...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Hello,
> >> >>
> >> >> Next issue in a string of things I'm solving is that my application
> >> >> fails
> >> >> with the message 'Connection unexpectedly closed by remote task
> >> >> manager'.
> >> >>
> >> >> Yarn log shows the following:
> 

Re: Access to a shared resource within a mapper

2016-04-25 Thread Timur Fayruzov
Hi Fabian,

I didn't realize you meant that the lazy val should be inside the
RichMapFunction implementation; that makes sense. That's what I ended up
doing already.
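
For concreteness, a minimal sketch of that pattern; the types are stand-ins
for the ones in the original snippet quoted below:

```scala
import org.apache.flink.api.common.functions.RichMapFunction

// Stand-ins for the types from the original example.
case class MyKey(k: Option[String])
class Client { def transform(e: MyKey): String = e.k.getOrElse("") }

// The client is created lazily on the TaskManager, once per parallel subtask,
// so the non-serializable client never travels with the serialized function.
class TransformMapper extends RichMapFunction[MyKey, String] {
  @transient private lazy val client: Client = new Client()
  override def map(e: MyKey): String = client.transform(e)
}
```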

Thanks!
Timur

On Mon, Apr 25, 2016 at 3:34 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Timur,
>
> a TaskManager may run as many subtasks of a Map operator as it has slots.
> Each subtask of an operator runs in a different thread. Each parallel
> subtask of a Map operator has its own MapFunction object, so it should be
> possible to use a lazy val.
>
> However, you should not use static variables to hold state, because these
> are shared between all MapFunction in a TaskManager (JVM).
>
> 2016-04-22 21:21 GMT+02:00 Timur Fayruzov <timur.fairu...@gmail.com>:
>
>> Actually, a follow-up question: is map function single-threaded (within
>> one task manager, that is). If it's not then lazy initialization wont'
>> work, is it right?
>>
>> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:
>>
>>> You may also be able to initialize the client only in the parallel
>>> execution by making it a "lazy" variable in Scala.
>>>
>>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
>>>> Outstanding! Thanks, Aljoscha.
>>>>
>>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> you could use a RichMapFunction that has an open method:
>>>>>
>>>>> data.map(new RichMapFunction[...]() {
>>>>>   def open(): () = {
>>>>> // initialize client
>>>>>   }
>>>>>
>>>>>   def map(input: INT): OUT = {
>>>>> // use client
>>>>>   }
>>>>> }
>>>>>
>>>>> the open() method is called before any elements are passed to the
>>>>> function. The counterpart of open() is close(), which is called after all
>>>>> elements are through or if the job cancels.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I'm writing a Scala Flink application. I have a standalone process
>>>>>> that exists on every Flink node that I need to call to transform my data.
>>>>>> To access this process I need to initialize non thread-safe client 
>>>>>> first. I
>>>>>> would like to avoid initializing a client for each element being
>>>>>> transformed. A straightforward implementation would be something like 
>>>>>> this:
>>>>>> ```
>>>>>>
>>>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"
>>>>>> val pool  = new ArrayBlockingQueue[Client](5)
>>>>>> // pool is filled here
>>>>>> data.map(e => {
>>>>>>   val client = pool.take()
>>>>>>   val res = client.transform(e)
>>>>>>   pool.put(client)
>>>>>>   res
>>>>>> })
>>>>>>
>>>>>> ```
>>>>>> However, this causes a runtime exception with message "Task not
>>>>>> serializable", which makes sense.
>>>>>>
>>>>>> Function parameters and broadcast variables won't work either as far
>>>>>> as I understand. Is there a way to make this happen?
>>>>>>
>>>>>> Thanks,
>>>>>> Timur
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: YARN terminating TaskNode

2016-04-25 Thread Timur Fayruzov
Hello Maximilian,

I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running this
on EMR. I didn't see any exceptions in other logs. What are the logs you
are interested in?

Thanks,
Timur

On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Timur,
>
> Which version of Flink are you using? Could you share the entire logs?
>
> Thanks,
> Max
>
> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <rmetz...@apache.org>
> wrote:
> > Hi Timur,
> >
> > The reason why we only allocate 570mb for the heap is because you are
> > allocating most of the memory as off heap (direct byte buffers).
> >
> > In theory, the memory footprint of the JVM is limited to 570 (heap) +
> 1900
> > (direct mem) = 2470 MB (which is below 2500). But in practice the JVM is
> > allocating more memory, causing these killings by YARN.
> >
> > I have to check the code of Flink again, because I would expect the
> safety
> > boundary to be much larger than 30 mb.
> >
> > Regards,
> > Robert
> >
> >
> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov <
> timur.fairu...@gmail.com>
> > wrote:
> >>
> >> Hello,
> >>
> >> Next issue in a string of things I'm solving is that my application
> fails
> >> with the message 'Connection unexpectedly closed by remote task
> manager'.
> >>
> >> Yarn log shows the following:
> >>
> >> Container [pid=4102,containerID=container_1461341357870_0004_01_15]
> is
> >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
> >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing
> >> container.
> >> Dump of the process-tree for container_1461341357870_0004_01_15 :
> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c
> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> >> -XX:MaxDirectMemorySize=1900m
> >>
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
> >> -Dlogback.configurationFile=file:logback.xml
> >> -Dlog4j.configuration=file:log4j.properties
> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
> >>
> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out
> >> 2>
> >>
> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err
> >> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460
> >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
> >> -XX:MaxDirectMemorySize=1900m
> >>
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
> >> -Dlogback.configurationFile=file:logback.xml
> >> -Dlog4j.configuration=file:log4j.properties
> >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir .
> >>
> >> One thing that drew my attention is `-Xmx570m`. I expected it to be
> >> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the
> >> application as follows:
> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm
> >> 4096 -ytm 2500 eval-assembly-1.0.jar
> >>
> >> In flink logs I do see 'Task Manager memory: 2500'. When I look at the
> >> yarn container logs on the cluster node I see that it starts with 570mb,
> >> which puzzles me. When I look at the actually allocated memory for a
> Yarn
> >> container using 'top' I see 2.2GB used. Am I interpreting these
> parameters
> >> correctly?
> >>
> >> I also have set (it failed in the same way without this as well):
> >> taskmanager.memory.off-heap: true
> >>
> >> Also, I don't understand why this happens at all. I assumed that Flink
> >> won't overcommit allocated resources and will spill to the disk when
> running
> >> out of heap memory. Appreciate if someone can shed light on this too.
> >>
> >> Thanks,
> >> Timur
> >
> >
>


Re: "No more bytes left" at deserialization

2016-04-24 Thread Timur Fayruzov
y the TM timed out.
>
>
> Are you on 1.0.x or on 1.1-SNAPSHOT?
> We recently changed something related to the ExecutionConfig which has
> lead to Kryo issues in the past.
>
>
> On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com
> > wrote:
>
>> Trying to use ProtobufSerializer -- program consistently fails with the
>> following exception:
>>
>> java.lang.IllegalStateException: Update task on instance
>> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
>> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
>> at
>> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
>> at akka.dispatch.OnFailure.internal(Future.scala:228)
>> at akka.dispatch.OnFailure.internal(Future.scala:227)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at
>> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
>> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> at
>> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
>> after [1 ms]
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
>> at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>> at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>> at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
>> at
>> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
>> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I'm at my wits' end now, any suggestions are highly appreciated.
>>
>> Thanks,
>> Timur
>>
>>
>> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com
>> > wrote:
>>
>>> Hello,
>>>
>>> I'm running a Flink program that is failing with the following exception:
>>>
>>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>>> - Error while running the command.
>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>> execution failed: Job execution failed.
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>>> at
>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>> at
>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>> at
>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>> at
>>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
>>> at
>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>> at
>>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
>>> at scala.Option.foreach(Option.scala:257)
>>> at
>>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
>>> at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.

Re: "No more bytes left" at deserialization

2016-04-24 Thread Timur Fayruzov
Trying to use the ProtobufSerializer: the program consistently fails with the
following exception:

java.lang.IllegalStateException: Update task on instance
52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL:
akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to:
at
org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954)
at akka.dispatch.OnFailure.internal(Future.scala:228)
at akka.dispatch.OnFailure.internal(Future.scala:227)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]]
after [1 ms]
at
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
at
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
at
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
at
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at
akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

I'm at my wits' end now, any suggestions are highly appreciated.
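
For context, registering a protobuf type with Flink's Kryo typically looks
roughly like the sketch below (following the documented chill-protobuf setup;
the helper object and the way the message class is passed in are made up, and
this illustrates the general setup rather than the exact code of the failing
job):

```scala
import com.twitter.chill.protobuf.ProtobufSerializer
import org.apache.flink.api.scala.ExecutionEnvironment

object ProtoRegistration {
  // Tell Flink's Kryo fallback to use chill-protobuf for a generated protobuf
  // message class; the concrete class is project-specific, so it is passed in.
  def register(env: ExecutionEnvironment, protoClass: Class[_]): Unit =
    env.getConfig.registerTypeWithKryoSerializer(protoClass, classOf[ProtobufSerializer])
}
```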

Thanks,
Timur


On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Hello,
>
> I'm running a Flink program that is failing with the following exception:
>
> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
>   - Error while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Job execution failed.
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> at
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> at
> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
> at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
> at
> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
> at scala.Option.foreach(Option.scala:257)
> at
> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
> at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyO

"No more bytes left" at deserialization

2016-04-24 Thread Timur Fayruzov
Hello,

I'm running a Flink program that is failing with the following exception:

2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend
- Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at
org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
at
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
at
com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136)
at
com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
at
com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48)
at scala.Option.foreach(Option.scala:257)
at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48)
at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The data preparation for task 'CHAIN
CoGroup (CoGroup at
com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) ->
Filter (Filter at
com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' ,
caused an error: Error obtaining the sorted input: Thread 'SortMerger
Reading Thread' terminated due to an exception: No more bytes left.
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
Thread 'SortMerger Reading Thread' terminated due to an exception: No more
bytes left.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at
org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at
org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
terminated due to an exception: No more bytes left.
at
org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException: No more bytes left.
at
org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77)
at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542)
at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535)
at 

YARN terminating TaskNode

2016-04-22 Thread Timur Fayruzov
Hello,

The next issue in the string of things I'm solving is that my application
fails with the message 'Connection unexpectedly closed by remote task manager'.

Yarn log shows the following:

Container [pid=4102,containerID=container_1461341357870_0004_01_15] is
running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB
physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1461341357870_0004_01_15 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c
/usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
-XX:MaxDirectMemorySize=1900m
-Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out
2>
/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err
|- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460
/usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m
-XX:MaxDirectMemorySize=1900m
-Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log
-Dlogback.configurationFile=file:logback.xml
-Dlog4j.configuration=file:log4j.properties
org.apache.flink.yarn.YarnTaskManagerRunner --configDir .

One thing that drew my attention is `-Xmx570m`. I expected it to be
TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the
application as follows:
HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm 4096
-ytm 2500 eval-assembly-1.0.jar

In the Flink logs I do see 'Task Manager memory: 2500'. When I look at the
YARN container logs on the cluster node, I see that it starts with 570 MB,
which puzzles me. When I look at the memory actually allocated to a YARN
container using 'top', I see 2.2 GB used. Am I interpreting these parameters
correctly?

I also have set (it failed in the same way without this as well):
taskmanager.memory.off-heap: true

Also, I don't understand why this happens at all. I assumed that Flink would
not overcommit allocated resources and would spill to disk when running out
of heap memory. I'd appreciate it if someone could shed light on this too.
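
For reference, a small sketch that reproduces the arithmetic turning -ytm 2500
into the observed -Xmx570m, assuming Flink's defaults (yarn.heap-cutoff-ratio
0.25, taskmanager.memory.fraction 0.7); the same numbers are derived in Max's
reply archived earlier in this digest:

```scala
object TaskManagerMemorySplit {
  def main(args: Array[String]): Unit = {
    val containerMb = 2500.0                    // -ytm 2500
    val cutoffMb    = containerMb * 0.25        // 625 MB reserved as YARN cutoff
    val jvmMb       = containerMb - cutoffMb    // 1875 MB left for the TaskManager JVM
    val heapMb      = jvmMb * (1 - 0.7)         // ~562 MB heap -> rounded to -Xmx570m
    val offHeapMb   = jvmMb * 0.7               // ~1312 MB managed off-heap (direct) memory
    println(f"heap=$heapMb%.0f MB, off-heap=$offHeapMb%.0f MB, cutoff=$cutoffMb%.0f MB")
  }
}
```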

Thanks,
Timur


Re: Access to a shared resource within a mapper

2016-04-22 Thread Timur Fayruzov
Actually, a follow-up question: is the map function single-threaded (within
one task manager, that is)? If it's not, then lazy initialization won't work,
right?

On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote:

> You may also be able to initialize the client only in the parallel
> execution by making it a "lazy" variable in Scala.
>
> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com
> > wrote:
>
>> Outstanding! Thanks, Aljoscha.
>>
>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> you could use a RichMapFunction that has an open method:
>>>
>>> data.map(new RichMapFunction[...]() {
>>>   def open(): () = {
>>> // initialize client
>>>   }
>>>
>>>   def map(input: INT): OUT = {
>>> // use client
>>>   }
>>> }
>>>
>>> the open() method is called before any elements are passed to the
>>> function. The counterpart of open() is close(), which is called after all
>>> elements are through or if the job cancels.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm writing a Scala Flink application. I have a standalone process that
>>>> exists on every Flink node that I need to call to transform my data. To
>>>> access this process I need to initialize non thread-safe client first. I
>>>> would like to avoid initializing a client for each element being
>>>> transformed. A straightforward implementation would be something like this:
>>>> ```
>>>>
>>>> val env = ExecutionEnvironment.getExecutionEnvironment
>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"
>>>> val pool  = new ArrayBlockingQueue[Client](5)
>>>> // pool is filled here
>>>> data.map(e => {
>>>>   val client = pool.take()
>>>>   val res = client.transform(e)
>>>>   pool.put(client)
>>>>   res
>>>> })
>>>>
>>>> ```
>>>> However, this causes a runtime exception with message "Task not
>>>> serializable", which makes sense.
>>>>
>>>> Function parameters and broadcast variables won't work either as far as
>>>> I understand. Is there a way to make this happen?
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>
>>
>


Re: Access to a shared resource within a mapper

2016-04-22 Thread Timur Fayruzov
Outstanding! Thanks, Aljoscha.

On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
> you could use a RichMapFunction that has an open method:
>
> data.map(new RichMapFunction[...]() {
>   def open(): () = {
> // initialize client
>   }
>
>   def map(input: INT): OUT = {
> // use client
>   }
> }
>
> the open() method is called before any elements are passed to the
> function. The counterpart of open() is close(), which is called after all
> elements are through or if the job cancels.
>
> Cheers,
> Aljoscha
>
> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm writing a Scala Flink application. I have a standalone process that
>> exists on every Flink node that I need to call to transform my data. To
>> access this process I need to initialize non thread-safe client first. I
>> would like to avoid initializing a client for each element being
>> transformed. A straightforward implementation would be something like this:
>> ```
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c"
>> val pool  = new ArrayBlockingQueue[Client](5)
>> // pool is filled here
>> data.map(e => {
>>   val client = pool.take()
>>   val res = client.transform(e)
>>   pool.put(client)
>>   res
>> })
>>
>> ```
>> However, this causes a runtime exception with message "Task not
>> serializable", which makes sense.
>>
>> Function parameters and broadcast variables won't work either as far as I
>> understand. Is there a way to make this happen?
>>
>> Thanks,
>> Timur
>>
>


Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
The exception does not show up in the console when I run the job; it only
shows up in the logs. I thought that means it happens either on the AM or a
TM (I assume what I see in stdout is the client log). Is my thinking right?


On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Timur,
>
> Just had a chat with Robert about this. I agree that the error message
> is confusing, but it is fine it this case. The file system classes are
> not on the class path of the client process, which is submitting the
> job.

Do you mean that the classes should be on the classpath of the
`org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
tried to add the EMRFS jars to this classpath, but it did not help. BTW, it
seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which
is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only point
I control here to add to the classpath, so I had to set it manually.


> It fails to sample the input file sizes, but this is just an
> optimization step and hence it does not fail the job and only logs the
> error.
>
Is this optimization only on the client side? In other words, does it affect
Flink's ability to choose the proper type of join?


>
> After the job is submitted everything should run as expected.
>
> You should be able to get rid of that exception by adding the missing
> classes to the class path of the client process (started via
> bin/flink), for example via the lib folder.
>
The above approach did not work; could you elaborate on what you meant by the
'lib folder'?

Thanks,
Timur


> – Ufuk
>
>
>
>
> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> > There's one more filesystem integration failure that I have found. My
> job on
> > a toy dataset succeeds, but Flink log contains the following message:
> > 2016-04-07 18:10:01,339 ERROR
> > org.apache.flink.api.common.io.DelimitedInputFormat   -
> Unexpected
> > problen while getting the file statistics for file 's3://...':
> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> > java.lang.RuntimeException: java.lang.RuntimeException:
> > java.lang.ClassNotFoundException: Class
> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> > at
> > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
> > at
> >
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
> > at
> >
> org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
> > at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
> > at
> >
> org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
> > at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
> > at
> >
> org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
> > at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
> > at
> >
> org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
> > at
> >
> org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
> > at
> >
> org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
> > at
> > org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
> > at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
> > at
> org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
> > at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
> > at
> > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
> > at
> > org.apache.flink.client.program.Client.runBlocking(Client.java:314)
> > at
> >
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> > at
> >
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> > at
> >
> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
> > at
> >
> co

Re: Integrate Flink with S3 on EMR cluster

2016-04-07 Thread Timur Fayruzov
<u...@apache.org> wrote:

> Yes, for sure.
>
> I added some documentation for AWS here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>
> Would be nice to update that page with your pull request. :-)
>
> – Ufuk
>
>
> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanp...@apache.org> wrote:
> > Hi Timur,
> >
> > Great! Bootstrap action for Flink is good for AWS users. I think the
> bootstrap action scripts would be placed in `flink-contrib` directory.
> >
> > If you want, one of people in PMC of Flink will be assign FLINK-1337 to
> you.
> >
> > Regards,
> > Chiwan Park
> >
> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> >>
> >> I had a guide like that.
> >>
> >
>


Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
Thank you, Till! Setting the flag in flink-conf.yaml worked, and I'm very glad
that it was resolved. Note, however, that passing it as an argument to the
flink script did not work. I tried to pass it as:
`-yDenv.java.opts="-Djava.library.path="`. I did not investigate any
further at this time.
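
For anyone else hitting this, the way I exercise the library is roughly the
sketch below; the library name "mynative" is a placeholder, and the only real
point is that loading happens in open(), i.e. inside the task manager JVM that
env.java.opts configures:

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class NativeCallMapper extends RichMapFunction[String, String] {

  override def open(parameters: Configuration): Unit = {
    // Print what the task manager JVM actually received via env.java.opts.
    println(s"java.library.path = ${System.getProperty("java.library.path")}")
    // Resolved against java.library.path of the TM process; throws
    // UnsatisfiedLinkError if the directory is not on the path.
    System.loadLibrary("mynative")
  }

  override def map(value: String): String = value // call into the native code here
}
```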

Thanks,
Timur

On Thu, Apr 7, 2016 at 2:48 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> For passing the dynamic property directly when running things on YARN, you
> have to use -yDenv.java.opts="..."
> ​
>
> On Thu, Apr 7, 2016 at 11:42 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Timur,
>>
>> what you can try doing is to pass the JVM parameter
>> -Djava.library.path= via the env.java.opts to the system. You
>> simply have to add env.java.opts: "-Djava.library.path=" in the
>> flink-conf.yaml or via -Denv.java.opts="-Djava.library.path=", if
>> I’m not mistaken.
>>
>> Cheers
>> Till
>> ​
>>
>> On Thu, Apr 7, 2016 at 10:07 AM, Timur Fayruzov <timur.fairu...@gmail.com
>> > wrote:
>>
>>> there is a hack for this issue: copying my native library to
>>> $HADOOP_HOME/lib/native makes it discoverable and a program runs, however
>>> this is not an appropriate solution and it seems to be fragile.
>>>
>>> I tried to find where 'lib/native' path appears in the configuration and
>>> found 2 places:
>>> hadoop-env.sh: export
>>> JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native
>>> mapred-site.xml: key: mapreduce.admin.user.env
>>>
>>> I tried to add path to dir with my native lib in both places, but still
>>> no luck.
>>>
>>> Thanks,
>>> Timur
>>>
>>> On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov <
>>> timur.fairu...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm not sure whether it's a Hadoop or Flink-specific question, but
>>>> since I ran into this in the context of Flink I'm asking here. I would be
>>>> glad if anyone can suggest a more appropriate place.
>>>>
>>>> I have a native library that I need to use in my Flink batch job that I
>>>> run on EMR, and I try to point JVM to the location of native library.
>>>> Normally, I'd do this using java.library.path parameter. So I try to run as
>>>> follows:
>>>> `
>>>> HADOOP_CONF_DIR=/etc/hadoop/conf
>>>> JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
>>>> yarn-cluster -yn 1 -yjm 768 -ytm 768 
>>>> `
>>>> It does not work, fails with `java.lang.UnsatisfiedLinkError` when
>>>> trying to load the native lib. It probably has to do with YARN not
>>>> passing this parameter to task nodes, but my understanding of this
>>>> mechanism is quite limited so far.
>>>>
>>>> I dug up this Jira ticket:
>>>> https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
>>>> LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
>>>> either.
>>>>
>>>> Any help or hint where to look is highly appreciated.
>>>>
>>>> Thanks,
>>>> Timur
>>>>
>>>
>>>
>>
>


Re: Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
There is a hack for this issue: copying my native library to
$HADOOP_HOME/lib/native makes it discoverable and the program runs. However,
this is not a proper solution and it seems fragile.

I tried to find where 'lib/native' path appears in the configuration and
found 2 places:
hadoop-env.sh: export
JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native
mapred-site.xml: key: mapreduce.admin.user.env

I tried to add the path to the dir with my native lib in both places, but still
no luck.

Thanks,
Timur

On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Hello,
>
> I'm not sure whether it's a Hadoop or Flink-specific question, but since I
> ran into this in the context of Flink I'm asking here. I would be glad if
> anyone can suggest a more appropriate place.
>
> I have a native library that I need to use in my Flink batch job that I
> run on EMR, and I try to point JVM to the location of native library.
> Normally, I'd do this using java.library.path parameter. So I try to run as
> follows:
> `
> HADOOP_CONF_DIR=/etc/hadoop/conf
> JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
> yarn-cluster -yn 1 -yjm 768 -ytm 768 
> `
> It does not work, fails with `java.lang.UnsatisfiedLinkError` when trying
> to load the native lib. It probably has to do with YARN not passing
> this parameter to task nodes, but my understanding of this mechanism is
> quite limited so far.
>
> I dug up this Jira ticket:
> https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
> LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
> either.
>
> Any help or hint where to look is highly appreciated.
>
> Thanks,
> Timur
>


Using native libraries in Flink EMR jobs

2016-04-07 Thread Timur Fayruzov
Hello,

I'm not sure whether it's a Hadoop or Flink-specific question, but since I
ran into this in the context of Flink I'm asking here. I would be glad if
anyone can suggest a more appropriate place.

I have a native library that I need to use in my Flink batch job that I run
on EMR, and I am trying to point the JVM to the location of the native library.
Normally, I'd do this using the java.library.path parameter. So I try to run as
follows:
`
HADOOP_CONF_DIR=/etc/hadoop/conf
JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m
yarn-cluster -yn 1 -yjm 768 -ytm 768 
`
It does not work; it fails with `java.lang.UnsatisfiedLinkError` when trying
to load the native lib. It probably has to do with YARN not passing this
parameter to task nodes, but my understanding of this mechanism is quite
limited so far.

I dug up this Jira ticket:
https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting
LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem
either.

Any help or hint where to look is highly appreciated.

Thanks,
Timur


Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Timur Fayruzov
Yes, the Hadoop version was the culprit. It turns out that EMRFS requires at
least Hadoop 2.4.0 (judging from the exception in the initial post; I was not
able to find the official requirements).

Rebuilding Flink with Hadoop 2.7.1 and with Scala 2.11 worked like a charm,
and I was able to run WordCount using S3 for both inputs and outputs. I did
*not* need to change any configuration (as outlined at
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/connectors.html
).
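
For the record, the job is just the bundled WordCount example; the Scala
equivalent is roughly the following sketch (bucket and paths are placeholders):

```
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.readTextFile("s3://my-bucket/input")
  .flatMap { _.toLowerCase.split("\\W+").filter(_.nonEmpty) }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)
  .writeAsCsv("s3://my-bucket/output")
env.execute("WordCount on S3")
```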

Thanks for bearing with me, Ufuk!

While looking for the solution I found this issue:
https://issues.apache.org/jira/browse/FLINK-1337. I have a setup for an EMR
cluster now, so I can make a PR describing it, if it's still relevant. I, for
one, would have saved a couple of days if I had a guide like that.

Thanks,
Timur


On Tue, Apr 5, 2016 at 10:43 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Timur,
>
> if you are using EMR with IAM roles, Flink should work out of the box.
> You don't need to change the Hadoop config and the IAM role takes care
> of setting up all credentials at runtime. You don't need to hardcode
> any keys in your application that way and this is the recommended way
> to go in order to not worry about securely exchanging the keys and
> then keeping them secure afterwards.
>
> With EMR 4.4.0 you have to use a Flink binary version built against
> Hadoop 2.7. Did you do that? Can you please retry with an
> out-of-the-box Flink and just run it like this:
>
> HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc.
>
> Hope this helps! Please report back. :-)
>
> – Ufuk
>
>
> On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> > Hello Ufuk,
> >
> > I'm using EMR 4.4.0.
> >
> > Thanks,
> > Timur
> >
> > On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <u...@apache.org> wrote:
> >>
> >> Hey Timur,
> >>
> >> which EMR version are you using?
> >>
> >> – Ufuk
> >>
> >> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <
> timur.fairu...@gmail.com>
> >> wrote:
> >> > Thanks for the answer, Ken.
> >> >
> >> > My understanding is that file system selection is driven by the
> >> > following
> >> > sections in core-site.xml:
> >> > <property>
> >> >   <name>fs.s3.impl</name>
> >> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> >> > </property>
> >> >
> >> > <property>
> >> >   <name>fs.s3n.impl</name>
> >> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> >> > </property>
> >> >
> >> > If I run the program using configuration above with s3n (and also
> >> > modifying
> >> > credential keys to use s3n) it fails with the same error, but there is
> >> > no
> >> > "... opening key ..." logs. S3a seems to be not supported, it fails
> with
> >> > the
> >> > following:
> >> > Caused by: java.io.IOException: No file system found with scheme s3a,
> >> > referenced in file URI 's3a://'.
> >> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> >> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> >> > at
> >> >
> >> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> >> > at
> >> >
> >> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> >> > at
> >> >
> >> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156)
> >> > ... 23 more
> >> >
> >> > I am puzzled by the fact that EMRFS is still apparently referenced
> >> > somewhere
> >> > as an implementation for S3 protocol, I'm not able to locate where
> this
> >> > configuration is set.
> >> >
> >> >
> >> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler
> >> > <kkrugler_li...@transpac.com>
> >> > wrote:
> >> >>
> >> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
> >> >> s3.
> >> >>
> >> >> Though EMR has some support for magically treating the s3 protocol as
> >> >> s3n
> >> >> (or maybe s3a now, with Hadoop 2.6 or later)
> >> >>
> >> >> What happens if you use s3n:/// for the
> --input
> >> >> parameter?
> >> >>
> >> >> — Ken
> >> >>
> >> >>

Re: Integrate Flink with S3 on EMR cluster

2016-04-05 Thread Timur Fayruzov
Hello Ufuk,

I'm using EMR 4.4.0.

Thanks,
Timur

On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Timur,
>
> which EMR version are you using?
>
> – Ufuk
>
> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> > Thanks for the answer, Ken.
> >
> > My understanding is that file system selection is driven by the following
> > sections in core-site.xml:
> > <property>
> >   <name>fs.s3.impl</name>
> >   <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
> > </property>
> >
> > <property>
> >   <name>fs.s3n.impl</name>
> >   <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
> > </property>
> >
> > If I run the program using configuration above with s3n (and also
> modifying
> > credential keys to use s3n) it fails with the same error, but there is no
> > "... opening key ..." logs. S3a seems to be not supported, it fails with
> the
> > following:
> > Caused by: java.io.IOException: No file system found with scheme s3a,
> > referenced in file URI 's3a://'.
> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
> > at
> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
> > at
> >
> org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
> > at
> >
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156)
> > ... 23 more
> >
> > I am puzzled by the fact that EMRFS is still apparently referenced
> somewhere
> > as an implementation for S3 protocol, I'm not able to locate where this
> > configuration is set.
> >
> >
> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kkrugler_li...@transpac.com
> >
> > wrote:
> >>
> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not
> s3.
> >>
> >> Though EMR has some support for magically treating the s3 protocol as
> s3n
> >> (or maybe s3a now, with Hadoop 2.6 or later)
> >>
> >> What happens if you use s3n:/// for the --input
> >> parameter?
> >>
> >> — Ken
> >>
> >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <timur.fairu...@gmail.com>
> >> wrote:
> >>
> >> Hello,
> >>
> >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I
> succeeded
> >> with a three-step procedure: load data from S3 to cluster's HDFS, run
> Flink
> >> Job, unload outputs from HDFS to S3.
> >>
> >> However, ideally I'd like to read/write data directly from/to S3. I
> >> followed the instructions here:
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
> ,
> >> more specifically I:
> >>   1. Modified flink-conf to point to /etc/hadoop/conf
> >>   2. Modified core-site.xml per link above (not clear why it is not
> >> using IAM, I had to provide AWS keys explicitly).
> >>
> >> Run the following command:
> >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m
> yarn-cluster
> >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
> >> s3:// --output hdfs:///flink-output
> >>
> >> First, I see messages like that:
> >> 2016-04-04 21:37:10,418 INFO
> >> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening
> key
> >> '' for reading at position '333000'
> >>
> >> Then, it fails with the following error:
> >>
> >> 
> >>
> >>  The program finished with the following exception:
> >>
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The program
> >> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
> >> (WordCount Example)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >>
> >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >>
> >> at
> >>
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >>
> >> at
> >>
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
> >>
> >> at sun.reflect.Native

Re: Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Timur Fayruzov
Thanks for the answer, Ken.

My understanding is that file system selection is driven by the following
sections in core-site.xml:

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>

<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>


If I run the program using the configuration above with s3n (and also
modifying the credential keys to use s3n), it fails with the same error, but
there are no "... opening key ..." log messages. S3a does not seem to be
supported; it fails with the following:
Caused by: java.io.IOException: No file system found with scheme s3a,
referenced in file URI 's3a://'.
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
at
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156)
... 23 more

I am puzzled by the fact that EMRFS is still apparently referenced somewhere
as an implementation for the S3 protocol; I'm not able to locate where this
configuration is set.
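
The quickest way I have found to see what is actually wired up is to ask Hadoop
directly which implementation it resolves for each scheme; a rough diagnostic
sketch, assuming core-site.xml is on the classpath and with a placeholder
bucket name:

```
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// new Configuration() picks up core-site.xml from the classpath, so this shows
// which file system class each scheme maps to (or the error it fails with).
for (scheme <- Seq("s3", "s3n", "s3a")) {
  try {
    val fs = FileSystem.get(new URI(s"$scheme://some-bucket/"), new Configuration())
    println(s"$scheme -> ${fs.getClass.getName}")
  } catch {
    case e: Exception => println(s"$scheme -> ${e.getMessage}")
  }
}
```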


On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3.
>
> Though EMR has some support for magically treating the s3 protocol as s3n
> (or maybe s3a now, with Hadoop 2.6 or later)
>
> What happens if you use s3n:/// for the --input
> parameter?
>
> — Ken
>
> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
> Hello,
>
> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
> with a three-step procedure: load data from S3 to cluster's HDFS, run Flink
> Job, unload outputs from HDFS to S3.
>
> However, ideally I'd like to read/write data directly from/to S3. I
> followed the instructions here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
> more specifically I:
>   1. Modified flink-conf to point to /etc/hadoop/conf
>   2. Modified core-site.xml per link above (not clear why it is not
> using IAM, I had to provide AWS keys explicitly).
>
> Run the following command:
> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster
> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
> s3:// --output hdfs:///flink-output
>
> First, I see messages like that:
> 2016-04-04 21:37:10,418 INFO
> org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening key
> '' for reading at position '333000'
>
> Then, it fails with the following error:
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The program
> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
> (WordCount Example)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
>
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>
> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>
> at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>
> at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed
> to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)
>
> at org.apache.flink.runtime.jobmanager.JobManager.org
> <http://org.apache.flink.runtime.jobmanager.jobmanager.org/>
> $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)
>
> at
> org.apache.flin

Integrate Flink with S3 on EMR cluster

2016-04-04 Thread Timur Fayruzov
Hello,

I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded
with a three-step procedure: load data from S3 to the cluster's HDFS, run the
Flink job, unload the outputs from HDFS to S3.

However, ideally I'd like to read/write data directly from/to S3. I
followed the instructions here:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html,
more specifically I:
  1. Modified flink-conf to point to /etc/hadoop/conf
  2. Modified core-site.xml per the link above (it is not clear why it is not
using IAM; I had to provide AWS keys explicitly).

I ran the following command:
HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster
-yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input
s3:// --output hdfs:///flink-output

First, I see messages like this:
2016-04-04 21:37:10,418 INFO
org.apache.hadoop.fs.s3native.NativeS3FileSystem  - Opening key
'' for reading at position '333000'

Then, it fails with the following error:



 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The program
execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90
(WordCount Example)

at org.apache.flink.client.program.Client.runBlocking(Client.java:381)

at org.apache.flink.client.program.Client.runBlocking(Client.java:355)

at org.apache.flink.client.program.Client.runBlocking(Client.java:315)

at
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)

at
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)

at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)

at org.apache.flink.client.program.Client.runBlocking(Client.java:248)

at
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)

at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)

at
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)

at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to
submit job fc13373d993539e647f164e12d82bf90 (WordCount Example)

at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100)

at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)

at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)

at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)

at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

at akka.actor.Actor$class.aroundReceive(Actor.scala:465)

at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

at akka.actor.ActorCell.invoke(ActorCell.scala:487)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)

at akka.dispatch.Mailbox.run(Mailbox.scala:221)

at akka.dispatch.Mailbox.exec(Mailbox.scala:231)

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.JobException: Creating the input splits
caused an error:
org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V

at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:172)

at
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696)

at org.apache.flink.runtime.jobmanager.JobManager.org
$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023)

... 21 more

Caused by: java.lang.NoSuchMethodError:

Re: Implicit inference of TypeInformation for join keys

2016-03-31 Thread Timur Fayruzov
I'm very content with an extra `apply`; it's much cleaner than any of my
initial solutions.
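
For anyone who finds this thread later, the complete pattern that compiles for
me now looks roughly like this (Thing is the two-field case class from the
earlier examples, and returning 1 from the coGroup function is just a
placeholder):

```
import org.apache.flink.api.scala._

case class Thing(f1: String, f2: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.fromElements(Thing("a", "b"), Thing("c", "d"))
val b = env.fromElements(Thing("a", "x"), Thing("z", "m"))

a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))
  .apply { (left, right) => 1 }
  .print()
```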

On Thu, Mar 31, 2016 at 2:18 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> I'm afraid there is no way around having that extra ".apply" because the
> Scala compiler will get confused with the additional implicit parameter.
> It's a bit ugly, though ...
>
> On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
>
>> Actually, there is an even easier solution (which I saw in your reply to
>> my other question):
>> ```
>> a.coGroup(b)
>>   .where(e => (e.f1, e.f2))
>>   .equalTo(e => (e.f1, e.f2)).apply {
>>
>> (left, right) => 1
>>   }.print()
>> ```
>> pretty much does what I want. The explicit `apply` gives the compiler the
>> hint it was missing before. Nevertheless, `createTypeInformation` works
>> too, thanks for sharing!
>>
>> Thanks,
>> Timur
>>
>> On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org>
>> wrote:
>>
>>> Hi Timur,
>>>
>>> You have to use `createTypeInfomation` method in `org.apache.flink.api`
>>> package to create TypeInformation object for Scala-specific objects such as
>>> case classes, tuples, eithers, options. For example:
>>>
>>> ```
>>> import org.apache.flink.api.scala._ // to import package object
>>>
>>> val a: DataSet[Thing] = …
>>> val b: DataSet[Thing] = …
>>>
>>> a.coGroup(b)
>>>   .where(e => (e.f1, e.f2))
>>>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
>>> (left, right) => 1
>>>   }.print()
>>> ```
>>>
>>> Note that Flink internally creates copied 2-tuples consisting of (the key
>>> extracted by the KeySelector, the original value). So there is some
>>> performance decrease when you are using a KeySelector.
>>>
>>> Regards,
>>> Chiwan Park
>>>
>>> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com>
>>> wrote:
>>> >
>>> > Thank you Chiwan! Yes, I understand that there are workarounds that
>>> don't use function argument (and thus do not require implicit arguments). I
>>> try to avoid positional and string-based keys because there is no compiler
>>> guarantees when you refactor or accidentally change the underlying case
>>> classes. Providing a function is the cleanest solution (and arguably is the
>>> most readable) so it'd be great to make it work.
>>> >
>>> > BTW, TypeInformation.of has an implementation that takes TypeHint (
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
>>> which, according to documentation, is supposed to be used for generic
>>> classes, but using it still leads to the same exception.
>>> >
>>> > Thanks,
>>> > Timur
>>> >
>>> >
>>> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org>
>>> wrote:
>>> > Hi Timur,
>>> >
>>> > You can use a composite key [1] to compare keys consisting of multiple
>>> fields. For example:
>>> >
>>> > ```
>>> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
>>> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
>>> > a.coGroup(b)
>>> >   .where(“f1”, “f2”) // Flink compares the values of f1 first, and
>>> compares the values of f2 if values of f1 are same.
>>> >   .equalTo(“f1”, “f2”) { // Note that you must specify same number of
>>> keys
>>> > (left, right) => 1
>>> >   }
>>> > ```
>>> >
>>> > Composite key can be applied to Scala tuple also:
>>> >
>>> > ```
>>> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
>>> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
>>> > a.coGroup(b)
>>> >   .where(0, 1) // Note that field numbers start from 0.
>>> >   .equalTo(0, 1) {
>>> > (left, right) => 1
>>> >   }
>>> > ```
>>> >
>>> > I hope this helps.
>>> >
>>> > [1]:
>>> https://ci.apache.org/project

Re: Implicit inference of TypeInformation for join keys

2016-03-30 Thread Timur Fayruzov
Actually, there is an even easier solution (which I saw in your reply to my
other question):
```
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2)).apply {
(left, right) => 1
  }.print()
```
pretty much does what I want. The explicit `apply` gives the compiler the hint
it was missing before. Nevertheless, `createTypeInformation` works too, thanks
for sharing!

Thanks,
Timur

On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Timur,
>
> You have to use `createTypeInfomation` method in `org.apache.flink.api`
> package to create TypeInformation object for Scala-specific objects such as
> case classes, tuples, eithers, options. For example:
>
> ```
> import org.apache.flink.api.scala._ // to import package object
>
> val a: DataSet[Thing] = …
> val b: DataSet[Thing] = …
>
> a.coGroup(b)
>   .where(e => (e.f1, e.f2))
>   .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) {
> (left, right) => 1
>   }.print()
> ```
>
> Note that Flink internally creates copied 2-tuples consisting of (the key
> extracted by the KeySelector, the original value). So there is some
> performance decrease when you are using a KeySelector.
>
> Regards,
> Chiwan Park
>
> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> >
> > Thank you Chiwan! Yes, I understand that there are workarounds that
> don't use function argument (and thus do not require implicit arguments). I
> try to avoid positional and string-based keys because there is no compiler
> guarantees when you refactor or accidentally change the underlying case
> classes. Providing a function is the cleanest solution (and arguably is the
> most readable) so it'd be great to make it work.
> >
> > BTW, TypeInformation.of has an implementation that takes TypeHint (
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
> which, according to documentation, is supposed to be used for generic
> classes, but using it still leads to the same exception.
> >
> > Thanks,
> > Timur
> >
> >
> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org>
> wrote:
> > Hi Timur,
> >
> > You can use a composite key [1] to compare keys consisting of multiple
> fields. For example:
> >
> > ```
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(“f1”, “f2”) // Flink compares the values of f1 first, and
> compares the values of f2 if values of f1 are same.
> >   .equalTo(“f1”, “f2”) { // Note that you must specify same number of
> keys
> > (left, right) => 1
> >   }
> > ```
> >
> > Composite key can be applied to Scala tuple also:
> >
> > ```
> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> > a.coGroup(b)
> >   .where(0, 1) // Note that field numbers start from 0.
> >   .equalTo(0, 1) {
> > (left, right) => 1
> >   }
> > ```
> >
> > I hope this helps.
> >
> > [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
> >
> > Regards,
> > Chiwan Park
> >
> > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> > >
> > > Hello,
> > >
> > > Another issue I have encountered is incorrect implicit resolution (I'm
> using Scala 2.11.7). Here's the example (with a workaround):
> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > > a.coGroup(b)
> > >   .where(e => e.f1)
> > >   //.equalTo(e => e) { //this fails to compile because equalTo expects
> an implicit
> > >   .equalTo("f1") {
> > > (left, right) => 1
> > >   }
> > > However, the workaround does not quite work when key is a tuple (I
> suspect this applies to other generic classes as well):
> > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing(&q

Re: Why Scala Option is not a valid key?

2016-03-30 Thread Timur Fayruzov
Thank you for your answers, Chiwan! Would that mean that a generic type can't
be used as a key in general? This is a non-obvious limitation of the Flink DSL
that I didn't see in the documentation.

Could you please elaborate on what you mean by KeyExtractor? I see that an
instance of KeySelector is initialized inside the `where` operator, but I don't
see how I can pass a custom KeySelector in.
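
For what it's worth, my current understanding is that the lambda passed to
`where`/`equalTo` already becomes the KeySelector under the hood, so a sketch
along these lines should sidestep the Option comparison (using "" as a stand-in
for None is an assumption that only holds if "" never occurs as a real value):

```
import org.apache.flink.api.scala._

case class MyKey(k1: Option[String], k2: Option[String])

val env = ExecutionEnvironment.getExecutionEnvironment
val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), Some("c")))
val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), Some("c")))

data1.join(data2)
  .where(k => (k.k1.getOrElse(""), k.k2.getOrElse("")))
  .equalTo(k => (k.k1.getOrElse(""), k.k2.getOrElse("")))
  .print()
```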

Thanks,
Timur

On Wed, Mar 30, 2016 at 12:53 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Timur,
>
> Because Option[T] is not a comparable type in general (if T is a POJO type),
> you cannot use Option[T] as a key type. I think you have to implement a
> KeyExtractor to compare objects including Option[T]s.
>
> ```
> case class MyKey(k1: Option[String], k2: Option[String])
>
> val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"),
> Some("c")))
> val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"),
> Some("c")))
>
> data1.join(data2)
>   .where(_.hashCode())
>   .equalTo(_.hashCode()).apply {
> (left: MyKey, right: MyKey) => (left, right)
>   }.print()
> ```
>
> Note that the approach in example (using hashCode()) cannot be applied to
> sort task.
>
> Regards,
> Chiwan Park
>
> > On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> >
> > There is some more detail to this question that I missed initially. It
> turns out that my key is a case class of a form MyKey(k1: Option[String],
> k2: Option[String]). Keys.SelectorFunctionKeys is performing a recursive
> check whether every element of the MyKey class can be a key and fails when
> encountering an Option.
> >
> > Is it possible to work around this situation without giving up Options?
> Inability to use Options in Domain objects could be really frustrating.
> >
> > Thanks,
> > Timur
> >
> > On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov <
> timur.fairu...@gmail.com> wrote:
> > Hello,
> >
> > I'm evaluating Flink and one thing I noticed is Option[A] can't be used
> as a key for coGroup (looking specifically here:
> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
> I'm not clear about the reason of this and appreciate if someone can
> explain.
> >
> > Thanks,
> > Timur
> >
>
>


Re: Implicit inference of TypeInformation for join keys

2016-03-30 Thread Timur Fayruzov
Thank you Chiwan! Yes, I understand that there are workarounds that don't use
a function argument (and thus do not require implicit arguments). I try to
avoid positional and string-based keys because there are no compiler guarantees
when you refactor or accidentally change the underlying case classes. Providing
a function is the cleanest solution (and arguably the most readable), so it'd
be great to make it work.
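
To make the trade-off concrete, this is the difference I mean, as a rough
sketch (Thing and the sample data are the same as in the examples quoted
below):

```
import org.apache.flink.api.scala._

case class Thing(f1: String, f2: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.fromElements(Thing("a", "b"), Thing("c", "d"))
val b = env.fromElements(Thing("a", "x"), Thing("z", "m"))

// String-based keys: a typo or a renamed field is only caught when the job runs.
val byName = a.coGroup(b).where("f1", "f2").equalTo("f1", "f2")

// Key selector functions: the same mistake is caught by the compiler.
val bySelector = a.coGroup(b).where(e => (e.f1, e.f2)).equalTo(e => (e.f1, e.f2))
```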

BTW, TypeInformation.of has an implementation that takes a TypeHint (
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java)
which, according to the documentation, is supposed to be used for generic
classes, but using it still leads to the same exception.

Thanks,
Timur


On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote:

> Hi Timur,
>
> You can use a composite key [1] to compare keys consisting of multiple
> fields. For example:
>
> ```
> val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> a.coGroup(b)
>   .where(“f1”, “f2”) // Flink compares the values of f1 first, and
> compares the values of f2 if values of f1 are same.
>   .equalTo(“f1”, “f2”) { // Note that you must specify same number of keys
> (left, right) => 1
>   }
> ```
>
> Composite key can be applied to Scala tuple also:
>
> ```
> val a = env.fromCollection(Seq(("a", "b"), ("c", "d")))
> val b = env.fromCollection(Seq(("a", "x"), ("z", "m")))
> a.coGroup(b)
>   .where(0, 1) // Note that field numbers start from 0.
>   .equalTo(0, 1) {
>     (left, right) => 1
>   }
> ```
>
> I hope this helps.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples
>
> Regards,
> Chiwan Park
>
> > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com>
> wrote:
> >
> > Hello,
> >
> > Another issue I have encountered is incorrect implicit resolution (I'm
> using Scala 2.11.7). Here's the example (with a workaround):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => e.f1)
> >   //.equalTo(e => e) { //this fails to compile because equalTo expects
> an implicit
> >   .equalTo("f1") {
> > (left, right) => 1
> >   }
> > However, the workaround does not quite work when key is a tuple (I
> suspect this applies to other generic classes as well):
> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String,
> String)])) { (left, right) => 1} // throws InvalidProgramException
> > Here, I try to provide the implicit TypeInformation explicitly, but
> apparently it's not compatible with the way implicit inference is done.
> (TypeInformation I generate is GenericType, while
> scala.Tuple2<String, String> is expected).
> >
> > Now, I can split this in 2 operations like below:
> > val tmp = a.coGroup(b)
> >   .where(e => (e.f1, e.f2))
> >   .equalTo(e => (e.f1, e.f2))
> >
> > tmp { (left, right) => 1}
> > but, I would like to avoid adding clutter to my processing logic, and
> it's not entirely clear to me how this would be scheduled.
> >
> > As an option, I can hash the hell out of my keys like that:
> > a.coGroup(b)
> >   .where(e => (e.f1, e.f2).hashCode)
> >   .equalTo(e => (e.f1,
> e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1}
> > but that, again, adds some indirection and clutter, not mentioning the
> hassle of dealing with collisions (which can be alleviated by using fancy
> hashes, but I'd like to avoid that).
> >
> > Any insights on what is the way to go here are highly appreciated.
> >
> > Thanks,
> > Timur
>
>


Implicit inference of TypeInformation for join keys

2016-03-29 Thread Timur Fayruzov
Hello,

Another issue I have encountered is incorrect implicit resolution (I'm
using Scala 2.11.7). Here's the example (with a workaround):

val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(e => e.f1)
  //.equalTo(e => e) { //this fails to compile because equalTo expects
an implicit
  .equalTo("f1") {
(left, right) => 1
  }

However, the workaround does not quite work when the key is a tuple (I suspect
this applies to other generic classes as well):

val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d")))
val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m")))
a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String,
String)])) { (left, right) => 1} // throws InvalidProgramException

Here, I try to provide the implicit TypeInformation explicitly, but apparently
it's not compatible with the way implicit inference is done (the
TypeInformation I generate is a GenericType, while
scala.Tuple2 is expected).

Now, I can split this in 2 operations like below:

val tmp = a.coGroup(b)
  .where(e => (e.f1, e.f2))
  .equalTo(e => (e.f1, e.f2))

tmp { (left, right) => 1}

but I would like to avoid adding clutter to my processing logic, and it's not
entirely clear to me how this would be scheduled.
As an option, I can hash the hell out of my keys like that:

a.coGroup(b)
  .where(e => (e.f1, e.f2).hashCode)
  .equalTo(e => (e.f1,
e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1}

but that, again, adds some indirection and clutter, not to mention the hassle
of dealing with collisions (which can be alleviated by using fancy hashes, but
I'd like to avoid that).
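
(For completeness, a collision-free variant of the same trick that avoids
hashing, reusing a and b from above; the separator choice assumes '\u0000'
never appears in the fields, and it is still clunky, which is why I'd rather
pass a tuple key directly:)

```
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

a.coGroup(b)
  .where(e => e.f1 + "\u0000" + e.f2)
  .equalTo(e => e.f1 + "\u0000" + e.f2)(TypeInformation.of(classOf[String])) {
    (left, right) => 1
  }
```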

Any insights on what is the way to go here are highly appreciated.

Thanks,
Timur


Re: Why Scala Option is not a valid key?

2016-03-29 Thread Timur Fayruzov
There is some more detail to this question that I missed initially. It turns
out that my key is a case class of the form MyKey(k1: Option[String], k2:
Option[String]). Keys.SelectorFunctionKeys performs a recursive check of
whether every field of the MyKey class can be a key, and it fails when it
encounters an Option.

Is it possible to work around this situation without giving up Options? The
inability to use Options in domain objects could be really frustrating.

Thanks,
Timur

On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Hello,
>
> I'm evaluating Flink and one thing I noticed is Option[A] can't be used as
> a key for coGroup (looking specifically here:
> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
> I'm not clear about the reason of this and appreciate if someone can
> explain.
>
> Thanks,
> Timur
>


Why Scala Option is not a valid key?

2016-03-29 Thread Timur Fayruzov
Hello,

I'm evaluating Flink, and one thing I noticed is that Option[A] can't be used
as a key for coGroup (looking specifically here:
https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39).
I'm not clear about the reason for this and would appreciate it if someone
could explain.

Thanks,
Timur