[ 
https://issues.apache.org/jira/browse/SOLR-11600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258474#comment-16258474
 ] 

Amrit Sarkar edited comment on SOLR-11600 at 11/19/17 12:44 PM:
----------------------------------------------------------------

Examples are listed under 
https://lucene.apache.org/solr/guide/6_6/streaming-expressions.html#StreamingExpressions-StreamingRequestsandResponses
 and http://joelsolr.blogspot.in/2015/04/the-streaming-api-solrjio-basics.html.

I have cooked up one example against the {{master}} branch, which strictly requires 
httpClient::4.5.3

{code}
package stream.example;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.DivideEvaluator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.SelectStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;

public class QuerySolr {

    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    static StreamFactory streamFactory = new StreamFactory()
            .withCollectionZkHost("collection1","localhost:9983")
            .withFunctionName("select", SelectStream.class)
            .withFunctionName("search", CloudSolrStream.class)
            .withFunctionName("div", DivideEvaluator.class);

    public static void main(String args[]) throws IOException, 
SolrServerException {

        SelectStream stream = (SelectStream)streamFactory
                .constructStream("select(\n" +
                        "  search(collection1, fl=\"id,A_i,B_i\", q=\"*:*\", 
sort=\"id asc\"),\n" +
                        "  id as UNIQUE_KEY,\n" +
                        "  div(A_i,B_i) as divRes\n" +
                        ")");

        attachStreamFactory(stream);

        List<Tuple> tuples = getTuples(stream);
        for (Tuple tuple : tuples) {
            log.info("tuple: " + tuple.getMap());
            System.out.println("tuple: " + tuple.getMap());
        }
        System.exit(0);
    }

    private static void attachStreamFactory(TupleStream tupleStream) {
        StreamContext context = new StreamContext();
        context.setSolrClientCache(new SolrClientCache());
        context.setStreamFactory(streamFactory);
        tupleStream.setStreamContext(context);
    }

    private static List<Tuple> getTuples(TupleStream tupleStream) throws 
IOException {
        tupleStream.open();
        List<Tuple> tuples = new ArrayList();
        for(;;) {
            Tuple t = tupleStream.read();
            if(t.EOF) {
                break;
            } else {
                tuples.add(t);
            }
        }
        tupleStream.close();
        return tuples;
    }
}
{code}

I need {{System.exit(0);}} to terminate the program, so I am pretty sure some 
httpclient is not getting closed properly, or something similar.

*_Also, the patch above is absolutely not required to make this work_*; we can 
move forward with the above examples, and streams can be constructed without adding 
constructors to each stream source, decorator or evaluator. The only 
condition is that we have to pass our own {{streamFactory}}.

Hope it helps.

P.S. Please disregard the PATCH, it serves no purpose.


was (Author: sarkaramr...@gmail.com):
Examples are listed under 
https://lucene.apache.org/solr/guide/6_6/streaming-expressions.html#StreamingExpressions-StreamingRequestsandResponses
 and http://joelsolr.blogspot.in/2015/04/the-streaming-api-solrjio-basics.html.

I have cook one example against {{master}} branch, which strictly required 
httpClient::4.5.3

{code}
package stream.example;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.io.SolrClientCache;
import org.apache.solr.client.solrj.io.Tuple;
import org.apache.solr.client.solrj.io.eval.DivideEvaluator;
import org.apache.solr.client.solrj.io.stream.CloudSolrStream;
import org.apache.solr.client.solrj.io.stream.SelectStream;
import org.apache.solr.client.solrj.io.stream.StreamContext;
import org.apache.solr.client.solrj.io.stream.TupleStream;
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;

public class QuerySolr {

    private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    static StreamFactory streamFactory = new StreamFactory()
            .withCollectionZkHost("collection1","localhost:9983")
            .withFunctionName("select", SelectStream.class)
            .withFunctionName("search", CloudSolrStream.class)
            .withFunctionName("div", DivideEvaluator.class);

    public static void main(String args[]) throws IOException, 
SolrServerException {

        SelectStream stream = (SelectStream)streamFactory
                .constructStream("select(\n" +
                        "  search(collection1, fl=\"id,A_i,B_i\", q=\"*:*\", 
sort=\"id asc\"),\n" +
                        "  id as UNIQUE_KEY,\n" +
                        "  div(A_i,B_i) as divRes\n" +
                        ")");

        attachStreamFactory(stream);

        List<Tuple> tuples = getTuples(stream);
        for (Tuple tuple : tuples) {
            log.info("tuple: " + tuple.getMap());
            System.out.println("tuple: " + tuple.getMap());
        }
        System.exit(0);
    }

    private static void attachStreamFactory(TupleStream tupleStream) {
        StreamContext context = new StreamContext();
        context.setSolrClientCache(new SolrClientCache());
        context.setStreamFactory(streamFactory);
        tupleStream.setStreamContext(context);
    }

    private static List<Tuple> getTuples(TupleStream tupleStream) throws 
IOException {
        tupleStream.open();
        List<Tuple> tuples = new ArrayList();
        for(;;) {
            Tuple t = tupleStream.read();
            if(t.EOF) {
                break;
            } else {
                tuples.add(t);
            }
        }
        tupleStream.close();
        return tuples;
    }
}
{code}

I need {{System.exit(0);}} to terminate the program, so pretty sure some 
httpclient is not getting closed properly or such.

*_Also, the patch above is absolutely not required to make this work_*, we can 
move forward with above examples and streams can be constructed without adding 
constructors to each stream source, decorators or evaluators. The only 
condition is we have to pass our own {{streamFactory}}.

Hope it helps.

P.S. Please disregard the PATCH, it serves no purpose.

> Add Constructor to SelectStream which takes StreamEvaluators as argument. 
> Current schema forces one to enter a stream expression string only 
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SOLR-11600
>                 URL: https://issues.apache.org/jira/browse/SOLR-11600
>             Project: Solr
>          Issue Type: Improvement
>      Security Level: Public(Default Security Level. Issues are Public) 
>          Components: SolrJ, streaming expressions
>    Affects Versions: 6.6.1, 7.1
>            Reporter: Aroop
>            Priority: Trivial
>              Labels: easyfix
>         Attachments: SOLR-11600.patch
>
>
> The use case is to be able to supply stream evaluators over a rollup 
> stream in the following manner, but with strongly typed objects 
> instead of streaming-expression strings.
> {code:bash}
> curl --data-urlencode 'expr=select(
> id,
> div(sum(cat1_i),sum(cat2_i)) as metric1,
> coalesce(div(sum(cat1_i),if(eq(sum(cat2_i),0),null,sum(cat2_i))),0) as 
> metric2,
> rollup(
> search(col1, q=*:*, fl="id,cat1_i,cat2_i,cat_s", qt="/export", sort="cat_s 
> asc"),
> over="cat_s",sum(cat1_i),sum(cat2_i)
> ))' http://localhost:8983/solr/col1/stream
> {code}
> the current code base does not allow one to provide selectedEvaluators in a 
> constructor, so one cannot prepare their select stream via java code:
> {code:java}
> public class SelectStream extends TupleStream implements Expressible {
>     private static final long serialVersionUID = 1L;
>     private TupleStream stream;
>     private StreamContext streamContext;
>     private Map<String, String> selectedFields;
>     private Map<StreamEvaluator, String> selectedEvaluators;
>     private List<StreamOperation> operations;
>     public SelectStream(TupleStream stream, List<String> selectedFields) 
> throws IOException {
>         this.stream = stream;
>         this.selectedFields = new HashMap();
>         Iterator var3 = selectedFields.iterator();
>         while(var3.hasNext()) {
>             String selectedField = (String)var3.next();
>             this.selectedFields.put(selectedField, selectedField);
>         }
>         this.operations = new ArrayList();
>         this.selectedEvaluators = new HashMap();
>     }
>     public SelectStream(TupleStream stream, Map<String, String> 
> selectedFields) throws IOException {
>         this.stream = stream;
>         this.selectedFields = selectedFields;
>         this.operations = new ArrayList();
>         this.selectedEvaluators = new HashMap();
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org
For additional commands, e-mail: dev-h...@lucene.apache.org

Reply via email to