[ https://issues.apache.org/jira/browse/NIFI-1571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15198616#comment-15198616 ]
ASF GitHub Bot commented on NIFI-1571: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/271#discussion_r56446675 --- Diff: nifi-nar-bundles/nifi-spring-bundle/nifi-spring-processors/src/main/java/org/apache/nifi/spring/SpringContextProcessor.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.spring; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.spring.SpringDataExchanger.SpringResponse; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.PollableChannel; + +/** + * Implementation of {@link Processor} capable of sending and receiving data + * from application defined in Spring Application context. It does so via + * predefined in/out {@link MessageChannel}s (see spring-messaging module of + * Spring). Once such channels are defined user is free to implement the rest of + * the application any way they wish (e.g., custom code and/or using frameworks + * such as Spring Integration or Camel). + * <p> + * The requirement and expectations for channel types are: + * <ul> + * <li>Input channel must be of type {@link MessageChannel} and named "fromNiFi" + * (see {@link SpringNiFiConstants#FROM_NIFI})</li> + * <li>Output channel must be of type {@link PollableChannel} and named "toNiFi" + * (see {@link SpringNiFiConstants#TO_NIFI})</li> + * </ul> + * </p> + * Below is the example of sample configuration: + * + * <pre> + * <?xml version="1.0" encoding="UTF-8"?> + * <beans xmlns="http://www.springframework.org/schema/beans" + * xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + * xmlns:int="http://www.springframework.org/schema/integration" + * xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + * http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd"> + * + * <int:channel id="fromNiFi"/> + * + * . . . . . + * + * <int:channel id="toNiFi"> + * <int:queue/> + * </int:channel> + * + * </beans> + * </pre> + * <p> + * Defining {@link MessageChannel} is optional. That's why this processor + * supports 3 modes of interaction with Spring Application Context: + * <ul> + * <li>Headless – no channels are defined therefore nothing is sent to or + * received from such Application Contexts (i.e., some monitoring app).</li> + * <li>One way (NiFi -> Spring or Spring -> NiFi) - depends on existence + * of one of "fromNiFi" or "toNiFi" channel in the Spring Application Context. + * </li> + * <li>Bi-directional (NiFi -> Spring -> Nifi or Spring -> NiFi -> + * Spring) - depends on existence of both "fromNiFi" and "toNiFi" channels in + * the Spring Application Context</li> + * </ul> + * + * </p> + * <p> + * To create an instance of the ApplicationConetxt this processor requires user + * to provide configuration file path and the path to the resources that needs + * to be added to the classpath of ApplicationContext. This essentially allows + * user to package their Spring Application any way they want as long as + * everything it requires is available on the classpath. + * </p> + * <p> + * Data exchange between Spring and NiFi relies on simple mechanism which is + * exposed via {@link SpringDataExchanger}; {@link FlowFile}s's content is + * converted to primitive representation that can be easily wrapped in Spring + * {@link Message}. The requirement imposed by this Processor is to send/receive + * {@link Message} with <i>payload</i> of type <i>byte[]</i> and headers of type + * <i>Map<String, Object></i>. This is primarily for simplicity and type + * safety. Converters and Transformers could be used by either side to change + * representation of the content that is being exchanged between NiFi and + * Spring. + */ +@TriggerWhenEmpty +@Tags({ "Spring", "Message", "Get", "Put", "Integration" }) +@CapabilityDescription("A Processor that supports sending and receiving data from application defined in " + + "Spring Application Context via predefined in/out MessageChannels.") +public class SpringContextProcessor extends AbstractProcessor { + private final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class); + + public static final PropertyDescriptor CTX_CONFIG_NAME = new PropertyDescriptor.Builder() + .name("Application Context config path") + .description("The path to the Spring Application Context configuration file relative to the classpath") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) --- End diff -- Sounds like a job for https://issues.apache.org/jira/browse/NIFI-1121 (someday) > Provide generic processor that would bootstrap itself from Spring's > Application Context > --------------------------------------------------------------------------------------- > > Key: NIFI-1571 > URL: https://issues.apache.org/jira/browse/NIFI-1571 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Oleg Zhurakousky > Assignee: Oleg Zhurakousky > Fix For: 0.6.0 > > > So, several clients have expressed interests in using WorkFlow orchestration > frameworks such as Camel, Spring Integration etc. to be able to encapsulate > yet modularize and externalize the complexity of some of the custom > processors as well as handle some of the use cases that fall outside of scope > of Data Flow paradigm (e.g., transactional context and XA between two+ > Processors). > There is already a ticket to provide Camel support - NIFI-924. However > realizing that both Camel and naturally Spring Integration is based on Spring > Application Context it appears that instead of having multiple extensions we > should have a more generic extension for a Processor that would delegate its > processing to a bean in provided Spring Application Context (AC). This way AC > becomes a black box and could contain anything (e.g., Camel, Spring > Integration or some custom user code). -- This message was sent by Atlassian JIRA (v6.3.4#6332)