Author: tommaso Date: Thu Aug 18 14:17:38 2016 New Revision: 1756791 URL: http://svn.apache.org/viewvc?rev=1756791&view=rev Log: SLING-5977 - moved SimpleDistributionAgent inner classes out, added tests, created proper class for agent authentication info
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java (with props) sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java (with props) sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java (with props) sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java (with props) sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java (with props) sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java (with props) Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java?rev=1756791&view=auto ============================================================================== --- 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java (added) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java Thu Aug 18 14:17:38 2016 @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.distribution.agent.impl; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.sling.distribution.DistributionRequestState; +import org.apache.sling.distribution.DistributionResponse; +import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.component.impl.DistributionComponentKind; +import org.apache.sling.distribution.event.DistributionEventTopics; +import org.apache.sling.distribution.event.impl.DistributionEventFactory; +import org.apache.sling.distribution.impl.SimpleDistributionResponse; +import org.apache.sling.distribution.log.impl.DefaultDistributionLog; +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageProcessor; +import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils; +import org.apache.sling.distribution.queue.DistributionQueueItemState; +import org.apache.sling.distribution.queue.DistributionQueueItemStatus; +import org.apache.sling.distribution.queue.DistributionQueueProvider; +import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy; + +/** + * The package exporter callback function is responsible for processing the exported packages. + * The exported packages are scheduled for import by passing them to a {@link DistributionQueueDispatchingStrategy}. 
+ */ +class DistributionPackageExporterProcessor implements DistributionPackageProcessor { + + private final String callingUser; + private final String requestId; + private final long requestStartTime; + private final AtomicInteger packagesCount = new AtomicInteger(); + private final AtomicLong packagesSize = new AtomicLong(); + private final List<DistributionResponse> allResponses = new LinkedList<DistributionResponse>(); + + private final DistributionEventFactory distributionEventFactory; + private final DistributionQueueDispatchingStrategy scheduleQueueStrategy; + private final DistributionQueueProvider queueProvider; + private final DefaultDistributionLog log; + private final String agentName; + + public List<DistributionResponse> getAllResponses() { + return allResponses; + } + + public int getPackagesCount() { + return packagesCount.get(); + } + + public long getPackagesSize() { + return packagesSize.get(); + } + + DistributionPackageExporterProcessor(@Nullable String callingUser, @Nonnull String requestId, long requestStartTime, + @Nonnull DistributionEventFactory distributionEventFactory, + @Nonnull DistributionQueueDispatchingStrategy scheduleQueueStrategy, + @Nonnull DistributionQueueProvider queueProvider, @Nonnull DefaultDistributionLog log, + @Nonnull String agentName) { + this.callingUser = callingUser; + this.requestId = requestId; + this.requestStartTime = requestStartTime; + this.distributionEventFactory = distributionEventFactory; + this.scheduleQueueStrategy = scheduleQueueStrategy; + this.queueProvider = queueProvider; + this.log = log; + this.agentName = agentName; + } + + @Override + public void process(DistributionPackage distributionPackage) { + final long startTime = System.currentTimeMillis(); + + Collection<SimpleDistributionResponse> responses = scheduleImportPackage(distributionPackage, callingUser, + requestId, requestStartTime); + packagesCount.incrementAndGet(); + packagesSize.addAndGet(distributionPackage.getSize()); + 
allResponses.addAll(responses); + + final long endTime = System.currentTimeMillis(); + + log.debug("PACKAGE-QUEUED {}: packageId={}, paths={}, queueTime={}ms, responses={}", requestId, distributionPackage.getId(), + distributionPackage.getInfo().getPaths(), endTime - startTime, responses.size()); + } + + private Collection<SimpleDistributionResponse> scheduleImportPackage(DistributionPackage distributionPackage, String callingUser, String requestId, long startTime) { + Collection<SimpleDistributionResponse> distributionResponses = new LinkedList<SimpleDistributionResponse>(); + + // dispatch the distribution package to one or more queues + try { + // add metadata to the package + distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, callingUser); + distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, requestId); + distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, startTime); + + // put the package in the queue + Iterable<DistributionQueueItemStatus> states = scheduleQueueStrategy.add(distributionPackage, queueProvider); + for (DistributionQueueItemStatus state : states) { + DistributionRequestState requestState = getRequestStateFromQueueState(state.getItemState()); + distributionResponses.add(new SimpleDistributionResponse(requestState, state.getItemState().toString())); + } + + distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_QUEUED, + DistributionComponentKind.AGENT, agentName, distributionPackage.getInfo()); + } catch (DistributionException e) { + log.error("an error happened during dispatching items to the queue(s)", e); + distributionResponses.add(new SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString())); + } + + return distributionResponses; + } + + /* Convert the state of a certain item in the queue into a request state */ + private DistributionRequestState 
getRequestStateFromQueueState(DistributionQueueItemState itemState) { + DistributionRequestState requestState; + switch (itemState) { + case QUEUED: + requestState = DistributionRequestState.ACCEPTED; + break; + case ERROR: + requestState = DistributionRequestState.DROPPED; + break; + default: + requestState = DistributionRequestState.DROPPED; + break; + } + return requestState; + } +} Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java?rev=1756791&r1=1756790&r2=1756791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java (original) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java Thu Aug 18 14:17:38 2016 @@ -20,23 +20,12 @@ package org.apache.sling.distribution.ag import javax.annotation.Nonnull; import javax.annotation.Nullable; -import javax.jcr.RepositoryException; -import javax.jcr.Session; -import javax.jcr.SimpleCredentials; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import 
org.apache.sling.api.resource.LoginException; -import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.distribution.DistributionRequest; @@ -45,38 +34,28 @@ import org.apache.sling.distribution.Dis import org.apache.sling.distribution.DistributionResponse; import org.apache.sling.distribution.agent.DistributionAgent; import org.apache.sling.distribution.agent.DistributionAgentState; -import org.apache.sling.distribution.common.RecoverableDistributionException; +import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.component.impl.DistributionComponentKind; import org.apache.sling.distribution.component.impl.SettingsUtils; import org.apache.sling.distribution.event.DistributionEventTopics; import org.apache.sling.distribution.event.impl.DistributionEventFactory; import org.apache.sling.distribution.impl.CompositeDistributionResponse; -import org.apache.sling.distribution.common.DistributionException; import org.apache.sling.distribution.impl.SimpleDistributionResponse; import org.apache.sling.distribution.log.DistributionLog; import org.apache.sling.distribution.log.impl.DefaultDistributionLog; -import org.apache.sling.distribution.packaging.DistributionPackageProcessor; -import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue; import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageExporter; import org.apache.sling.distribution.packaging.DistributionPackageImporter; -import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils; import org.apache.sling.distribution.queue.DistributionQueue; -import org.apache.sling.distribution.queue.DistributionQueueEntry; - -import org.apache.sling.distribution.queue.DistributionQueueItem; -import 
org.apache.sling.distribution.queue.DistributionQueueItemState; -import org.apache.sling.distribution.queue.DistributionQueueItemStatus; import org.apache.sling.distribution.queue.DistributionQueueProcessor; import org.apache.sling.distribution.queue.DistributionQueueProvider; import org.apache.sling.distribution.queue.DistributionQueueState; import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy; +import org.apache.sling.distribution.queue.impl.SimpleAgentDistributionQueue; import org.apache.sling.distribution.trigger.DistributionRequestHandler; import org.apache.sling.distribution.trigger.DistributionTrigger; import org.apache.sling.distribution.util.impl.DistributionUtils; import org.apache.sling.jcr.api.SlingRepository; -import org.apache.sling.jcr.resource.JcrResourceConstants; - /** * Basic implementation of a {@link org.apache.sling.distribution.agent.DistributionAgent} @@ -92,22 +71,19 @@ public class SimpleDistributionAgent imp private final DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy; private final DefaultDistributionLog log; private final DistributionEventFactory distributionEventFactory; + private final DistributionQueueProcessor queueProcessor; private AgentBasedRequestHandler agentBasedRequestHandler; - private final SlingRepository slingRepository; - private final ResourceResolverFactory resourceResolverFactory; - private final String name; private final boolean queueProcessingEnabled; private final DistributionRequestType[] allowedRequests; - private final String subServiceName; private boolean active = false; private final Set<String> processingQueues; - private final int retryAttempts; - private final boolean impersonateUser; private final String[] allowedRoots; private final AtomicInteger nextRequestId = new AtomicInteger(); + private final SimpleDistributionAgentAuthenticationInfo agentAuthenticationInfo; + public SimpleDistributionAgent(String name, boolean 
queueProcessingEnabled, Set<String> processingQueues, @@ -125,19 +101,14 @@ public class SimpleDistributionAgent imp DistributionRequestType[] allowedRequests, String[] allowedRoots, int retryAttempts) { - this.slingRepository = slingRepository; this.log = log; this.allowedRequests = allowedRequests; this.processingQueues = processingQueues; - this.retryAttempts = retryAttempts; validateConfiguration(name, queueProcessingEnabled, subServiceName, distributionPackageImporter, distributionPackageExporter, distributionRequestAuthorizationStrategy, queueProvider, scheduleQueueStrategy, distributionEventFactory, resourceResolverFactory); this.allowedRoots = SettingsUtils.removeEmptyEntries(allowedRoots); - this.subServiceName = SettingsUtils.removeEmptyEntry(subServiceName); - this.impersonateUser = this.subServiceName == null; this.distributionRequestAuthorizationStrategy = distributionRequestAuthorizationStrategy; - this.resourceResolverFactory = resourceResolverFactory; this.name = SettingsUtils.removeEmptyEntry(name); this.queueProcessingEnabled = queueProcessingEnabled; this.distributionPackageImporter = distributionPackageImporter; @@ -146,6 +117,9 @@ public class SimpleDistributionAgent imp this.scheduleQueueStrategy = scheduleQueueStrategy; this.errorQueueStrategy = errorQueueStrategy; this.distributionEventFactory = distributionEventFactory; + this.agentAuthenticationInfo = new SimpleDistributionAgentAuthenticationInfo(slingRepository, DEFAULT_AGENT_SERVICE, resourceResolverFactory, subServiceName); + this.queueProcessor = new SimpleDistributionAgentQueueProcessor(distributionPackageExporter, distributionPackageImporter, + retryAttempts, errorQueueStrategy, log, queueProvider, distributionEventFactory, agentAuthenticationInfo, name); } private void validateConfiguration(String name, boolean queueProcessingEnabled, String subServiceName, DistributionPackageImporter distributionPackageImporter, DistributionPackageExporter distributionPackageExporter, 
DistributionRequestAuthorizationStrategy distributionRequestAuthorizationStrategy, DistributionQueueProvider queueProvider, DistributionQueueDispatchingStrategy scheduleQueueStrategy, DistributionEventFactory distributionEventFactory, ResourceResolverFactory resourceResolverFactory) { @@ -178,7 +152,7 @@ public class SimpleDistributionAgent imp ResourceResolver agentResourceResolver = null; - final String requestId = "DSTRQ"+ nextRequestId.incrementAndGet(); + final String requestId = "DSTRQ" + nextRequestId.incrementAndGet(); String callingUser = resourceResolver.getUserID(); try { @@ -196,13 +170,15 @@ public class SimpleDistributionAgent imp boolean silent = DistributionRequestType.PULL.equals(distributionRequest.getRequestType()); - log.info(silent, "REQUEST-START {}: {} paths={}, user={}", new Object[]{ requestId, + log.info(silent, "REQUEST-START {}: {} paths={}, user={}", new Object[]{requestId, distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser}); // check permissions distributionRequestAuthorizationStrategy.checkPermission(resourceResolver, distributionRequest); - agentResourceResolver = getAgentResourceResolver(callingUser); + agentResourceResolver = DistributionUtils.getResourceResolver(callingUser, agentAuthenticationInfo.getAgentService(), + agentAuthenticationInfo.getSlingRepository(), agentAuthenticationInfo.getSubServiceName(), + agentAuthenticationInfo.getResourceResolverFactory()); // export packages CompositeDistributionResponse distributionResponse = exportPackages(agentResourceResolver, distributionRequest, callingUser, requestId); @@ -222,7 +198,7 @@ public class SimpleDistributionAgent imp distributionRequest.getRequestType(), distributionRequest.getPaths(), callingUser, e.getMessage()}); throw e; } finally { - ungetAgentResourceResolver(agentResourceResolver); + DistributionUtils.ungetResourceResolver(agentResourceResolver); } } @@ -233,7 +209,8 @@ public class SimpleDistributionAgent imp private 
CompositeDistributionResponse exportPackages(ResourceResolver agentResourceResolver, DistributionRequest distributionRequest, String callingUser, String requestId) throws DistributionException { final long startTime = System.currentTimeMillis(); // callback function - PackageExporterProcessor packageProcessor = new PackageExporterProcessor(callingUser, requestId, startTime); + DistributionPackageExporterProcessor packageProcessor = new DistributionPackageExporterProcessor(callingUser, requestId, startTime, + distributionEventFactory, scheduleQueueStrategy, queueProvider, log, name); // export packages distributionPackageExporter.exportPackages(agentResourceResolver, distributionRequest, packageProcessor); @@ -250,34 +227,6 @@ public class SimpleDistributionAgent imp return new CompositeDistributionResponse(distributionResponses, packagesCount, packagesSize, endTime - startTime); } - - private Collection<SimpleDistributionResponse> scheduleImportPackage(DistributionPackage distributionPackage, String callingUser, String requestId, long startTime) { - Collection<SimpleDistributionResponse> distributionResponses = new LinkedList<SimpleDistributionResponse>(); - - // dispatch the distribution package to the queue distribution handler - try { - // add metadata to packages - distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, callingUser); - distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, requestId); - distributionPackage.getInfo().put(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, startTime); - - // put the packages in the queue - Iterable<DistributionQueueItemStatus> states = scheduleQueueStrategy.add(distributionPackage, queueProvider); - for (DistributionQueueItemStatus state : states) { - DistributionRequestState requestState = getRequestStateFromQueueState(state.getItemState()); - distributionResponses.add(new SimpleDistributionResponse(requestState, 
state.getItemState().toString())); - } - - generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_QUEUED, distributionPackage); - } catch (DistributionException e) { - log.error("an error happened during dispatching items to the queue(s)", e); - distributionResponses.add(new SimpleDistributionResponse(DistributionRequestState.DROPPED, e.toString())); - } - - return distributionResponses; - } - - @Nonnull public Set<String> getQueueNames() { Set<String> queueNames = new TreeSet<String>(); @@ -347,7 +296,7 @@ public class SimpleDistributionAgent imp if (!isPassive()) { try { - queueProvider.enableQueueProcessing(new PackageQueueProcessor(), processingQueues.toArray(new String[processingQueues.size()])); + queueProvider.enableQueueProcessing(queueProcessor, processingQueues.toArray(new String[processingQueues.size()])); } catch (DistributionException e) { log.error("cannot enable queue processing", e); } @@ -402,126 +351,6 @@ public class SimpleDistributionAgent imp } - private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) throws DistributionException { - boolean removeItemFromQueue = false; - ResourceResolver agentResourceResolver = null; - DistributionPackage distributionPackage = null; - DistributionQueueItem queueItem = queueEntry.getItem(); - DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus(); - try { - - String callingUser = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class); - String requestId = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, String.class); - Long globalStartTime = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, Long.class); - - agentResourceResolver = getAgentResourceResolver(callingUser); - - final long startTime = System.currentTimeMillis(); - - distributionPackage = distributionPackageExporter.getPackage(agentResourceResolver, queueItem.getPackageId()); - - if (distributionPackage != null) { - 
final long packageSize = distributionPackage.getSize(); - DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), queueEntry); - - final DistributionRequestType requestType = distributionPackage.getInfo().getRequestType(); - final String[] paths = distributionPackage.getInfo().getPaths(); - - try { - distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage); - generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, distributionPackage); - removeItemFromQueue = true; - final long endTime = System.currentTimeMillis(); - - log.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, importTime={}ms, execTime={}ms, size={}B", new Object[] { - queueName, requestId, - requestType, paths, - endTime - startTime, endTime - globalStartTime, - packageSize - }); - } catch (RecoverableDistributionException e) { - log.error("[{}] PACKAGE-FAIL {}: could not deliver {}, {}", queueName, requestId, distributionPackage.getId(), e.getMessage()); - log.debug("could not deliver package {}", distributionPackage.getId(), e); - } catch (Throwable e) { - log.error("[{}] PACKAGE-FAIL {}: could not deliver package {} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e); - - if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) { - removeItemFromQueue = reEnqueuePackage(distributionPackage); - log.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId()); - } - } - } else { - removeItemFromQueue = true; // return success if package does not exist in order to clear the queue. - log.error("distribution package with id {} does not exist. 
the package will be skipped.", queueItem.getPackageId()); - } - } finally { - if (removeItemFromQueue) { - DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName); - } else { - DistributionPackageUtils.closeSafely(distributionPackage); - } - ungetAgentResourceResolver(agentResourceResolver); - } - - // return true if item should be removed from queue - return removeItemFromQueue; - } - - - private boolean reEnqueuePackage(DistributionPackage distributionPackage) { - - if (errorQueueStrategy == null) { - return false; - } - - try { - errorQueueStrategy.add(distributionPackage, queueProvider); - } catch (DistributionException e) { - log.error("could not reenqueue package {}", distributionPackage.getId(), e); - return false; - } - - return true; - } - - private ResourceResolver getAgentResourceResolver(String user) throws DistributionException { - ResourceResolver resourceResolver; - - try { - Map<String, Object> authenticationInfo = new HashMap<String, Object>(); - - if (impersonateUser && user != null) { - Session session = slingRepository.impersonateFromService(DEFAULT_AGENT_SERVICE, new SimpleCredentials(user, new char[0]), null); - authenticationInfo.put(JcrResourceConstants.AUTHENTICATION_INFO_SESSION, session); - resourceResolver = resourceResolverFactory.getResourceResolver(authenticationInfo); - } else { - authenticationInfo.put(ResourceResolverFactory.SUBSERVICE, subServiceName); - resourceResolver = resourceResolverFactory.getServiceResourceResolver(authenticationInfo); - } - - return resourceResolver; - } catch (LoginException le) { - throw new DistributionException(le); - } catch (RepositoryException re) { - throw new DistributionException(re); - } - } - - private void ungetAgentResourceResolver(ResourceResolver resourceResolver) { - - if (resourceResolver != null) { - try { - if (resourceResolver.hasChanges()) { - resourceResolver.commit(); - } - } catch (PersistenceException e) { - log.error("cannot commit changes to resource resolver", 
e); - } finally { - DistributionUtils.safelyLogout(resourceResolver); - } - } - } - private void generatePackageEvent(String topic, DistributionPackage... distributionPackages) { for (DistributionPackage distributionPackage : distributionPackages) { distributionEventFactory.generatePackageEvent(topic, DistributionComponentKind.AGENT, name, distributionPackage.getInfo()); @@ -582,82 +411,6 @@ public class SimpleDistributionAgent imp return true; } - /** - * processor for items in the queue - */ - class PackageQueueProcessor implements DistributionQueueProcessor { - public boolean process(@Nonnull String queueName, @Nonnull DistributionQueueEntry queueEntry) { - DistributionQueueItem queueItem = queueEntry.getItem(); - - try { - final long startTime = System.currentTimeMillis(); - - log.debug("[{}] ITEM-PROCESS processing item={}", queueName, queueItem); - - boolean success = processQueueItem(queueName, queueEntry); - - final long endTime = System.currentTimeMillis(); - - - log.debug("[{}] ITEM-PROCESSED item={}, status={}, processingTime={}ms", queueName, queueItem, success, endTime - startTime); - - return success; - - } catch (Throwable t) { - log.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t); - return false; - } - } - } - - /** - * The package exporter callback function to process the exported packages. - * The exported packages are scheduled for import. 
- */ - class PackageExporterProcessor implements DistributionPackageProcessor { - - private final String callingUser; - private final String requestId; - private final long requestStartTime; - private final AtomicInteger packagesCount = new AtomicInteger(); - private final AtomicLong packagesSize = new AtomicLong(); - private final List<DistributionResponse> allResponses = new ArrayList<DistributionResponse>(); - - public List<DistributionResponse> getAllResponses() { - return allResponses; - } - - public int getPackagesCount() { - return packagesCount.get(); - } - - public long getPackagesSize() { - return packagesSize.get(); - } - - PackageExporterProcessor(String callingUser, String requestId, long requestStartTime) { - this.callingUser = callingUser; - this.requestId = requestId; - this.requestStartTime = requestStartTime; - } - - @Override - public void process(DistributionPackage distributionPackage) { - final long startTime = System.currentTimeMillis(); - - Collection<SimpleDistributionResponse> responses = scheduleImportPackage(distributionPackage, callingUser, - requestId, requestStartTime); - packagesCount.incrementAndGet(); - packagesSize.addAndGet(distributionPackage.getSize()); - allResponses.addAll(responses); - - final long endTime = System.currentTimeMillis(); - - log.debug("PACKAGE-QUEUED {}: packageId={}, paths={}, queueTime={}ms, responses={}", requestId, distributionPackage.getId(), - distributionPackage.getInfo().getPaths(), endTime - startTime, responses.size()); - } - } - public class AgentBasedRequestHandler implements DistributionRequestHandler { private final DistributionAgent agent; @@ -682,34 +435,20 @@ public class SimpleDistributionAgent imp ResourceResolver agentResourceResolver = null; try { - agentResourceResolver = getAgentResourceResolver(null); + agentResourceResolver = DistributionUtils.getResourceResolver(null, agentAuthenticationInfo.getAgentService(), + agentAuthenticationInfo.getSlingRepository(), 
agentAuthenticationInfo.getSubServiceName(), + agentAuthenticationInfo.getResourceResolverFactory()); agent.execute(agentResourceResolver, request); } catch (Throwable e) { log.error("Error executing handler {}", request, e); } finally { - ungetAgentResourceResolver(agentResourceResolver); + DistributionUtils.ungetResourceResolver(agentResourceResolver); } } } } - /* Convert the state of a certain item in the queue into a request state */ - private DistributionRequestState getRequestStateFromQueueState(DistributionQueueItemState itemState) { - DistributionRequestState requestState; - switch (itemState) { - case QUEUED: - requestState = DistributionRequestState.ACCEPTED; - break; - case ERROR: - requestState = DistributionRequestState.DROPPED; - break; - default: - requestState = DistributionRequestState.DROPPED; - break; - } - return requestState; - } } Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java?rev=1756791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java (added) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java Thu Aug 18 14:17:38 2016 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.agent.impl; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.jcr.api.SlingRepository; + +/** + * Authentication information required by a {@link SimpleDistributionAgent} to perform its tasks. + */ +public class SimpleDistributionAgentAuthenticationInfo { + + private final SlingRepository slingRepository; + private final String agentService; + + private final ResourceResolverFactory resourceResolverFactory; + private final String subServiceName; + + public SimpleDistributionAgentAuthenticationInfo(@Nonnull SlingRepository slingRepository, @Nonnull String agentService, + @Nonnull ResourceResolverFactory resourceResolverFactory, + @Nullable String subServiceName) { + this.slingRepository = slingRepository; + this.agentService = agentService; + this.resourceResolverFactory = resourceResolverFactory; + this.subServiceName = subServiceName; + } + + public SlingRepository getSlingRepository() { + return slingRepository; + } + + public String getAgentService() { + return agentService; + } + + public ResourceResolverFactory getResourceResolverFactory() { + return resourceResolverFactory; + } + + public String getSubServiceName() { + return subServiceName; + } +} Propchange: 
sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfo.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java?rev=1756791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java (added) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java Thu Aug 18 14:17:38 2016 @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.distribution.agent.impl; + +import javax.annotation.Nonnull; + +import org.apache.sling.api.resource.ResourceResolver; +import org.apache.sling.distribution.DistributionRequestType; +import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.common.RecoverableDistributionException; +import org.apache.sling.distribution.component.impl.DistributionComponentKind; +import org.apache.sling.distribution.event.DistributionEventTopics; +import org.apache.sling.distribution.event.impl.DistributionEventFactory; +import org.apache.sling.distribution.log.impl.DefaultDistributionLog; +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageExporter; +import org.apache.sling.distribution.packaging.DistributionPackageImporter; +import org.apache.sling.distribution.packaging.impl.DistributionPackageUtils; +import org.apache.sling.distribution.queue.DistributionQueueEntry; +import org.apache.sling.distribution.queue.DistributionQueueItem; +import org.apache.sling.distribution.queue.DistributionQueueItemStatus; +import org.apache.sling.distribution.queue.DistributionQueueProcessor; +import org.apache.sling.distribution.queue.DistributionQueueProvider; +import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy; +import org.apache.sling.distribution.util.impl.DistributionUtils; + +/** + * A processor of agent queue entries, each entry's underlying package is fetched and passed to the {@link DistributionPackageImporter} for import. 
+ */ +class SimpleDistributionAgentQueueProcessor implements DistributionQueueProcessor { + + private final DistributionPackageExporter distributionPackageExporter; + private final DistributionPackageImporter distributionPackageImporter; + private final int retryAttempts; + private final DistributionQueueDispatchingStrategy errorQueueStrategy; + private final DefaultDistributionLog log; + private final DistributionQueueProvider queueProvider; + private final DistributionEventFactory distributionEventFactory; + private final SimpleDistributionAgentAuthenticationInfo authenticationInfo; + private final String agentName; + + public SimpleDistributionAgentQueueProcessor(DistributionPackageExporter distributionPackageExporter, + DistributionPackageImporter distributionPackageImporter, int retryAttempts, + DistributionQueueDispatchingStrategy errorQueueStrategy, DefaultDistributionLog log, + DistributionQueueProvider queueProvider, DistributionEventFactory distributionEventFactory, + SimpleDistributionAgentAuthenticationInfo authenticationInfo, String agentName) { + this.distributionPackageExporter = distributionPackageExporter; + + this.distributionPackageImporter = distributionPackageImporter; + this.retryAttempts = retryAttempts; + this.errorQueueStrategy = errorQueueStrategy; + this.log = log; + this.queueProvider = queueProvider; + this.distributionEventFactory = distributionEventFactory; + this.authenticationInfo = authenticationInfo; + this.agentName = agentName; + } + + @Override + public boolean process(@Nonnull String queueName, @Nonnull DistributionQueueEntry queueEntry) { + DistributionQueueItem queueItem = queueEntry.getItem(); + + try { + final long startTime = System.currentTimeMillis(); + + log.debug("[{}] ITEM-PROCESS processing item={}", queueName, queueItem); + + boolean success = processQueueItem(queueName, queueEntry); + + final long endTime = System.currentTimeMillis(); + + log.debug("[{}] ITEM-PROCESSED item={}, status={}, processingTime={}ms", 
queueName, queueItem, success, endTime - startTime); + + return success; + + } catch (Throwable t) { + log.error("[{}] ITEM-FAIL item={}", queueName, queueItem, t); + return false; + } + } + + private boolean processQueueItem(String queueName, DistributionQueueEntry queueEntry) throws DistributionException { + boolean removeItemFromQueue = false; + ResourceResolver agentResourceResolver = null; + DistributionPackage distributionPackage = null; + DistributionQueueItem queueItem = queueEntry.getItem(); + DistributionQueueItemStatus queueItemStatus = queueEntry.getStatus(); + try { + + String callingUser = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_USER, String.class); + String requestId = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_ID, String.class); + Long globalStartTime = queueItem.get(DistributionPackageUtils.PACKAGE_INFO_PROPERTY_REQUEST_START_TIME, Long.class); + + agentResourceResolver = DistributionUtils.getResourceResolver(callingUser, authenticationInfo.getAgentService(), + authenticationInfo.getSlingRepository(), authenticationInfo.getSubServiceName(), + authenticationInfo.getResourceResolverFactory()); + + final long startTime = System.currentTimeMillis(); + + distributionPackage = distributionPackageExporter.getPackage(agentResourceResolver, queueItem.getPackageId()); + + if (distributionPackage != null) { + final long packageSize = distributionPackage.getSize(); + DistributionPackageUtils.mergeQueueEntry(distributionPackage.getInfo(), queueEntry); + + final DistributionRequestType requestType = distributionPackage.getInfo().getRequestType(); + final String[] paths = distributionPackage.getInfo().getPaths(); + + try { + // import package + distributionPackageImporter.importPackage(agentResourceResolver, distributionPackage); + + // generated event + distributionEventFactory.generatePackageEvent(DistributionEventTopics.AGENT_PACKAGE_DISTRIBUTED, + DistributionComponentKind.AGENT, agentName, 
distributionPackage.getInfo()); + + removeItemFromQueue = true; + final long endTime = System.currentTimeMillis(); + + log.info("[{}] PACKAGE-DELIVERED {}: {} paths={}, importTime={}ms, execTime={}ms, size={}B", new Object[]{ + queueName, requestId, + requestType, paths, + endTime - startTime, endTime - globalStartTime, + packageSize + }); + } catch (RecoverableDistributionException e) { + log.error("[{}] PACKAGE-FAIL {}: could not deliver {}, {}", queueName, requestId, distributionPackage.getId(), e.getMessage()); + log.debug("could not deliver package {}", distributionPackage.getId(), e); + } catch (Throwable e) { + log.error("[{}] PACKAGE-FAIL {}: could not deliver package {} {}", queueName, requestId, distributionPackage.getId(), e.getMessage(), e); + + if (errorQueueStrategy != null && queueItemStatus.getAttempts() > retryAttempts) { + removeItemFromQueue = reEnqueuePackage(distributionPackage); + log.info("[{}] PACKAGE-QUEUED {}: distribution package {} was enqueued to an error queue", queueName, requestId, distributionPackage.getId()); + } + } + } else { + removeItemFromQueue = true; // return success if package does not exist in order to clear the queue. + log.error("distribution package with id {} does not exist. 
the package will be skipped.", queueItem.getPackageId()); + } + } finally { + if (removeItemFromQueue) { + DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName); + } else { + DistributionPackageUtils.closeSafely(distributionPackage); + } + DistributionUtils.ungetResourceResolver(agentResourceResolver); + } + + // return true if item should be removed from queue + return removeItemFromQueue; + } + + private boolean reEnqueuePackage(DistributionPackage distributionPackage) { + + if (errorQueueStrategy == null) { + return false; + } + + try { + errorQueueStrategy.add(distributionPackage, queueProvider); + } catch (DistributionException e) { + log.error("could not reenqueue package {}", distributionPackage.getId(), e); + return false; + } + + return true; + } + +} Propchange: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java?rev=1756791&r1=1756790&r2=1756791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java (original) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/ByteBufferBackedInputStream.java Thu Aug 18 14:17:38 2016 @@ -18,6 +18,8 @@ */ package org.apache.sling.distribution.util.impl; +import javax.annotation.Nonnull; + import static java.lang.Math.min; import java.io.File; @@ -52,7 +54,7 @@ final class 
ByteBufferBackedInputStream return memory.get() & 0xFF; } - public int read(byte[] bytes, int off, int len) throws IOException { + public int read(@Nonnull byte[] bytes, int off, int len) throws IOException { if (!memory.hasRemaining()) { if (fileInputStream != null) { return fileInputStream.read(bytes, off, len); Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java?rev=1756791&r1=1756790&r2=1756791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java (original) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/util/impl/DistributionUtils.java Thu Aug 18 14:17:38 2016 @@ -20,12 +20,18 @@ package org.apache.sling.distribution.util.impl; import org.apache.sling.api.resource.LoginException; +import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.jcr.api.SlingRepository; +import org.apache.sling.jcr.resource.JcrResourceConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jcr.RepositoryException; import javax.jcr.Session; +import javax.jcr.SimpleCredentials; import java.util.HashMap; import java.util.Map; @@ -55,4 +61,44 @@ public class DistributionUtils { log.error("cannot safely close resource resolver {}", resourceResolver); } } + + public static void ungetResourceResolver(ResourceResolver resourceResolver) { + + if (resourceResolver != null) { + try { + if 
(resourceResolver.hasChanges()) { + resourceResolver.commit(); + } + } catch (PersistenceException e) { + log.error("cannot commit changes to resource resolver", e); + } finally { + safelyLogout(resourceResolver); + } + } + } + + public static ResourceResolver getResourceResolver(String user, String service, SlingRepository slingRepository, + String subServiceName, ResourceResolverFactory resourceResolverFactory) + throws DistributionException { + ResourceResolver resourceResolver; + + try { + Map<String, Object> authenticationInfo = new HashMap<String, Object>(); + + if (subServiceName == null && user != null) { + Session session = slingRepository.impersonateFromService(service, new SimpleCredentials(user, new char[0]), null); + authenticationInfo.put(JcrResourceConstants.AUTHENTICATION_INFO_SESSION, session); + resourceResolver = resourceResolverFactory.getResourceResolver(authenticationInfo); + } else { + authenticationInfo.put(ResourceResolverFactory.SUBSERVICE, subServiceName); + resourceResolver = resourceResolverFactory.getServiceResourceResolver(authenticationInfo); + } + + return resourceResolver; + } catch (LoginException le) { + throw new DistributionException(le); + } catch (RepositoryException re) { + throw new DistributionException(re); + } + } } Added: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java?rev=1756791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java (added) +++ 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java Thu Aug 18 14:17:38 2016 @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.distribution.agent.impl; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; + +import org.apache.sling.distribution.DistributionResponse; +import org.apache.sling.distribution.event.impl.DistributionEventFactory; +import org.apache.sling.distribution.log.impl.DefaultDistributionLog; +import org.apache.sling.distribution.packaging.DistributionPackage; +import org.apache.sling.distribution.packaging.DistributionPackageInfo; +import org.apache.sling.distribution.queue.DistributionQueueItemState; +import org.apache.sling.distribution.queue.DistributionQueueItemStatus; +import org.apache.sling.distribution.queue.DistributionQueueProvider; +import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for {@link DistributionPackageExporterProcessor} + */ +public class DistributionPackageExporterProcessorTest { + + @Test + public void testGetAllResponses() throws Exception { + String callingUser = "mr-who-cares"; + String requestId = "id231"; + long startTime = System.currentTimeMillis(); + DistributionEventFactory eventFactory = mock(DistributionEventFactory.class); + DistributionQueueDispatchingStrategy scheduleQueueStrategy = mock(DistributionQueueDispatchingStrategy.class); + DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class); + DefaultDistributionLog log = mock(DefaultDistributionLog.class); + String agentName = "dummy"; + DistributionPackageExporterProcessor exporterProcessor = new DistributionPackageExporterProcessor(callingUser, requestId, + startTime, eventFactory, scheduleQueueStrategy, queueProvider, log, agentName); + + List<DistributionResponse> allResponses = exporterProcessor.getAllResponses(); + assertNotNull(allResponses); + 
assertEquals(0, allResponses.size()); + } + + @Test + public void testGetPackagesCount() throws Exception { + String callingUser = "mr-who-cares"; + String requestId = "id231"; + long startTime = System.currentTimeMillis(); + DistributionEventFactory eventFactory = mock(DistributionEventFactory.class); + DistributionQueueDispatchingStrategy scheduleQueueStrategy = mock(DistributionQueueDispatchingStrategy.class); + DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class); + DefaultDistributionLog log = mock(DefaultDistributionLog.class); + String agentName = "dummy"; + DistributionPackageExporterProcessor exporterProcessor = new DistributionPackageExporterProcessor(callingUser, requestId, + startTime, eventFactory, scheduleQueueStrategy, queueProvider, log, agentName); + + int packagesCount = exporterProcessor.getPackagesCount(); + assertEquals(0, packagesCount); + + } + + @Test + public void testGetPackagesSize() throws Exception { + String callingUser = "mr-who-cares"; + String requestId = "id231"; + long startTime = System.currentTimeMillis(); + DistributionEventFactory eventFactory = mock(DistributionEventFactory.class); + DistributionQueueDispatchingStrategy scheduleQueueStrategy = mock(DistributionQueueDispatchingStrategy.class); + DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class); + DefaultDistributionLog log = mock(DefaultDistributionLog.class); + String agentName = "dummy"; + DistributionPackageExporterProcessor exporterProcessor = new DistributionPackageExporterProcessor(callingUser, requestId, + startTime, eventFactory, scheduleQueueStrategy, queueProvider, log, agentName); + + long packagesSize = exporterProcessor.getPackagesSize(); + assertEquals(0L, packagesSize); + } + + @Test + public void testProcess() throws Exception { + String callingUser = "mr-who-cares"; + String requestId = "id231"; + long startTime = System.currentTimeMillis(); + DistributionEventFactory eventFactory = 
mock(DistributionEventFactory.class); + DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class); + DistributionPackage distributionPackage = mock(DistributionPackage.class); + DistributionQueueDispatchingStrategy scheduleQueueStrategy = mock(DistributionQueueDispatchingStrategy.class); + // assume scheduling works + List<DistributionQueueItemStatus> statuses = new LinkedList<DistributionQueueItemStatus>(); + DistributionQueueItemStatus qis = new DistributionQueueItemStatus(DistributionQueueItemState.QUEUED, "queue-1"); + statuses.add(qis); + when(scheduleQueueStrategy.add(distributionPackage, queueProvider)).thenReturn(statuses); + + DefaultDistributionLog log = mock(DefaultDistributionLog.class); + String agentName = "dummy"; + DistributionPackageExporterProcessor exporterProcessor = new DistributionPackageExporterProcessor(callingUser, requestId, + startTime, eventFactory, scheduleQueueStrategy, queueProvider, log, agentName); + + DistributionPackageInfo info = new DistributionPackageInfo("type-a", new HashMap<String, Object>()); + when(distributionPackage.getInfo()).thenReturn(info); + exporterProcessor.process(distributionPackage); + } +} \ No newline at end of file Propchange: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/DistributionPackageExporterProcessorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java?rev=1756791&view=auto ============================================================================== --- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java (added) +++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java Thu Aug 18 14:17:38 2016 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.distribution.agent.impl; + +import org.apache.sling.api.resource.ResourceResolverFactory; +import org.apache.sling.jcr.api.SlingRepository; +import org.junit.Test; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link SimpleDistributionAgentAuthenticationInfo} + */ +public class SimpleDistributionAgentAuthenticationInfoTest { + + @Test + public void testInfoNotNull() throws Exception { + SlingRepository slingRepository = mock(SlingRepository.class); + String agentService = "dumb"; + ResourceResolverFactory resourceResolverFactory = mock(ResourceResolverFactory.class); + String subServiceName = "ssn"; + SimpleDistributionAgentAuthenticationInfo authenticationInfo = new SimpleDistributionAgentAuthenticationInfo(slingRepository, + agentService, resourceResolverFactory, subServiceName); + assertNotNull(authenticationInfo.getAgentService()); + assertNotNull(authenticationInfo.getResourceResolverFactory()); + assertNotNull(authenticationInfo.getSlingRepository()); + assertNotNull(authenticationInfo.getSubServiceName()); + } + +} \ No newline at end of file Propchange: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentAuthenticationInfoTest.java ------------------------------------------------------------------------------ svn:eol-style = native Added: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java?rev=1756791&view=auto ============================================================================== --- 
sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java (added) +++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java Thu Aug 18 14:17:38 2016 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.distribution.agent.impl; + +import java.util.HashMap; + +import org.apache.sling.distribution.event.impl.DistributionEventFactory; +import org.apache.sling.distribution.log.impl.DefaultDistributionLog; +import org.apache.sling.distribution.packaging.DistributionPackageExporter; +import org.apache.sling.distribution.packaging.DistributionPackageImporter; +import org.apache.sling.distribution.queue.DistributionQueueEntry; +import org.apache.sling.distribution.queue.DistributionQueueItem; +import org.apache.sling.distribution.queue.DistributionQueueItemState; +import org.apache.sling.distribution.queue.DistributionQueueItemStatus; +import org.apache.sling.distribution.queue.DistributionQueueProvider; +import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy; +import org.junit.Test; + +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link SimpleDistributionAgentQueueProcessor} + */ +public class SimpleDistributionAgentQueueProcessorTest { + + @Test + public void testProcess() throws Exception { + DistributionPackageExporter packageExporter = mock(DistributionPackageExporter.class); + DistributionPackageImporter packageImporter = mock(DistributionPackageImporter.class); + int retryAttempts = 3; + DefaultDistributionLog log = mock(DefaultDistributionLog.class); + DistributionQueueProvider queueProvider = mock(DistributionQueueProvider.class); + DistributionEventFactory eventFactory = mock(DistributionEventFactory.class); + SimpleDistributionAgentAuthenticationInfo authenticationInfo = mock(SimpleDistributionAgentAuthenticationInfo.class); + String agentName = "dummy-a"; + SimpleDistributionAgentQueueProcessor queueProcessor = new SimpleDistributionAgentQueueProcessor(packageExporter, + packageImporter, retryAttempts, null, log, queueProvider, eventFactory, authenticationInfo, agentName); + + String id = "123-456"; + DistributionQueueItem item = new DistributionQueueItem("pckg-123", new 
HashMap<String, Object>()); + String queueName = "queue-1"; + DistributionQueueItemStatus status = new DistributionQueueItemStatus(DistributionQueueItemState.QUEUED, queueName); + DistributionQueueEntry entry = new DistributionQueueEntry(id, item, status); + queueProcessor.process(queueName, entry); + } +} \ No newline at end of file Propchange: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentQueueProcessorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java?rev=1756791&r1=1756790&r2=1756791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java (original) +++ sling/trunk/contrib/extensions/distribution/core/src/test/java/org/apache/sling/distribution/util/impl/FileBackedMemoryOutputStreamTest.java Thu Aug 18 14:17:38 2016 @@ -36,6 +36,9 @@ import java.util.Random; import org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit; import org.junit.Test; +/** + * Tests for {@link org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream} + */ public class FileBackedMemoryOutputStreamTest { @Test