Author: mpetria Date: Tue Jul 14 13:15:17 2015 New Revision: 1690924 URL: http://svn.apache.org/r1690924 Log: SLING-4651: delete remote package only after it is fetched
Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java?rev=1690924&r1=1690923&r2=1690924&view=diff ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java (original) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/servlet/DistributionPackageExporterServlet.java Tue Jul 14 13:15:17 2015 @@ -34,6 +34,7 @@ import org.apache.sling.distribution.Dis import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.packaging.DistributionPackageExporter; import org.apache.sling.distribution.resources.DistributionResourceTypes; +import org.apache.sling.distribution.transport.impl.HttpTransportUtils; import org.apache.sling.distribution.util.RequestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,10 +47,40 @@ public class DistributionPackageExporter private final Logger log = LoggerFactory.getLogger(getClass()); + + @Override protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response) throws ServletException, IOException { + String operation = request.getParameter("operation"); + + + + try { + if ("delete".equals(operation)) { + + deletePackage(request, response); + + } if ("fetch".equals(operation)) { + + exportOnePackage(request, response, false); + + } else { + + exportOnePackage(request, response, true); + } + + } catch (Throwable t) { + response.setStatus(503); + log.error("error while exporting package", t); + } + } + + + protected void exportOnePackage(SlingHttpServletRequest request, SlingHttpServletResponse response, boolean delete) + throws ServletException, IOException { + DistributionPackageExporter distributionPackageExporter = request .getResource() .adaptTo(DistributionPackageExporter.class); @@ -77,14 +108,20 @@ public class DistributionPackageExporter InputStream inputStream = null; int bytesCopied = -1; try { + response.addHeader(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID, distributionPackage.getId()); + inputStream = distributionPackage.createInputStream(); + bytesCopied = IOUtils.copy(inputStream, response.getOutputStream()); } finally { IOUtils.closeQuietly(inputStream); } - // delete the package permanently - distributionPackage.delete(); + if (delete) { + // delete the package permanently + distributionPackage.delete(); + } + // everything ok response.setStatus(200); @@ -107,4 +144,30 @@ public class DistributionPackageExporter } } + + + void deletePackage(final SlingHttpServletRequest request, final SlingHttpServletResponse response) { + DistributionPackageExporter distributionPackageExporter = request + .getResource() + .adaptTo(DistributionPackageExporter.class); + + ResourceResolver resourceResolver = request.getResourceResolver(); + + + String id = request.getParameter("id"); + + DistributionPackage distributionPackage = distributionPackageExporter.getPackage(resourceResolver, id); + + if (distributionPackage != null) { + distributionPackage.delete(); + + response.setStatus(200); + } else { + response.setStatus(204); + log.debug("nothing to delete {}", id); + } + + + } + } Added: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java?rev=1690924&view=auto ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java (added) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/HttpTransportUtils.java Tue Jul 14 13:15:17 2015 @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.distribution.transport.impl; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.fluent.Executor; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.utils.URIBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +public class HttpTransportUtils { + + static final Logger log = LoggerFactory.getLogger(HttpTransportUtils.class); + + public final static String HEADER_DISTRIBUTION_ORIGINAL_ID = "X-Distribution-OriginalId"; + + + public static InputStream fetchNextPackage(Executor executor, URI distributionURI, Map<String, String> headers) throws URISyntaxException, IOException { + + // always clear the result headers map + headers.clear(); + + + URI fetchUri = getFetchUri(distributionURI); + Request fetchReq = Request.Post(fetchUri).useExpectContinue(); + HttpResponse httpResponse = executor.execute(fetchReq).returnResponse(); + + if (httpResponse.getStatusLine().getStatusCode() != 200) { + return null; + } + + HttpEntity entity = httpResponse.getEntity(); + + + Header header = httpResponse.getFirstHeader(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID); + if (header != null && header.getValue() != null) { + String originalId = header.getValue(); + headers.put(HEADER_DISTRIBUTION_ORIGINAL_ID, originalId); + } else { + log.warn("cannot retrieve original id header"); + } + + return entity.getContent(); + } + + + + public static boolean deletePackage(Executor executor, URI distributionURI, String remotePackageId) throws URISyntaxException, IOException { + + URI deleteUri = getDeleteUri(distributionURI, remotePackageId); + Request deleteReq = Request.Post(deleteUri).useExpectContinue(); + HttpResponse httpResponse = executor.execute(deleteReq).returnResponse(); + + return httpResponse.getStatusLine().getStatusCode() == 200; + } + + + public static URI getFetchUri(URI uri) throws URISyntaxException { + URIBuilder uriBuilder = new URIBuilder(uri); + uriBuilder.addParameter("operation", "fetch"); + + return uriBuilder.build(); + } + + + + public static URI getDeleteUri(URI uri, String id) throws URISyntaxException { + URIBuilder uriBuilder = new URIBuilder(uri); + uriBuilder.addParameter("operation", "delete"); + uriBuilder.addParameter("id", id); + + return uriBuilder.build(); + } + +} Modified: sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java?rev=1690924&r1=1690923&r2=1690924&view=diff ============================================================================== --- sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java (original) +++ sling/trunk/contrib/extensions/distribution/core/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java Tue Jul 14 13:15:17 2015 @@ -23,10 +23,12 @@ import java.io.InputStream; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.io.IOUtils; +import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; @@ -42,6 +44,7 @@ import org.apache.sling.distribution.Dis import org.apache.sling.distribution.log.impl.DefaultDistributionLog; import org.apache.sling.distribution.packaging.DistributionPackage; import org.apache.sling.distribution.serialization.DistributionPackageBuilder; +import org.apache.sling.distribution.servlet.ServletJsonUtils; import org.apache.sling.distribution.transport.DistributionTransportSecretProvider; import org.apache.sling.distribution.transport.core.DistributionTransport; import org.apache.sling.distribution.transport.core.DistributionTransportException; @@ -145,29 +148,30 @@ public class SimpleHttpDistributionTrans // TODO : add queue parameter // continuously requests package streams as long as type header is received with the response (meaning there's a package of a certain type) - HttpResponse httpResponse; + InputStream inputStream; + final Map<String, String> headers = new HashMap<String, String>(); int pulls = 0; int maxNumberOfPackages = DistributionRequestType.PULL.equals(distributionRequest.getRequestType()) ? maxPullItems : 1; - while (pulls < maxNumberOfPackages - && (httpResponse = executor.execute(req).returnResponse()).getStatusLine().getStatusCode() == 200) { - HttpEntity entity = httpResponse.getEntity(); - if (entity != null) { - final DistributionPackage responsePackage = packageBuilder.readPackage(resourceResolver, entity.getContent()); - if (responsePackage != null) { - responsePackage.getInfo().setOrigin(distributionURI); - log.debug("pulled package no {} with info {}", pulls, responsePackage.getInfo()); - - result.add(responsePackage); - } else { - log.warn("responsePackage is null"); - } - pulls++; + while (pulls < maxNumberOfPackages && (inputStream = HttpTransportUtils.fetchNextPackage(executor, distributionURI, headers)) != null) { + + final DistributionPackage responsePackage = packageBuilder.readPackage(resourceResolver, inputStream); + if (responsePackage != null) { + responsePackage.getInfo().setOrigin(distributionURI); + log.debug("pulled package no {} with info {}", pulls, responsePackage.getInfo()); + + result.add(responsePackage); + + String originalId = headers.get(HttpTransportUtils.HEADER_DISTRIBUTION_ORIGINAL_ID); + + HttpTransportUtils.deletePackage(executor, distributionURI, originalId); + } else { - log.info("no entity available"); - break; + log.warn("responsePackage is null"); } + + pulls++; } } catch (HttpHostConnectException e) {