Repository: spark Updated Branches: refs/heads/branch-1.5 f9beef998 -> c7f009040
[SPARK-9640] [STREAMING] [TEST] Do not run Python Kinesis tests when the Kinesis assembly JAR has not been generated Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #7961 from tdas/SPARK-9640 and squashes the following commits: 974ce19 [Tathagata Das] Undo changes related to SPARK-9727 004ae26 [Tathagata Das] style fixes 9bbb97d [Tathagata Das] Minor style fies e6a677e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9640 ca90719 [Tathagata Das] Removed extra line ba9cfc7 [Tathagata Das] Improved kinesis test selection logic 88d59bd [Tathagata Das] updated test modules 871fcc8 [Tathagata Das] Fixed SparkBuild 94be631 [Tathagata Das] Fixed style b858196 [Tathagata Das] Fixed conditions and few other things based on PR comments. e292e64 [Tathagata Das] Added filters for Kinesis python tests (cherry picked from commit 0f90d6055e5bea9ceb1d454db84f4aa1d59b284d) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7f00904 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7f00904 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7f00904 Branch: refs/heads/branch-1.5 Commit: c7f0090409c2a94a43404271730beded421a0f2f Parents: f9beef9 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Mon Aug 10 23:41:53 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Aug 10 23:42:44 2015 -0700 ---------------------------------------------------------------------- python/pyspark/streaming/tests.py | 56 ++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c7f00904/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 66ae334..f0ed415 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -971,8 +971,10 @@ class KinesisStreamTests(PySparkStreamingTestCase): "awsAccessKey", "awsSecretKey") def test_kinesis_stream(self): - if os.environ.get('ENABLE_KINESIS_TESTS') != '1': - print("Skip test_kinesis_stream") + if not are_kinesis_tests_enabled: + sys.stderr.write( + "Skipped test_kinesis_stream (enable by setting environment variable %s=1" + % kinesis_test_environ_var) return import random @@ -1013,6 +1015,7 @@ class KinesisStreamTests(PySparkStreamingTestCase): traceback.print_exc() raise finally: + self.ssc.stop(False) kinesisTestUtils.deleteStream() kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) @@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar(): ("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) + "You need to build Spark with " "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' or " - "'build/mvn package' before running this test") + "'build/mvn package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Kafka assembly JARs in %s; please " "remove all but one") % kafka_assembly_dir) @@ -1045,7 +1048,7 @@ def search_flume_assembly_jar(): ("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) + "You need to build Spark with " "'build/sbt assembly/assembly streaming-flume-assembly/assembly' or " - "'build/mvn package' before running this test") + "'build/mvn package' before running this test.") elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Flume assembly JARs in %s; please " "remove all but one") % flume_assembly_dir) @@ -1095,11 +1098,7 @@ def search_kinesis_asl_assembly_jar(): os.path.join(kinesis_asl_assembly_dir, "target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar")) if not jars: - raise Exception( - ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " % - kinesis_asl_assembly_dir) + "You need to build Spark with " - "'build/sbt -Pkinesis-asl assembly/assembly streaming-kinesis-asl-assembly/assembly' " - "or 'build/mvn -Pkinesis-asl package' before running this test") + return None elif len(jars) > 1: raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly JARs in %s; please " "remove all but one") % kinesis_asl_assembly_dir) @@ -1107,6 +1106,10 @@ def search_kinesis_asl_assembly_jar(): return jars[0] +# Must be same as the variable and condition defined in KinesisTestUtils.scala +kinesis_test_environ_var = "ENABLE_KINESIS_TESTS" +are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1' + if __name__ == "__main__": kafka_assembly_jar = search_kafka_assembly_jar() flume_assembly_jar = search_flume_assembly_jar() @@ -1114,8 +1117,37 @@ if __name__ == "__main__": mqtt_test_jar = search_mqtt_test_jar() kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar() - jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, kinesis_asl_assembly_jar, - mqtt_assembly_jar, mqtt_test_jar) + if kinesis_asl_assembly_jar is None: + kinesis_jar_present = False + jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar, + mqtt_test_jar) + else: + kinesis_jar_present = True + jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, mqtt_assembly_jar, + mqtt_test_jar, kinesis_asl_assembly_jar) os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars - unittest.main() + testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, + CheckpointTests, KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests] + + if kinesis_jar_present is True: + testcases.append(KinesisStreamTests) + elif are_kinesis_tests_enabled is False: + sys.stderr.write("Skipping all Kinesis Python tests as the optional Kinesis project was " + "not compiled with -Pkinesis-asl profile. To run these tests, " + "you need to build Spark with 'build/sbt -Pkinesis-asl assembly/assembly " + "streaming-kinesis-asl-assembly/assembly' or " + "'build/mvn -Pkinesis-asl package' before running this test.") + else: + raise Exception( + ("Failed to find Spark Streaming Kinesis assembly jar in %s. " + % kinesis_asl_assembly_dir) + + "You need to build Spark with 'build/sbt -Pkinesis-asl " + "assembly/assembly streaming-kinesis-asl-assembly/assembly'" + "or 'build/mvn -Pkinesis-asl package' before running this test.") + + sys.stderr.write("Running tests: %s \n" % (str(testcases))) + for testcase in testcases: + sys.stderr.write("[Running %s]\n" % (testcase)) + tests = unittest.TestLoader().loadTestsFromTestCase(testcase) + unittest.TextTestRunner(verbosity=2).run(tests) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org