[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19602 **[Test build #91383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91383/testReport)** for PR 19602 at commit [`98c2512`](https://github.com/apache/spark/commit/98c251235a1d0924a9606be82abf1005dca03e1a). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21477 **[Test build #91384 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91384/testReport)** for PR 21477 at commit [`701a455`](https://github.com/apache/spark/commit/701a45506d75169455384ec8eebd30e509591c30).
[GitHub] spark issue #21477: [WIP] [SPARK-24396] [SS] [PYSPARK] Add Structured Stream...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/21477 jenkins retest this please.
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192331296 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { --- End diff -- As I wrote a comment, since `UnsafeArrayData.fromPrimitiveArray()` uses `long[]`, this method can accept up to `Integer.MAX_VALUE * 8` (8 means `sizeof(long)`) as total byte size. 
Of course, to be conservative, we limit the length to at most `Integer.MAX_VALUE`.
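One point worth checking about the bound discussed above: in Scala, as in Java, `Integer.MAX_VALUE * 8` is an `Int` times an `Int`, so it is evaluated in 32-bit arithmetic and wraps around before it is ever compared with a `Long`. The following is a minimal standalone sketch of that behavior, not Spark code; `OverflowSketch`, `fitsWrapped`, and `fitsWidened` are hypothetical names used only for illustration.

```scala
object OverflowSketch {
  // Int * Int stays an Int, so this wraps around to -8.
  val wrapped: Int = Integer.MAX_VALUE * 8

  // Making one operand a Long forces 64-bit arithmetic.
  val widened: Long = Integer.MAX_VALUE * 8L

  // Hypothetical stand-ins for the size check in the diff above.
  // The Int product is computed first (and wraps), then widened to Long.
  def fitsWrapped(sizeInBytes: Long): Boolean = sizeInBytes <= Integer.MAX_VALUE * 8

  // Same check with the bound computed in 64-bit arithmetic.
  def fitsWidened(sizeInBytes: Long): Boolean = sizeInBytes <= Integer.MAX_VALUE * 8L
}
```

Under these assumptions, a check written with the `Int` product would reject every non-negative size, so writing the constant as `8L` may be what was intended.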
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192330635 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { --- End diff -- Here `8` is the size in bytes of a Java primitive `long`, i.e. `sizeof(long)`.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21452 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91379/ Test PASSed.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21452 Merged build finished. Test PASSed.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21452 **[Test build #91379 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91379/testReport)** for PR 21452 at commit [`9881d9c`](https://github.com/apache/spark/commit/9881d9c6a2b1d56e69bb06ee27fd8706f6e0fe43). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `logInfo(s\"Using output committer class $`
[GitHub] spark pull request #21472: [SPARK-24445][SQL] Schema in json format for from...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21472#discussion_r192323643 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -747,8 +748,13 @@ case class StructsToJson( object JsonExprUtils { - def validateSchemaLiteral(exp: Expression): StructType = exp match { -case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString) + def validateSchemaLiteral(exp: Expression): DataType = exp match { +case Literal(s, StringType) => + try { +DataType.fromJson(s.toString) --- End diff -- Usually they should be consistent, but we don't necessarily have to add new support for obsolete functionality just for the sake of consistency. I'm not sure how common it is to write a JSON literal as a schema via SQL. How do they get the metadata and how do they insert it into SQL? Is that the only way to do it?
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19602 In general, a cast is hard to push down into a data source. For example, given `cast(a as string) = string` where `a` is an int, how should the data source handle it? Meanwhile, I think we can eliminate most casts in predicates of the form `attribute = literal`. For example, given `cast(byteCol as int) = 0`, we know `0` is within the byte range, so we can convert it to `byteCol = (byte) 0`.
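The rewrite described in this comment can be sketched on a toy expression tree. This is a minimal standalone illustration, not Catalyst code: `CastElisionSketch` and the case classes inside it (`ByteCol`, `IntLit`, `ByteLit`, `CastToInt`, and this local `EqualTo`) are hypothetical stand-ins, and the sketch only covers the single `cast(byteCol as int) = literal` shape from the example.

```scala
object CastElisionSketch {
  // Toy expression tree; these are NOT Spark's Catalyst classes.
  sealed trait Expr
  case class ByteCol(name: String) extends Expr
  case class IntLit(value: Int) extends Expr
  case class ByteLit(value: Byte) extends Expr
  case class CastToInt(child: Expr) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr

  // Rewrite `cast(byteCol as int) = intLiteral` into `byteCol = byteLiteral`
  // when the literal fits in a byte; otherwise leave the predicate unchanged.
  def elideCast(e: Expr): Expr = e match {
    case EqualTo(CastToInt(col: ByteCol), IntLit(v))
        if v >= Byte.MinValue && v <= Byte.MaxValue =>
      EqualTo(col, ByteLit(v.toByte))
    case other => other
  }
}
```

The range guard is the important part of the sketch: narrowing the literal is only done when no information is lost, which is the condition the comment relies on.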
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192319924 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -207,65 +271,68 @@ class HiveClientSuite(version: String) } private def testMetastorePartitionFiltering( - filterString: String, + table: String, + filterExpr: Expression, expectedDs: Seq[Int], expectedH: Seq[Int], expectedChunks: Seq[String]): Unit = { testMetastorePartitionFiltering( - filterString, - (expectedDs, expectedH, expectedChunks) :: Nil, + table, + filterExpr, + Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil, identity) } private def testMetastorePartitionFiltering( - filterString: String, + table: String, + filterExpr: Expression, expectedDs: Seq[Int], expectedH: Seq[Int], expectedChunks: Seq[String], transform: Expression => Expression): Unit = { testMetastorePartitionFiltering( - filterString, - (expectedDs, expectedH, expectedChunks) :: Nil, + table, + filterExpr, + Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil, identity) } private def testMetastorePartitionFiltering( - filterString: String, - expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): Unit = { -testMetastorePartitionFiltering(filterString, expectedPartitionCubes, identity) + table: String, + filterExpr: Expression, + expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = { +testMetastorePartitionFiltering(table, filterExpr, expectedPartitionCubes, identity) } private def testMetastorePartitionFiltering( - filterString: String, - expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])], + table: String, + filterExpr: Expression, + expectedPartitionCubes: Seq[Map[String, Seq[Any]]], transform: Expression => Expression): Unit = { -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), +val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", table), Seq( -transform(parseExpression(filterString)) +transform(filterExpr) )) -val expectedPartitionCount = expectedPartitionCubes.map { - case (expectedDs, expectedH, expectedChunks) => -expectedDs.size * expectedH.size * expectedChunks.size -}.sum - -val expectedPartitions = expectedPartitionCubes.map { - case (expectedDs, expectedH, expectedChunks) => -for { - ds <- expectedDs - h <- expectedH - chunk <- expectedChunks -} yield Set( - "ds" -> ds.toString, - "h" -> h.toString, - "chunk" -> chunk -) -}.reduce(_ ++ _) +val expectedPartitionCount = expectedPartitionCubes.map(_.map(_._2.size).product).sum + +val expectedPartitions = expectedPartitionCubes.map(getPartitionsFromCube(_)).reduce(_ ++ _) val actualFilteredPartitionCount = filteredPartitions.size assert(actualFilteredPartitionCount == expectedPartitionCount, s"Expected $expectedPartitionCount partitions but got $actualFilteredPartitionCount") -assert(filteredPartitions.map(_.spec.toSet).toSet == expectedPartitions.toSet) +assert(filteredPartitions.map(_.spec).toSet == expectedPartitions.toSet) + } + + private def getPartitionsFromCube(cube: Map[String, Seq[Any]]): Seq[Map[String, String]] = { +cube.map { + case (k: String, pts: Seq[Any]) => pts.map(pt => (k, pt.toString)) +}.foldLeft(Seq(Seq[(String, String)]()))((seq0, seq1) => { --- End diff -- this is hard to read, please use loop and mutable states directly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
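What the reviewer asks for here, expanding a cube with explicit loops and mutable state instead of a `foldLeft`, could look roughly like the sketch below. It is a standalone illustration and not the code that ended up in the PR; `CubeSketch` and `partitionsFromCube` are hypothetical names, and the cube type `Map[String, Seq[Any]]` is taken from the diff above.

```scala
import scala.collection.mutable.ArrayBuffer

object CubeSketch {
  // Expand one cube (partition name -> candidate values) into every
  // concrete partition spec, using explicit loops and mutable state.
  def partitionsFromCube(cube: Map[String, Seq[Any]]): Seq[Map[String, String]] = {
    // Start with a single empty spec and extend it one partition column at a time.
    var specs = ArrayBuffer(Map.empty[String, String])
    for ((name, values) <- cube) {
      val next = ArrayBuffer.empty[Map[String, String]]
      for (spec <- specs; value <- values) {
        next += (spec + (name -> value.toString))
      }
      specs = next
    }
    specs.toSeq
  }
}
```

Each pass over a partition column multiplies the number of specs by the number of candidate values for that column, so the result is the Cartesian product of the value lists.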
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192319393 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -59,38 +61,62 @@ class HiveClientSuite(version: String) "h" -> h.toString, "chunk" -> chunk ), storageFormat) -assert(partitions.size == testPartitionCount) +assert(partitions0.size == testPartitionCount0) client.createPartitions( - "default", "test", partitions, ignoreIfExists = false) + "default", "test0", partitions0, ignoreIfExists = false) + +val partitions1 = + for { +pt <- 0 until 10 +chunk <- Seq("aa", "ab", "ba", "bb") + } yield CatalogTablePartition(Map( +"pt" -> pt.toString, +"chunk" -> chunk + ), storageFormat) +assert(partitions1.size == testPartitionCount1) + +client.createPartitions( + "default", "test1", partitions1, ignoreIfExists = false) + client } + private def pAttr(table: String, name: String): Attribute = { +val partTypes = client.getTable("default", table).partitionSchema.fields +.map(field => (field.name, field.dataType)).toMap +partTypes.get(name) match { + case Some(dt) => AttributeReference(name, dt)() + case None => +fail(s"Illegal name of partition attribute: $name") +} + } + override def beforeAll() { super.beforeAll() client = init(true) } test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") { val client = init(false) -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), - Seq(parseExpression("ds=20170101"))) +val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test0"), + Seq(EqualTo(pAttr("test0", "ds"), Literal(20170101, IntegerType --- End diff -- we can import `org.apache.spark.sql.catalyst.dsl.expressions._` to simplify expression creation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/19602 I also think we have the same problem for data source tables.
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192314729 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -803,18 +803,60 @@ object TypeCoercion { e.copy(left = Cast(e.left, TimestampType)) } - case b @ BinaryOperator(left, right) if left.dataType != right.dataType => -findTightestCommonType(left.dataType, right.dataType).map { commonType => - if (b.inputType.acceptsType(commonType)) { -// If the expression accepts the tightest common type, cast to that. -val newLeft = if (left.dataType == commonType) left else Cast(left, commonType) -val newRight = if (right.dataType == commonType) right else Cast(right, commonType) -b.withNewChildren(Seq(newLeft, newRight)) - } else { -// Otherwise, don't do anything with the expression. -b - } -}.getOrElse(b) // If there is no applicable conversion, leave expression unchanged. + case b @ BinaryOperator(left, right) + if !BinaryOperator.sameType(left.dataType, right.dataType) => +(left.dataType, right.dataType) match { + case (StructType(fields1), StructType(fields2)) => +val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType] +val len = fields1.length +var i = 0 +var continue = fields1.length == fields2.length +while (i < len && continue) { + val commonType = findTightestCommonType(fields1(i).dataType, fields2(i).dataType) + if (commonType.isDefined) { +commonTypes += commonType.get + } else { +continue = false + } + i += 1 +} + +if (continue) { + val newLeftST = new StructType(fields1.zip(commonTypes).map { +case (f, commonType) => f.copy(dataType = commonType) + }) + val newLeft = if (left.dataType == newLeftST) left else Cast(left, newLeftST) + + val newRightST = new StructType(fields2.zip(commonTypes).map { +case (f, commonType) => f.copy(dataType = commonType) + }) + val newRight = if (right.dataType == newRightST) right else Cast(right, newRightST) + + if 
(b.inputType.acceptsType(newLeftST) && b.inputType.acceptsType(newRightST)) { --- End diff -- Is it possible that `b` accepts only one side (e.g., only `newLeftST`) but not the other?
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192314292 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -803,18 +803,60 @@ object TypeCoercion { e.copy(left = Cast(e.left, TimestampType)) } - case b @ BinaryOperator(left, right) if left.dataType != right.dataType => -findTightestCommonType(left.dataType, right.dataType).map { commonType => - if (b.inputType.acceptsType(commonType)) { -// If the expression accepts the tightest common type, cast to that. -val newLeft = if (left.dataType == commonType) left else Cast(left, commonType) -val newRight = if (right.dataType == commonType) right else Cast(right, commonType) -b.withNewChildren(Seq(newLeft, newRight)) - } else { -// Otherwise, don't do anything with the expression. -b - } -}.getOrElse(b) // If there is no applicable conversion, leave expression unchanged. + case b @ BinaryOperator(left, right) + if !BinaryOperator.sameType(left.dataType, right.dataType) => +(left.dataType, right.dataType) match { + case (StructType(fields1), StructType(fields2)) => +val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType] +val len = fields1.length +var i = 0 +var continue = fields1.length == fields2.length +while (i < len && continue) { + val commonType = findTightestCommonType(fields1(i).dataType, fields2(i).dataType) --- End diff -- What about nested structs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
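To make the nested-struct question concrete, here is a minimal sketch of a common-type search that recurses into struct fields. It runs on a toy type model, not on Spark's `DataType` hierarchy: `StructCoercionSketch`, `DType`, `StructT`, `commonType`, and the three-step numeric widening order are all assumptions made for illustration, and the merged struct simply keeps the left-hand field names.

```scala
object StructCoercionSketch {
  // Toy type model; these are NOT Spark's DataType classes.
  sealed trait DType
  case object IntT extends DType
  case object LongT extends DType
  case object DoubleT extends DType
  case object StringT extends DType
  case class StructT(fields: Seq[(String, DType)]) extends DType

  // Assumed widening order for numeric types: Int < Long < Double.
  private val numericRank: Map[DType, Int] = Map(IntT -> 0, LongT -> 1, DoubleT -> 2)

  def commonType(a: DType, b: DType): Option[DType] = (a, b) match {
    case _ if a == b => Some(a)
    case (StructT(fs1), StructT(fs2)) if fs1.length == fs2.length =>
      // Recurse field by field, so structs nested inside structs are handled too.
      val merged = fs1.zip(fs2).map { case ((name, t1), (_, t2)) =>
        commonType(t1, t2).map(t => (name, t))
      }
      // Give up if any pair of fields has no common type.
      if (merged.forall(_.isDefined)) Some(StructT(merged.flatten)) else None
    case _ =>
      // Numeric widening: pick the wider of the two, if both are numeric.
      for (r1 <- numericRank.get(a); r2 <- numericRank.get(b))
        yield if (r1 >= r2) a else b
  }
}
```

Because the struct case calls `commonType` on each field pair, a struct field inside a struct goes through the same branch again rather than being compared only by equality.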
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user jinxing64 commented on the issue: https://github.com/apache/spark/pull/19602 @cloud-fan Sorry for the late reply; I have been busy these days. In the current change: 1. I follow `Cast.mayTruncate` strictly when extracting the partition attribute; 2. I created new test data in `HiveClientSuite`, and `testMetastorePartitionFiltering` can now be used to validate tables with different partition schemas. If needed, I can add more tests covering different binary comparisons and different data types.
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Merged build finished. Test PASSed.
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19602 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3754/ Test PASSed.
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192312477 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -207,65 +271,68 @@ class HiveClientSuite(version: String) } private def testMetastorePartitionFiltering( - filterString: String, + table: String, + filterExpr: Expression, expectedDs: Seq[Int], expectedH: Seq[Int], expectedChunks: Seq[String]): Unit = { testMetastorePartitionFiltering( - filterString, - (expectedDs, expectedH, expectedChunks) :: Nil, + table, + filterExpr, + Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil, identity) } private def testMetastorePartitionFiltering( - filterString: String, + table: String, + filterExpr: Expression, expectedDs: Seq[Int], expectedH: Seq[Int], expectedChunks: Seq[String], transform: Expression => Expression): Unit = { testMetastorePartitionFiltering( - filterString, - (expectedDs, expectedH, expectedChunks) :: Nil, + table, + filterExpr, + Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil, identity) } private def testMetastorePartitionFiltering( - filterString: String, - expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): Unit = { -testMetastorePartitionFiltering(filterString, expectedPartitionCubes, identity) + table: String, + filterExpr: Expression, + expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = { +testMetastorePartitionFiltering(table, filterExpr, expectedPartitionCubes, identity) } private def testMetastorePartitionFiltering( - filterString: String, - expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])], + table: String, + filterExpr: Expression, + expectedPartitionCubes: Seq[Map[String, Seq[Any]]], --- End diff -- With this change, number of partition names in `expectedPartitionCubes` is not necessarily to be 3. 
The schema of `expectedPartitionCubes` is like `Seq[Map[partition name, partition values]]`.
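With that schema, the expected partition count no longer depends on there being exactly three partition columns. A minimal standalone sketch of the count computation follows; `CubeCountSketch`, `Cube`, and `expectedCount` are hypothetical names, and the sample values in the usage note are made up.

```scala
object CubeCountSketch {
  // One cube maps each partition column name to its candidate values.
  type Cube = Map[String, Seq[Any]]

  // Each cube contributes the product of its value-list sizes;
  // the expected total is the sum over all cubes.
  def expectedCount(cubes: Seq[Cube]): Int =
    cubes.map(cube => cube.values.map(_.size).product).sum
}
```

For instance, a cube with one `ds` value, three `h` values, and two `chunk` values contributes 1 * 3 * 2 partitions, regardless of how many columns another cube in the same list has.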
[GitHub] spark pull request #19602: [SPARK-22384][SQL] Refine partition pruning when ...
Github user jinxing64 commented on a diff in the pull request: https://github.com/apache/spark/pull/19602#discussion_r192311969 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala --- @@ -207,65 +271,68 @@ class HiveClientSuite(version: String) } private def testMetastorePartitionFiltering( - filterString: String, + table: String, + filterExpr: Expression, expectedDs: Seq[Int], expectedH: Seq[Int], expectedChunks: Seq[String]): Unit = { testMetastorePartitionFiltering( - filterString, - (expectedDs, expectedH, expectedChunks) :: Nil, + table, + filterExpr, + Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil, identity) } private def testMetastorePartitionFiltering( - filterString: String, + table: String, + filterExpr: Expression, expectedDs: Seq[Int], expectedH: Seq[Int], expectedChunks: Seq[String], transform: Expression => Expression): Unit = { testMetastorePartitionFiltering( - filterString, - (expectedDs, expectedH, expectedChunks) :: Nil, + table, + filterExpr, + Map("ds" -> expectedDs, "h" -> expectedH, "chunk" -> expectedChunks) :: Nil, identity) } private def testMetastorePartitionFiltering( - filterString: String, - expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])]): Unit = { -testMetastorePartitionFiltering(filterString, expectedPartitionCubes, identity) + table: String, + filterExpr: Expression, + expectedPartitionCubes: Seq[Map[String, Seq[Any]]]): Unit = { +testMetastorePartitionFiltering(table, filterExpr, expectedPartitionCubes, identity) } private def testMetastorePartitionFiltering( - filterString: String, - expectedPartitionCubes: Seq[(Seq[Int], Seq[Int], Seq[String])], + table: String, + filterExpr: Expression, + expectedPartitionCubes: Seq[Map[String, Seq[Any]]], transform: Expression => Expression): Unit = { -val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"), +val filteredPartitions = 
client.getPartitionsByFilter(client.getTable("default", table), Seq( -transform(parseExpression(filterString)) +transform(filterExpr) )) -val expectedPartitionCount = expectedPartitionCubes.map { - case (expectedDs, expectedH, expectedChunks) => -expectedDs.size * expectedH.size * expectedChunks.size -}.sum - -val expectedPartitions = expectedPartitionCubes.map { - case (expectedDs, expectedH, expectedChunks) => -for { - ds <- expectedDs - h <- expectedH - chunk <- expectedChunks -} yield Set( - "ds" -> ds.toString, - "h" -> h.toString, - "chunk" -> chunk -) -}.reduce(_ ++ _) +val expectedPartitionCount = expectedPartitionCubes.map(_.map(_._2.size).product).sum + +val expectedPartitions = expectedPartitionCubes.map(getPartitionsFromCube(_)).reduce(_ ++ _) val actualFilteredPartitionCount = filteredPartitions.size assert(actualFilteredPartitionCount == expectedPartitionCount, s"Expected $expectedPartitionCount partitions but got $actualFilteredPartitionCount") -assert(filteredPartitions.map(_.spec.toSet).toSet == expectedPartitions.toSet) +assert(filteredPartitions.map(_.spec).toSet == expectedPartitions.toSet) + } + --- End diff -- Parse the cube to be a list of partitions, the result is like `Seq[Map[partition name, partition value]]` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19602: [SPARK-22384][SQL] Refine partition pruning when attribu...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19602 **[Test build #91383 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91383/testReport)** for PR 19602 at commit [`98c2512`](https://github.com/apache/spark/commit/98c251235a1d0924a9606be82abf1005dca03e1a).
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21469 **[Test build #91382 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91382/testReport)** for PR 21469 at commit [`933fb2e`](https://github.com/apache/spark/commit/933fb2e329b7e76ae5087e6ce58dd3359c78d5f1).
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...
Github user HeartSaVioR commented on the issue: https://github.com/apache/spark/pull/21469 Thanks @HyukjinKwon for reviewing. I fixed the PR title and addressed the nit.
[GitHub] spark pull request #21472: [SPARK-24445][SQL] Schema in json format for from...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/21472#discussion_r192309953 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -747,8 +748,13 @@ case class StructsToJson( object JsonExprUtils { - def validateSchemaLiteral(exp: Expression): StructType = exp match { -case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString) + def validateSchemaLiteral(exp: Expression): DataType = exp match { +case Literal(s, StringType) => + try { +DataType.fromJson(s.toString) --- End diff -- I believe we should support the JSON format because: - The functionality of SQL and of the Scala (and other languages) DSL should be equal; otherwise we push users toward the Scala DSL because SQL has fewer features. - The feature allows saving and restoring a schema in JSON format. The customer's use case is to have data in JSON format plus meta info, including the schema, in JSON format too. A schema in JSON format gives them more opportunities for programmatic processing. - For now, the JSON format gives us more flexibility and allows `MapType` (and `ArrayType`) as the root type for the result of `from_json`.
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192309419 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- @@ -2696,16 +2687,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql("SELECT struct(1 a) EXCEPT (SELECT struct(2 A))") }.message assert(m2.contains("Except can only be performed on tables with the compatible column types")) - - withTable("t", "S") { -sql("CREATE TABLE t(c struct) USING parquet") -sql("CREATE TABLE S(C struct) USING parquet") -checkAnswer(sql("SELECT * FROM t, S WHERE t.c.f = S.C.F"), Seq.empty) -val m = intercept[AnalysisException] { - sql("SELECT * FROM t, S WHERE c = C") -}.message -assert(m.contains("cannot resolve '(t.`c` = S.`C`)' due to data type mismatch")) --- End diff -- Thank you for pinging me, @cloud-fan. Since this removal is a real behavior change rather than new test coverage in `comparator.sql`, could you add documentation for this?
[GitHub] spark issue #21469: [SPARK-24441][SS] Expose total size of states in HDFSBac...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21469 Shall we make the PR title complete? It looks truncated.
[GitHub] spark pull request #21469: [SPARK-24441][SS] Expose total size of states in ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21469#discussion_r192308080 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -181,6 +182,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } } + def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = { --- End diff -- tiny nit: ```scala def getCustomMetricsForProvider(): Map[StateStoreCustomMetric, Long] = synchronized { Map(metricProviderLoaderMapSize -> SizeEstimator.estimate(loadedMaps)) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18826: LogisticRegressionModel.toString should summarize model
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18826 @bravo-zhang, would you mind rebasing this so we can see whether the tests pass? BTW, let's fix the PR title so that it links to the JIRA.
[GitHub] spark issue #21478: [SPARK-24444][DOCS][PYTHON][BRANCH-2.3] Improve Pandas U...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21478 cc @vanzin FYI.
[GitHub] spark issue #21478: [SPARK-24444][DOCS][PYTHON][BRANCH-2.3] Improve Pandas U...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21478 Merged to branch-2.3.
[GitHub] spark issue #21478: [SPARK-24444][DOCS][PYTHON][BRANCH-2.3] Improve Pandas U...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21478 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91380/ Test PASSed.
[GitHub] spark issue #21478: [SPARK-24444][DOCS][PYTHON][BRANCH-2.3] Improve Pandas U...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21478 Merged build finished. Test PASSed.
[GitHub] spark issue #21478: [SPARK-24444][DOCS][PYTHON][BRANCH-2.3] Improve Pandas U...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21478

**[Test build #91380 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91380/testReport)** for PR 21478 at commit [`58e7927`](https://github.com/apache/spark/commit/58e7927c770aa483837e6d8cb94800afb3c4fdf7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21475: [SPARK-24416] Fix configuration specification for killBl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21475 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91381/ Test PASSed.
[GitHub] spark issue #21475: [SPARK-24416] Fix configuration specification for killBl...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21475

**[Test build #91381 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91381/testReport)** for PR 21475 at commit [`f08f74a`](https://github.com/apache/spark/commit/f08f74a3a774f9e2768f7924c4438516a4106b7c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21475: [SPARK-24416] Fix configuration specification for killBl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21475 Merged build finished. Test PASSed.