Hi all,
I have 36,000 documents, and each document has been converted to a vector; one document is one vector, and all vectors have the same dimension. So I have a DataSet:
------------------------
val data: DataSet[(String, SparseVector)] = .... // 36000 records

// Materialize all vectors on the client so each element can be
// compared against the full set inside the map closure.
val toData = data.collect()

val docSims = data.map { x =>
  val fromId = x._1
  // Compare this document against every other one and keep the top 20.
  val sims = toData
    .filter { y => y._1 != fromId }
    .map { y =>
      val score = 1 - cosDistance(x._2, y._2) // cosine similarity
      (y._1, score)
    }
    .toList
    .sortWith { (a, b) => a._2 > b._2 }
    .take(20)
  (fromId, sims)
}
docSims.writeAsText(file)
.....
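For context, the cosine-distance helper used in the snippet is not shown. A minimal sketch, assuming sparse vectors can be read as `Map[Int, Double]` (index -> value; the actual `SparseVector` type may expose indices and values differently, so adapt the field access accordingly):

```scala
// Hypothetical cosine-distance helper for sparse vectors as Map[Int, Double].
object CosineSketch {
  // Sparse dot product: iterate the entries of one map, look up the other.
  def dot(a: Map[Int, Double], b: Map[Int, Double]): Double =
    a.foldLeft(0.0) { case (acc, (i, v)) => acc + v * b.getOrElse(i, 0.0) }

  // Euclidean (L2) norm of a sparse vector.
  def norm(a: Map[Int, Double]): Double =
    math.sqrt(a.values.map(v => v * v).sum)

  // Cosine distance = 1 - cosine similarity; zero vectors get distance 1.
  def cosDistance(a: Map[Int, Double], b: Map[Int, Double]): Double = {
    val denom = norm(a) * norm(b)
    if (denom == 0.0) 1.0 else 1.0 - dot(a, b) / denom
  }
}
```

With this, `1 - cosDistance(x, y)` in the snippet above recovers the cosine similarity score used for ranking.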
When I run the job on YARN, it fails with the following error:
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2017)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2052)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:274)
Can someone tell me what is going wrong? Thank you.