spark git commit: [SPARK-23555][PYTHON] Add BinaryType support for Arrow in Python
Repository: spark Updated Branches: refs/heads/master ba84bcb2c -> 10f2b6fa0 [SPARK-23555][PYTHON] Add BinaryType support for Arrow in Python ## What changes were proposed in this pull request? Adding `BinaryType` support for Arrow in pyspark, conditional on using pyarrow >= 0.10.0. Earlier versions will continue to raise a TypeError. ## How was this patch tested? Additional unit tests in pyspark for code paths that use Arrow for createDataFrame, toPandas, and scalar pandas_udfs. Closes #20725 from BryanCutler/arrow-binary-type-support-SPARK-23555. Authored-by: Bryan Cutler Signed-off-by: Bryan Cutler Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10f2b6fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10f2b6fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10f2b6fa Branch: refs/heads/master Commit: 10f2b6fa05f3d977f3b6099fcd94c5c0cd97a0cb Parents: ba84bcb Author: Bryan Cutler Authored: Fri Aug 17 22:14:42 2018 -0700 Committer: Bryan Cutler Committed: Fri Aug 17 22:14:42 2018 -0700 -- python/pyspark/sql/tests.py | 66 +--- python/pyspark/sql/types.py | 15 + 2 files changed, 70 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/10f2b6fa/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 91ed600..00d7e18 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -4050,6 +4050,8 @@ class ArrowTests(ReusedSQLTestCase): def setUpClass(cls): from datetime import date, datetime from decimal import Decimal +from distutils.version import LooseVersion +import pyarrow as pa ReusedSQLTestCase.setUpClass() # Synchronize default timezone between Python and Java @@ -4078,6 +4080,13 @@ class ArrowTests(ReusedSQLTestCase): (u"c", 3, 30, 0.8, 6.0, Decimal("6.0"), date(2100, 3, 3), datetime(2100, 3, 3, 3, 3, 3))] +# TODO: remove version check once minimum pyarrow version is 0.10.0 +if LooseVersion("0.10.0") <= LooseVersion(pa.__version__): +cls.schema.add(StructField("9_binary_t", BinaryType(), True)) +cls.data[0] = cls.data[0] + (bytearray(b"a"),) +cls.data[1] = cls.data[1] + (bytearray(b"bb"),) +cls.data[2] = cls.data[2] + (bytearray(b"ccc"),) + @classmethod def tearDownClass(cls): del os.environ["TZ"] @@ -4115,12 +4124,23 @@ class ArrowTests(ReusedSQLTestCase): self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]})) def test_toPandas_fallback_disabled(self): +from distutils.version import LooseVersion +import pyarrow as pa + schema = StructType([StructField("map", MapType(StringType(), IntegerType()), True)]) df = self.spark.createDataFrame([(None,)], schema=schema) with QuietTest(self.sc): with self.assertRaisesRegexp(Exception, 'Unsupported type'): df.toPandas() +# TODO: remove BinaryType check once minimum pyarrow version is 0.10.0 +if LooseVersion(pa.__version__) < LooseVersion("0.10.0"): +schema = StructType([StructField("binary", BinaryType(), True)]) +df = self.spark.createDataFrame([(None,)], schema=schema) +with QuietTest(self.sc): +with self.assertRaisesRegexp(Exception, 'Unsupported type.*BinaryType'): +df.toPandas() + def test_null_conversion(self): df_null = self.spark.createDataFrame([tuple([None for _ in range(len(self.data[0]))])] + self.data) @@ -4232,19 +4252,22 @@ class ArrowTests(ReusedSQLTestCase): def test_createDataFrame_with_incorrect_schema(self): pdf = self.create_pandas_data_frame() -wrong_schema = StructType(list(reversed(self.schema))) +fields = list(self.schema) 
+fields[0], fields[7] = fields[7], fields[0] # swap str with timestamp +wrong_schema = StructType(fields) with QuietTest(self.sc): with self.assertRaisesRegexp(Exception, ".*No cast.*string.*timestamp.*"): self.spark.createDataFrame(pdf, schema=wrong_schema) def test_createDataFrame_with_names(self): pdf = self.create_pandas_data_frame() +new_names = list(map(str, range(len(self.schema.fieldNames())))) # Test that schema as a list of column names gets applied -df = self.spark.createDataFrame(pdf, schema=list('abcdefgh')) -self.assertEquals(df.schema.fieldNames(), list('abcdefgh')) +df =
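For context, a minimal sketch of the round trip this change enables, assuming Spark 2.4+ with Arrow execution enabled and pyarrow >= 0.10.0 installed; the session name and sample bytes are illustrative, not taken from the patch:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, BinaryType

# Hypothetical session; Arrow-based conversion must be enabled explicitly.
spark = (SparkSession.builder
         .appName("arrow-binary-sketch")
         .config("spark.sql.execution.arrow.enabled", "true")
         .getOrCreate())

schema = StructType([StructField("b", BinaryType(), True)])
df = spark.createDataFrame([(bytearray(b"a"),), (bytearray(b"bb"),)], schema=schema)

# With pyarrow >= 0.10.0 this conversion goes through Arrow; older pyarrow
# versions still raise a TypeError (or fall back to the non-Arrow path when
# spark.sql.execution.arrow.fallback.enabled is true).
pdf = df.toPandas()
print(pdf)
```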
svn commit: r28821 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_17_20_02-ba84bcb-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Aug 18 03:16:11 2018 New Revision: 28821 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_17_20_02-ba84bcb docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r28820 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_17_16_02-da2dc69-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 17 23:16:22 2018 New Revision: 28820 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_17_16_02-da2dc69 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s
Repository: spark Updated Branches: refs/heads/master da2dc6929 -> ba84bcb2c [SPARK-24433][K8S] Initial R Bindings for SparkR on K8s ## What changes were proposed in this pull request? Introducing R Bindings for SparkR on K8s - [x] Running SparkR Job ## How was this patch tested? This patch was tested with - [x] Unit Tests - [x] Integration Tests ## Example: Commands to run example spark job: 1. `dev/make-distribution.sh --pip --r --tgz -Psparkr -Phadoop-2.7 -Pkubernetes` 2. `bin/docker-image-tool.sh -m -t testing build` 3. ``` bin/spark-submit \ --master k8s://https://192.168.64.33:8443 \ --deploy-mode cluster \ --name spark-r \ --conf spark.executor.instances=1 \ --conf spark.kubernetes.container.image=spark-r:testing \ local:///opt/spark/examples/src/main/r/dataframe.R ``` The above spark-submit command works given the distribution. (The integration test will be included in the PR once the PRB is ready.) Author: Ilan Filonenko Closes #21584 from ifilonenko/spark-r. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba84bcb2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba84bcb2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba84bcb2 Branch: refs/heads/master Commit: ba84bcb2c4f73baf63782ff6fad5a607008c7cd2 Parents: da2dc69 Author: Ilan Filonenko Authored: Fri Aug 17 16:04:02 2018 -0700 Committer: mcheah Committed: Fri Aug 17 16:04:02 2018 -0700 -- bin/docker-image-tool.sh| 23 --- .../org/apache/spark/deploy/SparkSubmit.scala | 8 ++- .../org/apache/spark/deploy/k8s/Config.scala| 13 .../org/apache/spark/deploy/k8s/Constants.scala | 2 + .../spark/deploy/k8s/KubernetesConf.scala | 8 ++- .../features/bindings/RDriverFeatureStep.scala | 59 ++ .../submit/KubernetesClientApplication.scala| 2 + .../k8s/submit/KubernetesDriverBuilder.scala| 22 --- .../deploy/k8s/submit/MainAppResource.scala | 3 + .../spark/deploy/k8s/KubernetesConfSuite.scala | 22 +++ .../bindings/RDriverFeatureStepSuite.scala | 63 .../submit/KubernetesDriverBuilderSuite.scala | 36 ++- .../dockerfiles/spark/bindings/R/Dockerfile | 29 + .../src/main/dockerfiles/spark/entrypoint.sh| 14 - .../integrationtest/ClientModeTestsSuite.scala | 2 +- .../k8s/integrationtest/KubernetesSuite.scala | 21 ++- .../k8s/integrationtest/RTestsSuite.scala | 44 ++ 17 files changed, 344 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba84bcb2/bin/docker-image-tool.sh -- diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index cd22e75..d637105 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -71,6 +71,7 @@ function build { ) local BASEDOCKERFILE=${BASEDOCKERFILE:-"$IMG_PATH/spark/Dockerfile"} local PYDOCKERFILE=${PYDOCKERFILE:-"$IMG_PATH/spark/bindings/python/Dockerfile"} + local RDOCKERFILE=${RDOCKERFILE:-"$IMG_PATH/spark/bindings/R/Dockerfile"} docker build $NOCACHEARG "${BUILD_ARGS[@]}" \ -t $(image_ref spark) \ @@ -79,11 +80,16 @@ function build { docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ -t $(image_ref spark-py) \ -f "$PYDOCKERFILE" . + + docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ +-t $(image_ref spark-r) \ +-f "$RDOCKERFILE" . } function push { docker push "$(image_ref spark)" docker push "$(image_ref spark-py)" + docker push "$(image_ref spark-r)" } function usage { @@ -97,12 +103,13 @@ Commands: push  Push a pre-built image to a registry. Requires a repository address to be provided. Options: - -f file Dockerfile to build for JVM based Jobs.
By default builds the Dockerfile shipped with Spark. - -p file Dockerfile with Python baked in. By default builds the Dockerfile shipped with Spark. - -r repo Repository address. - -t tag Tag to apply to the built image, or to identify the image to be pushed. - -m Use minikube's Docker daemon. - -n Build docker image with --no-cache + -f file Dockerfile to build for JVM based Jobs. By default builds the Dockerfile shipped with Spark. + -p file Dockerfile to build for PySpark Jobs. Builds Python dependencies and ships with Spark. + -R file Dockerfile to build for SparkR Jobs. Builds R dependencies and ships with Spark. + -r repo Repository address. + -t tag  Tag to apply to the built image, or to identify the image to be pushed. + -m  Use minikube's Docker daemon. + -n  Build docker image with --
spark git commit: [SPARK-25116][TESTS] Fix the Kafka cluster leak and clean up cached producers
Repository: spark Updated Branches: refs/heads/master 8b0e94d89 -> da2dc6929 [SPARK-25116][TESTS] Fix the Kafka cluster leak and clean up cached producers ## What changes were proposed in this pull request? KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and KafkaContinuousSinkSuite create a Kafka cluster but `afterAll` only shuts down one cluster. This leaks a Kafka cluster and causes some Kafka threads to crash and kill the JVM when SBT is trying to clean up tests. This PR fixes the leak and also adds a shutdown hook to detect Kafka cluster leaks. In addition, it also fixes an `AdminClient` leak and cleans up cached producers (when a record is written using a producer, the producer will keep refreshing the topic and I didn't find an API to clear it other than closing the producer) to eliminate the following annoying logs: ``` 18/08/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: [AdminClient clientId=adminclient-4] Connection to node 0 could not be established. Broker may not be available. 18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could not be established. Broker may not be available. 18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could not be established. Broker may not be available. 18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not be established. Broker may not be available. 18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not be established. Broker may not be available. ``` I also reverted https://github.com/apache/spark/pull/22097/commits/b5eb54244ed573c8046f5abf7bf087f5f08dba58 introduced by #22097 since it doesn't help. ## How was this patch tested? Jenkins Closes #22106 from zsxwing/SPARK-25116.
Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da2dc692 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da2dc692 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da2dc692 Branch: refs/heads/master Commit: da2dc69291cda8c8e7bb6b4a15001f768a97f65e Parents: 8b0e94d Author: Shixiong Zhu Authored: Fri Aug 17 14:21:08 2018 -0700 Committer: Shixiong Zhu Committed: Fri Aug 17 14:21:08 2018 -0700 -- .../sql/kafka010/CachedKafkaProducer.scala | 8 +- .../sql/kafka010/KafkaContinuousReader.scala| 2 +- .../sql/kafka010/CachedKafkaProducerSuite.scala | 5 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala | 7 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 2 +- .../spark/sql/kafka010/KafkaRelationSuite.scala | 3 +- .../spark/sql/kafka010/KafkaSinkSuite.scala | 2 +- .../apache/spark/sql/kafka010/KafkaTest.scala | 32 +++ .../spark/sql/kafka010/KafkaTestUtils.scala | 91 ++-- .../streaming/kafka010/KafkaTestUtils.scala | 89 +-- 10 files changed, 132 insertions(+), 109 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index 571140b..cd680ad 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -33,8 +33,12 @@ private[kafka010] object CachedKafkaProducer extends Logging { private type Producer = KafkaProducer[Array[Byte], Array[Byte]] + private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10) + private lazy val cacheExpireTimeout: Long = -SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m") +Option(SparkEnv.get).map(_.conf.getTimeAsMs( + "spark.kafka.producer.cache.timeout", + s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout) private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] { override def load(config: Seq[(String, Object)]): Producer = { @@ -102,7 +106,7 @@ private[kafka010] object CachedKafkaProducer extends Logging { } } - private def clear(): Unit = { + private[kafka010] def clear(): Unit = { logInfo("Cleaning
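The expiry read above is an ordinary Spark conf, so the producer cache timeout can also be tuned at session startup. A small hedged sketch (the application name and the 5-minute value are arbitrary; the 10-minute default applies when the conf is unset):

```python
from pyspark.sql import SparkSession

# Illustrative only: shorten the cached Kafka producer expiry from the
# 10-minute default referenced in the patch to 5 minutes.
spark = (SparkSession.builder
         .appName("kafka-producer-cache-sketch")
         .config("spark.kafka.producer.cache.timeout", "5m")
         .getOrCreate())
```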
svn commit: r28812 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_17_12_02-8b0e94d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 17 19:16:24 2018 New Revision: 28812 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_17_12_02-8b0e94d docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23042][ML] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier
Repository: spark Updated Branches: refs/heads/master 162326c0e -> 8b0e94d89 [SPARK-23042][ML] Use OneHotEncoderModel to encode labels in MultilayerPerceptronClassifier ## What changes were proposed in this pull request? In MultilayerPerceptronClassifier, we currently use an RDD operation to encode labels. I think we should use ML's OneHotEncoderEstimator/Model to do the encoding. ## How was this patch tested? Existing tests. Closes #20232 from viirya/SPARK-23042. Authored-by: Liang-Chi Hsieh Signed-off-by: DB Tsai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b0e94d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b0e94d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b0e94d8 Branch: refs/heads/master Commit: 8b0e94d89621befe52d2a53a8cf2f58f98887a61 Parents: 162326c Author: Liang-Chi Hsieh Authored: Fri Aug 17 18:40:29 2018 + Committer: DB Tsai Committed: Fri Aug 17 18:40:29 2018 + -- .../tests/fulltests/test_mllib_classification.R | 4 +- R/pkg/vignettes/sparkr-vignettes.Rmd| 2 +- docs/sparkr.md | 4 ++ .../MultilayerPerceptronClassifier.scala| 50 ++-- project/MimaExcludes.scala | 5 +- 5 files changed, 26 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8b0e94d8/R/pkg/tests/fulltests/test_mllib_classification.R -- diff --git a/R/pkg/tests/fulltests/test_mllib_classification.R b/R/pkg/tests/fulltests/test_mllib_classification.R index a46c47d..023686e 100644 --- a/R/pkg/tests/fulltests/test_mllib_classification.R +++ b/R/pkg/tests/fulltests/test_mllib_classification.R @@ -382,10 +382,10 @@ test_that("spark.mlp", { trainidxs <- base::sample(nrow(data), nrow(data) * 0.7) traindf <- as.DataFrame(data[trainidxs, ]) testdf <- as.DataFrame(rbind(data[-trainidxs, ], c(0, "the other"))) - model <- spark.mlp(traindf, clicked ~ ., layers = c(1, 3)) + model <- spark.mlp(traindf, clicked ~ ., layers = c(1, 2)) predictions <- predict(model, testdf) expect_error(collect(predictions)) - model <- spark.mlp(traindf, clicked ~ ., layers = c(1, 3), handleInvalid = "skip") + model <- spark.mlp(traindf, clicked ~ ., layers = c(1, 2), handleInvalid = "skip") predictions <- predict(model, testdf) expect_equal(class(collect(predictions)$clicked[1]), "list") http://git-wip-us.apache.org/repos/asf/spark/blob/8b0e94d8/R/pkg/vignettes/sparkr-vignettes.Rmd -- diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd index 68a18ab..090363c 100644 --- a/R/pkg/vignettes/sparkr-vignettes.Rmd +++ b/R/pkg/vignettes/sparkr-vignettes.Rmd @@ -654,7 +654,7 @@ We use Titanic data set to show how to use `spark.mlp` in classification. t <- as.data.frame(Titanic) training <- createDataFrame(t) # fit a Multilayer Perceptron Classification Model -model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 3), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 0, 5, 5, 5, 9, 9, 9)) +model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 2), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 5, 5, 9, 9)) ``` To avoid lengthy display, we only present partial results of the model summary. You can check the full result from your sparkR shell.
http://git-wip-us.apache.org/repos/asf/spark/blob/8b0e94d8/docs/sparkr.md -- diff --git a/docs/sparkr.md b/docs/sparkr.md index 4faad2c..84e9b4a 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -667,3 +667,7 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma ## Upgrading to SparkR 2.3.1 and above - In SparkR 2.3.0 and earlier, the `start` parameter of the `substr` method was wrongly subtracted by one and considered as 0-based. This can lead to inconsistent substring results and also does not match the behaviour of `substr` in R. In version 2.3.1 and later, it has been fixed so the `start` parameter of the `substr` method is now 1-based. As an example, `substr(lit('abcdef'), 2, 4)` would result in `abc` in SparkR 2.3.0, and the result would be `bcd` in SparkR 2.3.1. + +## Upgrading to SparkR 2.4.0 + + - Previously, the validity of the size of the last layer in `spark.mlp` was not checked. For example, if the training data only has two labels, a `layers` param like `c(1, 3)` did not cause an error before; now it does. http://git-wip-us.apache.org/repos/asf/spark/blob/8b0e94d8/m
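The behaviour change noted in docs/sparkr.md also surfaces through the JVM estimator that `spark.mlp` wraps. A hedged PySpark sketch (toy data, assuming an active SparkSession named `spark`), where the last entry of `layers` should match the number of distinct labels:

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.linalg import Vectors

# Toy two-class dataset (illustrative only).
train = spark.createDataFrame(
    [(0.0, Vectors.dense(0.0, 0.0)),
     (0.0, Vectors.dense(0.0, 1.0)),
     (1.0, Vectors.dense(1.0, 0.0)),
     (1.0, Vectors.dense(1.0, 1.0))],
    ["label", "features"])

# Output layer sized to the two labels; per the docs note above, an oversized
# last layer (e.g. 3 for two labels) is no longer silently accepted.
mlp = MultilayerPerceptronClassifier(layers=[2, 4, 2], maxIter=10, seed=1)
model = mlp.fit(train)
```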
svn commit: r28800 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_17_04_02-162326c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 17 11:18:20 2018 New Revision: 28800 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_17_04_02-162326c docs [This commit notification would consist of 1477 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r28793 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_17_00_02-c1ffb3c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Aug 17 07:16:20 2018 New Revision: 28793 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_17_00_02-c1ffb3c docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25117][R] Add EXCEPT ALL and INTERSECT ALL support in R
Repository: spark Updated Branches: refs/heads/master c1ffb3c10 -> 162326c0e [SPARK-25117][R] Add EXCEPT ALL and INTERSECT ALL support in R ## What changes were proposed in this pull request? [SPARK-21274](https://issues.apache.org/jira/browse/SPARK-21274) added support for EXCEPT ALL and INTERSECT ALL. This PR adds the support in R. ## How was this patch tested? Added test in test_sparkSQL.R Author: Dilip Biswal Closes #22107 from dilipbiswal/SPARK-25117. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/162326c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/162326c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/162326c0 Branch: refs/heads/master Commit: 162326c0ee8419083ebd1669796abd234773e9b6 Parents: c1ffb3c Author: Dilip Biswal Authored: Fri Aug 17 00:04:04 2018 -0700 Committer: Felix Cheung Committed: Fri Aug 17 00:04:04 2018 -0700 -- R/pkg/NAMESPACE | 2 + R/pkg/R/DataFrame.R | 59 +- R/pkg/R/generics.R| 6 +++ R/pkg/tests/fulltests/test_sparkSQL.R | 19 ++ 4 files changed, 85 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/162326c0/R/pkg/NAMESPACE -- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index adfd387..0fd0848 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -117,6 +117,7 @@ exportMethods("arrange", "dropna", "dtypes", "except", + "exceptAll", "explain", "fillna", "filter", @@ -131,6 +132,7 @@ exportMethods("arrange", "hint", "insertInto", "intersect", + "intersectAll", "isLocal", "isStreaming", "join", http://git-wip-us.apache.org/repos/asf/spark/blob/162326c0/R/pkg/R/DataFrame.R -- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 471ada1..4f2d4c7 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -2848,6 +2848,35 @@ setMethod("intersect", dataFrame(intersected) }) +#' intersectAll +#' +#' Return a new SparkDataFrame containing rows in both this SparkDataFrame +#' and another SparkDataFrame while preserving the duplicates. +#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in +#' SQL, this function resolves columns by position (not by name). +#' +#' @param x a SparkDataFrame. +#' @param y a SparkDataFrame. +#' @return A SparkDataFrame containing the result of the intersect all operation. +#' @family SparkDataFrame functions +#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method +#' @rdname intersectAll +#' @name intersectAll +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df1 <- read.json(path) +#' df2 <- read.json(path2) +#' intersectAllDF <- intersectAll(df1, df2) +#' } +#' @note intersectAll since 2.4.0 +setMethod("intersectAll", + signature(x = "SparkDataFrame", y = "SparkDataFrame"), + function(x, y) { +intersected <- callJMethod(x@sdf, "intersectAll", y@sdf) +dataFrame(intersected) + }) + #' except #' #' Return a new SparkDataFrame containing rows in this SparkDataFrame @@ -2867,7 +2896,6 @@ setMethod("intersect", #' df2 <- read.json(path2) #' exceptDF <- except(df, df2) #' } -#' @rdname except #' @note except since 1.4.0 setMethod("except", signature(x = "SparkDataFrame", y = "SparkDataFrame"), @@ -2876,6 +2904,35 @@ setMethod("except", dataFrame(excepted) }) +#' exceptAll +#' +#' Return a new SparkDataFrame containing rows in this SparkDataFrame +#' but not in another SparkDataFrame while preserving the duplicates. +#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in +#' SQL, this function resolves columns by position (not by name). +#' +#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame. +#' @return A SparkDataFrame containing the result of the except all operation. +#' @family SparkDataFrame functions +#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method +#' @rdname exceptAll +#' @name exceptAll +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df1 <- read.json(path) +#' df2 <- read.json(path2) +#' exceptAllDF <- exceptAll(df1, df2) +#' } +#' @note exceptAll since 2.4.0 +setMethod("exceptAll", + signature(x = "SparkDataFrame", y = "SparkDataFrame"), + function(x, y) { +excepted <- callJMethod(x@sdf, "exceptAll", y@sdf) +dataFrame(excepted) + }) + #' Save the contents of SparkDataFrame to a data source. #'
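The new R methods delegate to the same Dataset operations added by SPARK-21274, which are also exposed in Python as of 2.4.0; a brief hedged sketch with made-up data, assuming an active SparkSession named `spark`:

```python
# Two small DataFrames with duplicate rows (illustrative only).
df1 = spark.createDataFrame([(1,), (1,), (2,), (3,)], ["id"])
df2 = spark.createDataFrame([(1,), (2,)], ["id"])

# Set operations that preserve duplicates and resolve columns by position.
df1.exceptAll(df2).show()     # rows of df1 not in df2, keeping duplicates
df1.intersectAll(df2).show()  # rows present in both, keeping duplicates
```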