[jira] Commented: (SOLR-1375) BloomFilter on a field
[ https://issues.apache.org/jira/browse/SOLR-1375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12846166#action_12846166 ] Ted Dunning commented on SOLR-1375: ---

Sorry to comment late here, but when indexing in Hadoop it is really nice to avoid any central dependency. It is also nice to focus the map-side join on items likely to match. Thirdly, reduce-side indexing is typically really important. How these three considerations play out depends on the duplication rate.

Using reduce-side indexing gets rid of most of the problems caused by duplicate versions of a single document (with the same sort key), since the reducer can scan to see whether it has the final copy handy before adding a document to the index. Problems remain where we must avoid indexing documents that already exist in the index, or must generate a deletion list that can assist in applying the index update. The former problem is usually the more severe one, because it isn't unusual for data sources to just include a full dump of all documents and assume that the consumer will figure out which are new or updated. Here you would like to index only new and modified documents.

My own preference is to avoid the complication of the map-side join using Bloom filters and simply export a very simple list of stub documents that correspond to the documents in the index. These stub documents should be much smaller than the average document (unless you are indexing tweets), which makes passing around great masses of stub documents not such a problem, since Hadoop shuffle, copy, and sort times are all dominated by Lucene indexing times. Passing stub documents allows the reducer to simply iterate through all documents with the same key, keeping the latest version or any stub that is encountered. For documents without a stub, normal indexing can be done, with the slight addition of exporting a list of stub documents for the new additions.
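The stub-document reduction described above can be sketched roughly as follows. This is a hypothetical illustration, not code from any attached patch: the record type, the version field, and the rule that a stub suppresses reindexing unless a strictly newer version arrives are all assumptions made for the sketch.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of reduce-side dedup with stub documents. A stub
// records that some version of the document is already in the index; a
// real document is emitted for indexing only if it is newer than any stub.
public class StubReducer {
    public record Doc(String key, long version, boolean stub, String body) {}

    // Called with all records sharing one sort key.
    public static Optional<Doc> reduce(List<Doc> docsForKey) {
        long stubVersion = docsForKey.stream()
                .filter(Doc::stub)
                .mapToLong(Doc::version)
                .max().orElse(Long.MIN_VALUE);   // no stub: index anything
        return docsForKey.stream()
                .filter(d -> !d.stub())
                .max(Comparator.comparingLong(Doc::version))  // keep latest
                .filter(d -> d.version() > stubVersion);      // skip if already indexed
    }
}
```

A document with no matching stub is indexed (and would itself produce a new stub for the next run); a document older than, or equal to, its stub is dropped.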
The same thing could be done with a map-side join, but the trade-off is that you then need considerably more memory for the mapper to store the entire bitmap, as opposed to needing (somewhat) more time to pass the stub documents around. How that trade-off plays out in the real world isn't clear. My personal preference is to keep heap space small, since the time cost is pretty minimal for me. This problem also turns up in our PDF conversion pipeline, where we keep checksums of each PDF that has already been converted to viewable form. In that case, the ratio of real document size to stub size is even more lopsided.

BloomFilter on a field -- Key: SOLR-1375 URL: https://issues.apache.org/jira/browse/SOLR-1375 Project: Solr Issue Type: New Feature Components: update Affects Versions: 1.4 Reporter: Jason Rutherglen Priority: Minor Fix For: 1.5 Attachments: SOLR-1375.patch, SOLR-1375.patch, SOLR-1375.patch, SOLR-1375.patch, SOLR-1375.patch Original Estimate: 120h Remaining Estimate: 120h

* A Bloom filter is a read-only probabilistic set. It is useful for verifying that a key exists in a set, though it can return false positives. http://en.wikipedia.org/wiki/Bloom_filter
* The use case is indexing in Hadoop and checking for duplicates against a Solr cluster, which (when using the term dictionary or a query) is too slow and exceeds the time consumed for indexing. When a match is found, the host, segment, and term are returned. If the same term is found on multiple servers, multiple results are returned by the distributed process. (We'll need to add in the core name, I just realized.)
* When new segments are created and commit is called, a new Bloom filter is generated from a given field (default: id) by iterating over the term dictionary values. There is a Bloom filter file per segment, which is managed on each Solr shard. When segments are merged away, their corresponding .blm files are also removed.
In a future version we'll have a central server for the Bloom filters, so we're not abusing the thread pool of the Solr proxy and the networking of the Solr cluster (this will be done sooner rather than later, after testing this version). I held off because the central server requires syncing the Solr servers' files (which is like reverse replication).
* The patch uses the BloomFilter from Hadoop 0.20. I want to jar up only the necessary classes so we don't have a giant Hadoop jar in lib. http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/util/bloom/BloomFilter.html
* Distributed code is added and seems to work; I extended TestDistributedSearch to test over multiple HTTP servers. I chose this approach rather than the manual method used by (for example) TermVectorComponent.testDistributed because I'm new to Solr's distributed search.
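The semantics the issue relies on (no false negatives, occasional false positives) can be illustrated with a minimal Bloom filter. This is a toy sketch, not Hadoop's org.apache.hadoop.util.bloom.BloomFilter; the sizing and hashing choices are illustrative only.

```java
import java.util.BitSet;

// Toy Bloom filter: a key that was added is always reported as present;
// a key that was never added is *usually* reported absent, but may
// collide and produce a false positive. Illustration only.
public class SimpleBloomFilter {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    public SimpleBloomFilter(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Derive k bit positions from two hash values (Kirsch-Mitzenmacher).
    private int bitFor(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 15) ^ 0x9e3779b9;
        return Math.floorMod(h1 + i * h2, size);
    }

    public void add(String key) {
        for (int i = 0; i < hashes; i++) bits.set(bitFor(key, i));
    }

    public boolean mightContain(String key) {
        for (int i = 0; i < hashes; i++)
            if (!bits.get(bitFor(key, i))) return false;
        return true;
    }
}
```

In the per-segment scheme described above, one such filter would be built per segment from the term dictionary of the id field, so a duplicate check is a fast in-memory probe instead of a term-dictionary lookup or query.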
[jira] Commented: (SOLR-1814) select count(distinct fieldname) in SOLR
[ https://issues.apache.org/jira/browse/SOLR-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12843632#action_12843632 ] Ted Dunning commented on SOLR-1814: --- Trove is GPL. The Mahout project has a partial set of replacements for the Trove collections in case you want to go forward with this. Our plan is to consider breaking out the collections package from Mahout at some point, in case you don't want to drag along the rest of Mahout.

select count(distinct fieldname) in SOLR Key: SOLR-1814 URL: https://issues.apache.org/jira/browse/SOLR-1814 Project: Solr Issue Type: New Feature Components: SearchComponents - other Affects Versions: 1.4, 1.5, 1.6, 2.0 Reporter: Marcus Herou Fix For: 1.4, 1.5, 1.6, 2.0 Attachments: CountComponent.java

I have seen questions on the mailing list about having functionality for counting distinct values of a field. We at Tailsweep want that as well, for example in our blog search. Example: "You had 1345 hits on 244 blogs". The 244 part is not possible in Solr today (correct me if I am wrong), so I've written a component which does this. Attaching it. -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.
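What count(distinct field) computes can be sketched in a few lines. This is a hypothetical illustration of the semantics; the attached CountComponent.java may work quite differently (e.g. over field caches rather than stored fields), and all names here are invented.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: given the matching documents, count the number of
// distinct values of one field (e.g. the blog each post belongs to).
public class DistinctCounter {
    // Each document is modeled as a map of field name -> value.
    public static int countDistinct(List<Map<String, String>> hits, String field) {
        Set<String> values = new HashSet<>();
        for (Map<String, String> doc : hits) {
            String v = doc.get(field);
            if (v != null) values.add(v);  // missing field values are ignored
        }
        return values.size();
    }
}
```

In the "1345 hits on 244 blogs" example, 1345 is the hit count and 244 would be countDistinct(hits, "blog").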
[jira] Commented: (SOLR-1724) Real Basic Core Management with Zookeeper
[ https://issues.apache.org/jira/browse/SOLR-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12835963#action_12835963 ] Ted Dunning commented on SOLR-1724: --- Will this HTTP access also allow a cluster with incrementally updated cores to replicate a core after a node failure?

Real Basic Core Management with Zookeeper - Key: SOLR-1724 URL: https://issues.apache.org/jira/browse/SOLR-1724 Project: Solr Issue Type: New Feature Components: multicore Affects Versions: 1.4 Reporter: Jason Rutherglen Fix For: 1.5 Attachments: commons-lang-2.4.jar, gson-1.4.jar, hadoop-0.20.2-dev-core.jar, hadoop-0.20.2-dev-test.jar, SOLR-1724.patch, SOLR-1724.patch, SOLR-1724.patch, SOLR-1724.patch, SOLR-1724.patch, SOLR-1724.patch, SOLR-1724.patch

Though we're implementing cloud, I need something real soon I can play with and deploy. So this'll be a patch that only deploys new cores, and that's about it. The arch is real simple: on ZooKeeper there'll be a directory that contains files representing the state of the cores of a given set of servers, which will look like the following:

/production/cores-1.txt
/production/cores-2.txt
/production/core-host-1-actual.txt (ephemeral node per host)

Each cores-N.txt file contains: hostname,corename,instanceDir,coredownloadpath where coredownloadpath is a URL such as file://, http://, hftp://, hdfs://, ftp://, etc., and core-host-N-actual.txt contains: hostname,corename,instanceDir,size

Every time a new cores-N.txt file is added, the listening host finds its entry in the list and begins the process of trying to match the entries. Upon completion, it updates its /core-host-1-actual.txt file to its completed state or logs an error. When all host actual files are written (without errors), a new core-1-actual.txt file is written, which can be picked up by another process that can create a new core proxy.
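The cores-N.txt line format above is a simple comma-separated record. A parser for it might look like the following sketch; the record type and method names are illustrative, not from the attached patches.

```java
// Hypothetical parser for the cores-N.txt entries described in SOLR-1724:
// hostname,corename,instanceDir,coredownloadpath
public class CoresFile {
    public record Entry(String hostname, String corename,
                        String instanceDir, String coreDownloadPath) {}

    public static Entry parseLine(String line) {
        // Limit 4 so any stray commas end up in the last (URL) field.
        String[] parts = line.split(",", 4);
        if (parts.length != 4)
            throw new IllegalArgumentException("expected 4 fields: " + line);
        return new Entry(parts[0], parts[1], parts[2], parts[3]);
    }
}
```

Each listening host would scan the parsed entries for its own hostname and act on those lines only.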
[jira] Commented: (SOLR-1301) Solr + Hadoop
[ https://issues.apache.org/jira/browse/SOLR-1301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12828961#action_12828961 ] Ted Dunning commented on SOLR-1301: --- {quote} Based on these observations, I have a few questions. (I am a beginner to the Hadoop/Solr world, so please forgive me if my questions are silly.) 1. As per the above observation, the SOLR-1045 patch is functionally better (performance I have not verified yet). Can anyone tell me what actual advantage the SOLR-1301 patch offers over the SOLR-1045 patch? 2. If both JIRA issues are trying to solve the same problem, do we really need 2 separate issues? {quote}

In the katta community, the recommended practice started with SOLR-1045 behavior (what I call map-side indexing), but I think the consensus now is that SOLR-1301 behavior (what I call reduce-side indexing) is much, much better. This is not necessarily the obvious result given your observations. There are some operational differences between katta and Solr that might make the conclusions different, but what I have observed is the following:

a) Index merging is a really bad idea that seems very attractive to begin with, because it is actually pretty expensive and doesn't solve the real problem of bad document distribution across shards. It is much better to simply have lots of shards per machine (aka micro-sharding) and use one reducer per shard. For large indexes, this gives entirely acceptable performance. On a pretty small cluster, we can index 50-100 million large documents in multiple ways in 2-3 hours. Index merging gives you no benefit compared to reduce-side indexing and just increases code complexity.

b) Map-side indexing leaves you with indexes that are heavily skewed, being composed of documents from a single input split. At retrieval time, this means that different shards have very different term frequency profiles and very different numbers of relevant documents.
This makes lots of statistics very difficult, including term frequency computation, term weighting, and determining the number of documents to retrieve. Map-side merging virtually guarantees that you have to do two cluster queries: one to gather term frequency statistics and another to do the actual query. With reduce-side indexing, you can place strong probabilistic bounds on how different the statistics in each shard can be, so you can use local term statistics, and you can depend on the score distribution being the same, which radically decreases the number of documents you need to retrieve from each shard.

c) Reduce-side indexing improves the balance of computation during retrieval. If (as is the rule) some document subset is hotter than another, due, say, to data-source boosting or recency boosting, you will have very bad cluster utilization with the skewed shards from map-side indexing, while with reduce-side indexing all shards cost about the same for any query, leading to good cluster utilization and faster queries.

d) Reduce-side indexing has properties that can be mathematically stated and proved. Map-side indexing only has comparable properties if you make unrealistic assumptions about your original data.

e) Micro-sharding allows very simple and very effective use of multiple cores on multiple machines in a search cluster. This can be very difficult to do with large shards or a single index.

Now, as you say, these advantages may evaporate if you are looking to produce a single output index. That seems, however, to contradict the whole point of scaling. If you need to scale indexing, presumably you also need to scale search speed and throughput. As such, you probably want to have many shards rather than few. Conversely, if you can stand to search a single index, then you probably can stand to index on a single machine.
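The statistical property argued for in (b) and (d) comes from how documents are routed to shards. A sketch of the routing that reduce-side indexing with micro-sharding relies on, assuming documents are assigned by hashing their id (an assumption of this sketch, not something the SOLR-1301 patch necessarily does):

```java
// Hashing each document id to one of many small shards gives every shard
// a statistically similar random sample of the corpus, which is what keeps
// per-shard term statistics close to each other. Map-side indexing instead
// groups documents by input split, which skews the shards.
public class ShardAssigner {
    public static int shardFor(String docId, int numShards) {
        // floorMod keeps the result non-negative even for negative hashCodes.
        return Math.floorMod(docId.hashCode(), numShards);
    }
}
```

In a Hadoop job this function would serve as the partitioner, so that one reducer (and hence one shard) receives each hash bucket.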
Another thing to think about is that Solr doesn't yet do micro-sharding or clustering very well and, in particular, doesn't handle multiple shards per core. That will be changing before long, however, and it is very dangerous to design for the past rather than the future. In case you didn't notice, I strongly suggest you stick with reduce-side indexing.

Solr + Hadoop - Key: SOLR-1301 URL: https://issues.apache.org/jira/browse/SOLR-1301 Project: Solr Issue Type: Improvement Affects Versions: 1.4 Reporter: Andrzej Bialecki Fix For: 1.5 Attachments: commons-logging-1.0.4.jar, commons-logging-api-1.0.4.jar, hadoop-0.19.1-core.jar, hadoop.patch, log4j-1.2.15.jar, README.txt, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SolrRecordWriter.java This patch contains a contrib module that provides distributed indexing (using Hadoop)
[jira] Commented: (SOLR-1301) Solr + Hadoop
[ https://issues.apache.org/jira/browse/SOLR-1301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12806547#action_12806547 ] Ted Dunning commented on SOLR-1301: --- It is critical to put indexes in the task-local area on both local and HDFS storage, not just because of task cleanup, but also because a task may be run more than once. Hadoop handles all the race conditions that would otherwise happen as a result.

Solr + Hadoop - Key: SOLR-1301 URL: https://issues.apache.org/jira/browse/SOLR-1301 Project: Solr Issue Type: Improvement Affects Versions: 1.4 Reporter: Andrzej Bialecki Fix For: 1.5 Attachments: commons-logging-1.0.4.jar, commons-logging-api-1.0.4.jar, hadoop-0.19.1-core.jar, hadoop.patch, log4j-1.2.15.jar, README.txt, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SOLR-1301.patch, SolrRecordWriter.java

This patch contains a contrib module that provides distributed indexing (using Hadoop) to Solr EmbeddedSolrServer. The idea behind this module is twofold:
* provide an API that is familiar to Hadoop developers, i.e. that of OutputFormat
* avoid unnecessary export and (de)serialization of data maintained on HDFS. SolrOutputFormat consumes data produced by reduce tasks directly, without storing it in intermediate files.

Furthermore, by using an EmbeddedSolrServer, the indexing task is split into as many parts as there are reducers, and the data to be indexed is not sent over the network.

Design -- Key/value pairs produced by reduce tasks are passed to SolrOutputFormat, which in turn uses SolrRecordWriter to write this data. SolrRecordWriter instantiates an EmbeddedSolrServer, and it also instantiates an implementation of SolrDocumentConverter, which is responsible for turning a Hadoop (key, value) pair into a SolrInputDocument. This data is then added to a batch, which is periodically submitted to EmbeddedSolrServer.
When the reduce task completes and the OutputFormat is closed, SolrRecordWriter calls commit() and optimize() on the EmbeddedSolrServer.

The API provides facilities to specify an arbitrary existing solr.home directory, from which the conf/ and lib/ files will be taken. This process results in the creation of as many partial Solr home directories as there were reduce tasks. The output shards are placed in the output directory on the default filesystem (e.g. HDFS). Such part-N directories can be used to run N shard servers. Additionally, users can specify the number of reduce tasks, in particular 1 reduce task, in which case the output will consist of a single shard.

An example application is provided that processes large CSV files and uses this API. It uses custom CSV processing to avoid (de)serialization overhead.

This patch relies on hadoop-core-0.19.1.jar - I attached the jar to this issue; you should put it in contrib/hadoop/lib. Note: the development of this patch was sponsored by an anonymous contributor and approved for release under the Apache License.
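The batch-then-commit behavior of SolrRecordWriter described above can be sketched as follows. The DocSink interface stands in for EmbeddedSolrServer here; it and everything else in this sketch are illustrative simplifications, not the patch's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the record-writer batching pattern: documents accumulate in a
// buffer, full batches are submitted, and close() flushes the remainder
// and commits (the real writer also calls optimize()).
public class BatchingWriter {
    public interface DocSink {
        void add(List<String> batch);
        void commit();
    }

    private final DocSink sink;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    public BatchingWriter(DocSink sink, int batchSize) {
        this.sink = sink;
        this.batchSize = batchSize;
    }

    public void write(String doc) {
        buffer.add(doc);
        if (buffer.size() >= batchSize) flush();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            sink.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public void close() {
        flush();        // submit any remaining documents
        sink.commit();  // make the shard searchable
    }
}
```

Batching keeps the per-document overhead of the embedded server low, which is the point of indexing inside the reducer instead of sending documents over the network.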
[jira] Commented: (SOLR-1724) Real Basic Core Management with Zookeeper
[ https://issues.apache.org/jira/browse/SOLR-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12803371#action_12803371 ] Ted Dunning commented on SOLR-1724: --- {quote} ... I agree, I'm not really into ephemeral ZK nodes for Solr hosts/nodes. The reason is contact with ZK is highly superficial and can be intermittent. {quote}

I have found that when I was having trouble with ZK connectivity, the problems were simply surfacing issues that I had anyway. You do have to configure the ZK client not to have long pauses (how is that incompatible with Solr?) and you may need to adjust the timeouts on the ZK side. More importantly, any issues with ZK connectivity will have their parallels in any other heartbeat mechanism, and replicating a heartbeat system that tries to match ZK for reliability is going to be a significant source of very nasty bugs. Better not to rewrite something that already works.

Keep in mind that ZK *connection* issues are not the same as session expiration. Katta has a fairly important set of bugfixes now to make that distinction, and ZK will soon handle connection loss on its own.

It isn't a bad idea to keep shards around for a while if a node goes down. That can seriously decrease the cost of momentary outages such as a software upgrade. The idea is that when the node comes back, it can advertise the availability of some shards, and replication of those shards should cease.

Real Basic Core Management with Zookeeper - Key: SOLR-1724 URL: https://issues.apache.org/jira/browse/SOLR-1724 Project: Solr Issue Type: New Feature Components: multicore Affects Versions: 1.4 Reporter: Jason Rutherglen Fix For: 1.5 Attachments: SOLR-1724.patch Though we're implementing cloud, I need something real soon I can play with and deploy. So this'll be a patch that only deploys new cores, and that's about it.
[jira] Commented: (SOLR-1724) Real Basic Core Management with Zookeeper
[ https://issues.apache.org/jira/browse/SOLR-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12801296#action_12801296 ] Ted Dunning commented on SOLR-1724: --- {quote} We actually started out that way... (when a node went down there wasn't really any trace it ever existed) but have been moving away from it. ZK may not just be a reflection of the cluster but may also control certain aspects of the cluster that you want persistent. For example, marking a node as disabled (i.e. don't use it). One could create APIs on the node to enable and disable and have that reflected in ZK, but it seems like more work than simply saying change this znode. {quote}

I see this as a conflation of two or three goals that leads to trouble. All of the goals are worthy and important, but conflating them leads to difficult problems. Taken separately, the goals are easily met.

One goal is reflecting the current cluster state. That is most reliably done using ephemeral files, roughly as I described. Another goal is reflecting constraints on, or the desired state of, the cluster. This is best handled as you describe, with permanent files, since you don't want the desired state to disappear when a node disappears.

The real issue is making sure that the source of any piece of information is the party most directly connected to the physical manifestation of that information. Moreover, it is important in some cases (node state, for instance) that the state stay correct even when the source of that state loses control by crashing, hanging, or becoming otherwise indisposed. Inserting an intermediary into this chain of control is a bad idea. Replicating ZK's rather well-implemented ephemeral state mechanism with ad hoc heartbeats is also a bad idea (remember how *many* bugs there have been in Hadoop relative to heartbeats and the name node?).

A somewhat secondary issue is whether the cluster master has to be involved in every query.
That seems like a really bad bottleneck to me, and Katta provides an existence proof that it is not necessary. After trying several options in production, what I find works best is that the master lays down a statement of desired state, and the nodes publish their status in a different and ephemeral fashion. The master can record a history, or there may be general directives such as your disabled list, however you like, but that shouldn't be mixed into the node status, because otherwise you get into a situation where ephemeral files can no longer be used for what they are good at.

Real Basic Core Management with Zookeeper - Key: SOLR-1724 URL: https://issues.apache.org/jira/browse/SOLR-1724 Project: Solr Issue Type: New Feature Components: multicore Affects Versions: 1.4 Reporter: Jason Rutherglen Fix For: 1.5
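The separation argued for in this thread (persistent desired state written by the master, ephemeral status published by each node) can be modeled in a few lines. This is a toy in-memory analogue of ZooKeeper's persistent vs. ephemeral znodes, not the ZooKeeper API; all names are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model: when a node's session expires, its ephemeral status vanishes
// on its own, but the master's desired state for that node persists. That
// is exactly the property the comment says is lost when the two kinds of
// state are mixed into one file.
public class ClusterState {
    private final Map<String, String> desired = new HashMap<>();  // persistent
    private final Map<String, String> actual = new HashMap<>();   // ephemeral

    public void setDesired(String node, String state) { desired.put(node, state); }
    public void publishStatus(String node, String status) { actual.put(node, status); }

    // Simulates the node's session expiring: only ephemeral data is lost.
    public void sessionExpired(String node) { actual.remove(node); }

    public Optional<String> desiredOf(String node) { return Optional.ofNullable(desired.get(node)); }
    public Optional<String> statusOf(String node) { return Optional.ofNullable(actual.get(node)); }
}
```

With real ZooKeeper, desired entries would be persistent znodes written by the master and status entries would be ephemeral znodes owned by each node's own session.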
[jira] Commented: (SOLR-1724) Real Basic Core Management with Zookeeper
[ https://issues.apache.org/jira/browse/SOLR-1724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=12801051#action_12801051 ] Ted Dunning commented on SOLR-1724: --- Katta had some interesting issues in the design of this. These are discussed here: http://oss.101tec.com/jira/browse/KATTA-43 The basic design consideration is that the failure of a node needs to automagically update the ZK state accordingly. This also allows all important file updates to flow in one direction.

Real Basic Core Management with Zookeeper - Key: SOLR-1724 URL: https://issues.apache.org/jira/browse/SOLR-1724 Project: Solr Issue Type: New Feature Components: multicore Affects Versions: 1.4 Reporter: Jason Rutherglen Fix For: 1.5