Gerard is totally correct -- to expand a little more, I think what you want
to do is a solrInputDocumentJavaRDD.foreachPartition, instead of
solrInputDocumentJavaRDD.foreach:


solrInputDocumentJavaRDD.foreachPartition(
  new VoidFunction<Iterator<SolrInputDocument>>() {
    @Override
    public void call(Iterator<SolrInputDocument> docItr) {
      List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
      for(SolrInputDocument solrInputDocument: docItr) {
        // Add the solrInputDocument to the list of SolrInputDocuments
        docs.add(solrInputDocument);
      }
      // push things to solr **from the executor, for this partition**
      // so for this make sense, you need to be sure solr can handle a bunch
      // of executors pushing into it simultaneously.
      addThingsToSolr(docs);
    }
});

On Mon, May 4, 2015 at 8:44 AM, Gerard Maas <gerard.m...@gmail.com> wrote:

> I'm not familiar with the Solr API but provided that ' SolrIndexerDriver'
> is a singleton, I guess that what's going on when running on a cluster is
> that the call to:
>
>  SolrIndexerDriver.solrInputDocumentList.add(elem)
>
> is happening on different singleton instances of the  SolrIndexerDriver
> on different JVMs while
>
> SolrIndexerDriver.solrServer.commit
>
> is happening on the driver.
>
> In practical terms, the lists on the executors are being filled-in but
> they are never committed and on the driver the opposite is happening.
>
> -kr, Gerard
>
> On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:
>
>> I'm trying to deal with some code that runs differently on Spark
>> stand-alone mode and Spark running on a cluster. Basically, for each item
>> in an RDD, I'm trying to add it to a list, and once this is done, I want to
>> send this list to Solr.
>>
>> This works perfectly fine when I run the following code in stand-alone
>> mode of Spark, but does not work when the same code is run on a cluster.
>> When I run the same code on a cluster, it is like "send to Solr" part of
>> the code is executed before the list to be sent to Solr is filled with
>> items. I try to force the execution by solrInputDocumentJavaRDD.collect();
>> after foreach, but it seems like it does not have any effect.
>>
>> // For each RDD
>> solrInputDocumentJavaDStream.foreachRDD(
>>         new Function<JavaRDD<SolrInputDocument>, Void>() {
>>           @Override
>>           public Void call(JavaRDD<SolrInputDocument>
>> solrInputDocumentJavaRDD) throws Exception {
>>
>>             // For each item in a single RDD
>>             solrInputDocumentJavaRDD.foreach(
>>                     new VoidFunction<SolrInputDocument>() {
>>                       @Override
>>                       public void call(SolrInputDocument
>> solrInputDocument) {
>>
>>                         // Add the solrInputDocument to the list of
>> SolrInputDocuments
>>
>> SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>>                       }
>>                     });
>>
>>             // Try to force execution
>>             solrInputDocumentJavaRDD.collect();
>>
>>
>>             // After having finished adding every SolrInputDocument to
>> the list
>>             // add it to the solrServer, and commit, waiting for the
>> commit to be flushed
>>             try {
>>
>>               // Seems like when run in cluster mode, the list size is
>> zero,
>>              // therefore the following part is never executed
>>
>>               if (SolrIndexerDriver.solrInputDocumentList != null
>>                       && SolrIndexerDriver.solrInputDocumentList.size() >
>> 0) {
>>
>> SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
>>                 SolrIndexerDriver.solrServer.commit(true, true);
>>                 SolrIndexerDriver.solrInputDocumentList.clear();
>>               }
>>             } catch (SolrServerException | IOException e) {
>>               e.printStackTrace();
>>             }
>>
>>
>>             return null;
>>           }
>>         }
>> );
>>
>>
>> What should I do, so that sending-to-Solr part executes after the list of
>> SolrDocuments are added to solrInputDocumentList (and works also in cluster
>> mode)?
>>
>>
>> --
>> Emre Sevinç
>>
>
>

Reply via email to