[
https://issues.apache.org/jira/browse/JENA-1305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15925987#comment-15925987
]
ASF GitHub Bot commented on JENA-1305:
--------------------------------------
Github user osma commented on a diff in the pull request:
https://github.com/apache/jena/pull/227#discussion_r106146265
--- Diff:
jena-text/src/main/java/org/apache/jena/query/text/TextIndexES.java ---
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.query.text;
+
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.NodeFactory;
+import org.apache.jena.sparql.util.NodeFactoryExtra;
+import
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.get.GetField;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import static
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Elastic Search Implementation of {@link TextIndex}
+ *
+ */
+public class TextIndexES implements TextIndex {
+
+ /**
+ * The definition of the Entity we are trying to Index
+ */
+ private final EntityDefinition docDef ;
+
+ /**
+ * Thread safe ElasticSearch Java Client to perform Index operations
+ */
+ private static Client client;
+
+ /**
+ * The name of the index. Defaults to 'test'
+ */
+ private final String INDEX_NAME;
+
+ static final String CLUSTER_NAME = "cluster.name";
+
+ static final String NUM_OF_SHARDS = "number_of_shards";
+
+ static final String NUM_OF_REPLICAS = "number_of_replicas";
+
+ private boolean isMultilingual ;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TextIndexES.class) ;
+
+ public TextIndexES(TextIndexConfig config, ESSettings esSettings)
throws Exception{
+
+ this.INDEX_NAME = esSettings.getIndexName();
+ this.docDef = config.getEntDef();
+
+
+ this.isMultilingual = config.isMultilingualSupport();
+ if (this.isMultilingual && config.getEntDef().getLangField() ==
null) {
+ //multilingual index cannot work without lang field
+ docDef.setLangField("lang");
+ }
+ if(client == null) {
+
+ LOGGER.debug("Initializing the Elastic Search Java Client with
settings: " + esSettings);
+ Settings settings = Settings.builder()
+ .put(CLUSTER_NAME,
esSettings.getClusterName()).build();
+ List<InetSocketTransportAddress> addresses = new ArrayList<>();
+ for(String host: esSettings.getHostToPortMapping().keySet()) {
+ InetSocketTransportAddress addr = new
InetSocketTransportAddress(InetAddress.getByName(host),
esSettings.getHostToPortMapping().get(host));
+ addresses.add(addr);
+ }
+
+ InetSocketTransportAddress socketAddresses[] = new
InetSocketTransportAddress[addresses.size()];
+ client = new
PreBuiltTransportClient(settings).addTransportAddresses(addresses.toArray(socketAddresses));
+ LOGGER.debug("Successfully initialized the client");
+ }
+
+
+ IndicesExistsResponse exists = client.admin().indices().exists(new
IndicesExistsRequest(INDEX_NAME)).get();
+ if(!exists.isExists()) {
+ Settings indexSettings = Settings.builder()
+ .put(NUM_OF_SHARDS, esSettings.getShards())
+ .put(NUM_OF_REPLICAS, esSettings.getReplicas())
+ .build();
+ LOGGER.debug("Index with name " + INDEX_NAME + " does not
exist yet. Creating one with settings: " + indexSettings.toString());
+
client.admin().indices().prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
+ }
+
+
+
+ }
+
+
+ /**
+ * Constructor used mainly for performing Integration tests
+ * @param config an instance of {@link TextIndexConfig}
+ * @param client an instance of {@link TransportClient}. The client
should already have been initialized with an index
+ */
+ public TextIndexES(TextIndexConfig config, Client client, String
indexName) {
+ this.docDef = config.getEntDef();
+ this.isMultilingual = true;
+ this.client = client;
+ this.INDEX_NAME = indexName;
+ }
+
+ /**
+ * We do not have any specific logic to perform before committing
+ */
+ @Override
+ public void prepareCommit() {
+ //Do Nothing
+
+ }
+
+ /**
+ * Commit happens in the individual get/add/delete operations
+ */
+ @Override
+ public void commit() {
+ // Do Nothing
+ }
+
+ /**
+ * not really sure what we need to roll back.
+ */
+ @Override
+ public void rollback() {
+ //Not sure what to do here
+
+ }
+
+ /**
+ * We don't have resources that need to be closed explicitly
+ */
+ @Override
+ public void close() {
+ // Do Nothing
+
+ }
+
+ /**
+ * Update an Entity. Since we are doing Upserts in add entity anyways,
we simply call {@link #addEntity(Entity)}
+ * method that takes care of updating the Entity as well.
+ * @param entity the entity to update.
+ */
+ @Override
+ public void updateEntity(Entity entity) {
+ //Since Add entity also updates the indexed document in case it
already exists,
+ // we can simply call the addEntity from here.
+ addEntity(entity);
+ }
+
+
+ /**
+ * Add an Entity to the ElasticSearch Index.
+ * The entity will be added as a new document in ES, if it does not
already exist.
+ * If the Entity exists, then the entity will simply be updated.
+ * The entity will never be replaced.
+ * @param entity the entity to add
+ */
+ @Override
+ public void addEntity(Entity entity) {
+ LOGGER.debug("Adding/Updating the entity in ES");
+
+ //The field that has a not null value in the current Entity
instance.
+ //Required, mainly for building a script for the update command.
+ String fieldToAdd = null;
+ String fieldValueToAdd = "";
+ try {
+ XContentBuilder builder = jsonBuilder()
+ .startObject();
+
+ //Currently ignoring Graph field based indexing
+// if (docDef.getGraphField() != null) {
+// builder = builder.field(docDef.getGraphField(),
entity.getGraph());
+// }
+
+ for(String field: docDef.fields()) {
+ if(entity.get(field) != null) {
+ if(entity.getLanguage() != null &&
!entity.getLanguage().isEmpty() && isMultilingual) {
+ fieldToAdd = field + "_" + entity.getLanguage();
+ } else {
+ fieldToAdd = field;
+ }
+
+ fieldValueToAdd = (String) entity.get(field);
+ builder = builder.field(fieldToAdd,
Arrays.asList(fieldValueToAdd));
+ break;
+ } else {
+ //We are making sure that the field is at-least added
to the index.
+ //This will help us tremendously when we are appending
the data later in an already indexed document.
+ builder = builder.field(field,
Collections.emptyList());
+ }
+
+ }
+
+ builder = builder.endObject();
+ IndexRequest indexRequest = new IndexRequest(INDEX_NAME,
docDef.getEntityField(), entity.getId())
+ .source(builder);
+
+ /**
+ * We are creating an upsert request here instead of a simple
insert request.
+ * The reason is we want to add a document if it does not
exist with the given Subject Id (URI).
+ * But if the document exists with the same Subject Id, we
want to do an update to it instead of deleting it and
+ * then creating it with only the latest field values.
+ * This functionality is called Upsert functionality and more
can be learned about it here:
+ *
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html#upserts
+ */
+
+ //First search whether the field exists or not
+ SearchResponse existsResponse =
client.prepareSearch(INDEX_NAME)
+ .setTypes(docDef.getEntityField())
+ .setQuery(QueryBuilders.existsQuery(fieldToAdd))
+ .get();
+ String script;
+ if(existsResponse != null && existsResponse.getHits() != null
&& existsResponse.getHits().totalHits() > 0) {
--- End diff --
Updates now consist of two ES operations: first checking whether the
entity+field exists, and then doing either an add or an update depending on the
result. I wonder whether a race condition is possible here if many additions
happen around the same time from multiple threads. I think that a
single-operation atomic update would be preferable, if it is possible to
implement with ES scripting. It would probably also perform much better, since
the overhead of an extra ES HTTP request is likely quite significant considering
that we are dealing with individual triples here.
> Elastic Search Support for Apache Jena Text
> --------------------------------------------
>
> Key: JENA-1305
> URL: https://issues.apache.org/jira/browse/JENA-1305
> Project: Apache Jena
> Issue Type: New Feature
> Components: Text
> Affects Versions: Jena 3.2.0
> Reporter: Anuj Kumar
> Assignee: Osma Suominen
> Labels: elasticsearch
> Original Estimate: 240h
> Remaining Estimate: 240h
>
> This Jira tracks the development of the Jena Text ElasticSearch implementation.
> The goal is to extend Jena Text's capability to index, at scale, in
> ElasticSearch. This implementation will be similar to the Lucene and Solr
> implementations.
> We will use ES version 5.2.1 for the implementation.
> The following functionality will be supported:
> * Indexing Literal values
> * Updating indexed values
> * Deleting Indexed values
> * Custom Analyzer Support
> * Configuration using Assembler as well as Java techniques.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)