Imran, Gerard, indeed your suggestions were correct and they helped me. Thank you for your replies.
-- Emre

On Tue, May 5, 2015 at 4:24 PM, Imran Rashid <iras...@cloudera.com> wrote:

> Gerard is totally correct -- to expand a little more, I think what you
> want to do is a solrInputDocumentJavaRDD.foreachPartition, instead of
> solrInputDocumentJavaRDD.foreach:
>
>     solrInputDocumentJavaRDD.foreachPartition(
>         new VoidFunction<Iterator<SolrInputDocument>>() {
>           @Override
>           public void call(Iterator<SolrInputDocument> docItr) {
>             List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
>             for (SolrInputDocument solrInputDocument : docItr) {
>               // Add the solrInputDocument to the list of SolrInputDocuments
>               docs.add(solrInputDocument);
>             }
>             // Push things to Solr **from the executor, for this partition**.
>             // For this to make sense, you need to be sure Solr can handle a
>             // bunch of executors pushing into it simultaneously.
>             addThingsToSolr(docs);
>           }
>         });
>
> On Mon, May 4, 2015 at 8:44 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>
>> I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
>> is a singleton, I guess that what's going on when running on a cluster is
>> that the call to:
>>
>>     SolrIndexerDriver.solrInputDocumentList.add(elem)
>>
>> is happening on different singleton instances of the SolrIndexerDriver
>> on different JVMs, while
>>
>>     SolrIndexerDriver.solrServer.commit
>>
>> is happening on the driver.
>>
>> In practical terms, the lists on the executors are being filled in but
>> they are never committed, and on the driver the opposite is happening.
>>
>> -kr, Gerard
>>
>> On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>>
>>> I'm trying to deal with some code that runs differently in Spark
>>> stand-alone mode and on a Spark cluster. Basically, for each item in an
>>> RDD, I'm trying to add it to a list, and once this is done, I want to
>>> send this list to Solr.
>>>
>>> This works perfectly fine when I run the following code in stand-alone
>>> mode of Spark, but it does not work when the same code is run on a
>>> cluster. On a cluster, it is as if the "send to Solr" part of the code
>>> is executed before the list to be sent to Solr is filled with items. I
>>> tried to force execution with solrInputDocumentJavaRDD.collect(); after
>>> the foreach, but it seems to have no effect.
>>>
>>>     // For each RDD
>>>     solrInputDocumentJavaDStream.foreachRDD(
>>>         new Function<JavaRDD<SolrInputDocument>, Void>() {
>>>           @Override
>>>           public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD)
>>>               throws Exception {
>>>
>>>             // For each item in a single RDD
>>>             solrInputDocumentJavaRDD.foreach(
>>>                 new VoidFunction<SolrInputDocument>() {
>>>                   @Override
>>>                   public void call(SolrInputDocument solrInputDocument) {
>>>                     // Add the solrInputDocument to the list of SolrInputDocuments
>>>                     SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>>>                   }
>>>                 });
>>>
>>>             // Try to force execution
>>>             solrInputDocumentJavaRDD.collect();
>>>
>>>             // After having finished adding every SolrInputDocument to the list,
>>>             // add it to the solrServer and commit, waiting for the commit
>>>             // to be flushed
>>>             try {
>>>               // Seems like when run in cluster mode, the list size is zero,
>>>               // therefore the following part is never executed
>>>               if (SolrIndexerDriver.solrInputDocumentList != null
>>>                   && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
>>>                 SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
>>>                 SolrIndexerDriver.solrServer.commit(true, true);
>>>                 SolrIndexerDriver.solrInputDocumentList.clear();
>>>               }
>>>             } catch (SolrServerException | IOException e) {
>>>               e.printStackTrace();
>>>             }
>>>
>>>             return null;
>>>           }
>>>         });
>>>
>>> What should I do so that the sending-to-Solr part executes after the
>>> list of SolrDocuments has been added to solrInputDocumentList (and it
>>> also works in cluster mode)?
>>>
>>> --
>>> Emre Sevinç
>>
>
-- Emre Sevinc
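The partition-wise batching Imran suggests can be sketched without Spark at all: each partition's iterator is drained into a list local to that function call, and the list is pushed once per partition, rather than accumulating in driver-side static state that the executors never share. The following is a minimal stand-alone Java sketch of that shape only; `Doc`, `pushBatch`, and the hard-coded "partitions" are made-up placeholders (no Spark or SolrJ on the classpath), not the actual job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartitionBatchSketch {

  // Hypothetical stand-in for SolrInputDocument.
  static class Doc {
    final String id;
    Doc(String id) { this.id = id; }
  }

  static int batchesPushed = 0;
  static int docsPushed = 0;

  // Stand-in for addThingsToSolr(docs): in the real job this would run on
  // the executor and call solrServer.add(docs) followed by a commit.
  static void pushBatch(List<Doc> docs) {
    batchesPushed++;
    docsPushed += docs.size();
  }

  // Mirrors the body of the VoidFunction<Iterator<SolrInputDocument>>
  // passed to foreachPartition: drain the iterator into a local list,
  // then push that list exactly once for this partition.
  static void handlePartition(Iterator<Doc> docItr) {
    List<Doc> docs = new ArrayList<>();
    while (docItr.hasNext()) {
      docs.add(docItr.next());
    }
    if (!docs.isEmpty()) {
      pushBatch(docs);
    }
  }

  public static void main(String[] args) {
    // Two "partitions" of an RDD, faked here as plain lists.
    List<List<Doc>> partitions = Arrays.asList(
        Arrays.asList(new Doc("a"), new Doc("b")),
        Arrays.asList(new Doc("c")));

    // foreachPartition invokes the function once per partition.
    for (List<Doc> partition : partitions) {
      handlePartition(partition.iterator());
    }

    System.out.println(batchesPushed + " batches, " + docsPushed + " docs");
  }
}
```

Running it prints `2 batches, 3 docs`: one push per partition, all documents delivered, with no shared mutable state between "driver" and "executors".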