[GitHub] spark pull request #22154: [SPARK-23711][SPARK-25140][SQL] Catch correct exc...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22154#discussion_r234177079 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -17,17 +17,33 @@ package org.apache.spark.sql.catalyst.expressions +import java.util.concurrent.ExecutionException + import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{IntegerType, LongType} +import org.apache.spark.sql.types.IntegerType class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTestBase { - test("UnsafeProjection with codegen factory mode") { -val input = Seq(LongType, IntegerType) - .zipWithIndex.map(x => BoundReference(x._2, x._1, true)) + object FailedCodegenProjection + extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] { + +override protected def createCodeGeneratedObject(in: Seq[Expression]): UnsafeProjection = { + val invalidCode = new CodeAndComment("invalid code", Map.empty) + // We assume this compilation throws an exception --- End diff -- I'd use this comment as part of an exception (say `IllegalStateException` or similar) that should be thrown rather than returning `null`. I think that would make the comment part of the code itself and can be checked in tests (by catching the exception). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21838: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21838#discussion_r234099158 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroCatalystDataConversionSuite.scala --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.avro + +import org.apache.avro.Schema + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.{AvroDataToCatalyst, CatalystDataToAvro, RandomDataGenerator} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{ExpressionEvalHelper, GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class AvroCatalystDataConversionSuite extends SparkFunSuite with ExpressionEvalHelper { + + private def roundTripTest(data: Literal): Unit = { +val avroType = SchemaConverters.toAvroType(data.dataType, data.nullable) +checkResult(data, avroType.toString, data.eval()) + } + + private def checkResult(data: Literal, schema: String, expected: Any): Unit = { +checkEvaluation( + AvroDataToCatalyst(CatalystDataToAvro(data), schema), + prepareExpectedResult(expected)) + } + + private def assertFail(data: Literal, schema: String): Unit = { +intercept[java.io.EOFException] { + AvroDataToCatalyst(CatalystDataToAvro(data), schema).eval() +} + } + + private val testingTypes = Seq( +BooleanType, +ByteType, +ShortType, +IntegerType, +LongType, +FloatType, +DoubleType, +DecimalType(8, 0), // 32 bits decimal without fraction +DecimalType(8, 4), // 32 bits decimal +DecimalType(16, 0), // 64 bits decimal without fraction +DecimalType(16, 11), // 64 bits decimal +DecimalType(38, 0), +DecimalType(38, 38), +StringType, +BinaryType) + + protected def prepareExpectedResult(expected: Any): Any = expected match { +// Spark decimal is converted to avro string= +case d: Decimal => UTF8String.fromString(d.toString) +// Spark byte and short both map to avro int +case b: Byte => b.toInt +case s: Short => s.toInt +case row: GenericInternalRow => InternalRow.fromSeq(row.values.map(prepareExpectedResult)) +case array: GenericArrayData => new 
GenericArrayData(array.array.map(prepareExpectedResult)) +case map: MapData => + val keys = new GenericArrayData( + map.keyArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + val values = new GenericArrayData( + map.valueArray().asInstanceOf[GenericArrayData].array.map(prepareExpectedResult)) + new ArrayBasedMapData(keys, values) +case other => other + } + + testingTypes.foreach { dt => +val seed = scala.util.Random.nextLong() +test(s"single $dt with seed $seed") { + val rand = new scala.util.Random(seed) + val data = RandomDataGenerator.forType(dt, rand = rand).get.apply() + val converter = CatalystTypeConverters.createToCatalystConverter(dt) + val input = Literal.create(converter(data), dt) + roundTripTest(input) +} + } + + for (_ <- 1 to 5) { --- End diff -- Why not `(1 to 5).foreach`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r215376132 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -82,7 +83,7 @@ case class CreateHiveTableAsSelectCommand( query, overwrite = true, ifPartitionNotExists = false, - outputColumns = outputColumns).run(sparkSession, child) + outputColumnNames = outputColumnNames).run(sparkSession, child) --- End diff -- `outputColumnNames` themselves. Specifying `outputColumnNames` as the name of the property to set using `outputColumnNames` does nothing but introduce duplication. If you removed one `outputColumnNames`, readability would not suffer at all, would it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215216011 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. + * The loaded `DataFrame` has one `StructType` column: `image`. + * The schema of the `image` column is: + * - origin: String (represent the origin of image. If loaded from file, then it is file path) + * - height: Int (height of image) + * - width: Int (width of image) + * - nChannels: Int (number of image channels) + * - mode: Int (OpenCV-compatible type) + * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases) + * + * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and + * optionally specify options, for example: + * {{{ + * // Scala + * val df = spark.read.format("image") + * .option("dropImageFailures", "true") --- End diff -- Really? What about `option(key: String, value: Boolean): DataFrameReader` then? 
There are more --> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r215215098 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -754,6 +754,54 @@ class HiveDDLSuite } } + test("Insert overwrite Hive table should output correct schema") { +withSQLConf(CONVERT_METASTORE_PARQUET.key -> "false") { + withTable("tbl", "tbl2") { +withView("view1") { + spark.sql("CREATE TABLE tbl(id long)") + spark.sql("INSERT OVERWRITE TABLE tbl VALUES 4") --- End diff -- I might be missing something, but why does this test use SQL statements rather than the DataFrameWriter API, e.g. `Seq(4).toDF("id").write.mode(SaveMode.Overwrite).saveAsTable("tbl")`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r215213849 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala --- @@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).toDF("id") --- End diff -- "case sensitive"? How is that so, given that Spark SQL is case-insensitive by default? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r215214259 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -82,7 +83,7 @@ case class CreateHiveTableAsSelectCommand( query, overwrite = true, ifPartitionNotExists = false, - outputColumns = outputColumns).run(sparkSession, child) + outputColumnNames = outputColumnNames).run(sparkSession, child) --- End diff -- Why is this duplication needed here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22332: [SPARK-25333][SQL] Ability add new columns in Dat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22332#discussion_r215144932 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2226,16 +2226,18 @@ class Dataset[T] private[sql]( * `column`'s expression must only refer to attributes supplied by this Dataset. It is an * error to add a column that refers to some other Dataset. * -* You can choose to add new columns either at the end (default behavior) or at the beginning. +* The position of the new column start from 0, and a negative position means at the end (default behavior). --- End diff -- "starts at `0`. Any negative position means to add the column at the end"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22332: [SPARK-25333][SQL] Ability add new columns in Dat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22332#discussion_r215145065 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2226,16 +2226,18 @@ class Dataset[T] private[sql]( * `column`'s expression must only refer to attributes supplied by this Dataset. It is an * error to add a column that refers to some other Dataset. * -* You can choose to add new columns either at the end (default behavior) or at the beginning. +* The position of the new column start from 0, and a negative position means at the end (default behavior). */ - def withColumn(colName: String, col: Column, atTheEnd: Boolean): DataFrame = -withColumns(Seq(colName), Seq(col), atTheEnd) + def withColumn(colName: String, col: Column, atPosition: Int): DataFrame = +withColumns(Seq(colName), Seq(col), atPosition) /** * Returns a new Dataset by adding columns or replacing the existing columns that has --- End diff -- s/has/have --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22332: [SPARK-25333][SQL] Ability add new columns in Dat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22332#discussion_r215144732 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2226,16 +2226,18 @@ class Dataset[T] private[sql]( * `column`'s expression must only refer to attributes supplied by this Dataset. It is an * error to add a column that refers to some other Dataset. * -* You can choose to add new columns either at the end (default behavior) or at the beginning. +* The position of the new column start from 0, and a negative position means at the end (default behavior). */ - def withColumn(colName: String, col: Column, atTheEnd: Boolean): DataFrame = -withColumns(Seq(colName), Seq(col), atTheEnd) + def withColumn(colName: String, col: Column, atPosition: Int): DataFrame = --- End diff -- `@since 2.4.0`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22332: [SPARK-25333][SQL] Ability add new columns in Dat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22332#discussion_r215145351 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -831,13 +831,21 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { }.toSeq) assert(df.schema.map(_.name) === Seq("key", "value", "newCol")) -val df2 = testData.toDF().withColumn("newCol", col("key") + 1, false) +val df2 = testData.toDF().withColumn("newCol", col("key") + 1, 0) --- End diff -- What about tests with negative positions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22332: [SPARK-25333][SQL] Ability add new columns in Dat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22332#discussion_r215144982 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -2226,16 +2226,18 @@ class Dataset[T] private[sql]( * `column`'s expression must only refer to attributes supplied by this Dataset. It is an * error to add a column that refers to some other Dataset. * -* You can choose to add new columns either at the end (default behavior) or at the beginning. +* The position of the new column start from 0, and a negative position means at the end (default behavior). */ - def withColumn(colName: String, col: Column, atTheEnd: Boolean): DataFrame = -withColumns(Seq(colName), Seq(col), atTheEnd) + def withColumn(colName: String, col: Column, atPosition: Int): DataFrame = +withColumns(Seq(colName), Seq(col), atPosition) /** * Returns a new Dataset by adding columns or replacing the existing columns that has * the same names. + * + * The position of new columns start from 0, and a negative position means at the end (default behavior). --- End diff -- Same as above --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215138305 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -29,7 +29,7 @@ package org.apache.spark.ml.source.image * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases) * * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and - * optionally specify options, for example: + * optionally specify the datasource options, for example: --- End diff -- s/datasource/data source --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215138635 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageOptions.scala --- @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap + +private[image] class ImageOptions( +@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean --- End diff -- Why is `false` a String and not a boolean? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215138476 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -45,6 +45,8 @@ package org.apache.spark.ml.source.image * IMAGE data source supports the following options: * - "dropImageFailures": Whether to drop the files that are not valid images from the result. * + * @note This IMAGE data source does not support "write". --- End diff -- s/"write"/saving images to a file(s)/ ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215038606 --- Diff: mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.source.image + +import java.nio.file.Paths + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.image.ImageSchema._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.{col, substring_index} + +class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { + + // Single column of images named "image" + private lazy val imagePath = "../data/mllib/images/imagesWithPartitions" + + test("image datasource count test") { +val df1 = spark.read.format("image").load(imagePath) +assert(df1.count === 9) + +val df2 = spark.read.format("image").option("dropImageFailures", "true").load(imagePath) +assert(df2.count === 8) + } + + test("image datasource test: read jpg image") { +val df = spark.read.format("image").load(imagePath + "/cls=kittens/date=2018-02/DP153539.jpg") +assert(df.count() === 1) + } + + test("image datasource test: read png image") { +val df = spark.read.format("image").load(imagePath + "/cls=multichannel/date=2018-01/BGRA.png") +assert(df.count() === 1) + } + + test("image datasource test: read non image") { +val filePath = imagePath + "/cls=kittens/date=2018-01/not-image.txt" +val df = spark.read.format("image").option("dropImageFailures", "true") + .load(filePath) +assert(df.count() === 0) + +val df2 = spark.read.format("image").option("dropImageFailures", "false") + .load(filePath) +assert(df2.count() === 1) +val result = df2.head() +assert(result === invalidImageRow( + Paths.get(filePath).toAbsolutePath().normalize().toUri().toString)) + } + + test("image datasource partition test") { +val result = spark.read.format("image") + .option("dropImageFailures", "true").load(imagePath) + .select(substring_index(col("image.origin"), "/", -1).as("origin"), col("cls"), col("date")) + .collect() + +assert(Set(result: _*) === Set( + Row("29.5.a_b_EGDP022204.jpg", "kittens", "2018-01"), + Row("54893.jpg", "kittens", "2018-02"), 
+ Row("DP153539.jpg", "kittens", "2018-02"), + Row("DP802813.jpg", "kittens", "2018-02"), + Row("BGRA.png", "multichannel", "2018-01"), + Row("BGRA_alpha_60.png", "multichannel", "2018-01"), + Row("chr30.4.184.jpg", "multichannel", "2018-02"), + Row("grayscale.jpg", "multichannel", "2018-02") +)) + } + + // Images with the different number of channels + test("readImages pixel values test") { + +val images = spark.read.format("image").option("dropImageFailures", "true") + .load(imagePath + "/cls=multichannel/").collect() + +val firstBytes20Map = images.map { rrow => + val row = rrow.getAs[Row]("image") + val filename = Paths.get(getOrigin(row)).getFileName().toString() + val mode = getMode(row) + val bytes20 = getData(row).slice(0, 20).toList + filename -> Tuple2(mode, bytes20) --- End diff -- Why is `Tuple2` required here? Wouldn't `(mode, bytes20)` work here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215037240 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.source.image + +import com.google.common.io.{ByteStreams, Closeables} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.Job + +import org.apache.spark.ml.image.ImageSchema +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeRow} +import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, OutputWriterFactory, PartitionedFile} +import org.apache.spark.sql.sources.{DataSourceRegister, Filter} +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + + +private[image] class ImageFileFormatOptions( +@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable { + + def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters)) + + val dropImageFailures = parameters.getOrElse("dropImageFailures", "false").toBoolean +} + +private[image] class ImageFileFormat extends FileFormat with DataSourceRegister { + + override def inferSchema( + sparkSession: SparkSession, + options: Map[String, String], + files: Seq[FileStatus]): Option[StructType] = Some(ImageSchema.imageSchema) + + override def prepareWrite( + sparkSession: SparkSession, + job: Job, options: Map[String, String], --- End diff -- New line after `job: Job` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215039097 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala --- @@ -567,6 +567,7 @@ object DataSource extends Logging { val parquet = classOf[ParquetFileFormat].getCanonicalName val csv = classOf[CSVFileFormat].getCanonicalName val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat" +val image = "org.apache.spark.ml.source.image.ImageFileFormat" --- End diff -- Why is this needed? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215036263 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. + * The loaded `DataFrame` has one `StructType` column: `image`. + * The schema of the `image` column is: + * - origin: String (represent the origin of image. If loaded from file, then it is file path) --- End diff -- "represents" + "the image". I can see many missing `a`s and `the`s in the description :( --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215037968 --- Diff: mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +import java.nio.file.Paths + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.image.ImageSchema._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.Row +import org.apache.spark.sql.functions.{col, substring_index} + +class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { + + // Single column of images named "image" + private lazy val imagePath = "../data/mllib/images/imagesWithPartitions" + + test("image datasource count test") { +val df1 = spark.read.format("image").load(imagePath) +assert(df1.count === 9) + +val df2 = spark.read.format("image").option("dropImageFailures", "true").load(imagePath) --- End diff -- `true` as a boolean value, please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22328: [SPARK-22666][ML][SQL] Spark datasource for image...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22328#discussion_r215036643 --- Diff: mllib/src/main/scala/org/apache/spark/ml/source/image/ImageDataSource.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.source.image + +/** + * `image` package implements Spark SQL data source API for loading IMAGE data as `DataFrame`. + * The loaded `DataFrame` has one `StructType` column: `image`. + * The schema of the `image` column is: + * - origin: String (represent the origin of image. If loaded from file, then it is file path) + * - height: Int (height of image) + * - width: Int (width of image) + * - nChannels: Int (number of image channels) + * - mode: Int (OpenCV-compatible type) + * - data: BinaryType (Image bytes in OpenCV-compatible order: row-wise BGR in most cases) + * + * To use IMAGE data source, you need to set "image" as the format in `DataFrameReader` and + * optionally specify options, for example: + * {{{ + * // Scala + * val df = spark.read.format("image") + * .option("dropImageFailures", "true") --- End diff -- `true` as a boolean value, please. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22332: [SPARK-25333][SQL] Ability add new columns in the beginn...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/22332 Why not `select($"*", newColumnHere)` or `select(newColumnHere, $"*")`? Somehow I don't think the use case merits overloading `withColumn`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22316: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22316#discussion_r214752855 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -416,7 +426,7 @@ class RelationalGroupedDataset protected[sql]( new RelationalGroupedDataset( df, groupingExprs, - RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(Literal.apply))) + RelationalGroupedDataset.PivotType(pivotColumn.expr, values.map(lit(_).expr))) --- End diff -- What do you think about `map(lit).map(_.expr)` instead? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22318: [SPARK-25150][SQL] Fix attribute deduplication in...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22318#discussion_r214752480 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala --- @@ -295,4 +295,14 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan } } + + test("SPARK-25150: Attribute deduplication handles attributes in join condition properly") { +val a = spark.range(1, 5) +val b = spark.range(10) +val c = b.filter($"id" % 2 === 0) + +val r = a.join(b, a("id") === b("id"), "inner").join(c, a("id") === c("id"), "inner") --- End diff -- Why not the simpler `a.join(b, "id").join(c, "id")`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214751309 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala --- @@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).toDF("id") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") +spark.sql("CREATE TABLE tbl2(ID long) USING parquet") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) +val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString +val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) +assert(spark.read.parquet(location).schema == expectedSchema) +checkAnswer(spark.table("tbl2"), df) + } +} + } + + test("Insert overwrite table command should output correct schema: complex") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl") +spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " + + "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) +val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString +val expectedSchema = StructType(Seq( + StructField("COL1", LongType, true), + StructField("COL3", IntegerType, true), --- End diff -- You could use a little magic here: `$"COL1".int` ``` scala> $"COL1".int res1: org.apache.spark.sql.types.StructField = 
StructField(COL1,IntegerType,true) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214750815 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala --- @@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).toDF("id") --- End diff -- Why is `toDF("id")` required? Why not `spark.range(10)` alone? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214751930 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala --- @@ -754,6 +754,47 @@ class HiveDDLSuite } } + test("Insert overwrite Hive table should output correct schema") { +withTable("tbl", "tbl2") { + withView("view1") { +spark.sql("CREATE TABLE tbl(id long)") +spark.sql("INSERT OVERWRITE TABLE tbl SELECT 4") +spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") +spark.sql("CREATE TABLE tbl2(ID long)") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") +checkAnswer(spark.table("tbl2"), Seq(Row(4))) + } +} + } + + test("Insert into Hive directory should output correct schema") { +withTable("tbl") { + withView("view1") { +withTempPath { path => + spark.sql("CREATE TABLE tbl(id long)") + spark.sql("INSERT OVERWRITE TABLE tbl SELECT 4") --- End diff -- `s/SELECT/VALUES` as it could be a bit more Spark-idiomatic? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214751219 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala --- @@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).toDF("id") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") +spark.sql("CREATE TABLE tbl2(ID long) USING parquet") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) +val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString +val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) +assert(spark.read.parquet(location).schema == expectedSchema) +checkAnswer(spark.table("tbl2"), df) + } +} + } + + test("Insert overwrite table command should output correct schema: complex") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl") +spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " + + "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) +val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString +val expectedSchema = StructType(Seq( + StructField("COL1", LongType, true), --- End diff -- `nullable` is `true` by default. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214751023 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala --- @@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).toDF("id") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") +spark.sql("CREATE TABLE tbl2(ID long) USING parquet") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) --- End diff -- `default` is the default database name, isn't it? I'd remove it from the test or use `spark.catalog.currentDatabase`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214751748 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -63,7 +63,7 @@ case class CreateHiveTableAsSelectCommand( query, overwrite = false, ifPartitionNotExists = false, -outputColumns = outputColumns).run(sparkSession, child) +outputColumnNames = outputColumnNames).run(sparkSession, child) --- End diff -- Can you remove one `outputColumnNames`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22320: [SPARK-25313][SQL]Fix regression in FileFormatWri...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22320#discussion_r214751169 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala --- @@ -805,6 +805,80 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should output correct schema: basic") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).toDF("id") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl") +spark.sql("CREATE TABLE tbl2(ID long) USING parquet") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) +val location = spark.sessionState.catalog.getTableMetadata(identifier).location.toString +val expectedSchema = StructType(Seq(StructField("ID", LongType, true))) +assert(spark.read.parquet(location).schema == expectedSchema) +checkAnswer(spark.table("tbl2"), df) + } +} + } + + test("Insert overwrite table command should output correct schema: complex") { +withTable("tbl", "tbl2") { + withView("view1") { +val df = spark.range(10).map(x => (x, x.toInt, x.toInt)).toDF("col1", "col2", "col3") +df.write.format("parquet").saveAsTable("tbl") +spark.sql("CREATE VIEW view1 AS SELECT * FROM tbl") +spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " + + "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS") +spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM view1") +val identifier = TableIdentifier("tbl2", Some("default")) --- End diff -- Same as above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22238: [SPARK-25245][DOCS][SS] Explain regarding limitin...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22238#discussion_r213264912 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -868,7 +870,9 @@ object SQLConf { .internal() .doc( "The class used to manage state data in stateful streaming queries. This class must " + - "be a subclass of StateStoreProvider, and must have a zero-arg constructor.") + "be a subclass of StateStoreProvider, and must have a zero-arg constructor. " + + "Note: For structured streaming, this configuration cannot be changed between query " + --- End diff -- s/cannot be/must not be/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22238: [SPARK-25245][DOCS][SS] Explain regarding limitin...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22238#discussion_r213264786 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -266,7 +266,9 @@ object SQLConf { .createWithDefault(Long.MaxValue) val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions") -.doc("The default number of partitions to use when shuffling data for joins or aggregations.") +.doc("The default number of partitions to use when shuffling data for joins or aggregations. " + + "Note: For structured streaming, this configuration cannot be changed between query " + --- End diff -- s/cannot be/must not be/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22238: [SPARK-25245][DOCS][SS] Explain regarding limitin...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/22238#discussion_r213063267 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -2812,6 +2812,12 @@ See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections f # Additional Information +**Gotchas** + +- For structured streaming, modifying "spark.sql.shuffle.partitions" is restricted once you run the query. + - This is because state is partitioned via key, hence number of partitions for state should be unchanged. + - If you want to run less tasks for stateful operations, `coalesce` would help with avoiding unnecessary repartitioning. Please note that it will also affect downstream operators. --- End diff -- An example of how to use the `coalesce` operator with a stateful streaming query would be superb. I'd also appreciate it if you explained which downstream operators are affected and how. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21858: [SPARK-24899][SQL][DOC] Add example of monotonica...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21858#discussion_r210681673 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala --- @@ -80,7 +80,5 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Stateful { override def prettyName: String = "monotonically_increasing_id" - override def sql: String = s"$prettyName()" --- End diff -- It's the default and no need for the override, isn't it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21858: [SPARK-24899][SQL][DOC] Add example of monotonica...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21858#discussion_r205058875 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -1150,16 +1150,48 @@ object functions { /** * A column expression that generates monotonically increasing 64-bit integers. * - * The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. + * The generated IDs are guaranteed to be monotonically increasing and unique, but not + * consecutive (unless all rows are in the same single partition which you rarely want due to + * the volume of the data). * The current implementation puts the partition ID in the upper 31 bits, and the record number * within each partition in the lower 33 bits. The assumption is that the data frame has * less than 1 billion partitions, and each partition has less than 8 billion records. * - * As an example, consider a `DataFrame` with two partitions, each with 3 records. - * This expression would return the following IDs: - * * {{{ - * 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594. + * // Create a dataset with four partitions, each with two rows. + * val q = spark.range(start = 0, end = 8, step = 1, numPartitions = 4) + * + * // Make sure that every partition has the same number of rows + * q.mapPartitions(rows => Iterator(rows.size)).foreachPartition(rows => assert(rows.next == 2)) + * q.select(monotonically_increasing_id).show --- End diff -- I thought about explaining the "internals" of the operator through a more involved example, and actually thought about removing line 1166 (but forgot). I think the following lines make for a very in-depth explanation and show other operators in use. In other words, I'm in favour of removing line 1166 and leaving the other lines unchanged. Possible? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21858: [SPARK-24899][SQL][DOC] Add example of monotonica...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/21858 [SPARK-24899][SQL][DOC] Add example of monotonically_increasing_id standard function to scaladoc ## What changes were proposed in this pull request? Example of `monotonically_increasing_id` standard function (with how it works internally) in scaladoc ## How was this patch tested? Local build. Waiting for Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-24899-monotonically_increasing_id Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21858.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21858 commit 29def0069d96ca449204ad27e8c66ca2a218ce84 Author: Jacek Laskowski Date: 2018-07-24T09:34:49Z [SPARK-24899][SQL][DOC] Add example of monotonically_increasing_id standard function to scaladoc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r204098057 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -270,7 +270,7 @@ case class FileSourceScanExec( private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - override val metadata: Map[String, String] = { + override lazy val metadata: Map[String, String] = { --- End diff -- Ouch. I'd have never thought about any code with `RDD` and physical operators on the executor-side (!) Learnt it today. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203666346 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) --- End diff -- Redundant `toDF` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203665574 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -270,7 +270,7 @@ case class FileSourceScanExec( private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter) logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}") - override val metadata: Map[String, String] = { + override lazy val metadata: Map[String, String] = { --- End diff -- That's driver-only too, isn't it? Why is this `lazy` required? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203666893 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get --- End diff -- This `isInstanceOf` is a bit non-Scala IMHO and I'd prefer `collectFirst { case op: FileSourceScanExec => op }` instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203666125 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { --- End diff -- nit: s/in/on --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203667943 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSourceScanExecSuite.scala --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.test.SharedSQLContext + +class FileSourceScanExecSuite extends SharedSQLContext { + test("FileSourceScanExec should be canonicalizable in executor side") { +withTempPath { path => + spark.range(1).toDF().write.parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + val fileSourceScanExec = + df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get + try { +spark.range(1).foreach(_ => fileSourceScanExec.canonicalized) + } catch { +case e: Throwable => fail("FileSourceScanExec was not canonicalizable", e) --- End diff -- It's a named test so I'd get rid of the `try-catch` block because: 1. It's going to fail the test anyway 2. The title of the test matches the `fail` message. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203664621 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -166,10 +166,10 @@ case class FileSourceScanExec( override val tableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec with ColumnarBatchScan { - override val supportsBatch: Boolean = relation.fileFormat.supportBatch( + override lazy val supportsBatch: Boolean = relation.fileFormat.supportBatch( relation.sparkSession, StructType.fromAttributes(output)) - override val needsUnsafeRowConversion: Boolean = { + override lazy val needsUnsafeRowConversion: Boolean = { if (relation.fileFormat.isInstanceOf[ParquetSource]) { SparkSession.getActiveSession.get.sessionState.conf.parquetVectorizedReaderEnabled --- End diff -- Since you mentioned `SparkSession`, that line caught my attention: the active `SparkSession` is accessed using `SparkSession.getActiveSession.get` rather than `relation.sparkSession`, which is what the other places use. I think that's worth considering changing while we're at it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21815: [SPARK-23731][SQL] Make FileSourceScanExec canoni...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21815#discussion_r203665187 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -199,7 +199,7 @@ case class FileSourceScanExec( ret } - override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { + override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { --- End diff -- That happens on the driver so no need for the `lazy` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #12119: [SPARK-14288][SQL] Memory Sink for streaming
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/12119 Use u...@spark.apache.org mailing list to ask questions (see http://spark.apache.org/community.html#mailing-lists). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler i...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21510#discussion_r194632125 --- Diff: core/src/main/scala/org/apache/spark/ui/WebUI.scala --- @@ -88,41 +90,41 @@ private[spark] abstract class WebUI( handlers += renderHandler } - /** Attach a handler to this UI. */ + /** Attaches a handler to this UI. */ def attachHandler(handler: ServletContextHandler) { handlers += handler serverInfo.foreach(_.addHandler(handler)) } - /** Detach a handler from this UI. */ + /** Detaches a handler from this UI. */ def detachHandler(handler: ServletContextHandler) { handlers -= handler serverInfo.foreach(_.removeHandler(handler)) } /** - * Add a handler for static content. + * Adds a handler for static content. * * @param resourceBase Root of where to find resources to serve. * @param path Path in UI where to mount the resources. */ - def addStaticHandler(resourceBase: String, path: String): Unit = { + def addStaticHandler(resourceBase: String, path: String = "/static"): Unit = { attachHandler(JettyUtils.createStaticHandler(resourceBase, path)) } /** - * Remove a static content handler. + * Removes a static content handler. * * @param path Path in UI to unmount. */ def removeStaticHandler(path: String): Unit = { --- End diff -- OK...since @vanzin requested I'm gonna make all the other changes while at it :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/21510 @kiszk @jerryshao @srowen Added `s` (and even more scaladoc). Thanks for reviewing (and hopefully merging right after :))! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/21510 May I ask for some help merging it? /cc @srowen @holdenk @kiszk --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler i...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/21510 [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web UIs `WebUI` defines `addStaticHandler`, which web UIs don't use (they duplicate its logic instead). Let's clean them up and remove the duplication. Local build and waiting for Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-24490-Use-WebUI.addStaticHandler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21510.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21510 commit 58a9ec42402cc92675e3e057309a803d08fd0cd7 Author: Jacek Laskowski Date: 2018-06-07T21:56:38Z [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web UIs Closes https://issues.apache.org/jira/browse/SPARK-24490 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21448: [SPARK-24408][SQL][DOC] Move abs, bitwiseNOT, isnan, nan...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/21448 It is such a small change that I don't think it's going to take long to get merged. Reaching out to friendly folks to reach a consensus on it :) /cc @srowen @holdenk --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21448: [SPARK-24408][SQL][DOC] Move abs, bitwiseNOT, isn...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/21448 [SPARK-24408][SQL][DOC] Move abs, bitwiseNOT, isnan, nanvl functions to math_funcs group ## What changes were proposed in this pull request? A few math functions (`abs` , `bitwiseNOT`, `isnan`, `nanvl`) are not in **math_funcs** group. They should really be. ## How was this patch tested? Awaiting Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-24408-math-funcs-doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21448.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21448 commit 25fe97f05aac5c4f44bea9d1356722e6735cc940 Author: Jacek Laskowski Date: 2018-05-29T07:32:12Z [SPARK-24408][SQL][DOC] Move abs, bitwiseNOT, isnan, nanvl functions to math_funcs group --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21316: [SPARK-20538][SQL] Wrap Dataset.reduce with withN...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/21316#discussion_r187899299 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1607,7 +1607,9 @@ class Dataset[T] private[sql]( */ @Experimental @InterfaceStability.Evolving - def reduce(func: (T, T) => T): T = rdd.reduce(func) + def reduce(func: (T, T) => T): T = withNewExecutionId { --- End diff -- @maropu When you asked about this API did you refer to `reduce` or `withNewExecutionId`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20856: [SPARK-23731][SQL] FileSourceScanExec throws NullPointer...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/20856 BTW, I've just realized that even without the issue it's clear that creating a new `FileSourceScanExec` will end up with an NPE from the `supportsBatch` field. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20856: [SPARK-23731][SQL] FileSourceScanExec throws NullPointer...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/20856 I spent over 2 days applying different modifications to the query, hoping I could cut the number of `CASE WHEN`s and other projections, but noticed no correlation with either their number or their "types". I'll see if renaming the columns triggers the issue and, if so, submit a test case. Thanks for your support! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20856: [SPARK-23731][SQL] FileSourceScanExec throws Null...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/20856 [SPARK-23731][SQL] FileSourceScanExec throws NullPointerException in subexpression elimination ## What changes were proposed in this pull request? Avoids ("fixes") a NullPointerException in subexpression elimination for subqueries with FileSourceScanExec. ## How was this patch tested? Local build. No new tests as I could not reproduce it other than using the query and data under NDA. Waiting for Jenkins. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-23731-FileSourceScanExec-throws-NPE Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20856.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20856 commit 39814216026da32eee5aabf3886bbedd3b90ed08 Author: Jacek Laskowski <jacek@...> Date: 2018-03-18T17:12:32Z [SPARK-23731][SQL] FileSourceScanExec throws NullPointerException in subexpression elimination --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20855: [SPARK-23731][SQL] FileSourceScanExec throws Null...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/20855 [SPARK-23731][SQL] FileSourceScanExec throws NullPointerException in subexpression elimination ## What changes were proposed in this pull request? Avoids (not necessarily fixes) a NullPointerException in subexpression elimination for subqueries with FileSourceScanExec. ## How was this patch tested? Local build. No new tests as I could not reproduce it other than using the query and data under NDA. Waiting for Jenkins. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-23731-FileSourceScanExec-throws-NPE Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20855.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20855 commit 8ef323c572cee181e3bdbddeeb7119eda03d78f4 Author: Dongjoon Hyun <dongjoon@...> Date: 2018-01-17T06:32:18Z [SPARK-23072][SQL][TEST] Add a Unicode schema test for file-based data sources ## What changes were proposed in this pull request? After [SPARK-20682](https://github.com/apache/spark/pull/19651), Apache Spark 2.3 is able to read ORC files with Unicode schema. Previously, it raises `org.apache.spark.sql.catalyst.parser.ParseException`. This PR adds a Unicode schema test for CSV/JSON/ORC/Parquet file-based data sources. Note that TEXT data source only has [a single column with a fixed name 'value'](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala#L71). ## How was this patch tested? Pass the newly added test case. Author: Dongjoon Hyun <dongj...@apache.org> Closes #20266 from dongjoon-hyun/SPARK-23072. 
(cherry picked from commit a0aedb0ded4183cc33b27e369df1cbf862779e26) Signed-off-by: Wenchen Fan <wenc...@databricks.com> commit bfbc2d41b8a9278b347b6df2d516fe4679b41076 Author: Henry Robinson <henry@...> Date: 2018-01-17T08:01:41Z [SPARK-23062][SQL] Improve EXCEPT documentation ## What changes were proposed in this pull request? Make the default behavior of EXCEPT (i.e. EXCEPT DISTINCT) more explicit in the documentation, and call out the change in behavior from 1.x. Author: Henry Robinson <he...@cloudera.com> Closes #20254 from henryr/spark-23062. (cherry picked from commit 1f3d933e0bd2b1e934a233ed699ad39295376e71) Signed-off-by: gatorsmile <gatorsm...@gmail.com> commit cbb6bda437b0d2832496b5c45f8264e5527f1cce Author: Dongjoon Hyun <dongjoon@...> Date: 2018-01-17T13:53:36Z [SPARK-21783][SQL] Turn on ORC filter push-down by default ## What changes were proposed in this pull request? ORC filter push-down is disabled by default from the beginning, [SPARK-2883](https://github.com/apache/spark/commit/aa31e431fc09f0477f1c2351c6275769a31aca90#diff-41ef65b9ef5b518f77e2a03559893f4dR149 ). Now, Apache Spark starts to depend on Apache ORC 1.4.1. For Apache Spark 2.3, this PR turns on ORC filter push-down by default like Parquet ([SPARK-9207](https://issues.apache.org/jira/browse/SPARK-21783)) as a part of [SPARK-20901](https://issues.apache.org/jira/browse/SPARK-20901), "Feature parity for ORC with Parquet". ## How was this patch tested? Pass the existing tests. Author: Dongjoon Hyun <dongj...@apache.org> Closes #20265 from dongjoon-hyun/SPARK-21783. (cherry picked from commit 0f8a28617a0742d5a99debfbae91222c2e3b5cec) Signed-off-by: Wenchen Fan <wenc...@databricks.com> commit aae73a21a42fa366a09c2be1a4b91308ef211beb Author: Wang Gengliang <ltnwgl@...> Date: 2018-01-17T16:05:26Z [SPARK-23079][SQL] Fix query constraints propagation with aliases ## What changes were proposed in this pull request? Previously, PR #19201 fix the problem of non-converging constraints. 
After that PR #19149 improve the loop and constraints is inferred only once. So the problem of non-converging constraints is gone. However, the case below will fail. ``` spark.range(5).write.saveAsTable("t") val t = spark.read.table("t") val left = t.withColumn("xid", $"id" + lit(1)).as("x") val right = t.withColumnRenamed("id", "xid").as("y") val df = left.join(right, "xid").filter("id = 3").toDF() checkAnswer(df, Row(4, 3)) ``` Because `aliasMap` replace all the aliased child. See the test case in PR for details. This PR is to fix this bug by removing usel
[GitHub] spark pull request #20855: [SPARK-23731][SQL] FileSourceScanExec throws Null...
Github user jaceklaskowski closed the pull request at: https://github.com/apache/spark/pull/20855 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20832: [SPARK-20536][SQL] Extend ColumnName to create St...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20832#discussion_r174700327 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala --- @@ -1208,85 +1208,172 @@ class ColumnName(name: String) extends Column(name) { */ def boolean: StructField = StructField(name, BooleanType) + /** + * Creates a new `StructField` of type boolean. + * @since 2.3.0 + */ + def boolean(nullable: Boolean): StructField = StructField(name, BooleanType, nullable) + /** * Creates a new `StructField` of type byte. * @since 1.3.0 */ def byte: StructField = StructField(name, ByteType) + /** + * Creates a new `StructField` of type byte. + * @since 2.3.0 --- End diff -- `2.4.0` here and in the other places too (unless this change is backported to a 2.3.x patch release, in which case it would be `2.3.1`). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20832: [SPARK-20536][SQL] Extend ColumnName to create St...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20832#discussion_r174699743 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala --- @@ -1208,85 +1208,172 @@ class ColumnName(name: String) extends Column(name) { */ def boolean: StructField = StructField(name, BooleanType) + /** + * Creates a new `StructField` of type boolean. + * @since 2.3.0 --- End diff -- `2.4.0` I think --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20550: [MINOR][HIVE] Typo fixes
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/20550 I'll try to add more typos from other modules. I didn't mean to "pollute" a minor change and make it bigger (and possibly questionable :)) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20550: [MINOR][HIVE] Typo fixes
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/20550 [MINOR][HIVE] Typo fixes ## What changes were proposed in this pull request? Typo fixes (with expanding a Hive property) ## How was this patch tested? local build. Awaiting Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark hiveutils-typos Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20550.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20550 commit dcbb4da3b2501d74deb43df8b879b5e75154a51b Author: Jacek Laskowski <jacek@...> Date: 2018-02-08T18:36:02Z [MINOR][HIVE] Typo fixes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20438#discussion_r164728359 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/CalendarInterval.java --- @@ -87,7 +87,7 @@ public static CalendarInterval fromString(String s) { } } - public static long toLongWithRange(String fieldName, + private static long toLongWithRange(String fieldName, --- End diff -- Why?! `private` methods are much harder (if at all possible) to test (I've been bitten a few times this week and remember the pain). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20438#discussion_r164729684 --- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java --- @@ -235,10 +237,30 @@ public MapData getMap(int ordinal) { */ public abstract byte[] getBinary(int rowId); + /** + * Returns the calendar interval type value for rowId. + * + * In Spark, calendar interval type value is basically an integer value representing the number of + * months in this interval, and a long value representing the number of microseconds in this + * interval. A interval type vector is same as a struct type vector with 2 fields: `months` and + * `microseconds`. + * + * To support interval type, implementations must implement {@link #getChild(int)} and define 2 + * child vectors: the first child vector is a int type vector, containing all the month values of + * all the interval values in this vector. The second child vector is a long type vector, + * containing all the microsecond values of all the interval values in this vector. + */ + public final CalendarInterval getInterval(int rowId) { +if (isNullAt(rowId)) return null; +final int months = getChild(0).getInt(rowId); +final long microseconds = getChild(1).getLong(rowId); +return new CalendarInterval(months, microseconds); + } + /** * Returns the ordinal's child column vector. --- End diff -- `@return [[ColumnVector]] at the ordinal` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20438#discussion_r164729086 --- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java --- @@ -235,10 +237,30 @@ public MapData getMap(int ordinal) { */ public abstract byte[] getBinary(int rowId); + /** + * Returns the calendar interval type value for rowId. + * + * In Spark, calendar interval type value is basically an integer value representing the number of + * months in this interval, and a long value representing the number of microseconds in this + * interval. A interval type vector is same as a struct type vector with 2 fields: `months` and --- End diff -- "**An** interval" + "is **the** same as" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20438#discussion_r164729429 --- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java --- @@ -235,10 +237,30 @@ public MapData getMap(int ordinal) { */ public abstract byte[] getBinary(int rowId); + /** + * Returns the calendar interval type value for rowId. + * + * In Spark, calendar interval type value is basically an integer value representing the number of + * months in this interval, and a long value representing the number of microseconds in this + * interval. A interval type vector is same as a struct type vector with 2 fields: `months` and + * `microseconds`. + * + * To support interval type, implementations must implement {@link #getChild(int)} and define 2 + * child vectors: the first child vector is a int type vector, containing all the month values of + * all the interval values in this vector. The second child vector is a long type vector, + * containing all the microsecond values of all the interval values in this vector. + */ + public final CalendarInterval getInterval(int rowId) { +if (isNullAt(rowId)) return null; +final int months = getChild(0).getInt(rowId); --- End diff -- What's the purpose of the `final` keyword here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20438: [SPARK-23272][SQL] add calendar interval type sup...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20438#discussion_r164729250 --- Diff: sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java --- @@ -235,10 +237,30 @@ public MapData getMap(int ordinal) { */ public abstract byte[] getBinary(int rowId); + /** + * Returns the calendar interval type value for rowId. + * + * In Spark, calendar interval type value is basically an integer value representing the number of + * months in this interval, and a long value representing the number of microseconds in this + * interval. A interval type vector is same as a struct type vector with 2 fields: `months` and + * `microseconds`. + * + * To support interval type, implementations must implement {@link #getChild(int)} and define 2 + * child vectors: the first child vector is a int type vector, containing all the month values of --- End diff -- is **an** int type vector --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20405#discussion_r164267100 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1216,7 +1216,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan { -UnresolvedHint(name, parameters, logicalPlan) +UnresolvedHint(name, parameters, planWithBarrier) --- End diff -- I thought that that's what `ResolveBroadcastHints` does --> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala#L93-L101, doesn't it? I'm going to write a test case for it to confirm (and that's what I was asking for in the email to dev@spark the other day). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20405: [SPARK-23229][SQL] Dataset.hint should use planWithBarri...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/20405 Looks like the tests failed due to _"java.io.IOException: Failed to delete: /home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-5a9b5811-306d-4ba0-8bfb-9e263ddf47b8"_ Is this caused by the change, or is it just a flaky failure unrelated to it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20405#discussion_r164191381 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -1216,7 +1216,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def hint(name: String, parameters: Any*): Dataset[T] = withTypedPlan { -UnresolvedHint(name, parameters, logicalPlan) +UnresolvedHint(name, parameters, planWithBarrier) --- End diff -- My understanding however is that `planWithBarrier` is already analyzed (and `ResolveBroadcastHints` as the very first rule had its chance to do its work). That's the extra processing `hint` does every time it's called. Using `planWithBarrier` makes it less "painful". Just use `hint` twice and see the analyzed plan. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20405: [SPARK-23229][SQL] Dataset.hint should use planWithBarri...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/20405 /cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20405: [SPARK-23229][SQL] Dataset.hint should use planWi...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/20405 [SPARK-23229][SQL] Dataset.hint should use planWithBarrier logical plan ## What changes were proposed in this pull request? Every time `Dataset.hint` is used it triggers execution of logical commands, their unions and hint resolution (among other things that analyzer does). `hint` should use `planWithBarrier` instead. Fixes https://issues.apache.org/jira/browse/SPARK-23229 ## How was this patch tested? Existing unit tests, local build + awaiting Jenkins You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-23229-hint-planWithBarrier Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20405 commit 47bb245353202208f2c41634c3796c8e4d2be663 Author: Jacek Laskowski <jacek@...> Date: 2018-01-26T11:20:48Z [SPARK-23229][SQL] Dataset.hint should use planWithBarrier logical plan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20368: [SPARK-23195] [SQL] Keep the Hint of Cached Data
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20368#discussion_r163376001 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala --- @@ -126,6 +126,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } } + test("broadcast hint is retained in a cached plan") { +Seq(true, false).foreach { materialized => + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { +val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") +val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value") +broadcast(df2).cache() +if (materialized) df2.collect() +val df3 = df1.join(df2, Seq("key"), "inner") --- End diff -- `val df3 = df1.join(df2, "key")`? `inner` is implied, isn't it? (I'm proposing the change as this and other tests could be easily used as a learning tool to master Spark SQL's API) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20368: [SPARK-23195] [SQL] Keep the Hint of Cached Data
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20368#discussion_r163375534 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala --- @@ -126,6 +126,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils { } } + test("broadcast hint is retained in a cached plan") { +Seq(true, false).foreach { materialized => + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { +val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value") --- End diff -- Is `spark.createDataFrame(...)` wrapper really required? I thought `Seq((1, "4"), (2, "2")).toDF("key", "value")` would just work fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20368: [SPARK-23195] [SQL] Keep the Hint of Cached Data
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20368#discussion_r163375216 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala --- @@ -77,7 +77,7 @@ case class InMemoryRelation( // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache statsOfPlanToCache } else { - Statistics(sizeInBytes = batchStats.value.longValue) + Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints) --- End diff -- Why don't you simply `statsOfPlanToCache.copy(sizeInBytes = batchStats.value.longValue)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20344: [MINOR] Typo fixes
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/20344 The builds failed due to a change in one of the error messages that the tests assert (!) Fixing... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20344: [MINOR] Typo fixes
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/20344 [MINOR] Typo fixes ## What changes were proposed in this pull request? Typo fixes ## How was this patch tested? Local build / Doc-only changes You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark typo-fixes Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20344.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20344 commit 9fff0ed104650f4e92ae87deb91381cd79ac5bfa Author: Jacek Laskowski <jacek@...> Date: 2018-01-21T17:59:26Z [MINOR] Typo fixes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20076: [SPARK-21786][SQL] When acquiring 'compressionCod...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20076#discussion_r159142765 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -323,11 +323,13 @@ object SQLConf { .createWithDefault(false) val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec") -.doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " + - "uncompressed, snappy, gzip, lzo.") +.doc("Sets the compression codec use when writing Parquet files. If other compression codec " + --- End diff -- s/use when/used when --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20076: [SPARK-21786][SQL] When acquiring 'compressionCod...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20076#discussion_r159142783 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -364,7 +366,9 @@ object SQLConf { .createWithDefault(true) val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") -.doc("Sets the compression codec use when writing ORC files. Acceptable values include: " + +.doc("Sets the compression codec use when writing ORC files. If other compression codec " + --- End diff -- s/use when/used when --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20076: [SPARK-21786][SQL] When acquiring 'compressionCod...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/20076#discussion_r159142760 --- Diff: docs/sql-programming-guide.md --- @@ -953,8 +953,10 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession spark.sql.parquet.compression.codec snappy -Sets the compression codec use when writing Parquet files. Acceptable values include: -uncompressed, snappy, gzip, lzo. +Sets the compression codec use when writing Parquet files. If other compression codec --- End diff -- s/use when/used when --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19774: [SPARK-22475][SQL] show histogram in DESC COLUMN ...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19774#discussion_r151838625 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -689,6 +689,11 @@ case class DescribeColumnCommand( buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL")) buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL")) buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL")) + buffer ++= cs.flatMap(_.histogram.map { hist => +val header = Row("histogram", s"height: ${hist.height}, num_of_bins: ${hist.bins.length}") +Seq(header) ++ hist.bins.map(bin => + Row("", s"lower_bound: ${bin.lo}, upper_bound: ${bin.hi}, distinct_count: ${bin.ndv}")) --- End diff -- @wzhfy I'd rather define a `val` with the comment being the name of the val. That would make it "compile-safe". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19407: [SPARK-21667][Streaming] ConsoleSink should not f...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19407#discussion_r151838606 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -267,11 +267,12 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { useTempCheckpointLocation = true, trigger = trigger) } else { - val (useTempCheckpointLocation, recoverFromCheckpointLocation) = + val recoverFromCheckpointLocation = true + val useTempCheckpointLocation = if (source == "console") { - (true, true) + true } else { - (false, true) + false --- End diff -- Do we really need it anymore since the `if` expression is just `source == "console"`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19725: [DO NOT REVIEW][SPARK-22042] [SQL] Insert shuffle...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19725#discussion_r151741374 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/InjectPlaceholderExchange.scala --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.exchange + +import org.apache.spark.sql.catalyst.expressions.SortOrder +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.internal.SQLConf + +case class InjectPlaceholderExchange(conf: SQLConf) extends Rule[SparkPlan] { + private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions + + /** + * Given a required distribution, returns a partitioning that satisfies that distribution. 
+ * @param requiredDistribution The distribution that is required by the operator + * @param numPartitions Used when the distribution doesn't require a specific number of partitions + */ + private def createPartitioning(requiredDistribution: Distribution, + numPartitions: Int): Partitioning = { +requiredDistribution match { + case AllTuples => SinglePartition + case ClusteredDistribution(clustering, desiredPartitions) => +HashPartitioning(clustering, desiredPartitions.getOrElse(numPartitions)) + case OrderedDistribution(ordering) => RangePartitioning(ordering, numPartitions) + case dist => sys.error(s"Do not know how to satisfy distribution $dist") +} + } + + def apply(plan: SparkPlan): SparkPlan = plan.transformUp { +case operator @ ShuffleExchangeExec(partitioning, child, _) => + child.children match { +case ShuffleExchangeExec(childPartitioning, baseChild, _)::Nil => --- End diff -- No white spaces around `::` intended? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r151739773 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,16 +318,28 @@ case class AlterTableChangeColumnCommand( s"'${newColumn.name}' with type '${newColumn.dataType}'") } +val changeSchema = originColumn.dataType != newColumn.dataType --- End diff -- What do you think about renaming the val to `typeChanged`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r151740225 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala --- @@ -1459,6 +1459,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { // Ensure that change column will preserve other metadata fields. sql("ALTER TABLE dbx.tab1 CHANGE COLUMN col1 col1 INT COMMENT 'this is col1'") assert(getMetadata("col1").getString("key") == "value") + +// Ensure that change column type take effect --- End diff -- s/change/changing + s/take/takes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19773: [SPARK-22546][SQL] Supporting for changing column...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19773#discussion_r151739604 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala --- @@ -318,16 +318,28 @@ case class AlterTableChangeColumnCommand( s"'${newColumn.name}' with type '${newColumn.dataType}'") } +val changeSchema = originColumn.dataType != newColumn.dataType val newSchema = table.schema.fields.map { field => if (field.name == originColumn.name) { -// Create a new column from the origin column with the new comment. -addComment(field, newColumn.getComment) +var newField = field --- End diff -- I'd recommend getting rid of this `var` and rewriting the code as follows: ``` val newField = newColumn.getComment.map(...).getOrElse(field) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19774: [SPARK-22475][SQL] show histogram in DESC COLUMN ...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19774#discussion_r151737674 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala --- @@ -689,6 +689,11 @@ case class DescribeColumnCommand( buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL")) buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL")) buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL")) + buffer ++= cs.flatMap(_.histogram.map { hist => --- End diff -- I'm pretty sure that a for-comprehension would make the code easier to read. ```scala for { c <- cs hist <- c.histogram ... } yield ... ``` Let me know if you need help with that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] kafka pull request #4038: [KAFKA-4818][FOLLOW-UP] Include isolationLevel in ...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/kafka/pull/4038 [KAFKA-4818][FOLLOW-UP] Include isolationLevel in toString of FetchRequest Include `isolationLevel` in `toString` of `FetchRequest` This is a follow-up to https://issues.apache.org/jira/browse/KAFKA-4818. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/kafka KAFKA-4818-isolationLevel Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4038.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4038 commit 12e6367952c93326b687d22771882d732fd41cf3 Author: Jacek Laskowski <ja...@japila.pl> Date: 2017-10-07T14:48:19Z [KAFKA-4818][FOLLOW-UP] Include isolationLevel in toString of FetchRequest ---
[GitHub] spark pull request #19407: [SPARK-21667][Streaming] ConsoleSink should not f...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19407#discussion_r142022819 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala --- @@ -269,7 +269,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { } else { val (useTempCheckpointLocation, recoverFromCheckpointLocation) = if (source == "console") { - (true, false) + (true, true) --- End diff -- Is there any source that has `recoverFromCheckpointLocation` disabled? What's the use case, if any? Remove `recoverFromCheckpointLocation` here, since it's always `true`, and make that explicit. The JIRA issue is to fix the exception and then clean up code that was only needed in the past. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/19261 OK, I'm convinced by your argument that Spark SQL should not offer this as part of the public API. Thanks for sticking with me for so long and for patiently explaining things. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/19261 @rxin @gatorsmile Let me ask you a very similar question, then: why does the `CurrentDate` operator have the optional timezone parameter? What's the purpose? Wouldn't that answer your questions? I don't mind not having the change, but I am curious what the reason for the "mismatch" is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19261: [SPARK-22040] Add current_date function with timezone id
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/19261 @gatorsmile Dunno, but the logical operator does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19261: [SPARK-22040] Add current_date function with time...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19261#discussion_r139309272 --- Diff: python/pyspark/sql/functions.py --- @@ -793,12 +793,12 @@ def ntile(n): # -- Date/Timestamp functions -- @since(1.5) -def current_date(): +def current_date(timeZone=None): --- End diff -- Would the change beg a different `@since`? It's no longer true that it existed `since(1.5)`, is it? Just asking...no idea how it really should be. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19261: [SPARK-22040] Add current_date function with time...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19261#discussion_r139309246 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -2508,6 +2508,14 @@ object functions { def current_date(): Column = withExpr { CurrentDate() } /** + * Returns the current date in the given timezone as a date column. + * + * @group datetime_funcs + * @since 2.3.0 + */ + def current_date(timeZone: String): Column = withExpr { CurrentDate(Option(timeZone)) } --- End diff -- s/`Option`/`Some` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19261: [SPARK-22040] Add current_date function with time...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19261#discussion_r139309261 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -2508,6 +2508,14 @@ object functions { def current_date(): Column = withExpr { CurrentDate() } /** + * Returns the current date in the given timezone as a date column. --- End diff -- `@return`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19112: [SPARK-21901][SS] Define toString for StateOperatorProgr...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/19112 Hey @HyukjinKwon, as the only committer who's been involved in this PR, could you review it again and possibly merge to master? Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19112: [SPARK-21901][SS] Define toString for StateOperat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19112#discussion_r136750244 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -200,7 +202,7 @@ class SourceProgress protected[sql]( */ @InterfaceStability.Evolving class SinkProgress protected[sql]( -val description: String) extends Serializable { --- End diff -- Should I fix the other places then? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19112: [SPARK-21901][SS] Define toString for StateOperat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19112#discussion_r136726445 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -177,11 +179,11 @@ class SourceProgress protected[sql]( } ("description" -> JString(description)) ~ - ("startOffset" -> tryParse(startOffset)) ~ - ("endOffset" -> tryParse(endOffset)) ~ - ("numInputRows" -> JInt(numInputRows)) ~ - ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ - ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) --- End diff -- I thought it was the opposite - see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala#L53-L57 and https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala#L128-L140. I've made the section match the style of the others in the source file. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19112: [SPARK-21901][SS] Define toString for StateOperat...
Github user jaceklaskowski commented on a diff in the pull request: https://github.com/apache/spark/pull/19112#discussion_r136726289 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala --- @@ -200,7 +202,7 @@ class SourceProgress protected[sql]( */ @InterfaceStability.Evolving class SinkProgress protected[sql]( -val description: String) extends Serializable { --- End diff -- Really?! Then the other places in the file are incorrect like https://github.com/jaceklaskowski/spark/blob/337ad489bb43ef93a651bdd4952bd7f0738698dc/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala#L90 or https://github.com/jaceklaskowski/spark/blob/337ad489bb43ef93a651bdd4952bd7f0738698dc/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala#L161? I might be missing something though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19112: [SPARK-21901][SS] Define toString for StateOperat...
GitHub user jaceklaskowski opened a pull request: https://github.com/apache/spark/pull/19112 [SPARK-21901][SS] Define toString for StateOperatorProgress ## What changes were proposed in this pull request? Just `StateOperatorProgress.toString` + few formatting fixes ## How was this patch tested? Local build. Waiting for OK from Jenkins. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jaceklaskowski/spark SPARK-21901-StateOperatorProgress-toString Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19112.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19112 commit 337ad489bb43ef93a651bdd4952bd7f0738698dc Author: Jacek Laskowski <ja...@japila.pl> Date: 2017-09-03T18:17:45Z [SPARK-21901][SS] Define toString for StateOperatorProgress --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org