Lehel44 commented on code in PR #7644: URL: https://github.com/apache/nifi/pull/7644#discussion_r1305733343
########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/commons/zendesk/ZendeskProperties.java: ########## @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.commons.zendesk; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processors.zendesk.ZendeskAuthenticationType; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.processor.util.StandardValidators.NON_BLANK_VALIDATOR; +import static org.apache.nifi.processor.util.StandardValidators.NON_EMPTY_VALIDATOR; + +public class ZendeskProperties { Review Comment: Please make this class final and add a private constructor to hide the public implicit one. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/commons/zendesk/ZendeskUtils.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.commons.zendesk; + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Base64.getEncoder; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.AUTHORIZATION_HEADER_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.BASIC_AUTH_PREFIX; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.CONTENT_TYPE_APPLICATION_JSON; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.CONTENT_TYPE_HEADER_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKETS_ROOT_NODE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_ROOT_NODE; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.addNode; + +public class ZendeskUtils { Review Comment: Please make this class final and add a private constructor. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/docs.org.apache.nifi.processors.zendesk.PutZendeskTicket/additionalDetails.html: ########## @@ -0,0 +1,72 @@ +<!DOCTYPE html> +<html lang="en" xmlns="http://www.w3.org/1999/html"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <head> + <meta charset="utf-8"/> + <title>GetZendesk</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + <style> + h2 {margin-top: 4em} + h3 {margin-top: 3em} + td {text-align: left} + </style> + </head> + + <body> + <h1>PutZendeskTicket</h1> + + <h3>Description</h3> + + <p> + The processor uses the Zendesk API to ingest tickets into Zendesk. The processor is capable to send requests directly from the flowFile content or construct the request objects from the incoming records using RecordReader. + </p> + + <h3>Authentication</h3> + + <p> + Zendesk API uses basic authentication. Either a password or an authentication token have to be provided. + Authentication token can be created in Zendesk API Settings, so the users don't have to expose their passwords, + and also auth tokens can be revoked quickly if necessary. + </p> + + <h3>Property values</h3> + + <p> + There are multiple ways of providing property values to the request object: + <dl> + <dt>Record Path:</dt> + <dd> + The property value is going to be evaluated as a record path if the value is provided inside brackets starting with a '%'. <br> + e.g.: if the incoming record is <code>{"foo":{"bar":"example value"}}</code> then the <code>%{/foo/bar}</code> will get the value of "example value" from the record. + </dd> + <dt>Constant:</dt> + <dd> + The property value is going to be treated as a constant if the provided value doesn't match with the record path format. <br> + e.g.: if the property value is <code>"constant value"</code> then the property value in the request object will be "constant value" + </dd> + </dl> + </p> + + <h3>Additional properties</h3> + + <p> + The processor provides some commonly used Zendesk ticket attribute in its property list but any number of additional property can be added with dynamic properties. + The dynamic property key is treated as a record path, and it represents the path in the request object. The dynamic property value is the same as the predefined property attributes. Review Comment: ```suggestion The processor offers a set of frequently used Zendesk ticket attributes within its property list. However, users have the flexibility to include any desired number of additional properties using dynamic properties. These dynamic properties utilize their keys as record paths, which denote the paths within the request object. Correspondingly, the values of these dynamic properties align with the predefined property attributes. ``` Or something like this ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/commons/zendesk/ZendeskUtils.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.commons.zendesk; + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Base64.getEncoder; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.AUTHORIZATION_HEADER_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.BASIC_AUTH_PREFIX; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.CONTENT_TYPE_APPLICATION_JSON; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.CONTENT_TYPE_HEADER_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKETS_ROOT_NODE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_ROOT_NODE; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.addNode; + +public class ZendeskUtils { + + /** + * Constructs the header for the Zendesk API call + * @param authenticationContext authentication data holder object + * @return the constructed header + */ + public static String basicAuthHeaderValue(ZendeskAuthenticationContext authenticationContext) { + final String user = authenticationContext.getAuthenticationType().enrichUserName(authenticationContext.getUser()); + final String userWithPassword = user + ":" + authenticationContext.getAuthenticationCredentials(); + return BASIC_AUTH_PREFIX + getEncoder().encodeToString(userWithPassword.getBytes()); + } + + public static String responseBodyToString(HttpResponseEntity response) { + try { + return IOUtils.toString(response.body(), UTF_8); + } catch (IOException e) { + throw new UncheckedIOException("Reading response body has failed", e); + } + } + + /** + * Collects every non-blank dynamic property from the context. + * @param context property context + * @param properties property list + * @return list of dynamic properties Review Comment: Doc typo: The return type is map ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/commons/zendesk/ZendeskRecordPathUtils.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.commons.zendesk; + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKETS_ROOT_NODE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_ROOT_NODE; + +public class ZendeskRecordPathUtils { + + private static final String NULL_VALUE = "null"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Pattern RECORD_PATH_PATTERN = Pattern.compile("^%\\{(.*?)\\}$"); + + /** + * Resolves the input field value to be handled as a record path or a constant value. + * @param path path in the request object + * @param value field value to be resolved + * @param baseTicketNode base request object where the field value will be added + * @param record record to receive the value from if the field value is a record path + */ + public static void resolveFieldValue(String path, String value, ObjectNode baseTicketNode, Record record) { + final Matcher matcher = RECORD_PATH_PATTERN.matcher(value); + if (matcher.matches()) { + addNode(baseTicketNode, JsonPointer.compile(path), new TextNode(resolveRecordState(matcher.group(1), record))); + } else { + addNode(baseTicketNode, JsonPointer.compile(path), new TextNode(value)); + } + } + + /** + * Adds a user defined dynamic field to the request object. If the user specifies the path in the request object as full path (starting with '/ticket' or '/tickets') + * the method removes them since the root path is specified later based on the number of records. + * @param path path in the request object + * @param value dynamic field value + * @param baseTicketNode base request object where the field value will be added + * @param record record to receive the value from if the field value is a record path + */ + public static void addDynamicField(String path, String value, ObjectNode baseTicketNode, Record record) { + if (path.startsWith(ZENDESK_TICKET_ROOT_NODE)) { + path = path.substring(7); + } else if (path.startsWith(ZENDESK_TICKETS_ROOT_NODE)) { + path = path.substring(8); + } + + resolveFieldValue(path, value, baseTicketNode, record); + } + + private static String resolveRecordState(String pathValue, final Record record) { + final RecordPath recordPath = RecordPath.compile(pathValue); + final RecordPathResult result = recordPath.evaluate(record); + final List<FieldValue> fieldValues = result.getSelectedFields().collect(toList()); + final FieldValue fieldValue = getMatchingFieldValue(recordPath, fieldValues); + + if (fieldValue.getValue() == null || fieldValue.getValue() == NULL_VALUE) { + return null; + } + + return getFieldValue(recordPath, fieldValue); + } + + /** + * The method checks the field's type and filters out every non-compatible type. + * @param recordPath path to the requested field + * @param fieldValue record field + * @return value of the record field + */ + private static String getFieldValue(final RecordPath recordPath, FieldValue fieldValue) { + final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType(); + + if (fieldType == RecordFieldType.RECORD || fieldType == RecordFieldType.ARRAY || fieldType == RecordFieldType.MAP) { + throw new ProcessException(format("The provided RecordPath [%s] points to a [%s] type value", recordPath, fieldType)); + } + + if (fieldType == RecordFieldType.CHOICE) { + final ChoiceDataType choiceDataType = (ChoiceDataType) fieldValue.getField().getDataType(); + final List<DataType> possibleTypes = choiceDataType.getPossibleSubTypes(); + if (possibleTypes.stream().anyMatch(type -> type.getFieldType() == RecordFieldType.RECORD)) { + throw new ProcessException(format("The provided RecordPath [%s] points to a [CHOICE] type value with Record subtype", recordPath)); + } + } + + return String.valueOf(fieldValue.getValue()); + } + + /** + * The method checks if only one result were received for the give record path. + * @param recordPath path to the requested field + * @param resultList result list + * @return matching field + */ + private static FieldValue getMatchingFieldValue(final RecordPath recordPath, final List<FieldValue> resultList) { + if (resultList.isEmpty()) { + throw new ProcessException(format("Evaluated RecordPath [%s] against Record but got no results", recordPath)); + } + + if (resultList.size() > 1) { + throw new ProcessException(format("Evaluated RecordPath [%s] against Record and received multiple distinct results [%s]", recordPath, resultList)); + } + + return resultList.get(0); + } + + /** + * Adds a new node on the provided path with the give value to the request object. + * @param baseNode base object where the new node will be added + * @param path path of the new node + * @param value value of the new node + */ + public static void addNode(ObjectNode baseNode, JsonPointer path, JsonNode value) { Review Comment: I think `addNodeAtPathWithJsonValue` or similar would be a more descriptive name. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/commons/zendesk/ZendeskAuthenticationContext.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.commons.zendesk; + +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.processors.zendesk.ZendeskAuthenticationType; + +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_CREDENTIAL; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_TYPE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_SUBDOMAIN; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_USER; + +public class ZendeskAuthenticationContext { + + private final String subDomain; + private final String user; + private final ZendeskAuthenticationType authenticationType; + private final String authenticationCredentials; + + public ZendeskAuthenticationContext(PropertyContext context) { Review Comment: I'd avoid passing PropertyContext directly to this class. It introduces a tight coupling between your class and NiFi's specific classes which makes the code less modular and harder to test. Passing specific properties or configuration values is generally considered a better practice. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/commons/zendesk/ZendeskRecordPathUtils.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.commons.zendesk; + +import com.fasterxml.jackson.core.JsonPointer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKETS_ROOT_NODE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_ROOT_NODE; + +public class ZendeskRecordPathUtils { + + private static final String NULL_VALUE = "null"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Pattern RECORD_PATH_PATTERN = Pattern.compile("^%\\{(.*?)\\}$"); Review Comment: The matches() method inherently performs a full-string match, so the anchors are not necessary in this context. You can simplify the regex pattern to: `%\\{(.*?)\\}` ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/services/zendesk/BaseSinkZendeskTicket.java: ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.zendesk; + +import org.apache.nifi.context.PropertyContext; + +import static org.apache.nifi.services.zendesk.ZendeskRecordSink.ZENDESK_TICKET_COMMENT_BODY; +import static org.apache.nifi.services.zendesk.ZendeskRecordSink.ZENDESK_TICKET_PRIORITY; +import static org.apache.nifi.services.zendesk.ZendeskRecordSink.ZENDESK_TICKET_SUBJECT; +import static org.apache.nifi.services.zendesk.ZendeskRecordSink.ZENDESK_TICKET_TYPE; + +public class BaseSinkZendeskTicket { + + private final String commentBody; + private final String subject; + private final String priority; + private final String type; + + public BaseSinkZendeskTicket(PropertyContext context) { Review Comment: I'd only pass the properties here for the same reason I mentioned in ZendeskAuthenticationContext. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/PutZendeskTicket.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.zendesk; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.commons.zendesk.ZendeskAuthenticationContext; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.HTTPS; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.REL_FAILURE_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.REL_SUCCESS_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.WEB_CLIENT_SERVICE_PROVIDER; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_CREDENTIAL; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_TYPE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKETS_RESOURCE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKET_RESOURCE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_HOST_TEMPLATE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_SUBDOMAIN; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_COMMENT_BODY_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_PRIORITY_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_SUBJECT_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_TYPE_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_USER; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.addDynamicField; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.resolveFieldValue; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.createRequestObject; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.getDynamicProperties; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.performPostRequest; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.responseBodyToString; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.web.client.api.HttpResponseStatus.CREATED; +import static org.apache.nifi.web.client.api.HttpResponseStatus.OK; + +@Tags({"zendesk, ticket"}) +@CapabilityDescription("Create Zendesk tickets using the Zendesk API.") +@DynamicProperty( + name = "The path in the request object to add.", + value = "The path in the incoming record to get the value from.", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Additional property to be added to the Zendesk request object.") +public class PutZendeskTicket extends AbstractProcessor { + + static final String ZENDESK_RECORD_READER_NAME = "zendesk-record-reader"; + + private static final ObjectMapper mapper = new ObjectMapper(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name(ZENDESK_RECORD_READER_NAME) + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_COMMENT_BODY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_COMMENT_BODY_NAME) + .displayName("Comment Body") + .description("The content or the path to the comment body in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .required(true) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_SUBJECT = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_SUBJECT_NAME) + .displayName("Subject") + .description("The content or the path to the subject in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_PRIORITY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_PRIORITY_NAME) + .displayName("Priority") + .description("The content or the path to the priority in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_TYPE = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_TYPE_NAME) + .displayName("Type") + .description("The content or the path to the type in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + RECORD_READER, + WEB_CLIENT_SERVICE_PROVIDER, + ZENDESK_SUBDOMAIN, + ZENDESK_USER, + ZENDESK_AUTHENTICATION_TYPE, + ZENDESK_AUTHENTICATION_CREDENTIAL, + ZENDESK_TICKET_COMMENT_BODY, + ZENDESK_TICKET_SUBJECT, + ZENDESK_TICKET_PRIORITY, + ZENDESK_TICKET_TYPE + )); + + private volatile WebClientServiceProvider webClientServiceProvider; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name(REL_SUCCESS_NAME) + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name(REL_FAILURE_NAME) + .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") + .build(); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + long startNanos = System.nanoTime(); + HttpResponseEntity response; + URI uri; + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final ZendeskAuthenticationContext authenticationContext = new ZendeskAuthenticationContext(context); + + if (readerFactory == null) { + try (final InputStream inputStream = session.read(flowFile)) { + final HttpUriBuilder uriBuilder = uriBuilder(authenticationContext.getSubDomain(), ZENDESK_CREATE_TICKET_RESOURCE); + uri = uriBuilder.build(); + response = performPostRequest(authenticationContext, webClientServiceProvider, uri, inputStream); + } catch (IOException e) { + throw new ProcessException("Could not read incoming FlowFile", e); + } + } else { + final String commentBody = context.getProperty(ZENDESK_TICKET_COMMENT_BODY).evaluateAttributeExpressions().getValue(); + final String subject = context.getProperty(ZENDESK_TICKET_SUBJECT).evaluateAttributeExpressions().getValue(); + final String priority = context.getProperty(ZENDESK_TICKET_PRIORITY).evaluateAttributeExpressions().getValue(); + final String type = context.getProperty(ZENDESK_TICKET_TYPE).evaluateAttributeExpressions().getValue(); + final Map<String, String> dynamicProperties = getDynamicProperties(context, context.getProperties()); + List<ObjectNode> zendeskTickets = new ArrayList<>(); + + try (final InputStream in = session.read(flowFile); final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { + Record record; + + while ((record = reader.nextRecord()) != null) { + ObjectNode baseTicketNode = mapper.createObjectNode(); + resolveFieldValue("/comment/body", commentBody, baseTicketNode, record); + + if (subject != null) { + resolveFieldValue("/subject", subject, baseTicketNode, record); + } + if (priority != null) { + resolveFieldValue("/priority", priority, baseTicketNode, record); + } + if (type != null) { + resolveFieldValue("/type", type, baseTicketNode, record); + } + for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { + addDynamicField(dynamicProperty.getKey(), dynamicProperty.getValue(), baseTicketNode, record); + } + zendeskTickets.add(baseTicketNode); + } + + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + getLogger().error("Error occurred while creating zendesk tickets", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + if (zendeskTickets.size() == 0) { Review Comment: You can use `Collection::isEmpty` here. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/docs.org.apache.nifi.processors.zendesk.PutZendeskTicket/additionalDetails.html: ########## @@ -0,0 +1,72 @@ +<!DOCTYPE html> +<html lang="en" xmlns="http://www.w3.org/1999/html"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <head> + <meta charset="utf-8"/> + <title>GetZendesk</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + <style> + h2 {margin-top: 4em} + h3 {margin-top: 3em} + td {text-align: left} + </style> + </head> + + <body> + <h1>PutZendeskTicket</h1> + + <h3>Description</h3> + + <p> + The processor uses the Zendesk API to ingest tickets into Zendesk. The processor is capable to send requests directly from the flowFile content or construct the request objects from the incoming records using RecordReader. + </p> Review Comment: ```suggestion <p> The processor uses the Zendesk API to ingest tickets into Zendesk. The processor is capable to send requests directly from the FlowFile content or construct the request objects from the incoming records using a RecordReader. </p> ``` ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/PutZendeskTicket.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.zendesk; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.commons.zendesk.ZendeskAuthenticationContext; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.HTTPS; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.REL_FAILURE_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.REL_SUCCESS_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.WEB_CLIENT_SERVICE_PROVIDER; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_CREDENTIAL; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_TYPE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKETS_RESOURCE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKET_RESOURCE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_HOST_TEMPLATE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_SUBDOMAIN; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_COMMENT_BODY_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_PRIORITY_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_SUBJECT_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_TYPE_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_USER; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.addDynamicField; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.resolveFieldValue; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.createRequestObject; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.getDynamicProperties; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.performPostRequest; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.responseBodyToString; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.web.client.api.HttpResponseStatus.CREATED; +import static org.apache.nifi.web.client.api.HttpResponseStatus.OK; + +@Tags({"zendesk, ticket"}) +@CapabilityDescription("Create Zendesk tickets using the Zendesk API.") +@DynamicProperty( + name = "The path in the request object to add.", + value = "The path in the incoming record to get the value from.", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Additional property to be added to the Zendesk request object.") +public class PutZendeskTicket extends AbstractProcessor { + + static final String ZENDESK_RECORD_READER_NAME = "zendesk-record-reader"; + + private static final ObjectMapper mapper = new ObjectMapper(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name(ZENDESK_RECORD_READER_NAME) + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_COMMENT_BODY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_COMMENT_BODY_NAME) + .displayName("Comment Body") + .description("The content or the path to the comment body in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .required(true) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_SUBJECT = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_SUBJECT_NAME) + .displayName("Subject") + .description("The content or the path to the subject in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_PRIORITY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_PRIORITY_NAME) + .displayName("Priority") + .description("The content or the path to the priority in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_TYPE = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_TYPE_NAME) + .displayName("Type") + .description("The content or the path to the type in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + RECORD_READER, + WEB_CLIENT_SERVICE_PROVIDER, + ZENDESK_SUBDOMAIN, + ZENDESK_USER, + ZENDESK_AUTHENTICATION_TYPE, + ZENDESK_AUTHENTICATION_CREDENTIAL, + ZENDESK_TICKET_COMMENT_BODY, + ZENDESK_TICKET_SUBJECT, + ZENDESK_TICKET_PRIORITY, + ZENDESK_TICKET_TYPE + )); + + private volatile WebClientServiceProvider webClientServiceProvider; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name(REL_SUCCESS_NAME) + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name(REL_FAILURE_NAME) + .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") + .build(); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + long startNanos = System.nanoTime(); + HttpResponseEntity response; + URI uri; + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final ZendeskAuthenticationContext authenticationContext = new ZendeskAuthenticationContext(context); + + if (readerFactory == null) { + try (final InputStream inputStream = session.read(flowFile)) { + final HttpUriBuilder uriBuilder = uriBuilder(authenticationContext.getSubDomain(), ZENDESK_CREATE_TICKET_RESOURCE); + uri = uriBuilder.build(); + response = performPostRequest(authenticationContext, webClientServiceProvider, uri, inputStream); + } catch (IOException e) { + throw new ProcessException("Could not read incoming FlowFile", e); + } + } else { + final String commentBody = context.getProperty(ZENDESK_TICKET_COMMENT_BODY).evaluateAttributeExpressions().getValue(); + final String subject = context.getProperty(ZENDESK_TICKET_SUBJECT).evaluateAttributeExpressions().getValue(); + final String priority = context.getProperty(ZENDESK_TICKET_PRIORITY).evaluateAttributeExpressions().getValue(); + final String type = context.getProperty(ZENDESK_TICKET_TYPE).evaluateAttributeExpressions().getValue(); + final Map<String, String> dynamicProperties = getDynamicProperties(context, context.getProperties()); + List<ObjectNode> zendeskTickets = new ArrayList<>(); + + try (final InputStream in = session.read(flowFile); final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { + Record record; + + while ((record = reader.nextRecord()) != null) { + ObjectNode baseTicketNode = mapper.createObjectNode(); + resolveFieldValue("/comment/body", commentBody, baseTicketNode, record); + + if (subject != null) { + resolveFieldValue("/subject", subject, baseTicketNode, record); + } + if (priority != null) { + resolveFieldValue("/priority", priority, baseTicketNode, record); + } + if (type != null) { + resolveFieldValue("/type", type, baseTicketNode, record); + } + for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { + addDynamicField(dynamicProperty.getKey(), dynamicProperty.getValue(), baseTicketNode, record); + } + zendeskTickets.add(baseTicketNode); + } + + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + getLogger().error("Error occurred while creating zendesk tickets", e); Review Comment: ```suggestion getLogger().error("Error occurred while creating Zendesk tickets", e); ``` ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/docs.org.apache.nifi.processors.zendesk.PutZendeskTicket/additionalDetails.html: ########## @@ -0,0 +1,72 @@ +<!DOCTYPE html> +<html lang="en" xmlns="http://www.w3.org/1999/html"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <head> + <meta charset="utf-8"/> + <title>GetZendesk</title> + <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/> + <style> + h2 {margin-top: 4em} + h3 {margin-top: 3em} + td {text-align: left} + </style> + </head> + + <body> + <h1>PutZendeskTicket</h1> + + <h3>Description</h3> + + <p> + The processor uses the Zendesk API to ingest tickets into Zendesk. The processor is capable to send requests directly from the flowFile content or construct the request objects from the incoming records using RecordReader. + </p> + + <h3>Authentication</h3> + + <p> + Zendesk API uses basic authentication. Either a password or an authentication token have to be provided. + Authentication token can be created in Zendesk API Settings, so the users don't have to expose their passwords, + and also auth tokens can be revoked quickly if necessary. Review Comment: ```suggestion Zendesk API uses basic authentication. Either a password or an authentication token has to be provided. In Zendesk API Settings, it's possible to generate authentication tokens, eliminating the need for users to expose their passwords. This approach also offers the advantage of fast token revocation when required. ``` ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/java/org/apache/nifi/processors/zendesk/PutZendeskTicket.java: ########## @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.zendesk; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.commons.zendesk.ZendeskAuthenticationContext; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.web.client.api.HttpResponseEntity; +import org.apache.nifi.web.client.api.HttpUriBuilder; +import org.apache.nifi.web.client.provider.api.WebClientServiceProvider; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.HTTPS; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.REL_FAILURE_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.REL_SUCCESS_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.WEB_CLIENT_SERVICE_PROVIDER; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_CREDENTIAL; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_AUTHENTICATION_TYPE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKETS_RESOURCE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_CREATE_TICKET_RESOURCE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_HOST_TEMPLATE; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_SUBDOMAIN; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_COMMENT_BODY_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_PRIORITY_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_SUBJECT_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_TICKET_TYPE_NAME; +import static org.apache.nifi.commons.zendesk.ZendeskProperties.ZENDESK_USER; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.addDynamicField; +import static org.apache.nifi.commons.zendesk.ZendeskRecordPathUtils.resolveFieldValue; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.createRequestObject; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.getDynamicProperties; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.performPostRequest; +import static org.apache.nifi.commons.zendesk.ZendeskUtils.responseBodyToString; +import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES; +import static org.apache.nifi.web.client.api.HttpResponseStatus.CREATED; +import static org.apache.nifi.web.client.api.HttpResponseStatus.OK; + +@Tags({"zendesk, ticket"}) +@CapabilityDescription("Create Zendesk tickets using the Zendesk API.") +@DynamicProperty( + name = "The path in the request object to add.", + value = "The path in the incoming record to get the value from.", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Additional property to be added to the Zendesk request object.") +public class PutZendeskTicket extends AbstractProcessor { + + static final String ZENDESK_RECORD_READER_NAME = "zendesk-record-reader"; + + private static final ObjectMapper mapper = new ObjectMapper(); + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name(ZENDESK_RECORD_READER_NAME) + .displayName("Record Reader") + .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_COMMENT_BODY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_COMMENT_BODY_NAME) + .displayName("Comment Body") + .description("The content or the path to the comment body in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .required(true) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_SUBJECT = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_SUBJECT_NAME) + .displayName("Subject") + .description("The content or the path to the subject in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_PRIORITY = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_PRIORITY_NAME) + .displayName("Priority") + .description("The content or the path to the priority in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + public static final PropertyDescriptor ZENDESK_TICKET_TYPE = new PropertyDescriptor.Builder() + .name(ZENDESK_TICKET_TYPE_NAME) + .displayName("Type") + .description("The content or the path to the type in the incoming record.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .dependsOn(RECORD_READER) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( + RECORD_READER, + WEB_CLIENT_SERVICE_PROVIDER, + ZENDESK_SUBDOMAIN, + ZENDESK_USER, + ZENDESK_AUTHENTICATION_TYPE, + ZENDESK_AUTHENTICATION_CREDENTIAL, + ZENDESK_TICKET_COMMENT_BODY, + ZENDESK_TICKET_SUBJECT, + ZENDESK_TICKET_PRIORITY, + ZENDESK_TICKET_TYPE + )); + + private volatile WebClientServiceProvider webClientServiceProvider; + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name(REL_SUCCESS_NAME) + .description("For FlowFiles created as a result of a successful HTTP request.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name(REL_FAILURE_NAME) + .description("A FlowFile is routed to this relationship if the operation failed and retrying the operation will also fail, such as an invalid data or schema.") + .build(); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( + REL_SUCCESS, + REL_FAILURE + ))); + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(ProcessContext context) { + webClientServiceProvider = context.getProperty(WEB_CLIENT_SERVICE_PROVIDER).asControllerService(WebClientServiceProvider.class); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) { + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + long startNanos = System.nanoTime(); + HttpResponseEntity response; + URI uri; + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final ZendeskAuthenticationContext authenticationContext = new ZendeskAuthenticationContext(context); + + if (readerFactory == null) { + try (final InputStream inputStream = session.read(flowFile)) { + final HttpUriBuilder uriBuilder = uriBuilder(authenticationContext.getSubDomain(), ZENDESK_CREATE_TICKET_RESOURCE); + uri = uriBuilder.build(); + response = performPostRequest(authenticationContext, webClientServiceProvider, uri, inputStream); + } catch (IOException e) { + throw new ProcessException("Could not read incoming FlowFile", e); + } + } else { + final String commentBody = context.getProperty(ZENDESK_TICKET_COMMENT_BODY).evaluateAttributeExpressions().getValue(); + final String subject = context.getProperty(ZENDESK_TICKET_SUBJECT).evaluateAttributeExpressions().getValue(); + final String priority = context.getProperty(ZENDESK_TICKET_PRIORITY).evaluateAttributeExpressions().getValue(); + final String type = context.getProperty(ZENDESK_TICKET_TYPE).evaluateAttributeExpressions().getValue(); + final Map<String, String> dynamicProperties = getDynamicProperties(context, context.getProperties()); + List<ObjectNode> zendeskTickets = new ArrayList<>(); + + try (final InputStream in = session.read(flowFile); final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) { + Record record; + + while ((record = reader.nextRecord()) != null) { + ObjectNode baseTicketNode = mapper.createObjectNode(); + resolveFieldValue("/comment/body", commentBody, baseTicketNode, record); + + if (subject != null) { + resolveFieldValue("/subject", subject, baseTicketNode, record); + } + if (priority != null) { + resolveFieldValue("/priority", priority, baseTicketNode, record); + } + if (type != null) { + resolveFieldValue("/type", type, baseTicketNode, record); + } + for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { + addDynamicField(dynamicProperty.getKey(), dynamicProperty.getValue(), baseTicketNode, record); + } + zendeskTickets.add(baseTicketNode); + } + + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + getLogger().error("Error occurred while creating zendesk tickets", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + + if (zendeskTickets.size() == 0) { + getLogger().info("No record were received"); + return; + } + + try { + final InputStream inputStream = createRequestObject(mapper, zendeskTickets); + uri = createUri(authenticationContext, zendeskTickets.size()); + response = performPostRequest(authenticationContext, webClientServiceProvider, uri, inputStream); + } catch (IOException e) { + getLogger().error("Failed to post request to Zendesk", e); + session.transfer(session.penalize(flowFile), REL_FAILURE); + return; + } + } + + handleResponse(session, flowFile, response, uri, startNanos); + } + + private void handleResponse(ProcessSession session, FlowFile flowFile, HttpResponseEntity response, URI uri, long startNanos) { + if (response.statusCode() == CREATED.getCode() || response.statusCode() == OK.getCode()) { + session.transfer(flowFile, REL_SUCCESS); + long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, uri.toString(), transferMillis); + } else { + getLogger().error("Zendesk ticket creation returned with error, HTTP status={}, response={}", response.statusCode(), responseBodyToString(response)); + session.transfer(session.penalize(flowFile), REL_FAILURE); + } + } + + private URI createUri(ZendeskAuthenticationContext authenticationContext, int numberOfTickets) { + final String resource = numberOfTickets > 1 ? ZENDESK_CREATE_TICKETS_RESOURCE : ZENDESK_CREATE_TICKET_RESOURCE; + final HttpUriBuilder uriBuilder = uriBuilder(authenticationContext.getSubDomain(), resource); + return uriBuilder.build(); + } + + HttpUriBuilder uriBuilder(String subDomain, String resourcePath) { Review Comment: I suggest creating a new ZendeskClient class dedicated to handling HTTP operations. In doing so, you can relocate the components responsible for creating URLs from PutZendeskTicket and GetZendeskTicket and the method ZendeskUtils::performRequest as well. This approach will lead to better organization and separation of concerns within the codebase. ########## nifi-nar-bundles/nifi-zendesk-bundle/nifi-zendesk-processors/src/main/resources/docs.org.apache.nifi.processors.zendesk.PutZendeskTicket/additionalDetails.html: ########## @@ -0,0 +1,72 @@ +<!DOCTYPE html> +<html lang="en" xmlns="http://www.w3.org/1999/html"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <head> + <meta charset="utf-8"/> + <title>GetZendesk</title> Review Comment: ```suggestion <title>PutZendesk</title> ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org