Repository: spark Updated Branches: refs/heads/master 83488cc31 -> 0c03297bf
[SPARK-22142][BUILD][STREAMING] Move Flume support behind a profile, take 2 ## What changes were proposed in this pull request? Move flume behind a profile, take 2. See https://github.com/apache/spark/pull/19365 for most of the back-story. This change should fix the problem by removing the examples module dependency and moving Flume examples to the module itself. It also adds deprecation messages, per a discussion on dev about deprecating for 2.3.0. ## How was this patch tested? Existing tests, which still enable flume integration. Author: Sean Owen <so...@cloudera.com> Closes #19412 from srowen/SPARK-22142.2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c03297b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c03297b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c03297b Branch: refs/heads/master Commit: 0c03297bf0e87944f9fe0535fdae5518228e3e29 Parents: 83488cc Author: Sean Owen <so...@cloudera.com> Authored: Fri Oct 6 15:08:28 2017 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Fri Oct 6 15:08:28 2017 +0100 ---------------------------------------------------------------------- dev/create-release/release-build.sh | 4 +- dev/mima | 2 +- dev/scalastyle | 1 + dev/sparktestsupport/modules.py | 20 +++++- dev/test-dependencies.sh | 2 +- docs/building-spark.md | 7 ++ docs/streaming-flume-integration.md | 13 ++-- examples/pom.xml | 7 -- .../examples/streaming/JavaFlumeEventCount.java | 69 ------------------- .../examples/streaming/FlumeEventCount.scala | 70 -------------------- .../streaming/FlumePollingEventCount.scala | 67 ------------------- .../spark/examples/JavaFlumeEventCount.java | 67 +++++++++++++++++++ .../apache/spark/examples/FlumeEventCount.scala | 68 +++++++++++++++++++ .../spark/examples/FlumePollingEventCount.scala | 65 ++++++++++++++++++ .../spark/streaming/flume/FlumeUtils.scala | 1 + pom.xml | 13 +++- project/SparkBuild.scala | 17 ++--- python/pyspark/streaming/flume.py | 4 ++ python/pyspark/streaming/tests.py | 16 ++++- 19 files changed, 273 insertions(+), 240 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/create-release/release-build.sh ---------------------------------------------------------------------- diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index 5390f59..7e8d5c7 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -84,9 +84,9 @@ MVN="build/mvn --force" # Hive-specific profiles for some builds HIVE_PROFILES="-Phive -Phive-thriftserver" # Profiles for publishing snapshots and release to Maven Central -PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl" +PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl" # Profiles for building binary releases -BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr" +BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr" # Scala 2.11 only profiles for some builds SCALA_2_11_PROFILES="-Pkafka-0-8" # Scala 2.12 only profiles for some builds http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/mima ---------------------------------------------------------------------- diff --git a/dev/mima b/dev/mima index fdb21f5..1e3ca97 100755 --- a/dev/mima +++ b/dev/mima @@ -24,7 +24,7 @@ set -e FWDIR="$(cd "`dirname "$0"`"/..; pwd)" cd "$FWDIR" -SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" +SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive" TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)" OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)" http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/scalastyle ---------------------------------------------------------------------- diff --git a/dev/scalastyle b/dev/scalastyle index e5aa589..89ecc8a 100755 --- a/dev/scalastyle +++ b/dev/scalastyle @@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \ -Pmesos \ -Pkafka-0-8 \ -Pyarn \ + -Pflume \ -Phive \ -Phive-thriftserver \ scalastyle test:scalastyle \ http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/sparktestsupport/modules.py ---------------------------------------------------------------------- diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 50e14b6..91d5667 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -279,6 +279,12 @@ streaming_flume_sink = Module( source_file_regexes=[ "external/flume-sink", ], + build_profile_flags=[ + "-Pflume", + ], + environ={ + "ENABLE_FLUME_TESTS": "1" + }, sbt_test_goals=[ "streaming-flume-sink/test", ] @@ -291,6 +297,12 @@ streaming_flume = Module( source_file_regexes=[ "external/flume", ], + build_profile_flags=[ + "-Pflume", + ], + environ={ + "ENABLE_FLUME_TESTS": "1" + }, sbt_test_goals=[ "streaming-flume/test", ] @@ -302,7 +314,13 @@ streaming_flume_assembly = Module( dependencies=[streaming_flume, streaming_flume_sink], source_file_regexes=[ "external/flume-assembly", - ] + ], + build_profile_flags=[ + "-Pflume", + ], + environ={ + "ENABLE_FLUME_TESTS": "1" + } ) http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/dev/test-dependencies.sh ---------------------------------------------------------------------- diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh index c771457..58b295d 100755 --- a/dev/test-dependencies.sh +++ b/dev/test-dependencies.sh @@ -29,7 +29,7 @@ export LC_ALL=C # TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution. # NOTE: These should match those in the release publishing script -HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive" +HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive" MVN="build/mvn" HADOOP_PROFILES=( hadoop-2.6 http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/docs/building-spark.md ---------------------------------------------------------------------- diff --git a/docs/building-spark.md b/docs/building-spark.md index 57baa50..98f7df1 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -100,6 +100,13 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0. Kafka 0.10 support is still automatically built. +## Building with Flume support + +Apache Flume support must be explicitly enabled with the `flume` profile. +Note: Flume support is deprecated as of Spark 2.3.0. + + ./build/mvn -Pflume -DskipTests clean package + ## Building submodules individually It's possible to build Spark sub-modules using the `mvn -pl` option. http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/docs/streaming-flume-integration.md ---------------------------------------------------------------------- diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md index a5d36da..257a4f7 100644 --- a/docs/streaming-flume-integration.md +++ b/docs/streaming-flume-integration.md @@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide [Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this. +**Note: Flume support is deprecated as of Spark 2.3.0.** + ## Approach 1: Flume-style Push-based Approach Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps. @@ -44,8 +46,7 @@ configuring Flume agents. val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) - See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala). + See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$). </div> <div data-lang="java" markdown="1"> import org.apache.spark.streaming.flume.*; @@ -53,8 +54,7 @@ configuring Flume agents. JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]); - See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html) - and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java). + See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html). </div> <div data-lang="python" markdown="1"> from pyspark.streaming.flume import FlumeUtils @@ -62,8 +62,7 @@ configuring Flume agents. flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]) By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type. - See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils) - and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/flume_wordcount.py). + See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils). </div> </div> @@ -162,8 +161,6 @@ configuring Flume agents. </div> </div> - See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala). - Note that each input DStream can be configured to receive data from multiple sinks. 3. **Deploying:** This is same as the first approach. http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 52a6764..1791dba 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -34,7 +34,6 @@ <sbt.project.name>examples</sbt.project.name> <build.testJarPhase>none</build.testJarPhase> <build.copyDependenciesPhase>package</build.copyDependenciesPhase> - <flume.deps.scope>provided</flume.deps.scope> <hadoop.deps.scope>provided</hadoop.deps.scope> <hive.deps.scope>provided</hive.deps.scope> <parquet.deps.scope>provided</parquet.deps.scope> @@ -80,12 +79,6 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java deleted file mode 100644 index 0c65104..0000000 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.examples.streaming; - -import org.apache.spark.SparkConf; -import org.apache.spark.streaming.*; -import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.flume.FlumeUtils; -import org.apache.spark.streaming.flume.SparkFlumeEvent; - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. - * - * Usage: JavaFlumeEventCount <host> <port> - * <host> is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * <port> is the port the Flume receiver will listen on. - * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>` - */ -public final class JavaFlumeEventCount { - private JavaFlumeEventCount() { - } - - public static void main(String[] args) throws Exception { - if (args.length != 2) { - System.err.println("Usage: JavaFlumeEventCount <host> <port>"); - System.exit(1); - } - - StreamingExamples.setStreamingLogLevels(); - - String host = args[0]; - int port = Integer.parseInt(args[1]); - - Duration batchInterval = new Duration(2000); - SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); - JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); - JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = - FlumeUtils.createStream(ssc, host, port); - - flumeStream.count(); - - flumeStream.count().map(in -> "Received " + in + " flume events.").print(); - - ssc.start(); - ssc.awaitTermination(); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala deleted file mode 100644 index 91e52e4..0000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.apache.spark.SparkConf -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ -import org.apache.spark.streaming.flume._ -import org.apache.spark.util.IntParam - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with an AvroSink in Flume. It will start - * an Avro server on at the request host:port address and listen for requests. - * Your Flume AvroSink should be pointed to this address. - * - * Usage: FlumeEventCount <host> <port> - * <host> is the host the Flume receiver will be started on - a receiver - * creates a server and listens for flume events. - * <port> is the port the Flume receiver will listen on. - * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> ` - */ -object FlumeEventCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println( - "Usage: FlumeEventCount <host> <port>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(host, IntParam(port)) = args - - val batchInterval = Milliseconds(2000) - - // Create the context and set the batch size - val sparkConf = new SparkConf().setAppName("FlumeEventCount") - val ssc = new StreamingContext(sparkConf, batchInterval) - - // Create a flume stream - val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) - - // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." ).print() - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala deleted file mode 100644 index dd725d7..0000000 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples.streaming - -import org.apache.spark.SparkConf -import org.apache.spark.streaming._ -import org.apache.spark.streaming.flume._ -import org.apache.spark.util.IntParam - -/** - * Produces a count of events received from Flume. - * - * This should be used in conjunction with the Spark Sink running in a Flume agent. See - * the Spark Streaming programming guide for more details. - * - * Usage: FlumePollingEventCount <host> <port> - * `host` is the host on which the Spark Sink is running. - * `port` is the port at which the Spark Sink is listening. - * - * To run this example: - * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` - */ -object FlumePollingEventCount { - def main(args: Array[String]) { - if (args.length < 2) { - System.err.println( - "Usage: FlumePollingEventCount <host> <port>") - System.exit(1) - } - - StreamingExamples.setStreamingLogLevels() - - val Array(host, IntParam(port)) = args - - val batchInterval = Milliseconds(2000) - - // Create the context and set the batch size - val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") - val ssc = new StreamingContext(sparkConf, batchInterval) - - // Create a flume stream that polls the Spark Sink running in a Flume agent - val stream = FlumeUtils.createPollingStream(ssc, host, port) - - // Print out the count of events received from this server in each batch - stream.count().map(cnt => "Received " + cnt + " flume events." ).print() - - ssc.start() - ssc.awaitTermination() - } -} -// scalastyle:on println http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java ---------------------------------------------------------------------- diff --git a/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java new file mode 100644 index 0000000..4e3420d --- /dev/null +++ b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import org.apache.spark.SparkConf; +import org.apache.spark.streaming.*; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.streaming.flume.FlumeUtils; +import org.apache.spark.streaming.flume.SparkFlumeEvent; + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: JavaFlumeEventCount <host> <port> + * <host> is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * <port> is the port the Flume receiver will listen on. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.JavaFlumeEventCount <host> <port>` + */ +public final class JavaFlumeEventCount { + private JavaFlumeEventCount() { + } + + public static void main(String[] args) throws Exception { + if (args.length != 2) { + System.err.println("Usage: JavaFlumeEventCount <host> <port>"); + System.exit(1); + } + + String host = args[0]; + int port = Integer.parseInt(args[1]); + + Duration batchInterval = new Duration(2000); + SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); + JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = + FlumeUtils.createStream(ssc, host, port); + + flumeStream.count(); + + flumeStream.count().map(in -> "Received " + in + " flume events.").print(); + + ssc.start(); + ssc.awaitTermination(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala new file mode 100644 index 0000000..f877f79 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with an AvroSink in Flume. It will start + * an Avro server on at the request host:port address and listen for requests. + * Your Flume AvroSink should be pointed to this address. + * + * Usage: FlumeEventCount <host> <port> + * <host> is the host the Flume receiver will be started on - a receiver + * creates a server and listens for flume events. + * <port> is the port the Flume receiver will listen on. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> ` + */ +object FlumeEventCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: FlumeEventCount <host> <port>") + System.exit(1) + } + + val Array(host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumeEventCount") + val ssc = new StreamingContext(sparkConf, batchInterval) + + // Create a flume stream + val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} +// scalastyle:on println http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala new file mode 100644 index 0000000..79a4027 --- /dev/null +++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.streaming + +import org.apache.spark.SparkConf +import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ +import org.apache.spark.util.IntParam + +/** + * Produces a count of events received from Flume. + * + * This should be used in conjunction with the Spark Sink running in a Flume agent. See + * the Spark Streaming programming guide for more details. + * + * Usage: FlumePollingEventCount <host> <port> + * `host` is the host on which the Spark Sink is running. + * `port` is the port at which the Spark Sink is listening. + * + * To run this example: + * `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] ` + */ +object FlumePollingEventCount { + def main(args: Array[String]) { + if (args.length < 2) { + System.err.println( + "Usage: FlumePollingEventCount <host> <port>") + System.exit(1) + } + + val Array(host, IntParam(port)) = args + + val batchInterval = Milliseconds(2000) + + // Create the context and set the batch size + val sparkConf = new SparkConf().setAppName("FlumePollingEventCount") + val ssc = new StreamingContext(sparkConf, batchInterval) + + // Create a flume stream that polls the Spark Sink running in a Flume agent + val stream = FlumeUtils.createPollingStream(ssc, host, port) + + // Print out the count of events received from this server in each batch + stream.count().map(cnt => "Received " + cnt + " flume events." ).print() + + ssc.start() + ssc.awaitTermination() + } +} +// scalastyle:on println http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index 3e3ed71..707193a 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -30,6 +30,7 @@ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext} import org.apache.spark.streaming.dstream.ReceiverInputDStream +@deprecated("Deprecated without replacement", "2.3.0") object FlumeUtils { private val DEFAULT_POLLING_PARALLELISM = 5 private val DEFAULT_POLLING_BATCH_SIZE = 1000 http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 87a468c..9fac8b1 100644 --- a/pom.xml +++ b/pom.xml @@ -98,15 +98,13 @@ <module>sql/core</module> <module>sql/hive</module> <module>assembly</module> - <module>external/flume</module> - <module>external/flume-sink</module> - <module>external/flume-assembly</module> <module>examples</module> <module>repl</module> <module>launcher</module> <module>external/kafka-0-10</module> <module>external/kafka-0-10-assembly</module> <module>external/kafka-0-10-sql</module> + <!-- See additional modules enabled by profiles below --> </modules> <properties> @@ -2583,6 +2581,15 @@ </dependencies> </profile> + <profile> + <id>flume</id> + <modules> + <module>external/flume</module> + <module>external/flume-sink</module> + <module>external/flume-assembly</module> + </modules> + </profile> + <!-- Ganglia integration is not included by default due to LGPL-licensed code --> <profile> <id>spark-ganglia-lgpl</id> http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a568d26..9501eed 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -43,11 +43,8 @@ object BuildCommons { "catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10" ).map(ProjectRef(buildLocation, _)) - val streamingProjects@Seq( - streaming, streamingFlumeSink, streamingFlume, streamingKafka010 - ) = Seq( - "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-10" - ).map(ProjectRef(buildLocation, _)) + val streamingProjects@Seq(streaming, streamingKafka010) = + Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _)) val allProjects@Seq( core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _* @@ -56,9 +53,13 @@ object BuildCommons { "tags", "sketch", "kvstore" ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects - val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl, - streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) = - Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", + val optionallyEnabledProjects@Seq(mesos, yarn, + streamingFlumeSink, streamingFlume, + streamingKafka, sparkGangliaLgpl, streamingKinesisAsl, + dockerIntegrationTests, hadoopCloud) = + Seq("mesos", "yarn", + "streaming-flume-sink", "streaming-flume", + "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl", "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _)) val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) = http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/python/pyspark/streaming/flume.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py index cd30483..2fed594 100644 --- a/python/pyspark/streaming/flume.py +++ b/python/pyspark/streaming/flume.py @@ -53,6 +53,8 @@ class FlumeUtils(object): :param enableDecompression: Should netty server decompress input stream :param bodyDecoder: A function used to decode body (default is utf8_decoder) :return: A DStream object + + .. note:: Deprecated in 2.3.0 """ jlevel = ssc._sc._getJavaStorageLevel(storageLevel) helper = FlumeUtils._get_helper(ssc._sc) @@ -79,6 +81,8 @@ class FlumeUtils(object): will result in this stream using more threads :param bodyDecoder: A function used to decode body (default is utf8_decoder) :return: A DStream object + + .. note:: Deprecated in 2.3.0 """ jlevel = ssc._sc._getJavaStorageLevel(storageLevel) hosts = [] http://git-wip-us.apache.org/repos/asf/spark/blob/0c03297b/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 229cf53..5b86c1c 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar(): ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + "You need to build Spark with " "'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or " - "'build/mvn package' before running this test.") + "'build/mvn -Pkafka-0-8 package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please " "remove all but one") % (", ".join(jars))) @@ -1495,7 +1495,7 @@ def search_flume_assembly_jar(): ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) + "You need to build Spark with " "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or " - "'build/mvn package' before running this test.") + "'build/mvn -Pflume package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please " "remove all but one") % (", ".join(jars))) @@ -1517,6 +1517,9 @@ def search_kinesis_asl_assembly_jar(): # Must be same as the variable and condition defined in modules.py +flume_test_environ_var = "ENABLE_FLUME_TESTS" +are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1' +# Must be same as the variable and condition defined in modules.py kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS" are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1' # Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py @@ -1538,9 +1541,16 @@ if __name__ == "__main__": os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests, - FlumeStreamTests, FlumePollingStreamTests, StreamingListenerTests] + if are_flume_tests_enabled: + testcases.append(FlumeStreamTests) + testcases.append(FlumePollingStreamTests) + else: + sys.stderr.write( + "Skipped test_flume_stream (enable by setting environment variable %s=1" + % flume_test_environ_var) + if are_kafka_tests_enabled: testcases.append(KafkaStreamTests) else: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org