I'm trying to deal with some code that behaves differently in Spark
stand-alone mode and when Spark runs on a cluster. Basically, for each item
in an RDD, I'm trying to add it to a list, and once this is done, I want to
send this list to Solr.

This works perfectly fine when I run the following code in stand-alone mode
of Spark, but does not work when the same code is run on a cluster. When I
run it on a cluster, it is as if the "send to Solr" part of the code were
executed before the list to be sent to Solr is filled with items. I tried
to force execution with solrInputDocumentJavaRDD.collect(); after the
foreach, but it seems to have no effect.
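To illustrate what I suspect is happening (a plain-Java sketch, no Spark
involved; ClosureCopyDemo and AppendTask are made-up names): Spark serializes
the foreach closure and runs it in a different JVM, so the list the executors
append to is their own copy, not the driver's static list. Roughly analogous
to this serialization round-trip:

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy for Spark shipping a closure to an executor:
// serialize the task, deserialize it ("on the executor"), run it there,
// and observe that the driver-side list never changes.
public class ClosureCopyDemo {

    // Stands in for SolrIndexerDriver.solrInputDocumentList on the driver.
    static final List<String> driverSideList = new ArrayList<>();

    // Stands in for the VoidFunction passed to foreach: it captures a
    // reference to the driver-side list.
    static class AppendTask implements Serializable {
        final List<String> capturedList = driverSideList;
        void call(String item) { capturedList.add(item); }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // "Ship" the task to the executor: serialize, then deserialize.
        AppendTask shipped =
            (AppendTask) deserialize(serialize(new AppendTask()));
        shipped.call("doc-1");  // runs on the "executor"

        // The shipped copy has the item; the driver's list is still empty.
        System.out.println("executor copy size = " + shipped.capturedList.size());
        System.out.println("driver list size   = " + driverSideList.size());
    }
}
```

So collect() on the RDD cannot help: it re-evaluates the RDD, but the
additions still land in per-executor copies of the list.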

// For each RDD
solrInputDocumentJavaDStream.foreachRDD(
    new Function<JavaRDD<SolrInputDocument>, Void>() {
      @Override
      public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD)
          throws Exception {

        // For each item in a single RDD
        solrInputDocumentJavaRDD.foreach(
            new VoidFunction<SolrInputDocument>() {
              @Override
              public void call(SolrInputDocument solrInputDocument) {
                // Add the solrInputDocument to the list of SolrInputDocuments
                SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
              }
            });

        // Try to force execution
        solrInputDocumentJavaRDD.collect();

        // After having finished adding every SolrInputDocument to the list,
        // add it to the solrServer, and commit, waiting for the commit
        // to be flushed
        try {
          // Seems like when run in cluster mode, the list size is zero,
          // therefore the following part is never executed
          if (SolrIndexerDriver.solrInputDocumentList != null
              && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
            SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
            SolrIndexerDriver.solrServer.commit(true, true);
            SolrIndexerDriver.solrInputDocumentList.clear();
          }
        } catch (SolrServerException | IOException e) {
          e.printStackTrace();
        }

        return null;
      }
    }
);
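One alternative I'm considering (only a sketch, under my assumptions: the
Solr URL is a placeholder for my real endpoint, and I'm using SolrJ's
HttpSolrServer and Spark's foreachPartition): skip the shared driver-side
list entirely and index from the executors themselves, opening one Solr
client per partition:

```java
// Sketch: index from the executors with foreachPartition instead of
// accumulating into a driver-side static list.
solrInputDocumentJavaDStream.foreachRDD(
    new Function<JavaRDD<SolrInputDocument>, Void>() {
      @Override
      public Void call(JavaRDD<SolrInputDocument> rdd) throws Exception {

        rdd.foreachPartition(
            new VoidFunction<Iterator<SolrInputDocument>>() {
              @Override
              public void call(Iterator<SolrInputDocument> docs)
                  throws Exception {
                // One client per partition, created on the executor itself
                // (the URL is a placeholder)
                HttpSolrServer solr =
                    new HttpSolrServer("http://localhost:8983/solr/collection1");
                List<SolrInputDocument> batch = new ArrayList<>();
                while (docs.hasNext()) {
                  batch.add(docs.next());
                }
                if (!batch.isEmpty()) {
                  solr.add(batch);
                  solr.commit(true, true);
                }
                solr.shutdown();
              }
            });

        return null;
      }
    }
);
```

But I'm not sure whether opening a client per partition like this is the
recommended pattern, or whether there is a better way.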


What should I do so that the sending-to-Solr part executes only after the
SolrInputDocuments have been added to solrInputDocumentList (and so that it
also works in cluster mode)?


-- 
Emre Sevinç
