kevdoran commented on code in PR #10909:
URL: https://github.com/apache/nifi/pull/10909#discussion_r2823831869


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -367,17 +404,158 @@ public FrameworkConnectorInitializationContextBuilder 
createInitializationContex
         return new StandardConnectorInitializationContext.Builder();
     }
 
+    // ConnectorAssetRepository is an internal implementation detail;
+    // all external callers should use the asset methods on 
ConnectorRepository directly.
+
+    @Override
+    public Asset storeAsset(final String connectorId, final String assetId, 
final String assetName, final InputStream content) throws IOException {
+        if (configurationProvider == null) {
+            return assetRepository.storeAsset(connectorId, assetId, assetName, 
content);
+        }
+
+        // Buffer content so we can send it to both the local store and the 
provider
+        final byte[] contentBytes = content.readAllBytes();
+
+        final Asset localAsset = assetRepository.storeAsset(connectorId, 
assetId, assetName, new java.io.ByteArrayInputStream(contentBytes));
+
+        try {
+            final String externalId = 
configurationProvider.storeAsset(connectorId, assetName, new 
java.io.ByteArrayInputStream(contentBytes));
+            recordIdMapping(connectorId, assetId, externalId);
+            logger.debug("Stored asset [nifiId={}, externalId={}] for 
connector [{}]", assetId, externalId, connectorId);
+        } catch (final Exception e) {
+            logger.error("Failed to store asset [{}] to provider for connector 
[{}]; rolling back local asset", assetName, connectorId, e);
+            assetRepository.deleteAsset(assetId);
+            throw new IOException("Failed to store asset to provider", e);
+        }
+
+        return localAsset;
+    }
+
+    @Override
+    public Optional<Asset> getAsset(final String assetId) {
+        return assetRepository.getAsset(assetId);
+    }
+
     @Override
-    public ConnectorAssetRepository getAssetRepository() {
-        return assetRepository;
+    public List<Asset> getAssets(final String connectorId) {
+        return assetRepository.getAssets(connectorId);
+    }
+
+    @Override
+    public void deleteAssets(final String connectorId) {
+        if (configurationProvider != null) {
+            final Map<String, String> nifiToExt = 
nifiToExternalId.getOrDefault(connectorId, Map.of());
+            for (final Map.Entry<String, String> entry : nifiToExt.entrySet()) 
{
+                try {
+                    configurationProvider.deleteAsset(connectorId, 
entry.getValue());
+                } catch (final Exception e) {
+                    logger.warn("Failed to delete asset [externalId={}] from 
provider for connector [{}]", entry.getValue(), connectorId, e);
+                }
+            }
+            nifiToExternalId.remove(connectorId);
+            externalToNifiId.remove(connectorId);
+        }
+        cachedAssetMetadata.remove(connectorId);
+        assetRepository.deleteAssets(connectorId);
+    }
+
+    // --- ID Mapping Helpers ---
+
+    private void recordIdMapping(final String connectorId, final String 
nifiUuid, final String externalId) {
+        nifiToExternalId.computeIfAbsent(connectorId, k -> new 
ConcurrentHashMap<>()).put(nifiUuid, externalId);
+        externalToNifiId.computeIfAbsent(connectorId, k -> new 
ConcurrentHashMap<>()).put(externalId, nifiUuid);
+    }
+
+    private void removeIdMapping(final String connectorId, final String 
nifiUuid) {
+        final Map<String, String> nifiToExt = 
nifiToExternalId.get(connectorId);
+        if (nifiToExt != null) {
+            final String externalId = nifiToExt.remove(nifiUuid);
+            if (externalId != null) {
+                final Map<String, String> extToNifi = 
externalToNifiId.get(connectorId);
+                if (extToNifi != null) {
+                    extToNifi.remove(externalId);
+                }
+            }
+        }
+    }
+
+    private String lookupNifiUuid(final String connectorId, final String 
externalId) {
+        final Map<String, String> extToNifi = 
externalToNifiId.get(connectorId);
+        return extToNifi != null ? extToNifi.get(externalId) : null;
+    }
+
+    private String lookupExternalId(final String connectorId, final String 
nifiUuid) {
+        final Map<String, String> nifiToExt = 
nifiToExternalId.get(connectorId);
+        return nifiToExt != null ? nifiToExt.get(nifiUuid) : null;
+    }
+
+    private ConnectorAssetMetadata findAssetMetadata(final 
List<ConnectorAssetMetadata> metadataList, final String externalId) {
+        for (final ConnectorAssetMetadata metadata : metadataList) {
+            if (externalId.equals(metadata.getIdentifier())) {
+                return metadata;
+            }
+        }
+        return null;
+    }
+
+    // --- Asset Sync from Provider ---
+
+    @Override
+    public void syncAssetsFromProvider(final ConnectorNode connector) {
+        if (configurationProvider == null) {
+            return;
+        }
+
+        final String connectorId = connector.getIdentifier();
+        final List<ConnectorAssetMetadata> metadataList = 
cachedAssetMetadata.getOrDefault(connectorId, List.of());
+
+        for (final ConnectorAssetMetadata metadata : metadataList) {
+            final String externalId = metadata.getIdentifier();
+            final String nifiUuid = lookupNifiUuid(connectorId, externalId);
+            if (nifiUuid == null) {
+                logger.warn("No NiFi UUID mapping found for external asset 
[{}] in connector [{}]; skipping sync", externalId, connectorId);
+                continue;
+            }
+
+            final Optional<Asset> localAssetOpt = 
assetRepository.getAsset(nifiUuid);
+            final boolean localFileMissing = localAssetOpt.isEmpty() || 
!localAssetOpt.get().getFile().exists();
+
+            final String providerDigest = metadata.getDigest();
+            final String lastDigest = 
lastDownloadedProviderDigest.get(nifiUuid);
+            final boolean digestChanged = providerDigest != null && 
!providerDigest.equals(lastDigest);
+
+            if (localFileMissing || digestChanged || lastDigest == null) {

Review Comment:
   Yeah that was a downside of my initial approach. It meant that on a 
restart/scale up that we would have to re-fetch every connector asset from the 
provider in order to rebuild this mapping. My new approach puts the asset 
mapping into the provider impl, so that the provider has more flexibilty for 
how to retain that. So providers may want to store that mapping in a remote 
persistence such as a DB. Other impls might want to persist the mapping to 
local filesystem on the nifi node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to