Repository: spark Updated Branches: refs/heads/master 4482ff23a -> ecf437a64
[SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray ## What changes were proposed in this pull request? `PickleException` is thrown when creating dataframe from python row with empty bytearray spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show() net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0 at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java ... `ByteArrayConstructor` doesn't deal with an empty byte array pickled by Python 3. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #19085 from viirya/SPARK-21534. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf437a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf437a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf437a6 Branch: refs/heads/master Commit: ecf437a64874a31328f4e28c6b24f37557fbe07d Parents: 4482ff2 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Thu Aug 31 12:55:38 2017 +0900 Committer: hyukjinkwon <gurwls...@gmail.com> Committed: Thu Aug 31 12:55:38 2017 +0900 ---------------------------------------------------------------------- .../scala/org/apache/spark/api/python/SerDeUtil.scala | 14 ++++++++++++++ python/pyspark/sql/tests.py | 4 +++- 2 files changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index aaf8e7a..01e64b6 100644 --- 
a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -35,6 +35,16 @@ import org.apache.spark.rdd.RDD /** Utilities for serialization / deserialization between Python and Java, using Pickle. */ private[spark] object SerDeUtil extends Logging { + class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor { + override def construct(args: Array[Object]): Object = { + // Deal with an empty byte array pickled by Python 3. + if (args.length == 0) { + Array.emptyByteArray + } else { + super.construct(args) + } + } + } // Unpickle array.array generated by Python 2.6 class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor { // /* Description of types */ @@ -108,6 +118,10 @@ private[spark] object SerDeUtil extends Logging { synchronized{ if (!initialized) { Unpickler.registerConstructor("array", "array", new ArrayConstructor()) + Unpickler.registerConstructor("__builtin__", "bytearray", new ByteArrayConstructor()) + Unpickler.registerConstructor("builtins", "bytearray", new ByteArrayConstructor()) + Unpickler.registerConstructor("__builtin__", "bytes", new ByteArrayConstructor()) + Unpickler.registerConstructor("_codecs", "encode", new ByteArrayConstructor()) initialized = true } } http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1ecde68..b310285 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2383,9 +2383,11 @@ class SQLTests(ReusedPySparkTestCase): def test_BinaryType_serialization(self): # Pyrolite version <= 4.9 could not serialize BinaryType with Python3 SPARK-17808 + # The empty bytearray is test for SPARK-21534. 
schema = StructType([StructField('mybytes', BinaryType())]) data = [[bytearray(b'here is my data')], - [bytearray(b'and here is some more')]] + [bytearray(b'and here is some more')], + [bytearray(b'')]] df = self.spark.createDataFrame(data, schema=schema) df.collect() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org