Repository: usergrid Updated Branches: refs/heads/master 8cce53c2b -> 1d7785b82
Allow index requests to be sent directly to ES Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/71397815 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/71397815 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/71397815 Branch: refs/heads/master Commit: 7139781518869529665a528f97215a7301074adc Parents: b93f8d4 Author: Peter Johnson <pjohn...@apigee.com> Authored: Thu Nov 2 11:46:27 2017 -0700 Committer: Peter Johnson <pjohn...@apigee.com> Committed: Thu Nov 2 11:46:27 2017 -0700 ---------------------------------------------------------------------- .../corepersistence/CpEntityManager.java | 16 +- .../corepersistence/CpRelationManager.java | 18 +- .../asyncevents/AsyncEventService.java | 6 +- .../asyncevents/AsyncEventServiceImpl.java | 53 +++-- .../asyncevents/AsyncIndexProvider.java | 44 ++++- .../asyncevents/direct/BufferedQueue.java | 49 +++++ .../asyncevents/direct/BufferedQueueImpl.java | 191 +++++++++++++++++++ .../asyncevents/direct/BufferedQueueNOP.java | 45 +++++ .../direct/DirectFirstEventServiceImpl.java | 188 ++++++++++++++++++ .../corepersistence/index/IndexingStrategy.java | 69 +++++++ .../corepersistence/util/CpCollectionUtils.java | 12 ++ .../persistence/queue/LegacyQueueFig.java | 5 + .../interceptors/GZIPWriterInterceptor.java | 78 ++++++++ 13 files changed, 731 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java index 3f3794e..1c979d6 100644 --- 
a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java @@ -32,6 +32,7 @@ import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.CollectionSettings; import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; +import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.service.CollectionService; import org.apache.usergrid.corepersistence.service.ConnectionService; import org.apache.usergrid.corepersistence.util.CpCollectionUtils; @@ -523,7 +524,7 @@ public class CpEntityManager implements EntityManager { String entityType = cpEntity.getId().getType(); boolean skipIndexingForType = skipIndexingForType(entityType); - Boolean asyncIndex = asyncIndexingForType(entityType); + IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); try { @@ -551,14 +552,14 @@ public class CpEntityManager implements EntityManager { } if (!skipIndexingForType) { - indexEntity(cpEntity, asyncIndex); + indexEntity(cpEntity, indexingStrategy); deIndexOldVersionsOfEntity(cpEntity); } } - private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, Boolean async) { + private void indexEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity, IndexingStrategy indexingStrategy) { // queue an event to update the new entity - indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , async); + indexService.queueEntityIndexUpdate( applicationScope, cpEntity, 0 , indexingStrategy); } private void deIndexOldVersionsOfEntity(org.apache.usergrid.persistence.model.entity.Entity cpEntity) { @@ -568,12 +569,11 @@ public class CpEntityManager implements EntityManager { } } - - private Boolean asyncIndexingForType( 
String type ) { - return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type); - + private IndexingStrategy getIndexingStrategyForType(String type ) { + return CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type); } + private boolean skipIndexingForType( String type ) { return CpCollectionUtils.skipIndexingForType(collectionSettingsFactory, applicationId, type); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java index 06f06ad..a23d6ac 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java @@ -23,6 +23,7 @@ import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService; import org.apache.usergrid.corepersistence.index.CollectionSettings; import org.apache.usergrid.corepersistence.index.CollectionSettingsFactory; import org.apache.usergrid.corepersistence.index.CollectionSettingsScopeImpl; +import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage; import org.apache.usergrid.corepersistence.results.ConnectionRefQueryExecutor; import org.apache.usergrid.corepersistence.results.EntityQueryExecutor; @@ -396,8 +397,8 @@ public class CpRelationManager implements RelationManager { String entityType = cpHeadEntity.getId().getType(); if ( !skipIndexingForType( entityType) ) { - Boolean async = asyncIndexingForType(entityType); - indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, async); + IndexingStrategy 
indexingStrategy = getIndexingStrategyForType(entityType); + indexService.queueNewEdge(applicationScope, cpHeadEntity.getId(), reverseEdge, indexingStrategy); } } ); @@ -405,8 +406,8 @@ public class CpRelationManager implements RelationManager { String entityType = memberEntity.getId().getType(); if ( !skipIndexingForType( entityType ) ) { - Boolean async = asyncIndexingForType(entityType); - indexService.queueNewEdge(applicationScope, memberEntityId, edge, async); + IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); + indexService.queueNewEdge(applicationScope, memberEntityId, edge, indexingStrategy); } @@ -714,7 +715,6 @@ public class CpRelationManager implements RelationManager { ConnectionRefImpl connection = new ConnectionRefImpl( headEntity, connectionType, connectedEntityRef ); - if ( logger.isTraceEnabled() ) { logger.trace( "createConnection(): Indexing connection type '{}'\n from source {}:{}]\n to target {}:{}\n app {}", connectionType, headEntity.getType(), headEntity.getUuid(), connectedEntityRef.getType(), @@ -738,8 +738,8 @@ public class CpRelationManager implements RelationManager { String entityType = targetEntity.getId().getType(); if ( !skipIndexingForType( entityType ) ) { - Boolean async = asyncIndexingForType(entityType); - indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, async); + IndexingStrategy indexingStrategy = getIndexingStrategyForType(entityType); + indexService.queueNewEdge(applicationScope, targetEntity.getId(), edge, indexingStrategy); } // remove any duplicate edges (keeps the duplicate edge with same timestamp) @@ -1100,8 +1100,8 @@ public class CpRelationManager implements RelationManager { } - private Boolean asyncIndexingForType( String type ) { - return CpCollectionUtils.asyncIndexingForType(collectionSettingsFactory, applicationId, type); + private IndexingStrategy getIndexingStrategyForType(String type ) { + return 
CpCollectionUtils.getIndexingStrategyForType(collectionSettingsFactory, applicationId, type); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java index 1ddbac4..4305aea 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java @@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.corepersistence.index.CollectionDeleteAction; +import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.index.ReIndexAction; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; @@ -52,8 +53,9 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction * @param applicationScope * @param entity The entity to index. 
Should be fired when an entity is updated * @param updatedAfter + * @param strategy the indexing strategy to use for this update */ - void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, Boolean async); + void queueEntityIndexUpdate(final ApplicationScope applicationScope, final Entity entity, long updatedAfter, IndexingStrategy strategy); /** @@ -66,7 +68,7 @@ public interface AsyncEventService extends ReIndexAction, CollectionDeleteAction * @param entityId * @param newEdge */ - void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, Boolean async); + void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, IndexingStrategy indexingStrategy); /** * Queue the deletion of an edge http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java index 3e67110..8257640 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java @@ -29,11 +29,9 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; +import org.apache.usergrid.corepersistence.index.IndexingStrategy; import org.apache.usergrid.corepersistence.asyncevents.model.*; -import org.apache.usergrid.corepersistence.index.EntityIndexOperation; -import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; -import org.apache.usergrid.corepersistence.index.IndexProcessorFig; -import 
org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; +import org.apache.usergrid.corepersistence.index.*; import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.corepersistence.util.CpNamingUtils; import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer; @@ -216,6 +214,10 @@ public class AsyncEventServiceImpl implements AsyncEventService { start(); } + protected Histogram getMessageCycye() { + return messageCycle; + } + private String getQueueName(AsyncEventQueueType queueType) { switch (queueType) { case REGULAR: @@ -275,25 +277,29 @@ public class AsyncEventServiceImpl implements AsyncEventService { /** * Offer the EntityIdScope to SQS */ - private void offer(final Serializable operation) { - offer(operation, AsyncEventQueueType.REGULAR, null); + protected void offer(final Serializable operation) { + offer(operation, AsyncEventQueueType.REGULAR, IndexingStrategy.DIRECT); } /** * Offer the EntityIdScope to SQS */ - private void offer(final Serializable operation, Boolean async) { - offer(operation, AsyncEventQueueType.REGULAR, async); + protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) { + offer(operation, AsyncEventQueueType.REGULAR, indexingStrategy); } /** * Offer the EntityIdScope to SQS */ - private void offer(final Serializable operation, AsyncEventQueueType queueType, Boolean async) { + private void offer(final Serializable operation, AsyncEventQueueType queueType, IndexingStrategy indexingStrategy) { final Timer.Context timer = this.writeTimer.time(); try { //signal to SQS + Boolean async = null; + if (indexingStrategy != IndexingStrategy.DEFAULT) { + async = (indexingStrategy == IndexingStrategy.ASYNC); + } getQueue(queueType).sendMessageToLocalRegion(operation, async); } catch (IOException e) { @@ -402,7 +408,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { * @param messages * @return */ - private List<IndexEventResult> 
callEventHandlers(final List<LegacyQueueMessage> messages) { + protected List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) { if (logger.isDebugEnabled()) { logger.debug("callEventHandlers with {} message(s)", messages.size()); @@ -542,7 +548,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { @Override public void queueEntityIndexUpdate(final ApplicationScope applicationScope, - final Entity entity, long updatedAfter, Boolean async) { + final Entity entity, long updatedAfter, IndexingStrategy indexingStrategy) { if (logger.isTraceEnabled()) { @@ -555,7 +561,7 @@ public class AsyncEventServiceImpl implements AsyncEventService { new EntityIdScope(applicationScope, entity.getId()), updatedAfter); - offer(event, async); + offer(event, indexingStrategy); } @@ -593,14 +599,14 @@ public class AsyncEventServiceImpl implements AsyncEventService { public void queueNewEdge(final ApplicationScope applicationScope, final Id entityId, final Edge newEdge, - Boolean async) { + IndexingStrategy indexingStrategy) { if (logger.isTraceEnabled()) { logger.trace("Offering EdgeIndexEvent for edge type {} entity {}:{}", newEdge.getType(), entityId.getUuid(), entityId.getType()); } - offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), async); + offer( new EdgeIndexEvent( queueFig.getPrimaryRegion(), applicationScope, entityId, newEdge ), indexingStrategy); } @@ -704,7 +710,24 @@ public class AsyncEventServiceImpl implements AsyncEventService { offerTopic( elasticsearchIndexEvent, queueType ); } - private void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) + protected ElasticsearchIndexEvent getIndexOperationMessage(final IndexOperationMessage indexOperationMessage) { + + final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage ); + + final UUID newMessageId = UUIDGenerator.newTimeUUID(); + + final int expirationTimeInSeconds = + ( int ) 
TimeUnit.MILLISECONDS.toSeconds( indexProcessorFig.getIndexMessageTtl() ); + + //write to the map in ES + esMapPersistence.putString( newMessageId.toString(), jsonValue, expirationTimeInSeconds ); + + return new ElasticsearchIndexEvent(queueFig.getPrimaryRegion(), newMessageId ); + + } + + + protected void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent) throws IndexDocNotFoundException { Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" ); http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java index 2ba6c0b..e5e981b 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java @@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents; import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.asyncevents.direct.DirectFirstEventServiceImpl; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.metrics.MetricsFactory; @@ -105,7 +106,40 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { final LegacyQueueManager.Implementation impl = LegacyQueueManager.Implementation.valueOf(value); - final AsyncEventServiceImpl asyncEventService = new AsyncEventServiceImpl( + final 
AsyncEventServiceImpl asyncEventService = getAsyncEventService(); + + if ( impl.equals( LOCAL )) { + asyncEventService.MAX_TAKE = 1000; + } + + if ( impl.equals( DISTRIBUTED )) { + asyncEventService.MAX_TAKE = 500; + } + + return asyncEventService; + } + + + private AsyncEventServiceImpl getAsyncEventService() { + + + AsyncEventServiceImpl asyncEventService; +/* + asyncEventService = new AsyncEventServiceImpl( + queueManagerFactory, + indexProcessorFig, + indexProducer, + metricsFactory, + entityCollectionManagerFactory, + indexLocationStrategyFactory, + entityIndexFactory, + eventBuilder, + mapManagerFactory, + queueFig, + rxTaskScheduler); + */ + + asyncEventService = new DirectFirstEventServiceImpl( queueManagerFactory, indexProcessorFig, indexProducer, @@ -118,14 +152,6 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> { queueFig, rxTaskScheduler ); - if ( impl.equals( LOCAL )) { - asyncEventService.MAX_TAKE = 1000; - } - - if ( impl.equals( DISTRIBUTED )) { - asyncEventService.MAX_TAKE = 500; - } - return asyncEventService; } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java new file mode 100644 index 0000000..ab5f0b9 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueue.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.corepersistence.asyncevents.direct; + + +import java.util.List; +import java.util.function.Consumer; + +/** + * Created by peterajohnson on 9/8/17. + */ +public interface BufferedQueue<T> { + + /** + * Set the consumer of these events + * @param consumer + */ + void setConsumer(Consumer<List<T>> consumer); + + /** + * Offer an entity. May block + * + * @param t + * @return + */ + boolean offer(T t); + + /** + * @return the current size of the queue + */ + int size(); + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java new file mode 100644 index 0000000..9123138 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueImpl.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.corepersistence.asyncevents.direct; + + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.function.Consumer; + +/** + * Buffers events and dispatches them in batches. + * Ensures that the callback will be called at a min interval. + */ +public class BufferedQueueImpl<T> implements BufferedQueue<T> { + + private String fileName = "my_file_name.txt"; + private Consumer<List<T>> consumer; + + ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1); + + private final LinkedBlockingQueue<PendingDispatch> queue; + private final long intervalNanos; + private long timeOfLastDispatch = 0L; + + public BufferedQueueImpl(int size, long interval , TimeUnit intervalTimeUnit) { + + Runtime.getRuntime().addShutdownHook(new Thread(new DispatchTask())); + + this.intervalNanos = intervalTimeUnit.toNanos(interval); + threadPool.scheduleAtFixedRate(new DispatchTask(), intervalNanos,intervalNanos, TimeUnit.NANOSECONDS); + readBatchFile(); + queue = new LinkedBlockingQueue<>(size); + } + + public boolean offer(T t) { + PendingDispatch pd = new PendingDispatch(t); + if (timeOfLastDispatch + intervalNanos < System.nanoTime()) { + dispatchOne(pd); + return true; + } + try { + return queue.offer(pd, intervalNanos, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + return false; + } + } + + 
public void setConsumer(Consumer<List<T>> consumer) { + this.consumer = consumer; + } + + + private void dispatchOne(PendingDispatch pd) { + List<PendingDispatch> messages = new ArrayList<>(); + messages.add(pd); + dispatchMessages(messages); + } + + protected void dispatchAll() { + if (!queue.isEmpty()) { + List<PendingDispatch> messages = new ArrayList<>(); + queue.drainTo(messages); + dispatchMessages(messages); + } + } + + private void dispatchMessages(List<PendingDispatch> messages) { + List<T> m = new ArrayList<>(); + for (PendingDispatch pd : messages) { + if (!pd.isCancelled()) { + m.add(pd.getWrapped()); + } + } + timeOfLastDispatch = System.nanoTime(); + Boolean sent = Boolean.TRUE; + try { + consumer.accept(m); + } catch (Exception e) { + sent = Boolean.FALSE; + } + for (PendingDispatch pd : messages) { + pd.setResult(sent); + } + } + + + public int size() { + return queue.size(); + } + + private void readBatchFile() { + + } + + + // + // Internal Helper classes + // + + + + private class PendingDispatch implements Future<Boolean> { + T wrapped; + boolean canceled; + boolean done; + Boolean result = null; + + PendingDispatch(T wrapped) { + this.wrapped = wrapped; + canceled = false; + done = false; + } + + T getWrapped() { + return wrapped; + } + + void setResult(Boolean b) { + result = b; + done = true; + synchronized (this) { + notify(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + canceled = true; + return canceled; + } + + @Override + public boolean isCancelled() { + return canceled; + } + + @Override + public boolean isDone() { + return done; + } + + @Override + public Boolean get() throws InterruptedException, ExecutionException { + while (!done) { + synchronized (this) { + wait(100); + } + } + return result; + } + + @Override + public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (!done) { + synchronized (this) { + wait(unit.toMillis(timeout)); 
+ } + } + return result; + } + } + + + private class DispatchTask implements Runnable { + @Override + public void run() { + try { + dispatchAll(); + } catch (Throwable t) { + } + } + } + +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java new file mode 100644 index 0000000..f842cea --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/BufferedQueueNOP.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.corepersistence.asyncevents.direct; + +import java.util.function.Consumer; + +/** + * Created by peterajohnson on 10/27/17. 
+ */ +public class BufferedQueueNOP<T> implements BufferedQueue<T> { + + private Consumer consumer; + + @Override + public void setConsumer(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public boolean offer(T o) { + consumer.accept(o); + return true; + } + + @Override + public int size() { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java new file mode 100644 index 0000000..4dfce37 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/direct/DirectFirstEventServiceImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.usergrid.corepersistence.asyncevents.direct; + +import org.apache.usergrid.corepersistence.asyncevents.AsyncEventServiceImpl; +import org.apache.usergrid.corepersistence.asyncevents.EventBuilder; +import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent; +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.index.IndexingStrategy; +import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; +import org.apache.usergrid.persistence.core.metrics.MetricsFactory; +import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; +import org.apache.usergrid.persistence.index.impl.IndexProducer; +import org.apache.usergrid.persistence.map.MapManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueFig; +import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory; +import org.apache.usergrid.persistence.queue.LegacyQueueMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Implementation of the AsyncEventService that writes first directly to ES + * and then submits to AWS as a backup. + * + * Created by peterajohnson on 8/29/17. 
+ */ +public class DirectFirstEventServiceImpl extends AsyncEventServiceImpl { + + + private static final Logger logger = LoggerFactory.getLogger(DirectFirstEventServiceImpl.class); + + private IndexingStrategy configIndexingStrategy = IndexingStrategy.ASYNC; + + private BufferedQueue<Serializable> bufferedBatchQueue = new BufferedQueueNOP<>(); + + public DirectFirstEventServiceImpl(LegacyQueueManagerFactory queueManagerFactory, IndexProcessorFig indexProcessorFig, IndexProducer indexProducer, MetricsFactory metricsFactory, EntityCollectionManagerFactory entityCollectionManagerFactory, IndexLocationStrategyFactory indexLocationStrategyFactory, EntityIndexFactory entityIndexFactory, EventBuilder eventBuilder, MapManagerFactory mapManagerFactory, LegacyQueueFig queueFig, RxTaskScheduler rxTaskScheduler) { + super(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler); + + //bufferedBatchQueue = new BufferedQueueImpl<>(5000, 100, TimeUnit.MILLISECONDS); + bufferedBatchQueue.setConsumer((c) -> { dispatchToES(c); }); + + configIndexingStrategy = IndexingStrategy.get(queueFig.getQueueStrategy()); + + } + + protected void dispatchToES(final List<Serializable> bodies) { + + List<LegacyQueueMessage> messages = new ArrayList<>(); + for (Serializable body : bodies) { + String uuid = UUID.randomUUID().toString(); + LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"); + messages.add(message); + } + + List<IndexEventResult> result = callEventHandlers(messages); + + // failed to dispatch send to SQS + try { + List<LegacyQueueMessage> indexedMessages = submitToIndex(result, false); + } catch (Exception e) { + e.printStackTrace(); + for (Serializable body : bodies) { + super.offer(body); + } + } + + + } + + /** + * Offer the EntityIdScope to SQS + * + * The body will be an 
implementation of one of the following: + * EntityIndexEvent + * EntityDeleteEvent + * EdgeIndexEvent + * EdgeDeleteEvent + */ + protected void offer(final Serializable body) { + List<LegacyQueueMessage> messages = getMessageArray(body); + List<IndexEventResult> result = callEventHandlers(messages); + submitToIndex( result, false ); + super.offer(body); + } + + private List<LegacyQueueMessage> getMessageArray(final Serializable body) { + String uuid = UUID.randomUUID().toString(); + + LegacyQueueMessage message = new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"); + + if (logger.isDebugEnabled()) { + logger.debug("Sync Handler called for body of class {} ", body.getClass().getSimpleName()); + } + + List<LegacyQueueMessage> messages = new ArrayList<>(); + messages.add(message); + return messages; + } + + + protected void offer(final Serializable operation, IndexingStrategy indexingStrategy) { + if (shouldSendToDirectToES(indexingStrategy)) { + List<LegacyQueueMessage> messages = getMessageArray(operation); + List<IndexEventResult> result = callEventHandlers(messages); + submitToIndex( result, false ); + } + + // only if single region. 
+ if (shouldSendToAWS(indexingStrategy)) { + super.offer(operation, indexingStrategy); + } + } + + + protected List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults, boolean forUtilityQueue) { + + // if nothing came back then return empty list + if(indexEventResults==null){ + return new ArrayList<>(0); + } + + IndexOperationMessage combined = new IndexOperationMessage(); + List<LegacyQueueMessage> queueMessages = indexEventResults.stream() + + // filter out messages that are not present, they were not processed and put into the results + .filter( result -> result.getQueueMessage().isPresent() ) + .map(indexEventResult -> { + + //record the cycle time + getMessageCycye().update(System.currentTimeMillis() - indexEventResult.getCreationTime()); + + // ingest each index op into our combined, single index op for the index producer + if(indexEventResult.getIndexOperationMessage().isPresent()){ + combined.ingest(indexEventResult.getIndexOperationMessage().get()); + } + + return indexEventResult.getQueueMessage().get(); + }) + // collect into a list of QueueMessages that can be ack'd later + .collect(Collectors.toList()); + + + // dispatch to ES + ElasticsearchIndexEvent elasticsearchIndexEvent = getIndexOperationMessage(combined); + handleIndexOperation(elasticsearchIndexEvent); + return queueMessages; + } + + private boolean shouldSendToDirectToES(IndexingStrategy indexingStrategy) { + if (indexingStrategy == IndexingStrategy.DEFAULT) { + indexingStrategy = configIndexingStrategy; + } + return (indexingStrategy == IndexingStrategy.DIRECT || indexingStrategy == IndexingStrategy.DIRECTONLY); + } + + private boolean shouldSendToAWS(IndexingStrategy indexingStrategy) { + if (indexingStrategy == IndexingStrategy.DEFAULT) { + indexingStrategy = configIndexingStrategy; + } + // and is in same region. 
+ return (indexingStrategy != IndexingStrategy.DIRECTONLY); + } +} http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java ---------------------------------------------------------------------- diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java new file mode 100644 index 0000000..69c5445 --- /dev/null +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexingStrategy.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.usergrid.corepersistence.index; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * This class describes the paths an index request can take + * between tomcat and ES. + * + * Created by peterajohnson on 10/30/17. 
/**
 * Describes the paths an index request can take between tomcat and ES.
 *
 * Every constant carries a lower-case configuration name; {@link #get(String)}
 * maps such a name back to its constant.
 */
public enum IndexingStrategy {

    /** Index request is sent directly to ES and not to AWS. */
    DIRECTONLY("directonly"),
    /** Index request is sent directly to ES before sync AWS. */
    DIRECT("direct"),
    /** Index request is sent via a sync AWS to ES. */
    SYNC("sync"),
    /** Index request is sent via an async AWS to ES. */
    ASYNC("async"),
    /** Follow the default setting. */
    DEFAULT("default");

    private final String name;

    /** Lookup table from normalized configuration name to constant. */
    private static final Map<String, IndexingStrategy> NAME_MAP;

    static {
        Map<String, IndexingStrategy> map = new HashMap<>();
        for (IndexingStrategy instance : IndexingStrategy.values()) {
            map.put(normalize(instance.getName()), instance);
        }
        NAME_MAP = Collections.unmodifiableMap(map);
    }

    IndexingStrategy(String name) {
        this.name = name;
    }

    /**
     * Resolves a configuration name to its strategy.
     *
     * The lookup ignores case and surrounding whitespace, so values such as
     * {@code "Direct"} or {@code " async "} resolve to the constant that was
     * meant instead of silently becoming {@link #DEFAULT}.
     *
     * @param name the configured name, may be null
     * @return the matching strategy, or {@link #DEFAULT} when {@code name} is
     *         null or matches no constant
     */
    public static IndexingStrategy get(String name) {
        if (name == null) {
            return DEFAULT;
        }
        IndexingStrategy indexingStrategy = NAME_MAP.get(normalize(name));
        return indexingStrategy != null ? indexingStrategy : DEFAULT;
    }

    /** Canonical lookup key: trimmed and lower-cased independently of the JVM locale. */
    private static String normalize(String name) {
        return name.trim().toLowerCase(java.util.Locale.ROOT);
    }

    /**
     * @return the configuration name of this strategy
     */
    public String getName() {
        return this.name;
    }

}
org.apache.usergrid.persistence.*; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.model.entity.SimpleId; + import java.util.*; import static org.apache.usergrid.persistence.Schema.*; @@ -52,6 +54,16 @@ public class CpCollectionUtils { return VALID_SETTING_NAMES; } + public static IndexingStrategy getIndexingStrategyForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { + + IndexingStrategy indexingStrategy = IndexingStrategy.DEFAULT; + String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX); + if (indexing != null) { + indexingStrategy = IndexingStrategy.get(indexing); + } + return indexingStrategy; + } + public static Boolean asyncIndexingForType(CollectionSettingsFactory collectionSettingsFactory, UUID applicationId, String type ) { String indexing = getFieldForType(applicationId, collectionSettingsFactory, type, SETTING_QUEUE_INDEX); http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java index f19bede..4a12d14 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java @@ -114,4 +114,9 @@ public interface LegacyQueueFig extends GuicyFig { @Default("true") boolean isAsyncQueue(); + + @Key("usergrid.queue.strategy") + @Default("async") + String getQueueStrategy(); + } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/71397815/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java new file mode 100644 index 0000000..f562475 --- /dev/null +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/interceptors/GZIPWriterInterceptor.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.usergrid.rest.interceptors; + +import org.glassfish.jersey.server.ContainerRequest; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import javax.inject.*; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.ext.*; +import javax.ws.rs.ext.Provider; + +/** + * If the request had an ACCEPT_ENCODING header containing 'gzip' then + * gzip the response and add CONTENT_ENCODING gzip header + * + * * If the request had an CONTENT_ENCODING header containing 'gzip' then + * unzip the request and remove the CONTENT_ENCODING gzip header + * Created by peterajohnson on 11/1/17. + */ +@Provider +public class GZIPWriterInterceptor implements ReaderInterceptor, WriterInterceptor { + + final private static String GZIP = "gzip"; + @Inject + private javax.inject.Provider<ContainerRequest> requestProvider; + + @Override + public void aroundWriteTo(WriterInterceptorContext context) throws IOException,WebApplicationException { + ContainerRequest request = requestProvider.get(); + + if (request != null) { + List<String> aeHeaders = request.getRequestHeader(HttpHeaders.ACCEPT_ENCODING); + if (aeHeaders != null && aeHeaders.size() > 0) { + String acceptEncodingHeader = aeHeaders.get(0); + if (acceptEncodingHeader.contains(GZIP)) { + OutputStream outputStream = context.getOutputStream(); + context.setOutputStream(new GZIPOutputStream(outputStream)); + context.getHeaders().putSingle(HttpHeaders.CONTENT_ENCODING, GZIP); + } + } + } + context.proceed(); + } + + @Override + public Object aroundReadFrom(ReaderInterceptorContext context) throws IOException, WebApplicationException { + String encoding = context.getHeaders().getFirst(HttpHeaders.CONTENT_ENCODING); + if (GZIP.equalsIgnoreCase(encoding)) { + GZIPInputStream is = new GZIPInputStream(context.getInputStream()); + 
context.getHeaders().remove(HttpHeaders.CONTENT_ENCODING); + context.setInputStream(is); + } + + return context.proceed(); + } +}