Github user kevdoran commented on a diff in the pull request: https://github.com/apache/nifi-registry/pull/148#discussion_r236837704 --- Diff: nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/extension/StandardExtensionService.java --- @@ -0,0 +1,589 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.registry.service.extension; + +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.nifi.registry.bucket.Bucket; +import org.apache.nifi.registry.db.entity.BucketEntity; +import org.apache.nifi.registry.db.entity.ExtensionBundleEntity; +import org.apache.nifi.registry.db.entity.ExtensionBundleEntityType; +import org.apache.nifi.registry.db.entity.ExtensionBundleVersionEntity; +import org.apache.nifi.registry.exception.ResourceNotFoundException; +import org.apache.nifi.registry.extension.BundleCoordinate; +import org.apache.nifi.registry.extension.BundleDetails; +import org.apache.nifi.registry.extension.BundleExtractor; +import org.apache.nifi.registry.extension.ExtensionBundle; +import org.apache.nifi.registry.extension.ExtensionBundleContext; +import org.apache.nifi.registry.extension.ExtensionBundlePersistenceProvider; +import org.apache.nifi.registry.extension.ExtensionBundleType; +import org.apache.nifi.registry.extension.ExtensionBundleVersion; +import org.apache.nifi.registry.extension.ExtensionBundleVersionDependency; +import org.apache.nifi.registry.extension.ExtensionBundleVersionMetadata; +import org.apache.nifi.registry.extension.repo.ExtensionRepoArtifact; +import org.apache.nifi.registry.extension.repo.ExtensionRepoBucket; +import org.apache.nifi.registry.extension.repo.ExtensionRepoGroup; +import org.apache.nifi.registry.extension.repo.ExtensionRepoVersionSummary; +import org.apache.nifi.registry.properties.NiFiRegistryProperties; +import org.apache.nifi.registry.provider.extension.StandardExtensionBundleContext; +import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils; +import org.apache.nifi.registry.service.DataModelMapper; +import org.apache.nifi.registry.service.MetadataService; +import org.apache.nifi.registry.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.validation.ConstraintViolation; +import javax.validation.ConstraintViolationException; +import javax.validation.Validator; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.security.DigestInputStream; +import java.security.MessageDigest; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.stream.Collectors; + +@Service +public class StandardExtensionService implements ExtensionService { + + private static final Logger LOGGER = LoggerFactory.getLogger(StandardExtensionService.class); + + private final MetadataService metadataService; + private final Map<ExtensionBundleType, BundleExtractor> extractors; + private final ExtensionBundlePersistenceProvider bundlePersistenceProvider; + private final Validator validator; + private final File extensionsWorkingDir; + + @Autowired + public StandardExtensionService(final MetadataService metadataService, + final Map<ExtensionBundleType, BundleExtractor> extractors, + final ExtensionBundlePersistenceProvider bundlePersistenceProvider, + final Validator validator, + final NiFiRegistryProperties properties) { + this.metadataService = metadataService; + this.extractors = extractors; + this.bundlePersistenceProvider = bundlePersistenceProvider; + this.validator = validator; + this.extensionsWorkingDir = properties.getExtensionsWorkingDirectory(); + Validate.notNull(this.metadataService); + Validate.notNull(this.extractors); + Validate.notNull(this.bundlePersistenceProvider); + Validate.notNull(this.validator); + Validate.notNull(this.extensionsWorkingDir); + } + + private <T> void validate(T t, String invalidMessage) { + final Set<ConstraintViolation<T>> violations = validator.validate(t); + if (violations.size() > 0) { + throw new ConstraintViolationException(invalidMessage, violations); + } + } + + @Override + public ExtensionBundleVersion createExtensionBundleVersion(final String bucketIdentifier, final ExtensionBundleType bundleType, + final InputStream inputStream) throws IOException { + if (StringUtils.isBlank(bucketIdentifier)) { + throw new IllegalArgumentException("Bucket identifier cannot be null or blank"); + } + + if (bundleType == null) { + throw new IllegalArgumentException("Bundle type cannot be null"); + } + + if (inputStream == null) { + throw new IllegalArgumentException("Extension bundle input stream cannot be null"); + } + + if (!extractors.containsKey(bundleType)) { + throw new IllegalArgumentException("No metadata extractor is registered for bundle-type: " + bundleType); + } + + // ensure the bucket exists + final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier); + if (existingBucket == null) { + LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier); + throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry."); + } + + // ensure the extensions directory exists and we can read and write to it + FileUtils.ensureDirectoryExistAndCanReadAndWrite(extensionsWorkingDir); + + final String extensionWorkingFilename = UUID.randomUUID().toString(); + final File extensionWorkingFile = new File(extensionsWorkingDir, extensionWorkingFilename); + LOGGER.debug("Writing bundle contents to working directory at {}", new Object[]{extensionWorkingFile.getAbsolutePath()}); + + try { + // write the contents of the input stream to a temporary file in the extensions working directory + final MessageDigest sha256Digest = DigestUtils.getSha256Digest(); + try (final DigestInputStream digestInputStream = new DigestInputStream(inputStream, sha256Digest); + final OutputStream out = new FileOutputStream(extensionWorkingFile)) { + IOUtils.copy(digestInputStream, out); + } + + final String sha256Hex = Hex.encodeHexString(sha256Digest.digest()); + + // extract the details of the bundle from the temp file in the working directory + final BundleDetails bundleDetails; + try (final InputStream in = new FileInputStream(extensionWorkingFile)) { + final BundleExtractor extractor = extractors.get(bundleType); + bundleDetails = extractor.extract(in); + } + + final BundleCoordinate bundleCoordinate = bundleDetails.getBundleCoordinate(); + final BundleCoordinate dependencyCoordinate = bundleDetails.getDependencyBundleCoordinate(); + + final String groupId = bundleCoordinate.getGroupId(); + final String artifactId = bundleCoordinate.getArtifactId(); + final String version = bundleCoordinate.getVersion(); + LOGGER.debug("Extracted bundle details - '{}' - '{}' - '{}'", new Object[]{groupId, artifactId, version}); + + // a bundle with the same group, artifact, and version can exist in multiple buckets, but only if it contains the same binary content, + // we can determine that by comparing the SHA-256 digest of the incoming bundle against existing bundles with the same group, artifact, version + final List<ExtensionBundleVersionEntity> allExistingVersions = metadataService.getExtensionBundleVersionsGlobal(groupId, artifactId, version); + for (final ExtensionBundleVersionEntity existingVersionEntity : allExistingVersions) { + if (!existingVersionEntity.getSha256Hex().equals(sha256Hex)) { + throw new IllegalStateException("Found existing extension bundle with same group, artifact, and version, but different SHA-256 check-sum"); + } + } + + // get the existing extension bundle entity, or create a new one if one does not exist in the bucket with the group + artifact + final ExtensionBundleEntity extensionBundle = getOrCreateExtensionBundle(bucketIdentifier, groupId, artifactId, bundleType); + + // ensure there isn't already a version of the bundle with the same version + final ExtensionBundleVersionEntity existingVersion = metadataService.getExtensionBundleVersion(bucketIdentifier, groupId, artifactId, version); + if (existingVersion != null) { + LOGGER.warn("The specified version [{}] already exists for extension bundle [{}].", new Object[]{version, extensionBundle.getId()}); + throw new IllegalStateException("The specified version already exists for the given extension bundle"); + } + + // create the bundle version in the metadata db + final String userIdentity = NiFiUserUtils.getNiFiUserIdentity(); + final long bundleCreatedTime = extensionBundle.getCreated().getTime(); + + final ExtensionBundleVersionMetadata versionMetadata = new ExtensionBundleVersionMetadata(); + versionMetadata.setId(UUID.randomUUID().toString()); + versionMetadata.setExtensionBundleId(extensionBundle.getId()); + versionMetadata.setBucketId(bucketIdentifier); + versionMetadata.setVersion(version); + versionMetadata.setTimestamp(bundleCreatedTime); + versionMetadata.setAuthor(userIdentity); + versionMetadata.setSha256Hex(sha256Hex); + + if (dependencyCoordinate != null) { + final ExtensionBundleVersionDependency versionDependency = new ExtensionBundleVersionDependency(); + versionDependency.setGroupId(dependencyCoordinate.getGroupId()); + versionDependency.setArtifactId(dependencyCoordinate.getArtifactId()); + versionDependency.setVersion(dependencyCoordinate.getVersion()); + versionMetadata.setDependency(versionDependency); + } + + validate(versionMetadata, "Cannot create extension bundle version"); + + final ExtensionBundleVersionEntity versionEntity = DataModelMapper.map(versionMetadata); + metadataService.createExtensionBundleVersion(versionEntity); + + // persist the content of the bundle to the persistence provider + final ExtensionBundleContext context = new StandardExtensionBundleContext.Builder() + .bundleType(getProviderBundleType(bundleType)) + .bucketId(existingBucket.getId()) + .bucketName(existingBucket.getName()) + .bundleId(extensionBundle.getId()) + .bundleGroupId(extensionBundle.getGroupId()) + .bundleArtifactId(extensionBundle.getArtifactId()) + .bundleVersion(versionMetadata.getVersion()) + .author(versionMetadata.getAuthor()) + .timestamp(versionMetadata.getTimestamp()) + .build(); + + try (final InputStream in = new FileInputStream(extensionWorkingFile); + final InputStream bufIn = new BufferedInputStream(in)) { + bundlePersistenceProvider.saveBundleVersion(context, bufIn); + LOGGER.debug("Bundle saved to persistence provider - '{}' - '{}' - '{}'", + new Object[]{groupId, artifactId, version}); + } + + // get the updated extension bundle so it contains the correct version count + final ExtensionBundleEntity updatedBundle = metadataService.getExtensionBundle(bucketIdentifier, groupId, artifactId); + + // create the full ExtensionBundleVersion instance to return + final ExtensionBundleVersion extensionBundleVersion = new ExtensionBundleVersion(); + extensionBundleVersion.setVersionMetadata(versionMetadata); + extensionBundleVersion.setExtensionBundle(DataModelMapper.map(existingBucket, updatedBundle)); + extensionBundleVersion.setBucket(DataModelMapper.map(existingBucket)); + return extensionBundleVersion; + + } finally { + if (extensionWorkingFile.exists()) { + try { + extensionWorkingFile.delete(); + } catch (Exception e) { + LOGGER.warn("Error removing temporary extension bundle file at {}", + new Object[]{extensionWorkingFile.getAbsolutePath()}); + } + } + } + } + + private ExtensionBundleEntity getOrCreateExtensionBundle(final String bucketId, final String groupId, + final String artifactId, final ExtensionBundleType bundleType) { + ExtensionBundleEntity existingBundleEntity = metadataService.getExtensionBundle(bucketId, groupId, artifactId); + if (existingBundleEntity == null) { + final ExtensionBundle bundle = new ExtensionBundle(); + bundle.setIdentifier(UUID.randomUUID().toString()); + bundle.setBucketIdentifier(bucketId); + bundle.setName(groupId + " - " + artifactId); --- End diff -- Don't know how others feel about this, but personally I would have a slight preference for using the Maven coordinate style here for the inferred bundle name, i.e.: bundle.setName(groupId + ":" + artifactId);
---