Author: jnioche
Date: Fri May 30 14:55:51 2014
New Revision: 1598622

URL: http://svn.apache.org/r1598622
Log:
NUTCH-1768 Upgrade to ElasticSearch 1.1.0

Modified:
    nutch/branches/2.x/CHANGES.txt
    nutch/branches/2.x/ivy/ivy.xml
    nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
    nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
    
nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java

Modified: nutch/branches/2.x/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/CHANGES.txt?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/CHANGES.txt (original)
+++ nutch/branches/2.x/CHANGES.txt Fri May 30 14:55:51 2014
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Current Development
 
+* NUTCH-1768 Upgrade to ElasticSearch 1.1.0 (jnioche)
+
 * NUTCH-1634 readdb -stats shows the result twice (kaveh minooie via jnioche)
 
 * NUTCH-1780 ttl and gc_grace_seconds attributes are missing from gora-cassandra-mapping.xml file (kaveh minooie via lewismc)

Modified: nutch/branches/2.x/ivy/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/ivy/ivy.xml?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/ivy/ivy.xml (original)
+++ nutch/branches/2.x/ivy/ivy.xml Fri May 30 14:55:51 2014
@@ -32,9 +32,6 @@
   </publications>
 
   <dependencies>
-    <dependency org="org.elasticsearch" name="elasticsearch" rev="0.19.4" 
-                conf="*->default"/>
-  
     <dependency org="org.apache.solr" name="solr-solrj" rev="4.6.0"
       conf="*->default" />
     <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.1"

Modified: nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml (original)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/ivy.xml Fri May 30 14:55:51 2014
@@ -29,7 +29,7 @@ language governing permissions and limit
 
   <dependencies>
     <dependency org="org.elasticsearch" name="elasticsearch"
-      rev="0.90.1" conf="*->default" />
+      rev="1.1.0" conf="*->default" />
   </dependencies>
 
 </ivy-module>

Modified: nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml (original)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/plugin.xml Fri May 30 14:55:51 2014
@@ -23,22 +23,21 @@
       <export name="*" />
     </library>
     
-    <library name="elasticsearch-0.90.1.jar"/>
-    <library name="jna-3.3.0.jar"/>
-    <library name="jts-1.12.jar"/>
-    <library name="log4j-1.2.17.jar"/>
-    <library name="lucene-codecs-4.3.0.jar"/>
-    <library name="lucene-core-4.3.0.jar"/>
-    <library name="lucene-grouping-4.3.0.jar"/>
-    <library name="lucene-highlighter-4.3.0.jar"/>
-    <library name="lucene-join-4.3.0.jar"/>
-    <library name="lucene-memory-4.3.0.jar"/>
-    <library name="lucene-queries-4.3.0.jar"/>
-    <library name="lucene-queryparser-4.3.0.jar"/>
-    <library name="lucene-sandbox-4.3.0.jar"/>
-    <library name="lucene-spatial-4.3.0.jar"/>
-    <library name="lucene-suggest-4.3.0.jar"/>
-    <library name="spatial4j-0.3.jar"/>
+    <library name="elasticsearch-1.1.0.jar"/>
+    <library name="lucene-analyzers-common-4.7.0.jar"/>
+    <library name="lucene-codecs-4.7.0.jar"/>
+    <library name="lucene-core-4.7.0.jar"/>
+    <library name="lucene-grouping-4.7.0.jar"/>
+    <library name="lucene-highlighter-4.7.0.jar"/>
+    <library name="lucene-join-4.7.0.jar"/>
+    <library name="lucene-memory-4.7.0.jar"/>
+    <library name="lucene-misc-4.7.0.jar"/>
+    <library name="lucene-queries-4.7.0.jar"/>
+    <library name="lucene-queryparser-4.7.0.jar"/>
+    <library name="lucene-sandbox-4.7.0.jar"/>
+    <library name="lucene-spatial-4.7.0.jar"/>
+    <library name="lucene-suggest-4.7.0.jar"/>
+    <library name="spatial4j-0.4.1.jar"/>
   </runtime>
 
   <requires>

Modified: nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java?rev=1598622&r1=1598621&r2=1598622&view=diff
==============================================================================
--- nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java (original)
+++ nutch/branches/2.x/src/plugin/indexer-elastic/src/java/org/apache/nutch/indexwriter/elastic/ElasticIndexWriter.java Fri May 30 14:55:51 2014
@@ -14,36 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.nutch.indexwriter.elastic;
 
 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 
+import java.io.BufferedReader;
 import java.io.IOException;
-import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
-import java.io.BufferedReader;
-import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.IndexWriter;
-import org.elasticsearch.ElasticSearchException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ListenableActionFuture;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.delete.DeleteRequestBuilder;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.ImmutableSettings.Builder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.node.Node;
 import org.slf4j.Logger;
@@ -79,11 +75,13 @@ public class ElasticIndexWriter implemen
   public void open(Configuration job) throws IOException {
     clusterName = job.get(ElasticConstants.CLUSTER);
     host = job.get(ElasticConstants.HOST);
-    port = job.getInt(ElasticConstants.PORT, -1);
+    port = job.getInt(ElasticConstants.PORT, 9300);
+
+    Builder settingsBuilder = ImmutableSettings.settingsBuilder().classLoader(
+        Settings.class.getClassLoader());
 
-    Builder settingsBuilder = ImmutableSettings.settingsBuilder();
-    
-    BufferedReader reader = new BufferedReader(job.getConfResourceAsReader("elasticsearch.conf"));
+    BufferedReader reader = new BufferedReader(
+        job.getConfResourceAsReader("elasticsearch.conf"));
     String line;
     String parts[];
 
@@ -98,12 +96,16 @@ public class ElasticIndexWriter implemen
       }
     }
 
+    if (StringUtils.isNotBlank(clusterName))
+      settingsBuilder.put("cluster.name", clusterName);
+
     // Set the cluster name and build the settings
-    Settings settings = settingsBuilder.put("cluster.name", clusterName).build();
-    
+    Settings settings = settingsBuilder.build();
+
     // Prefer TransportClient
     if (host != null && port > 1) {
-      client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress(host, port));
+      client = new TransportClient(settings)
+          .addTransportAddress(new InetSocketTransportAddress(host, port));
     } else if (clusterName != null) {
       node = nodeBuilder().settings(settings).client(true).node();
       client = node.client();
@@ -111,16 +113,18 @@ public class ElasticIndexWriter implemen
 
     bulk = client.prepareBulk();
     defaultIndex = job.get(ElasticConstants.INDEX, "nutch");
-    maxBulkDocs = job.getInt(
-            ElasticConstants.MAX_BULK_DOCS, DEFAULT_MAX_BULK_DOCS);
-    maxBulkLength = job.getInt(
-            ElasticConstants.MAX_BULK_LENGTH, DEFAULT_MAX_BULK_LENGTH);
+    maxBulkDocs = job.getInt(ElasticConstants.MAX_BULK_DOCS,
+        DEFAULT_MAX_BULK_DOCS);
+    maxBulkLength = job.getInt(ElasticConstants.MAX_BULK_LENGTH,
+        DEFAULT_MAX_BULK_LENGTH);
   }
+
   @Override
   public void write(NutchDocument doc) throws IOException {
-    String id = (String)doc.getFieldValue("url");
+    String id = (String) doc.getFieldValue("url");
     String type = doc.getDocumentMeta().get("type");
-    if (type == null) type = "doc";
+    if (type == null)
+      type = "doc";
     IndexRequestBuilder request = client.prepareIndex(defaultIndex, type, id);
 
     Map<String, Object> source = new HashMap<String, Object>();
@@ -147,30 +151,28 @@ public class ElasticIndexWriter implemen
 
     if (bulkDocs >= maxBulkDocs || bulkLength >= maxBulkLength) {
       LOG.info("Processing bulk request [docs = " + bulkDocs + ", length = "
-              + bulkLength + ", total docs = " + indexedDocs
-              + ", last doc in bulk = '" + id + "']");
+          + bulkLength + ", total docs = " + indexedDocs
+          + ", last doc in bulk = '" + id + "']");
       // Flush the bulk of indexing requests
       createNewBulk = true;
       commit();
     }
   }
 
-
   @Override
   public void delete(String key) throws IOException {
-    try{
-      DeleteRequestBuilder builder =  client.prepareDelete();
+    try {
+      DeleteRequestBuilder builder = client.prepareDelete();
       builder.setIndex(defaultIndex);
-      builder.setType("doc");      
+      builder.setType("doc");
       builder.setId(key);
       builder.execute().actionGet();
-    }catch(ElasticSearchException e)
-    {
+    } catch (ElasticsearchException e) {
       throw makeIOException(e);
     }
   }
 
-  public static IOException makeIOException(ElasticSearchException e) {
+  public static IOException makeIOException(ElasticsearchException e) {
     final IOException ioe = new IOException();
     ioe.initCause(e);
     return ioe;
@@ -191,13 +193,13 @@ public class ElasticIndexWriter implemen
         for (BulkItemResponse item : actionGet) {
           if (item.isFailed()) {
             throw new RuntimeException("First failure in bulk: "
-                    + item.getFailureMessage());
+                + item.getFailureMessage());
           }
         }
       }
       long msWaited = System.currentTimeMillis() - beforeWait;
       LOG.info("Previous took in ms " + actionGet.getTookInMillis()
-              + ", including wait " + msWaited);
+          + ", including wait " + msWaited);
       execute = null;
     }
     if (bulk != null) {
@@ -219,7 +221,7 @@ public class ElasticIndexWriter implemen
   public void close() throws IOException {
     // Flush pending requests
     LOG.info("Processing remaining requests [docs = " + bulkDocs
-            + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
+        + ", length = " + bulkLength + ", total docs = " + indexedDocs + "]");
     createNewBulk = false;
     commit();
     // flush one more time to finalize the last bulk
@@ -237,12 +239,17 @@ public class ElasticIndexWriter implemen
   @Override
   public String describe() {
     StringBuffer sb = new StringBuffer("ElasticIndexWriter\n");
-    sb.append("\t").append(ElasticConstants.CLUSTER).append(" : elastic prefix cluster\n");
+    sb.append("\t").append(ElasticConstants.CLUSTER)
+        .append(" : elastic prefix cluster\n");
     sb.append("\t").append(ElasticConstants.HOST).append(" : hostname\n");
-    sb.append("\t").append(ElasticConstants.PORT).append(" : port\n");
-    sb.append("\t").append(ElasticConstants.INDEX).append(" : elastic index command \n");
-    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS).append(" : elastic bulk index doc counts. (default 250) \n");
-    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH).append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
+    sb.append("\t").append(ElasticConstants.PORT)
+        .append(" : port  (default 9300)\n");
+    sb.append("\t").append(ElasticConstants.INDEX)
+        .append(" : elastic index command \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_DOCS)
+        .append(" : elastic bulk index doc counts. (default 250) \n");
+    sb.append("\t").append(ElasticConstants.MAX_BULK_LENGTH)
+        .append(" : elastic bulk index length. (default 2500500 ~2.5MB)\n");
     return sb.toString();
   }
 
@@ -250,16 +257,18 @@ public class ElasticIndexWriter implemen
   public void setConf(Configuration conf) {
     config = conf;
     String cluster = conf.get(ElasticConstants.CLUSTER);
-    if (cluster == null) {
-      String message = "Missing elastic.cluster. Should be set in nutch-site.xml ";
-      message+="\n"+describe();
+    String host = conf.get(ElasticConstants.HOST);
+
+    if (StringUtils.isBlank(cluster) && StringUtils.isBlank(host)) {
+      String message = "Missing elastic.cluster and elastic.host. At least one of them should be set in nutch-site.xml ";
+      message += "\n" + describe();
       LOG.error(message);
       throw new RuntimeException(message);
     }
   }
-    
+
   @Override
   public Configuration getConf() {
     return config;
-  }  
+  }
 }


Reply via email to