Re: Checking actual config values used by TaskManager
If you're talking about parameters that were set on JVM startup, then `ps aux|grep flink` on an EMR slave node should do the trick; that'll give you the full command line. On Thu, Apr 28, 2016 at 9:00 PM, Ken Krugler wrote: > Hi all, > > I’m running jobs on EMR via YARN, and wondering how to check exactly what > configuration settings are actually being used. > > This is mostly for the TaskManager. > > I know I can modify the conf/flink-conf.yaml file, and (via the CLI) I can > use -yD param=value. > > But my experience with Hadoop makes me want to see the exact values being > used, versus assuming I know what’s been set :) > > Thanks, > > — Ken > > > -- > Ken Krugler > +1 530-210-6378 > http://www.scaleunlimited.com > custom big data solutions & training > Hadoop, Cascading, Cassandra & Solr > > > >
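Besides `ps`, the JVM can be asked directly, from inside a running job, which startup flags it was actually launched with. A minimal sketch using only the JDK management API (no Flink dependency; the class name is made up for illustration):

```java
import java.lang.management.ManagementFactory;
import java.util.List;

class ShowJvmArgs {

    // Returns the exact flags (e.g. -Xmx, -XX:MaxDirectMemorySize) that the
    // current JVM process was started with.
    static List<String> jvmArgs() {
        return ManagementFactory.getRuntimeMXBean().getInputArguments();
    }

    public static void main(String[] args) {
        // Printed from inside a map function, this would land in the
        // TaskManager's stdout file under the YARN container logs.
        for (String a : jvmArgs()) {
            System.out.println(a);
        }
    }
}
```

This only covers JVM-level settings; for Flink's own configuration values, the TaskManager startup log is the place to look, since it records the configuration as it is loaded.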
Re: Command line arguments getting munged with CLI?
Hi Ken, I have built a parameter parser in my jar to work with '--' instead of '-' and it works fine (on 1.0.0 and on current master). After a cursory look at the parameter parser Flink uses (http://commons.apache.org/proper/commons-cli/) it seems that double vs single dash could make a difference, so you could try it. On Tue, Apr 26, 2016 at 11:45 AM, Ken Krugler wrote: > Hi all, > > I’m running this command, on the master in an EMR cluster: > > ./bin/flink run -m yarn-cluster -yn 25 -yjm 1024 -ytm 4096 -c > -planner flink -inputdir xxx > > Everything seems to be starting up fine, up to: > > All TaskManagers are connected > Using the parallelism provided by the remote cluster (25). To use another > parallelism, set it at the ./bin/flink client. > > My main class then tries to parse the passed arguments, and fails. > > The arguments being passed to my main() class aren’t what I expect. I > get... > > -p > lanner > flink > inputdir > s3n://su-wikidump/wikidump-20151112/data/ > > It looks like Flink is trying to process all of my arguments, so -planner > looks like “-p”, and -inputdir gets the ‘-‘ consumed before it realizes > that there is no ‘-i’ parameter that it knows about. > > I was assuming that everything after the jar parameter would be sent > through as is. > > Any ideas? > > Thanks, > > — Ken > > PS - this is with flink 1.0.2 > > -- > Ken Krugler > +1 530-210-6378 > http://www.scaleunlimited.com > custom big data solutions & training > Hadoop, Cascading, Cassandra & Solr > >
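The double-dash workaround is easy to demonstrate with a tiny hand-rolled parser that only recognizes `--key value` pairs, so nothing collides with Flink's own single-dash options (e.g. `-p` for parallelism). A hedged sketch, not commons-cli; the class name and behavior are illustrative:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ArgParse {
    // "--key value" options, plus anything left over in order.
    final Map<String, String> options = new HashMap<>();
    final List<String> positional = new ArrayList<>();

    static ArgParse parse(String[] args) {
        ArgParse result = new ArgParse();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--") && i + 1 < args.length) {
                // Consume the flag name and its value together.
                result.options.put(args[i].substring(2), args[++i]);
            } else {
                result.positional.add(args[i]);
            }
        }
        return result;
    }
}
```

With `--planner flink --inputdir s3n://...`, the job sees `planner -> flink` and `inputdir -> s3n://...`, and there is no single-character prefix for Flink's CLI parser to claim as its own.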
Re: "No more bytes left" at deserialization
Hi Ken, Good point actually, thanks for pointing this out. In the Flink project I see a dependency on version 2.24, and then transitive dependencies through twitter.carbonite, which has a dependency on 2.21. Also, twitter.chill, which as far as I understand is used to manipulate Kryo, shows up with versions 0.7.4 (Flink) and 0.3.5 (carbonite again). I wonder if this could cause issues, since if classes are not deduplicated, class loading order could be different on different machines. The maven project setup is fairly complicated and I'm not a maven expert, so I would appreciate a second look at that. Thanks, Timur On Tue, Apr 26, 2016 at 6:51 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote: > I don’t know if this is helpful, but I’d run into a similar issue (array > index out of bounds during Kryo deserialization) due to having a different > version of Kryo on the classpath. > > — Ken > > On Apr 26, 2016, at 6:23pm, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > I built master with scala 2.11 and hadoop 2.7.1, now get a different > exception (still serialization-related though): > > java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup > at > com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) > -> Filter (Filter at > com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' > , caused an error: Error obtaining the sorted input: Thread 'SortMerger > Reading Thread' terminated due to an exception: Index: 97, Size: 11 > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger Reading Thread' terminated due to an exception: Index: > 97, Size: 11 > at > 
org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) > at > org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) > at > org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) > ... 3 more > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' > terminated due to an exception: Index: 97, Size: 11 > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799) > Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11 > at java.util.ArrayList.rangeCheck(ArrayList.java:653) > at java.util.ArrayList.get(ArrayList.java:429) > at > com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) > at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232) > at > org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75) > at > org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106) > at > org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57) > at > 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59) > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035) > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) > > > > On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> >
Re: "No more bytes left" at deserialization
I built master with scala 2.11 and hadoop 2.7.1, now get a different exception (still serialization-related though): java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:162)) -> Filter (Filter at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:163))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11 at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) ... 
3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 97, Size: 11 at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799) Caused by: java.lang.IndexOutOfBoundsException: Index: 97, Size: 11 at java.util.ArrayList.rangeCheck(ArrayList.java:653) at java.util.ArrayList.get(ArrayList.java:429) at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232) at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:75) at org.apache.flink.api.scala.typeutils.OptionSerializer.deserialize(OptionSerializer.scala:28) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:106) at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:30) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:163) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) at 
org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) On Tue, Apr 26, 2016 at 9:07 AM, Till Rohrmann <trohrm...@apache.org> wrote: > Then let's keep finger crossed that we've found the culprit :-) > > On Tue, Apr 26, 2016 at 6:02 PM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > >> Thank you Till. >> >> I will try to run with new binaries today. As I have mentioned, the error >> is reproducible only on a full dataset, so coming up with sample input data >> may be problematic (not to mention that the real data can't be shared). >> I'll see if I can replicate it, but could take a bit longer. Thank you very >> much for your effort. >> >> On Tue, Apr 26, 2016 at 8:46 AM, Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Hi Timur, >>> >>> I’ve got good and not so good news. Let’s start with the not so good >>> news. I couldn’t reproduce your problem but the good news is that I found a >>> bug in the duplication logic of the OptionSerializer. I’ve already >>> committed a patch to the master to fix it. >>> >>> Thus, I wanted to ask you, whether you could try out the latest master >>> and check whether your problem still persists. If that’s the case, could >>> you send me your complete code with sample input data which reproduces your >>> problem? >>> >>> Cheers, >>>
Re: Job hangs
Robert, Ufuk, logs, execution plan and a screenshot of the console are in the archive: https://www.dropbox.com/s/68gyl6f3rdzn7o1/debug-stuck.tar.gz?dl=0 Note that when I looked in the backpressure view I saw back pressure 'high' on following paths: Input->code_line:123,124->map->join Input->code_line:134,135->map->join Input->code_line:121->map->join Unfortunately, I was not able to take thread dumps nor heap dumps (neither kill -3, jstack nor jmap worked, some Amazon AMI problem I assume). Hope that helps. Please, let me know if I can assist you in any way. Otherwise, I probably would not be actively looking at this problem. Thanks, Timur On Tue, Apr 26, 2016 at 8:11 AM, Ufuk Celebi <u...@apache.org> wrote: > Can you please further provide the execution plan via > > env.getExecutionPlan() > > > > On Tue, Apr 26, 2016 at 4:23 PM, Timur Fayruzov > <timur.fairu...@gmail.com> wrote: > > Hello Robert, > > > > I observed progress for 2 hours(meaning numbers change on dashboard), and > > then I waited for 2 hours more. I'm sure it had to spill at some point, > but > > I figured 2h is enough time. > > > > Thanks, > > Timur > > > > On Apr 26, 2016 1:35 AM, "Robert Metzger" <rmetz...@apache.org> wrote: > >> > >> Hi Timur, > >> > >> thank you for sharing the source code of your job. That is helpful! > >> Its a large pipeline with 7 joins and 2 co-groups. Maybe your job is > much > >> more IO heavy with the larger input data because all the joins start > >> spilling? > >> Our monitoring, in particular for batch jobs is really not very > advanced.. > >> If we had some monitoring showing the spill status, we would maybe see > that > >> the job is still running. > >> > >> How long did you wait until you declared the job hanging? > >> > >> Regards, > >> Robert > >> > >> > >> On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi <u...@apache.org> wrote: > >>> > >>> No. > >>> > >>> If you run on YARN, the YARN logs are the relevant ones for the > >>> JobManager and TaskManager. 
The client log submitting the job should > >>> be found in /log. > >>> > >>> – Ufuk > >>> > >>> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov > >>> <timur.fairu...@gmail.com> wrote: > >>> > I will do it my tomorrow. Logs don't show anything unusual. Are there > >>> > any > >>> > logs besides what's in flink/log and yarn container logs? > >>> > > >>> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi" <u...@apache.org> wrote: > >>> > > >>> > Hey Timur, > >>> > > >>> > is it possible to connect to the VMs and get stack traces of the > Flink > >>> > processes as well? > >>> > > >>> > We can first have a look at the logs, but the stack traces will be > >>> > helpful if we can't figure out what the issue is. > >>> > > >>> > – Ufuk > >>> > > >>> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann <trohrm...@apache.org > > > >>> > wrote: > >>> >> Could you share the logs with us, Timur? That would be very helpful. > >>> >> > >>> >> Cheers, > >>> >> Till > >>> >> > >>> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" <timur.fairu...@gmail.com > > > >>> >> wrote: > >>> >>> > >>> >>> Hello, > >>> >>> > >>> >>> Now I'm at the stage where my job seem to completely hang. Source > >>> >>> code is > >>> >>> attached (it won't compile but I think gives a very good idea of > what > >>> >>> happens). Unfortunately I can't provide the datasets. Most of them > >>> >>> are > >>> >>> about > >>> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks > 6GB > >>> >>> memory > >>> >>> for each. > >>> >>> > >>> >>> It was working for smaller input sizes. Any idea on what I can do > >>> >>> differently is appreciated. > >>> >>> > >>> >>> Thans, > >>> >>> Timur > >> > >> > > >
Re: Job hangs
Hello Robert, I observed progress for 2 hours(meaning numbers change on dashboard), and then I waited for 2 hours more. I'm sure it had to spill at some point, but I figured 2h is enough time. Thanks, Timur On Apr 26, 2016 1:35 AM, "Robert Metzger" <rmetz...@apache.org> wrote: > Hi Timur, > > thank you for sharing the source code of your job. That is helpful! > Its a large pipeline with 7 joins and 2 co-groups. Maybe your job is much > more IO heavy with the larger input data because all the joins start > spilling? > Our monitoring, in particular for batch jobs is really not very advanced.. > If we had some monitoring showing the spill status, we would maybe see that > the job is still running. > > How long did you wait until you declared the job hanging? > > Regards, > Robert > > > On Tue, Apr 26, 2016 at 10:11 AM, Ufuk Celebi <u...@apache.org> wrote: > >> No. >> >> If you run on YARN, the YARN logs are the relevant ones for the >> JobManager and TaskManager. The client log submitting the job should >> be found in /log. >> >> – Ufuk >> >> On Tue, Apr 26, 2016 at 10:06 AM, Timur Fayruzov >> <timur.fairu...@gmail.com> wrote: >> > I will do it my tomorrow. Logs don't show anything unusual. Are there >> any >> > logs besides what's in flink/log and yarn container logs? >> > >> > On Apr 26, 2016 1:03 AM, "Ufuk Celebi" <u...@apache.org> wrote: >> > >> > Hey Timur, >> > >> > is it possible to connect to the VMs and get stack traces of the Flink >> > processes as well? >> > >> > We can first have a look at the logs, but the stack traces will be >> > helpful if we can't figure out what the issue is. >> > >> > – Ufuk >> > >> > On Tue, Apr 26, 2016 at 9:42 AM, Till Rohrmann <trohrm...@apache.org> >> wrote: >> >> Could you share the logs with us, Timur? That would be very helpful. 
>> >> >> >> Cheers, >> >> Till >> >> >> >> On Apr 26, 2016 3:24 AM, "Timur Fayruzov" <timur.fairu...@gmail.com> >> >> wrote: >> >>> >> >>> Hello, >> >>> >> >>> Now I'm at the stage where my job seem to completely hang. Source >> code is >> >>> attached (it won't compile but I think gives a very good idea of what >> >>> happens). Unfortunately I can't provide the datasets. Most of them are >> >>> about >> >>> 100-500MM records, I try to process on EMR cluster with 40 tasks 6GB >> >>> memory >> >>> for each. >> >>> >> >>> It was working for smaller input sizes. Any idea on what I can do >> >>> differently is appreciated. >> >>> >> >>> Thans, >> >>> Timur >> > >
Re: convert Json to Tuple
Normally, Json4s or the Jackson Scala plugin works well for JSON-to-Scala data structure conversions. However, I would not expect them to support a special case for tuples, since JSON key-value fields would normally convert to case classes and JSON arrays are converted to, well, arrays. That being said, Stack Overflow to the rescue: http://stackoverflow.com/questions/31909308/is-it-possible-to-parse-json-array-into-a-tuple-with-json4s I didn't try the approach myself, though. On Mon, Apr 25, 2016 at 6:50 PM, Alexander Smirnov < alexander.smirn...@gmail.com> wrote: > Hello everybody! > > my RMQSource function receives string with JSONs in it. > Because many operations in Flink rely on Tuple operations, I think it is a > good idea to convert JSON to Tuple. > > I believe this task has been solved already :) > what's the common approach for this conversion? > > Thank you, > Alex >
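Once a JSON binding library (json4s, Jackson, ...) has produced a typed object, getting a Flink-style tuple out of it is mechanical field access. A minimal sketch with hypothetical `Event` and `Tuple2` stand-ins (a real job would use Flink's `Tuple2` and whatever shape the JSON actually has):

```java
// Stand-in for Flink's Tuple2 so the sketch has no Flink dependency.
class Tuple2<A, B> {
    final A f0;
    final B f1;
    Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
}

class JsonToTuple {
    // Hypothetical POJO a JSON library would bind {"name": ..., "count": ...} to.
    static class Event {
        String name;
        int count;
    }

    // The tuple conversion itself is trivial once binding has happened.
    static Tuple2<String, Integer> toTuple(Event e) {
        return new Tuple2<>(e.name, e.count);
    }
}
```

Heterogeneous JSON arrays like `["a", 1]` remain the awkward case the Stack Overflow link addresses: they need a custom deserializer, because they don't map onto a single element type.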
Job hangs
Hello, Now I'm at the stage where my job seems to completely hang. Source code is attached (it won't compile, but I think it gives a very good idea of what happens). Unfortunately I can't provide the datasets. Most of them are about 100-500MM records; I try to process them on an EMR cluster with 40 tasks, 6GB memory for each. It was working for smaller input sizes. Any idea on what I can do differently is appreciated. Thanks, Timur FaithResolution.scala Description: Binary data
Re: YARN terminating TaskNode
Great answer, thank you Max for a very detailed explanation! It's illuminating how the off-heap parameter affects the memory allocation. I read this post: https://blogs.oracle.com/jrockit/entry/why_is_my_jvm_process_larger_t and the thing that jumped out at me is the allocation of memory for JNI libs. I do use a native library in my application, which is likely the culprit. I need to account for its memory footprint when doing my memory calculations. Thanks, Timur On Mon, Apr 25, 2016 at 10:28 AM, Maximilian Michels <m...@apache.org> wrote: > Hi Timur, > > Shedding some light on the memory calculation: > > You have a total memory size of 2500 MB for each TaskManager. The > default for 'taskmanager.memory.fraction' is 0.7. This is the fraction > of the memory used by the memory manager. When you have turned on > off-heap memory, this memory is allocated off-heap. As you pointed > out, the default Yarn cutoff ratio is 0.25. > > Memory cutoff for Yarn: 2500 * 0.25 MB = 625 MB > > Java heap size with off-heap disabled: 2500 MB - 625 MB = 1875 MB > > Java heap size with off-heap enabled: (2500 MB - 625 MB) * 0.3 = 562.5 > MB (~570 MB in your case) > Off-heap memory size: (2500 MB - 625 MB) * 0.7 = 1312.5 MB > > The heap memory limits in your log seem to be calculated correctly. > Note that we don't set a strict limit for the off-heap memory because > the Flink memory manager controls the amount of memory allocated. It > will preallocate memory when you have 'taskmanager.memory.preallocate' > set to true. Otherwise it will allocate dynamically. Still, you should > have about 500 MB memory left with everything allocated. There is some > more direct (off-heap) memory allocated for the network stack > adjustable with 'taskmanager.network.numberOfBuffers' which is set to > 2048 by default and corresponds to 2048 * 32 KB = 64 MB memory. I > believe this can grow up to twice of that size. Still, should be > enough memory left. > > Are you running a streaming or batch job? 
Off-heap memory and memory > preallocation are mostly beneficial for batch jobs which use the > memory manager a lot for sorting, hashing and caching. > > For streaming I'd suggest using Flink's defaults: > > taskmanager.memory.off-heap: false > taskmanager.memory.preallocate: false > > Raising the cutoff ratio should prevent killing of the TaskManagers. > As Robert mentioned, in practice the JVM tends to allocate more than > the maximum specified heap size. You can put the following in your > flink-conf.yaml: > > # slightly raise the cut off ratio (might need to be even higher) > yarn.heap-cutoff-ratio: 0.3 > > Thanks, > Max > > On Mon, Apr 25, 2016 at 5:52 PM, Timur Fayruzov > <timur.fairu...@gmail.com> wrote: > > Hello Maximilian, > > > > I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running > this on > > EMR. I didn't see any exceptions in other logs. What are the logs you are > > interested in? > > > > Thanks, > > Timur > > > > On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <m...@apache.org> > wrote: > >> > >> Hi Timur, > >> > >> Which version of Flink are you using? Could you share the entire logs? > >> > >> Thanks, > >> Max > >> > >> On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <rmetz...@apache.org> > >> wrote: > >> > Hi Timur, > >> > > >> > The reason why we only allocate 570mb for the heap is because you are > >> > allocating most of the memory as off heap (direct byte buffers). > >> > > >> > In theory, the memory footprint of the JVM is limited to 570 (heap) + > >> > 1900 > >> > (direct mem) = 2470 MB (which is below 2500). But in practice the JVM > >> > is > >> > allocating more memory, causing these killings by YARN. > >> > > >> > I have to check the code of Flink again, because I would expect the > >> > safety > >> > boundary to be much larger than 30 mb. 
> >> > > >> > Regards, > >> > Robert > >> > > >> > > >> > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov > >> > <timur.fairu...@gmail.com> > >> > wrote: > >> >> > >> >> Hello, > >> >> > >> >> Next issue in a string of things I'm solving is that my application > >> >> fails > >> >> with the message 'Connection unexpectedly closed by remote task > >> >> manager'. > >> >> > >> >> Yarn log shows the following: >
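Max's arithmetic above can be captured in a few lines. A sketch of the same calculation (the 0.25 cutoff and 0.7 fraction are the defaults he cites; the class and method names are made up):

```java
class TaskManagerMemory {
    // Splits a Yarn container of totalMb into {cutoff, heap, offHeap},
    // mirroring the calculation in the thread.
    static double[] split(double totalMb, double cutoffRatio,
                          double memoryFraction, boolean offHeap) {
        double cutoff = totalMb * cutoffRatio;      // reserved for Yarn overhead
        double afterCutoff = totalMb - cutoff;
        double direct = offHeap ? afterCutoff * memoryFraction : 0.0;
        double heap = afterCutoff - direct;         // what -Xmx ends up as
        return new double[] {cutoff, heap, direct};
    }

    public static void main(String[] args) {
        double[] r = split(2500, 0.25, 0.7, true);
        System.out.printf("cutoff=%.1f MB, heap=%.1f MB, off-heap=%.1f MB%n",
                r[0], r[1], r[2]);
    }
}
```

For the 2500 MB container in the thread this gives a 625 MB cutoff, a 562.5 MB heap (matching the ~570 MB `-Xmx` Timur saw) and 1312.5 MB off-heap.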
Re: Access to a shared resource within a mapper
Hi Fabian, I didn't realize you meant that lazy val should be inside RichMapFunction implementation, it makes sense. That's what I ended up doing already. Thanks! Timur On Mon, Apr 25, 2016 at 3:34 AM, Fabian Hueske <fhue...@gmail.com> wrote: > Hi Timur, > > a TaskManager may run as many subtasks of a Map operator as it has slots. > Each subtask of an operator runs in a different thread. Each parallel > subtask of a Map operator has its own MapFunction object, so it should be > possible to use a lazy val. > > However, you should not use static variables to hold state, because these > are shared between all MapFunction in a TaskManager (JVM). > > 2016-04-22 21:21 GMT+02:00 Timur Fayruzov <timur.fairu...@gmail.com>: > >> Actually, a follow-up question: is map function single-threaded (within >> one task manager, that is). If it's not then lazy initialization wont' >> work, is it right? >> >> On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote: >> >>> You may also be able to initialize the client only in the parallel >>> execution by making it a "lazy" variable in Scala. >>> >>> On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov < >>> timur.fairu...@gmail.com> wrote: >>> >>>> Outstanding! Thanks, Aljoscha. >>>> >>>> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> >>>> wrote: >>>> >>>>> Hi, >>>>> you could use a RichMapFunction that has an open method: >>>>> >>>>> data.map(new RichMapFunction[...]() { >>>>> def open(): () = { >>>>> // initialize client >>>>> } >>>>> >>>>> def map(input: INT): OUT = { >>>>> // use client >>>>> } >>>>> } >>>>> >>>>> the open() method is called before any elements are passed to the >>>>> function. The counterpart of open() is close(), which is called after all >>>>> elements are through or if the job cancels. 
>>>>> >>>>> Cheers, >>>>> Aljoscha >>>>> >>>>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com> >>>>> wrote: >>>>> >>>>>> Hello, >>>>>> >>>>>> I'm writing a Scala Flink application. I have a standalone process >>>>>> that exists on every Flink node that I need to call to transform my data. >>>>>> To access this process I need to initialize non thread-safe client >>>>>> first. I >>>>>> would like to avoid initializing a client for each element being >>>>>> transformed. A straightforward implementation would be something like >>>>>> this: >>>>>> ``` >>>>>> >>>>>> val env = ExecutionEnvironment.getExecutionEnvironment >>>>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c" >>>>>> val pool = new ArrayBlockingQueue[Client](5) >>>>>> // pool is filled here >>>>>> data.map(e => { >>>>>> val client = pool.take() >>>>>> val res = client.transform(e) >>>>>> pool.put(client) >>>>>> res >>>>>> }) >>>>>> >>>>>> ``` >>>>>> However, this causes a runtime exception with message "Task not >>>>>> serializable", which makes sense. >>>>>> >>>>>> Function parameters and broadcast variables won't work either as far >>>>>> as I understand. Is there a way to make this happen? >>>>>> >>>>>> Thanks, >>>>>> Timur >>>>>> >>>>> >>>> >>> >> >
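The open()/map() lifecycle Aljoscha describes can be sketched without a Flink dependency. `RichMapLike` below is a stand-in for Flink's `RichMapFunction`, and `Client` is a hypothetical non-thread-safe client like the one in the question:

```java
// Stand-in for Flink's RichMapFunction lifecycle (open/map/close),
// so the sketch compiles without a Flink dependency.
abstract class RichMapLike<I, O> {
    public void open() {}
    public abstract O map(I in);
    public void close() {}
}

// Hypothetical non-thread-safe client.
class Client {
    String transform(String s) { return s.toUpperCase(); }
}

class ClientMapper extends RichMapLike<String, String> {
    // transient: the client must not be shipped with the serialized function;
    // each parallel subtask builds its own copy in open().
    private transient Client client;

    @Override
    public void open() { client = new Client(); }

    @Override
    public String map(String in) { return client.transform(in); }

    @Override
    public void close() { client = null; }
}
```

As Fabian notes, each parallel subtask gets its own function instance running in its own thread, so one client per instance needs no pooling; in Scala a lazy val field inside the function achieves the same effect without overriding open().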
Re: YARN terminating TaskNode
Hello Maximilian, I'm using 1.0.0 compiled with Scala 2.11 and Hadoop 2.7. I'm running this on EMR. I didn't see any exceptions in other logs. What are the logs you are interested in? Thanks, Timur On Mon, Apr 25, 2016 at 3:44 AM, Maximilian Michels <m...@apache.org> wrote: > Hi Timur, > > Which version of Flink are you using? Could you share the entire logs? > > Thanks, > Max > > On Mon, Apr 25, 2016 at 12:05 PM, Robert Metzger <rmetz...@apache.org> > wrote: > > Hi Timur, > > > > The reason why we only allocate 570mb for the heap is because you are > > allocating most of the memory as off heap (direct byte buffers). > > > > In theory, the memory footprint of the JVM is limited to 570 (heap) + 1900 > > (direct mem) = 2470 MB (which is below 2500). But in practice the JVM is > > allocating more memory, causing these killings by YARN. > > > > I have to check the code of Flink again, because I would expect the > safety > > boundary to be much larger than 30 mb. > > > > Regards, > > Robert > > > > > > On Fri, Apr 22, 2016 at 9:47 PM, Timur Fayruzov < > timur.fairu...@gmail.com> > > wrote: > >> > >> Hello, > >> > >> Next issue in a string of things I'm solving is that my application > fails > >> with the message 'Connection unexpectedly closed by remote task > manager'. > >> > >> Yarn log shows the following: > >> > >> Container [pid=4102,containerID=container_1461341357870_0004_01_15] > is > >> running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB > >> physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing > >> container. 
> >> Dump of the process-tree for container_1461341357870_0004_01_15 : > >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) > >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE > >> |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c > >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m > >> -XX:MaxDirectMemorySize=1900m > >> > -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log > >> -Dlogback.configurationFile=file:logback.xml > >> -Dlog4j.configuration=file:log4j.properties > >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> > >> > /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out > >> 2> > >> > /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err > >> |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460 > >> /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m > >> -XX:MaxDirectMemorySize=1900m > >> > -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log > >> -Dlogback.configurationFile=file:logback.xml > >> -Dlog4j.configuration=file:log4j.properties > >> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . > >> > >> One thing that drew my attention is `-Xmx570m`. I expected it to be > >> TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the > >> application as follows: > >> HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm > >> 4096 -ytm 2500 eval-assembly-1.0.jar > >> > >> In flink logs I do see 'Task Manager memory: 2500'. When I look at the > >> yarn container logs on the cluster node I see that it starts with 570mb, > >> which puzzles me. When I look at the actually allocated memory for a > Yarn > >> container using 'top' I see 2.2GB used. Am I interpreting these > parameters > >> correctly? 
> >> > >> I also have set (it failed in the same way without this as well): > >> taskmanager.memory.off-heap: true > >> > >> Also, I don't understand why this happens at all. I assumed that Flink > >> won't overcommit allocated resources and will spill to the disk when > running > >> out of heap memory. Appreciate if someone can shed light on this too. > >> > >> Thanks, > >> Timur > > > > >
Re: "No more bytes left" at deserialization
y the TM timed out. > > > Are you on 1.0.x or on 1.1-SNAPSHOT? > We recently changed something related to the ExecutionConfig which has > lead to Kryo issues in the past. > > > On Sun, Apr 24, 2016 at 11:38 AM, Timur Fayruzov <timur.fairu...@gmail.com > > wrote: > >> Trying to use ProtobufSerializer -- program consistently fails with the >> following exception: >> >> java.lang.IllegalStateException: Update task on instance >> 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL: >> akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to: >> at >> org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954) >> at akka.dispatch.OnFailure.internal(Future.scala:228) >> at akka.dispatch.OnFailure.internal(Future.scala:227) >> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) >> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) >> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) >> at >> scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28) >> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) >> at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) >> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) >> at >> scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) >> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >> at >> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >> at >> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >> at >> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >> Caused by: akka.pattern.AskTimeoutException: Ask timed out on >> [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]] >> after [1 ms] >> at >> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) >> at 
akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) >> at >> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) >> at >> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) >> at >> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) >> at >> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) >> at >> akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) >> at >> akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) >> at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) >> at java.lang.Thread.run(Thread.java:745) >> >> I'm at my wits' end now, any suggestions are highly appreciated. >> >> Thanks, >> Timur >> >> >> On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com >> > wrote: >> >>> Hello, >>> >>> I'm running a Flink program that is failing with the following exception: >>> >>> 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend >>> - Error while running the command. >>> org.apache.flink.client.program.ProgramInvocationException: The program >>> execution failed: Job execution failed. 
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:381) >>> at org.apache.flink.client.program.Client.runBlocking(Client.java:355) >>> at org.apache.flink.client.program.Client.runBlocking(Client.java:315) >>> at >>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) >>> at >>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) >>> at >>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638) >>> at >>> com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136) >>> at >>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48) >>> at >>> com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48) >>> at scala.Option.foreach(Option.scala:257) >>> at >>> com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48) >>> at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala) >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>> at >>> sun.reflect.
Re: "No more bytes left" at deserialization
Trying to use ProtobufSerializer -- program consistently fails with the following exception: java.lang.IllegalStateException: Update task on instance 52b9d8ffa1c0bb7251a9731c565460eb @ ip-10-105-200-137 - 1 slots - URL: akka.tcp://flink@10.105.200.137:48990/user/taskmanager failed due to: at org.apache.flink.runtime.executiongraph.Execution$6.onFailure(Execution.java:954) at akka.dispatch.OnFailure.internal(Future.scala:228) at akka.dispatch.OnFailure.internal(Future.scala:227) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:174) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:171) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at scala.runtime.AbstractPartialFunction.applyOrElse(AbstractPartialFunction.scala:28) at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136) at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:134) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.105.200.137:48990/user/taskmanager#1418296501]] after [1 ms] at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599) at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597) at 
akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:745) I'm at my wits' end now, any suggestions are highly appreciated. Thanks, Timur On Sun, Apr 24, 2016 at 2:26 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote: > Hello, > > I'm running a Flink program that is failing with the following exception: > > 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend > - Error while running the command. > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at org.apache.flink.client.program.Client.runBlocking(Client.java:381) > at org.apache.flink.client.program.Client.runBlocking(Client.java:355) > at org.apache.flink.client.program.Client.runBlocking(Client.java:315) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) > at > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638) > at > com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136) > at > com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48) > at > com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48) > at scala.Option.foreach(Option.scala:257) > at > com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48) > at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at org.apache.flink.client.program.Client.runBlocking(Client.java:248) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyO
"No more bytes left" at deserialization
Hello, I'm running a Flink program that is failing with the following exception: 2016-04-23 02:00:38,947 ERROR org.apache.flink.client.CliFrontend - Error while running the command. org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed. at org.apache.flink.client.program.Client.runBlocking(Client.java:381) at org.apache.flink.client.program.Client.runBlocking(Client.java:355) at org.apache.flink.client.program.Client.runBlocking(Client.java:315) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638) at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:136) at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48) at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:48) at scala.Option.foreach(Option.scala:257) at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:48) at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.Exception: The data preparation for task 'CHAIN CoGroup (CoGroup at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:123)) -> Filter (Filter at com.whitepages.data.flink.Resolution$.pipeline(Resolution.scala:124))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left. 
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left. at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No more bytes left. at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799) Caused by: java.io.EOFException: No more bytes left. at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:77) at com.esotericsoftware.kryo.io.Input.readUtf8_slow(Input.java:542) at com.esotericsoftware.kryo.io.Input.readUtf8(Input.java:535) at
YARN terminating TaskNode
Hello, Next issue in a string of things I'm solving is that my application fails with the message 'Connection unexpectedly closed by remote task manager'. Yarn log shows the following: Container [pid=4102,containerID=container_1461341357870_0004_01_15] is running beyond physical memory limits. Current usage: 2.5 GB of 2.5 GB physical memory used; 9.0 GB of 12.3 GB virtual memory used. Killing container. Dump of the process-tree for container_1461341357870_0004_01_15 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 4102 4100 4102 4102 (bash) 1 7 115806208 715 /bin/bash -c /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m -XX:MaxDirectMemorySize=1900m -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.out 2> /var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.err |- 4306 4102 4102 4102 (java) 172258 40265 9495257088 646460 /usr/lib/jvm/java-1.8.0/bin/java -Xms570m -Xmx570m -XX:MaxDirectMemorySize=1900m -Dlog.file=/var/log/hadoop-yarn/containers/application_1461341357870_0004/container_1461341357870_0004_01_15/taskmanager.log -Dlogback.configurationFile=file:logback.xml -Dlog4j.configuration=file:log4j.properties org.apache.flink.yarn.YarnTaskManagerRunner --configDir . One thing that drew my attention is `-Xmx570m`. I expected it to be TaskManagerMemory*0.75 (due to yarn.heap-cutoff-ratio). I run the application as follows: HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 18 -yjm 4096 -ytm 2500 eval-assembly-1.0.jar In flink logs I do see 'Task Manager memory: 2500'. 
When I look at the YARN container logs on the cluster node, I see that it starts with 570 MB, which puzzles me. When I look at the memory actually allocated to a YARN container using 'top', I see 2.2 GB used. Am I interpreting these parameters correctly? I have also set the following (it failed in the same way without it as well): taskmanager.memory.off-heap: true Also, I don't understand why this happens at all. I assumed that Flink won't overcommit allocated resources and will spill to disk when running out of heap memory. I'd appreciate it if someone could shed light on this too. Thanks, Timur
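For what it's worth, the observed JVM flags can be reproduced with a back-of-the-envelope calculation, assuming the 1.0.x-era defaults `yarn.heap-cutoff-min: 600` (MB) and `taskmanager.memory.fraction: 0.7` were in effect — those two values are an assumption here; the authoritative formula lives in Flink's YARN runner:

```java
// Hedged sketch: derives the observed -Xmx570m and -XX:MaxDirectMemorySize=1900m
// from -ytm 2500, assuming a 600 MB YARN cutoff (yarn.heap-cutoff-min) and the
// default managed-memory fraction of 0.7 (taskmanager.memory.fraction). With
// taskmanager.memory.off-heap: true, managed memory moves from the heap to
// direct memory, which is why the heap flag looks so small.
public class TmMemorySketch {
    public static void main(String[] args) {
        long containerMb = 2500;                      // -ytm 2500
        long cutoffMb = 600;                          // assumed yarn.heap-cutoff-min
        long jvmBudgetMb = containerMb - cutoffMb;    // 1900 -> MaxDirectMemorySize
        long managedOffHeapMb = jvmBudgetMb * 7 / 10; // 1330, allocated as direct memory
        long heapMb = jvmBudgetMb - managedOffHeapMb; // 570 -> -Xmx
        System.out.println("-Xmx" + heapMb + "m -XX:MaxDirectMemorySize="
                + jvmBudgetMb + "m");
    }
}
```

Under this reading, 2.2 GB resident is expected (heap plus direct memory can reach about 2.4 GB), and the container gets killed once JVM overhead on top of that crosses the 2.5 GB physical limit — spilling bounds Flink's managed memory, not the total JVM footprint.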
Re: Access to a shared resource within a mapper
Actually, a follow-up question: is the map function single-threaded (within one task manager, that is)? If it's not, then lazy initialization won't work, right? On Fri, Apr 22, 2016 at 11:50 AM, Stephan Ewen <se...@apache.org> wrote: > You may also be able to initialize the client only in the parallel > execution by making it a "lazy" variable in Scala. > > On Fri, Apr 22, 2016 at 11:46 AM, Timur Fayruzov <timur.fairu...@gmail.com > > wrote: > >> Outstanding! Thanks, Aljoscha. >> >> On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> >> wrote: >> >>> Hi, >>> you could use a RichMapFunction that has an open method: >>> >>> data.map(new RichMapFunction[...]() { >>> def open(): Unit = { >>> // initialize client >>> } >>> >>> def map(input: IN): OUT = { >>> // use client >>> } >>> }) >>> >>> the open() method is called before any elements are passed to the >>> function. The counterpart of open() is close(), which is called after all >>> elements are through or if the job is cancelled. >>> >>> Cheers, >>> Aljoscha >>> >>> On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com> >>> wrote: >>> >>>> Hello, >>>> >>>> I'm writing a Scala Flink application. I have a standalone process that >>>> exists on every Flink node that I need to call to transform my data. To >>>> access this process I need to initialize a non-thread-safe client first. I >>>> would like to avoid initializing a client for each element being >>>> transformed.
A straightforward implementation would be something like this: >>>> ``` >>>> >>>> val env = ExecutionEnvironment.getExecutionEnvironment >>>> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c")))) >>>> val pool = new ArrayBlockingQueue[Client](5) >>>> // pool is filled here >>>> data.map(e => { >>>> val client = pool.take() >>>> val res = client.transform(e) >>>> pool.put(client) >>>> res >>>> }) >>>> >>>> ``` >>>> However, this causes a runtime exception with the message "Task not >>>> serializable", which makes sense. >>>> >>>> Function parameters and broadcast variables won't work either as far as >>>> I understand. Is there a way to make this happen? >>>> >>>> Thanks, >>>> Timur >>>> >>> >> >
Re: Access to a shared resource within a mapper
Outstanding! Thanks, Aljoscha. On Fri, Apr 22, 2016 at 2:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > you could use a RichMapFunction that has an open method: > > data.map(new RichMapFunction[...]() { > def open(): Unit = { > // initialize client > } > > def map(input: IN): OUT = { > // use client > } > }) > > the open() method is called before any elements are passed to the > function. The counterpart of open() is close(), which is called after all > elements are through or if the job is cancelled. > > Cheers, > Aljoscha > > On Thu, 21 Apr 2016 at 22:21 Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > >> Hello, >> >> I'm writing a Scala Flink application. I have a standalone process that >> exists on every Flink node that I need to call to transform my data. To >> access this process I need to initialize a non-thread-safe client first. I >> would like to avoid initializing a client for each element being >> transformed. A straightforward implementation would be something like this: >> ``` >> >> val env = ExecutionEnvironment.getExecutionEnvironment >> val data = env.fromCollection(Seq(MyKey(Some("a")), MyKey(Some("c")))) >> val pool = new ArrayBlockingQueue[Client](5) >> // pool is filled here >> data.map(e => { >> val client = pool.take() >> val res = client.transform(e) >> pool.put(client) >> res >> }) >> >> ``` >> However, this causes a runtime exception with the message "Task not >> serializable", which makes sense. >> >> Function parameters and broadcast variables won't work either as far as I >> understand. Is there a way to make this happen? >> >> Thanks, >> Timur >> >
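On the thread-safety question: Flink instantiates one copy of the function per parallel subtask and drives it from a single thread, so a client created in open() is not shared across threads. Serialization of the function happens before open() is called, which is exactly why initializing the client there sidesteps the "Task not serializable" error (in real code you would extend RichMapFunction and keep the client in a transient field). A Flink-free sketch of that lifecycle, with `Client` standing in for the non-thread-safe client from the question:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the open()/map()/close() lifecycle without Flink on the
// classpath. `Client` and `EnrichMapper` are stand-ins, not Flink API: in a
// real job EnrichMapper would extend RichMapFunction<String, String> and the
// client field would be transient, created only after deserialization.
public class LifecycleSketch {
    static class Client {             // hypothetical non-thread-safe client
        String transform(String in) { return in.toUpperCase(); }
        void close() {}
    }

    static class EnrichMapper {       // mimics a RichMapFunction
        private Client client;        // one per parallel instance, never serialized

        void open() { client = new Client(); }          // called once, before any map()
        String map(String value) { return client.transform(value); }
        void close() { client.close(); client = null; } // called once, at the end
    }

    public static void main(String[] args) {
        // The runtime drives each instance from a single thread, roughly like this:
        EnrichMapper fn = new EnrichMapper();
        fn.open();
        List<String> out = new ArrayList<>();
        for (String s : List.of("a", "b")) out.add(fn.map(s));
        fn.close();
        System.out.println(out);
    }
}
```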
Re: Integrate Flink with S3 on EMR cluster
The exception does not show up in the console when I run the job, it only shows in the logs. I take this to mean that it happens either on the AM or a TM (I assume what I see in stdout is the client log). Is my thinking right? On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <u...@apache.org> wrote: > Hey Timur, > > Just had a chat with Robert about this. I agree that the error message > is confusing, but it is fine in this case. The file system classes are > not on the class path of the client process, which is submitting the > job. Do you mean that the classes should be on the classpath of the `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I tried to add the EMRFS jars to this classpath but it did not help. BTW, it seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only point of control I have here for adding to the classpath, so I had to set it manually. > It fails to sample the input file sizes, but this is just an > optimization step and hence it does not fail the job and only logs the > error. > Is this optimization client-side only? In other words, does it affect Flink's ability to choose the proper type of join? > > After the job is submitted everything should run as expected. > > You should be able to get rid of that exception by adding the missing > classes to the class path of the client process (started via > bin/flink), for example via the lib folder. > The above approach did not work; could you elaborate on what you meant by 'lib folder'? Thanks, Timur > – Ufuk > > > > > On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > There's one more filesystem integration failure that I have found.
My > job on > > a toy dataset succeeds, but Flink log contains the following message: > > 2016-04-07 18:10:01,339 ERROR > > org.apache.flink.api.common.io.DelimitedInputFormat - > Unexpected > > problen while getting the file statistics for file 's3://...': > > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class > > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found > > java.lang.RuntimeException: java.lang.RuntimeException: > > java.lang.ClassNotFoundException: Class > > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found > > at > > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227) > > at > > > org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460) > > at > > > org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352) > > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280) > > at > > > org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293) > > at > > > org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45) > > at > > > org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166) > > at > > > org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588) > > at > > > org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61) > > at > > > org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32) > > at > > > org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250) > > at > > > org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515) > > at > > org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248) > > at > org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477) > > at > 
org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398) > > at > > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228) > > at > > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567) > > at > > org.apache.flink.client.program.Client.runBlocking(Client.java:314) > > at > > > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) > > at > > > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855) > > at > > > org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638) > > at > > > co
Re: Integrate Flink with S3 on EMR cluster
<u...@apache.org> wrote: > Yes, for sure. > > I added some documentation for AWS here: > https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html > > Would be nice to update that page with your pull request. :-) > > – Ufuk > > > On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanp...@apache.org> wrote: > > Hi Timur, > > > > Great! A bootstrap action for Flink is good for AWS users. I think the > bootstrap action scripts would be placed in the `flink-contrib` directory. > > > > If you want, one of the people in the Flink PMC can assign FLINK-1337 to > you. > > > > Regards, > > Chiwan Park > > > >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > >> > >> I had a guide like that. > >> > > >
Re: Using native libraries in Flink EMR jobs
Thank you, Till! Setting the flag in flink-conf.yaml worked, I'm very glad that it was resolved. Note, however, that passing it as an argument to the flink script did not work. I tried to pass it as: `-yDenv.java.opts="-Djava.library.path="`. I did not investigate any further at this time. Thanks, Timur On Thu, Apr 7, 2016 at 2:48 AM, Till Rohrmann <trohrm...@apache.org> wrote: > For passing the dynamic property directly when running things on YARN, you > have to use -yDenv.java.opts="..." > > > On Thu, Apr 7, 2016 at 11:42 AM, Till Rohrmann <trohrm...@apache.org> > wrote: > >> Hi Timur, >> >> what you can try doing is to pass the JVM parameter >> -Djava.library.path= via the env.java.opts to the system. You >> simply have to add env.java.opts: "-Djava.library.path=" in the >> flink-conf.yaml or via -Denv.java.opts="-Djava.library.path=", if >> I’m not mistaken. >> >> Cheers >> Till >> >> >> On Thu, Apr 7, 2016 at 10:07 AM, Timur Fayruzov <timur.fairu...@gmail.com >> > wrote: >> >>> there is a hack for this issue: copying my native library to >>> $HADOOP_HOME/lib/native makes it discoverable and the program runs; however, >>> this is not an appropriate solution and seems fragile. >>> >>> I tried to find where the 'lib/native' path appears in the configuration and >>> found 2 places: >>> hadoop-env.sh: export >>> JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native" >>> mapred-site.xml: key: mapreduce.admin.user.env >>> >>> I tried to add the path to the dir with my native lib in both places, but still >>> no luck. >>> >>> Thanks, >>> Timur >>> >>> On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov < >>> timur.fairu...@gmail.com> wrote: >>> >>>> Hello, >>>> >>>> I'm not sure whether it's a Hadoop or Flink-specific question, but >>>> since I ran into this in the context of Flink I'm asking here. I would be >>>> glad if anyone can suggest a more appropriate place.
>>>> >>>> I have a native library that I need to use in my Flink batch job that I >>>> run on EMR, and I try to point the JVM to the location of the native library. >>>> Normally, I'd do this using the java.library.path parameter. So I try to run as >>>> follows: >>>> ` >>>> HADOOP_CONF_DIR=/etc/hadoop/conf >>>> JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m >>>> yarn-cluster -yn 1 -yjm 768 -ytm 768 >>>> ` >>>> It does not work; it fails with `java.lang.UnsatisfiedLinkError` when >>>> trying to load the native lib. It probably has to do with YARN not >>>> passing this parameter to task nodes, but my understanding of this >>>> mechanism is quite limited so far. >>>> >>>> I dug up this Jira ticket: >>>> https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting >>>> LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem >>>> either. >>>> >>>> Any help or hint on where to look is highly appreciated. >>>> >>>> Thanks, >>>> Timur >>>> >>> >>> >> >
Re: Using native libraries in Flink EMR jobs
There is a hack for this issue: copying my native library to $HADOOP_HOME/lib/native makes it discoverable and the program runs; however, this is not an appropriate solution and seems fragile. I tried to find where the 'lib/native' path appears in the configuration and found 2 places: hadoop-env.sh: export JAVA_LIBRARY_PATH="$JAVA_LIBRARY_PATH:/usr/lib/hadoop-lzo/lib/native" mapred-site.xml: key: mapreduce.admin.user.env I tried to add the path to the dir with my native lib in both places, but still no luck. Thanks, Timur On Wed, Apr 6, 2016 at 11:21 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote: > Hello, > > I'm not sure whether it's a Hadoop or Flink-specific question, but since I > ran into this in the context of Flink I'm asking here. I would be glad if > anyone can suggest a more appropriate place. > > I have a native library that I need to use in my Flink batch job that I > run on EMR, and I try to point the JVM to the location of the native library. > Normally, I'd do this using the java.library.path parameter. So I try to run as > follows: > ` > HADOOP_CONF_DIR=/etc/hadoop/conf > JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m > yarn-cluster -yn 1 -yjm 768 -ytm 768 > ` > It does not work; it fails with `java.lang.UnsatisfiedLinkError` when trying > to load the native lib. It probably has to do with YARN not passing > this parameter to task nodes, but my understanding of this mechanism is > quite limited so far. > > I dug up this Jira ticket: > https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting > LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem > either. > > Any help or hint on where to look is highly appreciated. > > Thanks, > Timur >
Using native libraries in Flink EMR jobs
Hello, I'm not sure whether it's a Hadoop or Flink-specific question, but since I ran into this in the context of Flink I'm asking here. I would be glad if anyone can suggest a more appropriate place. I have a native library that I need to use in my Flink batch job that I run on EMR, and I try to point the JVM to the location of the native library. Normally, I'd do this using the java.library.path parameter. So I try to run as follows: ` HADOOP_CONF_DIR=/etc/hadoop/conf JVM_ARGS=-Djava.library.path= flink-1.0.0/bin/flink run -m yarn-cluster -yn 1 -yjm 768 -ytm 768 ` It does not work; it fails with `java.lang.UnsatisfiedLinkError` when trying to load the native lib. It probably has to do with YARN not passing this parameter to task nodes, but my understanding of this mechanism is quite limited so far. I dug up this Jira ticket: https://issues.apache.org/jira/browse/MAPREDUCE-3693, but setting LD_LIBRARY_PATH in mapreduce.admin.user.env did not solve the problem either. Any help or hint on where to look is highly appreciated. Thanks, Timur
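As the replies in this thread note, the setting that eventually worked was env.java.opts in flink-conf.yaml, which Flink passes through to the JVMs of the YARN-launched processes. A sketch of the entry — the path below is a placeholder, not from the thread; point it at wherever the .so files live on the task nodes:

```yaml
# flink-conf.yaml -- placeholder path; must be valid on every task node
env.java.opts: "-Djava.library.path=/path/to/native/libs"
```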
Re: Integrate Flink with S3 on EMR cluster
Yes, Hadoop version was the culprit. It turns out that EMRFS requires at least Hadoop 2.4.0 (judging from the exception in the initial post; I was not able to find the official requirements). Rebuilding Flink with Hadoop 2.7.1 and with Scala 2.11 worked like a charm and I was able to run WordCount using S3 both for inputs and outputs. I did *not* need to change any configuration (as outlined at https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/connectors.html ). Thanks for bearing with me, Ufuk! While looking for the solution I found this issue: https://issues.apache.org/jira/browse/FLINK-1337. I have a setup for an EMR cluster now, so I can make a PR describing it, if it's still relevant. I, for one, would have saved a couple of days if I had a guide like that. Thanks, Timur On Tue, Apr 5, 2016 at 10:43 AM, Ufuk Celebi <u...@apache.org> wrote: > Hey Timur, > > if you are using EMR with IAM roles, Flink should work out of the box. > You don't need to change the Hadoop config and the IAM role takes care > of setting up all credentials at runtime. You don't need to hardcode > any keys in your application that way and this is the recommended way > to go in order to not worry about securely exchanging the keys and > then keeping them secure afterwards. > > With EMR 4.4.0 you have to use a Flink binary version built against > Hadoop 2.7. Did you do that? Can you please retry with an > out-of-the-box Flink and just run it like this: > > HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink etc. > > Hope this helps! Please report back. :-) > > – Ufuk > > > On Tue, Apr 5, 2016 at 5:47 PM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > Hello Ufuk, > > > > I'm using EMR 4.4.0. > > > > Thanks, > > Timur > > > > On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <u...@apache.org> wrote: > >> > >> Hey Timur, > >> > >> which EMR version are you using?
> >> > >> – Ufuk > >> > >> On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov < > timur.fairu...@gmail.com> > >> wrote: > >> > Thanks for the answer, Ken. > >> > > >> > My understanding is that file system selection is driven by the > >> > following > >> > sections in core-site.xml: > >> > > >> > fs.s3.impl > >> > > >> > org.apache.hadoop.fs.s3native.NativeS3FileSystem > >> > > >> > > >> > > >> > fs.s3n.impl > >> > com.amazon.ws.emr.hadoop.fs.EmrFileSystem > >> > > >> > > >> > If I run the program using configuration above with s3n (and also > >> > modifying > >> > credential keys to use s3n) it fails with the same error, but there is > >> > no > >> > "... opening key ..." logs. S3a seems to be not supported, it fails > with > >> > the > >> > following: > >> > Caused by: java.io.IOException: No file system found with scheme s3a, > >> > referenced in file URI 's3a://'. > >> > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296) > >> > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311) > >> > at > >> > > >> > > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) > >> > at > >> > > >> > > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) > >> > at > >> > > >> > > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156) > >> > ... 23 more > >> > > >> > I am puzzled by the fact that EMRFS is still apparently referenced > >> > somewhere > >> > as an implementation for S3 protocol, I'm not able to locate where > this > >> > configuration is set. > >> > > >> > > >> > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler > >> > <kkrugler_li...@transpac.com> > >> > wrote: > >> >> > >> >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not > >> >> s3. 
> >> >> > >> >> Though EMR has some support for magically treating the s3 protocol as > >> >> s3n > >> >> (or maybe s3a now, with Hadoop 2.6 or later) > >> >> > >> >> What happens if you use s3n:/// for the > --input > >> >> parameter? > >> >> > >> >> — Ken > >> >> > >> >>
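The rebuild that resolved this thread can be sketched as follows. This is a hedged sketch based on the Flink 1.0-era source build (the `change-scala-version.sh` helper and exact flags may differ in other releases; check `tools/` and the root `pom.xml` of your checkout):

```
# Build Flink from source against Hadoop 2.7.1 and Scala 2.11.
git clone https://github.com/apache/flink.git
cd flink
git checkout release-1.0.2

# Switch the Maven modules to Scala 2.11 (helper shipped with Flink 1.x).
tools/change-scala-version.sh 2.11

# Build against a specific Hadoop version; skip tests for speed.
mvn clean install -DskipTests -Dhadoop.version=2.7.1
```

The resulting binaries land under `build-target/` (or `flink-dist/target/`), and that is the distribution to run on the EMR master.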
Re: Integrate Flink with S3 on EMR cluster
Hello Ufuk, I'm using EMR 4.4.0. Thanks, Timur On Tue, Apr 5, 2016 at 2:18 AM, Ufuk Celebi <u...@apache.org> wrote: > Hey Timur, > > which EMR version are you using? > > – Ufuk > > On Tue, Apr 5, 2016 at 1:43 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > Thanks for the answer, Ken. > > > > My understanding is that file system selection is driven by the following > > sections in core-site.xml: > > > > fs.s3.impl > > > > org.apache.hadoop.fs.s3native.NativeS3FileSystem > > > > > > > > fs.s3n.impl > > com.amazon.ws.emr.hadoop.fs.EmrFileSystem > > > > > > If I run the program using configuration above with s3n (and also > modifying > > credential keys to use s3n) it fails with the same error, but there is no > > "... opening key ..." logs. S3a seems to be not supported, it fails with > the > > following: > > Caused by: java.io.IOException: No file system found with scheme s3a, > > referenced in file URI 's3a://'. > > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296) > > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311) > > at > > > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450) > > at > > > org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57) > > at > > > org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:156) > > ... 23 more > > > > I am puzzled by the fact that EMRFS is still apparently referenced > somewhere > > as an implementation for S3 protocol, I'm not able to locate where this > > configuration is set. > > > > > > On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kkrugler_li...@transpac.com > > > > wrote: > >> > >> Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not > s3. > >> > >> Though EMR has some support for magically treating the s3 protocol as > s3n > >> (or maybe s3a now, with Hadoop 2.6 or later) > >> > >> What happens if you use s3n:/// for the --input > >> parameter? 
> >> > >> — Ken > >> > >> On Apr 4, 2016, at 2:51pm, Timur Fayruzov <timur.fairu...@gmail.com> > >> wrote: > >> > >> Hello, > >> > >> I'm trying to run a Flink WordCount job on an AWS EMR cluster. I > succeeded > >> with a three-step procedure: load data from S3 to cluster's HDFS, run > Flink > >> Job, unload outputs from HDFS to S3. > >> > >> However, ideally I'd like to read/write data directly from/to S3. I > >> followed the instructions here: > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html > , > >> more specifically I: > >> 1. Modified flink-conf to point to /etc/hadoop/conf > >> 2. Modified core-site.xml per link above (not clear why why it is not > >> using IAM, I had to provide AWS keys explicitly). > >> > >> Run the following command: > >> HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m > yarn-cluster > >> -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input > >> s3:// --output hdfs:///flink-output > >> > >> First, I see messages like that: > >> 2016-04-04 21:37:10,418 INFO > >> org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening > key > >> '' for reading at position '333000' > >> > >> Then, it fails with the following error: > >> > >> > >> > >> The program finished with the following exception: > >> > >> > >> org.apache.flink.client.program.ProgramInvocationException: The program > >> execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 > >> (WordCount Example) > >> > >> at org.apache.flink.client.program.Client.runBlocking(Client.java:381) > >> > >> at org.apache.flink.client.program.Client.runBlocking(Client.java:355) > >> > >> at org.apache.flink.client.program.Client.runBlocking(Client.java:315) > >> > >> at > >> > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) > >> > >> at > >> > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90) > >> > >> at sun.reflect.Native
Re: Integrate Flink with S3 on EMR cluster
Thanks for the answer, Ken. My understanding is that file system selection is driven by the following sections in core-site.xml:

<property>
  <name>fs.s3.impl</name>
  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<property>
  <name>fs.s3n.impl</name>
  <value>com.amazon.ws.emr.hadoop.fs.EmrFileSystem</value>
</property>

If I run the program using configuration above with s3n (and also modifying credential keys to use s3n) it fails with the same error, but there is no "... opening key ..." logs. S3a seems to be not supported, it fails with the following:

Caused by: java.io.IOException: No file system found with scheme s3a, referenced in file URI 's3a://'.
  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:296)
  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:311)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:450)
  at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
  at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
  ... 23 more

I am puzzled by the fact that EMRFS is still apparently referenced somewhere as an implementation for S3 protocol, I'm not able to locate where this configuration is set. On Mon, Apr 4, 2016 at 4:07 PM, Ken Krugler <kkrugler_li...@transpac.com> wrote: > Normally in Hadoop jobs you’d want to use s3n:// as the protocol, not s3. > > Though EMR has some support for magically treating the s3 protocol as s3n > (or maybe s3a now, with Hadoop 2.6 or later) > > What happens if you use s3n:/// for the --input > parameter? > > — Ken > > On Apr 4, 2016, at 2:51pm, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > Hello, > > I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded > with a three-step procedure: load data from S3 to cluster's HDFS, run Flink > Job, unload outputs from HDFS to S3. > > However, ideally I'd like to read/write data directly from/to S3. 
I > followed the instructions here: > https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html, > more specifically I: > 1. Modified flink-conf to point to /etc/hadoop/conf > 2. Modified core-site.xml per link above (not clear why why it is not > using IAM, I had to provide AWS keys explicitly). > > Run the following command: > HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster > -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input > s3:// --output hdfs:///flink-output > > First, I see messages like that: > 2016-04-04 21:37:10,418 INFO > org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening key > '' for reading at position '333000' > > Then, it fails with the following error: > > > > The program finished with the following exception: > > > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 > (WordCount Example) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:381) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:355) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:315) > > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) > > at > org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:606) > > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:248) > 
> at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed > to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example) > > at org.apache.flink.runtime.jobmanager.JobManager.org > <http://org.apache.flink.runtime.jobmanager.jobmanager.org/> > $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100) > > at > org.apache.flin
Integrate Flink with S3 on EMR cluster
Hello, I'm trying to run a Flink WordCount job on an AWS EMR cluster. I succeeded with a three-step procedure: load data from S3 to the cluster's HDFS, run the Flink job, unload outputs from HDFS to S3. However, ideally I'd like to read/write data directly from/to S3. I followed the instructions here: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html, more specifically I: 1. Modified flink-conf to point to /etc/hadoop/conf 2. Modified core-site.xml per link above (not clear why it is not using IAM, I had to provide AWS keys explicitly). Then I ran the following command: HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar --input s3:// --output hdfs:///flink-output First, I see messages like this: 2016-04-04 21:37:10,418 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem - Opening key '' for reading at position '333000' Then, it fails with the following error: The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example) at org.apache.flink.client.program.Client.runBlocking(Client.java:381) at org.apache.flink.client.program.Client.runBlocking(Client.java:355) at org.apache.flink.client.program.Client.runBlocking(Client.java:315) at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60) at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:90) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) at org.apache.flink.client.program.Client.runBlocking(Client.java:248) at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job fc13373d993539e647f164e12d82bf90 (WordCount Example) at org.apache.flink.runtime.jobmanager.JobManager.org $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1100) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:380) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153) at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) at akka.dispatch.Mailbox.run(Mailbox.scala:221) at 
akka.dispatch.Mailbox.exec(Mailbox.scala:231) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: org.apache.hadoop.conf.Configuration.addResource(Lorg/apache/hadoop/conf/Configuration;)V at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:172) at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:696) at org.apache.flink.runtime.jobmanager.JobManager.org $apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1023) ... 21 more Caused by: java.lang.NoSuchMethodError:
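The three-step workaround mentioned at the top of this post can be sketched with EMR's stock tooling. `s3-dist-cp` is the distcp wrapper that ships with EMR; the bucket name and paths below are placeholders, and the `flink run` line simply reuses the command from the post with HDFS paths substituted:

```
# Step 1: stage inputs from S3 into the cluster's HDFS.
s3-dist-cp --src s3://my-bucket/input --dest hdfs:///flink-input

# Step 2: run the Flink job purely against HDFS.
HADOOP_CONF_DIR=/etc/hadoop/conf flink-1.0.0/bin/flink run -m yarn-cluster \
  -yn 1 -yjm 768 -ytm 768 flink-1.0.0/examples/batch/WordCount.jar \
  --input hdfs:///flink-input --output hdfs:///flink-output

# Step 3: unload outputs from HDFS back to S3.
s3-dist-cp --src hdfs:///flink-output --dest s3://my-bucket/output
```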
Re: Implicit inference of TypeInformation for join keys
I'm very content with an extra `apply`, it's much cleaner than any of my initial solutions. On Thu, Mar 31, 2016 at 2:18 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > I'm afraid there is no way around having that extra ".apply" because the > Scala compiler will get confused with the additional implicit parameter. > It's a bit ugly, though ... > > On Wed, 30 Mar 2016 at 18:34 Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > >> Actually, there is an even easier solution (which I saw in your reply to >> my other question): >> ``` >> a.coGroup(b) >> .where(e => (e.f1, e.f2)) >> .equalTo(e => (e.f1, e.f2)).apply { >> >> (left, right) => 1 >> }.print() >> ``` >> pretty much does what I want. Explicit `apply` gives a hint that a >> compiler was missing before. Nevertheless, `createTypeInformation` works >> too, thanks for sharing! >> >> Thanks, >> Timur >> >> On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org> >> wrote: >> >>> Hi Timur, >>> >>> You have to use `createTypeInfomation` method in `org.apache.flink.api` >>> package to create TypeInformation object for Scala-specific objects such as >>> case classes, tuples, eithers, options. For example: >>> >>> ``` >>> import org.apache.flink.api.scala._ // to import package object >>> >>> val a: DataSet[Thing] = … >>> val b: DataSet[Thing] = … >>> >>> a.coGroup(b) >>> .where(e => (e.f1, e.f2)) >>> .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) { >>> (left, right) => 1 >>> }.print() >>> ``` >>> >>> Note that Flink creates internally copied 2-tuples consisted of >>> (extracted key by KeySelector, original value). So there is some >>> performance decrease when you are using KeySelector. >>> >>> Regards, >>> Chiwan Park >>> >>> > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> >>> wrote: >>> > >>> > Thank you Chiwan! Yes, I understand that there are workarounds that >>> don't use function argument (and thus do not require implicit arguments). 
I >>> try to avoid positional and string-based keys because there is no compiler >>> guarantees when you refactor or accidentally change the underlying case >>> classes. Providing a function is the cleanest solution (and arguably is the >>> most readable) so it'd be great to make it work. >>> > >>> > BTW, TypeInformation.of has an implementation that takes TypeHint ( >>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java) >>> which, according to documentation, is supposed to be used for generic >>> classes, but using it still leads to the same exception. >>> > >>> > Thanks, >>> > Timur >>> > >>> > >>> > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> >>> wrote: >>> > Hi Timur, >>> > >>> > You can use a composite key [1] to compare keys consisting of multiple >>> fields. For example: >>> > >>> > ``` >>> > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) >>> > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) >>> > a.coGroup(b) >>> > .where(“f1”, “f2”) // Flink compares the values of f1 first, and >>> compares the values of f2 if values of f1 are same. >>> > .equalTo(“f1”, “f2”) { // Note that you must specify same number of >>> keys >>> > (left, right) => 1 >>> > } >>> > ``` >>> > >>> > Composite key can be applied to Scala tuple also: >>> > >>> > ``` >>> > val a = env.fromCollection(Seq(("a", "b"), ("c", "d"))) >>> > val b = env.fromCollection(Seq(("a", "x"), ("z", "m"))) >>> > a.coGroup(b) >>> > .where(0, 1) // Note that field numbers start from 0. >>> > .equalTo(0, 1) { >>> > (left, right) => 1 >>> > } >>> > ``` >>> > >>> > I hope this helps. >>> > >>> > [1]: >>> https://ci.apache.org/project
Re: Implicit inference of TypeInformation for join keys
Actually, there is an even easier solution (which I saw in your reply to my other question): ``` a.coGroup(b) .where(e => (e.f1, e.f2)) .equalTo(e => (e.f1, e.f2)).apply { (left, right) => 1 }.print() ``` pretty much does what I want. Explicit `apply` gives the compiler a hint that it was missing before. Nevertheless, `createTypeInformation` works too, thanks for sharing! Thanks, Timur On Wed, Mar 30, 2016 at 9:15 AM, Chiwan Park <chiwanp...@apache.org> wrote: > Hi Timur, > > You have to use the `createTypeInformation` method in the `org.apache.flink.api` > package to create a TypeInformation object for Scala-specific objects such as > case classes, tuples, eithers, options. For example: > > ``` > import org.apache.flink.api.scala._ // to import package object > > val a: DataSet[Thing] = … > val b: DataSet[Thing] = … > > a.coGroup(b) > .where(e => (e.f1, e.f2)) > .equalTo(e => (e.f1, e.f2))(createTypeInformation[(String, String)]) { > (left, right) => 1 > }.print() > ``` > > Note that Flink internally creates copied 2-tuples consisting of (extracted > key by KeySelector, original value). So there is some performance decrease > when you are using KeySelector. > > Regards, > Chiwan Park > > > On Mar 31, 2016, at 12:58 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > > > Thank you Chiwan! Yes, I understand that there are workarounds that > don't use function argument (and thus do not require implicit arguments). I > try to avoid positional and string-based keys because there are no compiler > guarantees when you refactor or accidentally change the underlying case > classes. Providing a function is the cleanest solution (and arguably is the > most readable) so it'd be great to make it work. 
> > > > BTW, TypeInformation.of has an implementation that takes TypeHint ( > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java) > which, according to documentation, is supposed to be used for generic > classes, but using it still leads to the same exception. > > > > Thanks, > > Timur > > > > > > On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> > wrote: > > Hi Timur, > > > > You can use a composite key [1] to compare keys consisting of multiple > fields. For example: > > > > ``` > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) > > a.coGroup(b) > > .where(“f1”, “f2”) // Flink compares the values of f1 first, and > compares the values of f2 if values of f1 are same. > > .equalTo(“f1”, “f2”) { // Note that you must specify same number of > keys > > (left, right) => 1 > > } > > ``` > > > > Composite key can be applied to Scala tuple also: > > > > ``` > > val a = env.fromCollection(Seq(("a", "b"), ("c", "d"))) > > val b = env.fromCollection(Seq(("a", "x"), ("z", "m"))) > > a.coGroup(b) > > .where(0, 1) // Note that field numbers start from 0. > > .equalTo(0, 1) { > > (left, right) => 1 > > } > > ``` > > > > I hope this helps. > > > > [1]: > https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples > > > > Regards, > > Chiwan Park > > > > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > > > > > Hello, > > > > > > Another issue I have encountered is incorrect implicit resolution (I'm > using Scala 2.11.7). 
Here's the example (with a workaround): > > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) > > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) > > > a.coGroup(b) > > > .where(e => e.f1) > > > //.equalTo(e => e) { //this fails to compile because equalTo expects > an implicit > > > .equalTo("f1") { > > > (left, right) => 1 > > > } > > > However, the workaround does not quite work when key is a tuple (I > suspect this applies to other generic classes as well): > > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) > > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing(&q
Re: Why Scala Option is not a valid key?
Thank you for your answers, Chiwan! Does that mean that a generic type can't be used as a key in general? This is a non-obvious limitation of the Flink DSL that I didn't see in the documentation. Could you please elaborate on what you mean by KeyExtractor? I see that inside the `where` operator an instance of KeySelector is initialized, but I don't see how I can pass a custom KeySelector in. Thanks, Timur On Wed, Mar 30, 2016 at 12:53 AM, Chiwan Park <chiwanp...@apache.org> wrote: > Hi Timur, > > Because Option[T] is not a comparable type generally (if T is a POJO type), > you cannot use Option[T] as a key type. I think you have to implement a > KeyExtractor to compare objects including Option[T]s. > > ``` > case class MyKey(k1: Option[String], k2: Option[String]) > > val data1 = env.fromElements(MyKey(Some("a"), None), MyKey(Some("a"), > Some("c"))) > val data2 = env.fromElements(MyKey(Some("b"), None), MyKey(Some("a"), > Some("c"))) > > data1.join(data2) > .where(_.hashCode()) > .equalTo(_.hashCode()).apply { > (left: MyKey, right: MyKey) => (left, right) > }.print() > ``` > > Note that the approach in the example (using hashCode()) cannot be applied to > a sort task. > > Regards, > Chiwan Park > > > On Mar 30, 2016, at 2:37 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > > > There is some more detail to this question that I missed initially. It > turns out that my key is a case class of the form MyKey(k1: Option[String], > k2: Option[String]). Keys.SelectorFunctionKeys is performing a recursive > check whether every element of the MyKey class can be a key and fails when > encountering an Option. > > > > Is it possible to work around this situation without giving up Options? > Inability to use Options in domain objects could be really frustrating. 
> > > > Thanks, > > Timur > > > > On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov < > timur.fairu...@gmail.com> wrote: > > Hello, > > > > I'm evaluating Flink and one thing I noticed is Option[A] can't be used > as a key for coGroup (looking specifically here: > https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39). > I'm not clear about the reason of this and appreciate if someone can > explain. > > > > Thanks, > > Timur > > > >
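One way to make the KeyExtractor suggestion above concrete without resorting to `hashCode()` (which can collide): flatten each `Option` into a plainly comparable `String` and key on that. The `encode`/`keyOf` helpers below are a hypothetical sketch, not Flink API, and they assume the chosen separator does not occur in the key values:

```
// Hypothetical helpers: encode Option[String] fields into one comparable
// String key, avoiding the collision risk of keying on hashCode().
case class MyKey(k1: Option[String], k2: Option[String])

// "0:" marks None, "1:" + v marks Some(v), so None stays distinct
// from Some("").
def encode(o: Option[String]): String =
  o.map("1:" + _).getOrElse("0:")

def keyOf(k: MyKey): String =
  encode(k.k1) + "|" + encode(k.k2)  // assumes '|' never occurs in values

// Usable as a key-selector function in a join:
//   data1.join(data2).where(keyOf _).equalTo(keyOf _) { ... }
```

Unlike the `hashCode()` approach, this keeps the key a real value, so it should also be usable where an ordered key is needed.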
Re: Implicit inference of TypeInformation for join keys
Thank you Chiwan! Yes, I understand that there are workarounds that don't use function argument (and thus do not require implicit arguments). I try to avoid positional and string-based keys because there is no compiler guarantees when you refactor or accidentally change the underlying case classes. Providing a function is the cleanest solution (and arguably is the most readable) so it'd be great to make it work. BTW, TypeInformation.of has an implementation that takes TypeHint ( https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeHint.java) which, according to documentation, is supposed to be used for generic classes, but using it still leads to the same exception. Thanks, Timur On Wed, Mar 30, 2016 at 12:05 AM, Chiwan Park <chiwanp...@apache.org> wrote: > Hi Timur, > > You can use a composite key [1] to compare keys consisting of multiple > fields. For example: > > ``` > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) > a.coGroup(b) > .where(“f1”, “f2”) // Flink compares the values of f1 first, and > compares the values of f2 if values of f1 are same. > .equalTo(“f1”, “f2”) { // Note that you must specify same number of keys > (left, right) => 1 > } > ``` > > Composite key can be applied to Scala tuple also: > > ``` > val a = env.fromCollection(Seq(("a", "b"), ("c", "d"))) > val b = env.fromCollection(Seq(("a", "x"), ("z", "m"))) > a.coGroup(b) > .where(0, 1) // Note that field numbers start from 0. > .equalTo(0, 1) { > (left, right) => 1 > } > ``` > > I hope this helps. > > [1]: > https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#define-keys-for-tuples > > Regards, > Chiwan Park > > > On Mar 30, 2016, at 3:54 AM, Timur Fayruzov <timur.fairu...@gmail.com> > wrote: > > > > Hello, > > > > Another issue I have encountered is incorrect implicit resolution (I'm > using Scala 2.11.7). 
Here's the example (with a workaround): > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) > > a.coGroup(b) > > .where(e => e.f1) > > //.equalTo(e => e) { //this fails to compile because equalTo expects > an implicit > > .equalTo("f1") { > > (left, right) => 1 > > } > > However, the workaround does not quite work when key is a tuple (I > suspect this applies to other generic classes as well): > > val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) > > val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) > > a.coGroup(b) > > .where(e => (e.f1, e.f2)) > > .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, > String)])) { (left, right) => 1} // throws InvalidProgramException > > Here, I try to provide the implicit TypeInformation explicitly, but > apparently it's not compatible with the way implicit inference is done. > (TypeInformation I generate is GenericType, while > scala.Tuple2<String, String> is expected). > > > > Now, I can split this in 2 operations like below: > > val tmp = a.coGroup(b) > > .where(e => (e.f1, e.f2)) > > .equalTo(e => (e.f1, e.f2)) > > > > tmp { (left, right) => 1} > > but, I would like to avoid adding clutter to my processing logic, and > it's not entirely clear to me how this would be scheduled. > > > > As an option, I can hash the hell out of my keys like that: > > a.coGroup(b) > > .where(e => (e.f1, e.f2).hashCode) > > .equalTo(e => (e.f1, > e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1} > > but that, again, adds some indirection and clutter, not mentioning the > hassle of dealing with collisions (which can be alleviated by using fancy > hashes, but I'd like to avoid that). > > > > Any insights on what is the way to go here are highly appreciated. > > > > Thanks, > > Timur > >
Implicit inference of TypeInformation for join keys
Hello, Another issue I have encountered is incorrect implicit resolution (I'm using Scala 2.11.7). Here's the example (with a workaround): val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) a.coGroup(b) .where(e => e.f1) //.equalTo(e => e) { //this fails to compile because equalTo expects an implicit .equalTo("f1") { (left, right) => 1 } However, the workaround does not quite work when key is a tuple (I suspect this applies to other generic classes as well): val a = env.fromCollection(Seq(Thing("a", "b"), Thing("c", "d"))) val b = env.fromCollection(Seq(Thing("a", "x"), Thing("z", "m"))) a.coGroup(b) .where(e => (e.f1, e.f2)) .equalTo(e => (e.f1, e.f2))(TypeInformation.of(classOf[(String, String)])) { (left, right) => 1} // throws InvalidProgramException Here, I try to provide the implicit TypeInformation explicitly, but apparently it's not compatible with the way implicit inference is done. (TypeInformation I generate is GenericType, while scala.Tuple2is expected). Now, I can split this in 2 operations like below: val tmp = a.coGroup(b) .where(e => (e.f1, e.f2)) .equalTo(e => (e.f1, e.f2)) tmp { (left, right) => 1} but, I would like to avoid adding clutter to my processing logic, and it's not entirely clear to me how this would be scheduled. As an option, I can hash the hell out of my keys like that: a.coGroup(b) .where(e => (e.f1, e.f2).hashCode) .equalTo(e => (e.f1, e.f2).hashCode)(TypeInformation.of(classOf[Int])){ (left, right) => 1} but that, again, adds some indirection and clutter, not mentioning the hassle of dealing with collisions (which can be alleviated by using fancy hashes, but I'd like to avoid that). Any insights on what is the way to go here are highly appreciated. Thanks, Timur
Re: Why Scala Option is not a valid key?
There is some more detail to this question that I missed initially. It turns out that my key is a case class of the form MyKey(k1: Option[String], k2: Option[String]). Keys.SelectorFunctionKeys is performing a recursive check whether every element of the MyKey class can be a key and fails when encountering an Option. Is it possible to work around this situation without giving up Options? Inability to use Options in domain objects could be really frustrating. Thanks, Timur On Tue, Mar 29, 2016 at 10:19 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote: > Hello, > > I'm evaluating Flink and one thing I noticed is that Option[A] can't be used as > a key for coGroup (looking specifically here: > https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39). > I'm not clear about the reason for this and would appreciate it if someone could > explain. > > Thanks, > Timur >
Why Scala Option is not a valid key?
Hello, I'm evaluating Flink and one thing I noticed is that Option[A] can't be used as a key for coGroup (looking specifically here: https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionTypeInfo.scala#L39). I'm not clear about the reason for this and would appreciate it if someone could explain. Thanks, Timur