This is an automated email from the ASF dual-hosted git repository. rnewson pushed a commit to branch nouveau-streaming-index-update-alt in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 5cabfef0216df1c5c61deeffb5ba880012bc911c Author: Robert Newson <[email protected]> AuthorDate: Thu Feb 19 21:45:37 2026 +0000 stream index updates in one request for performance --- .../apache/couchdb/nouveau/NouveauApplication.java | 2 +- .../couchdb/nouveau/api/DocumentDeleteRequest.java | 4 +- .../couchdb/nouveau/api/DocumentRequest.java | 27 +++ .../couchdb/nouveau/api/DocumentUpdateRequest.java | 4 +- .../couchdb/nouveau/api/IndexInfoRequest.java | 2 +- .../apache/couchdb/nouveau/api/UpdateRequest.java | 28 ++++ .../couchdb/nouveau/health/IndexHealthCheck.java | 2 +- .../couchdb/nouveau/resources/IndexResource.java | 55 +++++- .../nouveau/health/IndexHealthCheckTest.java | 5 +- .../couchdb/nouveau/lucene/LuceneIndexTest.java | 28 ++-- src/nouveau/src/nouveau_api.erl | 184 ++++++++++++--------- src/nouveau/src/nouveau_index_updater.erl | 28 ++-- 12 files changed, 256 insertions(+), 113 deletions(-) diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java index c2230d1eb..eb886d826 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/NouveauApplication.java @@ -77,7 +77,7 @@ public class NouveauApplication extends Application<NouveauApplicationConfigurat environment.jersey().register(analyzeResource); // IndexResource - final IndexResource indexResource = new IndexResource(indexManager); + final IndexResource indexResource = new IndexResource(indexManager, environment.getObjectMapper()); environment.jersey().register(indexResource); // Health checks diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java index 82e9b716a..ddc067a16 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java @@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Positive; import jakarta.validation.constraints.PositiveOrZero; -public final class DocumentDeleteRequest { +public final class DocumentDeleteRequest extends DocumentRequest { @PositiveOrZero private final long matchSeq; @@ -28,9 +28,11 @@ public final class DocumentDeleteRequest { private final boolean purge; public DocumentDeleteRequest( + @JsonProperty("doc_id") final String id, @JsonProperty("match_seq") final long matchSeq, @JsonProperty("seq") final long seq, @JsonProperty("purge") final boolean purge) { + super(id); if (matchSeq < 0) { throw new IllegalArgumentException("matchSeq must be 0 or greater"); } diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java new file mode 100644 index 000000000..4301b5b10 --- /dev/null +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java @@ -0,0 +1,27 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.couchdb.nouveau.api; + +public abstract class DocumentRequest extends UpdateRequest { + + private final String id; + + protected DocumentRequest(final String id) { + this.id = id; + } + + public final String getId() { + return id; + } +} diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java index 82c196602..7b1db0c09 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java @@ -20,7 +20,7 @@ import jakarta.validation.constraints.Positive; import jakarta.validation.constraints.PositiveOrZero; import java.util.Collection; -public final class DocumentUpdateRequest { +public final class DocumentUpdateRequest extends DocumentRequest { @PositiveOrZero private final long matchSeq; @@ -35,10 +35,12 @@ public final class DocumentUpdateRequest { private final Collection<Field> fields; public DocumentUpdateRequest( + @JsonProperty("doc_id") final String id, @JsonProperty("match_seq") final long matchSeq, @JsonProperty("seq") final long seq, @JsonProperty("partition") final String partition, @JsonProperty("fields") final Collection<Field> fields) { + super(id); this.matchSeq = matchSeq; this.seq = seq; this.partition = partition; diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java index cc008231c..538e664b9 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/IndexInfoRequest.java @@ -17,7 +17,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.constraints.Positive; import java.util.OptionalLong; -public final class IndexInfoRequest { +public final class IndexInfoRequest extends UpdateRequest { private final OptionalLong matchUpdateSeq; diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/UpdateRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/UpdateRequest.java new file mode 100644 index 000000000..2ae5deb7f --- /dev/null +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/UpdateRequest.java @@ -0,0 +1,28 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.apache.couchdb.nouveau.api; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; + +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = DocumentUpdateRequest.class, name = "update"), + @JsonSubTypes.Type(value = DocumentDeleteRequest.class, name = "delete"), + @JsonSubTypes.Type(value = IndexInfoRequest.class, name = "index_info"), +}) +public abstract class UpdateRequest {} diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java index 7e5facb2e..3a70dd7a7 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java @@ -42,7 +42,7 @@ public final class IndexHealthCheck extends HealthCheck { indexResource.createIndex(name, new IndexDefinition(IndexDefinition.LATEST_LUCENE_VERSION, "standard", null)); try { final DocumentUpdateRequest documentUpdateRequest = - new DocumentUpdateRequest(0, 1, null, Collections.emptyList()); + new DocumentUpdateRequest("foo", 0, 1, null, Collections.emptyList()); indexResource.updateDoc(name, "foo", documentUpdateRequest); final SearchRequest searchRequest = new SearchRequest(); diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index 9ba382109..9aa727cd2 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -16,6 +16,8 @@ package org.apache.couchdb.nouveau.resources; import com.codahale.metrics.annotation.ExceptionMetered; import com.codahale.metrics.annotation.Metered; import com.codahale.metrics.annotation.ResponseMetered; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.servlet.http.HttpServletRequest; import jakarta.validation.Valid; import jakarta.validation.constraints.NotNull; import jakarta.ws.rs.Consumes; @@ -27,6 +29,7 @@ import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response.Status; import java.io.IOException; @@ -40,7 +43,10 @@ import org.apache.couchdb.nouveau.api.IndexInfoRequest; import org.apache.couchdb.nouveau.api.Ok; import org.apache.couchdb.nouveau.api.SearchRequest; import org.apache.couchdb.nouveau.api.SearchResults; +import org.apache.couchdb.nouveau.api.UpdateRequest; import org.apache.couchdb.nouveau.core.IndexManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Path("/index/{name}") @Metered @@ -50,10 +56,15 @@ import org.apache.couchdb.nouveau.core.IndexManager; @Produces(MediaType.APPLICATION_JSON) public final class IndexResource { + private static final Logger LOGGER = LoggerFactory.getLogger(IndexResource.class); + private final IndexManager indexManager; - public IndexResource(final IndexManager indexManager) { + private final ObjectMapper objectMapper; + + public IndexResource(final IndexManager indexManager, final ObjectMapper objectMapper) { this.indexManager = Objects.requireNonNull(indexManager); + this.objectMapper = Objects.requireNonNull(objectMapper); } @PUT @@ -67,6 +78,7 @@ public final class IndexResource { return Ok.INSTANCE; } + @Deprecated(since = "2.5.2", forRemoval = true) @DELETE @Path("/doc/{docId}") public Ok deleteDoc( @@ -120,6 +132,7 @@ public final class IndexResource { }); } + @Deprecated(since = "2.5.2", forRemoval = true) @PUT @Path("/doc/{docId}") public Ok updateDoc( @@ -132,4 +145,44 @@ public final class IndexResource { return Ok.INSTANCE; }); } + + @POST + @Path("/update") + @Consumes({"application/json-seq"}) + public Ok updates(@PathParam("name") String name, @Context HttpServletRequest req) throws Exception { + var reader = req.getReader(); + return indexManager.with(name, (index) -> { + String line; + while ((line = reader.readLine()) != null) { + if (line.charAt(0) != 30) { + throw new WebApplicationException("malformed row", Status.BAD_REQUEST); + } + var updateReq = objectMapper.readValue(line.substring(1), UpdateRequest.class); + if (updateReq instanceof DocumentUpdateRequest) { + var documentUpdateRequest = (DocumentUpdateRequest) updateReq; + index.update(documentUpdateRequest.getId(), documentUpdateRequest); + } + if (updateReq instanceof DocumentDeleteRequest) { + var documentDeleteRequest = (DocumentDeleteRequest) updateReq; + index.delete(documentDeleteRequest.getId(), documentDeleteRequest); + } + if (updateReq instanceof IndexInfoRequest) { + var indexInfoRequest = (IndexInfoRequest) updateReq; + if (indexInfoRequest.getMatchUpdateSeq().isPresent() + && indexInfoRequest.getUpdateSeq().isPresent()) { + index.setUpdateSeq( + indexInfoRequest.getMatchUpdateSeq().getAsLong(), + indexInfoRequest.getUpdateSeq().getAsLong()); + } + if (indexInfoRequest.getMatchPurgeSeq().isPresent() + && indexInfoRequest.getPurgeSeq().isPresent()) { + index.setPurgeSeq( + indexInfoRequest.getMatchPurgeSeq().getAsLong(), + indexInfoRequest.getPurgeSeq().getAsLong()); + } + } + } + return Ok.INSTANCE; + }); + } } diff --git a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java index 0c777fbab..77f47a52c 100644 --- a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java +++ b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/health/IndexHealthCheckTest.java @@ -29,17 +29,18 @@ public class IndexHealthCheckTest { @Test public void testIndexHealthCheck(@TempDir final Path tempDir) throws Exception { var manager = new IndexManager(); + var objectMapper = new ObjectMapper(); manager.setCommitIntervalSeconds(30); manager.setIdleSeconds(60); manager.setMaxIndexesOpen(1); - manager.setObjectMapper(new ObjectMapper()); + manager.setObjectMapper(objectMapper); manager.setRootDir(tempDir); manager.setScheduledExecutorService(Executors.newScheduledThreadPool(2)); manager.setSearcherFactory(new SearcherFactory()); manager.start(); try { - var resource = new IndexResource(manager); + var resource = new IndexResource(manager, objectMapper); var check = new IndexHealthCheck(resource); var result = check.check(); assertTrue(result.isHealthy(), result.toString()); diff --git a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java index f87af2fe0..2d56b14e9 100644 --- a/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java +++ b/extra/nouveau/src/test/java/org/apache/couchdb/nouveau/lucene/LuceneIndexTest.java @@ -77,7 +77,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -97,7 +97,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("foo", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -118,7 +118,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("bar", "baz", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -139,7 +139,7 @@ public class LuceneIndexTest { final int count = 100; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new DoubleField("bar", (double) i, false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } final SearchRequest request = new SearchRequest(); @@ -164,13 +164,13 @@ public class LuceneIndexTest { final int count = 50; for (int i = 1; i <= count; i++) { final Collection<Field> fields = List.of(new StringField("bar", "bar", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } for (int i = count + 1; i <= (count * 2) + 5; i++) { final Collection<Field> fields = List.of(new StringField("bar", "baz", false)); - final DocumentUpdateRequest request = new DocumentUpdateRequest(i - 1, i, null, fields); + final DocumentUpdateRequest request = new DocumentUpdateRequest(null, i - 1, i, null, fields); index.update("doc" + i, request); } @@ -195,15 +195,15 @@ public class LuceneIndexTest { // get match seq wrong assertThrows( UpdatesOutOfOrderException.class, - () -> index.update("foo", new DocumentUpdateRequest(1, 2, null, fields))); + () -> index.update("foo", new DocumentUpdateRequest(null, 1, 2, null, fields))); // Go to 2. - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); // Should be prevented from going down to 1. assertThrows( UpdatesOutOfOrderException.class, - () -> index.update("foo", new DocumentUpdateRequest(2, 1, null, fields))); + () -> index.update("foo", new DocumentUpdateRequest(null, 2, 1, null, fields))); } finally { cleanup(index); } @@ -235,7 +235,7 @@ public class LuceneIndexTest { assertThat(info.getUpdateSeq()).isEqualTo(0); final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); index.commit(); info = index.info(); @@ -252,13 +252,13 @@ public class LuceneIndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(2, 3, false)); + index.delete("foo", new DocumentDeleteRequest(null, 2, 3, false)); index.commit(); info = index.info(); @@ -274,13 +274,13 @@ public class LuceneIndexTest { Index index = setup(path); try { final Collection<Field> fields = List.of(new DoubleField("bar", 12.0, false)); - index.update("foo", new DocumentUpdateRequest(0, 2, null, fields)); + index.update("foo", new DocumentUpdateRequest(null, 0, 2, null, fields)); index.commit(); IndexInfo info = index.info(); assertThat(info.getNumDocs()).isEqualTo(1); - index.delete("foo", new DocumentDeleteRequest(0, 3, true)); + index.delete("foo", new DocumentDeleteRequest(null, 0, 3, true)); index.commit(); info = index.info(); diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 2d140e580..821484d30 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -26,6 +26,8 @@ delete_doc/4, purge_doc/4, update_doc/6, + start_update/1, + end_update/1, search/2, set_purge_seq/3, set_update_seq/3, @@ -34,6 +36,7 @@ ]). -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). +-define(JSON_SEQ_CONTENT_TYPE, {"Content-Type", "application/json-seq"}). analyze(Text, Analyzer) when is_binary(Text), is_binary(Analyzer) @@ -99,41 +102,54 @@ delete_path(Path, Exclusions) when send_error(Reason) end. -delete_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq) when +delete_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(UpdateSeq), UpdateSeq > 0 -> - ReqBody = #{match_seq => MatchSeq, seq => UpdateSeq, purge => false}, - Resp = send_if_enabled( - doc_path(Index, DocId), - [?JSON_CONTENT_TYPE], - <<"DELETE">>, - jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + Row = #{ + <<"@type">> => delete, + doc_id => DocId, + match_seq => MatchSeq, + seq => UpdateSeq, + purge => false + }, + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). -purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when +purge_doc({_, _} = PoolStreamRef, DocId, MatchSeq, PurgeSeq) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, is_integer(PurgeSeq), PurgeSeq > 0 -> - ReqBody = #{match_seq => MatchSeq, seq => PurgeSeq, purge => true}, - Resp = send_if_enabled( - doc_path(Index, DocId), [?JSON_CONTENT_TYPE], <<"DELETE">>, jiffy:encode(ReqBody) - ), - case Resp of + Row = #{ + <<"@type">> => delete, + doc_id => DocId, + match_seq => MatchSeq, + seq => PurgeSeq, + purge => true + }, + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). + +start_update(#index{} = Index) -> + case nouveau:enabled() of + true -> + gun_pool:post( + update_path(Index), + [nouveau_gun:host_header(), ?JSON_SEQ_CONTENT_TYPE] + ); + false -> + {error, nouveau_not_enabled} + end. + +end_update({_, _} = PoolStreamRef) -> + ok = gun_pool:data(PoolStreamRef, fin, <<>>), + case await(PoolStreamRef) of {ok, 200, _, _} -> ok; {ok, StatusCode, _, RespBody} -> @@ -142,7 +158,7 @@ purge_doc(#index{} = Index, DocId, MatchSeq, PurgeSeq) when send_error(Reason) end. -update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when +update_doc({_, _} = PoolStreamRef, DocId, MatchSeq, UpdateSeq, Partition, Fields) when is_binary(DocId), is_integer(MatchSeq), MatchSeq >= 0, @@ -151,26 +167,16 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when (is_binary(Partition) orelse Partition == null), is_list(Fields) -> - ReqBody = #{ + Row = #{ + <<"@type">> => update, + doc_id => DocId, match_seq => MatchSeq, seq => UpdateSeq, partition => Partition, fields => Fields }, - Resp = send_if_enabled( - doc_path(Index, DocId), - [?JSON_CONTENT_TYPE], - <<"PUT">>, - jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). search(#index{} = Index, QueryArgs) -> Resp = send_if_enabled( @@ -188,32 +194,23 @@ search(#index{} = Index, QueryArgs) -> send_error(Reason) end. -set_update_seq(#index{} = Index, MatchSeq, UpdateSeq) -> - ReqBody = #{ +set_update_seq({_, _} = PoolStreamRef, MatchSeq, UpdateSeq) -> + Row = #{ + <<"@type">> => index_info, match_update_seq => MatchSeq, update_seq => UpdateSeq }, - set_seq(Index, ReqBody). + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). -set_purge_seq(#index{} = Index, MatchSeq, PurgeSeq) -> - ReqBody = #{ +set_purge_seq({_, _} = PoolStreamRef, MatchSeq, PurgeSeq) -> + Row = #{ + <<"@type">> => index_info, match_purge_seq => MatchSeq, purge_seq => PurgeSeq }, - set_seq(Index, ReqBody). - -set_seq(#index{} = Index, ReqBody) -> - Resp = send_if_enabled( - index_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(ReqBody) - ), - case Resp of - {ok, 200, _, _} -> - ok; - {ok, StatusCode, _, RespBody} -> - {error, jaxrs_error(StatusCode, RespBody)}; - {error, Reason} -> - send_error(Reason) - end. + ok = gun_pool:data(PoolStreamRef, nofin, encode_json_seq(Row)), + check_status(PoolStreamRef). supported_lucene_versions() -> Resp = send_if_enabled(<<"/">>, [], <<"GET">>), @@ -234,17 +231,12 @@ index_path(Path) when is_binary(Path) -> index_path(#index{} = Index) -> [<<"/index/">>, couch_util:url_encode(nouveau_util:index_name(Index))]. -doc_path(#index{} = Index, DocId) -> - [ - <<"/index/">>, - couch_util:url_encode(nouveau_util:index_name(Index)), - <<"/doc/">>, - couch_util:url_encode(DocId) - ]. - search_path(#index{} = Index) -> [index_path(Index), <<"/search">>]. +update_path(#index{} = Index) -> + [index_path(Index), <<"/update">>]. + jaxrs_error(400, Body) -> {bad_request, message(Body)}; jaxrs_error(404, Body) -> @@ -291,20 +283,7 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) -> ) of {async, PoolStreamRef} -> - Timeout = config:get_integer("nouveau", "request_timeout", 30000), - case gun_pool:await(PoolStreamRef, Timeout) of - {response, fin, Status, RespHeaders} -> - {ok, Status, RespHeaders, []}; - {response, nofin, Status, RespHeaders} -> - case gun_pool:await_body(PoolStreamRef, Timeout) of - {ok, RespBody} -> - {ok, Status, RespHeaders, RespBody}; - {error, Reason} -> - {error, Reason} - end; - {error, Reason} -> - {error, Reason} - end; + await(PoolStreamRef); {error, no_connection_available, _Reason} when RemainingTries > 0 -> timer:sleep(1000), send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries - 1); @@ -314,3 +293,50 @@ send_if_enabled(Path, ReqHeaders, Method, ReqBody, RemainingTries) -> false -> {error, nouveau_not_enabled} end. + +await(PoolStreamRef) -> + Timeout = config:get_integer("nouveau", "request_timeout", 30000), + await(PoolStreamRef, Timeout). + +await({ConnPid, _} = PoolStreamRef, Timeout) -> + MRef = monitor(process, ConnPid), + T0 = now_ms(), + Res = + case gun_pool:await(PoolStreamRef, Timeout, MRef) of + {response, fin, Status, RespHeaders} -> + {ok, Status, RespHeaders, []}; + {response, nofin, Status, RespHeaders} -> + Elapsed = now_ms() - T0, + case gun_pool:await_body(PoolStreamRef, max(0, Timeout - Elapsed), MRef) of + {ok, RespBody} -> + {ok, Status, RespHeaders, RespBody}; + {'DOWN', MRef, process, ConnPid, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason} + end; + {'DOWN', MRef, process, ConnPid, Reason} -> + {error, Reason}; + {error, Reason} -> + {error, Reason} + end, + demonitor(MRef, [flush]), + Res. + +now_ms() -> + erlang:monotonic_time(millisecond). + +encode_json_seq(Data) -> + [$\x{1e}, jiffy:encode(Data), $\n]. + +check_status({_, _} = PoolStreamRef) -> + case await(PoolStreamRef, 0) of + {error, timeout} -> + ok; + {ok, 200, _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 4bfea753a..8d9aa3267 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -27,6 +27,7 @@ -import(nouveau_util, [index_path/1]). -record(acc, { + pool_stream_ref, db, index, proc, @@ -80,14 +81,15 @@ update(#index{} = Index) -> index_update_seq = IndexUpdateSeq, index_purge_seq = IndexPurgeSeq }, - {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0), - + {async, PoolStreamRef} = nouveau_api:start_update(Index), + {ok, PurgeAcc1} = purge_index(PoolStreamRef, Db, Index, PurgeAcc0), NewCurSeq = couch_db:get_update_seq(Db), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), Acc0 = #acc{ + pool_stream_ref = PoolStreamRef, db = Db, index = Index, proc = Proc, @@ -99,8 +101,10 @@ update(#index{} = Index) -> {ok, Acc1} = couch_db:fold_changes( Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), - exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) + nouveau_api:set_update_seq(PoolStreamRef, Acc1#acc.update_seq, NewCurSeq), + exit(nouveau_api:end_update(PoolStreamRef)) after + gun_pool:cancel(PoolStreamRef), ret_os_process(Proc) end end @@ -125,8 +129,8 @@ load_docs(FDI, #acc{} = Acc1) -> false -> case update_or_delete_index( + Acc1#acc.pool_stream_ref, Acc1#acc.db, - Acc1#acc.index, Acc1#acc.update_seq, DI, Acc1#acc.proc @@ -142,11 +146,11 @@ load_docs(FDI, #acc{} = Acc1) -> end, {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. -update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(PoolStreamRef, Db, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -160,10 +164,10 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) - end, case Fields of [] -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:delete_doc(PoolStreamRef, Id, MatchSeq, Seq); _ -> nouveau_api:update_doc( - Index, Id, MatchSeq, Seq, Partition, Fields + PoolStreamRef, Id, MatchSeq, Seq, Partition, Fields ) end end. @@ -209,7 +213,7 @@ index_definition(#index{} = Index) -> <<"field_analyzers">> => Index#index.field_analyzers }. -purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> +purge_index(PoolStreamRef, Db, Index, #purge_acc{} = PurgeAcc0) -> Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), @@ -218,7 +222,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> ok = nouveau_api:purge_doc( - Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq + PoolStreamRef, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq ), PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq}; FDI -> @@ -229,8 +233,8 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> PurgeAcc1; false -> update_or_delete_index( + PoolStreamRef, Db, - Index, PurgeAcc1#purge_acc.index_update_seq, DI, Proc @@ -250,7 +254,7 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> ), DbPurgeSeq = couch_db:get_purge_seq(Db), ok = nouveau_api:set_purge_seq( - Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq + PoolStreamRef, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq ), update_local_doc(Db, Index, DbPurgeSeq), {ok, PurgeAcc3}
