spark git commit: [SPARK-16286][SQL] Implement stack table generating function
Repository: spark Updated Branches: refs/heads/master fdde7d0aa -> d0d28507c [SPARK-16286][SQL] Implement stack table generating function ## What changes were proposed in this pull request? This PR implements `stack` table generating function. ## How was this patch tested? Pass the Jenkins tests including new testcases. Author: Dongjoon HyunCloses #14033 from dongjoon-hyun/SPARK-16286. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0d28507 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0d28507 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0d28507 Branch: refs/heads/master Commit: d0d28507cacfca5919dbfb4269892d58b62e8662 Parents: fdde7d0 Author: Dongjoon Hyun Authored: Wed Jul 6 10:54:43 2016 +0800 Committer: Wenchen Fan Committed: Wed Jul 6 10:54:43 2016 +0800 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../sql/catalyst/expressions/generators.scala | 53 .../expressions/GeneratorExpressionSuite.scala | 18 +++ .../spark/sql/GeneratorFunctionSuite.scala | 53 .../spark/sql/hive/HiveSessionCatalog.scala | 2 +- 5 files changed, 126 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d0d28507/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 021bec7..f6ebcae 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -182,6 +182,7 @@ object FunctionRegistry { expression[PosExplode]("posexplode"), expression[Rand]("rand"), expression[Randn]("randn"), +expression[Stack]("stack"), expression[CreateStruct]("struct"), expression[CaseWhen]("when"), http://git-wip-us.apache.org/repos/asf/spark/blob/d0d28507/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 99b97c8..9d5c856 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -94,6 +94,59 @@ case class UserDefinedGenerator( } /** + * Separate v1, ..., vk into n rows. Each row will have k/n columns. n must be constant. 
+ * {{{ + * SELECT stack(2, 1, 2, 3) -> + * 1 2 + * 3 NULL + * }}} + */ +@ExpressionDescription( + usage = "_FUNC_(n, v1, ..., vk) - Separate v1, ..., vk into n rows.", + extended = "> SELECT _FUNC_(2, 1, 2, 3);\n [1,2]\n [3,null]") +case class Stack(children: Seq[Expression]) +extends Expression with Generator with CodegenFallback { + + private lazy val numRows = children.head.eval().asInstanceOf[Int] + private lazy val numFields = Math.ceil((children.length - 1.0) / numRows).toInt + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length <= 1) { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least 2 arguments.") +} else if (children.head.dataType != IntegerType || !children.head.foldable || numRows < 1) { + TypeCheckResult.TypeCheckFailure("The number of rows must be a positive constant integer.") +} else { + for (i <- 1 until children.length) { +val j = (i - 1) % numFields +if (children(i).dataType != elementSchema.fields(j).dataType) { + return TypeCheckResult.TypeCheckFailure( +s"Argument ${j + 1} (${elementSchema.fields(j).dataType}) != " + + s"Argument $i (${children(i).dataType})") +} + } + TypeCheckResult.TypeCheckSuccess +} + } + + override def elementSchema: StructType = +StructType(children.tail.take(numFields).zipWithIndex.map { + case (e, index) => StructField(s"col$index", e.dataType) +}) + + override def eval(input: InternalRow): TraversableOnce[InternalRow] = { +val values = children.tail.map(_.eval(input)).toArray +for (row <- 0 until numRows) yield { + val fields = new Array[Any](numFields) + for (col <- 0 until numFields) { +val index = row * numFields + col +fields.update(col,
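The generators.scala diff above is cut off mid-method. For illustration only (not part of the commit), a minimal usage sketch of the new function, assuming a SparkSession named `spark` built from this branch: with three values and two rows, numFields = ceil(3/2) = 2, and the missing fourth value is padded with null. The column names col0/col1 come from elementSchema above (s"col$index").

spark.sql("SELECT stack(2, 1, 2, 3)").show()
// +----+----+
// |col0|col1|
// +----+----+
// |   1|   2|
// |   3|null|
// +----+----+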
[spark] Git Push Summary
Repository: spark
Updated Tags: refs/tags/v2.0.0-rc2 [created] 4a55b2326
[2/2] spark git commit: Preparing development version 2.0.1-SNAPSHOT
Preparing development version 2.0.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e8fa86e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e8fa86e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e8fa86e Branch: refs/heads/branch-2.0 Commit: 6e8fa86ebf30a9b850f4a66810d5d38d1f188b33 Parents: 4a55b23 Author: Patrick WendellAuthored: Tue Jul 5 18:40:51 2016 -0700 Committer: Patrick Wendell Committed: Tue Jul 5 18:40:51 2016 -0700 -- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 34 files changed, 34 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e8fa86e/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 5f546bb..507ddc7 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.0 +2.0.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/6e8fa86e/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index 2eaa810..bc3b0fe 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.0 +2.0.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/6e8fa86e/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index f068d9d..2fb5835 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.0 +2.0.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/6e8fa86e/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index fd22188..07d9f1c 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.0 +2.0.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/6e8fa86e/common/sketch/pom.xml -- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index a17aba5..5e02efd 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.0 +2.0.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/6e8fa86e/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index 0bd8846..e7fc6a2 100644 --- a/common/tags/pom.xml +++ b/common/tags/pom.xml @@ -22,7 +22,7 @@ 
org.apache.spark
[1/2] spark git commit: Preparing Spark release v2.0.0-rc2
Repository: spark Updated Branches: refs/heads/branch-2.0 0fe2a8c16 -> 6e8fa86eb Preparing Spark release v2.0.0-rc2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a55b232 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a55b232 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a55b232 Branch: refs/heads/branch-2.0 Commit: 4a55b2326c8cf50f772907a8b73fd5e7b3d1aa06 Parents: 0fe2a8c Author: Patrick WendellAuthored: Tue Jul 5 18:40:45 2016 -0700 Committer: Patrick Wendell Committed: Tue Jul 5 18:40:45 2016 -0700 -- assembly/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml| 2 +- common/network-yarn/pom.xml | 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml | 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- examples/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml| 2 +- external/java8-tests/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10/pom.xml | 2 +- external/kafka-0-8-assembly/pom.xml | 2 +- external/kafka-0-8/pom.xml| 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml| 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml | 2 +- mllib/pom.xml | 2 +- pom.xml | 2 +- repl/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- yarn/pom.xml | 2 +- 34 files changed, 34 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a55b232/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 507ddc7..5f546bb 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.0.1-SNAPSHOT +2.0.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4a55b232/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml index bc3b0fe..2eaa810 100644 --- a/common/network-common/pom.xml +++ b/common/network-common/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1-SNAPSHOT +2.0.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4a55b232/common/network-shuffle/pom.xml -- diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml index 2fb5835..f068d9d 100644 --- a/common/network-shuffle/pom.xml +++ b/common/network-shuffle/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1-SNAPSHOT +2.0.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4a55b232/common/network-yarn/pom.xml -- diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml index 07d9f1c..fd22188 100644 --- a/common/network-yarn/pom.xml +++ b/common/network-yarn/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1-SNAPSHOT +2.0.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4a55b232/common/sketch/pom.xml -- diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 5e02efd..a17aba5 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.0.1-SNAPSHOT +2.0.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/4a55b232/common/tags/pom.xml -- diff --git a/common/tags/pom.xml b/common/tags/pom.xml index e7fc6a2..0bd8846 100644 ---
spark git commit: [SPARK-16348][ML][MLLIB][PYTHON] Use full classpaths for pyspark ML JVM calls
Repository: spark Updated Branches: refs/heads/branch-2.0 a2ef13a7d -> 0fe2a8c16 [SPARK-16348][ML][MLLIB][PYTHON] Use full classpaths for pyspark ML JVM calls ## What changes were proposed in this pull request? Issue: Omitting the full classpath can cause problems when calling JVM methods or classes from pyspark. This PR: Changed all uses of jvm.X in pyspark.ml and pyspark.mllib to use full classpath for X ## How was this patch tested? Existing unit tests. Manual testing in an environment where this was an issue. Author: Joseph K. BradleyCloses #14023 from jkbradley/SPARK-16348. (cherry picked from commit fdde7d0aa0ef69d0e9a88cf712601bba1d5b0706) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fe2a8c1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fe2a8c1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fe2a8c1 Branch: refs/heads/branch-2.0 Commit: 0fe2a8c161894ba7edbd89689e69a368392d564b Parents: a2ef13a Author: Joseph K. Bradley Authored: Tue Jul 5 17:00:24 2016 -0700 Committer: Joseph K. Bradley Committed: Tue Jul 5 17:00:37 2016 -0700 -- python/pyspark/ml/common.py| 10 +- python/pyspark/ml/tests.py | 8 python/pyspark/mllib/clustering.py | 5 +++-- python/pyspark/mllib/common.py | 10 +- python/pyspark/mllib/feature.py| 2 +- python/pyspark/mllib/fpm.py| 2 +- python/pyspark/mllib/recommendation.py | 2 +- python/pyspark/mllib/tests.py | 15 --- 8 files changed, 28 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0fe2a8c1/python/pyspark/ml/common.py -- diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py index 256e91e..7d449aa 100644 --- a/python/pyspark/ml/common.py +++ b/python/pyspark/ml/common.py @@ -63,7 +63,7 @@ def _to_java_object_rdd(rdd): RDD is serialized in batch or not. 
""" rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) -return rdd.ctx._jvm.MLSerDe.pythonToJava(rdd._jrdd, True) +return rdd.ctx._jvm.org.apache.spark.ml.python.MLSerDe.pythonToJava(rdd._jrdd, True) def _py2java(sc, obj): @@ -82,7 +82,7 @@ def _py2java(sc, obj): pass else: data = bytearray(PickleSerializer().dumps(obj)) -obj = sc._jvm.MLSerDe.loads(data) +obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data) return obj @@ -95,17 +95,17 @@ def _java2py(sc, r, encoding="bytes"): clsName = 'JavaRDD' if clsName == 'JavaRDD': -jrdd = sc._jvm.MLSerDe.javaToPython(r) +jrdd = sc._jvm.org.apache.spark.ml.python.MLSerDe.javaToPython(r) return RDD(jrdd, sc) if clsName == 'Dataset': return DataFrame(r, SQLContext.getOrCreate(sc)) if clsName in _picklable_classes: -r = sc._jvm.MLSerDe.dumps(r) +r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r) elif isinstance(r, (JavaArray, JavaList)): try: -r = sc._jvm.MLSerDe.dumps(r) +r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r) except Py4JJavaError: pass # not pickable http://git-wip-us.apache.org/repos/asf/spark/blob/0fe2a8c1/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 981ed9d..24efce8 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1195,12 +1195,12 @@ class VectorTests(MLlibTestCase): def _test_serialize(self, v): self.assertEqual(v, ser.loads(ser.dumps(v))) -jvec = self.sc._jvm.MLSerDe.loads(bytearray(ser.dumps(v))) -nv = ser.loads(bytes(self.sc._jvm.MLSerDe.dumps(jvec))) +jvec = self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(v))) +nv = ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvec))) self.assertEqual(v, nv) vs = [v] * 100 -jvecs = self.sc._jvm.MLSerDe.loads(bytearray(ser.dumps(vs))) -nvs = ser.loads(bytes(self.sc._jvm.MLSerDe.dumps(jvecs))) +jvecs = self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(vs))) +nvs = ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvecs))) self.assertEqual(vs, nvs) def test_serialize(self): http://git-wip-us.apache.org/repos/asf/spark/blob/0fe2a8c1/python/pyspark/mllib/clustering.py
spark git commit: [SPARK-16348][ML][MLLIB][PYTHON] Use full classpaths for pyspark ML JVM calls
Repository: spark Updated Branches: refs/heads/master 59f9c1bd1 -> fdde7d0aa [SPARK-16348][ML][MLLIB][PYTHON] Use full classpaths for pyspark ML JVM calls ## What changes were proposed in this pull request? Issue: Omitting the full classpath can cause problems when calling JVM methods or classes from pyspark. This PR: Changed all uses of jvm.X in pyspark.ml and pyspark.mllib to use full classpath for X ## How was this patch tested? Existing unit tests. Manual testing in an environment where this was an issue. Author: Joseph K. BradleyCloses #14023 from jkbradley/SPARK-16348. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdde7d0a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdde7d0a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdde7d0a Branch: refs/heads/master Commit: fdde7d0aa0ef69d0e9a88cf712601bba1d5b0706 Parents: 59f9c1b Author: Joseph K. Bradley Authored: Tue Jul 5 17:00:24 2016 -0700 Committer: Joseph K. Bradley Committed: Tue Jul 5 17:00:24 2016 -0700 -- python/pyspark/ml/common.py| 10 +- python/pyspark/ml/tests.py | 8 python/pyspark/mllib/clustering.py | 5 +++-- python/pyspark/mllib/common.py | 10 +- python/pyspark/mllib/feature.py| 2 +- python/pyspark/mllib/fpm.py| 2 +- python/pyspark/mllib/recommendation.py | 2 +- python/pyspark/mllib/tests.py | 15 --- 8 files changed, 28 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/ml/common.py -- diff --git a/python/pyspark/ml/common.py b/python/pyspark/ml/common.py index 256e91e..7d449aa 100644 --- a/python/pyspark/ml/common.py +++ b/python/pyspark/ml/common.py @@ -63,7 +63,7 @@ def _to_java_object_rdd(rdd): RDD is serialized in batch or not. """ rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer())) -return rdd.ctx._jvm.MLSerDe.pythonToJava(rdd._jrdd, True) +return rdd.ctx._jvm.org.apache.spark.ml.python.MLSerDe.pythonToJava(rdd._jrdd, True) def _py2java(sc, obj): @@ -82,7 +82,7 @@ def _py2java(sc, obj): pass else: data = bytearray(PickleSerializer().dumps(obj)) -obj = sc._jvm.MLSerDe.loads(data) +obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data) return obj @@ -95,17 +95,17 @@ def _java2py(sc, r, encoding="bytes"): clsName = 'JavaRDD' if clsName == 'JavaRDD': -jrdd = sc._jvm.MLSerDe.javaToPython(r) +jrdd = sc._jvm.org.apache.spark.ml.python.MLSerDe.javaToPython(r) return RDD(jrdd, sc) if clsName == 'Dataset': return DataFrame(r, SQLContext.getOrCreate(sc)) if clsName in _picklable_classes: -r = sc._jvm.MLSerDe.dumps(r) +r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r) elif isinstance(r, (JavaArray, JavaList)): try: -r = sc._jvm.MLSerDe.dumps(r) +r = sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(r) except Py4JJavaError: pass # not pickable http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 981ed9d..24efce8 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1195,12 +1195,12 @@ class VectorTests(MLlibTestCase): def _test_serialize(self, v): self.assertEqual(v, ser.loads(ser.dumps(v))) -jvec = self.sc._jvm.MLSerDe.loads(bytearray(ser.dumps(v))) -nv = ser.loads(bytes(self.sc._jvm.MLSerDe.dumps(jvec))) +jvec = self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(v))) +nv = ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvec))) self.assertEqual(v, nv) vs = [v] * 100 -jvecs = 
self.sc._jvm.MLSerDe.loads(bytearray(ser.dumps(vs))) -nvs = ser.loads(bytes(self.sc._jvm.MLSerDe.dumps(jvecs))) +jvecs = self.sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(bytearray(ser.dumps(vs))) +nvs = ser.loads(bytes(self.sc._jvm.org.apache.spark.ml.python.MLSerDe.dumps(jvecs))) self.assertEqual(vs, nvs) def test_serialize(self): http://git-wip-us.apache.org/repos/asf/spark/blob/fdde7d0a/python/pyspark/mllib/clustering.py -- diff --git a/python/pyspark/mllib/clustering.py
spark git commit: [SPARK-16385][CORE] Catch correct exception when calling method via reflection.
Repository: spark
Updated Branches: refs/heads/master 4db63fd2b -> 59f9c1bd1

[SPARK-16385][CORE] Catch correct exception when calling method via reflection.

Using "Method.invoke" causes an exception to be thrown, not an error, so Utils.waitForProcess() was always throwing an exception when run on Java 7.

Author: Marcelo Vanzin

Closes #14056 from vanzin/SPARK-16385.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59f9c1bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59f9c1bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59f9c1bd

Branch: refs/heads/master
Commit: 59f9c1bd1adfea7069e769fb68351c228c37c8fc
Parents: 4db63fd
Author: Marcelo Vanzin
Authored: Tue Jul 5 16:55:22 2016 -0700
Committer: Marcelo Vanzin
Committed: Tue Jul 5 16:55:22 2016 -0700

--
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/59f9c1bd/core/src/main/scala/org/apache/spark/util/Utils.scala

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c23f3c..156cf17 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1813,7 +1813,7 @@ private[spark] object Utils extends Logging {
         .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
         .asInstanceOf[Boolean]
     } catch {
-      case _: NoSuchMethodError =>
+      case _: NoSuchMethodException =>
         // Otherwise implement it manually
         var terminated = false
         val startTime = System.currentTimeMillis
spark git commit: [SPARK-16385][CORE] Catch correct exception when calling method via reflection.
Repository: spark
Updated Branches: refs/heads/branch-2.0 801fb7994 -> a2ef13a7d

[SPARK-16385][CORE] Catch correct exception when calling method via reflection.

Using "Method.invoke" causes an exception to be thrown, not an error, so Utils.waitForProcess() was always throwing an exception when run on Java 7.

Author: Marcelo Vanzin

Closes #14056 from vanzin/SPARK-16385.

(cherry picked from commit 59f9c1bd1adfea7069e769fb68351c228c37c8fc)
Signed-off-by: Marcelo Vanzin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2ef13a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2ef13a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2ef13a7

Branch: refs/heads/branch-2.0
Commit: a2ef13a7d3b3daf888f9aba5e28fcdf813c4171c
Parents: 801fb79
Author: Marcelo Vanzin
Authored: Tue Jul 5 16:55:22 2016 -0700
Committer: Marcelo Vanzin
Committed: Tue Jul 5 16:55:45 2016 -0700

--
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a2ef13a7/core/src/main/scala/org/apache/spark/util/Utils.scala

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c23f3c..156cf17 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1813,7 +1813,7 @@ private[spark] object Utils extends Logging {
         .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
         .asInstanceOf[Boolean]
     } catch {
-      case _: NoSuchMethodError =>
+      case _: NoSuchMethodException =>
         // Otherwise implement it manually
         var terminated = false
         val startTime = System.currentTimeMillis
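The distinction this patch fixes, shown as a standalone sketch (hypothetical surrounding code: `process: Process`, `timeoutMs: Long`, and the manual fallback are assumed to be in scope). `getMethod` reports a missing method with the checked NoSuchMethodException, while NoSuchMethodError is raised only by the JVM linker for statically compiled call sites, so the old handler never matched on Java 7:

import java.util.concurrent.TimeUnit

try {
  // Java 8+ has Process.waitFor(long, TimeUnit); it is absent on Java 7,
  // so getMethod throws NoSuchMethodException (not NoSuchMethodError) there.
  classOf[Process]
    .getMethod("waitFor", classOf[Long], classOf[TimeUnit])
    .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
} catch {
  case _: NoSuchMethodException =>
    // Fall back to a manual polling loop, as Utils.waitForProcess does.
    waitForProcessManually(process, timeoutMs) // hypothetical helper
}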
spark git commit: [SPARK-16383][SQL] Remove `SessionState.executeSql`
Repository: spark Updated Branches: refs/heads/master 1f0d02130 -> 4db63fd2b [SPARK-16383][SQL] Remove `SessionState.executeSql` ## What changes were proposed in this pull request? This PR removes `SessionState.executeSql` in favor of `SparkSession.sql`. We can remove this safely since the visibility `SessionState` is `private[sql]` and `executeSql` is only used in one **ignored** test, `test("Multiple Hive Instances")`. ## How was this patch tested? Pass the Jenkins tests. Author: Dongjoon HyunCloses #14055 from dongjoon-hyun/SPARK-16383. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4db63fd2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4db63fd2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4db63fd2 Branch: refs/heads/master Commit: 4db63fd2b430b0902ce0e50f526b1c2e2a5c6497 Parents: 1f0d021 Author: Dongjoon Hyun Authored: Tue Jul 5 16:47:32 2016 -0700 Committer: Reynold Xin Committed: Tue Jul 5 16:47:32 2016 -0700 -- .../scala/org/apache/spark/sql/internal/SessionState.scala | 4 +--- .../apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4db63fd2/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 01cc13f..a228566 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, _} +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -162,8 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) { // Helper methods, partially leftover from pre-2.0 days // -- - def executeSql(sql: String): QueryExecution = executePlan(sqlParser.parsePlan(sql)) - def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan) def refreshTable(tableName: String): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/4db63fd2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala index 1583a44..07d8c5b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala @@ -30,9 +30,9 @@ class ConcurrentHiveSuite extends SparkFunSuite with BeforeAndAfterAll { conf.set("spark.ui.enabled", "false") val ts = new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", conf)) -ts.sessionState.executeSql("SHOW TABLES").toRdd.collect() -ts.sessionState.executeSql("SELECT * FROM src").toRdd.collect() -ts.sessionState.executeSql("SHOW TABLES").toRdd.collect() +ts.sparkSession.sql("SHOW TABLES").collect() +ts.sparkSession.sql("SELECT * FROM 
src").collect() +ts.sparkSession.sql("SHOW TABLES").collect() } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16359][STREAMING][KAFKA] unidoc skip kafka 0.10
Repository: spark Updated Branches: refs/heads/master 920cb5fe4 -> 1f0d02130 [SPARK-16359][STREAMING][KAFKA] unidoc skip kafka 0.10 ## What changes were proposed in this pull request? during sbt unidoc task, skip the streamingKafka010 subproject and filter kafka 0.10 classes from the classpath, so that at least existing kafka 0.8 doc can be included in unidoc without error ## How was this patch tested? sbt spark/scalaunidoc:doc | grep -i error Author: cody koeningerCloses #14041 from koeninger/SPARK-16359. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f0d0213 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f0d0213 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f0d0213 Branch: refs/heads/master Commit: 1f0d021308f2201366111f8390015114710d4f9b Parents: 920cb5f Author: cody koeninger Authored: Tue Jul 5 16:44:15 2016 -0700 Committer: Tathagata Das Committed: Tue Jul 5 16:44:15 2016 -0700 -- project/SparkBuild.scala | 18 -- 1 file changed, 16 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f0d0213/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6018b22..b1a9f39 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -701,15 +701,29 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test"))) } + private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = { +classpaths + .map(_.filterNot(_.data.getCanonicalPath.matches(""".*kafka-clients-0\.10.*"""))) + .map(_.filterNot(_.data.getCanonicalPath.matches(""".*kafka_2\..*-0\.10.*"""))) + } + val unidocSourceBase = settingKey[String]("Base URL of source links in Scaladoc.") lazy val settings = scalaJavaUnidocSettings ++ Seq ( publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), + +unidocAllClasspaths in (ScalaUnidoc, unidoc) := { + ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value) +}, + +unidocAllClasspaths in (JavaUnidoc, unidoc) := { + ignoreClasspaths((unidocAllClasspaths in (JavaUnidoc, unidoc)).value) +}, // Skip actual catalyst, but include the subproject. // Catalyst is not public API and contains quasiquotes which break scaladoc. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16359][STREAMING][KAFKA] unidoc skip kafka 0.10
Repository: spark Updated Branches: refs/heads/branch-2.0 9c1596b6c -> 801fb7994 [SPARK-16359][STREAMING][KAFKA] unidoc skip kafka 0.10 ## What changes were proposed in this pull request? during sbt unidoc task, skip the streamingKafka010 subproject and filter kafka 0.10 classes from the classpath, so that at least existing kafka 0.8 doc can be included in unidoc without error ## How was this patch tested? sbt spark/scalaunidoc:doc | grep -i error Author: cody koeningerCloses #14041 from koeninger/SPARK-16359. (cherry picked from commit 1f0d021308f2201366111f8390015114710d4f9b) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/801fb799 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/801fb799 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/801fb799 Branch: refs/heads/branch-2.0 Commit: 801fb7994d890fa4112b97fc339520f5ce3ab6cb Parents: 9c1596b Author: cody koeninger Authored: Tue Jul 5 16:44:15 2016 -0700 Committer: Tathagata Das Committed: Tue Jul 5 16:44:24 2016 -0700 -- project/SparkBuild.scala | 18 -- 1 file changed, 16 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/801fb799/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 6018b22..b1a9f39 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -701,15 +701,29 @@ object Unidoc { .map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test"))) } + private def ignoreClasspaths(classpaths: Seq[Classpath]): Seq[Classpath] = { +classpaths + .map(_.filterNot(_.data.getCanonicalPath.matches(""".*kafka-clients-0\.10.*"""))) + .map(_.filterNot(_.data.getCanonicalPath.matches(""".*kafka_2\..*-0\.10.*"""))) + } + val unidocSourceBase = settingKey[String]("Base URL of source links in Scaladoc.") lazy val settings = scalaJavaUnidocSettings ++ Seq ( publish := {}, unidocProjectFilter in(ScalaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), unidocProjectFilter in(JavaUnidoc, unidoc) := - inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags), + inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010), + +unidocAllClasspaths in (ScalaUnidoc, unidoc) := { + ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value) +}, + +unidocAllClasspaths in (JavaUnidoc, unidoc) := { + ignoreClasspaths((unidocAllClasspaths in (JavaUnidoc, unidoc)).value) +}, // Skip actual catalyst, but include the subproject. // Catalyst is not public API and contains quasiquotes which break scaladoc. - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
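The exclusion works by regex-matching each classpath entry's canonical path. A self-contained sketch of the two patterns against hypothetical jar paths:

val patterns = Seq(""".*kafka-clients-0\.10.*""", """.*kafka_2\..*-0\.10.*""")
val jars = Seq(
  "/ivy/kafka-clients-0.10.0.0.jar", // matches the first pattern: filtered out
  "/ivy/kafka_2.11-0.10.0.0.jar",    // matches the second pattern: filtered out
  "/ivy/kafka-clients-0.8.2.1.jar")  // matches neither: kept, so 0.8 docs still build
val kept = jars.filterNot(path => patterns.exists(path.matches))
// kept == Seq("/ivy/kafka-clients-0.8.2.1.jar")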
spark git commit: [SPARK-15730][SQL] Respect the --hiveconf in the spark-sql command line
Repository: spark Updated Branches: refs/heads/master 5b7a1770a -> 920cb5fe4 [SPARK-15730][SQL] Respect the --hiveconf in the spark-sql command line ## What changes were proposed in this pull request? This PR makes spark-sql (backed by SparkSQLCLIDriver) respects confs set by hiveconf, which is what we do in previous versions. The change is that when we start SparkSQLCLIDriver, we explicitly set confs set through --hiveconf to SQLContext's conf (basically treating those confs as a SparkSQL conf). ## How was this patch tested? A new test in CliSuite. Closes #13542 Author: Cheng HaoAuthor: Yin Huai Closes #14058 from yhuai/hiveConfThriftServer. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/920cb5fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/920cb5fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/920cb5fe Branch: refs/heads/master Commit: 920cb5fe4ed0eb008cd14bf0ea45ed5b225b5022 Parents: 5b7a177 Author: Cheng Hao Authored: Tue Jul 5 16:42:43 2016 -0700 Committer: Reynold Xin Committed: Tue Jul 5 16:42:43 2016 -0700 -- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 8 .../apache/spark/sql/hive/thriftserver/CliSuite.scala| 11 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/920cb5fe/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7389e18..5dafec1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -156,6 +156,14 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Execute -i init files (always in silent mode) cli.processInitFiles(sessionState) +// Respect the configurations set by --hiveconf from the command line +// (based on Hive's CliDriver). 
+val it = sessionState.getOverriddenConfigurations.entrySet().iterator() +while (it.hasNext) { + val kv = it.next() + SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue) +} + if (sessionState.execString != null) { System.exit(cli.processLine(sessionState.execString)) } http://git-wip-us.apache.org/repos/asf/spark/blob/920cb5fe/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 75535ca..d3cec11 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -91,6 +91,8 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath | --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath + | --hiveconf conf1=conftest + | --hiveconf conf2=1 """.stripMargin.split("\\s+").toSeq ++ extraArgs } @@ -272,4 +274,13 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { s"LIST FILE $dataFilePath;" -> "small_kv.txt" ) } + + test("apply hiveconf from cli command") { +runCliWithin(2.minute)( + "SET conf1;" -> "conftest", + "SET conf2;" -> "1", + "SET conf3=${hiveconf:conf1};" -> "conftest", + "SET conf3;" -> "conftest" +) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-15730][SQL] Respect the --hiveconf in the spark-sql command line
Repository: spark Updated Branches: refs/heads/branch-2.0 cabee2324 -> 9c1596b6c [SPARK-15730][SQL] Respect the --hiveconf in the spark-sql command line ## What changes were proposed in this pull request? This PR makes spark-sql (backed by SparkSQLCLIDriver) respects confs set by hiveconf, which is what we do in previous versions. The change is that when we start SparkSQLCLIDriver, we explicitly set confs set through --hiveconf to SQLContext's conf (basically treating those confs as a SparkSQL conf). ## How was this patch tested? A new test in CliSuite. Closes #13542 Author: Cheng HaoAuthor: Yin Huai Closes #14058 from yhuai/hiveConfThriftServer. (cherry picked from commit 920cb5fe4ed0eb008cd14bf0ea45ed5b225b5022) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c1596b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c1596b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c1596b6 Branch: refs/heads/branch-2.0 Commit: 9c1596b6cf5ca6d0b2529e31dc072a16db014683 Parents: cabee23 Author: Cheng Hao Authored: Tue Jul 5 16:42:43 2016 -0700 Committer: Reynold Xin Committed: Tue Jul 5 16:42:49 2016 -0700 -- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 8 .../apache/spark/sql/hive/thriftserver/CliSuite.scala| 11 +++ 2 files changed, 19 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9c1596b6/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala -- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7389e18..5dafec1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -156,6 +156,14 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Execute -i init files (always in silent mode) cli.processInitFiles(sessionState) +// Respect the configurations set by --hiveconf from the command line +// (based on Hive's CliDriver). 
+val it = sessionState.getOverriddenConfigurations.entrySet().iterator() +while (it.hasNext) { + val kv = it.next() + SparkSQLEnv.sqlContext.setConf(kv.getKey, kv.getValue) +} + if (sessionState.execString != null) { System.exit(cli.processLine(sessionState.execString)) } http://git-wip-us.apache.org/repos/asf/spark/blob/9c1596b6/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala -- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 75535ca..d3cec11 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -91,6 +91,8 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath | --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath + | --hiveconf conf1=conftest + | --hiveconf conf2=1 """.stripMargin.split("\\s+").toSeq ++ extraArgs } @@ -272,4 +274,13 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { s"LIST FILE $dataFilePath;" -> "small_kv.txt" ) } + + test("apply hiveconf from cli command") { +runCliWithin(2.minute)( + "SET conf1;" -> "conftest", + "SET conf2;" -> "1", + "SET conf3=${hiveconf:conf1};" -> "conftest", + "SET conf3;" -> "conftest" +) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
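The core of the change, restated as a standalone sketch using JavaConverters instead of the explicit iterator (assuming Hive's `sessionState` and the CLI's `sqlContext` are in scope, as in SparkSQLCLIDriver above):

import scala.collection.JavaConverters._

// Copy every key/value passed via --hiveconf into the SQL conf, so that
// e.g. `SET conf1;` resolves to the value given on the command line.
for (kv <- sessionState.getOverriddenConfigurations.entrySet().asScala) {
  sqlContext.setConf(kv.getKey, kv.getValue)
}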
spark git commit: [HOTFIX] Fix build break.
Repository: spark
Updated Branches: refs/heads/master 1fca9da95 -> 5b7a1770a

[HOTFIX] Fix build break.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b7a1770
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b7a1770
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b7a1770

Branch: refs/heads/master
Commit: 5b7a1770ac9cf36a1e92b31d10fe692fef17
Parents: 1fca9da
Author: Reynold Xin
Authored: Tue Jul 5 12:06:41 2016 -0700
Committer: Reynold Xin
Committed: Tue Jul 5 12:06:41 2016 -0700

--
.../org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5b7a1770/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 529d388..3d58d49 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -298,7 +298,7 @@ case class InsertIntoHiveTable(
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
-    sqlContext.sessionState.catalog.invalidateTable(table.catalogTable.identifier)
+    sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

    // It would be nice to just return the childRdd unchanged so insert operations could be chained,
    // however for now we return an empty list to simplify compatibility checks with hive, which
spark git commit: [SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka
Repository: spark Updated Branches: refs/heads/master 16a2a7d71 -> 1fca9da95 [SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka ## What changes were proposed in this pull request? Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils. This is making a roughly equivalent fix for the 0.8 connector ## How was this patch tested? Unit tests, manual tests Author: cody koeningerCloses #14018 from koeninger/kafka-0-8-test-port. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fca9da9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fca9da9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fca9da9 Branch: refs/heads/master Commit: 1fca9da95dc9b9aaf9ae75fd7456378861d8b409 Parents: 16a2a7d Author: cody koeninger Authored: Tue Jul 5 11:45:54 2016 -0700 Committer: Tathagata Das Committed: Tue Jul 5 11:45:54 2016 -0700 -- .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1fca9da9/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index d9d4240..abfd7aa 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient +import org.apache.commons.lang3.RandomUtils import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf @@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port + private var brokerPort = RandomUtils.nextInt(1024, 65536) private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration) server = new KafkaServer(brokerConf) server.startup() - (server, port) + (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka
Repository: spark Updated Branches: refs/heads/branch-2.0 0754ccb2b -> cabee2324 [SPARK-16212][STREAMING][KAFKA] use random port for embedded kafka ## What changes were proposed in this pull request? Testing for 0.10 uncovered an issue with a fixed port number being used in KafkaTestUtils. This is making a roughly equivalent fix for the 0.8 connector ## How was this patch tested? Unit tests, manual tests Author: cody koeningerCloses #14018 from koeninger/kafka-0-8-test-port. (cherry picked from commit 1fca9da95dc9b9aaf9ae75fd7456378861d8b409) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cabee232 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cabee232 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cabee232 Branch: refs/heads/branch-2.0 Commit: cabee23241922d55179c3e725f24397eccc75471 Parents: 0754ccb Author: cody koeninger Authored: Tue Jul 5 11:45:54 2016 -0700 Committer: Tathagata Das Committed: Tue Jul 5 11:46:06 2016 -0700 -- .../org/apache/spark/streaming/kafka/KafkaTestUtils.scala | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cabee232/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala -- diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index d9d4240..abfd7aa 100644 --- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -35,6 +35,7 @@ import kafka.serializer.StringEncoder import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient +import org.apache.commons.lang3.RandomUtils import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.apache.spark.SparkConf @@ -62,7 +63,8 @@ private[kafka] class KafkaTestUtils extends Logging { // Kafka broker related configurations private val brokerHost = "localhost" - private var brokerPort = 9092 + // 0.8.2 server doesn't have a boundPort method, so can't use 0 for a random port + private var brokerPort = RandomUtils.nextInt(1024, 65536) private var brokerConf: KafkaConfig = _ // Kafka broker server @@ -112,7 +114,7 @@ private[kafka] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration) server = new KafkaServer(brokerConf) server.startup() - (server, port) + (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
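A standalone sketch of the port selection used by the patch. RandomUtils.nextInt takes an inclusive lower bound and an exclusive upper bound, so this yields a port in 1024..65535; collisions remain possible, just far less likely than always binding 9092:

import org.apache.commons.lang3.RandomUtils

// Kafka 0.8.2's server cannot report a dynamically bound port (no boundPort
// method), so pick a random non-privileged port up front instead of a fixed one.
val brokerPort: Int = RandomUtils.nextInt(1024, 65536)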
spark git commit: [SPARK-16311][SQL] Metadata refresh should work on temporary views
Repository: spark Updated Branches: refs/heads/master 07d9c5327 -> 16a2a7d71 [SPARK-16311][SQL] Metadata refresh should work on temporary views ## What changes were proposed in this pull request? This patch fixes the bug that the refresh command does not work on temporary views. This patch is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API as well as improved test coverage. Note that I actually think the public refresh() API is very useful. We can in the future implement it by also invalidating the lazy vals in QueryExecution (or alternatively just create a new QueryExecution). ## How was this patch tested? Re-enabled a previously ignored test, and added a new test suite for Hive testing behavior of temporary views against MetastoreRelation. Author: Reynold XinAuthor: petermaxlee Closes #14009 from rxin/SPARK-16311. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16a2a7d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16a2a7d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16a2a7d7 Branch: refs/heads/master Commit: 16a2a7d714f945b06978e3bd20a58ea32f0621ac Parents: 07d9c53 Author: Reynold Xin Authored: Tue Jul 5 11:36:05 2016 -0700 Committer: Reynold Xin Committed: Tue Jul 5 11:36:05 2016 -0700 -- .../sql/catalyst/catalog/SessionCatalog.scala | 16 ++--- .../catalyst/plans/logical/LogicalPlan.scala| 5 ++ .../spark/sql/execution/command/ddl.scala | 2 +- .../spark/sql/execution/command/tables.scala| 4 +- .../execution/datasources/LogicalRelation.scala | 5 ++ .../spark/sql/internal/SessionState.scala | 4 +- .../apache/spark/sql/MetadataCacheSuite.scala | 8 +-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 -- .../spark/sql/hive/HiveSessionCatalog.scala | 5 +- .../spark/sql/hive/HiveMetadataCacheSuite.scala | 62 .../sql/hive/MetastoreDataSourcesSuite.scala| 16 ++--- .../apache/spark/sql/hive/parquetSuites.scala | 6 +- 12 files changed, 101 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16a2a7d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 8c620d3..e1d4991 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -462,17 +462,17 @@ class SessionCatalog( } } - // TODO: It's strange that we have both refresh and invalidate here. - /** * Refresh the cache entry for a metastore table, if any. */ - def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } - - /** - * Invalidate the cache entry for a metastore table, if any. - */ - def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ } + def refreshTable(name: TableIdentifier): Unit = { +// Go through temporary tables and invalidate them. +// If the database is defined, this is definitely not a temp table. +// If the database is not defined, there is a good chance this is a temp table. +if (name.database.isEmpty) { + tempTables.get(name.table).foreach(_.refresh()) +} + } /** * Drop all existing temporary tables. 
http://git-wip-us.apache.org/repos/asf/spark/blob/16a2a7d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 4984f23..d0b2b5d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { s"Reference '$name' is ambiguous, could be: $referenceNames.") } } + + /** + * Refreshes (or invalidates) any metadata/data cached in the plan recursively. + */ + def refresh(): Unit = children.foreach(_.refresh()) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/16a2a7d7/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
spark git commit: [SPARK-16311][SQL] Metadata refresh should work on temporary views
Repository: spark Updated Branches: refs/heads/branch-2.0 cc100ab54 -> 0754ccb2b [SPARK-16311][SQL] Metadata refresh should work on temporary views ## What changes were proposed in this pull request? This patch fixes the bug that the refresh command does not work on temporary views. This patch is based on https://github.com/apache/spark/pull/13989, but removes the public Dataset.refresh() API as well as improved test coverage. Note that I actually think the public refresh() API is very useful. We can in the future implement it by also invalidating the lazy vals in QueryExecution (or alternatively just create a new QueryExecution). ## How was this patch tested? Re-enabled a previously ignored test, and added a new test suite for Hive testing behavior of temporary views against MetastoreRelation. Author: Reynold XinAuthor: petermaxlee Closes #14009 from rxin/SPARK-16311. (cherry picked from commit 16a2a7d714f945b06978e3bd20a58ea32f0621ac) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0754ccb2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0754ccb2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0754ccb2 Branch: refs/heads/branch-2.0 Commit: 0754ccb2be79e90bc746de54b01aa6af55f3291f Parents: cc100ab Author: Reynold Xin Authored: Tue Jul 5 11:36:05 2016 -0700 Committer: Reynold Xin Committed: Tue Jul 5 11:36:20 2016 -0700 -- .../sql/catalyst/catalog/SessionCatalog.scala | 16 ++--- .../catalyst/plans/logical/LogicalPlan.scala| 5 ++ .../spark/sql/execution/command/ddl.scala | 2 +- .../spark/sql/execution/command/tables.scala| 4 +- .../execution/datasources/LogicalRelation.scala | 5 ++ .../spark/sql/internal/SessionState.scala | 4 +- .../apache/spark/sql/MetadataCacheSuite.scala | 8 +-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 4 -- .../spark/sql/hive/HiveSessionCatalog.scala | 5 +- .../spark/sql/hive/HiveMetadataCacheSuite.scala | 62 .../sql/hive/MetastoreDataSourcesSuite.scala| 16 ++--- .../apache/spark/sql/hive/parquetSuites.scala | 6 +- 12 files changed, 101 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 8c620d3..e1d4991 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -462,17 +462,17 @@ class SessionCatalog( } } - // TODO: It's strange that we have both refresh and invalidate here. - /** * Refresh the cache entry for a metastore table, if any. */ - def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } - - /** - * Invalidate the cache entry for a metastore table, if any. - */ - def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ } + def refreshTable(name: TableIdentifier): Unit = { +// Go through temporary tables and invalidate them. +// If the database is defined, this is definitely not a temp table. +// If the database is not defined, there is a good chance this is a temp table. +if (name.database.isEmpty) { + tempTables.get(name.table).foreach(_.refresh()) +} + } /** * Drop all existing temporary tables. 
http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4984f23..d0b2b5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
         s"Reference '$name' is ambiguous, could be: $referenceNames.")
     }
   }
+
+  /**
+   * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
+   */
+  def refresh(): Unit = children.foreach(_.refresh())
 }

 /**
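End to end, the change means `REFRESH TABLE` now does something useful for temporary views. A usage sketch (the path and view name are illustrative, and `spark` is a `SparkSession`):

```scala
// Register a temp view over files on disk (illustrative path).
spark.read.parquet("/tmp/events").createOrReplaceTempView("events_view")

// ... the files under /tmp/events are replaced by another job ...

// Before this patch, refreshTable was a no-op for temp views; now it finds the
// unqualified temp table and calls refresh(), which walks the plan tree and
// lets each leaf (e.g. LogicalRelation) invalidate its cached metadata.
spark.sql("REFRESH TABLE events_view")
spark.sql("SELECT count(*) FROM events_view").show()
```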
spark git commit: [SPARK-9876][SQL][FOLLOWUP] Enable string and binary tests for Parquet predicate pushdown and replace deprecated fromByteArray.
Repository: spark Updated Branches: refs/heads/master 7f7eb3934 -> 07d9c5327

[SPARK-9876][SQL][FOLLOWUP] Enable string and binary tests for Parquet predicate pushdown and replace deprecated fromByteArray.

## What changes were proposed in this pull request?

It seems Parquet was upgraded to 1.8.1 by https://github.com/apache/spark/pull/13280. So, this PR enables the string and binary predicate push down that was disabled due to [SPARK-11153](https://issues.apache.org/jira/browse/SPARK-11153) and [PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251), and cleans up some comments that were left behind (I think by mistake). This PR also replaces `fromByteArray()`, the API deprecated in [PARQUET-251](https://issues.apache.org/jira/browse/PARQUET-251).

## How was this patch tested?

Unit tests in `ParquetFilters`.

Author: hyukjinkwon

Closes #13389 from HyukjinKwon/parquet-1.8-followup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07d9c532
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07d9c532
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07d9c532
Branch: refs/heads/master
Commit: 07d9c5327f050f9da611d5239f61ed73b36ce4e6
Parents: 7f7eb39
Author: hyukjinkwon
Authored: Tue Jul 5 16:59:40 2016 +0800
Committer: Cheng Lian
Committed: Tue Jul 5 16:59:40 2016 +0800
--
 .../datasources/parquet/VectorizedPlainValuesReader.java |  2 +-
 .../datasources/parquet/CatalystWriteSupport.scala       | 11 ++-
 .../execution/datasources/parquet/ParquetFilters.scala   |  8
 .../datasources/parquet/ParquetFilterSuite.scala         |  6 ++
 4 files changed, 9 insertions(+), 18 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/07d9c532/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
--
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 9def455..98018b7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -170,7 +170,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori

   @Override
   public final Binary readBinary(int len) {
-    Binary result = Binary.fromByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
+    Binary result = Binary.fromConstantByteArray(buffer, offset - Platform.BYTE_ARRAY_OFFSET, len);
     offset += len;
     return result;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/07d9c532/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index cf974af..00e1bca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -150,7 +150,8 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi

     case StringType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        recordConsumer.addBinary(Binary.fromByteArray(row.getUTF8String(ordinal).getBytes))
+        recordConsumer.addBinary(
+          Binary.fromReusedByteArray(row.getUTF8String(ordinal).getBytes))

     case TimestampType =>
       (row: SpecializedGetters, ordinal: Int) => {
@@ -165,12 +166,12 @@ private[parquet] class CatalystWriteSupport extends WriteSupport[InternalRow] wi
         val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
         val buf = ByteBuffer.wrap(timestampBuffer)
         buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
-        recordConsumer.addBinary(Binary.fromByteArray(timestampBuffer))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
       }

     case BinaryType =>
       (row: SpecializedGetters, ordinal: Int) =>
-        recordConsumer.addBinary(Binary.fromByteArray(row.getBinary(ordinal)))
+        recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))
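For context on the renamed factory methods: PARQUET-251 split `fromByteArray` by ownership semantics. A minimal sketch of the distinction (buffer contents are illustrative):

```scala
import org.apache.parquet.io.api.Binary

// fromReusedByteArray: the caller may overwrite the buffer later (as Spark does
// with per-row write buffers above), so Parquet must copy the bytes whenever it
// keeps them around, e.g. for min/max statistics.
val rowBuffer = "alice".getBytes("UTF-8")
val reused = Binary.fromReusedByteArray(rowBuffer)

// fromConstantByteArray: the caller promises the bytes never change (as in the
// vectorized reader above), so Parquet can keep a reference without copying.
val constant = Binary.fromConstantByteArray("bob".getBytes("UTF-8"))
```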
spark git commit: [SPARK-16360][SQL] Speed up SQL query performance by removing redundant `executePlan` call
Repository: spark Updated Branches: refs/heads/master 7742d9f15 -> 7f7eb3934

[SPARK-16360][SQL] Speed up SQL query performance by removing redundant `executePlan` call

## What changes were proposed in this pull request?

Currently, there have been a few reports of a Spark 2.0 query performance regression for large queries. This PR speeds up SQL query processing by removing a redundant **consecutive `executePlan`** call in the `Dataset.ofRows` function and `Dataset` instantiation. Specifically, this PR aims to reduce the overhead of SQL query execution *plan generation*, not of real query execution, so the difference is not visible in the Spark Web UI. Please use the following query script. The result is **25.78 sec** -> **12.36 sec**, as expected.

**Sample Query**
```scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block
  println("Elapsed time: " + ((System.nanoTime - t0) / 1e9) + "s")
  result
}
```

**Before**
```scala
scala> time(sql(query))
Elapsed time: 30.138142577s  // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 25.787751452s  // Let's compare this one.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```

**After**
```scala
scala> time(sql(query))
Elapsed time: 17.500279659s  // First query has a little overhead of initialization.
res0: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
scala> time(sql(query))
Elapsed time: 12.364812255s  // This shows the real difference. The speedup is about 2x.
res1: org.apache.spark.sql.DataFrame = [column1: int, column2: int ... 3998 more fields]
```

## How was this patch tested?

Manually, using the above script.

Author: Dongjoon Hyun

Closes #14044 from dongjoon-hyun/SPARK-16360.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7eb393
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7eb393
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7eb393
Branch: refs/heads/master
Commit: 7f7eb3934ea258f2b163a87da06766bf5c7d443d
Parents: 7742d9f
Author: Dongjoon Hyun
Authored: Tue Jul 5 16:19:22 2016 +0800
Committer: Cheng Lian
Committed: Tue Jul 5 16:19:22 2016 +0800
--
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7f7eb393/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e64669a..ededf7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -62,7 +62,7 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
     val qe = sparkSession.sessionState.executePlan(logicalPlan)
     qe.assertAnalyzed()
-    new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
+    new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
   }
 }
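To see where the removed redundancy came from, here is a paraphrase of the relevant Spark 2.0 `Dataset` constructors (a sketch only: simplified, and not compilable outside the `org.apache.spark.sql` package):

```scala
class Dataset[T] private[sql](
    val sparkSession: SparkSession,
    @transient val queryExecution: QueryExecution,
    encoder: Encoder[T]) {

  // The convenience constructor wraps the plan in a *new* QueryExecution.
  // Calling it from ofRows, which had already built `qe`, therefore ran
  // executePlan (and plan analysis) twice per query -- the call removed above.
  def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) =
    this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
}
```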
spark git commit: [SPARK-15198][SQL] Support for pushing down filters for boolean types in ORC data source
Repository: spark Updated Branches: refs/heads/master 8f6cf00c6 -> 7742d9f15

[SPARK-15198][SQL] Support for pushing down filters for boolean types in ORC data source

## What changes were proposed in this pull request?

It seems ORC supports all the types in [`PredicateLeaf.Type`](https://github.com/apache/hive/blob/e085b7e9bd059d91aaf013df0db4d71dca90ec6f/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L50-L56), which include boolean types, so this was tested first. This PR adds support for pushing filters down for `BooleanType` in the ORC data source. It also removes the `OrcTableScan` class and its companion object, which are not used anymore.

## How was this patch tested?

Unit tests in `OrcFilterSuite` and `OrcQuerySuite`.

Author: hyukjinkwon

Closes #12972 from HyukjinKwon/SPARK-15198.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7742d9f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7742d9f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7742d9f1
Branch: refs/heads/master
Commit: 7742d9f1584150befeb2f3d76cdbd4ea1f37c914
Parents: 8f6cf00
Author: hyukjinkwon
Authored: Tue Jul 5 13:59:13 2016 +0800
Committer: Cheng Lian
Committed: Tue Jul 5 13:59:13 2016 +0800
--
 .../spark/sql/hive/orc/OrcFileFormat.scala  | 10
 .../apache/spark/sql/hive/orc/OrcFilters.scala  |  2 +-
 .../spark/sql/hive/orc/OrcFilterSuite.scala     | 25
 .../spark/sql/hive/orc/OrcQuerySuite.scala      | 13 ++
 4 files changed, 39 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7742d9f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 5de3507..1d3c466 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -111,7 +111,7 @@ private[sql] class OrcFileFormat
     if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(requiredSchema, filters.toArray).foreach { f =>
-        hadoopConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
+        hadoopConf.set(OrcRelation.SARG_PUSHDOWN, f.toKryo)
         hadoopConf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
       }
     }
@@ -258,15 +258,13 @@ private[orc] class OrcOutputWriter(
   }
 }

-private[orc] object OrcTableScan {
-  // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
-  private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
-}
-
 private[orc] object OrcRelation extends HiveInspectors {
   // The references of Hive's classes will be minimized.
   val ORC_COMPRESSION = "orc.compress"

+  // This constant duplicates `OrcInputFormat.SARG_PUSHDOWN`, which is unfortunately not public.
+  private[orc] val SARG_PUSHDOWN = "sarg.pushdown"
+
   // The extensions for ORC compression codecs
   val extensionsForCompressionCodecNames = Map(
     "NONE" -> "",

http://git-wip-us.apache.org/repos/asf/spark/blob/7742d9f1/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
--
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index c463bc8..6ab8244 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -83,7 +83,7 @@ private[orc] object OrcFilters extends Logging {
       // Only the values in the Spark types below can be recognized by
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
       case ByteType | ShortType | FloatType | DoubleType => true
-      case IntegerType | LongType | StringType => true
+      case IntegerType | LongType | StringType | BooleanType => true
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7742d9f1/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
--
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 8c027f9..7a30e54 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++
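A usage sketch of the new capability (the path and column name are made up; `spark` is a `SparkSession`): with ORC filter pushdown enabled, a predicate on a `BooleanType` column can now be compiled into an ORC `SearchArgument` rather than evaluated only on the Spark side.

```scala
// ORC predicate pushdown is off by default in Spark 2.0; enable it first.
spark.conf.set("spark.sql.orc.filterPushDown", "true")

val df = spark.read.orc("/tmp/people.orc")  // illustrative path

// With this patch, EqualTo(active, true) on a BooleanType column qualifies for
// pushdown; the PushedFilters entry in the physical plan should now list it.
df.filter(df("active") === true).explain()
```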