Hi Ryan,

The rows=100000 on the /select handler is likely going to cause problems
with 8 workers. This is calling the /select handler with 8 concurrent
workers each retrieving 100,000 rows. The /select handler bogs down as the
number of rows increases. So using the rows parameter with the /select
handler is really not a strategy for limiting the size of the join. To
limit the size of the join you would need to place some kind of filter on
the query and still use the /export handler.
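
As a rough sketch of what that could look like, the join below drops the rows parameter, puts qt="/export" on both searches, and narrows the left side with a filter in q. The subject_id value is only borrowed from your earlier question as an illustration, so substitute whatever filter fits your data. This assumes subject_id and type_id have docValues, which the /export handler requires for sort and fl fields.

```
innerJoin(
    search(triple, q="subject_id:1656521", fl="subject_id,type_id",
           sort="type_id asc", qt="/export"),
    search(triple_type, q=*:*, fl="triple_type_id",
           sort="triple_type_id asc", qt="/export"),
    on="type_id=triple_type_id"
)
```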

The /export handler was developed to handle large exports and not get
bogged down.

You may want to start by getting an understanding of how much data a
single node can export, and how long it takes.

1) Try running a single *:* search() using the /export handler on the
triple collection. Time how long it takes. If you run into problems getting
this to complete then attach a memory profiler. It may be that 8 gigs is
not enough to hold the docValues in memory and process the query. The
/export handler does not use more memory as the result set rises, so the
/export handler should be able to process the entire query (30,000,000 docs).
But it does take a lot of memory to hold the docValues fields in memory.
This query will likely take some time to complete though, as you are sorting
and exporting 30,000,000 docs from a single node.

2) Then try running the same *:* search() against the /export handler in
parallel() gradually increasing the number of workers. Time how long it
takes as you add workers and watch the load it places on the server.
Eventually you'll max out your performance.
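
The two steps might look roughly like the expressions below. This is a sketch that reuses the field names from your join; the workers value of 2 is just an assumed starting point to increase on each run. Note that the sort on parallel() here matches the sort of the wrapped search().

```
search(triple, q=*:*, fl="subject_id,type_id",
       sort="type_id asc", qt="/export")

parallel(
    triple,
    search(triple, q=*:*, fl="subject_id,type_id",
           sort="type_id asc", partitionKeys="type_id", qt="/export"),
    workers="2",
    sort="type_id asc")
```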


Then you'll start to get an idea of how fast a single node can sort and
export data.




Joel Bernstein
http://joelsolr.blogspot.com/

On Sat, May 14, 2016 at 4:14 PM, Ryan Cutter <ryancut...@gmail.com> wrote:

> Hello, I'm running Solr on my laptop with -Xmx8g and gave each collection 4
> shards and 2 replicas.
>
> Even grabbing 100k triple documents (like the following) is taking 20
> seconds to complete and prone to fall over.  I could try this in a proper
> cluster with multiple hosts and more sharding, etc.  I just thought I was
> tinkering with a small enough data set to use locally.
>
> parallel(
>     triple,
>     innerJoin(
>       search(triple, q=*:*, fl="subject_id,type_id", sort="type_id asc",
> partitionKeys="type_id", rows="100000"),
>       search(triple_type, q=*:*, fl="triple_type_id", sort="triple_type_id
> asc", partitionKeys="triple_type_id", qt="/export"),
>       on="type_id=triple_type_id"
>     ),
>     sort="subject_id asc",
>     workers="8")
>
>
> When Solr does crash, it's leaving messages like this.
>
> ERROR - 2016-05-14 20:00:53.892; [c:triple s:shard3 r:core_node2
> x:triple_shard3_replica2] org.apache.solr.common.SolrException;
> null:java.io.IOException: java.util.concurrent.TimeoutException: Idle
> timeout expired: 50001/50000 ms
>
> at
>
> org.eclipse.jetty.util.SharedBlockingCallback$Blocker.block(SharedBlockingCallback.java:226)
>
> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:164)
>
> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:530)
>
> at
>
> org.apache.solr.response.QueryResponseWriterUtil$1.write(QueryResponseWriterUtil.java:54)
>
> at java.io.OutputStream.write(OutputStream.java:116)
>
> at sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
>
> at sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
>
> at sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
>
> at java.io.OutputStreamWriter.write(OutputStreamWriter.java:207)
>
> at org.apache.solr.util.FastWriter.flush(FastWriter.java:140)
>
> at org.apache.solr.util.FastWriter.write(FastWriter.java:54)
>
> at
>
> org.apache.solr.response.JSONWriter.writeMapCloser(JSONResponseWriter.java:420)
>
> at
>
> org.apache.solr.response.JSONWriter.writeSolrDocument(JSONResponseWriter.java:364)
>
> at
>
> org.apache.solr.response.TextResponseWriter.writeDocuments(TextResponseWriter.java:246)
>
> at
>
> org.apache.solr.response.TextResponseWriter.writeVal(TextResponseWriter.java:150)
>
> at
>
> org.apache.solr.response.JSONWriter.writeNamedListAsMapWithDups(JSONResponseWriter.java:183)
>
> On Fri, May 13, 2016 at 5:50 PM, Joel Bernstein <joels...@gmail.com>
> wrote:
>
> > Also the hashJoin is going to read the entire entity table into memory. If
> > that's a large index that could be using lots of memory.
> >
> > 25 million docs should be ok to /export from one node, as long as you have
> > enough memory to load the docValues for the fields for sorting and
> > exporting.
> >
> > Breaking down the query into its parts will show where the issue is. Also
> > adding more heap might give you enough memory.
> >
> > In my testing the max docs per second I've seen the /export handler push
> > from a single node is 650,000. In order to get 650,000 docs per second on
> > one node you have to partition the stream with workers. In my testing it
> > took 8 workers hitting one node to achieve the 650,000 docs per second.
> >
> > But the numbers get big as the cluster grows. With 20 shards and 4 replicas
> > and 32 workers, you could export 52,000,000 docs per second. With 40
> > shards, 5 replicas and 40 workers you could export 130,000,000 docs per
> > second.
> >
> > So with large clusters you could do very large distributed joins with
> > sub-second performance.
> >
> >
> >
> >
> > Joel Bernstein
> > http://joelsolr.blogspot.com/
> >
> > On Fri, May 13, 2016 at 8:11 PM, Ryan Cutter <ryancut...@gmail.com>
> wrote:
> >
> > > Thanks very much for the advice.  Yes, I'm running in a very basic
> single
> > > shard environment.  I thought that 25M docs was small enough to not
> > require
> > > anything special but I will try scaling like you suggest and let you
> know
> > > what happens.
> > >
> > > Cheers, Ryan
> > >
> > > On Fri, May 13, 2016 at 4:53 PM, Joel Bernstein <joels...@gmail.com>
> > > wrote:
> > >
> > > > I would try breaking down the second query to see when the problems
> > > occur.
> > > >
> > > > 1) Start with just a single *:* search from one of the collections.
> > > > 2) Then test the innerJoin. The innerJoin won't take much memory as
> > it's
> > > a
> > > > streaming merge join.
> > > > 3) Then try the full thing.
> > > >
> > > > If you're running a large join like this all on one host then you
> might
> > > not
> > > > have enough memory for the docValues and the two joins. In general
> > > > streaming is designed to scale by adding servers. It scales 3 ways:
> > > >
> > > > 1) Adding shards, splits up the index for more pushing power.
> > > > 2) Adding workers, partitions the streams and splits up the join /
> > merge
> > > > work.
> > > > 3) Adding replicas, when you have workers you will add pushing power
> by
> > > > adding replicas. This is because workers will fetch partitions of the
> > > > streams from across the entire cluster. So ALL replicas will be
> pushing
> > > at
> > > > once.
> > > >
> > > > So, imagine a setup with 20 shards, 4 replicas, and 20 workers. You can
> > > > perform massive joins quickly.
> > > >
> > > > But for your scenario and available hardware you can experiment with
> > > > different cluster sizes.
> > > >
> > > >
> > > >
> > > > Joel Bernstein
> > > > http://joelsolr.blogspot.com/
> > > >
> > > > On Fri, May 13, 2016 at 7:27 PM, Ryan Cutter <ryancut...@gmail.com>
> > > wrote:
> > > >
> > > > > qt="/export" immediately fixed the query in Question #1.  Sorry for
> > > > missing
> > > > > that in the docs!
> > > > >
> > > > > The second query (with /export) crashes the server so I was going to look
> > > > > at parallelization if you think that's a good idea.  It also seems unwise
> > > > > to join into 26M docs so maybe I can reconfigure the query to run along
> > > > > a more happy path :-)  The schema is very RDBMS-centric so maybe that just
> > > > > won't ever work in this framework.
> > > > >
> > > > > Here's the log but it's not very helpful.
> > > > >
> > > > >
> > > > > INFO  - 2016-05-13 23:18:13.214; [c:triple s:shard1 r:core_node1
> > > > > x:triple_shard1_replica1] org.apache.solr.core.SolrCore;
> > > > > [triple_shard1_replica1]  webapp=/solr path=/export
> > > > >
> > > > >
> > > >
> > >
> >
> params={q=*:*&distrib=false&fl=triple_id,subject_id,type_id&sort=type_id+asc&wt=json&version=2.2}
> > > > > hits=26305619 status=0 QTime=61
> > > > >
> > > > > INFO  - 2016-05-13 23:18:13.747; [c:triple_type s:shard1
> r:core_node1
> > > > > x:triple_type_shard1_replica1] org.apache.solr.core.SolrCore;
> > > > > [triple_type_shard1_replica1]  webapp=/solr path=/export
> > > > >
> > > > >
> > > >
> > >
> >
> params={q=*:*&distrib=false&fl=triple_type_id,triple_type_label&sort=triple_type_id+asc&wt=json&version=2.2}
> > > > > hits=702 status=0 QTime=2
> > > > >
> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
> > > > > org.apache.solr.common.cloud.ConnectionManager; Watcher
> > > > > org.apache.solr.common.cloud.ConnectionManager@6ad0f304
> > > > > name:ZooKeeperConnection Watcher:localhost:9983 got event
> > WatchedEvent
> > > > > state:Disconnected type:None path:null path:null type:None
> > > > >
> > > > > INFO  - 2016-05-13 23:18:48.504; [   ]
> > > > > org.apache.solr.common.cloud.ConnectionManager; zkClient has
> > > disconnected
> > > > >
> > > > > ERROR - 2016-05-13 23:18:51.316; [c:triple s:shard1 r:core_node1
> > > > > x:triple_shard1_replica1] org.apache.solr.common.SolrException;
> > > > null:Early
> > > > > Client Disconnect
> > > > >
> > > > > WARN  - 2016-05-13 23:18:51.431; [   ]
> > > > > org.apache.zookeeper.ClientCnxn$SendThread; Session
> 0x154ac66c81e0002
> > > for
> > > > > server localhost/0:0:0:0:0:0:0:1:9983, unexpected error, closing
> > socket
> > > > > connection and attempting reconnect
> > > > >
> > > > > java.io.IOException: Connection reset by peer
> > > > >
> > > > >         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > > >
> > > > >         at
> sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > > >
> > > > >         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > > >
> > > > >         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > > > >
> > > > >         at
> > > sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> > > > >
> > > > >         at
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
> > > > >
> > > > >         at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
> > > > >
> > > > >         at
> > > > >
> org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> > > > >
> > > > > On Fri, May 13, 2016 at 3:09 PM, Joel Bernstein <
> joels...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > A couple of other things:
> > > > > >
> > > > > > 1) Your innerJoin can be parallelized across workers to improve performance.
> > > > > > Take a look at the docs on the parallel function for the details.
> > > > > >
> > > > > > 2) It looks like you might be doing graph operations with joins. You might
> > > > > > want to take a look at the gatherNodes function coming in 6.1:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=62693238
> > > > > >
> > > > > > Joel Bernstein
> > > > > > http://joelsolr.blogspot.com/
> > > > > >
> > > > > > On Fri, May 13, 2016 at 5:57 PM, Joel Bernstein <
> > joels...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > When doing things that require all the results (like joins) you
> > > need
> > > > to
> > > > > > > specify the /export handler in the search function.
> > > > > > >
> > > > > > > qt="/export"
> > > > > > >
> > > > > > > The search function defaults to the /select handler which is
> > > designed
> > > > > to
> > > > > > > return the top N results. The /export handler always returns
> all
> > > > > results
> > > > > > > that match the query. Also keep in mind that the /export
> handler
> > > > > requires
> > > > > > > that sort fields and fl fields have docValues set.
> > > > > > >
> > > > > > > Joel Bernstein
> > > > > > > http://joelsolr.blogspot.com/
> > > > > > >
> > > > > > > On Fri, May 13, 2016 at 5:36 PM, Ryan Cutter <
> > ryancut...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > >> Question #1:
> > > > > > >>
> > > > > > >> triple_type collection has a few hundred docs and triple has
> 25M
> > > > docs.
> > > > > > >>
> > > > > > >> When I search for a particular subject_id in triple which I
> know
> > > has
> > > > > 14
> > > > > > >> results and do not pass in 'rows' params, it returns 0
> results:
> > > > > > >>
> > > > > > >> innerJoin(
> > > > > > >>     search(triple, q=subject_id:1656521,
> > > > > > >> fl="triple_id,subject_id,type_id",
> > > > > > >> sort="type_id asc"),
> > > > > > >>     search(triple_type, q=*:*,
> > > > fl="triple_type_id,triple_type_label",
> > > > > > >> sort="triple_type_id asc"),
> > > > > > >>     on="type_id=triple_type_id"
> > > > > > >> )
> > > > > > >>
> > > > > > >> When I do the same search with rows=10000, it returns 14
> > results:
> > > > > > >>
> > > > > > >> innerJoin(
> > > > > > >>     search(triple, q=subject_id:1656521,
> > > > > > >> fl="triple_id,subject_id,type_id",
> > > > > > >> sort="type_id asc", rows=10000),
> > > > > > >>     search(triple_type, q=*:*,
> > > > fl="triple_type_id,triple_type_label",
> > > > > > >> sort="triple_type_id asc", rows=10000),
> > > > > > >>     on="type_id=triple_type_id"
> > > > > > >> )
> > > > > > >>
> > > > > > >> Am I doing this right?  Is there a magic number to pass into
> > rows
> > > > > which
> > > > > > >> says "give me all the results which match this query"?
> > > > > > >>
> > > > > > >>
> > > > > > >> Question #2:
> > > > > > >>
> > > > > > >> Perhaps related to the first question but I want to run the
> > > > > innerJoin()
> > > > > > >> without the subject_id - rather have it use the results of
> > another
> > > > > > query.
> > > > > > >> But this does not return any results.  I'm saying "search for
> > this
> > > > > > entity
> > > > > > >> based on id then use that result's entity_id as the subject_id
> > to
> > > > look
> > > > > > >> through the triple/triple_type collections:
> > > > > > >>
> > > > > > >> hashJoin(
> > > > > > >>     innerJoin(
> > > > > > >>         search(triple, q=*:*,
> fl="triple_id,subject_id,type_id",
> > > > > > >> sort="type_id asc"),
> > > > > > >>         search(triple_type, q=*:*,
> > > > > > fl="triple_type_id,triple_type_label",
> > > > > > >> sort="triple_type_id asc"),
> > > > > > >>         on="type_id=triple_type_id"
> > > > > > >>     ),
> > > > > > >>     hashed=search(entity,
> > > > > > >>
> q=id:"urn:sid:entity:455dfa1aa27eedad21ac2115797c1580bb3b3b4e",
> > > > > > >> fl="entity_id,entity_label", sort="entity_id asc"),
> > > > > > >>     on="subject_id=entity_id"
> > > > > > >> )
> > > > > > >>
> > > > > > >> Am I doing this hashJoin right?
> > > > > > >>
> > > > > > >> Thanks very much, Ryan
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
