This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 5940e893a6942bc348d20025b03de099eed821f3
Author: jckautzmann <[email protected]>
AuthorDate: Tue Jan 28 14:52:57 2025 +0100

    support custom package handler
    
    - introduce package handler interface and package handler factory
---
 .../journal/bookkeeper/BookKeeperFactory.java      | 28 +++-----
 .../bookkeeper/ContentPackageExtractor.java        |  6 +-
 .../journal/bookkeeper/PackageHandler.java         | 84 +---------------------
 .../journal/bookkeeper/PackageHandlerFactory.java  | 26 +++++++
 .../bookkeeper/DefaultPackageHandler.java}         | 22 +++---
 .../bookkeeper/DefaultPackageHandlerFactory.java   | 40 +++++++++++
 .../journal/impl/eds/EdsPackageHandler.java        | 57 +++++++++++++++
 .../journal/impl/eds/EdsPackageHandlerFactory.java | 34 +++++++++
 .../impl/subscriber/DistributionSubscriber.java    | 19 ++++-
 9 files changed, 200 insertions(+), 116 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index dd1b492..11dd1ab 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -37,15 +37,9 @@ import org.osgi.service.event.EventAdmin;
 public class BookKeeperFactory {
     @Reference
     private ResourceResolverFactory resolverFactory;
-    
-    @Reference
-    private EventAdmin eventAdmin;
-    
-    @Reference
-    Packaging packaging;
 
     @Reference
-    BinaryStore binaryStore;
+    private EventAdmin eventAdmin;
 
     @Reference
     ImportPreProcessor importPreProcessor;
@@ -57,22 +51,18 @@ public class BookKeeperFactory {
     InvalidationProcessor invalidationProcessor;
 
     public BookKeeper create(
-            DistributionPackageBuilder packageBuilder, 
-            BookKeeperConfig config, 
+            DistributionPackageBuilder packageBuilder,
+            BookKeeperConfig config,
             Consumer<PackageStatusMessage> statusSender,
-            Consumer<LogMessage> logSender, 
-            SubscriberMetrics subscriberMetrics
+            Consumer<LogMessage> logSender,
+            SubscriberMetrics subscriberMetrics,
+            PackageHandler packageHandler
             ) {
-        ContentPackageExtractor extractor = new ContentPackageExtractor(
-                packaging,
-                config.getPackageHandling(),
-                config.shouldExtractorOverwriteFolderPrimaryTypes());
-        PackageHandler packageHandler = new PackageHandler(packageBuilder, 
extractor, binaryStore);
         return new BookKeeper(
-                resolverFactory, 
-                subscriberMetrics, 
+                resolverFactory,
+                subscriberMetrics,
                 packageHandler,
-                eventAdmin, 
+                eventAdmin,
                 statusSender,
                 logSender,
                 config,
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ContentPackageExtractor.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ContentPackageExtractor.java
index 9c20bb7..b3fe33f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ContentPackageExtractor.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/ContentPackageExtractor.java
@@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
  * Each distribution package is inspected for possible content packages in 
/etc/packages.
  * Such content packages are installed via the Packaging service.
  */
-class ContentPackageExtractor {
+public class ContentPackageExtractor {
     private static final String PACKAGE_BASE_PATH = "/etc/packages/";
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -55,7 +55,7 @@ class ContentPackageExtractor {
     private final Packaging packageService;
     private final PackageHandling packageHandling;
     private final boolean overwritePrimaryTypesOfFolders;
-    
+
     public ContentPackageExtractor(
             Packaging packageService,
             PackageHandling packageHandling,
@@ -64,7 +64,7 @@ class ContentPackageExtractor {
         this.packageHandling = packageHandling;
         this.overwritePrimaryTypesOfFolders = overwritePrimaryTypesOfFolders;
     }
-    
+
     public void handle(ResourceResolver resourceResolver, List<String> paths) 
throws DistributionException {
         requireNonNull(resourceResolver, "Must provide resourceResolver");
         if (packageHandling == PackageHandling.Off) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java
index 7ee64f9..ba4f894 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java
@@ -18,94 +18,14 @@
  */
 package org.apache.sling.distribution.journal.bookkeeper;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
 
-import javax.annotation.Nonnull;
-
-import org.apache.commons.io.IOUtils;
 import org.apache.sling.api.resource.PersistenceException;
-import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
-import org.apache.sling.distribution.journal.BinaryStore;
-import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.String.format;
-
-class PackageHandler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PackageHandler.class);
-    
-    private final DistributionPackageBuilder packageBuilder;
-    
-    private final ContentPackageExtractor extractor;
-
-    private final BinaryStore binaryStore;
-
-    public PackageHandler(DistributionPackageBuilder packageBuilder, 
ContentPackageExtractor extractor,
-        BinaryStore binaryStore) {
-        this.packageBuilder = packageBuilder;
-        this.extractor = extractor;
-        this.binaryStore = binaryStore;
-    }
 
+public interface PackageHandler {
     public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
-            throws DistributionException, PersistenceException {
-        PackageMessage.ReqType type = pkgMsg.getReqType();
-        switch (type) {
-            case ADD:
-                installAddPackage(resolver, pkgMsg);
-                break;
-            case DELETE:
-                installDeletePackage(resolver, pkgMsg);
-                break;
-            case TEST:
-                break;
-            default: throw new UnsupportedOperationException(format("Unable to 
process messages with type: %s", type));
-        }
-    }
-
-    private void installAddPackage(ResourceResolver resolver, PackageMessage 
pkgMsg)
-            throws DistributionException {
-        LOG.debug("Importing paths {}",pkgMsg.getPaths());
-        InputStream pkgStream = null;
-        try {
-            pkgStream = stream(resolver, pkgMsg, binaryStore);
-            packageBuilder.installPackage(resolver, pkgStream);
-            extractor.handle(resolver, pkgMsg.getPaths());
-        } finally {
-            IOUtils.closeQuietly(pkgStream);
-        }
-
-    }
-    
-    @Nonnull
-    public static InputStream stream(ResourceResolver resolver, PackageMessage 
pkgMsg, BinaryStore binaryStore) throws DistributionException {
-        if (pkgMsg.getPkgBinary() != null) {
-            return new ByteArrayInputStream(pkgMsg.getPkgBinary());
-        } else {
-            String pkgBinRef = pkgMsg.getPkgBinaryRef();
-            try {
-                return binaryStore.get(pkgBinRef);
-            } catch (IOException io) {
-                throw new DistributionException(io.getMessage(), io);
-            }
-        }
-    }
+            throws DistributionException, PersistenceException;
 
-    private void installDeletePackage(ResourceResolver resolver, 
PackageMessage pkgMsg)
-            throws PersistenceException {
-        LOG.info("Deleting paths {}",pkgMsg.getPaths());
-        for (String path : pkgMsg.getPaths()) {
-            Resource resource = resolver.getResource(path);
-            if (resource != null) {
-                resolver.delete(resource);
-            }
-        }
-    }
-    
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandlerFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandlerFactory.java
new file mode 100644
index 0000000..763a522
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandlerFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.bookkeeper;
+
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+
+public interface PackageHandlerFactory {
+
+    public PackageHandler create(DistributionPackageBuilder packageBuilder, 
ContentPackageExtractor extractor);
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/bookkeeper/DefaultPackageHandler.java
similarity index 87%
copy from src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java
copy to src/main/java/org/apache/sling/distribution/journal/impl/bookkeeper/DefaultPackageHandler.java
index 7ee64f9..081404c 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/PackageHandler.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/bookkeeper/DefaultPackageHandler.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.distribution.journal.bookkeeper;
+package org.apache.sling.distribution.journal.impl.bookkeeper;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -29,25 +29,27 @@ import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.BinaryStore;
+import 
org.apache.sling.distribution.journal.bookkeeper.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandler;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static java.lang.String.format;
 
-class PackageHandler {
-    private static final Logger LOG = 
LoggerFactory.getLogger(PackageHandler.class);
-    
+public class DefaultPackageHandler implements PackageHandler {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultPackageHandler.class);
+
     private final DistributionPackageBuilder packageBuilder;
-    
+
     private final ContentPackageExtractor extractor;
 
     private final BinaryStore binaryStore;
 
-    public PackageHandler(DistributionPackageBuilder packageBuilder, 
ContentPackageExtractor extractor,
-        BinaryStore binaryStore) {
+    public DefaultPackageHandler(DistributionPackageBuilder packageBuilder, 
ContentPackageExtractor extractor,
+                          BinaryStore binaryStore) {
         this.packageBuilder = packageBuilder;
         this.extractor = extractor;
         this.binaryStore = binaryStore;
@@ -82,7 +84,7 @@ class PackageHandler {
         }
 
     }
-    
+
     @Nonnull
     public static InputStream stream(ResourceResolver resolver, PackageMessage 
pkgMsg, BinaryStore binaryStore) throws DistributionException {
         if (pkgMsg.getPkgBinary() != null) {
@@ -107,5 +109,5 @@ class PackageHandler {
             }
         }
     }
-    
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/bookkeeper/DefaultPackageHandlerFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/bookkeeper/DefaultPackageHandlerFactory.java
new file mode 100644
index 0000000..f6b3b36
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/bookkeeper/DefaultPackageHandlerFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.bookkeeper;
+
+import org.apache.sling.distribution.journal.BinaryStore;
+import 
org.apache.sling.distribution.journal.bookkeeper.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandler;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandlerFactory;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+
+@Component(service = PackageHandlerFactory.class, name = "default")
+public class DefaultPackageHandlerFactory implements PackageHandlerFactory {
+
+    @Reference
+    private BinaryStore binaryStore;
+
+    @Override
+    public PackageHandler create(DistributionPackageBuilder packageBuilder, 
ContentPackageExtractor extractor) {
+        PackageHandler packageHandler = new 
DefaultPackageHandler(packageBuilder, extractor, binaryStore);
+        return packageHandler;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/eds/EdsPackageHandler.java b/src/main/java/org/apache/sling/distribution/journal/impl/eds/EdsPackageHandler.java
new file mode 100644
index 0000000..ac88046
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/eds/EdsPackageHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.eds;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandler;
+import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+
+public class EdsPackageHandler implements PackageHandler {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EdsPackageHandler.class);
+
+    public EdsPackageHandler() {
+    }
+
+    @Override
+    public void apply(ResourceResolver resolver, PackageMessage pkgMsg)
+            throws DistributionException, PersistenceException {
+        PackageMessage.ReqType type = pkgMsg.getReqType();
+        switch (type) {
+            case ADD:
+                // update preview and publish in EDS
+                LOG.info("TODO: update preview and publish in EDS");
+                break;
+            case DELETE:
+                // delete preview and publish in EDS
+                LOG.info("TODO: delete preview and publish in EDS");
+                break;
+            case TEST:
+                break;
+            default: throw new UnsupportedOperationException(format("Unable to 
process messages with type: %s", type));
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/eds/EdsPackageHandlerFactory.java b/src/main/java/org/apache/sling/distribution/journal/impl/eds/EdsPackageHandlerFactory.java
new file mode 100644
index 0000000..150b99c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/eds/EdsPackageHandlerFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.eds;
+
+import 
org.apache.sling.distribution.journal.bookkeeper.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandler;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandlerFactory;
+import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.osgi.service.component.annotations.Component;
+
+@Component(service = PackageHandlerFactory.class, name = "edge-delivery")
+public class EdsPackageHandlerFactory implements PackageHandlerFactory {
+    @Override
+    public PackageHandler create(DistributionPackageBuilder packageBuilder, 
ContentPackageExtractor extractor) {
+        PackageHandler packageHandler = new EdsPackageHandler();
+        return packageHandler;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index ae51c5b..1f99c84 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -50,6 +50,7 @@ import javax.annotation.ParametersAreNonnullByDefault;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.jackrabbit.util.Text;
+import org.apache.jackrabbit.vault.packaging.Packaging;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.commons.metrics.MetricsService;
@@ -65,6 +66,9 @@ import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeperConfig;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
+import 
org.apache.sling.distribution.journal.bookkeeper.ContentPackageExtractor;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandler;
+import org.apache.sling.distribution.journal.bookkeeper.PackageHandlerFactory;
 import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
 import 
org.apache.sling.distribution.journal.impl.precondition.Precondition.Decision;
@@ -125,6 +129,12 @@ public class DistributionSubscriber {
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
 
+    @Reference(name = "packageHandlerFactory")
+    PackageHandlerFactory packageHandlerFactory;
+
+    @Reference
+    Packaging packaging;
+
     private SubscriberMetrics subscriberMetrics;
 
     private volatile Closeable idleReadyCheck; // NOSONAR
@@ -198,7 +208,12 @@ public class DistributionSubscriber {
                 config.packageHandling(),
                 packageNodeName,
                 
config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
-        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender, this.subscriberMetrics);
+        ContentPackageExtractor extractor = new ContentPackageExtractor(
+                packaging,
+                bkConfig.getPackageHandling(),
+                bkConfig.shouldExtractorOverwriteFolderPrimaryTypes());
+        PackageHandler packageHandler = 
packageHandlerFactory.create(packageBuilder, extractor);
+        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, 
statusSender, logSender, this.subscriberMetrics, packageHandler);
 
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
@@ -217,7 +232,7 @@ public class DistributionSubscriber {
         LOG.info("Started Subscriber agent={} at offset={}, subscribed to 
agent names {}, readyCheck={}", subAgentName, startOffset,
                 queueNames, config.subscriberIdleCheck());
     }
-    
+
     private String getFirst(String[] agentNames) {
         return agentNames != null && agentNames.length > 0 ? agentNames[0] : 
"";
     }

Reply via email to