Repository: spark
Updated Branches:
  refs/heads/master 91e9389f3 -> 0f90d6055


[SPARK-9640] [STREAMING] [TEST] Do not run Python Kinesis tests when the Kinesis assembly JAR has not been generated

Author: Tathagata Das <tathagata.das1...@gmail.com>

Closes #7961 from tdas/SPARK-9640 and squashes the following commits:

974ce19 [Tathagata Das] Undo changes related to SPARK-9727
004ae26 [Tathagata Das] style fixes
9bbb97d [Tathagata Das] Minor style fixes
e6a677e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9640
ca90719 [Tathagata Das] Removed extra line
ba9cfc7 [Tathagata Das] Improved kinesis test selection logic
88d59bd [Tathagata Das] updated test modules
871fcc8 [Tathagata Das] Fixed SparkBuild
94be631 [Tathagata Das] Fixed style
b858196 [Tathagata Das] Fixed conditions and few other things based on PR comments.
e292e64 [Tathagata Das] Added filters for Kinesis python tests


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f90d605
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f90d605
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f90d605

Branch: refs/heads/master
Commit: 0f90d6055e5bea9ceb1d454db84f4aa1d59b284d
Parents: 91e9389
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Mon Aug 10 23:41:53 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Aug 10 23:41:53 2015 -0700

----------------------------------------------------------------------
 python/pyspark/streaming/tests.py | 56 ++++++++++++++++++++++++++--------
 1 file changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0f90d605/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 66ae334..f0ed415 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -971,8 +971,10 @@ class KinesisStreamTests(PySparkStreamingTestCase):
             "awsAccessKey", "awsSecretKey")
 
     def test_kinesis_stream(self):
-        if os.environ.get('ENABLE_KINESIS_TESTS') != '1':
-            print("Skip test_kinesis_stream")
+        if not are_kinesis_tests_enabled:
+            sys.stderr.write(
+                "Skipped test_kinesis_stream (enable by setting environment 
variable %s=1"
+                % kinesis_test_environ_var)
             return
 
         import random
@@ -1013,6 +1015,7 @@ class KinesisStreamTests(PySparkStreamingTestCase):
             traceback.print_exc()
             raise
         finally:
+            self.ssc.stop(False)
             kinesisTestUtils.deleteStream()
             kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
 
@@ -1027,7 +1030,7 @@ def search_kafka_assembly_jar():
             ("Failed to find Spark Streaming kafka assembly jar in %s. " % 
kafka_assembly_dir) +
             "You need to build Spark with "
             "'build/sbt assembly/assembly streaming-kafka-assembly/assembly' 
or "
-            "'build/mvn package' before running this test")
+            "'build/mvn package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Kafka assembly JARs 
in %s; please "
                          "remove all but one") % kafka_assembly_dir)
@@ -1045,7 +1048,7 @@ def search_flume_assembly_jar():
             ("Failed to find Spark Streaming Flume assembly jar in %s. " % 
flume_assembly_dir) +
             "You need to build Spark with "
             "'build/sbt assembly/assembly streaming-flume-assembly/assembly' 
or "
-            "'build/mvn package' before running this test")
+            "'build/mvn package' before running this test.")
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Flume assembly JARs 
in %s; please "
                         "remove all but one") % flume_assembly_dir)
@@ -1095,11 +1098,7 @@ def search_kinesis_asl_assembly_jar():
         os.path.join(kinesis_asl_assembly_dir,
                      
"target/scala-*/spark-streaming-kinesis-asl-assembly-*.jar"))
     if not jars:
-        raise Exception(
-            ("Failed to find Spark Streaming Kinesis ASL assembly jar in %s. " 
%
-             kinesis_asl_assembly_dir) + "You need to build Spark with "
-            "'build/sbt -Pkinesis-asl assembly/assembly 
streaming-kinesis-asl-assembly/assembly' "
-            "or 'build/mvn -Pkinesis-asl package' before running this test")
+        return None
     elif len(jars) > 1:
         raise Exception(("Found multiple Spark Streaming Kinesis ASL assembly 
JARs in %s; please "
                          "remove all but one") % kinesis_asl_assembly_dir)
@@ -1107,6 +1106,10 @@ def search_kinesis_asl_assembly_jar():
         return jars[0]
 
 
+# Must be same as the variable and condition defined in KinesisTestUtils.scala
+kinesis_test_environ_var = "ENABLE_KINESIS_TESTS"
+are_kinesis_tests_enabled = os.environ.get(kinesis_test_environ_var) == '1'
+
 if __name__ == "__main__":
     kafka_assembly_jar = search_kafka_assembly_jar()
     flume_assembly_jar = search_flume_assembly_jar()
@@ -1114,8 +1117,37 @@ if __name__ == "__main__":
     mqtt_test_jar = search_mqtt_test_jar()
     kinesis_asl_assembly_jar = search_kinesis_asl_assembly_jar()
 
-    jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, 
kinesis_asl_assembly_jar,
-                               mqtt_assembly_jar, mqtt_test_jar)
+    if kinesis_asl_assembly_jar is None:
+        kinesis_jar_present = False
+        jars = "%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, 
mqtt_assembly_jar,
+                                mqtt_test_jar)
+    else:
+        kinesis_jar_present = True
+        jars = "%s,%s,%s,%s,%s" % (kafka_assembly_jar, flume_assembly_jar, 
mqtt_assembly_jar,
+                                   mqtt_test_jar, kinesis_asl_assembly_jar)
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
-    unittest.main()
+    testcases = [BasicOperationTests, WindowFunctionTests, 
StreamingContextTests,
+                 CheckpointTests, KafkaStreamTests, FlumeStreamTests, 
FlumePollingStreamTests]
+
+    if kinesis_jar_present is True:
+        testcases.append(KinesisStreamTests)
+    elif are_kinesis_tests_enabled is False:
+        sys.stderr.write("Skipping all Kinesis Python tests as the optional 
Kinesis project was "
+                         "not compiled with -Pkinesis-asl profile. To run 
these tests, "
+                         "you need to build Spark with 'build/sbt 
-Pkinesis-asl assembly/assembly "
+                         "streaming-kinesis-asl-assembly/assembly' or "
+                         "'build/mvn -Pkinesis-asl package' before running 
this test.")
+    else:
+        raise Exception(
+            ("Failed to find Spark Streaming Kinesis assembly jar in %s. "
+             % kinesis_asl_assembly_dir) +
+            "You need to build Spark with 'build/sbt -Pkinesis-asl "
+            "assembly/assembly streaming-kinesis-asl-assembly/assembly'"
+            "or 'build/mvn -Pkinesis-asl package' before running this test.")
+
+    sys.stderr.write("Running tests: %s \n" % (str(testcases)))
+    for testcase in testcases:
+        sys.stderr.write("[Running %s]\n" % (testcase))
+        tests = unittest.TestLoader().loadTestsFromTestCase(testcase)
+        unittest.TextTestRunner(verbosity=2).run(tests)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to