This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 71dd5576db NIFI-12065 Removed nifi-spark-receiver module 71dd5576db is described below commit 71dd5576dbc66b669a6840d03d48992a6263210a Author: Joseph Witt <joew...@apache.org> AuthorDate: Thu Sep 14 15:12:04 2023 -0700 NIFI-12065 Removed nifi-spark-receiver module This closes #7732 Signed-off-by: David Handermann <exceptionfact...@apache.org> --- nifi-external/nifi-spark-receiver/pom.xml | 71 -------- .../java/org/apache/nifi/spark/NiFiDataPacket.java | 39 ---- .../java/org/apache/nifi/spark/NiFiReceiver.java | 197 --------------------- .../apache/nifi/spark/StandardNiFiDataPacket.java | 43 ----- nifi-external/pom.xml | 1 - 5 files changed, 351 deletions(-) diff --git a/nifi-external/nifi-spark-receiver/pom.xml b/nifi-external/nifi-spark-receiver/pom.xml deleted file mode 100644 index 3917a109c8..0000000000 --- a/nifi-external/nifi-spark-receiver/pom.xml +++ /dev/null @@ -1,71 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-external</artifactId> - <version>2.0.0-SNAPSHOT</version> - </parent> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-spark-receiver</artifactId> - <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming_2.13</artifactId> - <scope>provided</scope> - <version>3.3.2</version> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-site-to-site-client</artifactId> - <version>2.0.0-SNAPSHOT</version> - </dependency> - </dependencies> - <dependencyManagement> - <dependencies> - <!-- Override Hadoop --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-api</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client-runtime</artifactId> - <version>${hadoop.version}</version> - </dependency> - </dependencies> - </dependencyManagement> -</project> diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java deleted file mode 100644 index 608aa2c2fe..0000000000 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiDataPacket.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spark; - -import java.util.Map; - -/** - * <p> - * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both - * a FlowFile's content and its attributes so that they can be processed by - * Spark - * </p> - */ -public interface NiFiDataPacket { - - /** - * @return the contents of a NiFi FlowFile - */ - byte[] getContent(); - - /** - * @return a Map of attributes that are associated with the NiFi FlowFile - */ - Map<String, String> getAttributes(); -} diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java deleted file mode 100644 index 83a7e42ed7..0000000000 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/NiFiReceiver.java +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spark; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.protocol.DataPacket; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.receiver.Receiver; - -/** - * <p> - * The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to - * pull data from Apache NiFi so that it can be processed by Spark Streaming. - * The NiFi Receiver connects to NiFi instance provided in the config and - * requests data from the OutputPort that is named. In NiFi, when an OutputPort - * is added to the root process group, it acts as a queue of data for remote - * clients. This receiver is then able to pull that data from NiFi reliably. - * </p> - * - * <p> - * It is important to note that if pulling data from a NiFi cluster, the URL - * that should be used is that of the NiFi Cluster Manager. The Receiver will - * automatically handle determining the nodes in that cluster and pull from - * those nodes as appropriate. - * </p> - * - * <p> - * In order to use the NiFiReceiver, you will need to first build a - * {@link SiteToSiteClientConfig} to provide to the constructor. This can be - * achieved by using the {@link SiteToSiteClient.Builder}. Below is an example - * snippet of driver code to pull data from NiFi that is running on - * localhost:8080. This example assumes that NiFi exposes and OutputPort on the - * root group named "Data For Spark". Additionally, it assumes that the data - * that it will receive from this OutputPort is text data, as it will map the - * byte array received from NiFi to a UTF-8 Encoded string. - * </p> - * - * <code> - * <pre> - * {@code - * Pattern SPACE = Pattern.compile(" "); - * - * // Build a Site-to-site client config - * SiteToSiteClientConfig config = new SiteToSiteClient.Builder() - * .setUrl("http://localhost:8080/nifi") - * .setPortName("Data For Spark") - * .buildConfig(); - * - * SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example"); - * JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L)); - * - * // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from - * // specified Port - * JavaReceiverInputDStream<NiFiDataPacket> packetStream = - * ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY())); - * - * // Map the data from NiFi to text, ignoring the attributes - * JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() { - * public String call(final NiFiDataPacket dataPacket) throws Exception { - * return new String(dataPacket.getContent(), StandardCharsets.UTF_8); - * } - * }); - * - * // Split the words by spaces - * JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() { - * public Iterable<String> call(final String text) throws Exception { - * return Arrays.asList(SPACE.split(text)); - * } - * }); - * - * // Map each word to the number 1, then aggregate by key - * JavaPairDStream<String, Integer> wordCounts = words.mapToPair( - * new PairFunction<String, String, Integer>() { - * public Tuple2<String, Integer> call(String s) { - * return new Tuple2<String, Integer>(s, 1); - * } - * }).reduceByKey(new Function2<Integer, Integer, Integer>() { - * public Integer call(Integer i1, Integer i2) { - * return i1 + i2; - * } - * } - * ); - * - * // print the results - * wordCounts.print(); - * ssc.start(); - * ssc.awaitTermination(); - * } - * </pre> - * </code> - */ -public class NiFiReceiver extends Receiver<NiFiDataPacket> { - - private static final long serialVersionUID = 3067274587595578836L; - private final SiteToSiteClientConfig clientConfig; - - public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) { - super(storageLevel); - this.clientConfig = clientConfig; - } - - @Override - public void onStart() { - final Thread thread = new Thread(new ReceiveRunnable()); - thread.setDaemon(true); - thread.setName("NiFi Receiver"); - thread.start(); - } - - @Override - public void onStop() { - } - - class ReceiveRunnable implements Runnable { - - public ReceiveRunnable() { - } - - @Override - public void run() { - try { - final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - try { - while (!isStopped()) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - DataPacket dataPacket = transaction.receive(); - if (dataPacket == null) { - transaction.confirm(); - transaction.complete(); - - // no data available. Wait a bit and try again - try { - Thread.sleep(1000L); - } catch (InterruptedException e) { - } - - continue; - } - - final List<NiFiDataPacket> dataPackets = new ArrayList<>(); - do { - // Read the data into a byte array and wrap it along with the attributes - // into a NiFiDataPacket. - final InputStream inStream = dataPacket.getData(); - final byte[] data = new byte[(int) dataPacket.getSize()]; - StreamUtils.fillBuffer(inStream, data); - - final Map<String, String> attributes = dataPacket.getAttributes(); - final NiFiDataPacket NiFiDataPacket = new StandardNiFiDataPacket(data, attributes); - dataPackets.add(NiFiDataPacket); - dataPacket = transaction.receive(); - } while (dataPacket != null); - - // Confirm transaction to verify the data - transaction.confirm(); - - store(dataPackets.iterator()); - - transaction.complete(); - } - } finally { - try { - client.close(); - } catch (final IOException ioe) { - reportError("Failed to close client", ioe); - } - } - } catch (final IOException ioe) { - restart("Failed to receive data from NiFi", ioe); - } - } - } -} diff --git a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java b/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java deleted file mode 100644 index 80bbe8366d..0000000000 --- a/nifi-external/nifi-spark-receiver/src/main/java/org/apache/nifi/spark/StandardNiFiDataPacket.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.spark; - -import java.io.Serializable; -import java.util.Map; - -public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable { - private static final long serialVersionUID = 6364005260220243322L; - - private final byte[] content; - private final Map<String, String> attributes; - - public StandardNiFiDataPacket(final byte[] content, final Map<String, String> attributes) { - this.content = content; - this.attributes = attributes; - } - - @Override - public byte[] getContent() { - return content; - } - - @Override - public Map<String, String> getAttributes() { - return attributes; - } - -} diff --git a/nifi-external/pom.xml b/nifi-external/pom.xml index aeb8fe5d84..d393f7e2d3 100644 --- a/nifi-external/pom.xml +++ b/nifi-external/pom.xml @@ -23,7 +23,6 @@ <artifactId>nifi-external</artifactId> <packaging>pom</packaging> <modules> - <module>nifi-spark-receiver</module> <module>nifi-example-bundle</module> <module>nifi-kafka-connect</module> </modules>