Gerard is totally correct. To expand a little more, I think what you want is solrInputDocumentJavaRDD.foreachPartition instead of solrInputDocumentJavaRDD.foreach:
solrInputDocumentJavaRDD.foreachPartition(
    new VoidFunction<Iterator<SolrInputDocument>>() {
      @Override
      public void call(Iterator<SolrInputDocument> docItr) {
        List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
        for (SolrInputDocument solrInputDocument : docItr) {
          // Add the solrInputDocument to the list of SolrInputDocuments
          docs.add(solrInputDocument);
        }
        // Push things to Solr **from the executor, for this partition**.
        // For this to make sense, you need to be sure Solr can handle a bunch
        // of executors pushing into it simultaneously.
        addThingsToSolr(docs);
      }
    });

On Mon, May 4, 2015 at 8:44 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
> is a singleton, I guess that what's going on when running on a cluster is
> that the call to:
>
>     SolrIndexerDriver.solrInputDocumentList.add(elem)
>
> is happening on different singleton instances of the SolrIndexerDriver
> on different JVMs, while
>
>     SolrIndexerDriver.solrServer.commit
>
> is happening on the driver.
>
> In practical terms, the lists on the executors are being filled in, but
> they are never committed, and on the driver the opposite is happening.
>
> -kr, Gerard
>
> On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> I'm trying to deal with some code that runs differently in Spark
>> stand-alone mode and on a Spark cluster. Basically, for each item in an
>> RDD, I'm trying to add it to a list, and once this is done, I want to
>> send this list to Solr.
>>
>> This works perfectly fine when I run the following code in stand-alone
>> mode of Spark, but does not work when the same code is run on a cluster.
>> When I run it on a cluster, it is as if the "send to Solr" part of the
>> code is executed before the list to be sent to Solr is filled with items.
>> I try to force the execution with solrInputDocumentJavaRDD.collect()
>> after the foreach, but it seems to have no effect.
>>
>> // For each RDD
>> solrInputDocumentJavaDStream.foreachRDD(
>>     new Function<JavaRDD<SolrInputDocument>, Void>() {
>>       @Override
>>       public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD)
>>           throws Exception {
>>
>>         // For each item in a single RDD
>>         solrInputDocumentJavaRDD.foreach(
>>             new VoidFunction<SolrInputDocument>() {
>>               @Override
>>               public void call(SolrInputDocument solrInputDocument) {
>>                 // Add the solrInputDocument to the list of SolrInputDocuments
>>                 SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>>               }
>>             });
>>
>>         // Try to force execution
>>         solrInputDocumentJavaRDD.collect();
>>
>>         // After having finished adding every SolrInputDocument to the list,
>>         // add the list to the solrServer and commit, waiting for the commit
>>         // to be flushed
>>         try {
>>           // Seems like when run in cluster mode the list size is zero,
>>           // therefore the following part is never executed
>>           if (SolrIndexerDriver.solrInputDocumentList != null
>>               && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
>>             SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
>>             SolrIndexerDriver.solrServer.commit(true, true);
>>             SolrIndexerDriver.solrInputDocumentList.clear();
>>           }
>>         } catch (SolrServerException | IOException e) {
>>           e.printStackTrace();
>>         }
>>
>>         return null;
>>       }
>>     });
>>
>> What should I do so that the sending-to-Solr part executes after the list
>> of SolrInputDocuments has been added to solrInputDocumentList (and also
>> works in cluster mode)?
>>
>> --
>> Emre Sevinç
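
P.S. addThingsToSolr(docs) above is just a placeholder. A minimal sketch of
what it might look like with SolrJ 4.x follows; the Solr URL, the core name
"collection1", and the choice to send the whole partition in a single add()
call are my assumptions, not something prescribed in this thread:

import java.io.IOException;
import java.util.List;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrPartitionWriter {

  // Runs on the executor, once per partition.
  static void addThingsToSolr(List<SolrInputDocument> docs) {
    if (docs.isEmpty()) {
      return; // nothing to index for an empty partition
    }
    // Create the client inside the method so it never has to be serialized
    // and shipped from the driver (URL and core name are placeholders).
    HttpSolrServer solr = new HttpSolrServer("http://localhost:8983/solr/collection1");
    try {
      solr.add(docs); // one batched add for the whole partition
      solr.commit();
    } catch (SolrServerException | IOException e) {
      throw new RuntimeException("Failed to index partition in Solr", e);
    } finally {
      solr.shutdown();
    }
  }
}

One caveat: with many executors, a commit() per partition can turn into a
commit storm on the Solr side. Relying on the server's autoCommit settings,
or using solr.add(docs, commitWithinMs) instead of an explicit commit, is
usually gentler.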