Author: jnioche Date: Fri May 30 14:55:51 2014 New Revision: 1598622 URL: http://svn.apache.org/r1598622 Log: NUTCH-1768 Upgrade to ElasticSearch 1.1.0
Modified: nutch/branches/2.x/CHANGES.txt nutch/branches/2.x/ivy/ivy.xml nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Modified: nutch/branches/2.x/CHANGES.txt URL: http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1598622&r1=1598621&r2=1598622&view=diff ============================================================================== --- nutch/branches/2.x/CHANGES.txt (original) +++ nutch/branches/2.x/CHANGES.txt Fri May 30 14:55:51 2014 @@ -2,6 +2,8 @@ Nutch Change Log Current Development +* NUTCH-1768 Upgrade to ElasticSearch 1.1.0 (jnioche) + * NUTCH-1634 readdb -stats shows the result twice (kaveh minooie via jnioche) * NUTCH-1780 ttl and gc_grace_seconds attributes are missing from gora-cassandra-mapping.xml file (kaveh minooie via lewismc) Modified: nutch/branches/2.x/ivy/ivy.xml URL: http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivy.xml?rev=1598622&r1=1598621&r2=1598622&view=diff ============================================================================== --- nutch/branches/2.x/ivy/ivy.xml (original) +++ nutch/branches/2.x/ivy/ivy.xml Fri May 30 14:55:51 2014 @@ -32,9 +32,6 @@ </publications> <dependencies> - <dependency org="org.elasticsearch" name="elasticsearch" rev="0.19.4" - conf="*->default"/> - <dependency org="org.apache.solr" name="solr-solrj" rev="4.6.0" conf="*->default" /> <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1" Modified: nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml?rev=1598622&r1=1598621&r2=1598622&view=diff ============================================================================== --- nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml (original) +++ nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml Fri May 30 14:55:51 2014 @@ -29,7 +29,7 @@ language governing permissions and limit <dependencies> <dependency org="org.elasticsearch" name="elasticsearch" - rev="0.90.1" conf="*->default" /> + rev="1.1.0" conf="*->default" /> </dependencies> </ivy-module> Modified: nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml?rev=1598622&r1=1598621&r2=1598622&view=diff ============================================================================== --- nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml (original) +++ nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml Fri May 30 14:55:51 2014 @@ -23,22 +23,21 @@ <export name="*" /> </library> - <library name="elasticsearch-0.90.1.jar"/> - <library name="jna-3.3.0.jar"/> - <library name="jts-1.12.jar"/> - <library name="log4j-1.2.17.jar"/> - <library name="lucene-codecs-4.3.0.jar"/> - <library name="lucene-core-4.3.0.jar"/> - <library name="lucene-grouping-4.3.0.jar"/> - <library name="lucene-highlighter-4.3.0.jar"/> - <library name="lucene-join-4.3.0.jar"/> - <library name="lucene-memory-4.3.0.jar"/> - <library name="lucene-queries-4.3.0.jar"/> - <library name="lucene-queryparser-4.3.0.jar"/> - <library name="lucene-sandbox-4.3.0.jar"/> - <library name="lucene-spatial-4.3.0.jar"/> - <library name="lucene-suggest-4.3.0.jar"/> - <library name="spatial4j-0.3.jar"/> + <library name="elasticsearch-1.1.0.jar"/> + <library name="lucene-analyzers-common-4.7.0.jar"/> + <library name="lucene-codecs-4.7.0.jar"/> + <library name="lucene-core-4.7.0.jar"/> + <library name="lucene-grouping-4.7.0.jar"/> + <library name="lucene-highlighter-4.7.0.jar"/> + <library name="lucene-join-4.7.0.jar"/> + <library name="lucene-memory-4.7.0.jar"/> + <library name="lucene-misc-4.7.0.jar"/> + <library name="lucene-queries-4.7.0.jar"/> + <library name="lucene-queryparser-4.7.0.jar"/> + <library name="lucene-sandbox-4.7.0.jar"/> + <library name="lucene-spatial-4.7.0.jar"/> + <library name="lucene-suggest-4.7.0.jar"/> + <library name="spatial4j-0.4.1.jar"/> </runtime> <requires> Modified: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1598622&r1=1598621&r2=1598622&view=diff ============================================================================== --- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (original) +++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Fri May 30 14:55:51 2014 @@ -14,36 +14,32 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + package org.apache.nutch.indexwriter.elastic; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; +import java.io.BufferedReader; import java.io.IOException; -import java.net.URL; import java.util.HashMap; import java.util.Map; -import java.io.BufferedReader; -import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.JobConf; -import org.apache.nutch.indexer.NutchDocument; import org.apache.nutch.indexer.IndexWriter; -import org.elasticsearch.ElasticSearchException; +import org.apache.nutch.indexer.NutchDocument; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.ImmutableSettings.Builder; -import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.node.Node; import org.slf4j.Logger; @@ -79,11 +75,13 @@ public class ElasticIndexWriter implemen public void open(Configuration job) throws IOException { clusterName = job.get(ElasticConstants.CLUSTER); host = job.get(ElasticConstants.HOST); - port = job.getInt(ElasticConstants.PORT, -1); + port = job.getInt(ElasticConstants.PORT, 9300); + + Builder settingsBuilder = ImmutableSettings.settingsBuilder().classLoader( + Settings.class.getClassLoader()); - Builder settingsBuilder = ImmutableSettings.settingsBuilder(); - - BufferedReader reader = new BufferedReader(job.getConfResourceAsReader("elasticsearch.conf")); + BufferedReader reader = new BufferedReader( + job.getConfResourceAsReader("elasticsearch.conf")); String line; String parts[]; @@ -98,12 +96,16 @@ public class ElasticIndexWriter implemen } } + if (StringUtils.isNotBlank(clusterName)) + settingsBuilder.put("cluster.name", clusterName); + // Set the cluster name and build the settings - Settings settings = settingsBuilder.put("cluster.name", clusterName).build(); - + Settings settings = settingsBuilder.build(); + // Prefer TransportClient if (host != null && port > 1) { - client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port)); + client = new TransportClient(settings) + .addTransportAddress(new InetSocketTransportAddress(host, port)); } else if (clusterName != null) { node = nodeBuilder().settings(settings).client(true).node(); client = node.client(); @@ -111,16 +113,18 @@ public class ElasticIndexWriter implemen bulk = client.prepareBulk(); defaultIndex = job.get(ElasticConstants.INDEX, "nutch"); - maxBulkDocs = job.getInt( - ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS); - maxBulkLength = job.getInt( - ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH); + maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS, + DEFAULT_MAX_BULK_DOCS); + maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH, + DEFAULT_MAX_BULK_LENGTH); } + @Override public void write(NutchDocument doc) throws IOException { - String id = (String)doc.getFieldValue("url"); + String id = (String) doc.getFieldValue("url"); String type = doc.getDocumentMeta().get("type"); - if (type == null) type = "doc"; + if (type == null) + type = "doc"; IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id); Map<String, Object> source = new HashMap<String, Object>(); @@ -147,30 +151,28 @@ public class ElasticIndexWriter implemen if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) { LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = " - + bulkLength + ", total docs = " + indexedDocs - + ", last doc in bulk = '" + id + "']"); + + bulkLength + ", total docs = " + indexedDocs + + ", last doc in bulk = '" + id + "']"); // Flush the bulk of indexing requests createNewBulk = true; commit(); } } - @Override public void delete(String key) throws IOException { - try{ - DeleteRequestBuilder builder = client.prepareDelete(); + try { + DeleteRequestBuilder builder = client.prepareDelete(); builder.setIndex(defaultIndex); - builder.setType("doc"); + builder.setType("doc"); builder.setId(key); builder.execute().actionGet(); - }catch(ElasticSearchException e) - { + } catch (ElasticsearchException e) { throw makeIOException(e); } } - public static IOException makeIOException(ElasticSearchException e) { + public static IOException makeIOException(ElasticsearchException e) { final IOException ioe = new IOException(); ioe.initCause(e); return ioe; @@ -191,13 +193,13 @@ public class ElasticIndexWriter implemen for (BulkItemResponse item : actionGet) { if (item.isFailed()) { throw new RuntimeException("First failure in bulk: " - + item.getFailureMessage()); + + item.getFailureMessage()); } } } long msWaited = System.currentTimeMillis() - beforeWait; LOG.info("Previous took in ms " + actionGet.getTookInMillis() - + ", including wait " + msWaited); + + ", including wait " + msWaited); execute = null; } if (bulk != null) { @@ -219,7 +221,7 @@ public class ElasticIndexWriter implemen public void close() throws IOException { // Flush pending requests LOG.info("Processing remaining requests [docs = " + bulkDocs - + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]"); + + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]"); createNewBulk = false; commit(); // flush one more time to finalize the last bulk @@ -237,12 +239,17 @@ public class ElasticIndexWriter implemen @Override public String describe() { StringBuffer sb = new StringBuffer("ElasticIndexWriter\n"); - sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n"); + sb.append("\t").append(ElasticConstants.CLUSTER) + .append(" : elastic prefix cluster\n"); sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n"); - sb.append("\t").append(ElasticConstants.PORT).append(" : port\n"); - sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n"); - sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n"); - sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n"); + sb.append("\t").append(ElasticConstants.PORT) + .append(" : port (default 9300)\n"); + sb.append("\t").append(ElasticConstants.INDEX) + .append(" : elastic index command \n"); + sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS) + .append(" : elastic bulk index doc counts. (default 250) \n"); + sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH) + .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n"); return sb.toString(); } @@ -250,16 +257,18 @@ public class ElasticIndexWriter implemen public void setConf(Configuration conf) { config = conf; String cluster = conf.get(ElasticConstants.CLUSTER); - if (cluster == null) { - String message = "Missing elastic.cluster. Should be set in nutch-site.xml "; - message+="\n"+describe(); + String host = conf.get(ElasticConstants.HOST); + + if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) { + String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml "; + message += "\n" + describe(); LOG.error(message); throw new RuntimeException(message); } } - + @Override public Configuration getConf() { return config; - } + } }