The program consists of two executions - one that only collects() back to
the client, one that executes the map function.

Are you running this as a "YARN single job" execution? IN that case, there
may be an issue that this incorrectly tries to submit to a stopping YARN
cluster.



On Fri, Mar 24, 2017 at 10:32 AM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi,
>
> Can you provide more logs to help us understand whats going on?
>
> One note regarding your application: You are calling .collect() and send
> the collection with the map() call to the cluster again.
> This is pretty inefficient and can potentially break your application (in
> particular the RPC system of Flink).
>
> I would recommend to use broadcast variables to send the dataset to the
> map operator: https://cwiki.apache.org/confluence/display/
> FLINK/Variables+Closures+vs.+Broadcast+Variables
>
>
> On Thu, Mar 23, 2017 at 3:11 PM, <rimin...@sina.cn> wrote:
>
>> Hi ,alll,
>> i have a 36000 documents,and the document all transfer a vector , one doc
>> is a vector,and dimension is the same,so have DataSet
>> ------------------------
>> val data :DataSet[(String,SparseVector)]= ....//36000 record
>> val toData = data.collect()
>> val docSims = data.map{x=>
>>      val fromId=x._1
>>      val docsims = toData.filter{y=>y._1!=fromId}.map{y=>
>>           val score =1- cosDisticnce(x._2,y._2)
>>          (y._1,score)
>>      }.toList.sortWith{(a,b)=>a._2>b._2}.take(20)
>>    (fromId,docsims)
>> }
>> docSims.writeAsText(file)
>> .....
>> when run the job on yarn,it will get error ,the message is following:
>>        java.lang.InterruptedException  at java.util.concurrent.locks.Abs
>> tractQueuedSynchronizer$ConditionObject.reportInterruptAfter
>> Wait(AbstractQueuedSynchronizer.java:2017)
>>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$Condit
>> ionObject.await(AbstractQueuedSynchronizer.java:2052)
>>         at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlocking
>> Queue.java:442)
>>         at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsync
>> Impl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
>>
>>
>> someone can tell me ?thank you
>
>
>

Reply via email to