turcsanyip commented on code in PR #6504:
URL: https://github.com/apache/nifi/pull/6504#discussion_r1027320316


##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api/pom.xml:
##########
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   `nifi-api`'s version and scope are defined in `dependencyManagement` in the 
parent pom, so they do not need to be specified here.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>

Review Comment:
   The `asana` dependency should be `provided` here because it is available 
from `nifi-asana-services-api-nar`, which is the parent nar of this nar (jars 
from the parent nar are loaded first and are available in the child 
nars).
   ```suggestion
           <dependency>
               <groupId>com.asana</groupId>
               <artifactId>asana</artifactId>
               <scope>provided</scope>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/pom.xml:
##########
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-asana-services-api</artifactId>
+        </dependency>

Review Comment:
   Similar to `nifi-asana-services`' pom:
   - no version is needed for `nifi-api`
   - the `compile` scope for `nifi-utils` is redundant
   - `asana` and `nifi-asana-services-api` should have `provided` scope



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import 
org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in 
the last successful query are stored in order to enable incremental loading and 
change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global 
ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = 
"asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = 
"asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = 
"asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = 
"asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = 
"asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = 
"asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = 
"asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = 
"asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = 
"asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = 
"asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = 
"asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = 
"asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = 
"asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new 
AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new 
AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = 
new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue 
AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on 
its' tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for 
accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new 
PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from 
Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new 
PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new 
PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single 
Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have 
an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the 
batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a 
single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this 
relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new 
Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, 
but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new 
Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notification about deleted objects are routed to 
this relationship. "
+                    + "Flow files will not have any payload. IDs of the 
resources no longer exist "
+                    + "are carried by the asana.gid attribute of the generated 
FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws 
InitializationException {

Review Comment:
   We don't use `synchronized` for `onScheduled()`. It is not necessary, 
because the framework always calls it on a single thread.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
+
+===========================================
+The MIT License
+===========================================
+
+The following binary components are provided under the MIT License
+
+  (MIT License) Java client library for the Asana API
+    The following NOTICE information applies:
+      Asana
+      Copyright (c) 2015

Review Comment:
   The Asana library brings in a couple of transitive dependencies that also 
need to be added to the NOTICE file (e.g. checker-qual). Please check the jar 
list in the nar file after the build and add the necessary entries to the 
NOTICE file.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>

Review Comment:
   `compile` scope is the default and we don't use it explicitly if not needed.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-utils</artifactId>
               <version>1.19.0-SNAPSHOT</version>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>

Review Comment:
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-api</artifactId>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.

Review Comment:
   As far as I can see, Jackson is not present in the built nar, so this entry 
should be deleted.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/pom.xml:
##########
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-collections4</artifactId>
+            <version>4.4</version>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-asana-services-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   This is a duplicated dependency and should be deleted. 
`mockito-junit-jupiter` comes from the project root pom.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>

Review Comment:
   Could you please organize the dependencies in all the poms using the 
following order:
   - nifi prod dependencies
   - 3rd party prod dependencies
   - test dependencies
   



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services/pom.xml:
##########
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-asana-bundle</artifactId>
+        <version>1.19.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-asana-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.asana</groupId>
+            <artifactId>asana</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-asana-services-api</artifactId>
+        </dependency>

Review Comment:
   Similar to the `asana` dependency, it should be `provided`.
   ```suggestion
           <dependency>
               <groupId>org.apache.nifi</groupId>
               <artifactId>nifi-asana-services-api</artifactId>
               <scope>provided</scope>
           </dependency>
   ```



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar

Review Comment:
   ```suggestion
   nifi-asana-processors-nar
   ```
   Please fix the header for the other NOTICE files too. It must match the name 
of the nar bundle.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.

Review Comment:
   Jackson is not bundled, it should be deleted.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
+
+===========================================
+The MIT License
+===========================================
+
+The following binary components are provided under the MIT License
+
+  (MIT License) Java client library for the Asana API
+    The following NOTICE information applies:
+      Asana
+      Copyright (c) 2015

Review Comment:
   `asana` dependency is provided in this nar (see my earlier comment) and 
therefore this entry should be deleted.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-services-api/src/main/java/org/apache/nifi/controller/asana/AsanaClient.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.asana;
+
+import com.asana.models.Attachment;
+import com.asana.models.Project;
+import com.asana.models.ProjectMembership;
+import com.asana.models.ProjectStatus;
+import com.asana.models.Section;
+import com.asana.models.Story;
+import com.asana.models.Tag;
+import com.asana.models.Task;
+import com.asana.models.Team;
+import com.asana.models.User;
+
+import java.util.Map;
+
+/**
+ * This interface represents a client to Asana REST server, with some basic 
filtering options built in.
+ */
+public interface AsanaClient {
+    /**
+     * Find & retrieve an Asana project by its name. If multiple projects 
match, returns the first.
+     * If there is no match, then {@link RuntimeException} is thrown. Note 
that constant ordering

Review Comment:
   Please do not throw raw `RuntimeException`s in the code. Please use 
`ProcessException` or `UncheckedIOException` instead, or you can also define a 
custom runtime exception like `AsanaException`.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import 
org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in 
the last successful query are stored in order to enable incremental loading and 
change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global 
ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = 
"asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = 
"asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = 
"asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = 
"asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = 
"asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = 
"asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = 
"asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = 
"asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = 
"asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = 
"asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = 
"asana-collect-project-events";

Review Comment:
   I'd suggest creating an `enum` for these allowable values, like 
[HubSpotObjectType](https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-hubspot-bundle/nifi-hubspot-processors/src/main/java/org/apache/nifi/processors/hubspot/HubSpotObjectType.java).



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors-nar/src/main/resources/META-INF/NOTICE:
##########
@@ -0,0 +1,51 @@
+nifi-asana-nar
+Copyright 2015-2022 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License 
v2
+
+  (ASLv2) Apache Commons IO
+    The following NOTICE information applies:
+      Apache Commons IO
+      Copyright 2002-2017 The Apache Software Foundation
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.salora...@iki.fi), and 
has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as 
supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may licensed under different 
licenses.
+      To find the details that apply to this artifact see the accompanying 
LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system project uses.
+
+
+===========================================
+The MIT License
+===========================================
+
+The following binary components are provided under the MIT License
+
+  (MIT License) Java client library for the Asana API
+    The following NOTICE information applies:
+      Asana
+      Copyright (c) 2015

Review Comment:
   Similar to `nifi-asana-services-nar`'s NOTICE file:
   - Jackson is not present
   - Asana is provided and should be deleted from here



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import 
org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in 
the last successful query are stored in order to enable incremental loading and 
change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global 
ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = 
"asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = 
"asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = 
"asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = 
"asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = 
"asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = 
"asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = 
"asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = 
"asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = 
"asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = 
"asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = 
"asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = 
"asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = 
"asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new 
AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new 
AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = 
new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue 
AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on 
its' tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for 
accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new 
PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from 
Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new 
PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new 
PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single 
Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have 
an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the 
batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a 
single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this 
relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new 
Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, 
but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new 
Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notification about deleted objects are routed to 
this relationship. "
+                    + "Flow files will not have any payload. IDs of the 
resources no longer exist "
+                    + "are carried by the asana.gid attribute of the generated 
FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws 
InitializationException {
+        AsanaClientProviderService controllerService = 
context.getProperty(PROP_ASANA_CONTROLLER_SERVICE).asControllerService(AsanaClientProviderService.class);
+        AsanaClient client = controllerService.createClient();
+        batchSize = 
context.getProperty(PROP_ASANA_OUTPUT_BATCH_SIZE).asInteger();
+
+        try {
+            getLogger().debug("Initializing object fetcher...");
+            objectFetcher = createObjectFetcher(context, client);
+        } catch (Exception e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {

Review Comment:
   We don't declare `onTrigger()` as `synchronized`. The processor is annotated 
with `@TriggerSerially`, which ensures that the framework calls `onTrigger()` on 
only one thread at a time, so the extra locking is unnecessary.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import 
org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in 
the last successful query are stored in order to enable incremental loading and 
change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global 
ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = 
"asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = 
"asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = 
"asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = 
"asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = 
"asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = 
"asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = 
"asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = 
"asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = 
"asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = 
"asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = 
"asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = 
"asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = 
"asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new 
AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new 
AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = 
new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue 
AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on 
its' tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for 
accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new 
PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from 
Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new 
PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new 
PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single 
Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have 
an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the 
batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a 
single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this 
relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new 
Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, 
but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new 
Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notification about deleted objects are routed to 
this relationship. "
+                    + "Flow files will not have any payload. IDs of the 
resources no longer exist "
+                    + "are carried by the asana.gid attribute of the generated 
FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws 
InitializationException {
+        AsanaClientProviderService controllerService = 
context.getProperty(PROP_ASANA_CONTROLLER_SERVICE).asControllerService(AsanaClientProviderService.class);
+        AsanaClient client = controllerService.createClient();
+        batchSize = 
context.getProperty(PROP_ASANA_OUTPUT_BATCH_SIZE).asInteger();
+
+        try {
+            getLogger().debug("Initializing object fetcher...");
+            objectFetcher = createObjectFetcher(context, client);
+        } catch (Exception e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+        try {
+            Map<String, String> state = 
recoverState(context).orElse(Collections.emptyMap());
+            getLogger().debug("Attempting to load state: {}", state);
+            objectFetcher.loadState(state);
+        } catch (Exception e) {
+            getLogger().info("Failed to recover state. Falling back to clean 
start.");
+            objectFetcher.clearState();
+        }
+        getLogger().debug("Initial state: {}", objectFetcher.saveState());
+
+        Collection<FlowFile> newItems = new ArrayList<>();
+        Collection<FlowFile> updatedItems = new ArrayList<>();
+        Collection<FlowFile> removedItems = new ArrayList<>();
+        Map<AsanaObjectState, Collection<FlowFile>> flowFiles = new 
HashMap<>();
+        flowFiles.put(AsanaObjectState.NEW, newItems);
+        flowFiles.put(AsanaObjectState.UPDATED, updatedItems);
+        flowFiles.put(AsanaObjectState.REMOVED, removedItems);
+
+        List<AsanaObject> allObjects = new ArrayList<>();
+
+        AsanaObject nextObject;
+        while ((nextObject = objectFetcher.fetchNext()) != null) {
+            allObjects.add(nextObject);
+        }
+
+        Map<AsanaObjectState, List<AsanaObject>> allObjectsByState = 
allObjects.stream()
+                .collect(groupingBy(AsanaObject::getState));
+
+        if (batchSize == 1) {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> 
asanaObjects.forEach(
+                            asanaObject -> {
+                                final Map<String, String> attributes = new 
HashMap<>(2);
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), 
ContentType.APPLICATION_JSON.getMimeType());
+                                attributes.put(ASANA_GID, 
asanaObject.getGid());
+                                FlowFile flowFile = 
createFlowFileWithStringPayload(session, asanaObject.getContent());
+                                flowFile = session.putAllAttributes(flowFile, 
attributes);
+                                
flowFiles.get(asanaObject.getState()).add(flowFile);
+                            }
+                    ));
+        } else {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> 
partition(asanaObjects, batchSize).forEach(
+                            asanaObjectsInPartition -> {
+                                FlowFile flowFile = 
createFlowFileWithStringPayload(session, format("[%s]",
+                                        
asanaObjectsInPartition.stream().map(AsanaObject::getContent)
+                                                .collect(joining(","))));
+                                flowFile = session.putAllAttributes(flowFile,
+                                        
singletonMap(CoreAttributes.MIME_TYPE.key(),
+                                                
ContentType.APPLICATION_JSON.getMimeType()));
+                                flowFiles.get(asanaObjectState).add(flowFile);
+                            }
+                    ));
+        }
+
+        if (flowFiles.values().stream().allMatch(Collection::isEmpty)) {
+            context.yield();
+            getLogger().debug("Yielding, as there are no new FlowFiles.");

Review Comment:
   I believe we can `return` at this point: every FlowFile collection is empty 
here, so there is nothing left to transfer.



##########
nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/main/java/org/apache/nifi/processors/asana/GetAsanaObject.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.asana;
+
+import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.joining;
+import static org.apache.commons.collections4.ListUtils.partition;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.http.entity.ContentType;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.controller.asana.AsanaClient;
+import org.apache.nifi.controller.asana.AsanaClientProviderService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.asana.utils.AsanaObject;
+import org.apache.nifi.processors.asana.utils.AsanaObjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaObjectState;
+import org.apache.nifi.processors.asana.utils.AsanaProjectEventFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectMembershipFetcher;
+import 
org.apache.nifi.processors.asana.utils.AsanaProjectStatusAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaProjectStatusFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaStoryFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTagFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskAttachmentFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTaskFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaTeamMemberFetcher;
+import org.apache.nifi.processors.asana.utils.AsanaUserFetcher;
+import org.apache.nifi.reporting.InitializationException;
+
+@TriggerSerially
+@PrimaryNodeOnly
+@Stateful(scopes = {Scope.CLUSTER}, description = "Fingerprints of items in 
the last successful query are stored in order to enable incremental loading and 
change detection.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttribute(attribute = GetAsanaObject.ASANA_GID, description = "Global 
ID of the object in Asana.")
+@Tags({"asana", "source", "ingest"})
+@CapabilityDescription("This processor collects data from Asana")
+public class GetAsanaObject extends AbstractProcessor {
+
+    protected static final String ASANA_GID = "asana.gid";
+    protected static final String AV_NAME_COLLECT_TASKS = 
"asana-collect-tasks";
+    protected static final String AV_NAME_COLLECT_TASK_ATTACHMENTS = 
"asana-collect-task-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECTS = 
"asana-collect-projects";
+    protected static final String AV_NAME_COLLECT_TAGS = "asana-collect-tags";
+    protected static final String AV_NAME_COLLECT_USERS = 
"asana-collect-users";
+    protected static final String AV_NAME_COLLECT_PROJECT_MEMBERS = 
"asana-collect-project-members";
+    protected static final String AV_NAME_COLLECT_TEAMS = 
"asana-collect-teams";
+    protected static final String AV_NAME_COLLECT_TEAM_MEMBERS = 
"asana-collect-team-members";
+    protected static final String AV_NAME_COLLECT_STORIES = 
"asana-collect-stories";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_UPDATES = 
"asana-collect-project-status-updates";
+    protected static final String AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS = 
"asana-collect-project-status-attachments";
+    protected static final String AV_NAME_COLLECT_PROJECT_EVENTS = 
"asana-collect-project-events";
+    protected static final String ASANA_CONTROLLER_SERVICE = 
"asana-controller-service";
+    protected static final String ASANA_OBJECT_TYPE = "asana-object-type";
+    protected static final String ASANA_PROJECT_NAME = "asana-project-name";
+    protected static final String ASANA_SECTION_NAME = "asana-section-name";
+    protected static final String ASANA_TAG_NAME = "asana-tag-name";
+    protected static final String ASANA_TEAM_NAME = "asana-team-name";
+    protected static final String ASANA_OUTPUT_BATCH_SIZE = 
"asana-output-batch-size";
+    protected static final String REL_NAME_NEW = "new";
+    protected static final String REL_NAME_UPDATED = "updated";
+    protected static final String REL_NAME_REMOVED = "removed";
+
+    protected static final AllowableValue AV_COLLECT_TASKS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASKS,
+            "Tasks",
+            "Collect tasks matching to the specified conditions.");
+
+    protected static final AllowableValue AV_COLLECT_TASK_ATTACHMENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_TASK_ATTACHMENTS,
+            "Task Attachments",
+            "Collect attached files of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECTS,
+            "Projects",
+            "Collect projects of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TAGS = new AllowableValue(
+            AV_NAME_COLLECT_TAGS,
+            "Tags",
+            "Collect tags of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_USERS = new 
AllowableValue(
+            AV_NAME_COLLECT_USERS,
+            "Users",
+            "Collect users assigned to the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_MEMBERS,
+            "Members of a Project",
+            "Collect users assigned to the specified project."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAMS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAMS,
+            "Teams",
+            "Collect teams of the workspace."
+    );
+
+    protected static final AllowableValue AV_COLLECT_TEAM_MEMBERS = new 
AllowableValue(
+            AV_NAME_COLLECT_TEAM_MEMBERS,
+            "Team Members",
+            "Collect users assigned to the specified team."
+    );
+
+    protected static final AllowableValue AV_COLLECT_STORIES = new 
AllowableValue(
+            AV_NAME_COLLECT_STORIES,
+            "Stories of Tasks",
+            "Collect stories (comments) of of tasks matching to the specified 
conditions."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_STATUS_UPDATES = 
new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_UPDATES,
+            "Status Updates of a Project",
+            "Collect status updates of the specified project."
+    );
+
+    protected static final AllowableValue 
AV_COLLECT_PROJECT_STATUS_ATTACHMENTS = new AllowableValue(
+            AV_NAME_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+            "Attachments of Status Updates",
+            "Collect attached files of project status updates."
+    );
+
+    protected static final AllowableValue AV_COLLECT_PROJECT_EVENTS = new 
AllowableValue(
+            AV_NAME_COLLECT_PROJECT_EVENTS,
+            "Events of a Project",
+            "Collect various events happening on the specified project and on 
its' tasks."
+    );
+
+    protected static final PropertyDescriptor PROP_ASANA_CONTROLLER_SERVICE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_CONTROLLER_SERVICE)
+            .displayName("Asana Controller Service")
+            .description("Specify which controller service to use for 
accessing Asana.")
+            .required(true)
+            .identifiesControllerService(AsanaClientProviderService.class)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OBJECT_TYPE = new 
PropertyDescriptor.Builder()
+            .name(ASANA_OBJECT_TYPE)
+            .displayName("Object Type")
+            .description("Specify what kind of objects to be collected from 
Asana")
+            .required(true)
+            .allowableValues(
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECTS,
+                    AV_COLLECT_TAGS,
+                    AV_COLLECT_USERS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_TEAMS,
+                    AV_COLLECT_TEAM_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .defaultValue(AV_COLLECT_TASKS.getValue())
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_PROJECT = new 
PropertyDescriptor.Builder()
+            .name(ASANA_PROJECT_NAME)
+            .displayName("Project Name")
+            .description("Fetch only objects in this project. Case sensitive.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(
+                PROP_ASANA_OBJECT_TYPE,
+                    AV_COLLECT_TASKS,
+                    AV_COLLECT_TASK_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_MEMBERS,
+                    AV_COLLECT_STORIES,
+                    AV_COLLECT_PROJECT_STATUS_UPDATES,
+                    AV_COLLECT_PROJECT_STATUS_ATTACHMENTS,
+                    AV_COLLECT_PROJECT_EVENTS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_SECTION = new 
PropertyDescriptor.Builder()
+            .name(ASANA_SECTION_NAME)
+            .displayName("Section Name")
+            .description("Fetch only objects in this section. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TAG = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TAG_NAME)
+            .displayName("Tag")
+            .description("Fetch only objects having this tag. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TASKS, 
AV_COLLECT_TASK_ATTACHMENTS, AV_COLLECT_STORIES)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_TEAM_NAME = new 
PropertyDescriptor.Builder()
+            .name(ASANA_TEAM_NAME)
+            .displayName("Team")
+            .description("Team name. Case sensitive.")
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .dependsOn(PROP_ASANA_OBJECT_TYPE, AV_COLLECT_TEAM_MEMBERS)
+            .build();
+
+    protected static final PropertyDescriptor PROP_ASANA_OUTPUT_BATCH_SIZE = 
new PropertyDescriptor.Builder()
+            .name(ASANA_OUTPUT_BATCH_SIZE)
+            .displayName("Output Batch Size")
+            .description("The number of items batched together in a single 
Flow File. If set to 1 (default), then each item is"
+                    + " transferred in a separate Flow File and each will have 
an asana.gid attribute, to help identifying"
+                    + " the fetched item on the server side, if needed. If the 
batch size is greater than 1, then the"
+                    + " specified amount of items are batched together in a 
single Flow File as a Json array, and the"
+                    + " Flow Files won't have the asana.gid attribute.")
+            .defaultValue("1")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    protected static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
+            PROP_ASANA_CONTROLLER_SERVICE,
+            PROP_ASANA_OBJECT_TYPE,
+            PROP_ASANA_PROJECT,
+            PROP_ASANA_SECTION,
+            PROP_ASANA_TEAM_NAME,
+            PROP_ASANA_TAG,
+            PROP_ASANA_OUTPUT_BATCH_SIZE
+    ));
+
+    protected static final Relationship REL_NEW = new Relationship.Builder()
+            .name(REL_NAME_NEW)
+            .description("Newly collected objects are routed to this 
relationship.")
+            .build();
+
+    protected static final Relationship REL_UPDATED = new 
Relationship.Builder()
+            .name(REL_NAME_UPDATED)
+            .description("Objects that have already been collected earlier, 
but were updated since, are routed to this relationship.")
+            .build();
+
+    protected static final Relationship REL_REMOVED = new 
Relationship.Builder()
+            .name(REL_NAME_REMOVED)
+            .description("Notification about deleted objects are routed to 
this relationship. "
+                    + "Flow files will not have any payload. IDs of the 
resources no longer exist "
+                    + "are carried by the asana.gid attribute of the generated 
FlowFiles.")
+            .build();
+
+    protected static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_NEW,
+            REL_UPDATED,
+            REL_REMOVED
+    )));
+
+    private static final Scope STATE_STORAGE_SCOPE = Scope.CLUSTER;
+
+    private volatile AsanaObjectFetcher objectFetcher;
+    private volatile Integer batchSize;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return DESCRIPTORS;
+    }
+
+    @OnScheduled
+    public synchronized void onScheduled(final ProcessContext context) throws 
InitializationException {
+        AsanaClientProviderService controllerService = 
context.getProperty(PROP_ASANA_CONTROLLER_SERVICE).asControllerService(AsanaClientProviderService.class);
+        AsanaClient client = controllerService.createClient();
+        batchSize = 
context.getProperty(PROP_ASANA_OUTPUT_BATCH_SIZE).asInteger();
+
+        try {
+            getLogger().debug("Initializing object fetcher...");
+            objectFetcher = createObjectFetcher(context, client);
+        } catch (Exception e) {
+            throw new InitializationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+        try {
+            Map<String, String> state = 
recoverState(context).orElse(Collections.emptyMap());
+            getLogger().debug("Attempting to load state: {}", state);
+            objectFetcher.loadState(state);
+        } catch (Exception e) {
+            getLogger().info("Failed to recover state. Falling back to clean 
start.");
+            objectFetcher.clearState();
+        }
+        getLogger().debug("Initial state: {}", objectFetcher.saveState());
+
+        Collection<FlowFile> newItems = new ArrayList<>();
+        Collection<FlowFile> updatedItems = new ArrayList<>();
+        Collection<FlowFile> removedItems = new ArrayList<>();
+        Map<AsanaObjectState, Collection<FlowFile>> flowFiles = new 
HashMap<>();
+        flowFiles.put(AsanaObjectState.NEW, newItems);
+        flowFiles.put(AsanaObjectState.UPDATED, updatedItems);
+        flowFiles.put(AsanaObjectState.REMOVED, removedItems);
+
+        List<AsanaObject> allObjects = new ArrayList<>();
+
+        AsanaObject nextObject;
+        while ((nextObject = objectFetcher.fetchNext()) != null) {
+            allObjects.add(nextObject);
+        }
+
+        Map<AsanaObjectState, List<AsanaObject>> allObjectsByState = 
allObjects.stream()
+                .collect(groupingBy(AsanaObject::getState));
+
+        if (batchSize == 1) {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> 
asanaObjects.forEach(
+                            asanaObject -> {
+                                final Map<String, String> attributes = new 
HashMap<>(2);
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), 
ContentType.APPLICATION_JSON.getMimeType());
+                                attributes.put(ASANA_GID, 
asanaObject.getGid());
+                                FlowFile flowFile = 
createFlowFileWithStringPayload(session, asanaObject.getContent());
+                                flowFile = session.putAllAttributes(flowFile, 
attributes);
+                                
flowFiles.get(asanaObject.getState()).add(flowFile);
+                            }
+                    ));
+        } else {
+            allObjectsByState
+                    .forEach((asanaObjectState, asanaObjects) -> 
partition(asanaObjects, batchSize).forEach(
+                            asanaObjectsInPartition -> {
+                                FlowFile flowFile = 
createFlowFileWithStringPayload(session, format("[%s]",
+                                        
asanaObjectsInPartition.stream().map(AsanaObject::getContent)
+                                                .collect(joining(","))));
+                                flowFile = session.putAllAttributes(flowFile,
+                                        
singletonMap(CoreAttributes.MIME_TYPE.key(),
+                                                
ContentType.APPLICATION_JSON.getMimeType()));
+                                flowFiles.get(asanaObjectState).add(flowFile);
+                            }
+                    ));
+        }
+
+        if (flowFiles.values().stream().allMatch(Collection::isEmpty)) {
+            context.yield();
+            getLogger().debug("Yielding, as there are no new FlowFiles.");
+        } else {
+            session.transfer(newItems, REL_NEW);
+            session.transfer(updatedItems, REL_UPDATED);
+            session.transfer(removedItems, REL_REMOVED);
+            session.commitAsync();
+        }
+        Map<String, String> state = objectFetcher.saveState();
+        try {
+            persistState(state, context);
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }

Review Comment:
   You can persist the state as part of the session's transaction if you use 
`session.setState()` instead of `context.getStateManager().setState()` in 
`persistState()`. I would also move the exception handling into that method.
   In that case, the code would look like this:
   ```suggestion
           } else {
               session.transfer(newItems, REL_NEW);
               session.transfer(updatedItems, REL_UPDATED);
               session.transfer(removedItems, REL_REMOVED);
               
               Map<String, String> state = objectFetcher.saveState();
               persistState(state, session);
               
               session.commitAsync();
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to