kevdoran commented on code in PR #10909:
URL: https://github.com/apache/nifi/pull/10909#discussion_r2823831869
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java:
##########
@@ -367,17 +404,158 @@ public FrameworkConnectorInitializationContextBuilder
createInitializationContex
return new StandardConnectorInitializationContext.Builder();
}
+ // ConnectorAssetRepository is an internal implementation detail;
+ // all external callers should use the asset methods on
ConnectorRepository directly.
+
+ @Override
+ public Asset storeAsset(final String connectorId, final String assetId,
final String assetName, final InputStream content) throws IOException {
+ if (configurationProvider == null) {
+ return assetRepository.storeAsset(connectorId, assetId, assetName,
content);
+ }
+
+ // Buffer content so we can send it to both the local store and the
provider
+ final byte[] contentBytes = content.readAllBytes();
+
+ final Asset localAsset = assetRepository.storeAsset(connectorId,
assetId, assetName, new java.io.ByteArrayInputStream(contentBytes));
+
+ try {
+ final String externalId =
configurationProvider.storeAsset(connectorId, assetName, new
java.io.ByteArrayInputStream(contentBytes));
+ recordIdMapping(connectorId, assetId, externalId);
+ logger.debug("Stored asset [nifiId={}, externalId={}] for
connector [{}]", assetId, externalId, connectorId);
+ } catch (final Exception e) {
+ logger.error("Failed to store asset [{}] to provider for connector
[{}]; rolling back local asset", assetName, connectorId, e);
+ assetRepository.deleteAsset(assetId);
+ throw new IOException("Failed to store asset to provider", e);
+ }
+
+ return localAsset;
+ }
+
+ @Override
+ public Optional<Asset> getAsset(final String assetId) {
+ return assetRepository.getAsset(assetId);
+ }
+
@Override
- public ConnectorAssetRepository getAssetRepository() {
- return assetRepository;
+ public List<Asset> getAssets(final String connectorId) {
+ return assetRepository.getAssets(connectorId);
+ }
+
+ @Override
+ public void deleteAssets(final String connectorId) {
+ if (configurationProvider != null) {
+ final Map<String, String> nifiToExt =
nifiToExternalId.getOrDefault(connectorId, Map.of());
+ for (final Map.Entry<String, String> entry : nifiToExt.entrySet())
{
+ try {
+ configurationProvider.deleteAsset(connectorId,
entry.getValue());
+ } catch (final Exception e) {
+ logger.warn("Failed to delete asset [externalId={}] from
provider for connector [{}]", entry.getValue(), connectorId, e);
+ }
+ }
+ nifiToExternalId.remove(connectorId);
+ externalToNifiId.remove(connectorId);
+ }
+ cachedAssetMetadata.remove(connectorId);
+ assetRepository.deleteAssets(connectorId);
+ }
+
+ // --- ID Mapping Helpers ---
+
+ private void recordIdMapping(final String connectorId, final String
nifiUuid, final String externalId) {
+ nifiToExternalId.computeIfAbsent(connectorId, k -> new
ConcurrentHashMap<>()).put(nifiUuid, externalId);
+ externalToNifiId.computeIfAbsent(connectorId, k -> new
ConcurrentHashMap<>()).put(externalId, nifiUuid);
+ }
+
+ private void removeIdMapping(final String connectorId, final String
nifiUuid) {
+ final Map<String, String> nifiToExt =
nifiToExternalId.get(connectorId);
+ if (nifiToExt != null) {
+ final String externalId = nifiToExt.remove(nifiUuid);
+ if (externalId != null) {
+ final Map<String, String> extToNifi =
externalToNifiId.get(connectorId);
+ if (extToNifi != null) {
+ extToNifi.remove(externalId);
+ }
+ }
+ }
+ }
+
+ private String lookupNifiUuid(final String connectorId, final String
externalId) {
+ final Map<String, String> extToNifi =
externalToNifiId.get(connectorId);
+ return extToNifi != null ? extToNifi.get(externalId) : null;
+ }
+
+ private String lookupExternalId(final String connectorId, final String
nifiUuid) {
+ final Map<String, String> nifiToExt =
nifiToExternalId.get(connectorId);
+ return nifiToExt != null ? nifiToExt.get(nifiUuid) : null;
+ }
+
+ private ConnectorAssetMetadata findAssetMetadata(final
List<ConnectorAssetMetadata> metadataList, final String externalId) {
+ for (final ConnectorAssetMetadata metadata : metadataList) {
+ if (externalId.equals(metadata.getIdentifier())) {
+ return metadata;
+ }
+ }
+ return null;
+ }
+
+ // --- Asset Sync from Provider ---
+
+ @Override
+ public void syncAssetsFromProvider(final ConnectorNode connector) {
+ if (configurationProvider == null) {
+ return;
+ }
+
+ final String connectorId = connector.getIdentifier();
+ final List<ConnectorAssetMetadata> metadataList =
cachedAssetMetadata.getOrDefault(connectorId, List.of());
+
+ for (final ConnectorAssetMetadata metadata : metadataList) {
+ final String externalId = metadata.getIdentifier();
+ final String nifiUuid = lookupNifiUuid(connectorId, externalId);
+ if (nifiUuid == null) {
+ logger.warn("No NiFi UUID mapping found for external asset
[{}] in connector [{}]; skipping sync", externalId, connectorId);
+ continue;
+ }
+
+ final Optional<Asset> localAssetOpt =
assetRepository.getAsset(nifiUuid);
+ final boolean localFileMissing = localAssetOpt.isEmpty() ||
!localAssetOpt.get().getFile().exists();
+
+ final String providerDigest = metadata.getDigest();
+ final String lastDigest =
lastDownloadedProviderDigest.get(nifiUuid);
+ final boolean digestChanged = providerDigest != null &&
!providerDigest.equals(lastDigest);
+
+ if (localFileMissing || digestChanged || lastDigest == null) {
Review Comment:
Yeah that was a downside of my initial approach. It meant that on a
restart/scale up that we would have to re-fetch every connector asset from the
provider in order to rebuild this mapping. My new approach puts the asset
mapping into the provider impl, so that the provider has more flexibilty for
how to retain that. So providers may want to store that mapping in a remote
persistence such as a DB. Other impls might want to persist the mapping to
local filesystem on the nifi node.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]