This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 71dd5576db NIFI-12065 Removed nifi-spark-receiver module
71dd5576db is described below

commit 71dd5576dbc66b669a6840d03d48992a6263210a
Author: Joseph Witt <joew...@apache.org>
AuthorDate: Thu Sep 14 15:12:04 2023 -0700

    NIFI-12065 Removed nifi-spark-receiver module
    
    This closes #7732
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 nifi-external/nifi-spark-receiver/pom.xml          |  71 --------
 .../java/org/apache/nifi/spark/NiFiDataPacket.java |  39 ----
 .../java/org/apache/nifi/spark/NiFiReceiver.java   | 197 ---------------------
 .../apache/nifi/spark/StandardNiFiDataPacket.java  |  43 -----
 nifi-external/pom.xml                              |   1 -
 5 files changed, 351 deletions(-)

diff --git a/nifi-external/nifi-spark-receiver/pom.xml 
b/nifi-external/nifi-spark-receiver/pom.xml
deleted file mode 100644
index 3917a109c8..0000000000
--- a/nifi-external/nifi-spark-receiver/pom.xml
+++ /dev/null
@@ -1,71 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements. See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License. You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.nifi</groupId>
-        <artifactId>nifi-external</artifactId>
-        <version>2.0.0-SNAPSHOT</version>
-    </parent>
-    <groupId>org.apache.nifi</groupId>
-    <artifactId>nifi-spark-receiver</artifactId>
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_2.13</artifactId>
-            <scope>provided</scope>
-            <version>3.3.2</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>commons-logging</groupId>
-                    <artifactId>commons-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-core</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>jcl-over-slf4j</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-site-to-site-client</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-    </dependencies>
-    <dependencyManagement>
-        <dependencies>
-            <!-- Override Hadoop -->
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-client-api</artifactId>
-                <version>${hadoop.version}</version>
-            </dependency>
-            <dependency>
-                <groupId>org.apache.hadoop</groupId>
-                <artifactId>hadoop-client-runtime</artifactId>
-                <version>${hadoop.version}</version>
-            </dependency>
-        </dependencies>
-    </dependencyManagement>
-</project>
diff --git 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
 
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
deleted file mode 100644
index 608aa2c2fe..0000000000
--- 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.spark;
-
-import java.util.Map;
-
-/**
- * <p>
- * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps 
both
- * a FlowFile's content and its attributes so that they can be processed by
- * Spark
- * </p>
- */
-public interface NiFiDataPacket {
-
-    /**
-     * @return the contents of a NiFi FlowFile
-     */
-    byte[] getContent();
-
-    /**
-     * @return a Map of attributes that are associated with the NiFi FlowFile
-     */
-    Map<String, String> getAttributes();
-}
diff --git 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
 
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
deleted file mode 100644
index 83a7e42ed7..0000000000
--- 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.spark;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.remote.Transaction;
-import org.apache.nifi.remote.TransferDirection;
-import org.apache.nifi.remote.client.SiteToSiteClient;
-import org.apache.nifi.remote.client.SiteToSiteClientConfig;
-import org.apache.nifi.remote.protocol.DataPacket;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.receiver.Receiver;
-
-/**
- * <p>
- * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
- * pull data from Apache NiFi so that it can be processed by Spark Streaming.
- * The NiFi Receiver connects to NiFi instance provided in the config and
- * requests data from the OutputPort that is named. In NiFi, when an OutputPort
- * is added to the root process group, it acts as a queue of data for remote
- * clients. This receiver is then able to pull that data from NiFi reliably.
- * </p>
- *
- * <p>
- * It is important to note that if pulling data from a NiFi cluster, the URL
- * that should be used is that of the NiFi Cluster Manager. The Receiver will
- * automatically handle determining the nodes in that cluster and pull from
- * those nodes as appropriate.
- * </p>
- *
- * <p>
- * In order to use the NiFiReceiver, you will need to first build a
- * {@link SiteToSiteClientConfig} to provide to the constructor. This can be
- * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
- * snippet of driver code to pull data from NiFi that is running on
- * localhost:8080. This example assumes that NiFi exposes and OutputPort on the
- * root group named "Data For Spark". Additionally, it assumes that the data
- * that it will receive from this OutputPort is text data, as it will map the
- * byte array received from NiFi to a UTF-8 Encoded string.
- * </p>
- *
- * <code>
- * <pre>
- * {@code
- * Pattern SPACE = Pattern.compile(" ");
- *
- * // Build a Site-to-site client config
- * SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
- *   .setUrl("http://localhost:8080/nifi";)
- *   .setPortName("Data For Spark")
- *   .buildConfig();
- *
- * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming 
example");
- * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new 
Duration(1000L));
- *
- * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can 
pull data from
- * // specified Port
- * JavaReceiverInputDStream<NiFiDataPacket> packetStream =
- *     ssc.receiverStream(new NiFiReceiver(clientConfig, 
StorageLevel.MEMORY_ONLY()));
- *
- * // Map the data from NiFi to text, ignoring the attributes
- * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, 
String>() {
- *   public String call(final NiFiDataPacket dataPacket) throws Exception {
- *     return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
- *   }
- * });
- *
- * // Split the words by spaces
- * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, 
String>() {
- *   public Iterable<String> call(final String text) throws Exception {
- *     return Arrays.asList(SPACE.split(text));
- *   }
- * });
- *
- * // Map each word to the number 1, then aggregate by key
- * JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
- *   new PairFunction<String, String, Integer>() {
- *     public Tuple2<String, Integer> call(String s) {
- *       return new Tuple2<String, Integer>(s, 1);
- *     }
- *   }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- *     public Integer call(Integer i1, Integer i2) {
- *       return i1 + i2;
- *     }
- *    }
- *  );
- *
- * // print the results
- * wordCounts.print();
- * ssc.start();
- * ssc.awaitTermination();
- * }
- * </pre>
- * </code>
- */
-public class NiFiReceiver extends Receiver<NiFiDataPacket> {
-
-    private static final long serialVersionUID = 3067274587595578836L;
-    private final SiteToSiteClientConfig clientConfig;
-
-    public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final 
StorageLevel storageLevel) {
-        super(storageLevel);
-        this.clientConfig = clientConfig;
-    }
-
-    @Override
-    public void onStart() {
-        final Thread thread = new Thread(new ReceiveRunnable());
-        thread.setDaemon(true);
-        thread.setName("NiFi Receiver");
-        thread.start();
-    }
-
-    @Override
-    public void onStop() {
-    }
-
-    class ReceiveRunnable implements Runnable {
-
-        public ReceiveRunnable() {
-        }
-
-        @Override
-        public void run() {
-            try {
-                final SiteToSiteClient client = new 
SiteToSiteClient.Builder().fromConfig(clientConfig).build();
-                try {
-                    while (!isStopped()) {
-                        final Transaction transaction = 
client.createTransaction(TransferDirection.RECEIVE);
-                        DataPacket dataPacket = transaction.receive();
-                        if (dataPacket == null) {
-                            transaction.confirm();
-                            transaction.complete();
-
-                            // no data available. Wait a bit and try again
-                            try {
-                                Thread.sleep(1000L);
-                            } catch (InterruptedException e) {
-                            }
-
-                            continue;
-                        }
-
-                        final List<NiFiDataPacket> dataPackets = new 
ArrayList<>();
-                        do {
-                            // Read the data into a byte array and wrap it 
along with the attributes
-                            // into a NiFiDataPacket.
-                            final InputStream inStream = dataPacket.getData();
-                            final byte[] data = new byte[(int) 
dataPacket.getSize()];
-                            StreamUtils.fillBuffer(inStream, data);
-
-                            final Map<String, String> attributes = 
dataPacket.getAttributes();
-                            final NiFiDataPacket NiFiDataPacket = new 
StandardNiFiDataPacket(data, attributes);
-                            dataPackets.add(NiFiDataPacket);
-                            dataPacket = transaction.receive();
-                        } while (dataPacket != null);
-
-                        // Confirm transaction to verify the data
-                        transaction.confirm();
-
-                        store(dataPackets.iterator());
-
-                        transaction.complete();
-                    }
-                } finally {
-                    try {
-                        client.close();
-                    } catch (final IOException ioe) {
-                        reportError("Failed to close client", ioe);
-                    }
-                }
-            } catch (final IOException ioe) {
-                restart("Failed to receive data from NiFi", ioe);
-            }
-        }
-    }
-}
diff --git 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java
 
b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java
deleted file mode 100644
index 80bbe8366d..0000000000
--- 
a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.spark;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable {
-    private static final long serialVersionUID = 6364005260220243322L;
-
-    private final byte[] content;
-    private final Map<String, String> attributes;
-
-    public StandardNiFiDataPacket(final byte[] content, final Map<String, 
String> attributes) {
-        this.content = content;
-        this.attributes = attributes;
-    }
-
-    @Override
-    public byte[] getContent() {
-        return content;
-    }
-
-    @Override
-    public Map<String, String> getAttributes() {
-        return attributes;
-    }
-
-}
diff --git a/nifi-external/pom.xml b/nifi-external/pom.xml
index aeb8fe5d84..d393f7e2d3 100644
--- a/nifi-external/pom.xml
+++ b/nifi-external/pom.xml
@@ -23,7 +23,6 @@
     <artifactId>nifi-external</artifactId>
     <packaging>pom</packaging>
     <modules>
-        <module>nifi-spark-receiver</module>
         <module>nifi-example-bundle</module>
         <module>nifi-kafka-connect</module>
     </modules>

Reply via email to