Gerard is totally correct -- to expand a little more, I think what you want
is solrInputDocumentJavaRDD.foreachPartition instead of
solrInputDocumentJavaRDD.foreach:
solrInputDocumentJavaRDD.foreachPartition(
    new VoidFunction<Iterator<SolrInputDocument>>() {
      @Override
      public void call(Iterator<SolrInputDocument> docItr) {
        // Collect this partition's documents into a list. Note that a plain
        // Iterator has to be drained with hasNext()/next(); it can't be used
        // in an enhanced for loop.
        List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
        while (docItr.hasNext()) {
          docs.add(docItr.next());
        }
        // Push things to Solr **from the executor, for this partition**.
        // For this to make sense, you need to be sure Solr can handle a
        // bunch of executors pushing into it simultaneously.
        addThingsToSolr(docs);
      }
    });
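
In case it's useful, here's a minimal sketch of what addThingsToSolr could
look like. This assumes the SolrJ HttpSolrServer API; the URL, core name,
and error handling are placeholders, not something from your code:

import java.io.IOException;
import java.util.List;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

private static void addThingsToSolr(List<SolrInputDocument> docs) {
  if (docs.isEmpty()) {
    return; // empty partition, nothing to send
  }
  // One client per partition: each executor JVM talks to Solr directly.
  // The URL here is a placeholder -- point it at your own Solr core.
  HttpSolrServer server =
      new HttpSolrServer("http://solr-host:8983/solr/collection1");
  try {
    server.add(docs);          // index the whole partition in one request
    server.commit(true, true); // waitFlush, waitSearcher, as in your code
  } catch (SolrServerException | IOException e) {
    e.printStackTrace();
  } finally {
    server.shutdown();         // release the underlying HttpClient
  }
}

One thing to keep in mind: with many partitions you'll get many
near-simultaneous commits, which Solr doesn't love. Batching the adds and
issuing a single commit at the end (or using the add(docs, commitWithinMs)
variant instead of explicit commits) is usually gentler on the Solr side.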
On Mon, May 4, 2015 at 8:44 AM, Gerard Maas <[email protected]> wrote:
> I'm not familiar with the Solr API, but provided that 'SolrIndexerDriver'
> is a singleton, I guess that what's going on when running on a cluster is
> that the call to:
>
> SolrIndexerDriver.solrInputDocumentList.add(elem)
>
> is happening on different singleton instances of SolrIndexerDriver on
> different JVMs, while
>
> SolrIndexerDriver.solrServer.commit
>
> is happening on the driver.
>
> In practical terms, the lists on the executors are being filled in, but
> they are never committed, while on the driver the opposite is happening.
>
> -kr, Gerard
>
> On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc <[email protected]> wrote:
>
>> I'm trying to deal with some code that runs differently in Spark
>> stand-alone mode and on a Spark cluster. Basically, for each item in an
>> RDD, I'm trying to add it to a list, and once this is done, I want to
>> send this list to Solr.
>>
>> This works perfectly fine when I run the following code in stand-alone
>> mode of Spark, but it does not work when the same code is run on a
>> cluster. On a cluster, it is as if the "send to Solr" part of the code
>> is executed before the list to be sent to Solr is filled with items. I
>> tried to force execution by calling solrInputDocumentJavaRDD.collect()
>> after the foreach, but it seems to have no effect.
>>
>> // For each RDD
>> solrInputDocumentJavaDStream.foreachRDD(
>>     new Function<JavaRDD<SolrInputDocument>, Void>() {
>>       @Override
>>       public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD)
>>           throws Exception {
>>
>>         // For each item in a single RDD
>>         solrInputDocumentJavaRDD.foreach(
>>             new VoidFunction<SolrInputDocument>() {
>>               @Override
>>               public void call(SolrInputDocument solrInputDocument) {
>>                 // Add the solrInputDocument to the list of
>>                 // SolrInputDocuments
>>                 SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>>               }
>>             });
>>
>>         // Try to force execution
>>         solrInputDocumentJavaRDD.collect();
>>
>>         // After having finished adding every SolrInputDocument to the
>>         // list, add it to the solrServer and commit, waiting for the
>>         // commit to be flushed
>>         try {
>>           // Seems like when run in cluster mode, the list size is zero,
>>           // therefore the following part is never executed
>>           if (SolrIndexerDriver.solrInputDocumentList != null
>>               && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
>>             SolrIndexerDriver.solrServer.add(
>>                 SolrIndexerDriver.solrInputDocumentList);
>>             SolrIndexerDriver.solrServer.commit(true, true);
>>             SolrIndexerDriver.solrInputDocumentList.clear();
>>           }
>>         } catch (SolrServerException | IOException e) {
>>           e.printStackTrace();
>>         }
>>
>>         return null;
>>       }
>>     });
>>
>>
>> What should I do so that the sending-to-Solr part executes only after all
>> the SolrInputDocuments have been added to solrInputDocumentList (and so
>> that it also works in cluster mode)?
>>
>>
>> --
>> Emre Sevinç
>>
>
>