This is an automated email from the ASF dual-hosted git repository.
mcgilman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new ce050f4 NIFI-7009: Atlas reporting task retrieves only the active
flow components
ce050f4 is described below
commit ce050f4ecb839df24b8e5235f91a48acfde30e01
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Sat Jan 4 22:48:44 2020 +0100
NIFI-7009: Atlas reporting task retrieves only the active flow components
Filter out the deleted components before querying them, instead of
retrieving
all the components before filtering.
This closes #3979
---
.../org/apache/nifi/atlas/NiFiAtlasClient.java | 38 +++++++++++++++++++---
1 file changed, 34 insertions(+), 4 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
index 49eacd1..e40e034 100644
---
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
+++
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/NiFiAtlasClient.java
@@ -193,6 +193,7 @@ public class NiFiAtlasClient {
}
final AtlasEntity nifiFlowEntity = nifiFlowExt.getEntity();
+ final Map<String, AtlasEntity> nifiFlowReferredEntities =
nifiFlowExt.getReferredEntities();
final Map<String, Object> attributes = nifiFlowEntity.getAttributes();
final NiFiFlow nifiFlow = new NiFiFlow(rootProcessGroupId);
nifiFlow.setExEntity(nifiFlowEntity);
@@ -201,12 +202,12 @@ public class NiFiAtlasClient {
nifiFlow.setUrl(toStr(attributes.get(ATTR_URL)));
nifiFlow.setDescription(toStr(attributes.get(ATTR_DESCRIPTION)));
-
nifiFlow.getQueues().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_QUEUES))));
-
nifiFlow.getRootInputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_INPUT_PORTS))));
-
nifiFlow.getRootOutputPortEntities().putAll(toQualifiedNameIds(toAtlasObjectIds(nifiFlowEntity.getAttribute(ATTR_OUTPUT_PORTS))));
+ nifiFlow.getQueues().putAll(fetchFlowComponents(TYPE_NIFI_QUEUE,
nifiFlowReferredEntities));
+
nifiFlow.getRootInputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_INPUT_PORT,
nifiFlowReferredEntities));
+
nifiFlow.getRootOutputPortEntities().putAll(fetchFlowComponents(TYPE_NIFI_OUTPUT_PORT,
nifiFlowReferredEntities));
final Map<String, NiFiFlowPath> flowPaths = nifiFlow.getFlowPaths();
- final Map<AtlasObjectId, AtlasEntity> flowPathEntities =
toQualifiedNameIds(toAtlasObjectIds(attributes.get(ATTR_FLOW_PATHS)));
+ final Map<AtlasObjectId, AtlasEntity> flowPathEntities =
fetchFlowComponents(TYPE_NIFI_FLOW_PATH, nifiFlowReferredEntities);
for (AtlasEntity flowPathEntity : flowPathEntities.values()) {
final String pathQualifiedName =
toStr(flowPathEntity.getAttribute(ATTR_QUALIFIED_NAME));
@@ -230,6 +231,35 @@ public class NiFiAtlasClient {
return nifiFlow;
}
+ /**
+ * Retrieves the flow components of type {@code componentType} from Atlas
server.
+ * Deleted components will be filtered out before calling Atlas.
+ * Atlas object ids will be initialized with all the attributes (guid,
type, unique attributes) in order to be able
+ * to match ids retrieved from Atlas (having guid) and ids created by the
reporting task (not having guid yet).
+ *
+ * @param componentType Atlas type of the flow component (nifi_flow_path,
nifi_queue, nifi_input_port, nifi_output_port)
+ * @param referredEntities referred entities of the flow entity (returned
when the flow fetched) containing the basic data (id, status) of the flow
components
+ * @return flow component entities mapped to their object ids
+ */
+ private Map<AtlasObjectId, AtlasEntity> fetchFlowComponents(String
componentType, Map<String, AtlasEntity> referredEntities) {
+ return referredEntities.values().stream()
+ .filter(referredEntity ->
referredEntity.getTypeName().equals(componentType))
+ .filter(referredEntity -> referredEntity.getStatus() ==
AtlasEntity.Status.ACTIVE)
+ .map(referredEntity -> {
+ final Map<String, Object> uniqueAttributes =
Collections.singletonMap(ATTR_QUALIFIED_NAME,
referredEntity.getAttribute(ATTR_QUALIFIED_NAME));
+ final AtlasObjectId id = new
AtlasObjectId(referredEntity.getGuid(), componentType, uniqueAttributes);
+ try {
+ final AtlasEntity.AtlasEntityWithExtInfo
fetchedEntityExt = searchEntityDef(id);
+ return new Tuple<>(id, fetchedEntityExt.getEntity());
+ } catch (AtlasServiceException e) {
+ logger.warn("Failed to search entity by id {}, due to
{}", id, e);
+ return null;
+ }
+ })
+ .filter(Objects::nonNull)
+ .collect(Collectors.toMap(Tuple::getKey, Tuple::getValue));
+ }
+
@SuppressWarnings("unchecked")
private List<AtlasObjectId> toAtlasObjectIds(Object _references) {
if (_references == null) {