[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21190 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89963/ Test FAILed.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21190 Merged build finished. Test FAILed.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21190

**[Test build #89963 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89963/testReport)** for PR 21190 at commit [`fc67909`](https://github.com/apache/spark/commit/fc679098d917d226a834a8ab6d08c23dbe5bf7db).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class WidenSetOperationTypes(conf: SQLConf) extends Rule[LogicalPlan]`
  * `case class FunctionArgumentConversion(conf: SQLConf) extends TypeCoercionRule`
  * `case class CaseWhenCoercion(conf: SQLConf) extends TypeCoercionRule`
  * `case class IfCoercion(conf: SQLConf) extends TypeCoercionRule`
  * `case class ImplicitTypeCasts(conf: SQLConf) extends TypeCoercionRule`
[GitHub] spark issue #20959: [SPARK-23846][SQL] The samplingRatio option for CSV data...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20959 **[Test build #89964 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89964/testReport)** for PR 20959 at commit [`d4d9d65`](https://github.com/apache/spark/commit/d4d9d65ce28c4176c085449564c8e5f8ec0b3ff7).
[GitHub] spark issue #20959: [SPARK-23846][SQL] The samplingRatio option for CSV data...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20959 retest this please
[GitHub] spark pull request #21180: [SPARK-22674][PYTHON] Disabled _hack_namedtuple f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21180#discussion_r184870575

--- Diff: python/pyspark/serializers.py ---
@@ -523,7 +523,21 @@ def namedtuple(*args, **kwargs):
     for k, v in _old_namedtuple_kwdefaults.items():
         kwargs[k] = kwargs.get(k, v)
     cls = _old_namedtuple(*args, **kwargs)
-    return _hack_namedtuple(cls)
+
+    import sys
+    f = sys._getframe(1)
--- End diff --

Yeah, but the thing is, the doc says this is not guaranteed, even though most Python implementations seem to have it - there's a risk we'd be taking here (it could break in a specific implementation of Python, although that sounds unlikely). Is there any other way to avoid this?
[GitHub] spark issue #20140: [SPARK-19228][SQL] Introduce tryParseDate method to proc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20140 Not a big deal, but would you mind fixing the PR title to be complete and fixing the PR description as the template indicates?
[GitHub] spark pull request #20140: [SPARK-19228][SQL] Introduce tryParseDate method ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20140#discussion_r184870477

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala ---
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
       DoubleType
+    } else {
+      tryParseDate(field, options)
+    }
+  }
+
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case infers a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestamp(field, options)
     }
   }

   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+    // This case infers a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
--- End diff --

Adding a configuration to control this behaviour is probably preferable in this case.
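To make the suggestion concrete, here is a minimal sketch of such a flag following the `SQLConf` builder pattern (it would sit inside `object SQLConf`); the key name, doc text, and default are illustrative assumptions, not taken from the PR:

```
// Hypothetical SQLConf entry gating the new date-inference behaviour.
// Defaulting to false would preserve the existing inference for current users.
val CSV_DATE_INFERENCE_ENABLED = buildConf("spark.sql.csv.dateInference.enabled")
  .doc("When true, CSV schema inference tries to parse string fields as DateType " +
    "using the configured dateFormat before falling back to TimestampType.")
  .booleanConf
  .createWithDefault(false)
```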
[GitHub] spark pull request #20140: [SPARK-19228][SQL] Introduce tryParseDate method ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20140#discussion_r184870456

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala ---
@@ -140,14 +141,23 @@ private[csv] object CSVInferSchema {
   private def tryParseDouble(field: String, options: CSVOptions): DataType = {
     if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field, options)) {
       DoubleType
+    } else {
+      tryParseDate(field, options)
+    }
+  }
+
+  private def tryParseDate(field: String, options: CSVOptions): DataType = {
+    // This case infers a custom `dateFormat` is set.
+    if ((allCatch opt options.dateFormatter.parse(field)).isDefined) {
+      DateType
     } else {
       tryParseTimestamp(field, options)
     }
   }

   private def tryParseTimestamp(field: String, options: CSVOptions): DataType = {
-    // This case infers a custom `dataFormat` is set.
-    if ((allCatch opt options.timestampFormat.parse(field)).isDefined) {
+    // This case infers a custom `timestampFormat` is set.
+    if ((allCatch opt options.timestampFormatter.parse(field)).isDefined) {
--- End diff --

Should we replace it with `timestampFormat` everywhere and document it in the migration guide? (e.g., the date format is now inferred correctly, and also the things you mentioned in https://github.com/apache/spark/pull/20140#discussion_r166261313)
[GitHub] spark pull request #20140: [SPARK-19228][SQL] Introduce tryParseDate method ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20140#discussion_r184870401

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala ---
@@ -150,6 +151,16 @@ class CSVOptions(
   val isCommentSet = this.comment != '\u0000'

+  lazy val dateFormatter: DateTimeFormatter = {
--- End diff --

`@transient lazy val`
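For context on the `@transient lazy val` suggestion: `CSVOptions` is serialized and shipped to executors, and a formatter does not need to travel with it; a transient lazy val is simply rebuilt on first access after deserialization. A minimal sketch, assuming a `java.time.format.DateTimeFormatter` (the PR's actual formatter type may differ):

```
import java.time.format.DateTimeFormatter

class CsvLikeOptions(val dateFormat: String) extends Serializable {
  // Excluded from serialization; lazily re-created on each executor the
  // first time it is accessed after the options object is deserialized.
  @transient lazy val dateFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern(dateFormat)
}
```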
[GitHub] spark pull request #20140: [SPARK-19228][SQL] Introduce tryParseDate method ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20140#discussion_r184870380

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala ---
@@ -216,6 +226,8 @@ private[csv] object CSVInferSchema {
     } else {
       Some(DecimalType(range + scale, scale))
     }
+    // By design 'TimestampType' (8 bytes) is larger than 'DateType' (4 bytes).
+    case (t1: DateType, t2: TimestampType) => Some(TimestampType)
--- End diff --

I think we should do the opposite case too

```
case (t1: TimestampType, t2: DateType) => Some(TimestampType)
```
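The symmetry being asked for can be captured in one pattern. A small illustrative sketch of the merge rule as a standalone helper (not the PR's actual `compatibleType` code):

```
import org.apache.spark.sql.types._

// Whichever side DateType appears on, merging with TimestampType should
// widen to TimestampType, since every date is representable as a timestamp.
def mergeDateAndTimestamp(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match {
  case (DateType, TimestampType) | (TimestampType, DateType) => Some(TimestampType)
  case _ => None
}
```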
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20937
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20937 It doesn't necessarily need a followup for the styles, but it would be good to remember those when we review related PRs next time. Thanks for bearing with me through all of this.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20937 Merged to master !!!
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870228

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.File
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * The benchmarks aims to measure performance of JSON parsing when encoding is set and isn't.
--- End diff --

I usually avoid abbreviations in docs, though.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870234

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala ---
@@ -175,11 +187,18 @@ object MultiLineJsonDataSource extends JsonDataSource {
       .values
   }

-  private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
-    val path = new Path(record.getPath())
-    CreateJacksonParser.inputStream(
-      jsonFactory,
-      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
+  private def dataToInputStream(dataStream: PortableDataStream): InputStream = {
+    val path = new Path(dataStream.getPath())
+    CodecStreams.createInputStreamWithCloseResource(dataStream.getConfiguration, path)
+  }
+
+  private def createParser(jsonFactory: JsonFactory, stream: PortableDataStream): JsonParser = {
+    CreateJacksonParser.inputStream(jsonFactory, dataToInputStream(stream))
+  }
+
+  private def createParser(enc: String, jsonFactory: JsonFactory,
+      stream: PortableDataStream): JsonParser = {
--- End diff --

Ditto for style.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870219

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2167,4 +2171,241 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
     assert(sampled.count() == ds.count())
   }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+    val fileName = "test-data/utf16WithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .option("encoding", "UTF-16")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+    val fileName = "test-data/utf32BEWithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") {
+    val fileName = "test-data/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .options(Map("encoding" -> "UTF-16LE"))
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+    val invalidCharset = "UTF-128"
+    val exception = intercept[UnsupportedCharsetException] {
+      spark.read
+        .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+        .json(testFile("test-data/utf16LE.json"))
+        .count()
+    }
+
+    assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+    val fileName = "test-data/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .options(Map("encoding" -> "uTf-16lE"))
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: specified encoding is not matched to actual encoding") {
+    val fileName = "test-data/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val exception = intercept[SparkException] {
+      spark.read.schema(schema)
+        .option("mode", "FAILFAST")
+        .option("multiline", "true")
+        .options(Map("encoding" -> "UTF-16BE"))
+        .json(testFile(fileName))
+        .count()
+    }
+    val errMsg = exception.getMessage
+
+    assert(errMsg.contains("Malformed records are detected in record parsing"))
+  }
+
+  def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
+      expectedContent: String): Unit = {
--- End diff --

I think it should be

```
def checkEncoding(
    expectedEncoding: String,
    pathToJsonFiles: String,
    expectedContent: String): Unit = {
```

per https://github.com/databricks/scala-style-guide#spacing-and-indentation or

```
def checkEncoding(
    expectedEncoding: String, pathToJsonFiles: String, expectedContent: String): Unit = {
```

if it fits, per https://github.com/databricks/scala-style-guide/issues/58#issue-243844040. Not a big deal.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184870196

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2167,4 +2171,241 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
     assert(sampled.count() == ds.count())
   }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+    val fileName = "test-data/utf16WithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .option("encoding", "UTF-16")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+    val fileName = "test-data/utf32BEWithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") {
+    val fileName = "test-data/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .options(Map("encoding" -> "UTF-16LE"))
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+    val invalidCharset = "UTF-128"
+    val exception = intercept[UnsupportedCharsetException] {
+      spark.read
+        .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+        .json(testFile("test-data/utf16LE.json"))
+        .count()
+    }
+
+    assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+    val fileName = "test-data/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .options(Map("encoding" -> "uTf-16lE"))
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: specified encoding is not matched to actual encoding") {
+    val fileName = "test-data/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val exception = intercept[SparkException] {
+      spark.read.schema(schema)
+        .option("mode", "FAILFAST")
+        .option("multiline", "true")
+        .options(Map("encoding" -> "UTF-16BE"))
+        .json(testFile(fileName))
+        .count()
+    }
+    val errMsg = exception.getMessage
+
+    assert(errMsg.contains("Malformed records are detected in record parsing"))
+  }
+
+  def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
+      expectedContent: String): Unit = {
+    val jsonFiles = new File(pathToJsonFiles)
+      .listFiles()
+      .filter(_.isFile)
+      .filter(_.getName.endsWith("json"))
+    val actualContent = jsonFiles.map { file =>
+      new String(Files.readAllBytes(file.toPath), expectedEncoding)
+    }.mkString.trim
+
+    assert(actualContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+    val encoding = "UTF-32BE"
+    withTempPath { path =>
+      val df = spark.createDataset(Seq(("Dog", 42)))
+      df.write
+        .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+        .json(path.getCanonicalPath)
+
+      checkEncoding(
+        expectedEncoding = encoding,
+        pathToJsonFiles = path.getCanonicalPath,
+        expectedContent = """{"_1":"Dog","_2":42}""")
+    }
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+    withTempPath { path =>
+      val df = spark.createDataset(Seq(("Dog", 42)))
+      df.write.json(path.getCanonicalPath)
+
+      checkEncoding(
+        expectedEncoding = "UTF-8",
+        pathToJsonFiles = path.getCanonicalPath,
+        expectedContent = """{"_1":"Dog","_2":42}""")
+    }
+  }
+
+  test("SPARK-23723: wrong output encoding") {
+    val encoding = "UTF-128"
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21190 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2741/ Test PASSed.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21190 Merged build finished. Test PASSed.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21190 **[Test build #89963 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89963/testReport)** for PR 21190 at commit [`fc67909`](https://github.com/apache/spark/commit/fc679098d917d226a834a8ab6d08c23dbe5bf7db).
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21190 I believe this is also the root cause of the branch-2.3 test failures like https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-branch-2.3-test-sbt-hadoop-2.6/lastCompletedBuild/testReport/org.apache.spark.sql.execution.datasources.parquet/ParquetQuerySuite/SPARK_15678__not_use_cache_on_append/ This PR might be too large to backport; we should look into how the master branch avoids these test failures and backport that fix to 2.3.
[GitHub] spark issue #21190: [SPARK-22938][SQL][followup] Assert that SQLConf.get is ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21190 cc @juliuszsompolski @kiszk @dongjoon-hyun @gatorsmile @hvanhovell
[GitHub] spark pull request #21190: [SPARK-22938][SQL][followup] Assert that SQLConf....
GitHub user cloud-fan opened a pull request: https://github.com/apache/spark/pull/21190

[SPARK-22938][SQL][followup] Assert that SQLConf.get is accessed only on the driver

## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/20136 . #20136 didn't really work because in the test we are using the local backend, which shares the driver-side `SparkEnv`, so `SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER` doesn't work.

This PR changes the check to `TaskContext.get != null`, moves the check into `SQLConf.get`, and fixes all the places that violate this check:
* `InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, so we can't access `conf.offHeapColumnVectorEnabled` there.
* `DataType#sameType` may be executed on the executor side, for things like JSON schema inference, so we can't call `conf.caseSensitiveAnalysis` there. This contributes to most of the code changes, as we need to add a `caseSensitive` parameter to a lot of methods.
* `ParquetFilters` is used in the file scan function, which is executed on the executor side, so we can't call `conf.parquetFilterPushDownDate` there.
* `WindowExec#createBoundOrdering` is called on the executor side, so we can't use `conf.sessionLocalTimezone` there.

## How was this patch tested?

existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark minor

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21190.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21190

commit fc679098d917d226a834a8ab6d08c23dbe5bf7db
Author: Wenchen Fan
Date: 2018-04-29T01:15:14Z

    SQLConf should not be accessed in executor
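A minimal sketch of the driver-side guard the PR describes, assuming the check lives behind `SQLConf.get` (the exception message is illustrative):

```
import org.apache.spark.TaskContext

// A TaskContext exists only while a task runs on an executor; on the driver
// it is null, so `TaskContext.get != null` means "we are on an executor".
def assertOnDriver(): Unit = {
  if (TaskContext.get != null) {
    throw new IllegalStateException("SQLConf should only be accessed on the driver")
  }
}
```

Unlike the executor-id comparison from #20136, this check also works with the local backend, because local mode runs tasks with a real `TaskContext` even though driver and executor share one `SparkEnv`.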
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184868808

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ---
@@ -279,4 +282,11 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(query, expected)
     }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+    failAfter(30 seconds) {
--- End diff --

We can add a UT for `ApproximatePercentile`, and check that after calling `add`, `isCompressed` is still false.
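A sketch of the unit test being suggested, assuming `PercentileDigest` exposes `add` and an `isCompressed` accessor as in the quoted code (the constructor form and accessor visibility are assumptions based on the diff, not verified against the PR):

```
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile.PercentileDigest

// Inserting values should no longer trigger an eager compress, so the
// digest is expected to stay uncompressed right after a series of adds.
val digest = new PercentileDigest(relativeError = 0.01)
(1 to 100).foreach(n => digest.add(n))
assert(!digest.isCompressed)
```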
[GitHub] spark issue #21167: [SPARK-24100][PYSPARK]Add the CompressionCodec to the sa...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/21167 The Scala API `saveAsTextFiles` doesn't have this param. If we want to add it to Python, shouldn't we also add it to the Scala API?
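For reference, a hedged sketch of what a Scala-side counterpart could look like, written as a standalone helper rather than the PR's actual change; the file naming mirrors how `saveAsTextFiles` builds `prefix-TIME.suffix` names:

```
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark.streaming.dstream.DStream

// Illustrative: save each RDD in the stream as text files compressed with
// the given codec, reusing RDD.saveAsTextFile's codec-taking overload.
def saveAsCompressedTextFiles[T](stream: DStream[T], prefix: String, suffix: String,
    codec: Class[_ <: CompressionCodec]): Unit = {
  stream.foreachRDD { (rdd, time) =>
    val path = s"$prefix-${time.milliseconds}" +
      (if (suffix == null || suffix.isEmpty) "" else s".$suffix")
    rdd.map(_.toString).saveAsTextFile(path, codec)
  }
}
```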
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21109 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89961/ Test PASSed.
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21109 Merged build finished. Test PASSed.
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21109

**[Test build #89961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89961/testReport)** for PR 21109 at commit [`e6e6628`](https://github.com/apache/spark/commit/e6e6628bf3d63e0486c2ba90c03712aa0eade013).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20937 Merged build finished. Test PASSed.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20937 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89962/ Test PASSed.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20937

**[Test build #89962 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89962/testReport)** for PR 20937 at commit [`d3d28aa`](https://github.com/apache/spark/commit/d3d28aa852dc90acc898df5b7a4e38135b0daf10).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user zecevicp commented on the issue: https://github.com/apache/spark/pull/21109 Hi Gaido, thanks for the comment. As I said, it was difficult to debug and I didn't have time. We might open a different ticket for the non-wholestage codegen case once this is merged?
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21109 @zecevicp wholestage codegen is now turned on by default only if we have few columns (fewer than 100). This can be false in many real use cases. Is there any specific reason why this optimization cannot be applied to the non-wholestage codegen case? If not, I think it is worth considering that case as well.
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184866957

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala ---
@@ -279,4 +282,11 @@ class ApproximatePercentileQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(query, expected)
     }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with sorted input") {
+    failAfter(30 seconds) {
--- End diff --

I agree that this is not the best UT, but I couldn't find any better way to test this. If anybody has an idea for a better test, I am happy to follow the suggestion...
[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21133#discussion_r184866940

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala ---
@@ -238,12 +238,6 @@ object ApproximatePercentile {
     summaries = summaries.insert(value)
     // The result of QuantileSummaries.insert is un-compressed
     isCompressed = false
--- End diff --

I think so, since we still compress in many places: in `merge`, `getPercentiles` and in `quantileSummaries`.
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user MaxGekk commented on the issue: https://github.com/apache/spark/pull/21173 Please have a look at `dropTable`, `truncateTable` and `createTable` in `JdbcUtils.scala`. It would be nice to set timeouts for the statements inside those methods too.
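A sketch of the requested change for one of those helpers, assuming the timeout is carried as a `queryTimeout` in seconds (illustrative signature; the real `JdbcUtils.dropTable` takes a `JDBCOptions`):

```
import java.sql.Connection

// Illustrative: apply the configured query timeout to the statement used
// for utility DDL, so a hung DROP TABLE cannot block the driver forever.
def dropTable(conn: Connection, table: String, queryTimeout: Int): Unit = {
  val statement = conn.createStatement
  try {
    statement.setQueryTimeout(queryTimeout)
    statement.executeUpdate(s"DROP TABLE $table")
  } finally {
    statement.close()
  }
}
```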
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21136 Merged build finished. Test PASSed.
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21136 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89960/ Test PASSed.
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21136

**[Test build #89960 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89960/testReport)** for PR 21136 at commit [`ad7a7f8`](https://github.com/apache/spark/commit/ad7a7f88c9d2fe312a7f51a497e8cc146065e29d).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class TestStreamingRelationV2(output: Seq[Attribute]) extends LeafNode`
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21136 Merged build finished. Test PASSed.
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21136 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89959/ Test PASSed.
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21136

**[Test build #89959 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89959/testReport)** for PR 21136 at commit [`e671c78`](https://github.com/apache/spark/commit/e671c78802f6a8ea54e0ac62fa4fa2c3f3f5b6c4).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user zecevicp commented on the issue: https://github.com/apache/spark/pull/21109

Hey Liang-Chi, thanks for looking into this. Yes, the problem can be circumvented by changing the join condition as you describe, but only in the benchmark case, because my "expensive function" was a bit misleading. The problem is not in the function itself, but in the number of rows that are checked for each pair of matching equi-join keys. I changed the benchmark test case to better demonstrate this: I completely removed the expensive function and I'm only doing a count on the matched rows. The results are the following.

Without the optimization:
```
AMD EPYC 7401 24-Core Processor
sort merge join:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------
sort merge join wholestage off       30956 / 31374          0.0       75575.5       1.0X
sort merge join wholestage on        10864 / 11043          0.0       26523.6       2.8X
```

With the optimization:
```
AMD EPYC 7401 24-Core Processor
sort merge join:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
----------------------------------------------------------------------------------------
sort merge join wholestage off       30734 / 31135          0.0       75035.2       1.0X
sort merge join wholestage on          959 / 1040           0.4        2341.3      32.0X
```

This shows a 10x improvement over the non-optimized case (as I already said, this depends on the range condition, the number of matched rows, the calculated function, etc.).

Regarding your second question as to why the "wholestage off" case in the optimized version is so slow: the optimization is turned off when whole-stage code generation is turned off. That is simply because it was too hard to debug, and since whole-stage generation is on by default, I'm guessing (and hoping) it would not be too hard a requirement to have to turn whole-stage codegen on if you want to use this optimization.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184863165

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable {
     jsonFactory.createParser(record.getBytes, 0, record.getLength)
   }

-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-    jsonFactory.createParser(record)
+  def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
+    val bais = new ByteArrayInputStream(in, 0, length)
+    val byteChannel = Channels.newChannel(bais)
+    val decodingBufferSize = Math.min(length, 8192)
+    val decoder = Charset.forName(enc).newDecoder()
+
+    StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
+  }
+
+  def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
+    val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
+    jsonFactory.createParser(sd)
+  }
+
+  def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(is)
+  }
+
+  def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(new InputStreamReader(is, enc))
--- End diff --

I added a comment above
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184863159

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * `java.text.SimpleDateFormat`. This applies to timestamp type.
    * `multiLine` (default `false`): parse one record, which may span multiple lines,
    * per file
+   * `encoding` (by default it is not set): allows to forcibly set one of standard basic
--- End diff --

I updated the Python comment to make it the same as this one.
[GitHub] spark issue #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support cus...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20937 **[Test build #89962 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89962/testReport)** for PR 20937 at commit [`d3d28aa`](https://github.com/apache/spark/commit/d3d28aa852dc90acc898df5b7a4e38135b0daf10).
[GitHub] spark issue #21109: [SPARK-24020][SQL] Sort-merge join inner range optimizat...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21109 **[Test build #89961 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89961/testReport)** for PR 21109 at commit [`e6e6628`](https://github.com/apache/spark/commit/e6e6628bf3d63e0486c2ba90c03712aa0eade013).
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184861410

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
@@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
     assert(sampled.count() == ds.count())
   }
+
+  def testFile(fileName: String): String = {
+    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
+  }
+
+  test("SPARK-23723: json in UTF-16 with BOM") {
+    val fileName = "json-tests/utf16WithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .option("encoding", "UTF-16")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+    val fileName = "json-tests/utf32BEWithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") {
+    val fileName = "json-tests/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .options(Map("encoding" -> "UTF-16LE"))
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: Unsupported encoding name") {
+    val invalidCharset = "UTF-128"
+    val exception = intercept[UnsupportedCharsetException] {
+      spark.read
+        .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
+        .json(testFile("json-tests/utf16LE.json"))
+        .count()
+    }
+
+    assert(exception.getMessage.contains(invalidCharset))
+  }
+
+  test("SPARK-23723: checking that the encoding option is case agnostic") {
+    val fileName = "json-tests/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .options(Map("encoding" -> "uTf-16lE"))
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
+  }
+
+  test("SPARK-23723: specified encoding is not matched to actual encoding") {
+    val fileName = "json-tests/utf16LE.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val exception = intercept[SparkException] {
+      spark.read.schema(schema)
+        .option("mode", "FAILFAST")
+        .option("multiline", "true")
+        .options(Map("encoding" -> "UTF-16BE"))
+        .json(testFile(fileName))
+        .count()
+    }
+    val errMsg = exception.getMessage
+
+    assert(errMsg.contains("Malformed records are detected in record parsing"))
+  }
+
+  def checkEncoding(
+      expectedEncoding: String,
+      pathToJsonFiles: String,
+      expectedContent: String): Unit = {
+    val jsonFiles = new File(pathToJsonFiles)
+      .listFiles()
+      .filter(_.isFile)
+      .filter(_.getName.endsWith("json"))
+    val actualContent = jsonFiles.map { file =>
+      new String(Files.readAllBytes(file.toPath), expectedEncoding)
+    }.mkString.trim.replaceAll(" ", "")
+
+    assert(actualContent == expectedContent)
+  }
+
+  test("SPARK-23723: save json in UTF-32BE") {
+    val encoding = "UTF-32BE"
+    withTempPath { path =>
+      val df = spark.createDataset(Seq(("Dog", 42)))
+      df.write
+        .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+        .format("json").mode("overwrite")
+        .save(path.getCanonicalPath)
+
+      checkEncoding(
+        expectedEncoding = encoding,
+        pathToJsonFiles = path.getCanonicalPath,
+        expectedContent = """{"_1":"Dog","_2":42}""")
+    }
+  }
+
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+    withTempPath { path =>
+      val df = spark.createDataset(Seq(("Dog", 42)))
+      df.write
+        .format("json").mode("overwrite")
+        .save(path.getCanonicalPath)
[GitHub] spark pull request #21180: [SPARK-22674][PYTHON] Disabled _hack_namedtuple f...
Github user superbobry commented on a diff in the pull request: https://github.com/apache/spark/pull/21180#discussion_r184860643

--- Diff: python/pyspark/serializers.py ---
@@ -523,7 +523,21 @@ def namedtuple(*args, **kwargs):
     for k, v in _old_namedtuple_kwdefaults.items():
         kwargs[k] = kwargs.get(k, v)
    cls = _old_namedtuple(*args, **kwargs)
-    return _hack_namedtuple(cls)
+
+    import sys
+    f = sys._getframe(1)
--- End diff --

Good point. [`collections.namedtuple`](https://github.com/python/cpython/blob/master/Lib/collections/__init__.py#L466) has a fix for Jython and IronPython. I can backport it for completeness, but realistically, the probability of someone running PySpark on these implementations is not very high.
[GitHub] spark issue #20140: [SPARK-19228][SQL] Introduce tryParseDate method to proc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20140 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89958/ Test PASSed.
[GitHub] spark issue #20140: [SPARK-19228][SQL] Introduce tryParseDate method to proc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20140 Merged build finished. Test PASSed.
[GitHub] spark issue #20140: [SPARK-19228][SQL] Introduce tryParseDate method to proc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20140

**[Test build #89958 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89958/testReport)** for PR 20140 at commit [`84b236a`](https://github.com/apache/spark/commit/84b236a742e7f5a62ee2e6ce6d230c3e6628294b).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21136 **[Test build #89960 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89960/testReport)** for PR 21136 at commit [`ad7a7f8`](https://github.com/apache/spark/commit/ad7a7f88c9d2fe312a7f51a497e8cc146065e29d).
[GitHub] spark pull request #21136: [SPARK-24061][SS]Add TypedFilter support for cont...
Github user yanlin-Lynn commented on a diff in the pull request: https://github.com/apache/spark/pull/21136#discussion_r184859268

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -840,4 +857,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     def this(attribute: Attribute) = this(Seq(attribute))
     override def isStreaming: Boolean = true
   }
+
+  case class StreamingRelationV2(output: Seq[Attribute]) extends LeafNode {
--- End diff --

ack
[GitHub] spark issue #21136: [SPARK-24061][SS]Add TypedFilter support for continuous ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21136 **[Test build #89959 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89959/testReport)** for PR 21136 at commit [`e671c78`](https://github.com/apache/spark/commit/e671c78802f6a8ea54e0ac62fa4fa2c3f3f5b6c4).
[GitHub] spark pull request #21136: [SPARK-24061][SS]Add TypedFilter support for cont...
Github user yanlin-Lynn commented on a diff in the pull request: https://github.com/apache/spark/pull/21136#discussion_r184858343

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -771,6 +778,16 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     }
   }

+  /** Assert that the logical plan is supportd for continuous procsssing mode */
--- End diff --

Ah, my bad! Thanks for pointing it out.
[GitHub] spark pull request #21136: [SPARK-24061][SS]Add TypedFilter support for cont...
Github user yanlin-Lynn commented on a diff in the pull request: https://github.com/apache/spark/pull/21136#discussion_r184858144

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -840,4 +857,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     def this(attribute: Attribute) = this(Seq(attribute))
     override def isStreaming: Boolean = true
   }
+
+  case class StreamingRelationV2(output: Seq[Attribute]) extends LeafNode {
--- End diff --

I have tried to do that, but the code in [UnsupportedOperationChecker](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala#L349) will fail the UT. So, should I change the checking logic there too?
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21173 Merged build finished. Test PASSed.
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21173 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89956/ Test PASSed.
[GitHub] spark issue #21189: [SPARK-24117][SQL] Unified the getSizePerRow
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21189 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89955/ Test PASSed.
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21173

**[Test build #89956 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89956/testReport)** for PR 21173 at commit [`3d3f84e`](https://github.com/apache/spark/commit/3d3f84e64dc7cd3c84d1fe0d93d39ca277fcb681).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21189: [SPARK-24117][SQL] Unified the getSizePerRow
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21189 Merged build finished. Test PASSed.
[GitHub] spark issue #21189: [SPARK-24117][SQL] Unified the getSizePerRow
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21189

**[Test build #89955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89955/testReport)** for PR 21189 at commit [`cd41538`](https://github.com/apache/spark/commit/cd415381386f0ac5c29cd6dab57ceafc86e96adf).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21177 Merged build finished. Test PASSed.
[GitHub] spark issue #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21177 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89957/ Test PASSed.
[GitHub] spark issue #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21177

**[Test build #89957 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89957/testReport)** for PR 21177 at commit [`99ecd12`](https://github.com/apache/spark/commit/99ecd123a8c5971f80fecb39f44d039be513a27b).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20140: [SPARK-19228][SQL] Introduce tryParseDate method to proc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20140 **[Test build #89958 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89958/testReport)** for PR 20140 at commit [`84b236a`](https://github.com/apache/spark/commit/84b236a742e7f5a62ee2e6ce6d230c3e6628294b).
[GitHub] spark issue #20140: [SPARK-19228][SQL] Introduce tryParseDate method to proc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20140 ok to test
[GitHub] spark pull request #21167: [SPARK-24100][PYSPARK]Add the CompressionCodec to...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21167#discussion_r184854225

--- Diff: python/pyspark/streaming/dstream.py ---
@@ -249,15 +249,15 @@ def countByValue(self):
         """
         return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)

-    def saveAsTextFiles(self, prefix, suffix=None):
+    def saveAsTextFiles(self, prefix, suffix=None, compressionCodecClass=None):
         """
         Save each RDD in this DStream as at text file, using string representation of elements.
--- End diff --

Shall we add a param doc?
[GitHub] spark pull request #21180: [SPARK-22674][PYTHON] Disabled _hack_namedtuple f...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21180#discussion_r184854019
--- Diff: python/pyspark/serializers.py ---
@@ -523,7 +523,21 @@ def namedtuple(*args, **kwargs):
         for k, v in _old_namedtuple_kwdefaults.items():
             kwargs[k] = kwargs.get(k, v)
         cls = _old_namedtuple(*args, **kwargs)
-        return _hack_namedtuple(cls)
+
+        import sys
+        f = sys._getframe(1)
--- End diff --
Hm .. https://docs.python.org/2/library/sys.html#sys._getframe
> CPython implementation detail: This function should be used for internal and specialized purposes only. It is not guaranteed to exist in all implementations of Python.
Is it safe to use it?
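The concern is real: `sys._getframe` may simply not exist on other interpreters. A defensive lookup is one way to hedge; a minimal sketch (helper name hypothetical):

    import sys

    def _calling_module(depth=1):
        # sys._getframe is a CPython implementation detail; other Python
        # implementations are allowed to omit it, so look it up defensively.
        getframe = getattr(sys, "_getframe", None)
        if getframe is None:
            return None  # caller can fall back to the old unconditional behaviour
        return getframe(depth).f_globals.get("__name__")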
[GitHub] spark issue #21169: [SPARK-23715][SQL] the input of to/from_utc_timestamp ca...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21169 I think we should fix the doc here too: https://github.com/apache/spark/blob/cd10f9df8284ee8a5d287b2cd204c70b8ba87f5e/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2871-L2872
[GitHub] spark pull request #21169: [SPARK-23715][SQL] the input of to/from_utc_times...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21169#discussion_r184852914
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala ---
@@ -296,10 +296,28 @@ object DateTimeUtils {
   * `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m`
   */
  def stringToTimestamp(s: UTF8String): Option[SQLTimestamp] = {
-    stringToTimestamp(s, defaultTimeZone())
+    stringToTimestamp(s, defaultTimeZone(), rejectTzInString = false)
  }

  def stringToTimestamp(s: UTF8String, timeZone: TimeZone): Option[SQLTimestamp] = {
+    stringToTimestamp(s, timeZone, rejectTzInString = false)
+  }
+
+  /**
+   * Converts a timestamp string to microseconds from the unix epoch, w.r.t. the given timezone.
--- End diff --
BTW, I usually avoid abbreviation in doc tho (w.r.t.).
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851362 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851747 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851331
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
+  def checkEncoding(
+      expectedEncoding: String,
+      pathToJsonFiles: String,
+      expectedContent: String): Unit = {
+    val jsonFiles = new File(pathToJsonFiles)
+      .listFiles()
+      .filter(_.isFile)
+      .filter(_.getName.endsWith("json"))
+    val actualContent = jsonFiles.map { file =>
+      new String(Files.readAllBytes(file.toPath), expectedEncoding)
+    }.mkString.trim.replaceAll(" ", "")
--- End diff --
Hm, why should we replace spaces?
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851348
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
+  test("SPARK-23723: save json in UTF-32BE") {
+    val encoding = "UTF-32BE"
+    withTempPath { path =>
+      val df = spark.createDataset(Seq(("Dog", 42)))
+      df.write
+        .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
+        .format("json").mode("overwrite")
+        .save(path.getCanonicalPath)
--- End diff --
I think `.mode("overwrite")` is not needed.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851361
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
+  test("SPARK-23723: save json in default encoding - UTF-8") {
+    withTempPath { path =>
+      val df = spark.createDataset(Seq(("Dog", 42)))
+      df.write
+        .format("json").mode("overwrite")
--- End diff --
ditto
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851438 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850594
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable {
     jsonFactory.createParser(record.getBytes, 0, record.getLength)
   }

-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-    jsonFactory.createParser(record)
+  def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
--- End diff --
nit: private?
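Aside on the helper under discussion: Hadoop's `Text` reuses its backing byte array, so only the first `getLength` bytes are valid and the decoder must be bounded to that slice. The same idea in a tiny Python sketch (function name hypothetical):

    def decode_slice(buf: bytes, length: int, enc: str) -> str:
        # The buffer may be longer than the record it currently holds;
        # decode only the first `length` bytes.
        return buf[:length].decode(enc)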
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851413 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850820
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala ---
@@ -86,14 +86,43 @@ private[sql] class JSONOptions(
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

+  /**
+   * A string between two consecutive JSON records.
+   */
   val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
     require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
     sep
   }
-  // Note that the option 'lineSep' uses a different default value in read and write.
-  val lineSeparatorInRead: Option[Array[Byte]] =
-    lineSeparator.map(_.getBytes(StandardCharsets.UTF_8))
-  // Note that JSON uses writer with UTF-8 charset. This string will be written out as UTF-8.
+
+  /**
+   * Standard encoding (charset) name. For example UTF-8, UTF-16LE and UTF-32BE.
+   * If the encoding is not specified (None), it will be detected automatically
+   * when the multiLine option is set to `true`.
+   */
+  val encoding: Option[String] = parameters.get("encoding")
+    .orElse(parameters.get("charset")).map { enc =>
+      // The following encodings are not supported in per-line mode (multiline is false)
+      // because they cause some problems in reading files with BOM which is supposed to
+      // present in the files with such encodings. After splitting input files by lines,
+      // only the first lines will have the BOM which leads to impossibility for reading
+      // the rest lines. Besides of that, the lineSep option must have the BOM in such
+      // encodings which can never present between lines.
+      val blacklist = Seq(Charset.forName("UTF-16"), Charset.forName("UTF-32"))
+      val isBlacklisted = blacklist.contains(Charset.forName(enc))
+      require(multiLine || !isBlacklisted,
+        s"""The ${enc} encoding must not be included in the blacklist when multiLine is disabled:
+           | ${blacklist.mkString(", ")}""".stripMargin)
+
+      val forcingLineSep = !(multiLine == false &&
--- End diff --
`forcingLineSep` -> things like ... `isLineSepRequired`?
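The BOM reasoning quoted above is easy to verify outside Spark; a small standalone sketch:

    import codecs

    # A BOM-carrying charset writes the mark once, at the start of the file,
    # so after splitting the input by lines only the first record keeps it.
    data = '{"id":1}\n{"id":2}'.encode("UTF-16")
    sep = "\n".encode("UTF-16")[len(codecs.BOM_UTF16):]  # newline bytes, BOM stripped
    first, second = data.split(sep)
    print(first.startswith(codecs.BOM_UTF16))   # True
    print(second.startswith(codecs.BOM_UTF16))  # False

This is exactly why per-line mode only accepts endianness-explicit variants such as UTF-16LE or UTF-32BE.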
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851795 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850865
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---
@@ -372,6 +372,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * `java.text.SimpleDateFormat`. This applies to timestamp type.
   * `multiLine` (default `false`): parse one record, which may span multiple lines,
   * per file
+  * `encoding` (by default it is not set): allows to forcibly set one of standard basic
--- End diff --
Not a big deal but shall we match the description to Python side?
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184850683
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala ---
@@ -43,7 +47,38 @@ private[sql] object CreateJacksonParser extends Serializable {
     jsonFactory.createParser(record.getBytes, 0, record.getLength)
   }

-  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
-    jsonFactory.createParser(record)
+  def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
+    val bais = new ByteArrayInputStream(in, 0, length)
+    val byteChannel = Channels.newChannel(bais)
+    val decodingBufferSize = Math.min(length, 8192)
+    val decoder = Charset.forName(enc).newDecoder()
+
+    StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
+  }
+
+  def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
+    val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
+    jsonFactory.createParser(sd)
+  }
+
+  def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(is)
+  }
+
+  def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
+    jsonFactory.createParser(new InputStreamReader(is, enc))
--- End diff --
I think https://github.com/apache/spark/pull/20937#issuecomment-381406357 is a good investigation. It should be good to leave a small note that we should avoid this way if possible.
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851656 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851652 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851642 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- [quoted hunk and review comment truncated in the archive]
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851188
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala ---
+  def schemaInferring(rowsNum: Int): Unit = {
+    val benchmark = new Benchmark("JSON schema inferring", rowsNum)
+
+    withTempPath { path =>
+      // scalastyle:off
+      benchmark.out.println("Preparing data for benchmarking ...")
+      // scalastyle:on
+
+      spark.sparkContext.range(0, rowsNum, 1)
+        .map(_ => "a")
+        .toDF("fieldA")
+        .write
+        .option("encoding", "UTF-8")
+        .json(path.getAbsolutePath)
+
+      benchmark.addCase("No encoding", 3) { _ =>
+        spark.read.json(path.getAbsolutePath)
+      }
+
+      benchmark.addCase("UTF-8 is set", 3) { _ =>
+        spark.read
+          .option("encoding", "UTF-8")
+          .json(path.getAbsolutePath)
+      }
+
+      /*
+      Intel(R) Core(TM) i7-7920HQ CPU @ 3.10GHz
+
+      JSON schema inferring:    Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
+      No encoding                   38902 / 39282          2.6          389.0        1.0X
+      UTF-8 is set                  56959 / 57261          1.8          569.6        0.7X
+      */
+      benchmark.run()
+    }
+  }
+
+  def perlineParsing(rowsNum: Int): Unit = {
+    val benchmark = new Benchmark("JSON per-line parsing", rowsNum)
+
+    withTempPath { path =>
+      // scalastyle:off
--- End diff --
ditto
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20937#discussion_r184851231
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
+  test("SPARK-23723: json in UTF-16 with BOM") {
+    val fileName = "json-tests/utf16WithBOM.json"
+    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
+    val jsonDF = spark.read.schema(schema)
+      .option("multiline", "true")
+      .option("encoding", "UTF-16")
+      .json(testFile(fileName))
+
+    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
+  }
+
+  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
+    val fileName = "json-tests/utf32BEWithBOM.json"
--- End diff --
Shall we put the files in `test-data`?
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20937#discussion_r184851099

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.datasources.json
    +
    +import java.io.File
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.types.{LongType, StringType, StructType}
    +import org.apache.spark.util.{Benchmark, Utils}
    +
    +/**
    + * Benchmark to measure JSON read/write performance.

    --- End diff --

    I think we should mention that the purpose of this is to check performance when the encoding is set and when it is not.

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
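One possible wording for the class comment, per the ask above (the phrasing is illustrative, not the committed text):

```scala
/**
 * Benchmark to measure JSON read/write performance.
 * The goal is to compare the cost of parsing and schema inference when the
 * `encoding` option is set explicitly against the default auto-detection path.
 * To run this:
 *  spark-submit --class <this class> --jars <spark sql test jar>
 */
object JSONBenchmarks {
  // ...
}
```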
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20937#discussion_r184851211

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
         assert(sampled.count() == ds.count())
       }
    +
    +  def testFile(fileName: String): String = {
    +    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
    +  }

    --- End diff --

    Let's put this up like `CSVSuite`

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
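For reference, `CSVSuite` keeps its resource-resolution helper with the other private utilities at the top of the suite; a sketch of the same placement here (layout only — the method body is unchanged from the diff above):

```scala
class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
  import testImplicits._

  // Resolves a file under src/test/resources, mirroring the helper in CSVSuite.
  private def testFile(fileName: String): String = {
    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
  }

  // ... existing tests follow ...
}
```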
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20937#discussion_r184851606

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala ---
    @@ -2167,4 +2167,259 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         val sampled = spark.read.option("samplingRatio", 1.0).json(ds)
         assert(sampled.count() == ds.count())
       }
    +
    +  def testFile(fileName: String): String = {
    +    Thread.currentThread().getContextClassLoader.getResource(fileName).toString
    +  }
    +
    +  test("SPARK-23723: json in UTF-16 with BOM") {
    +    val fileName = "json-tests/utf16WithBOM.json"
    +    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .option("encoding", "UTF-16")
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird"), Row("Doug", "Rood")))
    +  }
    +
    +  test("SPARK-23723: multi-line json in UTF-32BE with BOM") {
    +    val fileName = "json-tests/utf32BEWithBOM.json"
    +    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
    +  }
    +
    +  test("SPARK-23723: Use user's encoding in reading of multi-line json in UTF-16LE") {
    +    val fileName = "json-tests/utf16LE.json"
    +    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .options(Map("encoding" -> "UTF-16LE"))
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
    +  }
    +
    +  test("SPARK-23723: Unsupported encoding name") {
    +    val invalidCharset = "UTF-128"
    +    val exception = intercept[UnsupportedCharsetException] {
    +      spark.read
    +        .options(Map("encoding" -> invalidCharset, "lineSep" -> "\n"))
    +        .json(testFile("json-tests/utf16LE.json"))
    +        .count()
    +    }
    +
    +    assert(exception.getMessage.contains(invalidCharset))
    +  }
    +
    +  test("SPARK-23723: checking that the encoding option is case agnostic") {
    +    val fileName = "json-tests/utf16LE.json"
    +    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
    +    val jsonDF = spark.read.schema(schema)
    +      .option("multiline", "true")
    +      .options(Map("encoding" -> "uTf-16lE"))
    +      .json(testFile(fileName))
    +
    +    checkAnswer(jsonDF, Seq(Row("Chris", "Baird")))
    +  }
    +
    +
    +  test("SPARK-23723: specified encoding is not matched to actual encoding") {
    +    val fileName = "json-tests/utf16LE.json"
    +    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
    +    val exception = intercept[SparkException] {
    +      spark.read.schema(schema)
    +        .option("mode", "FAILFAST")
    +        .option("multiline", "true")
    +        .options(Map("encoding" -> "UTF-16BE"))
    +        .json(testFile(fileName))
    +        .count()
    +    }
    +    val errMsg = exception.getMessage
    +
    +    assert(errMsg.contains("Malformed records are detected in record parsing"))
    +  }
    +
    +  def checkEncoding(
    +      expectedEncoding: String,
    +      pathToJsonFiles: String,
    +      expectedContent: String): Unit = {
    +    val jsonFiles = new File(pathToJsonFiles)
    +      .listFiles()
    +      .filter(_.isFile)
    +      .filter(_.getName.endsWith("json"))
    +    val actualContent = jsonFiles.map { file =>
    +      new String(Files.readAllBytes(file.toPath), expectedEncoding)
    +    }.mkString.trim.replaceAll(" ", "")
    +
    +    assert(actualContent == expectedContent)
    +  }
    +
    +  test("SPARK-23723: save json in UTF-32BE") {
    +    val encoding = "UTF-32BE"
    +    withTempPath { path =>
    +      val df = spark.createDataset(Seq(("Dog", 42)))
    +      df.write
    +        .options(Map("encoding" -> encoding, "lineSep" -> "\n"))
    +        .format("json").mode("overwrite")
    +        .save(path.getCanonicalPath)
    +
    +      checkEncoding(
    +        expectedEncoding = encoding,
    +        pathToJsonFiles = path.getCanonicalPath,
    +        expectedContent = """{"_1":"Dog","_2":42}""")
    +    }
    +  }
    +
    +  test("SPARK-23723: save json in default encoding - UTF-8") {
    +    withTempPath { path =>
    +      val df = spark.createDataset(Seq(("Dog", 42)))
    +      df.write
    +        .format("json").mode("overwrite")
    +        .save(path.getCanonicalPath)
[GitHub] spark pull request #20937: [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Supp...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20937#discussion_r184851052

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.datasources.json
    +
    +import java.io.File
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.sql.SparkSession
    +import org.apache.spark.sql.types.{LongType, StringType, StructType}
    +import org.apache.spark.util.{Benchmark, Utils}
    +
    +/**
    + * Benchmark to measure JSON read/write performance.
    + * To run this:
    + *  spark-submit --class <this class> --jars <spark sql test jar>
    + */
    +object JSONBenchmarks {
    +  val conf = new SparkConf()
    +
    +  val spark = SparkSession.builder
    +    .master("local[1]")
    +    .appName("benchmark-json-datasource")
    +    .config(conf)
    +    .getOrCreate()
    +  import spark.implicits._
    +
    +  def withTempPath(f: File => Unit): Unit = {
    +    val path = Utils.createTempDir()
    +    path.delete()
    +    try f(path) finally Utils.deleteRecursively(path)
    +  }
    +
    +
    +  def schemaInferring(rowsNum: Int): Unit = {
    +    val benchmark = new Benchmark("JSON schema inferring", rowsNum)
    +
    +    withTempPath { path =>
    +      // scalastyle:off

    --- End diff --

    ```
    // scalastyle:off println
    ...
    // scalastyle:on println
    ```

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
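Applied to the benchmark, the narrowed directive suppresses only the println check instead of every scalastyle rule for that region (a sketch of the suggested fix, not the committed diff):

```scala
withTempPath { path =>
  // scalastyle:off println
  benchmark.out.println("Preparing data for benchmarking ...")
  // scalastyle:on println

  // ... write the test data and register the benchmark cases ...
}
```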
[GitHub] spark issue #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21177 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries i...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21177 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2740/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21173 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21173: [SPARK-23856][SQL] Add an option `queryTimeout` in JDBCO...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21173 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2739/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) queries i...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21177 **[Test build #89957 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89957/testReport)** for PR 21177 at commit [`99ecd12`](https://github.com/apache/spark/commit/99ecd123a8c5971f80fecb39f44d039be513a27b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21177: [SPARK-24111][SQL] Add the TPCDS v2.7 (latest) qu...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21177#discussion_r184851009

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala ---
    @@ -78,7 +81,7 @@ object TPCDSQueryBenchmark extends Logging {
             }
             val numRows = queryRelations.map(tableSizes.getOrElse(_, 0L)).sum
             val benchmark = new Benchmark(s"TPCDS Snappy", numRows, 5)
    -        benchmark.addCase(name) { i =>
    +        benchmark.addCase(s"$name$nameSuffix") { _ =>

    --- End diff --

    Yes and no; I feel both are ok.

---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
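For context, the two naming options under discussion differ only in whether the query-set version is baked into the benchmark case name; a sketch of both, with illustrative values (`name = "q5"`, `nameSuffix = "-v2.7"`, and `spark.sql(queryString).collect()` standing in for however the query is actually executed):

```scala
// Option A: suffix the case name, so v1.4 and v2.7 runs of the same query
// stay distinguishable in a single benchmark report, e.g. "q5-v2.7".
benchmark.addCase(s"$name$nameSuffix") { _ =>
  spark.sql(queryString).collect()
}

// Option B: keep the bare name and separate the two query sets by running
// them as different benchmarks instead.
benchmark.addCase(name) { _ =>
  spark.sql(queryString).collect()
}
```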