[GitHub] spark issue #21465: [SPARK-24333][ML][PYTHON]Add fit with validation set to ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/21465 merged to master, thanks @huaxingao ! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22273: [SPARK-25272][PYTHON][TEST] Add test to better indicate ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22273 retest this please
[GitHub] spark issue #22273: [SPARK-25272][PYTHON][TEST] Add test to better indicate ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22273 retest this please
[GitHub] spark issue #22273: [SPARK-25272][PYTHON][TEST] Add test to better indicate ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22273 retest this please
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239569077 --- Diff: python/pyspark/ml/param/shared.py --- @@ -814,3 +814,25 @@ def getDistanceMeasure(self): """ return self.getOrDefault(self.distanceMeasure) + +class HasValidationIndicatorCol(Params): --- End diff -- Would you mind running the codegen again, for example with `pushd python/pyspark/ml/param/ && python _shared_params_code_gen.py > shared.py && popd`, and pushing the result if there is a diff? I think the DecisionTreeParams should be at the bottom of the file.
[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23236 > I suspect that might be because, as the resource usage is heavy, StreamingLogisticRegressionWithSGD's training speed on the input batch stream can't always catch up with the predict batch stream, so the model doesn't reach the expected improvement in error yet. Yeah, that sounds likely, and the timeout increase should help with that
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send u...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22275 merged to master, thanks @holdenk @viirya and @felixcheung !
[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23236 True, the test is not that long under light resources. Locally, I saw a couple seconds difference with the changes I mentioned. The weird thing is the unmodified test completes after the 11th batch with errors ``` ['0.67', '0.71', '0.78', '0.7', '0.75', '0.74', '0.73', '0.69', '0.62', '0.71', '0.31'] ``` Compared to the error values from the test failures above, they match up until the 10th batch, but then these continue until the 16th, where it hits the timeout ``` 0.67, 0.71, 0.78, 0.7, 0.75, 0.74, 0.73, 0.69, 0.62, 0.71, 0.69, 0.75, 0.72, 0.77, 0.71, 0.74 ``` I would expect the seed to produce the same values (or all different if the random func is different), which makes me think something else is going on. Anyway, I think it's fine to increase the timeout, and if it's still flaky, we can look at making changes.
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239245388 --- Diff: python/pyspark/ml/regression.py --- @@ -705,12 +710,59 @@ def getNumTrees(self): return self.getOrDefault(self.numTrees) -class GBTParams(TreeEnsembleParams): +class GBTParams(TreeEnsembleParams, HasMaxIter, HasStepSize, HasValidationIndicatorCol): """ Private class to track supported GBT params. """ + +stepSize = Param(Params._dummy(), "stepSize", + "Step size (a.k.a. learning rate) in interval (0, 1] for shrinking " + + "the contribution of each estimator.", + typeConverter=TypeConverters.toFloat) + +validationTol = Param(Params._dummy(), "validationTol", + "Threshold for stopping early when fit with validation is used. " + + "If the error rate on the validation input changes by less than the " + + "validationTol, then learning will stop early (before `maxIter`). " + + "This parameter is ignored when fit without validation is used.", + typeConverter=TypeConverters.toFloat) + +@since("3.0.0") +def setValidationTol(self, value): +""" +Sets the value of :py:attr:`validationTol`. +""" +return self._set(validationTol=value) + +@since("3.0.0") +def getValidationTol(self): +""" +Gets the value of validationTol or its default value. +""" +return self.getOrDefault(self.validationTol) + + +class GBTRegressorParams(GBTParams, TreeRegressorParams): +""" +Private class to track supported GBTRegressor params. + +.. versionadded:: 3.0.0 +""" + supportedLossTypes = ["squared", "absolute"] +lossType = Param(Params._dummy(), "lossType", + "Loss function which GBT tries to minimize (case-insensitive). " + + "Supported options: " + ", ".join(supportedLossTypes), + typeConverter=TypeConverters.toString) + +@since("1.4.0") +def setLossType(self, value): --- End diff -- `setLossType` should be in the estimator and `getLossType` should be here
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239240113 --- Diff: python/pyspark/ml/regression.py --- @@ -705,12 +710,59 @@ def getNumTrees(self): return self.getOrDefault(self.numTrees) -class GBTParams(TreeEnsembleParams): +class GBTParams(TreeEnsembleParams, HasMaxIter, HasStepSize, HasValidationIndicatorCol): """ Private class to track supported GBT params. """ + +stepSize = Param(Params._dummy(), "stepSize", + "Step size (a.k.a. learning rate) in interval (0, 1] for shrinking " + + "the contribution of each estimator.", + typeConverter=TypeConverters.toFloat) + +validationTol = Param(Params._dummy(), "validationTol", + "Threshold for stopping early when fit with validation is used. " + + "If the error rate on the validation input changes by less than the " + + "validationTol, then learning will stop early (before `maxIter`). " + + "This parameter is ignored when fit without validation is used.", + typeConverter=TypeConverters.toFloat) + +@since("3.0.0") +def setValidationTol(self, value): --- End diff -- It seems Scala does not have this API, right? If not, let's remove it here for now
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239243661 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): +""" +Private class to track supported GBTClassifier params. + +.. versionadded:: 3.0.0 +""" + +supportedLossTypes = ["logistic"] + +lossType = Param(Params._dummy(), "lossType", + "Loss function which GBT tries to minimize (case-insensitive). " + + "Supported options: " + ", ".join(supportedLossTypes), + typeConverter=TypeConverters.toString) + +@since("3.0.0") --- End diff -- please address the above comment
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239242316 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): --- End diff -- Yeah, you're correct, this is fine
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239243683 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): +""" +Private class to track supported GBTClassifier params. + +.. versionadded:: 3.0.0 +""" + +supportedLossTypes = ["logistic"] + +lossType = Param(Params._dummy(), "lossType", + "Loss function which GBT tries to minimize (case-insensitive). " + + "Supported options: " + ", ".join(supportedLossTypes), + typeConverter=TypeConverters.toString) + +@since("3.0.0") +def setLossType(self, value): --- End diff -- please address the above comment
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r239211515 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): --- End diff -- Ah, I see. Let me take another look.
[GitHub] spark issue #23236: [SPARK-26275][PYTHON][ML] Increases timeout for Streamin...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23236 Seems ok to me, but there are a few silly things with this test that fixing might also help:
* why is the `stepSize` so low at 0.01? I think it would be fine at 0.1, but even conservatively at 0.03 it would still help converge a little faster
* why is it comparing the current error with the second error value (errors[1] - errors[-1])? Maybe because errors[1] is bigger, but it should just be `errors[0]`, and make the values more sensible, like maybe check that error is > 0.2
* having 2 `if` statements to check basically the same condition is a bit redundant; just `assert(len(errors) < len(predict_batches))` before the last return
* nit: `true_predicted = []` on line 350 isn't used
If you don't feel comfortable changing any of these, just increasing the timeout is probably fine
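A minimal sketch of how that cleanup could look; `check_errors`, its argument names, and the 0.2 threshold are hypothetical stand-ins for the real test code, not the actual patch:

```python
def check_errors(errors, predict_batches):
    # Compare the latest error against the first recorded error (errors[0])
    # rather than errors[1], and require a meaningful drop before passing.
    improved = (errors[0] - errors[-1]) > 0.2
    # One assertion replaces the two redundant `if` checks: training must
    # finish before every predict batch has been consumed.
    assert len(errors) < len(predict_batches)
    return improved
```
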
[GitHub] spark pull request #23203: [SPARK-26252][PYTHON] Add support to run specific...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/23203#discussion_r238868565 --- Diff: python/run-tests.py --- @@ -93,17 +93,18 @@ def run_individual_python_test(target_dir, test_name, pyspark_python): "pyspark-shell" ] env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args) - -LOGGER.info("Starting test(%s): %s", pyspark_python, test_name) +str_test_name = " ".join(test_name) +LOGGER.info("Starting test(%s): %s", pyspark_python, str_test_name) start_time = time.time() try: per_test_output = tempfile.TemporaryFile() retcode = subprocess.Popen( -[os.path.join(SPARK_HOME, "bin/pyspark"), test_name], --- End diff -- Just a thought, could you leave `test_name` as a string and then change this line to `[os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split(),`? I think it would be a little simpler and wouldn't need `str_test_name`, wdyt?
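A sketch of the suggested simplification, assuming `test_name` stays a single space-separated string; `build_command` is a hypothetical helper, not a function in run-tests.py:

```python
import os


def build_command(spark_home, test_name):
    # Split the space-separated test name into separate argv entries for
    # subprocess.Popen, so no separate str_test_name variable is needed.
    return [os.path.join(spark_home, "bin/pyspark")] + test_name.split()
```
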
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r238801573 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): +""" +Private class to track supported GBTClassifier params. + +.. versionadded:: 3.0.0 +""" + +supportedLossTypes = ["logistic"] + +lossType = Param(Params._dummy(), "lossType", + "Loss function which GBT tries to minimize (case-insensitive). " + + "Supported options: " + ", ".join(supportedLossTypes), + typeConverter=TypeConverters.toString) + +@since("3.0.0") +def setLossType(self, value): --- End diff -- `setLossType` should be in the estimators, `getLossType` should be here
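The convention being asked for can be sketched as below; these are simplified stand-ins without the real `Params`/`Param` machinery, meant only to show where the setter and getter live:

```python
class GBTClassifierParams(object):
    """Params mixin: holds the param definition and its getter."""

    lossType = "logistic"  # placeholder for the real Param object

    def getLossType(self):
        # Getters belong on the shared params class.
        return self.lossType


class GBTClassifier(GBTClassifierParams):
    """Estimator: owns the setter."""

    def setLossType(self, value):
        # Setters belong on the concrete estimator and return self
        # so calls can be chained, as in the real PySpark API.
        self.lossType = value
        return self
```
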
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r238808440 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): --- End diff -- this should extend `TreeClassifierParams`
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r238809338 --- Diff: python/pyspark/ml/classification.py --- @@ -1242,40 +1255,36 @@ class GBTClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol [0.25..., 0.23..., 0.21..., 0.19..., 0.18...] >>> model.numClasses 2 +>>> gbt = gbt.setValidationIndicatorCol("validationIndicator") +>>> gbt.getValidationIndicatorCol() 'validationIndicator' +>>> gbt.getValidationTol() 0.01 .. versionadded:: 1.4.0 """ -lossType = Param(Params._dummy(), "lossType", - "Loss function which GBT tries to minimize (case-insensitive). " + - "Supported options: " + ", ".join(GBTParams.supportedLossTypes), - typeConverter=TypeConverters.toString) - -stepSize = Param(Params._dummy(), "stepSize", - "Step size (a.k.a. learning rate) in interval (0, 1] for shrinking " + - "the contribution of each estimator.", - typeConverter=TypeConverters.toFloat) - @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, lossType="logistic", - maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0, - featureSubsetStrategy="all"): + maxIter=20, stepSize=0.1, seed=None, subsamplingRate=1.0, impurity="variance", --- End diff -- this is not the correct default impurity
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r238801256 --- Diff: python/pyspark/ml/classification.py --- @@ -1174,9 +1165,31 @@ def trees(self): return [DecisionTreeClassificationModel(m) for m in list(self._call_java("trees"))] +class GBTClassifierParams(GBTParams, HasVarianceImpurity): +""" +Private class to track supported GBTClassifier params. + +.. versionadded:: 3.0.0 +""" + +supportedLossTypes = ["logistic"] + +lossType = Param(Params._dummy(), "lossType", + "Loss function which GBT tries to minimize (case-insensitive). " + + "Supported options: " + ", ".join(supportedLossTypes), + typeConverter=TypeConverters.toString) + +@since("3.0.0") --- End diff -- don't change the version, since we are just refactoring the base classes
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r238809091 --- Diff: python/pyspark/ml/regression.py --- @@ -650,19 +650,20 @@ def getFeatureSubsetStrategy(self): return self.getOrDefault(self.featureSubsetStrategy) -class TreeRegressorParams(Params): +class HasVarianceImpurity(Params): --- End diff -- This shouldn't be changed, impurity is different for regression and classification, so the param needs to be defined in `TreeRegressorParams` and `TreeClassifierParams`, as it was already
[GitHub] spark issue #23200: [SPARK-26033][SPARK-26034][PYTHON][FOLLOW-UP] Small clea...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23200 merged to master, thanks @HyukjinKwon
[GitHub] spark pull request #23200: [SPARK-26033][SPARK-26034][PYTHON][FOLLOW-UP] Sma...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/23200#discussion_r238454041 --- Diff: python/pyspark/mllib/tests/test_linalg.py --- @@ -22,33 +22,18 @@ from numpy import array, array_equal, zeros, arange, tile, ones, inf import pyspark.ml.linalg as newlinalg +from pyspark.serializers import PickleSerializer from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector, \ DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT from pyspark.mllib.regression import LabeledPoint -from pyspark.testing.mllibutils import make_serializer, MLlibTestCase - -_have_scipy = False -try: -import scipy.sparse -_have_scipy = True -except: -# No SciPy, but that's okay, we'll skip those tests -pass - - -ser = make_serializer() - - -def _squared_distance(a, b): -if isinstance(a, Vector): -return a.squared_distance(b) -else: -return b.squared_distance(a) +from pyspark.testing.mllibutils import MLlibTestCase +from pyspark.testing.utils import have_scipy --- End diff -- Oh that's good, didn't realize have_scipy was there
[GitHub] spark issue #22954: [SPARK-25981][R] Enables Arrow optimization from R DataF...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22954 > @BryanCutler BTW, do you know the rough expected timing for Arrow 0.12.0 release? I think we should be starting the release process soon, so maybe in a week or two.
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r236880776 --- Diff: python/pyspark/ml/regression.py --- @@ -705,12 +705,38 @@ def getNumTrees(self): return self.getOrDefault(self.numTrees) -class GBTParams(TreeEnsembleParams): +class GBTParams(TreeEnsembleParams, HasMaxIter, HasStepSize, HasValidationIndicatorCol): --- End diff -- I like having a common `GBTParams` class, it was strange to have 2 of the same name. But you should also define `GBTClassifierParams` and `GBTRegressorParams`, then put the `supportedLossTypes` in there so you don't need to override them later. You can also put the `lossType` param and `getLossType()` method there. This makes it clean and follows how it's done in Scala.
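The hierarchy described in that comment could be sketched roughly as follows; the empty base class stands in for the real mixins (`TreeEnsembleParams`, `HasMaxIter`, etc.), so this only illustrates where `supportedLossTypes` would live:

```python
class GBTParams(object):
    """Shared GBT params, mirroring the common base class on the Scala side."""


class GBTClassifierParams(GBTParams):
    # Classification supports only logistic loss.
    supportedLossTypes = ["logistic"]


class GBTRegressorParams(GBTParams):
    # Regression supports squared and absolute loss.
    supportedLossTypes = ["squared", "absolute"]
```
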
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r236881042 --- Diff: python/pyspark/ml/regression.py --- @@ -1030,9 +1056,9 @@ def featureImportances(self): @inherit_doc -class GBTRegressor(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter, - GBTParams, HasCheckpointInterval, HasStepSize, HasSeed, JavaMLWritable, - JavaMLReadable, TreeRegressorParams): +class GBTRegressor(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, GBTParams, + HasCheckpointInterval, HasStepSize, HasSeed, JavaMLWritable, JavaMLReadable, --- End diff -- I think you can remove `HasStepSize` since it is in `GBTParams`
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r235524421 --- Diff: python/pyspark/worker.py --- @@ -22,7 +22,12 @@ import os import sys import time -import resource +# 'resource' is a Unix specific package. +has_resource_package = True --- End diff -- nit: is it technically a module, not a package?
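The guarded import under discussion follows the usual pattern for Unix-only standard-library modules; a sketch (the flag name here reflects the "module" naming the nit asks for, not necessarily what the patch uses):

```python
# `resource` is a Unix-specific standard-library module, so guard the
# import and record whether it is available instead of failing on Windows.
try:
    import resource  # noqa: F401
    has_resource_module = True
except ImportError:
    has_resource_module = False
```
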
[GitHub] spark pull request #23055: [SPARK-26080][PYTHON] Disable 'spark.executor.pys...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/23055#discussion_r235523238 --- Diff: docs/configuration.md --- @@ -189,7 +189,7 @@ of the most common options to set are: limited to this amount. If not set, Spark will not limit Python's memory use and it is up to the application to avoid exceeding the overhead memory space shared with other non-JVM processes. When PySpark is run in YARN or Kubernetes, this memory -is added to executor resource requests. +is added to executor resource requests. This configuration is not supported on Windows. --- End diff -- Maybe add `NOTE: ...`
[GitHub] spark pull request #21465: [SPARK-24333][ML][PYTHON]Add fit with validation ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/21465#discussion_r235128413 --- Diff: python/pyspark/ml/classification.py --- @@ -1176,8 +1176,8 @@ def trees(self): @inherit_doc class GBTClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol, HasMaxIter, -GBTParams, HasCheckpointInterval, HasStepSize, HasSeed, JavaMLWritable, -JavaMLReadable): +GBTParams, HasCheckpointInterval, HasStepSize, HasSeed, +HasValidationIndicatorCol, JavaMLWritable, JavaMLReadable): --- End diff -- I think this should be added to `GBTParams`, which is done on the Scala side too.
[GitHub] spark issue #23077: [SPARK-25344][PYTHON] Clean unittest2 imports up that we...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23077 Oh, I think the PR title should be SPARK-26105 too
[GitHub] spark issue #23077: [SPARK-25344][PYTHON] Clean unittest2 imports up that we...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23077 > BTW, Bryan, do you have some time to work on the has_numpy stuff Yup, I can do that
[GitHub] spark issue #23077: [SPARK-25344][PYTHON] Clean unittest2 imports up that we...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23077 Oops, actually I think there is one more here https://github.com/apache/spark/blob/master/python/pyspark/testing/mllibutils.py#L20 Other than that, looks good
[GitHub] spark issue #23063: [SPARK-26033][PYTHON][TESTS] Break large ml/tests.py fil...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23063 cc @HyukjinKwon
[GitHub] spark issue #23063: [SPARK-26033][PYTHON][TESTS] Break large ml/tests.py fil...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23063 Dist by line count:
```
348 ./test_algorithms.py
 84 ./test_base.py
 71 ./test_evaluation.py
314 ./test_feature.py
118 ./test_image.py
392 ./test_linalg.py
367 ./test_param.py
369 ./test_persistence.py
 77 ./test_pipeline.py
 56 ./test_stat.py
254 ./test_training_summary.py
552 ./test_tuning.py
116 ./test_wrapper.py
```
[GitHub] spark pull request #23063: [SPARK-26033][PYTHON][TESTS] Break large ml/tests...
GitHub user BryanCutler opened a pull request: https://github.com/apache/spark/pull/23063 [SPARK-26033][PYTHON][TESTS] Break large ml/tests.py file into smaller files

## What changes were proposed in this pull request?

This PR breaks down the large ml/tests.py file that contains all Python ML unit tests into several smaller test files to be easier to read and maintain. The tests are broken down as follows:

```
pyspark
├── __init__.py
...
├── ml
│   ├── __init__.py
...
│   ├── tests
│   │   ├── __init__.py
│   │   ├── test_algorithms.py
│   │   ├── test_base.py
│   │   ├── test_evaluation.py
│   │   ├── test_feature.py
│   │   ├── test_image.py
│   │   ├── test_linalg.py
│   │   ├── test_param.py
│   │   ├── test_persistence.py
│   │   ├── test_pipeline.py
│   │   ├── test_stat.py
│   │   ├── test_training_summary.py
│   │   ├── test_tuning.py
│   │   └── test_wrapper.py
...
├── testing
...
│   ├── mlutils.py
...
```

## How was this patch tested?

Ran tests manually by module to ensure the test count was the same, and ran `python/run-tests --modules=pyspark-ml` to verify all passing with Python 2.7 and Python 3.6.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/BryanCutler/spark python-test-breakup-ml-SPARK-26033 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23063.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23063

commit a4f8f12f6357861572ffbf34190947983545ba98 Author: Bryan Cutler Date: 2018-11-17T01:30:29Z separated out ml tests
[GitHub] spark pull request #23056: [SPARK-26034][PYTHON][TESTS] Break large mllib/te...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/23056#discussion_r234093063 --- Diff: python/pyspark/testing/mllibutils.py --- @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +if sys.version_info[:2] <= (2, 6): +try: +import unittest2 as unittest +except ImportError: +sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') +sys.exit(1) +else: +import unittest --- End diff -- Yeah, I wondered about that but thought it might be better to do in a followup
[GitHub] spark issue #23056: [SPARK-26034][PYTHON][TESTS] Break large mllib/tests.py ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23056 cc @HyukjinKwon @squito
[GitHub] spark issue #23056: [SPARK-26034][PYTHON][TESTS] Break large mllib/tests.py ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23056 Dist by line count:
```
313 ./test_algorithms.py
201 ./test_feature.py
642 ./test_linalg.py
197 ./test_stat.py
523 ./test_streaming_algorithms.py
115 ./test_util.py
```
[GitHub] spark pull request #23056: [SPARK-26034][PYTHON][TESTS] Break large mllib/te...
GitHub user BryanCutler opened a pull request: https://github.com/apache/spark/pull/23056 [SPARK-26034][PYTHON][TESTS] Break large mllib/tests.py file into smaller files

## What changes were proposed in this pull request?

This PR breaks down the large mllib/tests.py file that contains all Python MLlib unit tests into several smaller test files to be easier to read and maintain. The tests are broken down as follows:

```
pyspark
├── __init__.py
...
├── mllib
│   ├── __init__.py
...
│   ├── tests
│   │   ├── __init__.py
│   │   ├── test_algorithms.py
│   │   ├── test_feature.py
│   │   ├── test_linalg.py
│   │   ├── test_stat.py
│   │   ├── test_streaming_algorithms.py
│   │   └── test_util.py
...
├── testing
...
│   ├── mllibutils.py
...
```

## How was this patch tested?

Ran tests manually by module to ensure the test count was the same, and ran `python/run-tests --modules=pyspark-mllib` to verify all passing with Python 2.7 and Python 3.6. Also installed scipy to include optional tests in test_linalg.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/BryanCutler/spark python-test-breakup-mllib-SPARK-26034 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/23056.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #23056

commit 2759521df7f2dffc9ddb9379e0b1dac6721da366 Author: Bryan Cutler Date: 2018-11-16T03:01:22Z separated mllib tests
[GitHub] spark issue #23034: [SPARK-26035][PYTHON] Break large streaming/tests.py fil...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23034 > Also, @BryanCutler, I think we can talk about locations of testing/...util.py later when we finished to split the tests. Moving utils would probably cause less conflicts and should be good enough to separately discuss if that's a worry, and should be changed. Sounds good, working on MLlib right now. Hopefully have a PR up soon
[GitHub] spark issue #23033: [SPARK-26036][PYTHON] Break large tests.py files into sm...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/23033 Looks like ML is using `QuietTest` also, so the import needs to be updated
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r233209385 --- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
       x
     }
   }
+  data[] <- lapply(data, cleanCols)
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+    shouldUseArrow <- tryCatch({
+      stopifnot(length(data) > 0)
+      dataHead <- head(data, 1)
+      # Currently Arrow optimization does not support POSIXct and raw for now.
+      # Also, it does not support explicit float type set by users. It leads to
+      # incorrect conversion. We will fall back to the path without Arrow optimization.
+      if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+        stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+      }
+      if (any(sapply(dataHead, is.raw))) {
+        stop("Arrow optimization with R DataFrame does not support raw type yet.")
+      }
+      if (inherits(schema, "structType")) {
+        if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+          stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff -- Oh if it's only when casting to a float, then maybe not that big of an issue. I just wanted to make sure a bug was filed for Arrow if the problem is there.
[GitHub] spark pull request #23021: [SPARK-26032][PYTHON] Break large sql/tests.py fi...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/23021#discussion_r233183572 --- Diff: python/pyspark/testing/sqlutils.py --- @@ -0,0 +1,268 @@ +# --- End diff -- Maybe rename this file to `sql_testing_utils.py`? And is it possible to just place this in the `sql/tests` directory? Having the `testing` folder seems a little ambiguous to me
[GitHub] spark issue #22954: [SPARK-25981][R] Enables Arrow optimization from R DataF...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22954 I don't know R well enough to review that code, but the results look awesome! Nice work @HyukjinKwon!!
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232425279 --- Diff: R/pkg/R/SQLContext.R ---
@@ -189,19 +238,67 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
       x
     }
   }
+  data[] <- lapply(data, cleanCols)
-  # drop factors and wrap lists
-  data <- setNames(lapply(data, cleanCols), NULL)
+  args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+  if (arrowEnabled) {
+    shouldUseArrow <- tryCatch({
+      stopifnot(length(data) > 0)
+      dataHead <- head(data, 1)
+      # Currently Arrow optimization does not support POSIXct and raw for now.
+      # Also, it does not support explicit float type set by users. It leads to
+      # incorrect conversion. We will fall back to the path without Arrow optimization.
+      if (any(sapply(dataHead, function(x) is(x, "POSIXct")))) {
+        stop("Arrow optimization with R DataFrame does not support POSIXct type yet.")
+      }
+      if (any(sapply(dataHead, is.raw))) {
+        stop("Arrow optimization with R DataFrame does not support raw type yet.")
+      }
+      if (inherits(schema, "structType")) {
+        if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+          stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
--- End diff -- Any idea what's going on with the `FloatType`? Is it a problem on the arrow side?
[GitHub] spark pull request #22954: [SPARK-25981][R] Enables Arrow optimization from ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22954#discussion_r232425031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala ---
@@ -225,4 +226,25 @@ private[sql] object SQLUtils extends Logging {
     }
     sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
   }
+
+  /**
+   * R callable function to read a file in Arrow stream format and create a `RDD`
--- End diff -- nit: a `RDD` -> an `RDD`
[GitHub] spark issue #22913: [SPARK-25902][SQL] Add support for dates with millisecon...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22913 Sounds good, thanks @javierluraschi !
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send u...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22275 ping @HyukjinKwon and @viirya to maybe take another look at the recent changes to make this cleaner, if you are able to. Thanks!
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22275#discussion_r232145973 --- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
         self.assertPandasEqual(pdf, df_from_python.toPandas())
         self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff -- @holdenk , I updated the tests, please take another look when you get a chance. Thanks!
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22275#discussion_r231311398 --- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
         self.assertPandasEqual(pdf, df_from_python.toPandas())
         self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff -- Yeah, it's not a guarantee, but with a large num of partitions, it's a pretty slim chance they will all be in order. I can also add a case with some delay. My only concern is how big to make a delay to be sure it's enough without adding wasted time to the tests. How about we keep the case with a large number of partitions and add a case with 100ms delay on the first partition?
[GitHub] spark issue #22913: [SPARK-25902][SQL] Add support for dates with millisecon...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22913 I'm a little against adding this because the Arrow Java vectors used so far were chosen to match Spark's internal data, to keep things simple and avoid lots of conversions on the Java side. Conversions to supported types are being done before reading the data in Java. For instance, there are lots of timestamp types for other time units, but we only accept microseconds with tz in Java (to match Spark) and do any necessary conversions in Python before writing the Arrow data. Can this also be done in R?
[GitHub] spark pull request #22913: [SPARK-25902][SQL] Add support for dates with mil...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22913#discussion_r230953015 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---
@@ -71,6 +71,7 @@ object ArrowUtils {
     case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale)
     case date: ArrowType.Date if date.getUnit == DateUnit.DAY => DateType
     case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND => TimestampType
+    case date: ArrowType.Date if date.getUnit == DateUnit.MILLISECOND => TimestampType
--- End diff -- I think it should map to `Date` and any extra time would be truncated. Looking at the Arrow format from https://github.com/apache/arrow/blob/master/format/Schema.fbs ```
/// Date is either a 32-bit or 64-bit type representing elapsed time since UNIX
/// epoch (1970-01-01), stored in either of two units:
///
/// * Milliseconds (64 bits) indicating UNIX time elapsed since the epoch (no
///   leap seconds), where the values are evenly divisible by 86400000
/// * Days (32 bits) since the UNIX epoch
``` So it's expected to be a specific number of days without any additional milliseconds.
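The divisibility rule quoted above is easy to state as code. A minimal sketch (the helper name `truncate_to_arrow_date_ms` is made up for illustration, not a real Arrow or Spark API):

```python
MS_PER_DAY = 86400000  # 24 * 60 * 60 * 1000 milliseconds per day

def truncate_to_arrow_date_ms(ts_ms):
    # Drop any sub-day component so the value is evenly divisible by
    # 86400000, as the Arrow Date (MILLISECOND unit) format requires.
    return ts_ms - (ts_ms % MS_PER_DAY)
```

Under this rule a millisecond date carrying extra time, e.g. `86400000 + 12345`, would be truncated back to `86400000` (exactly one day past the epoch).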
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22275#discussion_r229522939 --- Diff: python/pyspark/sql/tests.py ---
@@ -4923,6 +4923,28 @@ def test_timestamp_dst(self):
         self.assertPandasEqual(pdf, df_from_python.toPandas())
         self.assertPandasEqual(pdf, df_from_pandas.toPandas())
 
+    def test_toPandas_batch_order(self):
+
+        # Collects Arrow RecordBatches out of order in driver JVM then re-orders in Python
+        def run_test(num_records, num_parts, max_records):
+            df = self.spark.range(num_records, numPartitions=num_parts).toDF("a")
+            with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": max_records}):
+                pdf, pdf_arrow = self._toPandas_arrow_toggle(df)
+                self.assertPandasEqual(pdf, pdf_arrow)
+
+        cases = [
+            (1024, 512, 2),  # Try large num partitions for good chance of not collecting in order
+            (512, 64, 2),    # Try medium num partitions to test out of order collection
+            (64, 8, 2),      # Try small number of partitions to test out of order collection
+            (64, 64, 1),     # Test single batch per partition
+            (64, 1, 64),     # Test single partition, single batch
+            (64, 1, 8),      # Test single partition, multiple batches
+            (30, 7, 2),      # Test different sized partitions
+        ]
--- End diff -- @holdenk and @felixcheung , I didn't do a loop but chose some different levels of partition numbers to be a bit more sure that partitions won't end up in order. I also added some other cases of different partition/batch ratios. Let me know if you think we need more to be sure here.
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send o...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22275 Apologies for the delay in circling back to this. I reorganized a little to simplify and expanded the comments to hopefully better describe the code. A quick summary of the changes: I changed the ArrowStreamSerializer to not have any state - that seemed to complicate things. So instead of saving the batch order indices, they are loaded on the last iteration of `load_stream`, and this was put in a special serializer `ArrowCollectSerializer` so that it is clear where it is used. I also consolidated all the batch ordering calls within `_collectAsArrow` so it is easier to follow the whole process.
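The reordering this PR performs boils down to a simple idea: batches arrive from the JVM in whatever order partitions finish, paired with their original partition indices, and Python sorts them back before assembling the result. A plain-Python sketch with stand-in string payloads (not the actual serializer code):

```python
# Each collected batch is paired with the index of the partition it came
# from; arrival order depends on which executors finish first.
arrived = [(2, "batch-2"), (0, "batch-0"), (3, "batch-3"), (1, "batch-1")]

# Re-order by the original partition index to restore the DataFrame order.
ordered = [payload for _, payload in sorted(arrived, key=lambda pair: pair[0])]
# ordered == ["batch-0", "batch-1", "batch-2", "batch-3"]
```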
[GitHub] spark issue #22871: [SPARK-25179][PYTHON][DOCS] Document BinaryType support ...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22871 Thanks @HyukjinKwon , looks good!
[GitHub] spark issue #22795: [SPARK-25798][PYTHON] Internally document type conversio...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22795 merged to master, thanks @HyukjinKwon !
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227878740 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    evalType: Int,
+    evalTypes: Seq[Int],
--- End diff -- Could you just send a flag to indicate whether it has bounds or not? Or add something to `runner_conf`?
[GitHub] spark pull request #22795: [SPARK-25798][PYTHON] Internally document type co...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22795#discussion_r227875311 --- Diff: python/pyspark/sql/functions.py --- @@ -3023,6 +3023,42 @@ def pandas_udf(f=None, returnType=None, functionType=None): conversion on returned data. The conversion is not guaranteed to be correct and results should be checked for accuracy by users. """ + # The following table shows most of Pandas data and SQL type conversions in Pandas UDFs that + # are not yet visible to the user. Some of behaviors are buggy and might be changed in the near + # future. The table might have to be eventually documented externally. + # Please see SPARK-25798's PR to see the codes in order to generate the table below. [wide SQL-type-versus-Pandas-value conversion table truncated in this archive]
[GitHub] spark pull request #22795: [SPARK-25798][PYTHON] Internally document type co...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22795#discussion_r227593281 --- Diff: python/pyspark/sql/functions.py --- @@ -3023,6 +3023,42 @@ def pandas_udf(f=None, returnType=None, functionType=None): conversion on returned data. The conversion is not guaranteed to be correct and results should be checked for accuracy by users. """ + # The following table shows most of Pandas data and SQL type conversions in Pandas UDFs that + # are not yet visible to the user. Some of behaviors are buggy and might be changed in the near + # future. The table might have to be eventually documented externally. + # Please see SPARK-25798's PR to see the codes in order to generate the table below. [wide SQL-type-versus-Pandas-value conversion table truncated in this archive]
[GitHub] spark pull request #22795: [SPARK-25798][PYTHON] Internally document type co...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22795#discussion_r227592794 --- Diff: python/pyspark/sql/functions.py --- @@ -3023,6 +3023,42 @@ def pandas_udf(f=None, returnType=None, functionType=None): conversion on returned data. The conversion is not guaranteed to be correct and results should be checked for accuracy by users. """ + # The following table shows most of Pandas data and SQL type conversions in Pandas UDFs that + # are not yet visible to the user. Some of behaviors are buggy and might be changed in the near + # future. The table might have to be eventually documented externally. + # Please see SPARK-25798's PR to see the codes in order to generate the table below. [wide conversion table truncated in this archive; the highlighted row is the `boolean` SQL type] --- End diff -- Sure, no prob. I just don't want us to forget to update this and then it might look confusing and look like these are expected results
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227582390 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    evalType: Int,
+    evalTypes: Seq[Int],
--- End diff -- I don't see that the additional complexity this adds is worth it for now, but curious what others think. If I understand correctly, the python worker just takes an index range for bounded windows and the entire range for unbounded. It does not really care about anything else. So couldn't you just send an index that encompasses the entire range for unbounded? Then you would only need to define `SQL_WINDOW_AGG_PANDAS_UDF` in the worker and all the same code for both, which would simplify quite a bit.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227579686 --- Diff: python/pyspark/sql/tests.py ---
@@ -6323,6 +6333,33 @@ def ordered_window(self):
     def unpartitioned_window(self):
         return Window.partitionBy()
 
+    @property
+    def sliding_row_window(self):
--- End diff -- do these need to be properties or could they be variables?
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r227579436 --- Diff: python/pyspark/sql/tests.py ---
@@ -6481,12 +6516,116 @@ def test_invalid_args(self):
             foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
             df.withColumn('v2', foo_udf(df['v']).over(w))
 
-        with QuietTest(self.sc):
-            with self.assertRaisesRegexp(
-                    AnalysisException,
-                    '.*Only unbounded window frame is supported.*'):
-                df.withColumn('mean_v', mean_udf(df['v']).over(ow))
+    def test_bounded_simple(self):
+        from pyspark.sql.functions import mean, max, min, count
+
+        df = self.data
+        w1 = self.sliding_row_window
+        w2 = self.shrinking_range_window
+
+        plus_one = self.python_plus_one
+        count_udf = self.pandas_agg_count_udf
+        mean_udf = self.pandas_agg_mean_udf
+        max_udf = self.pandas_agg_max_udf
+        min_udf = self.pandas_agg_min_udf
+
+        result1 = df.withColumn('mean_v', mean_udf(plus_one(df['v'])).over(w1)) \
+            .withColumn('count_v', count_udf(df['v']).over(w2)) \
+            .withColumn('max_v', max_udf(df['v']).over(w2)) \
+            .withColumn('min_v', min_udf(df['v']).over(w1))
+
+        expected1 = df.withColumn('mean_v', mean(plus_one(df['v'])).over(w1)) \
+            .withColumn('count_v', count(df['v']).over(w2)) \
+            .withColumn('max_v', max(df['v']).over(w2)) \
+            .withColumn('min_v', min(df['v']).over(w1))
+
+        result1.explain(True)
+        expected1.explain(True)
+
+        result1.show()
+        expected1.show()
--- End diff -- These should be taken out here and in the other tests
[GitHub] spark pull request #22807: [WIP][SPARK-25811][PySpark] Raise a proper error ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22807#discussion_r227571481 --- Diff: python/pyspark/serializers.py ---
@@ -248,7 +248,14 @@ def create_array(s, t):
             # TODO: see ARROW-2432. Remove when the minimum PyArrow version becomes 0.10.0.
             return pa.Array.from_pandas(s.apply(
                 lambda v: decimal.Decimal('NaN') if v is None else v), mask=mask, type=t)
-        return pa.Array.from_pandas(s, mask=mask, type=t)
+        try:
+            array = pa.Array.from_pandas(s, mask=mask, type=t)
+        except pa.ArrowInvalid as e:
--- End diff -- I'm not sure if there are other issues that could cause an `ArrowInvalid` error. What do you think about catching an `ArrowException` and making the message a little more generic?
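The suggestion amounts to catching the broader error type and re-raising with a generic, contextual message. A sketch with stand-in exception classes (mirroring, not importing, pyarrow's hierarchy, in which `ArrowInvalid` subclasses `ArrowException`; the function names here are illustrative only):

```python
class ArrowException(Exception):
    """Stand-in for pyarrow's base exception type."""

class ArrowInvalid(ArrowException):
    """Stand-in for pyarrow's invalid-conversion error."""

def from_pandas_stub(values, target_type):
    # Stand-in for pa.Array.from_pandas; pretend the conversion is unsafe.
    raise ArrowInvalid("Floating point value truncated")

def create_array(values, target_type):
    try:
        return from_pandas_stub(values, target_type)
    except ArrowException as e:  # broad catch, generic message, as suggested
        raise RuntimeError(
            "Error converting data to Arrow type %s; this may be caused by an "
            "unsupported or unsafe type conversion: %s" % (target_type, e))
```

Catching the base class means any Arrow-side failure mode (not just invalid casts) surfaces with the same explanatory wrapper.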
[GitHub] spark pull request #22807: [WIP][SPARK-25811][PySpark] Raise a proper error ...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22807#discussion_r227572701 --- Diff: python/pyspark/sql/tests.py ---
@@ -4961,6 +4961,31 @@ def foofoo(x, y):
             ).collect
         )
 
+    def test_pandas_udf_detect_unsafe_type_conversion(self):
+        from distutils.version import LooseVersion
+        from pyspark.sql.functions import pandas_udf
+        import pandas as pd
+        import numpy as np
+        import pyarrow as pa
+
+        values = [1.0] * 3
+        pdf = pd.DataFrame({'A': values})
+        df = self.spark.createDataFrame(pdf).repartition(1)
+
+        @pandas_udf(returnType="int")
+        def udf(column):
+            return pd.Series(np.linspace(0, 1, 3))
+
+        udf_boolean = df.select(['A']).withColumn('udf', udf('A'))
+
+        # Since 0.11.0, PyArrow supports the feature to raise an error for unsafe cast.
+        if LooseVersion(pa.__version__) >= LooseVersion("0.11.0"):
--- End diff -- I checked how 0.8.0 is working and it does raise an error for something like overflows, but not truncation like this test. Can you also add a check for overflow not dependent on the version?
[GitHub] spark pull request #22795: [SPARK-25798][PYTHON] Internally document type co...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22795#discussion_r227093067 --- Diff: python/pyspark/sql/functions.py --- @@ -3023,6 +3023,42 @@ def pandas_udf(f=None, returnType=None, functionType=None): conversion on returned data. The conversion is not guaranteed to be correct and results should be checked for accuracy by users. """ + # The following table shows most of Pandas data and SQL type conversions in Pandas UDFs that + # are not yet visible to the user. Some of behaviors are buggy and might be changed in the near + # future. The table might have to be eventually documented externally. + # Please see SPARK-25798's PR to see the codes in order to generate the table below. [wide conversion table truncated in this archive; the highlighted row is the `boolean` SQL type] --- End diff -- Could you add a note that bool conversion is not accurate and is being addressed in ARROW-3428?
[GitHub] spark issue #22655: [SPARK-25666][PYTHON] Internally document type conversio...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22655 Thanks @viirya !
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223774544 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    evalType: Int,
+    evalTypes: Seq[Int],
--- End diff -- The only eval types that can be used together are unbounded and bounded windows right? I only worry that this might lead to more complications with the other types that cannot be sent to the worker together.
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223507242 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
@@ -63,7 +65,7 @@ private[spark] object PythonEvalType {
  */
 private[spark] abstract class BasePythonRunner[IN, OUT](
     funcs: Seq[ChainedPythonFunctions],
-    evalType: Int,
+    evalTypes: Seq[Int],
--- End diff -- So you have a Seq now to support a mix of unbounded/bounded eval types? Is there any other way to do this, like maybe inferring it from the bounds given?
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223505747 --- Diff: python/pyspark/worker.py ---
@@ -154,6 +154,47 @@ def wrapped(*series):
     return lambda *a: (wrapped(*a), arrow_return_type)
 
 
+def wrap_bounded_window_agg_pandas_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(begin_index, end_index, *series):
+        import numpy as np
+        import pandas as pd
+        result = []
+        for i in range(0, len(begin_index)):
+            begin = begin_index[i]
+            end = end_index[i]
+            range_index = np.arange(begin, end)
+            # Note: Creating a slice from a series is actually pretty expensive to
+            # do for each window. However, there is no way to reduce/eliminate
+            # the cost of creating sub series here AFAIK.
+            # TODO: s.take might be the best way to create sub series
+            series_slices = [s.take(range_index) for s in series]
+            result.append(f(*series_slices))
+        return pd.Series(result)
+
+    return lambda *a: (wrapped(*a), arrow_return_type)
+
+
+def wrap_bounded_window_agg_pandas_udf_np(f, return_type):
--- End diff -- This is currently not used right?
[GitHub] spark pull request #22305: [SPARK-24561][SQL][Python] User-defined window ag...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22305#discussion_r223506840 --- Diff: python/pyspark/worker.py --- @@ -154,6 +154,47 @@ def wrapped(*series): return lambda *a: (wrapped(*a), arrow_return_type) +def wrap_bounded_window_agg_pandas_udf(f, return_type): +arrow_return_type = to_arrow_type(return_type) + +def wrapped(begin_index, end_index, *series): +import numpy as np +import pandas as pd +result = [] +for i in range(0, len(begin_index)): +begin = begin_index[i] +end = end_index[i] +range_index = np.arange(begin, end) +# Note: Create a slice from a series is actually pretty expensive to +# do for each window. However, there is no way to reduce/eliminate +# the cost of creating sub series here AFAIK. +# TODO: s.take might be the best way to create sub series +series_slices = [s.take(range_index) for s in series] --- End diff -- why can't you use `s.iloc(...)` here and would that be any better? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregati...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22305 I think there is a typo in your example in the description ``` @pandas_udf('double', PandasUDFType.GROUPED_AGG) def avg(v): return v.mean() return avg ``` I think you didn't mean to have `return avg`? Also, I think exploring returning numpy arrays would be good, but let's discuss elsewhere, and I would remove that from your description if it isn't supported as part of this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22653: [SPARK-25659][PYTHON][TEST] Test type inference s...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22653#discussion_r223475373 --- Diff: python/pyspark/sql/tests.py --- @@ -1149,6 +1149,75 @@ def test_infer_schema(self): result = self.spark.sql("SELECT l[0].a from test2 where d['key'].d = '2'") self.assertEqual(1, result.head()[0]) +def test_infer_schema_specification(self): +from decimal import Decimal + +class A(object): +def __init__(self): +self.a = 1 + +data = [ +True, +1, +"a", +u"a", +datetime.date(1970, 1, 1), +datetime.datetime(1970, 1, 1, 0, 0), +1.0, +array.array("d", [1]), +[1], +(1, ), +{"a": 1}, +bytearray(1), +Decimal(1), +Row(a=1), +Row("a")(1), +A(), --- End diff -- I didn't know this was possible - does it just look at the variable attributes in the object to get the fields? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
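As a hedged sketch of the idea being asked about (not PySpark's actual implementation, which may differ in detail): a plain object's instance attributes are reachable through `vars()`/`__dict__`, which is enough to recover field names and values for inference:

```python
# Hypothetical helper, for illustration only; PySpark's schema inference
# is more involved than this.
class A(object):
    def __init__(self):
        self.a = 1
        self.b = "x"

def infer_fields(obj):
    # A plain object's instance attributes live in its __dict__, which is
    # enough to derive field names and sample values.
    return sorted(vars(obj).items())

fields = infer_fields(A())
assert fields == [("a", 1), ("b", "x")]
```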
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send o...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22275 Thanks for the review @holdenk ! I haven't had time to followup, but I'll take a look through this and see what I can do about making things clearer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22275#discussion_r223116201 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql]( val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone withAction("collectAsArrowToPython", queryExecution) { plan => - PythonRDD.serveToStream("serve-Arrow") { out => + PythonRDD.serveToStream("serve-Arrow") { outputStream => +val out = new DataOutputStream(outputStream) val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId) val arrowBatchRdd = toArrowBatchRdd(plan) val numPartitions = arrowBatchRdd.partitions.length -// Store collection results for worst case of 1 to N-1 partitions -val results = new Array[Array[Array[Byte]]](numPartitions - 1) -var lastIndex = -1 // index of last partition written +// Batches ordered by (index of partition, batch # in partition) tuple +val batchOrder = new ArrayBuffer[(Int, Int)]() +var partitionCount = 0 -// Handler to eagerly write partitions to Python in order +// Handler to eagerly write batches to Python out of order def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = { - // If result is from next partition in order - if (index - 1 == lastIndex) { + if (arrowBatches.nonEmpty) { batchWriter.writeBatches(arrowBatches.iterator) -lastIndex += 1 -// Write stored partitions that come next in order -while (lastIndex < results.length && results(lastIndex) != null) { - batchWriter.writeBatches(results(lastIndex).iterator) - results(lastIndex) = null - lastIndex += 1 -} -// After last batch, end the stream -if (lastIndex == results.length) { - batchWriter.end() +arrowBatches.indices.foreach { i => batchOrder.append((index, i)) } + } + partitionCount += 1 + + // After last batch, end the stream and write batch order + if (partitionCount == numPartitions) { +batchWriter.end() +out.writeInt(batchOrder.length) +// Batch order 
indices are from 0 to N-1 batches, sorted by order they arrived --- End diff -- yeah, sounds good --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22275#discussion_r223116082 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql]( val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone withAction("collectAsArrowToPython", queryExecution) { plan => - PythonRDD.serveToStream("serve-Arrow") { out => + PythonRDD.serveToStream("serve-Arrow") { outputStream => +val out = new DataOutputStream(outputStream) val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId) val arrowBatchRdd = toArrowBatchRdd(plan) val numPartitions = arrowBatchRdd.partitions.length -// Store collection results for worst case of 1 to N-1 partitions -val results = new Array[Array[Array[Byte]]](numPartitions - 1) -var lastIndex = -1 // index of last partition written +// Batches ordered by (index of partition, batch # in partition) tuple +val batchOrder = new ArrayBuffer[(Int, Int)]() +var partitionCount = 0 -// Handler to eagerly write partitions to Python in order +// Handler to eagerly write batches to Python out of order def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = { - // If result is from next partition in order - if (index - 1 == lastIndex) { + if (arrowBatches.nonEmpty) { batchWriter.writeBatches(arrowBatches.iterator) -lastIndex += 1 -// Write stored partitions that come next in order -while (lastIndex < results.length && results(lastIndex) != null) { - batchWriter.writeBatches(results(lastIndex).iterator) - results(lastIndex) = null - lastIndex += 1 -} -// After last batch, end the stream -if (lastIndex == results.length) { - batchWriter.end() +arrowBatches.indices.foreach { i => batchOrder.append((index, i)) } --- End diff -- yup! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r223070065 --- Diff: python/pyspark/sql/functions.py --- @@ -2909,6 +2909,11 @@ def pandas_udf(f=None, returnType=None, functionType=None): can fail on special rows, the workaround is to incorporate the condition into the functions. .. note:: The user-defined functions do not take keyword arguments on the calling side. + +.. note:: The data type of returned `pandas.Series` from the user-defined functions should be +matched with defined returnType. When there is mismatch between them, it is not guaranteed +that the conversion by SparkSQL during serialization is correct at all and users might get --- End diff -- instead of saying "conversion is not guaranteed" which sounds like results might be arbitrary, could we say "..mismatch between them, an attempt will be made to cast the data and results should be checked for accuracy."? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning when retu...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22610 > It is pretty new one, is it said we need to upgrade to latest PyArrow in order to use it? Since it is an option at Table.from_pandas, is it possible to extend it to pyarrow.Array? Yeah, it's part of pyarrow.Array now, but will only be in the 0.11.0 release so we would have to do it after the next upgrade. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning when retu...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22610 > Thanks, @BryanCutler. WDYT about documenting the type map thing? I think that would help in the cases of dates/times because those can get a little confusing. For primitives, I think it's pretty straightforward, so I don't know how much that would help. Maybe we could just highlight some potential pitfalls? The problem here was that when a null value was introduced, Pandas automatically converted the data to float to insert a NaN value, and then the Arrow conversion from float to bool is broken. When the data just had ints, the conversion seemed ok, so it ended up giving inconsistent, confusing results. Not sure what might have helped here; it's just a nasty bug :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
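A minimal illustration of the upcast described above (made-up data):

```python
import numpy as np
import pandas as pd

# All-int data stays integer-typed...
assert pd.Series([1, 0, 1]).dtype == np.dtype("int64")

# ...but a single None forces an upcast to float64, with NaN in place of
# the null -- which is what made the later float -> bool conversion
# misbehave in the bug discussed here.
s = pd.Series([1, 0, None])
assert s.dtype == np.dtype("float64")
assert np.isnan(s[2])
```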
[GitHub] spark issue #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning when retu...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22610 So pyarrow just added an option when converting from Pandas to raise an error for unsafe casts. I'd have to try it out to see if it would prevent this case though. It's a common option when working with Pandas, so users might be familiar with it and might be more useful to expose this as a Spark conf rather than checking the types. Btw, I'm working on fixing the float to boolean conversion here https://github.com/apache/arrow/pull/2698 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222501309 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- Yeah, it might be useful to see the warning if doing some local tests etc. My only concern is that users might be confused about why they see a warning locally but it doesn't appear in the logs. Man, it would be nice to have some proper Python logging for this! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning when retu...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22610 Thanks for looking into this @viirya ! I agree that there are lots of cases where casting to another type is intentional and works fine, so this isn't a bug. The only other idea I have is to provide an option to raise an error if the type needs to be cast. That might be possible with pyarrow right now, but I'm not sure how useful it would be. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22610: [WIP][SPARK-25461][PySpark][SQL] Print warning wh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22610#discussion_r222007837 --- Diff: python/pyspark/worker.py --- @@ -84,13 +84,36 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): +import pyarrow as pa result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined function should be " "Pandas.Series, but is {}".format(type(result))) if len(result) != len(a[0]): raise RuntimeError("Result vector from pandas_udf was not the required length: " "expected %d, got %d" % (len(a[0]), len(result))) + +# Ensure return type of Pandas.Series matches the arrow return type of the user-defined +# function. Otherwise, we may produce incorrect serialized data. +# Note: for timestamp type, we only need to ensure both types are timestamp because the +# serializer will do conversion. +try: +arrow_type_of_result = pa.from_numpy_dtype(result.dtype) +both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ +pa.types.is_timestamp(arrow_return_type) +if not both_are_timestamp and arrow_return_type != arrow_type_of_result: +print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " --- End diff -- Will this appear when being run in an executor? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22540: [SPARK-24324] [PYTHON] [FOLLOW-UP] Rename the Con...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22540#discussion_r220283006 --- Diff: python/pyspark/worker.py --- @@ -97,8 +97,9 @@ def verify_result_length(*a): def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): -assign_cols_by_pos = runner_conf.get( -"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False) +assign_cols_by_name = runner_conf.get( + "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true") --- End diff -- Ahh, I forgot this part is using a plain dict. If we are always passing a value then we need to do string comparison, so yeah this looks right. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
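The string-comparison point above can be sketched like this (conf key copied from the diff; the dict contents are made up):

```python
# The conf key from the diff; worker-side confs arrive in a plain dict of
# strings, so the value must be compared as a string, not used as a bool.
key = "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName"
runner_conf = {key: "false"}

assign_cols_by_name = runner_conf.get(key, "true").lower() == "true"
assert assign_cols_by_name is False

# The pitfall: any non-empty string is truthy, so treating the raw value
# as a boolean would evaluate "false" as True.
assert bool(runner_conf.get(key, "true")) is True
```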
[GitHub] spark pull request #22540: [SPARK-24324] [PYTHON] [FOLLOW-UP] Rename the Con...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22540#discussion_r220274047 --- Diff: python/pyspark/worker.py --- @@ -97,8 +97,9 @@ def verify_result_length(*a): def wrap_grouped_map_pandas_udf(f, return_type, argspec, runner_conf): -assign_cols_by_pos = runner_conf.get( -"spark.sql.execution.pandas.groupedMap.assignColumnsByPosition", False) +assign_cols_by_name = runner_conf.get( + "spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName", "true") --- End diff -- why change from passing a boolean to a string? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22540: [SPARK-24324] [PYTHON] [FOLLOW-UP] Rename the Con...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22540#discussion_r220272980 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala --- @@ -131,11 +131,8 @@ object ArrowUtils { } else { Nil } -val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnssByPosition) { --- End diff -- It's not a big deal performance wise, but does add something. I don't think this is really necessary because the worker has to specify a default value that matches the SQLConf anyway, so I would leave as is but it's fine if you prefer to change it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send o...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22275 > generally, is this going to limit how much data to pass along because of the bit length of the index? So the index passed to Python is the RecordBatch index, not an element index, and it would limit the number of batches to Int.MAX. I wouldn't expect that to be likely, and you can always set the number of batches to 1 per partition, so that would be the limiting factor then. WDYT @felixcheung ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22477: [SPARK-25471][PYTHON][TEST] Fix pyspark-sql test error w...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22477 Thanks @HyukjinKwon ! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22477: [SPARK-25471][PYTHON][TEST] Fix pyspark-sql test error w...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22477 cc @HyukjinKwon --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22477: [SPARK-25471][PYTHON][TEST] Fix pyspark-sql test error w...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22477 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22477: [SPARK-25471][PYTHON][TEST] Fix pyspark-sql test ...
GitHub user BryanCutler opened a pull request: https://github.com/apache/spark/pull/22477 [SPARK-25471][PYTHON][TEST] Fix pyspark-sql test error when using Python 3.6 and Pandas 0.23 ## What changes were proposed in this pull request? Fix test that constructs a Pandas DataFrame by specifying the column order. Previously this test assumed the columns would be sorted alphabetically; however, when using Python 3.6 with Pandas 0.23 or higher, the original column order is maintained. Manually tested with `python/run-tests` using Python 3.6.6 and Pandas 0.23.4. You can merge this pull request into a Git repository by running: $ git pull https://github.com/BryanCutler/spark pyspark-tests-py36-pd23-SPARK-25471 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22477.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22477 commit a268250b951a307f216ce4bb6bfd53aa5417bbfa Author: Bryan Cutler Date: 2018-09-19T21:35:49Z specify column ordering --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
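The fix described above amounts to pinning the column order explicitly when building the Pandas DataFrame, e.g. (made-up data):

```python
import pandas as pd

# With Python 3.6+ and Pandas 0.23+, a DataFrame built from a dict keeps
# insertion order instead of sorting column names alphabetically, so a
# test that expects a specific order should pin it via `columns`.
pdf = pd.DataFrame({"b": [1, 2], "a": [3, 4]}, columns=["a", "b"])
assert list(pdf.columns) == ["a", "b"]
```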
[GitHub] spark issue #22275: [SPARK-25274][PYTHON][SQL] In toPandas with Arrow send o...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22275 @holdenk I was wondering if you had any thoughts on this? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20908: [SPARK-23672][PYTHON] Document support for nested return...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/20908 merged to master and branch-2.4, thanks @holdenk ! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22140 > Thanks for your understanding. Normally, we are very conservative to introduce any potential behavior change to the released version. Yes, I know. At the time it seemed to me like failing fast rather than later and improving the error message, but it's best to be safe. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22140 > Can we just simply take this out from branch-2.3? Thanks @HyukjinKwon , that is fine with me. What do you think @gatorsmile ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22369: [SPARK-25072][DOC] Update migration guide for beh...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/22369#discussion_r216147674 --- Diff: docs/sql-programming-guide.md --- @@ -1901,6 +1901,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see ## Upgrading From Spark SQL 2.3.0 to 2.3.1 and above - As of version 2.3.1 Arrow functionality, including `pandas_udf` and `toPandas()`/`createDataFrame()` with `spark.sql.execution.arrow.enabled` set to `True`, has been marked as experimental. These are still evolving and not currently recommended for use in production. + - In version 2.3.1 and earlier, it is possible for PySpark to create a Row object by providing more value than column number through the customized Row class. Since Spark 2.3.3, Spark will confirm value length is less or equal than column length in PySpark. See [SPARK-25072](https://issues.apache.org/jira/browse/SPARK-25072) for details. --- End diff -- Maybe say `..by providing more values than number of fields through a customized Row class. As of Spark 2.3.3, PySpark will raise a ValueError if the number of values are more than the number of fields. See...` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22140 @gatorsmile it seemed like a straightforward bug to me. Rows with extra values lead to incorrect output and exceptions when used in `DataFrames`, so it did not seem like there was any possibility that this would break existing code. For example: ``` In [1]: MyRow = Row('a','b') In [2]: print(MyRow(1,2,3)) Row(a=1, b=2) In [3]: spark.createDataFrame([MyRow(1,2,3)]) Out[3]: DataFrame[a: bigint, b: bigint] In [4]: spark.createDataFrame([MyRow(1,2,3)]).show() 18/09/08 21:55:48 ERROR Executor: Exception in task 2.0 in stage 2.0 (TID 7) java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 3 values are provided. In [5]: spark.createDataFrame([MyRow(1,2,3)], schema="x: int, y: int").show() ValueError: Length of object (3) does not match with length of fields (2) ``` Maybe I was too hasty with backporting and this needed some discussion. Do you know of a use case that this change would break? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22140 merged to master, branch 2.4 and 2.3. Thanks @xuanyuanking ! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22140: [SPARK-25072][PySpark] Forbid extra value for custom Row
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22140 > yea, to me it looks less sense actually but seems at least working for now: good point, I guess it only fails when you supply a schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22329: [SPARK-25328][PYTHON] Add an example for having two colu...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/22329 merged to branch-2.4 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org