[spark] branch master updated: [SPARK-28215][SQL][R] as_tibble was removed from Arrow R API
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 7083ec0  [SPARK-28215][SQL][R] as_tibble was removed from Arrow R API
7083ec0 is described below

commit 7083ec051ed47b9c6500f2107650814f0ff9206f
Author: Liang-Chi Hsieh
AuthorDate: Mon Jul 1 13:21:06 2019 +0900

    [SPARK-28215][SQL][R] as_tibble was removed from Arrow R API

    ## What changes were proposed in this pull request?

    The new R API of Arrow has removed `as_tibble` as of https://github.com/apache/arrow/commit/2ef96c8623cbad1770f82e97df733bd881ab967b, so the Arrow optimization for DataFrame in R no longer works. This can be tested as below, after installing the latest Arrow:

    ```
    ./bin/sparkR --conf spark.sql.execution.arrow.sparkr.enabled=true
    ```
    ```
    > collect(createDataFrame(mtcars))
    ```

    Before this PR:
    ```
    > collect(createDataFrame(mtcars))
    Error in get("as_tibble", envir = asNamespace("arrow")) :
      object 'as_tibble' not found
    ```

    After:
    ```
    > collect(createDataFrame(mtcars))
        mpg cyl  disp  hp drat    wt  qsec vs am gear carb
    1  21.0   6 160.0 110 3.90 2.620 16.46  0  1    4    4
    2  21.0   6 160.0 110 3.90 2.875 17.02  0  1    4    4
    3  22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1
    ...
    ```

    ## How was this patch tested?

    Manual test.

    Closes #25012 from viirya/SPARK-28215.
Authored-by: Liang-Chi Hsieh
Signed-off-by: HyukjinKwon
---
 R/pkg/R/DataFrame.R   | 10 ++++++++--
 R/pkg/R/deserialize.R | 13 ++++++++++---
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 439cad0..6f3c7c1 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1203,7 +1203,8 @@ setMethod("collect",
           requireNamespace1 <- requireNamespace
           if (requireNamespace1("arrow", quietly = TRUE)) {
             read_arrow <- get("read_arrow", envir = asNamespace("arrow"), inherits = FALSE)
-            as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+            # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
+            useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))
 
             portAuth <- callJMethod(x@sdf, "collectAsArrowToR")
             port <- portAuth[[1]]
@@ -1213,7 +1214,12 @@ setMethod("collect",
             output <- tryCatch({
               doServerAuth(conn, authSecret)
               arrowTable <- read_arrow(readRaw(conn))
-              as.data.frame(as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
+              if (useAsTibble) {
+                as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+                as.data.frame(as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
+              } else {
+                as.data.frame(arrowTable, stringsAsFactors = stringsAsFactors)
+              }
             }, finally = {
               close(conn)
             })
diff --git a/R/pkg/R/deserialize.R b/R/pkg/R/deserialize.R
index 191c51e..b38d245 100644
--- a/R/pkg/R/deserialize.R
+++ b/R/pkg/R/deserialize.R
@@ -237,7 +237,9 @@ readDeserializeInArrow <- function(inputCon) {
   if (requireNamespace1("arrow", quietly = TRUE)) {
     RecordBatchStreamReader <- get(
       "RecordBatchStreamReader", envir = asNamespace("arrow"), inherits = FALSE)
-    as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+    # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
+    useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))
+
     # Currently, there looks no way to read batch by batch by socket connection in R side,
     # See ARROW-4512. Therefore, it reads the whole Arrow streaming-formatted binary at once
@@ -246,8 +248,13 @@ readDeserializeInArrow <- function(inputCon) {
     arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
     batches <- RecordBatchStreamReader(arrowData)$batches()
 
-    # Read all groupped batches. Tibble -> data.frame is cheap.
-    lapply(batches, function(batch) as.data.frame(as_tibble(batch)))
+    if (useAsTibble) {
+      as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
+      # Read all groupped batches. Tibble -> data.frame is cheap.
+      lapply(batches, function(batch) as.data.frame(as_tibble(batch)))
+    } else {
+      lapply(batches, function(batch) as.data.frame(batch))
+    }
   } else {
     stop("'arrow' package should be installed.")
   }

---
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
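The fix above swaps a hard `get("as_tibble", ...)` lookup for an existence check, so SparkR works whether or not the installed Arrow version still exports the helper. The same feature-detection pattern can be sketched in Python; the module and function names below are purely illustrative stand-ins, not the actual Arrow API:

```python
import types


def convert_table(table, module):
    """Convert `table` using whichever helper `module` provides.

    Mirrors the R fix: probe for the old helper first, and only fall
    back to the newer call when the old one was removed.
    """
    old_helper = getattr(module, "as_tibble", None)  # analogue of exists(...)
    if callable(old_helper):
        return old_helper(table)           # old API still present
    return module.to_data_frame(table)     # hypothetical newer API


# Minimal stand-in modules to demonstrate both paths.
old_mod = types.SimpleNamespace(as_tibble=lambda t: ("old", t))
new_mod = types.SimpleNamespace(to_data_frame=lambda t: ("new", t))

assert convert_table("tbl", old_mod) == ("old", "tbl")
assert convert_table("tbl", new_mod) == ("new", "tbl")
```

Probing once and branching (rather than calling `get` unconditionally) is what turns the hard "object 'as_tibble' not found" failure into a graceful fallback.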
[spark] branch master updated: [SPARK-28201][SQL] Revisit MakeDecimal behavior on overflow
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new bc4a676  [SPARK-28201][SQL] Revisit MakeDecimal behavior on overflow
bc4a676 is described below

commit bc4a676b2752c691f7c1d824a58387dbfac6d695
Author: Marco Gaido
AuthorDate: Mon Jul 1 11:54:58 2019 +0800

    [SPARK-28201][SQL] Revisit MakeDecimal behavior on overflow

    ## What changes were proposed in this pull request?

    SPARK-23179 introduced a flag to control the behavior on decimal overflow: return `null` when `spark.sql.decimalOperations.nullOnOverflow` is true (the default and traditional Spark behavior), or throw an `ArithmeticException` when it is false (the behavior of other DBs, following the SQL standard).

    `MakeDecimal` so far had an ambiguous behavior: in codegen mode it returned `null` like the other operators, but in interpreted mode it threw an `IllegalArgumentException`. This PR aligns `MakeDecimal`'s behavior with that of the other operators as defined in SPARK-23179, so both modes now return `null` or throw an `ArithmeticException` according to the value of `spark.sql.decimalOperations.nullOnOverflow`.

    Credits for this PR to mickjermsurawong-stripe who pointed out the wrong behavior in #20350.

    ## How was this patch tested?

    improved UTs

    Closes #25010 from mgaido91/SPARK-28201.
Authored-by: Marco Gaido
Signed-off-by: Wenchen Fan
---
 .../catalyst/expressions/decimalExpressions.scala  | 32 ++++++++++++-----
 .../scala/org/apache/spark/sql/types/Decimal.scala |  9 +++---
 .../expressions/DecimalExpressionSuite.scala       | 20 ++++++++---
 .../org/apache/spark/sql/types/DecimalSuite.scala  | 10 +++++++
 4 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
index ad7f7dd..b5b712c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, EmptyBlock, ExprCode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
@@ -46,19 +47,38 @@ case class UnscaledValue(child: Expression) extends UnaryExpression {
  */
 case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends UnaryExpression {
 
+  private val nullOnOverflow = SQLConf.get.decimalOperationsNullOnOverflow
+
   override def dataType: DataType = DecimalType(precision, scale)
-  override def nullable: Boolean = true
+  override def nullable: Boolean = child.nullable || nullOnOverflow
   override def toString: String = s"MakeDecimal($child,$precision,$scale)"
 
-  protected override def nullSafeEval(input: Any): Any =
-    Decimal(input.asInstanceOf[Long], precision, scale)
+  protected override def nullSafeEval(input: Any): Any = {
+    val longInput = input.asInstanceOf[Long]
+    val result = new Decimal()
+    if (nullOnOverflow) {
+      result.setOrNull(longInput, precision, scale)
+    } else {
+      result.set(longInput, precision, scale)
+    }
+  }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     nullSafeCodeGen(ctx, ev, eval => {
+      val setMethod = if (nullOnOverflow) {
+        "setOrNull"
+      } else {
+        "set"
+      }
+      val setNull = if (nullable) {
+        s"${ev.isNull} = ${ev.value} == null;"
+      } else {
+        ""
+      }
       s"""
-        ${ev.value} = (new Decimal()).setOrNull($eval, $precision, $scale);
-        ${ev.isNull} = ${ev.value} == null;
-      """
+         |${ev.value} = (new Decimal()).$setMethod($eval, $precision, $scale);
+         |$setNull
+       """.stripMargin
     })
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index b7b7097..1bf322a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -76,7 +76,7 @@ final class Decimal extends Ordered[Decimal] with Serializable {
    */
  def set(unscaled: Long, precision: Int, scale: Int): Decimal = {
     if (setOrNull(unscaled, precision, scale) == null) {
-      throw new IllegalArgumentException("Unscaled value too large for precision")
+      throw new ArithmeticException("Unscaled value too large for precision")
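The semantics this PR standardizes — return `null` on overflow or raise, depending on a flag — can be sketched outside Spark. Below is a rough Python analogue of the `setOrNull`/`set` pair (with `ArithmeticError` standing in for Java's `ArithmeticException`); it is not Spark's actual `Decimal` implementation, just an illustration of the flag-controlled behavior:

```python
def make_decimal(unscaled, precision, scale, null_on_overflow=True):
    """Build a scaled decimal from an unscaled long, e.g. (123, 5, 2) -> 1.23.

    Overflow (more digits than `precision` allows) either yields None or
    raises, mirroring spark.sql.decimalOperations.nullOnOverflow.
    """
    if len(str(abs(unscaled))) > precision:
        if null_on_overflow:
            return None  # traditional Spark behavior (setOrNull)
        # SQL-standard behavior (set): fail loudly
        raise ArithmeticError("Unscaled value too large for precision")
    return unscaled / (10 ** scale)


assert make_decimal(123, 5, 2) == 1.23          # fits: 3 digits <= precision 5
assert make_decimal(123456, 5, 2) is None       # overflow -> null
try:
    make_decimal(123456, 5, 2, null_on_overflow=False)
except ArithmeticError:
    pass                                        # overflow -> exception
```

The point of the PR is that both evaluation paths (interpreted and codegen) now take one of these two branches consistently, instead of each path failing in its own way.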
[spark] branch branch-2.4 updated: [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new d57b392  [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation
d57b392 is described below

commit d57b392961167bb8dfd888d79efa947277d62ec9
Author: Marco Gaido
AuthorDate: Mon Jul 1 11:40:12 2019 +0900

    [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation

    ## What changes were proposed in this pull request?

    The documentation in `linalg.py` is not consistent. This PR makes it uniform.

    ## How was this patch tested?

    NA

    Closes #25011 from mgaido91/SPARK-28170.

    Authored-by: Marco Gaido
    Signed-off-by: HyukjinKwon
    (cherry picked from commit 048224ce9a3bdb304ba24852ecc66c7f14c25c11)
    Signed-off-by: HyukjinKwon
---
 python/pyspark/ml/linalg/__init__.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py
index 9da9836..9261d65 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -386,14 +386,14 @@ class DenseVector(Vector):
 
     def toArray(self):
         """
-        Returns an numpy.ndarray
+        Returns the underlying numpy.ndarray
         """
         return self.array
 
     @property
     def values(self):
         """
-        Returns a list of values
+        Returns the underlying numpy.ndarray
         """
         return self.array
 
@@ -681,7 +681,7 @@ class SparseVector(Vector):
 
     def toArray(self):
         """
-        Returns a copy of this SparseVector as a 1-dimensional NumPy array.
+        Returns a copy of this SparseVector as a 1-dimensional numpy.ndarray.
         """
         arr = np.zeros((self.size,), dtype=np.float64)
         arr[self.indices] = self.values
 
@@ -862,7 +862,7 @@ class Matrix(object):
 
     def toArray(self):
         """
-        Returns its elements in a NumPy ndarray.
+        Returns its elements in a numpy.ndarray.
         """
         raise NotImplementedError
 
@@ -937,7 +937,7 @@ class DenseMatrix(Matrix):
 
     def toArray(self):
         """
-        Return an numpy.ndarray
+        Return a numpy.ndarray
 
         >>> m = DenseMatrix(2, 2, range(4))
         >>> m.toArray()
 
@@ -1121,7 +1121,7 @@ class SparseMatrix(Matrix):
 
     def toArray(self):
         """
-        Return an numpy.ndarray
+        Return a numpy.ndarray
         """
         A = np.zeros((self.numRows, self.numCols), dtype=np.float64, order='F')
         for k in xrange(self.colPtrs.size - 1):
[spark] branch master updated: [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 048224c  [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation
048224c is described below

commit 048224ce9a3bdb304ba24852ecc66c7f14c25c11
Author: Marco Gaido
AuthorDate: Mon Jul 1 11:40:12 2019 +0900

    [SPARK-28170][ML][PYTHON] Uniform Vectors and Matrix documentation

    ## What changes were proposed in this pull request?

    The documentation in `linalg.py` is not consistent. This PR makes it uniform.

    ## How was this patch tested?

    NA

    Closes #25011 from mgaido91/SPARK-28170.

    Authored-by: Marco Gaido
    Signed-off-by: HyukjinKwon
---
 python/pyspark/ml/linalg/__init__.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py
index f99161c..f6ddc09 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -386,14 +386,14 @@ class DenseVector(Vector):
 
     def toArray(self):
         """
-        Returns an numpy.ndarray
+        Returns the underlying numpy.ndarray
         """
         return self.array
 
     @property
     def values(self):
         """
-        Returns a list of values
+        Returns the underlying numpy.ndarray
         """
         return self.array
 
@@ -681,7 +681,7 @@ class SparseVector(Vector):
 
     def toArray(self):
         """
-        Returns a copy of this SparseVector as a 1-dimensional NumPy array.
+        Returns a copy of this SparseVector as a 1-dimensional numpy.ndarray.
         """
         arr = np.zeros((self.size,), dtype=np.float64)
         arr[self.indices] = self.values
 
@@ -862,7 +862,7 @@ class Matrix(object):
 
     def toArray(self):
         """
-        Returns its elements in a NumPy ndarray.
+        Returns its elements in a numpy.ndarray.
         """
         raise NotImplementedError
 
@@ -937,7 +937,7 @@ class DenseMatrix(Matrix):
 
     def toArray(self):
         """
-        Return an numpy.ndarray
+        Return a numpy.ndarray
 
         >>> m = DenseMatrix(2, 2, range(4))
         >>> m.toArray()
 
@@ -1121,7 +1121,7 @@ class SparseMatrix(Matrix):
 
     def toArray(self):
         """
-        Return an numpy.ndarray
+        Return a numpy.ndarray
         """
         A = np.zeros((self.numRows, self.numCols), dtype=np.float64, order='F')
         for k in xrange(self.colPtrs.size - 1):
[spark] branch branch-2.3 updated: [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 4fdbd87  [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
4fdbd87 is described below

commit 4fdbd87e3b3de804c8a0bf3d2613881f25425eed
Author: LantaoJin
AuthorDate: Sun Jun 30 15:14:41 2019 -0500

    [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed

    This is very similar to #23590. `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but not enough memory is available. When this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout is set to unlimited.

    This PR catches `Throwable` and uses the error to complete the `SettableFuture`.

    I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, the call does not finish until the timeout (and may hang forever if the timeout is set to MAX_INT); with it, the expected `IllegalArgumentException` is caught.

    ```java
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        int size = response.remaining();
        ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug
        copy.put(response);
        // flip "copy" to make it readable
        copy.flip();
        result.set(copy);
      } catch (Throwable t) {
        result.setException(t);
      }
    }
    ```

    Closes #24964 from LantaoJin/SPARK-28160.
Lead-authored-by: LantaoJin
Co-authored-by: lajin
Signed-off-by: Sean Owen
(cherry picked from commit 0e421000e0ea2c090b6fab0201a6046afceec132)
Signed-off-by: Sean Owen
---
 .../org/apache/spark/network/client/TransportClient.java | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 8f354ad..3b400e2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -254,11 +254,16 @@ public class TransportClient implements Closeable {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-        copy.put(response);
-        // flip "copy" to make it readable
-        copy.flip();
-        result.set(copy);
+        try {
+          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+          copy.put(response);
+          // flip "copy" to make it readable
+          copy.flip();
+          result.set(copy);
+        } catch (Throwable t) {
+          logger.warn("Error in responding PRC callback", t);
+          result.setException(t);
+        }
       }
 
       @Override
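The bug class fixed here — a callback that throws before completing its future, leaving every waiter blocked — is easy to reproduce outside Spark. A minimal Python sketch using `concurrent.futures.Future` (an analogue of Guava's `SettableFuture`, with a hypothetical negative-size check standing in for `ByteBuffer.allocate(-1)` failing):

```python
import concurrent.futures


def risky_callback(result, response_size):
    """Analogue of the patched onSuccess: any failure while copying the
    response must still complete the future, or sendRpcSync-style waiters
    block until their timeout (forever, if the timeout is unlimited)."""
    try:
        if response_size < 0:  # stands in for ByteBuffer.allocate(-1) throwing
            raise ValueError("negative allocation size")
        result.set_result(b"\x00" * response_size)
    except Exception as exc:
        result.set_exception(exc)  # the fix: propagate instead of swallowing


ok = concurrent.futures.Future()
risky_callback(ok, 4)
assert ok.result(timeout=1) == b"\x00\x00\x00\x00"

bad = concurrent.futures.Future()
risky_callback(bad, -1)
try:
    bad.result(timeout=1)
except ValueError:
    pass  # waiter is released with the error instead of hanging
```

Without the `except` branch, `bad.result()` would raise `TimeoutError` (or block forever with no timeout), which is exactly the hang the patch closes. Note the Java fix catches `Throwable` rather than `Exception`, since `OutOfMemoryError` is an `Error`, not an `Exception`.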
[spark] branch branch-2.4 updated: [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 9f9bf13  [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed
9f9bf13 is described below

commit 9f9bf13830763728f223550f1d83debcb23b83fd
Author: LantaoJin
AuthorDate: Sun Jun 30 15:14:41 2019 -0500

    [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed

    This is very similar to #23590. `ByteBuffer.allocate` may throw `OutOfMemoryError` when the response is large but not enough memory is available. When this happens, `TransportClient.sendRpcSync` will just hang forever if the timeout is set to unlimited.

    This PR catches `Throwable` and uses the error to complete the `SettableFuture`.

    I tested in my IDE by setting the value of size to -1 to verify the result. Without this patch, the call does not finish until the timeout (and may hang forever if the timeout is set to MAX_INT); with it, the expected `IllegalArgumentException` is caught.

    ```java
    @Override
    public void onSuccess(ByteBuffer response) {
      try {
        int size = response.remaining();
        ByteBuffer copy = ByteBuffer.allocate(size); // set size to -1 in runtime when debug
        copy.put(response);
        // flip "copy" to make it readable
        copy.flip();
        result.set(copy);
      } catch (Throwable t) {
        result.setException(t);
      }
    }
    ```

    Closes #24964 from LantaoJin/SPARK-28160.
Lead-authored-by: LantaoJin
Co-authored-by: lajin
Signed-off-by: Sean Owen
(cherry picked from commit 0e421000e0ea2c090b6fab0201a6046afceec132)
Signed-off-by: Sean Owen
---
 .../org/apache/spark/network/client/TransportClient.java | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 20d840b..b018197 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -237,11 +237,16 @@ public class TransportClient implements Closeable {
     sendRpc(message, new RpcResponseCallback() {
       @Override
       public void onSuccess(ByteBuffer response) {
-        ByteBuffer copy = ByteBuffer.allocate(response.remaining());
-        copy.put(response);
-        // flip "copy" to make it readable
-        copy.flip();
-        result.set(copy);
+        try {
+          ByteBuffer copy = ByteBuffer.allocate(response.remaining());
+          copy.put(response);
+          // flip "copy" to make it readable
+          copy.flip();
+          result.set(copy);
+        } catch (Throwable t) {
+          logger.warn("Error in responding PRC callback", t);
+          result.setException(t);
+        }
       }
 
       @Override
[spark] branch master updated (24e1e41 -> 0e42100)
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 24e1e41  [SPARK-28196][SQL] Add a new `listTables` and `listLocalTempViews` APIs for SessionCatalog
     add 0e42100  [SPARK-28160][CORE] Fix a bug that callback function may hang when unchecked exception missed

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/network/client/TransportClient.java | 15 ++++++++++-----
 .../spark/network/shuffle/ExternalShuffleClient.java     | 13 ++++++++++---
 2 files changed, 20 insertions(+), 8 deletions(-)