Yes, it happens every time. I also tried starting the Spark job after
starting NiFi, with the same result.

Here is my sample Spark application:

package com.dtcc.nifi;
import java.nio.charset.StandardCharsets;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.spark.NiFiDataPacket;
import org.apache.nifi.spark.NiFiReceiver;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;



public class MyProcessor {

        private final static Log log = LogFactory.getLog(MyProcessor.class);

        public static void main(String args[]){
                
                System.out.println("blah***");
                 printToDebugLogsAndConsole("Satarting ...", "starting ......", 
log);
                
                
                SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
                  .url("http://sxmn5:8080/nifi";)
                  .portName("Data_For_Spark")
                  .buildConfig();
                
                 SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark 
Streaming
example");
                 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
new
Duration(1000L));
                 
                 // Create a JavaReceiverInputDStream using a NiFi receiver so 
that we can
pull data from 
                 // specified Port
                 JavaReceiverInputDStream packetStream = 
                     ssc.receiverStream(new NiFiReceiver(config,
StorageLevel.MEMORY_ONLY()));

                 // Map the data from NiFi to text, ignoring the attributes
                 JavaDStream text = packetStream.map(new 
Function<NiFiDataPacket,
String>() {
                
                
                         @Override
                        public String call(NiFiDataPacket dataPacket) throws 
Exception {
                                // TODO Auto-generated method stub
                                 System.out.println("blah***");
                                 printToDebugLogsAndConsole("blah###", new
String(dataPacket.getContent()), log);
                                
                             return dataPacket.getAttributes().get("uuid");     
                
                         }
                 });
                 
                 text.print();
                 
                 ssc.start();
                 ssc.awaitTermination(1000000000);// make this 0 eventually
                 
                 
                
        }
        
        public static void printToDebugLogsAndConsole(String description,
                        String msg,Log log) {
                String lineBling = "******* ";
                log.info(lineBling + "info: " + description + " : " + msg);
                log.debug(lineBling + "debug: " + description + " : " + msg);
                System.out.println(lineBling + "sysout: " + description + " : " 
+ msg);
        }

        public static void printToErrorLogsAndConsole(String description,
                        String msg,Log log) {
                String lineBling = "$$$$$$$ ";
                log.error(lineBling + "error: " + description + " : " + msg);
                System.out.println(lineBling + "sysout: " + description + " : " 
+ msg);
        }
}





--
View this message in context: 
http://apache-nifi-developer-list.39713.n7.nabble.com/site-to-site-communication-error-on-output-port-tp10698p10709.html
Sent from the Apache NiFi Developer List mailing list archive at Nabble.com.

Reply via email to