[GitHub] spark issue #19766: [SPARK-22542][SQL] remove unused features in ColumnarBat...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19766 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19601: [SPARK-22383][SQL] Generate code to directly get value o...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19601 We'd need to change the `UnsafeArrayData` format too, to avoid data copying when building the cache. BTW I think it's ok to release this columnar cache reader without efficient complex type support, so we don't need to rush.
[GitHub] spark pull request #19733: [SPARK-22501][SQL] Fix 64KB JVM bytecode limit pr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19733#discussion_r151465775 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -236,24 +236,34 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val valueGen = value.genCode(ctx) val listGen = list.map(_.genCode(ctx)) +ctx.addMutableState("boolean", ev.value, "") +ctx.addMutableState("boolean", ev.isNull, "") --- End diff -- So we will create 2 mutable states anyway?
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151466298 --- Diff: python/pyspark/worker.py --- @@ -89,6 +90,26 @@ def verify_result_length(*a): return lambda *a: (verify_result_length(*a), arrow_return_type) +def wrap_pandas_group_map_udf(f, return_type): +def wrapped(*series): +import pandas as pd + +result = f(pd.concat(series, axis=1)) --- End diff -- Oh, I see. Thanks!
[GitHub] spark issue #19766: [SPARK-22542][SQL] remove unused features in ColumnarBat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19766 **[Test build #83939 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83939/testReport)** for PR 19766 at commit [`9cc2def`](https://github.com/apache/spark/commit/9cc2def83550c71a5eefd520c9b93f2b5fd5de6a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19766: [SPARK-22542][SQL] remove unused features in ColumnarBat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19766 Merged build finished. Test PASSed.
[GitHub] spark issue #19766: [SPARK-22542][SQL] remove unused features in ColumnarBat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19766 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83939/ Test PASSed.
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151472401 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -80,6 +80,22 @@ package object config { .bytesConf(ByteUnit.MiB) .createWithDefaultString("1g") + private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled") +.doc("If true, Spark will attempt to use off-heap memory for certain operations. " + + "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.") +.withAlternative("spark.unsafe.offHeap") --- End diff -- do we still need this as we already did it in https://github.com/apache/spark/pull/17436/files#diff-529fc5c06b9731c1fbda6f3db60b16aaR658 ?
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151472648 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -140,6 +140,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val COLUMNVECTOR_OFFHEAP_ENABLED = --- End diff -- nit `COLUMN_VECTOR`
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151472844 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java --- @@ -103,7 +103,11 @@ /** * The default config on whether columnarBatch should be offheap. */ - private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP; + private final MemoryMode DEFAULT_MEMORY_MODE; --- End diff -- do we still need to call it DEFAULT?
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151473122 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala --- @@ -75,9 +77,14 @@ class VectorizedHashMapGenerator( } }.mkString("\n").concat(";") +val columnVector = if (!conf.offHeapColumnVectorEnabled) { + "OnHeapColumnVector" +} else { + "OffHeapColumnVector" +} s""" - | private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors; - | private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors; --- End diff -- Let's not bother with this here either; this class is only used in benchmarks AFAIK.
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151473518 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -364,8 +372,10 @@ class ParquetFileFormat if (pushed.isDefined) { ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } + val taskContext = Option(TaskContext.get()) val parquetReader = if (enableVectorizedReader) { -val vectorizedReader = new VectorizedParquetRecordReader() +val vectorizedReader = + new VectorizedParquetRecordReader(enableOffHeapColumnVector) --- End diff -- Only enable it when taskContext exists?
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151473749 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala --- @@ -75,6 +75,7 @@ object ParquetReadBenchmark { withTempPath { dir => withTempTable("t1", "tempTable") { +val enableOffHeapColumnVector = spark.sqlContext.conf.offHeapColumnVectorEnabled --- End diff -- Use `spark.sessionState.conf.xxx`; `sqlContext` is deprecated.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151474342 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- It needs to be set to `true` for the tests. This can be done.
[GitHub] spark issue #19729: [SPARK-22499][SQL] Fix 64KB JVM bytecode limit problem w...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19729 thanks, merging to master/2.2
[GitHub] spark pull request #19733: [SPARK-22501][SQL] Fix 64KB JVM bytecode limit pr...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19733#discussion_r151475305 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala --- @@ -236,24 +236,34 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val valueGen = value.genCode(ctx) val listGen = list.map(_.genCode(ctx)) +ctx.addMutableState("boolean", ev.value, "") +ctx.addMutableState("boolean", ev.isNull, "") --- End diff -- Yes, if we call `ctx.splitExpressions`: one keeps the value, the other keeps the nullability. Would it be better to avoid creating the 2 mutable states when we do not call `ctx.splitExpressions` (i.e. when `!(ctx.INPUT_ROW != null && ctx.currentVars == null)`)?
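The 64KB issue arises because generated Java code for a long `IN` list can exceed the JVM's per-method bytecode limit, so the code is split into helper methods. Once that happens, the result has to live in class fields (mutable state) rather than in locals. The following is a toy Python sketch of the splitting idea. It assumes a character-count threshold as a crude stand-in for bytecode size, and `split_expressions` and its parameters are hypothetical; they do not mirror `CodegenContext.splitExpressions`.

```python
def split_expressions(snippets, func_prefix, threshold=1024):
    """Toy model of splitting generated code into helper methods.

    Groups `snippets` so that each helper body stays under `threshold`
    characters (a crude stand-in for the 64KB bytecode limit) and returns
    (helper_sources, call_site_code).
    """
    blocks, current, current_len = [], [], 0
    for snippet in snippets:
        # Close the current block if adding this snippet would exceed the threshold.
        if current and current_len + len(snippet) > threshold:
            blocks.append(current)
            current, current_len = [], 0
        current.append(snippet)
        current_len += len(snippet)
    if current:
        blocks.append(current)

    helpers, calls = [], []
    for i, block in enumerate(blocks):
        name = "%s_%d" % (func_prefix, i)
        body = "\n".join("  " + line for line in block)
        # Helpers write results into class fields (mutable state), since a
        # local variable in one method is invisible to the caller.
        helpers.append("private void %s(InternalRow row) {\n%s\n}" % (name, body))
        calls.append("%s(row);" % name)
    return helpers, "\n".join(calls)
```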
[GitHub] spark pull request #19729: [SPARK-22499][SQL] Fix 64KB JVM bytecode limit pr...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19729
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151475971 --- Diff: python/pyspark/sql/udf.py --- @@ -0,0 +1,155 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +User-defined function related classes and functions +""" +import functools + +from pyspark import SparkContext +from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType +from pyspark.sql.column import Column, _to_java_column, _to_seq +from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string + + +def _wrap_function(sc, func, returnType): +command = (func, returnType) +pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command) +return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec, + sc.pythonVer, broadcast_vars, sc._javaAccumulator) + + +def _create_udf(f, returnType, evalType): +if evalType == PythonEvalType.PANDAS_SCALAR_UDF: +import inspect +argspec = inspect.getargspec(f) +if len(argspec.args) == 0 and argspec.varargs is None: +raise ValueError( +"Invalid function: 0-arg pandas_udfs are not supported. " +"Instead, create a 1-arg pandas_udf and ignore the arg in your function." 
+) + +elif evalType == PythonEvalType.PANDAS_GROUP_MAP_UDF: +import inspect +argspec = inspect.getargspec(f) +if len(argspec.args) != 1: +raise ValueError( +"Invalid function: pandas_udf with function type GROUP_MAP " +"must take a single arg that is a pandas DataFrame." +) + +udf_obj = UserDefinedFunction(f, returnType=returnType, name=None, evalType=evalType) --- End diff -- I added the comment, but kept the `name=None` because I think this is more explicit.
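The `_create_udf` hunk rejects unusable function signatures up front. Below is a standalone sketch of the same two checks. It assumes Python 3, where `inspect.getfullargspec` is the non-deprecated replacement for `inspect.getargspec`. The `SCALAR` and `GROUP_MAP` strings are illustrative stand-ins for the `PythonEvalType` values, and `validate_pandas_udf` is a hypothetical name.

```python
import inspect

SCALAR = "SCALAR"        # illustrative stand-in for PythonEvalType.PANDAS_SCALAR_UDF
GROUP_MAP = "GROUP_MAP"  # illustrative stand-in for PythonEvalType.PANDAS_GROUP_MAP_UDF


def validate_pandas_udf(f, eval_type):
    """Raise ValueError if `f` has a signature the given UDF type cannot use."""
    spec = inspect.getfullargspec(f)
    if eval_type == SCALAR:
        # A scalar pandas UDF needs at least one input column (positional or *args).
        if len(spec.args) == 0 and spec.varargs is None:
            raise ValueError(
                "Invalid function: 0-arg pandas_udfs are not supported. "
                "Instead, create a 1-arg pandas_udf and ignore the arg in your function.")
    elif eval_type == GROUP_MAP:
        # A group-map UDF receives exactly one pandas.DataFrame per group.
        if len(spec.args) != 1:
            raise ValueError(
                "Invalid function: pandas_udf with function type GROUP_MAP "
                "must take a single arg that is a pandas DataFrame.")
    return f
```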
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151476071 --- Diff: python/pyspark/sql/tests.py --- @@ -3166,6 +3166,88 @@ def test_filtered_frame(self): self.assertTrue(pdf.empty) +class PandasUDFTests(ReusedSQLTestCase): +def test_pandas_udf_basic(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +udf = pandas_udf(lambda x: x, DoubleType()) +self.assertEquals(udf.returnType, DoubleType()) +self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR) +self.assertEquals(udf.returnType, DoubleType()) +self.assertEquals(udf.evalType, PythonEvalType.PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP) +self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, 'v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, returnType='v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEquals(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEquals(udf.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF) + +def test_pandas_udf_decorator(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.types import StructType, StructField, DoubleType + +@pandas_udf(DoubleType()) +def foo(x): +return x +self.assertEquals(foo.returnType, DoubleType()) +self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF) + +@pandas_udf(returnType=DoubleType()) +def foo(x): +return x +self.assertEquals(foo.returnType, DoubleType()) 
+self.assertEquals(foo.evalType, PythonEvalType.PANDAS_SCALAR_UDF) + +schema = StructType([StructField("v", DoubleType())]) + +@pandas_udf(schema, PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEquals(foo.returnType, schema) +self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF) + +@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEquals(foo.returnType, schema) +self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF) + +@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEquals(foo.returnType, schema) +self.assertEquals(foo.evalType, PythonEvalType.PANDAS_GROUP_MAP_UDF) + +def test_udf_wrong_arg(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(ValueError, 'return type'): +@pandas_udf(PandasUDFType.GROUP_MAP) --- End diff -- Not insane at all :) Added the test.
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151476197 --- Diff: python/pyspark/rdd.py --- @@ -56,6 +56,20 @@ __all__ = ["RDD"] +class PythonEvalType(object): +""" +Evaluation type of python rdd. + +These values are internal to PySpark. --- End diff -- Added
[GitHub] spark pull request #19730: [SPARK-22500][SQL] Fix 64KB JVM bytecode limit pr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19730#discussion_r151476367 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -1039,13 +1039,19 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } } """ -}.mkString("\n") +} +val fieldsEvalCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { + ctx.splitExpressions(fieldsEvalCode, "castStruct", +("InternalRow", ctx.INPUT_ROW) :: (rowClass, result) :: ("InternalRow", tmpRow) :: Nil) --- End diff -- What about inner structs? I think we should use `evPrim` here instead of `ctx.INPUT_ROW`.
[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19630 **[Test build #83942 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83942/testReport)** for PR 19630 at commit [`1f2c47b`](https://github.com/apache/spark/commit/1f2c47b569bcfa3f7ca7f974fee7cbdc21969623).
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151476372 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala --- @@ -34,9 +34,11 @@ import org.apache.spark.util._ */ private[spark] object PythonEvalType { val NON_UDF = 0 - val SQL_BATCHED_UDF = 1 - val SQL_PANDAS_UDF = 2 - val SQL_PANDAS_GROUPED_UDF = 3 + + val SQL_BATCHED_UDF = 100 + + val PANDAS_SCALAR_UDF = 200 + val PANDAS_GROUP_MAP_UDF = 201 --- End diff -- Added SQL prefix
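The renumbering appears to group related UDF kinds into ranges: 100s for batched SQL UDFs and 200s for pandas UDFs. Below is a Python mirror of those constants. The `SQL_PANDAS_*` names assume the "SQL prefix" mentioned in the reply was applied to the two pandas entries, since the diff shown predates that change. The `is_pandas_udf` range check is a hypothetical helper, not part of PySpark.

```python
class PythonEvalType(object):
    """Evaluation type of a Python RDD. These values are internal to PySpark."""
    NON_UDF = 0

    SQL_BATCHED_UDF = 100

    # Names assume the "SQL_" prefix added in review; values match the diff.
    SQL_PANDAS_SCALAR_UDF = 200
    SQL_PANDAS_GROUP_MAP_UDF = 201


def is_pandas_udf(eval_type):
    """Hypothetical helper: pandas UDF kinds occupy the 200-299 range."""
    return 200 <= eval_type < 300
```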
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151476420 --- Diff: python/pyspark/sql/functions.py --- @@ -2208,26 +2089,39 @@ def udf(f=None, returnType=StringType()): | 8| JOHN DOE| 22| +--+--++ """ -return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF) +# decorator @udf, @udf(), @udf(dataType()) +if f is None or isinstance(f, (str, DataType)): +# If DataType has been passed as a positional argument +# for decorator use it as a returnType +return_type = f or returnType +return functools.partial(_create_udf, returnType=return_type, + evalType=PythonEvalType.SQL_BATCHED_UDF) +else: +return _create_udf(f=f, returnType=returnType, + evalType=PythonEvalType.SQL_BATCHED_UDF) @since(2.3) -def pandas_udf(f=None, returnType=StringType()): +def pandas_udf(f=None, returnType=None, functionType=None): """ Creates a vectorized user defined function (UDF). :param f: user-defined function. A python function if used as a standalone function :param returnType: a :class:`pyspark.sql.types.DataType` object +:param functionType: an enum value in :class:`pyspark.sql.functions.PandasUdfType`. + Default: SCALAR. -The user-defined function can define one of the following transformations: +The function type of the UDF can be one of the following: -1. One or more `pandas.Series` -> A `pandas.Series` +1. SCALAR - This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and - :meth:`pyspark.sql.DataFrame.select`. + A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. The returnType should be a primitive data type, e.g., `DoubleType()`. The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. + Scalar UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and + :meth:`pyspark.sql.DataFrame.select`. 
+ >>> from pyspark.sql.types import IntegerType, StringType >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) >>> @pandas_udf(returnType=StringType()) --- End diff -- Added.
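The new `udf` body supports several call forms: `@udf`, `@udf()`, `@udf(SomeType)`, `@udf(returnType=...)` and a plain `udf(func, ...)` call. It does this by returning a `functools.partial` whenever the first argument is not a function. Below is a stripped-down sketch of that dispatch. `DataType`, `STRING` and this `_create_udf`, which just returns a tuple, are hypothetical stand-ins for PySpark's classes. The `100` literal stands in for `PythonEvalType.SQL_BATCHED_UDF`.

```python
import functools


class DataType(object):
    """Hypothetical stand-in for pyspark.sql.types.DataType."""
    def __init__(self, name):
        self.name = name


STRING = DataType("string")


def _create_udf(f, returnType, evalType):
    # Stand-in: record what would be wrapped instead of building a real UDF.
    return (f, returnType, evalType)


def udf(f=None, returnType=STRING):
    # Decorator forms: @udf(), @udf(DataType), @udf("ddl string"), @udf(returnType=...)
    if f is None or isinstance(f, (str, DataType)):
        # A type passed positionally is treated as the return type.
        return_type = f or returnType
        return functools.partial(_create_udf, returnType=return_type, evalType=100)
    # Plain call udf(func) or bare decorator @udf.
    return _create_udf(f=f, returnType=returnType, evalType=100)
```

The `partial` keeps every keyword fixed except the function itself, so the decorator machinery only has to supply `f`.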
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151476679 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Just to be clear: I mean making it default to true for all tests in Spark, not only those explicitly related to this feature :)
[GitHub] spark pull request #19730: [SPARK-22500][SQL] Fix 64KB JVM bytecode limit pr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19730#discussion_r151477316 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -1039,13 +1039,19 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } } """ -}.mkString("\n") +} +val fieldsEvalCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { + ctx.splitExpressions(fieldsEvalCode, "castStruct", +("InternalRow", ctx.INPUT_ROW) :: (rowClass, result) :: ("InternalRow", tmpRow) :: Nil) --- End diff -- How about inner structs? Do we also need to pass in `ctx.INPUT_ROW`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151477529 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Ah. Sounds reasonable. Do you know how to do that? Is there a precedent I can follow? I'm not aware of one.
[GitHub] spark pull request #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit pr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19728#discussion_r151478304 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -63,15 +63,32 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val evals = children.map(_.genCode(ctx)) -val inputs = evals.map { eval => - s"${eval.isNull} ? null : ${eval.value}" -}.mkString(", ") -ev.copy(evals.map(_.code).mkString("\n") + s""" - boolean ${ev.isNull} = false; - UTF8String ${ev.value} = UTF8String.concat($inputs); - if (${ev.value} == null) { -${ev.isNull} = true; +val argNums = evals.length --- End diff -- nit: `numArgs`.
[GitHub] spark pull request #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit pr...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19728#discussion_r151478604 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -63,15 +63,32 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val evals = children.map(_.genCode(ctx)) -val inputs = evals.map { eval => - s"${eval.isNull} ? null : ${eval.value}" -}.mkString(", ") -ev.copy(evals.map(_.code).mkString("\n") + s""" - boolean ${ev.isNull} = false; - UTF8String ${ev.value} = UTF8String.concat($inputs); - if (${ev.value} == null) { -${ev.isNull} = true; +val argNums = evals.length +val args = ctx.freshName("args") + +val inputs = evals.zipWithIndex.map { case (eval, index) => + if (eval.isNull != "true") { --- End diff -- Please do not mix optimizations into a bug fix.
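Both the old and the new `Concat` code rely on `UTF8String.concat` returning null when any argument is null. That result is then mapped to `ev.isNull`. Below is a Python model of those semantics, including the intermediate `args` array the patch introduces; the per-element fill is the step that can be spread across split methods. This is an illustration only, not Spark's implementation.

```python
def concat(inputs):
    """SQL-style concat: the result is NULL (None) if any input is NULL."""
    args = [None] * len(inputs)  # mirrors the generated `args` array
    for i, value in enumerate(inputs):
        # In generated code, each assignment may live in a split helper method.
        args[i] = value
    if any(a is None for a in args):
        return None  # corresponds to ev.isNull = true
    return "".join(args)
```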
[GitHub] spark pull request #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/17436#discussion_r151479595 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -80,6 +80,22 @@ package object config { .bytesConf(ByteUnit.MiB) .createWithDefaultString("1g") + private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled") +.doc("If true, Spark will attempt to use off-heap memory for certain operations. " + + "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.") +.withAlternative("spark.unsafe.offHeap") --- End diff -- Yes, it is necessary. Without this, we see [this failure](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83933/testReport/) `org.apache.spark.memory.TaskMemoryManagerSuite.offHeapConfigurationBackwardsCompatibility`
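`withAlternative` lets a config entry fall back to an older key. This is why the backward-compatibility test fails without it: a user who still sets `spark.unsafe.offHeap` should get the same behavior as with `spark.memory.offHeap.enabled`. Below is a minimal Python model of that lookup order, assuming the primary key takes precedence when both are set. `get_conf` is hypothetical and is not Spark's `ConfigBuilder` API.

```python
def get_conf(conf, key, alternatives=(), default=None):
    """Look up `key`, falling back to deprecated alternative keys in order."""
    if key in conf:
        return conf[key]  # the primary key always wins
    for alt in alternatives:
        if alt in conf:
            return conf[alt]  # legacy key still honored for compatibility
    return default
```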
[GitHub] spark pull request #19720: [SPARK-22494][SQL] Fix 64KB limit exception with ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19720#discussion_r151479745 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala --- @@ -72,14 +72,10 @@ case class Coalesce(children: Seq[Expression]) extends Expression { } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -val first = children(0) -val rest = children.drop(1) -val firstEval = first.genCode(ctx) -ev.copy(code = s""" - ${firstEval.code} - boolean ${ev.isNull} = ${firstEval.isNull}; - ${ctx.javaType(dataType)} ${ev.value} = ${firstEval.value};""" + - rest.map { e => +ctx.addMutableState("boolean", ev.isNull, "") --- End diff -- This is guaranteed by `Expression.genCode`.
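`Coalesce` returns its first non-null child. The removed code guards each remaining child with `if (ev.isNull)`, so later children are evaluated only while the result is still null. Once those guarded blocks may be split across methods, `ev.isNull` and `ev.value` have to be shared as mutable state. Below is a Python model of the evaluation order, where children are modeled as zero-argument callables. It is an illustration only.

```python
def coalesce(children):
    """Return the first non-None child value, or None if every child is None.

    `children` are zero-argument callables, so a child is only evaluated
    while the result is still null (mirroring the `if (ev.isNull)` guards).
    """
    is_null, value = True, None  # models ev.isNull / ev.value
    for child in children:
        if not is_null:
            break  # a non-null value was found; skip the remaining children
        value = child()
        is_null = value is None
    return value
```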
[GitHub] spark issue #19720: [SPARK-22494][SQL] Fix 64KB limit exception with Coalesc...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19720 thanks, merging to master/2.2!
[GitHub] spark pull request #19720: [SPARK-22494][SQL] Fix 64KB limit exception with ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19720
[GitHub] spark issue #19720: [SPARK-22494][SQL] Fix 64KB limit exception with Coalesc...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19720 @cloud-fan please do not backport this to 2.2. In 2.2 we don't have SPARK-18016, and this patch adds new variables in the case of coalesce. Thus it can put higher pressure on the constant pool, and this may even cause a regression IMHO.
[GitHub] spark issue #19733: [SPARK-22501][SQL] Fix 64KB JVM bytecode limit problem w...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19733 thanks, merging to master/2.2!
[GitHub] spark pull request #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit pr...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19728#discussion_r151482932 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -63,15 +63,32 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val evals = children.map(_.genCode(ctx)) -val inputs = evals.map { eval => - s"${eval.isNull} ? null : ${eval.value}" -}.mkString(", ") -ev.copy(evals.map(_.code).mkString("\n") + s""" - boolean ${ev.isNull} = false; - UTF8String ${ev.value} = UTF8String.concat($inputs); - if (${ev.value} == null) { -${ev.isNull} = true; +val argNums = evals.length +val args = ctx.freshName("args") + +val inputs = evals.zipWithIndex.map { case (eval, index) => + if (eval.isNull != "true") { --- End diff -- I see. I will remove this.
[GitHub] spark issue #19720: [SPARK-22494][SQL] Fix 64KB limit exception with Coalesc...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19720 hmm, isn't running slower better than can't run?
[GitHub] spark pull request #19733: [SPARK-22501][SQL] Fix 64KB JVM bytecode limit pr...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19733
[GitHub] spark issue #19765: [SPARK-22540][SQL] Fix incorrect avgSize in HighlyCompre...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19765 **[Test build #83940 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83940/testReport)** for PR 19765 at commit [`23282da`](https://github.com/apache/spark/commit/23282da591f019dbb37ff6cfff6c229ea42bfa8c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #9428: [SPARK-8582][Core]Optimize checkpointing to avoid computi...
Github user ferdonline commented on the issue: https://github.com/apache/spark/pull/9428 Hello. I find this feature really important and I would be happy to contribute here. Even though we might not support every use case, it would already be great if we could avoid the double computation in the majority of cases, and raise a warning in the other cases saying that the computation is going to happen twice. This is especially important for a use case I have where a transformation creates random numbers, so I simply can't recompute things, as the results will be different. So in my case the only option to break lineage seems to be a full write() followed by read(). Are there any plans to have it in eager checkpoints at least?
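The optimization this PR is about (computing each partition once and saving it while it is consumed, instead of recomputing it for the checkpoint) can be sketched with a generator. This is a hypothetical toy, not Spark's RDD checkpointing code: `compute_with_checkpoint`, `read_partition`, and the in-memory dict standing in for checkpoint storage are all illustrative names. For context, Spark's own `RDD.checkpoint` documentation recommends persisting the RDD first, because otherwise saving it requires recomputation.

```python
def compute_with_checkpoint(compute_partition, partition_index, checkpoint_store):
    """Yield records while also saving them, so later reads can skip recomputation."""
    saved = []
    for record in compute_partition(partition_index):
        saved.append(record)
        yield record
    # Persist only after the partition has been fully consumed.
    checkpoint_store[partition_index] = saved


def read_partition(compute_partition, partition_index, checkpoint_store):
    """Serve from the checkpoint if present; otherwise compute once and save while streaming."""
    if partition_index in checkpoint_store:
        return iter(checkpoint_store[partition_index])
    return compute_with_checkpoint(compute_partition, partition_index, checkpoint_store)
```

Because the records are captured during the first pass, a second read of the same partition comes from the store and the (possibly non-deterministic) computation is not repeated.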
[GitHub] spark issue #19765: [SPARK-22540][SQL] Fix incorrect avgSize in HighlyCompre...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19765 Merged build finished. Test PASSed.
[GitHub] spark issue #19765: [SPARK-22540][SQL] Fix incorrect avgSize in HighlyCompre...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19765 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83940/ Test PASSed.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151485646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Nope :( maybe @viirya can give input about it?
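The config doc string above describes pruning nested fields that a query does not need. As a toy model only (schemas as nested Python dicts and dotted field paths are assumptions here, not Spark's `StructType` or this PR's implementation), the pruning step could look like this:

```python
from collections import defaultdict


def prune_schema(schema, required_paths):
    """Keep only the fields named by dotted paths, e.g. 'address.geo.lat'."""
    whole_fields = set()             # fields needed in full
    nested_paths = defaultdict(list)  # field -> remaining sub-paths
    for path in required_paths:
        head, _, rest = path.partition(".")
        if rest:
            nested_paths[head].append(rest)
        else:
            whole_fields.add(head)

    pruned = {}
    for name, field_type in schema.items():  # preserve the original field order
        if name in whole_fields:
            pruned[name] = field_type
        elif name in nested_paths:
            # Recurse into structs; a path into a non-struct keeps the whole field.
            if isinstance(field_type, dict):
                pruned[name] = prune_schema(field_type, nested_paths[name])
            else:
                pruned[name] = field_type
    return pruned


schema = {
    "id": "long",
    "name": "string",
    "address": {"city": "string", "zip": "string",
                "geo": {"lat": "double", "lon": "double"}},
}
pruned = prune_schema(schema, ["id", "address.geo.lat"])
```

A columnar reader handed `pruned` instead of `schema` could then skip `name`, `address.city`, `address.zip` and `address.geo.lon` entirely.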
[GitHub] spark pull request #19767: [WIP][SPARK-22543][SQL] fix java 64kb compile err...
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/19767 [WIP][SPARK-22543][SQL] fix java 64kb compile error for deeply nested expressions ## What changes were proposed in this pull request? A frequently reported issue of Spark is the Java 64kb compile error. This is because Spark generates a very big method, which is usually caused by one of 2 reasons: 1. a deep expression tree, e.g. a very complex filter condition 2. many individual expressions, e.g. expressions can have many children, operators can have many expressions. This PR focuses on case 1. There are already several patches (#15620 #18972 #18641) trying to fix this issue and some of them are already merged. However, this is an endless job, as every non-leaf expression has this issue. This PR proposes to fix this issue in `Expression.genCode`, to make sure the code for a single expression won't grow too big. TODO: check TPCDS to make sure there is no performance regression. ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/cloud-fan/spark codegen Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19767.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19767 commit aaa5b6f452034660880d8175f450ba8c8cfad23b Author: Wenchen Fan Date: 2017-11-16T16:55:43Z fix java 64kb compile error for deeply nested expressions
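The idea of keeping the code generated for any single expression below a size limit can be illustrated with a toy generator. This is a minimal Python sketch, not Spark's `Expression.genCode`: the tuple-based expression tree, the `Ctx` class, the character-count `THRESHOLD`, and the emitted Java-like strings are all hypothetical. Whenever a node's inlined code grows past the threshold, it is moved into its own helper method and replaced by a one-line call.

```python
import itertools

THRESHOLD = 200  # hypothetical cap, in characters, on code inlined for one expression


class Ctx:
    """Hypothetical code-gen context: hands out fresh names and collects helper methods."""

    def __init__(self):
        self.functions = []
        self._ids = itertools.count()

    def fresh(self, prefix):
        return f"{prefix}_{next(self._ids)}"


def gen(expr, ctx):
    """Return (code, result_var) for expr: an int literal or a tuple (op, left, right)."""
    if isinstance(expr, int):
        var = ctx.fresh("lit")
        return f"int {var} = {expr};\n", var
    op, left, right = expr
    left_code, left_var = gen(left, ctx)
    right_code, right_var = gen(right, ctx)
    var = ctx.fresh("v")
    code = left_code + right_code + f"int {var} = {left_var} {op} {right_var};\n"
    if len(code) > THRESHOLD:
        # Too big to inline: move the snippet into its own method and call it instead.
        name = ctx.fresh("eval")
        ctx.functions.append(
            f"private int {name}(InternalRow i) {{\n{code}return {var};\n}}\n"
        )
        call_var = ctx.fresh("v")
        return f"int {call_var} = {name}(i);\n", call_var
    return code, var


# A left-deep tree (((1 + 2) + 3) + ...), similar in shape to a very long filter condition.
expr = 1
for n in range(2, 60):
    expr = ("+", expr, n)

ctx = Ctx()
main_code, result = gen(expr, ctx)
```

Each helper body contains at most two child snippets that were themselves capped, so no single method grows with the depth of the tree; the cost is an extra method call at each split point.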
[GitHub] spark issue #19767: [WIP][SPARK-22543][SQL] fix java 64kb compile error for ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19767 cc @kiszk @rednaxelafx @maropu @gatorsmile
[GitHub] spark issue #19767: [WIP][SPARK-22543][SQL] fix java 64kb compile error for ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19767 **[Test build #83943 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83943/testReport)** for PR 19767 at commit [`aaa5b6f`](https://github.com/apache/spark/commit/aaa5b6f452034660880d8175f450ba8c8cfad23b).
[GitHub] spark issue #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit problem w...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19728 **[Test build #83944 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83944/testReport)** for PR 19728 at commit [`75e`](https://github.com/apache/spark/commit/75e2524920b09283ed28ba831e747e4eaf19).
[GitHub] spark issue #16976: [SPARK-19610][SQL] Support parsing multiline CSV files
Github user vishnusram commented on the issue: https://github.com/apache/spark/pull/16976 The wholeFile option doesn't seem to be working.
**Test file content:**
```
"num_col1","txt_col","num_col2"
10001,"regular string",20001
10002,"string with
newline",20002
```
**Command and result:**
```
>>> dfu = sqlContext.read.format('com.databricks.spark.csv').option("header","true").option("inferschema","true").option("delimiter",",").option("quote",'"').option("parserLib","univocity").option("wholeFile","true").load('new_line.csv')
>>> dfu.show(3,False)
+--------+--------------+--------+
|num_col1|txt_col       |num_col2|
+--------+--------------+--------+
|10001   |regular string|20001   |
|10002   |string with   |null    |
|newline"|20002         |null    |
+--------+--------------+--------+
```
**Expected result:**
```
+--------+--------------------+--------+
|num_col1|txt_col             |num_col2|
+--------+--------------------+--------+
|10001   |regular string      |20001   |
|10002   |string with\nnewline|20002   |
+--------+--------------------+--------+
```
**Spark version used: 2.2**
```
17/11/16 17:15:37 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/
Using Python version 2.7.5 (default, May 3 2017 07:55:04)
SparkSession available as 'spark'.
```
Please let me know if I am missing something here.
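The broken rows above are what you get when records are split on physical lines before quotes are taken into account. A minimal sketch using Python's standard `csv` module (not Spark) contrasts a naive line-based split with a quote-aware parser on the same test file. On the Spark side, as far as we know the `wholeFile` option was renamed to `multiLine` before the 2.2.0 release (SPARK-20980), so on 2.2.0 `.option("multiLine", "true")` is probably the setting to try; this is worth confirming against the 2.2 documentation.

```python
import csv
import io

data = (
    '"num_col1","txt_col","num_col2"\n'
    '10001,"regular string",20001\n'
    '10002,"string with\nnewline",20002\n'
)

# Naive approach: every physical line is treated as one record.
naive_rows = [line.split(",") for line in data.splitlines()]

# Quote-aware approach: csv.reader lets a quoted field span a line break.
parsed_rows = list(csv.reader(io.StringIO(data, newline="")))
```

The naive split yields four "records", with `"string with` and `newline"` landing in different rows, while the quote-aware parser yields three records and keeps the embedded newline inside `txt_col`.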
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151489959 --- Diff: python/pyspark/sql/tests.py --- @@ -3166,6 +3166,92 @@ def test_filtered_frame(self): self.assertTrue(pdf.empty) +class PandasUDFTests(ReusedSQLTestCase): +def test_pandas_udf_basic(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +udf = pandas_udf(lambda x: x, DoubleType()) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, 'v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, returnType='v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_pandas_udf_decorator(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.types import StructType, StructField, DoubleType + +@pandas_udf(DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +@pandas_udf(returnType=DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) 
+self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +schema = StructType([StructField("v", DoubleType())]) + +@pandas_udf(schema, PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_udf_wrong_arg(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(ValueError, 'return type'): +@pandas_udf(PandasUDFType.GROUP_MAP) +def foo(df): +return df +with self.assertRaisesRegexp(TypeError, 'Invalid returnType'): +@pandas_udf(returnType=PandasUDFType.GROUP_MAP) +def foo(df): +return df +with self.assertRaisesRegexp(ValueError, 'Invalid returnType'): +@pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP) --- End diff -- it's a little weird that we accept `DoubleType` but not `double`. If it's an existing issue, we can address it later.
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r151490038 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) -SparkHadoopUtil.get.startCredentialUpdater(driverConf) + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") --- End diff -- I plan to do it in a separate PR (also to make YARN and Mesos share more code in that area).
[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19630 LGTM
[GitHub] spark issue #19720: [SPARK-22494][SQL] Fix 64KB limit exception with Coalesc...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19720 It's not about running slower. This PR solves the problem of users hitting an exception when there are a lot of arguments in `coalesce` (or `AtLeastNNonNulls`), but what I am doing in the `coalesce` function here is adding two variables for each coalesce function. If there is a query with a lot of coalesce functions (instead of one coalesce with a lot of parameters), this might result in many more variables than before. This can cause the problem and the exception described in SPARK-18016, so a query that was previously running could fail. The same is true for all the other PRs similar to this one submitted by @kiszk. Therefore, we should keep all these changes only on master, where part of SPARK-18016 is landing and where it will hopefully be completely solved soon.
[GitHub] spark issue #19601: [SPARK-22383][SQL] Generate code to directly get value o...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19601 I see. Let us revisit this design later. I would appreciate it if you would review this columnar cache reader with simple primitive-type (non-nested) arrays.
[GitHub] spark pull request #19760: [SPARK-22533][core] Handle deprecated names in Co...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19760#discussion_r151492713 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -663,8 +663,10 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.jar", "2.0")), "spark.yarn.access.hadoopFileSystems" -> Seq( AlternateConfig("spark.yarn.access.namenodes", "2.2")), -"spark.maxRemoteBlockSizeFetchToMem" -> Seq( - AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")) +MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq( + AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")), +LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq( + AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")) --- End diff -- Not really, it works with any configs you add to this list. If you look at it, there are some YARN configs there.
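That the fallback works for any key in the list, not only YARN ones, is easy to see in a small model: a table maps each current key to its deprecated aliases, and a lookup falls back through that table. This Python sketch only mirrors the idea behind the `AlternateConfig` entries in the diff; `ALTERNATE_CONFIGS` and `get_conf` are hypothetical names, and the real `SparkConf` logic differs in detail.

```python
import warnings

# Hypothetical alias table: current key -> [(deprecated key, version it was deprecated in)].
ALTERNATE_CONFIGS = {
    "spark.maxRemoteBlockSizeFetchToMem": [
        ("spark.reducer.maxReqSizeShuffleToMem", "2.3"),
    ],
}


def get_conf(conf, key, default=None):
    """Look up key, falling back to any deprecated alias that is set (with a warning)."""
    if key in conf:
        return conf[key]
    for old_key, version in ALTERNATE_CONFIGS.get(key, []):
        if old_key in conf:
            warnings.warn(
                f"{old_key} is deprecated as of Spark {version}; use {key} instead",
                DeprecationWarning,
            )
            return conf[old_key]
    return default
```

Adding a new rename is then just another table entry; nothing in the lookup is specific to any particular subsystem.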
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151493167 --- Diff: python/pyspark/sql/tests.py --- @@ -3166,6 +3166,92 @@ def test_filtered_frame(self): self.assertTrue(pdf.empty) +class PandasUDFTests(ReusedSQLTestCase): +def test_pandas_udf_basic(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +udf = pandas_udf(lambda x: x, DoubleType()) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, 'v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, returnType='v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_pandas_udf_decorator(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.types import StructType, StructField, DoubleType + +@pandas_udf(DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +@pandas_udf(returnType=DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) 
+self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +schema = StructType([StructField("v", DoubleType())]) + +@pandas_udf(schema, PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_udf_wrong_arg(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(ValueError, 'return type'): +@pandas_udf(PandasUDFType.GROUP_MAP) +def foo(df): +return df +with self.assertRaisesRegexp(TypeError, 'Invalid returnType'): +@pandas_udf(returnType=PandasUDFType.GROUP_MAP) +def foo(df): +return df +with self.assertRaisesRegexp(ValueError, 'Invalid returnType'): +@pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP) --- End diff -- This is not an issue of DoubleType vs "double", but rather that GROUP_MAP needs a returnType that is a StructType or a string that can be parsed into a StructType.
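The rule described above (a string is parsed first, and only then is GROUP_MAP checked for a struct) can be sketched without PySpark. Everything here is a hypothetical stand-in, not `pyspark.sql.types` or the actual `pandas_udf` code: `DoubleType`, `StructType`, the toy parser that only understands `double`, and the integer constants `SCALAR` and `GROUP_MAP`.

```python
class DataType:
    """Hypothetical minimal stand-in for pyspark.sql.types.DataType."""


class DoubleType(DataType):
    pass


class StructType(DataType):
    def __init__(self, fields):
        self.fields = fields  # list of (name, DataType) pairs


SCALAR, GROUP_MAP = 200, 201  # hypothetical integer function-type constants


def _atomic(type_name):
    if type_name != "double":
        raise ValueError(f"unsupported type in this sketch: {type_name}")
    return DoubleType()


def parse_type_string(s):
    """Toy parser: 'double' is an atomic type; 'v double, w double' is a struct."""
    items = [item.split() for item in s.split(",")]
    if len(items) == 1 and len(items[0]) == 1:
        return _atomic(items[0][0])
    return StructType([(name, _atomic(type_name)) for name, type_name in items])


def resolve_return_type(return_type, function_type):
    if isinstance(return_type, str):
        return_type = parse_type_string(return_type)
    if not isinstance(return_type, DataType):
        raise TypeError(f"Invalid returnType: {return_type!r}")
    if function_type == GROUP_MAP and not isinstance(return_type, StructType):
        raise ValueError("Invalid returnType: GROUP_MAP requires a StructType")
    return return_type
```

Under this rule `'double'` and `DoubleType()` are treated alike: both are fine for SCALAR and both are rejected for GROUP_MAP, while `'v double'` parses to a struct and is accepted.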
[GitHub] spark pull request #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC di...
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19741#discussion_r151497034 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend( logWarning(reason.toString) driverEndpoint.ask[Boolean](r).onFailure { --- End diff -- I see this message is used in multiple places, but all of them just log the failure. I think this logging may be useful for diagnosing failures in some cases.
[GitHub] spark issue #9428: [SPARK-8582][Core]Optimize checkpointing to avoid computi...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/9428 > I simply cant recompute things as results will be different. A task may run multiple times due to failure. Why is this not a problem for you?
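The point here is that a task may already be re-executed (for example after a failure), so a transformation that draws fresh random numbers on every run is fragile regardless of checkpointing. One common mitigation, not proposed in this thread and shown only as a hypothetical sketch, is to derive the random seed from the partition index so that recomputing a partition reproduces the same values. This assumes the partition's input and its ordering are themselves deterministic.

```python
import random


def random_partition(partition_index, n, base_seed=42):
    """Generate n random numbers for one partition from a seed derived from its index."""
    # A private Random instance avoids sharing global state between partitions.
    rng = random.Random(base_seed + partition_index)
    return [rng.random() for _ in range(n)]
```

A retried task for partition 3 then produces exactly the same numbers as the original attempt, while different partitions still get different streams.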
[GitHub] spark pull request #19768: [SPARK-22535][PYSPARK] Sleep before killing the p...
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/19768 [SPARK-22535][PYSPARK] Sleep before killing the python worker in PythonRunner.MonitorThread (branch-2.2) ## What changes were proposed in this pull request? Backport #19762 to 2.2 ## How was this patch tested? Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/zsxwing/spark SPARK-22535-2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19768.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19768 commit 087d2273b58d07fbc30948001d69225e4a7c7aeb Author: Shixiong Zhu Date: 2017-11-16T18:59:42Z [SPARK-22535][PYSPARK] Sleep before killing the python worker in PythonRunner.MonitorThread `PythonRunner.MonitorThread` should give the task a little time to finish before forcibly killing the python worker. This will greatly reduce the chance of the race condition. I also improved the log a bit to find out which task to blame when it's stuck. Jenkins Author: Shixiong Zhu Closes #19762 from zsxwing/SPARK-22535.
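The change described in this backport (give the task a grace period to finish before forcibly killing the worker) can be sketched with `threading.Event`. This is a hypothetical Python model, not the Scala `PythonRunner.MonitorThread` code; `FakeWorker`, `monitor`, and `grace_seconds` are illustrative names. The monitor waits for the interrupt, then waits up to the grace period for the worker to finish, and kills it only if it is still running.

```python
import threading


class FakeWorker:
    """Hypothetical stand-in for a Python worker process."""

    def __init__(self):
        self.finished = threading.Event()
        self.killed = False

    def kill(self):
        self.killed = True


def monitor(worker, task_interrupted, grace_seconds):
    """After the task is interrupted, wait up to grace_seconds before killing the worker."""
    task_interrupted.wait()
    # Event.wait returns True if the worker finished within the grace period.
    if not worker.finished.wait(timeout=grace_seconds):
        worker.kill()
```

Waiting on an event rather than sleeping for a fixed time lets the monitor return as soon as the worker finishes; the grace period only bounds the worst case.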
[GitHub] spark issue #19768: [SPARK-22535][PYSPARK] Sleep before killing the python w...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19768 cc @ueshin
[GitHub] spark issue #19768: [SPARK-22535][PYSPARK] Sleep before killing the python w...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19768 **[Test build #83945 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83945/testReport)** for PR 19768 at commit [`087d227`](https://github.com/apache/spark/commit/087d2273b58d07fbc30948001d69225e4a7c7aeb).
[GitHub] spark pull request #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC di...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19741#discussion_r151509249 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend( logWarning(reason.toString) driverEndpoint.ask[Boolean](r).onFailure { --- End diff -- Can you point out a couple of those? I only see this `RemoveExecutor` being handled in `CoarseGrainedSchedulerBackend.scala`.
[GitHub] spark pull request #19730: [SPARK-22500][SQL] Fix 64KB JVM bytecode limit pr...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19730#discussion_r151509423 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -1039,13 +1039,19 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } } """ -}.mkString("\n") +} +val fieldsEvalCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { + ctx.splitExpressions(fieldsEvalCode, "castStruct", +("InternalRow", ctx.INPUT_ROW) :: (rowClass, result) :: ("InternalRow", tmpRow) :: Nil) --- End diff -- The inner struct case in the following code works well.
```
val struct = Literal.create(
  Row(
    UTF8String.fromString("123.4"),
    Seq("456", "true", "78.9"),
    Row(7)),
  StructType(Seq(
    StructField("i", StringType, nullable = true),
    StructField("a", ArrayType(StringType, containsNull = false), nullable = true),
    StructField("s", StructType(Seq(
      StructField("i", IntegerType, nullable = true)))))))

val ret = cast(struct, StructType(Seq(
  StructField("d", DoubleType, nullable = true),
  StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true),
  StructField("s", StructType(Seq(
    StructField("l", LongType, nullable = true)))))))
assert(ret.resolved === true)
checkEvaluation(ret, Row(123.4, Seq(456, null, 78), Row(7L)))
```
[GitHub] spark pull request #19751: [SPARK-20653][core] Add cleaning of old elements ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19751#discussion_r151510043 --- Diff: core/src/main/scala/org/apache/spark/status/ElementTrackingStore.scala --- @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.{HashMap, ListBuffer} + +import org.apache.spark.SparkConf +import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.kvstore._ + +/** + * A KVStore wrapper that allows tracking the number of elements of specific types, and triggering + * actions once they reach a threshold. This allows writers, for example, to control how much data + * is stored by potentially deleting old data as new data is added. + * + * This store is used when populating data either from a live UI or an event log. On top of firing + * triggers when elements reach a certain threshold, it provides two extra bits of functionality: + * + * - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can + * be configured to run on the calling thread when more determinism is desired (e.g. unit tests). 
+ * - a generic flush mechanism so that listeners can be notified about when they should flush + * internal state to the store (e.g. after the SHS finishes parsing an event log). + * + * The configured triggers are run on the same thread that triggered the write, after the write + * has completed. + */ +private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore { + + import config._ + + private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]() + private val flushTriggers = new ListBuffer[() => Unit]() + private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) { + Some(ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")) + } else { +None + } + + @volatile private var stopped = false + + /** + * Register a trigger that will be fired once the number of elements of a given type reaches + * the given threshold. + * + * Triggers are fired in a separate thread, so that they can do more expensive operations + * than would be allowed on the main threads populating the store. + * + * @param klass The type to monitor. + * @param threshold The number of elements that should trigger the action. + * @param action Action to run when the threshold is reached; takes as a parameter the number + * of elements of the registered type currently known to be in the store. + */ + def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = { +val existing = triggers.getOrElse(klass, Seq()) +triggers(klass) = existing :+ Trigger(threshold, action) + } + + /** + * Adds a trigger to be executed before the store is flushed. This normally happens before + * closing, and is useful for flushing intermediate state to the store, e.g. when replaying + * in-progress applications through the SHS. + * + * Flush triggers are called synchronously in the same thread that is closing the store. 
+ */ + def onFlush(action: => Unit): Unit = { +flushTriggers += { () => action } + } + + /** + * Enqueues an action to be executed asynchronously. + */ + def doAsync(fn: => Unit): Unit = { +executor match { + case Some(exec) => +exec.submit(new Runnable() { + override def run(): Unit = Utils.tryLog { fn } +}) + + case _ => +fn +} + } + + override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey) + + override def write(value: Any): Unit = store.write(value) + + /** Write an element to the store, optionally c
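The trigger mechanism described in the `ElementTrackingStore` doc comments can be modelled in a few lines. This is a hypothetical, synchronous Python sketch (roughly what the doc comment describes for running on the calling thread, as in unit tests), not the Scala class in the diff: elements are grouped by type, and after each write every trigger whose threshold has been reached is called with the current count. The usage at the end caps stored `Job` objects at five by deleting the oldest ones.

```python
from collections import defaultdict


class ElementTrackingStoreSketch:
    """Hypothetical in-memory store with count-based triggers per element type."""

    def __init__(self):
        self._data = defaultdict(dict)      # type -> {key: value}, in insertion order
        self._triggers = defaultdict(list)  # type -> [(threshold, action)]

    def add_trigger(self, klass, threshold, action):
        """Call action(count) after a write once the count of klass reaches threshold."""
        self._triggers[klass].append((threshold, action))

    def count(self, klass):
        return len(self._data[klass])

    def keys(self, klass):
        return list(self._data[klass])

    def write(self, key, value):
        klass = type(value)
        self._data[klass][key] = value
        # Triggers run on the writing thread, after the write has completed.
        n = self.count(klass)
        for threshold, action in self._triggers[klass]:
            if n >= threshold:
                action(n)

    def delete(self, klass, key):
        self._data[klass].pop(key, None)


class Job:
    pass


store = ElementTrackingStoreSketch()
MAX_JOBS = 5


def evict_oldest(count):
    # Delete the oldest entries so that at most MAX_JOBS remain.
    for key in store.keys(Job)[: count - MAX_JOBS]:
        store.delete(Job, key)


store.add_trigger(Job, MAX_JOBS + 1, evict_oldest)
for job_id in range(10):
    store.write(job_id, Job())
```

Keeping the cleanup policy in a trigger leaves `write` itself generic, which matches the stated goal of letting writers control how much data is retained.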
[GitHub] spark issue #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "spark.m...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17436 **[Test build #83941 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83941/testReport)** for PR 17436 at commit [`55b56bf`](https://github.com/apache/spark/commit/55b56bf8dc08b2913efeb07e39ae7a1393b77b32). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "spark.m...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17436 Merged build finished. Test PASSed.
[GitHub] spark issue #17436: [SPARK-20101][SQL] Use OffHeapColumnVector when "spark.m...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17436 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83941/ Test PASSed.
[GitHub] spark issue #19631: [SPARK-22372][core, yarn] Make cluster submission use Sp...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19631 **[Test build #83946 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83946/testReport)** for PR 19631 at commit [`2129ccb`](https://github.com/apache/spark/commit/2129ccb13759fbc7784ba31e7c0e6aa08904cb42).
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151520336 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala --- @@ -349,13 +349,22 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") val offerPorts = getRangeResource(offer.getResourcesList, "ports") + val offerAllocationInfo = offer.getAllocationInfo + val offerReservationInfo = offer +.getResourcesList +.asScala +.find(resource => Option(resource.getReservation).isDefined) --- End diff -- `.find { r => r.getReservation != null }`
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151521265 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -427,10 +444,10 @@ trait MesosSchedulerUtils extends Logging { // partition port offers val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources) - val portsAndRoles = requestedPorts. -map(x => (x, findPortAndGetAssignedRangeRole(x, portResources))) + val portsAndResourceInfo = requestedPorts. +map(x => (x, findPortAndGetAssignedResourceInfo(x, portResources))) --- End diff -- Not caused by your change, but Spark's style is to use `.map { foo => ... }`.
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151521542 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -451,15 +468,22 @@ trait MesosSchedulerUtils extends Logging { } /** Creates a mesos resource for a specific port number. */ - private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { -portsAndRoles.flatMap{ case (port, role) => - createMesosPortResource(List((port, port)), Some(role))} + private def createResourcesFromPorts( + portsAndResourcesInfo: List[(Long, (String, AllocationInfo, Option[ReservationInfo]))]) +: List[Resource] = { --- End diff -- indent one extra level
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151521101 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -228,24 +254,15 @@ trait MesosSchedulerUtils extends Logging { (attr.getName, attr.getText.getValue.split(',').toSet) } - - /** Build a Mesos resource protobuf object */ - protected def createResource(resourceName: String, quantity: Double): Protos.Resource = { -Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } - /** * Converts the attributes from the resource offer into a Map of name to Attribute Value * The attribute values are the mesos attribute types and they are * * @param offerAttributes the attributes offered * @return */ - protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { + protected def toAttributeMap(offerAttributes: JList[Attribute]) +: Map[String, GeneratedMessageV3] = { --- End diff -- nit: indent one more level
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151521406 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -451,15 +468,22 @@ trait MesosSchedulerUtils extends Logging { } /** Creates a mesos resource for a specific port number. */ - private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { -portsAndRoles.flatMap{ case (port, role) => - createMesosPortResource(List((port, port)), Some(role))} + private def createResourcesFromPorts( + portsAndResourcesInfo: List[(Long, (String, AllocationInfo, Option[ReservationInfo]))]) +: List[Resource] = { +portsAndResourcesInfo.flatMap{ case (port, rInfo) => --- End diff -- space before `{`
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151520818 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -175,17 +176,39 @@ trait MesosSchedulerUtils extends Logging { registerLatch.countDown() } - def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { + private def setAllocationAndReservationInfo( + allocationInfo: Option[AllocationInfo], + reservationInfo: Option[ReservationInfo], + role: Option[String], + builder: Resource.Builder): Unit = { +if (role.forall(r => !r.equals(ANY_ROLE))) { --- End diff -- `.forall(_ != ANY_ROLE)`
[GitHub] spark pull request #19390: [SPARK-18935][MESOS] Fix dynamic reservations on ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19390#discussion_r151521750 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala --- @@ -451,15 +468,22 @@ trait MesosSchedulerUtils extends Logging { } /** Creates a mesos resource for a specific port number. */ - private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { -portsAndRoles.flatMap{ case (port, role) => - createMesosPortResource(List((port, port)), Some(role))} + private def createResourcesFromPorts( + portsAndResourcesInfo: List[(Long, (String, AllocationInfo, Option[ReservationInfo]))]) --- End diff -- `(String, AllocationInfo, Option[ReservationInfo])` is kind of an awkward type. A case class to wrap that info might help here.
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151523879 --- Diff: python/pyspark/sql/tests.py --- @@ -3166,6 +3166,92 @@ def test_filtered_frame(self): self.assertTrue(pdf.empty) +class PandasUDFTests(ReusedSQLTestCase): +def test_pandas_udf_basic(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +udf = pandas_udf(lambda x: x, DoubleType()) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, 'v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, returnType='v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_pandas_udf_decorator(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.types import StructType, StructField, DoubleType + +@pandas_udf(DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +@pandas_udf(returnType=DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) 
+self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +schema = StructType([StructField("v", DoubleType())]) + +@pandas_udf(schema, PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_udf_wrong_arg(self): +from pyspark.sql.functions import pandas_udf, PandasUDFType + +with QuietTest(self.sc): +with self.assertRaisesRegexp(ValueError, 'return type'): +@pandas_udf(PandasUDFType.GROUP_MAP) +def foo(df): +return df +with self.assertRaisesRegexp(TypeError, 'Invalid returnType'): +@pandas_udf(returnType=PandasUDFType.GROUP_MAP) +def foo(df): +return df +with self.assertRaisesRegexp(ValueError, 'Invalid returnType'): +@pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP) --- End diff -- oh i see
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151524279 --- Diff: python/pyspark/sql/tests.py --- @@ -3166,6 +3166,92 @@ def test_filtered_frame(self): self.assertTrue(pdf.empty) +class PandasUDFTests(ReusedSQLTestCase): +def test_pandas_udf_basic(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +udf = pandas_udf(lambda x: x, DoubleType()) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, 'v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +udf = pandas_udf(lambda x: x, returnType='v double', + functionType=PandasUDFType.GROUP_MAP) +self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())])) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_pandas_udf_decorator(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType +from pyspark.sql.types import StructType, StructField, DoubleType + +@pandas_udf(DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +@pandas_udf(returnType=DoubleType()) +def foo(x): +return x +self.assertEqual(foo.returnType, DoubleType()) 
+self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +schema = StructType([StructField("v", DoubleType())]) + +@pandas_udf(schema, PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +@pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP) +def foo(x): +return x +self.assertEqual(foo.returnType, schema) +self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF) + +def test_udf_wrong_arg(self): --- End diff -- can we also add some tests for `SCALAR_MAP` for the error case?
[GitHub] spark pull request #19630: [SPARK-22409] Introduce function type argument in...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19630#discussion_r151524338 --- Diff: python/pyspark/sql/tests.py --- @@ -3166,6 +3166,92 @@ def test_filtered_frame(self): self.assertTrue(pdf.empty) +class PandasUDFTests(ReusedSQLTestCase): +def test_pandas_udf_basic(self): +from pyspark.rdd import PythonEvalType +from pyspark.sql.functions import pandas_udf, PandasUDFType + +udf = pandas_udf(lambda x: x, DoubleType()) +self.assertEqual(udf.returnType, DoubleType()) +self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF) + +udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR) --- End diff -- can we try `double`?
[GitHub] spark pull request #19760: [SPARK-22533][core] Handle deprecated names in Co...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19760#discussion_r151524853 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -663,8 +663,10 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.jar", "2.0")), "spark.yarn.access.hadoopFileSystems" -> Seq( AlternateConfig("spark.yarn.access.namenodes", "2.2")), -"spark.maxRemoteBlockSizeFetchToMem" -> Seq( - AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")) +MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq( + AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")), +LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq( + AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")) --- End diff -- but you need to hardcode the config key here if the config entry is defined in the SQL module.
[GitHub] spark issue #19760: [SPARK-22533][core] Handle deprecated names in ConfigEnt...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19760 I don't quite follow the purpose here: the name `withAlternatives` says it's an alternative, not a deprecated name, so it should not print a deprecation message. Besides, I think `withAlternatives` is easier to use and maintain, as it's defined with the config entry.
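cloud-fan's reading of `withAlternatives` amounts to a read-time fallback: the entry's primary key wins, and alternative names are accepted silently. A minimal Python sketch of that behavior, assuming a hypothetical `ConfigEntry` class and made-up keys (this is not Spark's Scala `ConfigEntry` API):

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class ConfigEntry:
    """Hypothetical config entry: one primary key plus optional alternative keys."""

    key: str
    default: str
    alternatives: List[str] = field(default_factory=list)

    def read(self, settings: Dict[str, str]) -> str:
        # Check the primary key first, then each alternative in declaration order.
        # Alternatives are treated as equally valid names, so no warning is emitted.
        for name in [self.key, *self.alternatives]:
            if name in settings:
                return settings[name]
        return self.default


# Usage with made-up keys.
capacity = ConfigEntry("example.queue.capacity", "10000", ["example.queue.size"])
print(capacity.read({"example.queue.size": "500"}))  # 500
```

Nothing in this model marks one name as preferred over the other; the metadata lives on the entry itself, which is what makes it convenient to maintain.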
[GitHub] spark issue #19720: [SPARK-22494][SQL] Fix 64KB limit exception with Coalesc...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19720 If there is a query with a lot of coalesce functions, wouldn't it hit the 64KB issue?
[GitHub] spark issue #16976: [SPARK-19610][SQL] Support parsing multiline CSV files
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/16976 It was renamed `multiLine` before the release. Could you try that instead?
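For context, a "multi-line" CSV file is one where a quoted field contains a newline, so a single logical record spans several physical lines, and a line-at-a-time reader splits it apart. The sketch below uses Python's standard `csv` module (not Spark) purely to illustrate that difference; the sample data is made up:

```python
import csv
import io

# One header row plus one data record whose quoted "comment" field contains a newline.
DATA = 'id,comment\n1,"first line\nsecond line"\n'


def count_physical_lines(text: str) -> int:
    """Number of lines a naive, line-at-a-time reader would see."""
    return len(text.splitlines())


def parse_records(text: str) -> list:
    """Parse with a quote-aware CSV reader, which keeps the embedded newline in the field."""
    return list(csv.reader(io.StringIO(text)))


print(count_physical_lines(DATA))  # 3
print(len(parse_records(DATA)))  # 2
```

Three physical lines hold only two records; handling that case is what the `multiLine` option is for.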
[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19630 **[Test build #83942 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83942/testReport)** for PR 19630 at commit [`1f2c47b`](https://github.com/apache/spark/commit/1f2c47b569bcfa3f7ca7f974fee7cbdc21969623). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19630 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83942/ Test PASSed.
[GitHub] spark issue #19630: [SPARK-22409] Introduce function type argument in pandas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19630 Merged build finished. Test PASSed.
[GitHub] spark issue #16976: [SPARK-19610][SQL] Support parsing multiline CSV files
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/16976 Guys, please use the mailing list or JIRA ...
[GitHub] spark issue #19767: [WIP][SPARK-22543][SQL] fix java 64kb compile error for ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19767 **[Test build #83943 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83943/testReport)** for PR 19767 at commit [`aaa5b6f`](https://github.com/apache/spark/commit/aaa5b6f452034660880d8175f450ba8c8cfad23b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19767: [WIP][SPARK-22543][SQL] fix java 64kb compile error for ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19767 Merged build finished. Test PASSed.
[GitHub] spark issue #19767: [WIP][SPARK-22543][SQL] fix java 64kb compile error for ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19767 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83943/ Test PASSed.
[GitHub] spark issue #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit problem w...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19728 **[Test build #83944 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83944/testReport)** for PR 19728 at commit [`75e`](https://github.com/apache/spark/commit/75e2524920b09283ed28ba831e747e4eaf19). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit problem w...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19728 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83944/ Test PASSed.
[GitHub] spark issue #19728: [SPARK-22498][SQL] Fix 64KB JVM bytecode limit problem w...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19728 Merged build finished. Test PASSed.
[GitHub] spark pull request #18918: [SPARK-21707][SQL]Improvement a special case for ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18918#discussion_r151533419 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2029,4 +2029,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } + + test("SPARK-21707: nondeterministic expressions correctly for filter predicates") { +withTempPath { path => + val p = path.getAbsolutePath + Seq(1 -> "a").toDF("a", "b").write.partitionBy("a").parquet(p) + val df = spark.read.parquet(p) + checkAnswer(df.filter(rand(10) <= 1.0).select($"a"), Row(1)) --- End diff -- this test can pass on current master.
[GitHub] spark pull request #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC di...
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19741#discussion_r151533418 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend( logWarning(reason.toString) driverEndpoint.ask[Boolean](r).onFailure { --- End diff -- I see these places other than `CoarseGrainedSchedulerBackend.scala` and the one present in the PR. https://github.com/apache/spark/blob/1e82335413bc2384073ead0d6d581c862036d0f5/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L169 https://github.com/apache/spark/blob/6735433cde44b3430fb44edfff58ef8149c66c13/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala#L250 https://github.com/apache/spark/blob/a2db5c5761b0c72babe48b79859d3b208ee8e9f6/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala#L653
[GitHub] spark issue #18918: [SPARK-21707][SQL]Improvement a special case for non-det...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/18918 what exactly are you proposing?
[GitHub] spark issue #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC disassoci...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19741 **[Test build #83947 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83947/testReport)** for PR 19741 at commit [`1fae5ab`](https://github.com/apache/spark/commit/1fae5abb5652556243cfd6156dfc392c6a5e76d6).
[GitHub] spark pull request #19769: [SPARK-12297][SQL] Adjust timezone for int96 data...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19769 [SPARK-12297][SQL] Adjust timezone for int96 data from impala ## What changes were proposed in this pull request? Int96 data written by impala vs data written by hive & spark is stored slightly differently -- they use a different offset for the timezone. This adds an option "spark.sql.parquet.int96TimestampConversion" (false by default) to adjust timestamps if and only if the writer is impala (or more precisely, if the parquet file's "createdBy" metadata does not start with "parquet-mr"). This matches the existing behavior in hive from HIVE-9482. ## How was this patch tested? Unit test added, existing tests run via jenkins. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-12297_skip_conversion Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19769.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19769 commit e36453c2118fa3c45f424536ea787a95f0328687 Author: Imran Rashid Date: 2017-11-15T18:44:10Z wip commit 2c71453cead0025deaeadddbbad1c3a79e49a99f Author: Imran Rashid Date: 2017-11-15T22:48:18Z test and a bunch of plumbing in place commit 592663393686d31a1d759e0e69540c10ee99dc69 Author: Imran Rashid Date: 2017-11-16T18:38:10Z works, needs cleanup commit e9ecd16defd82e9a33fc31fa200f961d2dad9d2e Author: Imran Rashid Date: 2017-11-16T19:35:24Z cleanup, test for predicate pushdown
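The gating rule in the PR description can be written as a small predicate. A minimal Python sketch, assuming a hypothetical `needs_int96_conversion` helper; it only decides *whether* to adjust and does not attempt the timezone arithmetic. The `createdBy` strings in the checks are illustrative, not read from real files:

```python
from typing import Optional


def needs_int96_conversion(created_by: Optional[str], conversion_enabled: bool) -> bool:
    """Hypothetical helper: should INT96 timestamps from this Parquet file be adjusted?

    Follows the rule in the PR text: adjust only when the option is enabled and the
    file's "createdBy" metadata does not start with "parquet-mr".
    """
    if not conversion_enabled:
        # The option is off by default, in which case nothing is adjusted.
        return False
    # Assumption (not stated in the PR): a missing createdBy value is treated
    # like a non-parquet-mr writer.
    return not (created_by or "").startswith("parquet-mr")


print(needs_int96_conversion("parquet-mr version 1.8.2", True))  # False
print(needs_int96_conversion("impala version 2.9.0", True))  # True
```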
[GitHub] spark issue #19769: [SPARK-12297][SQL] Adjust timezone for int96 data from i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19769 **[Test build #83948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83948/testReport)** for PR 19769 at commit [`e9ecd16`](https://github.com/apache/spark/commit/e9ecd16defd82e9a33fc31fa200f961d2dad9d2e).
[GitHub] spark pull request #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC di...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19741#discussion_r151538353 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend( logWarning(reason.toString) driverEndpoint.ask[Boolean](r).onFailure { --- End diff -- Yeah, as you said, there's only logging going on. There are only two things that can happen: - the handler throws an exception instead of replying - the RPC layer hits an error. The first can be logged at the handler (if not already logged by the RPC layer). The second is already logged by the RPC layer.
[GitHub] spark pull request #19760: [SPARK-22533][core] Handle deprecated names in Co...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19760#discussion_r151538808 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -663,8 +663,10 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.jar", "2.0")), "spark.yarn.access.hadoopFileSystems" -> Seq( AlternateConfig("spark.yarn.access.namenodes", "2.2")), -"spark.maxRemoteBlockSizeFetchToMem" -> Seq( - AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")) +MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM.key -> Seq( + AlternateConfig("spark.reducer.maxReqSizeShuffleToMem", "2.3")), +LISTENER_BUS_EVENT_QUEUE_CAPACITY.key -> Seq( + AlternateConfig("spark.scheduler.listenerbus.eventqueue.size", "2.3")) --- End diff -- Yes. But there's no way for the SQL module to deprecate configs any other way. Deprecation warnings are handled internally by SparkConf and the metadata needs to be available at the time the `SparkConf.set` call is made, which cannot happen if the config constant is declared in some other module (since the class holding the constant may not have been initialized).
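vanzin's argument is about *where* the deprecation metadata lives: `set` can only warn about an old key if the old-to-new mapping is already registered somewhere the conf class itself owns. A minimal Python sketch of that design, assuming a hypothetical `Conf` class and a `DEPRECATED_CONFIGS` table loosely modeled on the `AlternateConfig(key, version)` entries in the diff; the keys are made up, and this sketch canonicalizes keys at set time, which Spark's own bookkeeping may not do:

```python
import warnings
from typing import Dict, List, NamedTuple, Optional, Tuple


class AlternateConfig(NamedTuple):
    """Hypothetical record: a deprecated key and the version that deprecated it."""

    key: str
    version: str


# Deprecation metadata owned by the conf module itself, keyed by canonical name.
# Because it lives here, it is available whenever Conf.set() runs, even if the
# module that defines the config constant has never been imported.
DEPRECATED_CONFIGS: Dict[str, List[AlternateConfig]] = {
    "example.new.key": [AlternateConfig("example.old.key", "2.3")],
}

# Reverse index: deprecated key -> (canonical key, version it was deprecated in).
OLD_TO_NEW: Dict[str, Tuple[str, str]] = {
    alt.key: (new_key, alt.version)
    for new_key, alts in DEPRECATED_CONFIGS.items()
    for alt in alts
}


class Conf:
    """Hypothetical minimal conf that stores every value under its canonical name."""

    def __init__(self) -> None:
        self._settings: Dict[str, str] = {}

    def set(self, key: str, value: str) -> "Conf":
        if key in OLD_TO_NEW:
            new_key, version = OLD_TO_NEW[key]
            warnings.warn(
                f"'{key}' is deprecated as of {version}; use '{new_key}' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            key = new_key
        self._settings[key] = value
        return self

    def get(self, key: str) -> Optional[str]:
        return self._settings.get(key)


conf = Conf().set("example.old.key", "5")  # issues a DeprecationWarning
print(conf.get("example.new.key"))  # 5
```

Storing under one canonical key also matches the position that a config should have a single name, with any older names only deprecated.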
[GitHub] spark issue #19760: [SPARK-22533][core] Handle deprecated names in ConfigEnt...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19760 > the name withAlternatives says it's an alternative, That's the other thing. Having alternative names is just confusing; there should be a single name for a config, with others deprecated. The two configs that had alternatives were basically deprecating the old names and replacing them with the new ones.