You can always explicitly request a broadcast join via "joinWithTiny", "joinWithHuge", or by supplying a JoinHint.
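For example, a minimal sketch in the Scala DataSet API showing all three variants (the data sets, element types, and key positions below are made up for illustration; only the join methods and the hint are the point):

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint

object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical inputs: a large data set and a small lookup data set.
    val large: DataSet[(String, Int)]   = env.fromElements(("a", 1), ("b", 2))
    val tiny:  DataSet[(String, String)] = env.fromElements(("a", "x"))

    // joinWithTiny: hint that the second input is small, so it is broadcast,
    // regardless of whether input statistics are available.
    val viaTiny = large.joinWithTiny(tiny).where(0).equalTo(0)

    // joinWithHuge: hint that the second input is huge, so the first (small) input is broadcast.
    val viaHuge = tiny.joinWithHuge(large).where(0).equalTo(0)

    // An explicit JoinHint on a regular join: broadcast the second input and build a hash table from it.
    val viaHint = large.join(tiny, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0)

    // print() triggers execution and prints the joined pairs on the client.
    viaTiny.print()
    viaHuge.print()
    viaHint.print()
  }
}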
Greetings,
Stephan

On Sat, Apr 9, 2016 at 1:56 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:

> Thank you, Robert. One of my test cases is a broadcast join, so I need to make statistics work. The only workaround I have found so far is to copy the contents of /usr/share/aws/emr/emrfs/lib/, /usr/share/aws/aws-java-sdk/ and /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar to flink/lib. Putting these directories on HADOOP_CLASSPATH unfortunately did not work (I am running a single-machine cluster, so the YARN ResourceManager and NodeManager share the same machine). Apparently, the classpath for YARN containers does not include HADOOP_CLASSPATH.
>
> I'm not a Hadoop expert, so the relationship between YARN, the hadoop executable and Flink seems strange to me. The hadoop executable sets up a lot of env vars (including the hadoop classpath), but it seems that this setup has no effect on YARN application containers. Not sure whether that is expected.
>
> Thanks,
> Timur
>
> On Fri, Apr 8, 2016 at 2:38 AM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Timur,
>>
>> The Flink optimizer runs on the client, so the exception is thrown from the JVM running the ./bin/flink client. Since the statistics sampling is an optional step, it's surrounded by a try/catch block that just logs the error message.
>>
>> More answers inline below.
>>
>> On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>
>>> The exception does not show up in the console when I run the job, it only shows in the logs. I thought that meant it happens either on the AM or a TM (I assume what I see in stdout is the client log). Is my thinking right?
>>>
>>> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> Hey Timur,
>>>>
>>>> Just had a chat with Robert about this. I agree that the error message is confusing, but it is fine in this case. The file system classes are not on the class path of the client process, which is submitting the job.
>>>
>>> Do you mean that the classes should be on the classpath of the `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I tried to add the EMRFS jars to this classpath but it did not help. BTW, it seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only point that I control here to add to the classpath, so I had to set it manually.
>>
>> Yes, they have to be on the classpath of the CliFrontend. The client should also work without HADOOP_CLASSPATH being set. It's optional, for cases where you want to manually add jars to the classpath. For example, on Google Compute they set the HADOOP_CLASSPATH.
>>
>> Please note that we are not transferring the contents of the HADOOP_CLASSPATH to the other workers on the cluster, so you have to set the HADOOP_CLASSPATH on all machines. Another approach is just putting the required jar into the "lib/" folder of your Flink installation (the folder is next to "bin/", "conf/", "logs/").
>>
>>>> It fails to sample the input file sizes, but this is just an optimization step and hence it does not fail the job and only logs the error.
>>>
>>> Is this optimization only on the client side? In other words, does it affect Flink's ability to choose the proper type of join?
>> Your DataSet program is translated into a generic representation. Then, this representation is passed into the optimizer, which decides on join / sorting / data-shipping strategies. The output of the optimizer is sent to the JobManager for execution.
>> If the optimizer is not able to get good statistics about the input (like in your case), it will default to robust execution strategies. I don't know the input sizes and the structure of your job, but chances are high that the final plan is the same with and without the input statistics. Only in cases where one join side is very small are the input statistics likely to be relevant.
>> Other optimizations, such as reusing existing data partitioning or ordering, work independently of the input sampling.
>>
>>>> After the job is submitted, everything should run as expected.
>>>>
>>>> You should be able to get rid of that exception by adding the missing classes to the class path of the client process (started via bin/flink), for example via the lib folder.
>>>
>>> The above approach did not work; could you elaborate on what you meant by 'lib folder'?
>>
>> See above.
>>
>>> Thanks,
>>> Timur
>>
>>>> – Ufuk
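(For reference, a rough sketch of the two approaches discussed above, run on the EMR master node; FLINK_HOME is a placeholder for the Flink installation directory, the source paths are the ones Timur lists, and the exact jar versions may differ:)

# Option 1: copy the EMRFS, AWS SDK and s3-dist-cp jars into the "lib/" folder
# of the Flink installation (the folder next to "bin/" and "conf/").
cp /usr/share/aws/emr/emrfs/lib/*.jar                      "$FLINK_HOME/lib/"
cp /usr/share/aws/aws-java-sdk/*.jar                       "$FLINK_HOME/lib/"
cp /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar  "$FLINK_HOME/lib/"

# Option 2: extend HADOOP_CLASSPATH before calling bin/flink. As noted above,
# this only affects the client process and is not shipped to the workers, so
# it would have to be set on every machine of the cluster.
export HADOOP_CLASSPATH="$HADOOP_CLASSPATH:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/aws-java-sdk/*"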
>>>> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>> > There's one more filesystem integration failure that I have found. My job on a toy dataset succeeds, but the Flink log contains the following message:
>>>> >
>>>> > 2016-04-07 18:10:01,339 ERROR org.apache.flink.api.common.io.DelimitedInputFormat  - Unexpected problen while getting the file statistics for file 's3://...':
>>>> > java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> > java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> >     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>>>> >     at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>>>> >     at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>>>> >     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>>>> >     at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>>>> >     at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>>>> >     at org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>>>> >     at org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>>>> >     at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>>>> >     at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>>>> >     at org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>>>> >     at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>>>> >     at org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>>>> >     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>>>> >     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>> >     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>>>> >     at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>>>> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>>>> >     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>> >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>> >     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>> >     at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>>>> >     at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>>>> >     at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>>>> >     at scala.Option.foreach(Option.scala:257)
>>>> >     at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>>>> >     at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> >     at java.lang.reflect.Method.invoke(Method.java:606)
>>>> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> >     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> >     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>>>> >     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>>>> >     ... 37 more
>>>> > Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> >     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>>>> >     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>>> >     ... 38 more
>>>> >
>>>> > I assume this may be a big problem if run on large datasets, as there will be no information for the optimizer. I tried to change EMRFS to the NativeS3 driver, but got the same error, which is surprising. I expected NativeS3FileSystem to be on the classpath since it ships with the Flink runtime.
>>>> >
>>>> > Thanks,
>>>> > Timur
>>>> >
>>>> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>> >>
>>>> >> Yes, for sure.
>>>> >>
>>>> >> I added some documentation for AWS here:
>>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>>>> >>
>>>> >> Would be nice to update that page with your pull request. :-)
>>>> >> – Ufuk
>>>> >>
>>>> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>>>> >> > Hi Timur,
>>>> >> >
>>>> >> > Great! A bootstrap action for Flink is good for AWS users. I think the bootstrap action scripts would be placed in the `flink-contrib` directory.
>>>> >> >
>>>> >> > If you want, one of the Flink PMC members will assign FLINK-1337 to you.
>>>> >> >
>>>> >> > Regards,
>>>> >> > Chiwan Park
>>>> >> >
>>>> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>> >> >>
>>>> >> >> I had a guide like that.