You can always explicitly request a broadcast join via "joinWithTiny" or
"joinWithHuge", or by supplying a JoinHint.

Greetings,
Stephan


On Sat, Apr 9, 2016 at 1:56 AM, Timur Fayruzov <timur.fairu...@gmail.com>
wrote:

> Thank you Robert. One of my test cases is a broadcast join, so I need to
> make statistics work. The only workaround I have found so far is to copy
> the contents of /usr/share/aws/emr/emrfs/lib/, /usr/share/aws/aws-java-sdk/
> and /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar to flink/lib.
> Putting these directories on the HADOOP_CLASSPATH unfortunately did not
> work (I am running a single-machine cluster, so the YARN ResourceManager
> and NodeManager share the same machine). Apparently, the classpath for
> YARN containers does not include HADOOP_CLASSPATH.
>
> I'm not a Hadoop expert, so the relationship between YARN, the hadoop
> executable, and Flink seems strange to me. The hadoop executable sets up a
> lot of env vars (including the hadoop classpath), but this setup seems to
> have no effect on YARN application containers. I am not sure whether this
> is expected.
>
> Thanks,
> Timur
>
> On Fri, Apr 8, 2016 at 2:38 AM, Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Timur,
>>
>> the Flink optimizer runs on the client, so the exception is thrown from
>> the JVM running the ./bin/flink client.
>> Since the statistics sampling is an optional step, it's surrounded by a
>> try/catch block that just logs the error message.
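>>
>> Illustratively (a hedged sketch of the pattern just described, not the
>> literal Flink source):
>>
>>     // A failed statistics lookup is logged and swallowed, so the
>>     // optimizer simply proceeds without input estimates.
>>     def tryGetStatistics[S](sample: () => S): Option[S] =
>>       try Some(sample())
>>       catch {
>>         case e: Exception =>
>>           System.err.println(s"Could not get file statistics: ${e.getMessage}")
>>           None // caller falls back to robust defaults
>>       }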
>>
>> More answers inline below
>>
>>
>> On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>
>>> The exception does not show up in the console when I run the job; it
>>> only shows up in the logs. I thought this meant that it happens either on
>>> the AM or a TM (I assume what I see in stdout is the client log). Is my
>>> thinking right?
>>>
>>>
>>> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>>> Hey Timur,
>>>>
>>>> Just had a chat with Robert about this. I agree that the error message
>>>> is confusing, but it is fine in this case. The file system classes are
>>>> not on the class path of the client process, which is submitting the
>>>> job.
>>>
>>> Do you mean that the classes should be on the classpath of the
>>> `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
>>> tried to add the EMRFS jars to this classpath but it did not help. BTW, it
>>> seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which
>>> is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only
>>> point I control here to add to the classpath, so I had to set it manually.
>>>
>>
>> Yes, they have to be in the classpath of the CliFrontend.
>> The client should also work without HADOOP_CLASSPATH being set. It's
>> optional, for cases where you want to manually add jars to the classpath.
>> For example, on Google Compute they set the HADOOP_CLASSPATH.
>>
>> Please note that we do not transfer the contents of the HADOOP_CLASSPATH
>> to the other workers in the cluster, so you have to set the
>> HADOOP_CLASSPATH on all machines.
>> Another approach is to put the required jars into the "lib/" folder of
>> your Flink installation (the folder next to "bin/", "conf/", "logs/").
>>
>>
>>>
>>>
>>>> It fails to sample the input file sizes, but this is just an
>>>> optimization step; hence it does not fail the job and only logs the
>>>> error.
>>>>
>>> Is this optimization only on the client side? In other words, does it
>>> affect Flink's ability to choose the proper type of join?
>>>
>>
>> Your DataSet program is translated into a generic representation. Then
>> this representation is passed into the optimizer, which decides on join /
>> sorting / data shipping strategies. The output of the optimizer is sent to
>> the JobManager for execution.
>> If the optimizer is not able to get good statistics about the input (as
>> in your case), it will default to robust execution strategies. I don't know
>> the input sizes or the structure of your job, but chances are high that the
>> final plan is the same with and without the input statistics. Only in cases
>> where one join side is very small are the input statistics likely to be
>> relevant.
>> Other optimizations, such as reusing existing data partitioning or
>> ordering, work independently of the input sampling.
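>>
>> If you want to check which strategies were chosen, one option (a hedged
>> suggestion, assuming your program builds on an ExecutionEnvironment named
>> `env`) is to print the JSON execution plan instead of executing the job:
>>
>>     // Dumps the optimized plan, including the chosen join strategies,
>>     // without running the job. Call this in place of env.execute().
>>     println(env.getExecutionPlan())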
>>
>>
>>>
>>>
>>>>
>>>> After the job is submitted, everything should run as expected.
>>>>
>>>> You should be able to get rid of that exception by adding the missing
>>>> classes to the class path of the client process (started via
>>>> bin/flink), for example via the lib folder.
>>>>
>>> The approach above did not work; could you elaborate on what you meant
>>> by 'lib folder'?
>>>
>>
>> See above.
>>
>>
>>
>>>
>>> Thanks,
>>> Timur
>>>
>>>
>>>> – Ufuk
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <
>>>> timur.fairu...@gmail.com> wrote:
>>>> > There's one more filesystem integration failure that I have found. My
>>>> > job on a toy dataset succeeds, but the Flink log contains the
>>>> > following message:
>>>> >
>>>> > 2016-04-07 18:10:01,339 ERROR org.apache.flink.api.common.io.DelimitedInputFormat  - Unexpected problen while getting the file statistics for file 's3://...': java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> > java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
>>>> >         at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getHadoopWrapperClassNameForFileSystem(HadoopFileSystem.java:460)
>>>> >         at org.apache.flink.core.fs.FileSystem.getHadoopWrapperClassNameForFileSystem(FileSystem.java:352)
>>>> >         at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:280)
>>>> >         at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:293)
>>>> >         at org.apache.flink.api.common.io.DelimitedInputFormat.getStatistics(DelimitedInputFormat.java:45)
>>>> >         at org.apache.flink.optimizer.dag.DataSourceNode.computeOperatorSpecificDefaultEstimates(DataSourceNode.java:166)
>>>> >         at org.apache.flink.optimizer.dag.OptimizerNode.computeOutputEstimates(OptimizerNode.java:588)
>>>> >         at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:61)
>>>> >         at org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor.postVisit(IdAndEstimatesVisitor.java:32)
>>>> >         at org.apache.flink.optimizer.dag.DataSourceNode.accept(DataSourceNode.java:250)
>>>> >         at org.apache.flink.optimizer.dag.SingleInputNode.accept(SingleInputNode.java:515)
>>>> >         at org.apache.flink.optimizer.dag.DataSinkNode.accept(DataSinkNode.java:248)
>>>> >         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:477)
>>>> >         at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>>> >         at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:228)
>>>> >         at org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:567)
>>>> >         at org.apache.flink.client.program.Client.runBlocking(Client.java:314)
>>>> >         at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
>>>> >         at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>>>> >         at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:638)
>>>> >         at com.whitepages.data.flink.FaithResolution$.pipeline(FaithResolution.scala:100)
>>>> >         at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>>>> >         at com.whitepages.data.flink.FaithResolution$$anonfun$main$1.apply(FaithResolution.scala:39)
>>>> >         at scala.Option.foreach(Option.scala:257)
>>>> >         at com.whitepages.data.flink.FaithResolution$.main(FaithResolution.scala:39)
>>>> >         at com.whitepages.data.flink.FaithResolution.main(FaithResolution.scala)
>>>> >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> >         at java.lang.reflect.Method.invoke(Method.java:606)
>>>> >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>> >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>> >         at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>> >         at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>> >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>> >         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>> >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> > Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
>>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
>>>> >         ... 37 more
>>>> > Caused by: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
>>>> >         at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
>>>> >         at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
>>>> >         ... 38 more
>>>> >
>>>> > I assume this may be a big problem when run on large datasets, as
>>>> > there will be no information for the optimizer. I tried to change
>>>> > EMRFS to the NativeS3 driver, but got the same error, which is
>>>> > surprising. I expected NativeS3FileSystem to be on the classpath since
>>>> > it ships with the Flink runtime.
>>>> >
>>>> > Thanks,
>>>> > Timur
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > On Wed, Apr 6, 2016 at 2:10 AM, Ufuk Celebi <u...@apache.org> wrote:
>>>> >>
>>>> >> Yes, for sure.
>>>> >>
>>>> >> I added some documentation for AWS here:
>>>> >> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
>>>> >>
>>>> >> Would be nice to update that page with your pull request. :-)
>>>> >>
>>>> >> – Ufuk
>>>> >>
>>>> >>
>>>> >> On Wed, Apr 6, 2016 at 4:58 AM, Chiwan Park <chiwanp...@apache.org>
>>>> wrote:
>>>> >> > Hi Timur,
>>>> >> >
>>>> >> > Great! A bootstrap action for Flink is good for AWS users. I think
>>>> >> > the bootstrap action scripts would be placed in the `flink-contrib`
>>>> >> > directory.
>>>> >> >
>>>> >> > If you want, one of the Flink PMC members can assign FLINK-1337 to
>>>> >> > you.
>>>> >> >
>>>> >> > Regards,
>>>> >> > Chiwan Park
>>>> >> >
>>>> >> >> On Apr 6, 2016, at 3:36 AM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
>>>> >> >>
>>>> >> >> I had a guide like that.
>>>> >> >>
>>>> >> >
>>>> >
>>>> >
>>>>
>>>
>>>
>>
>
