[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Yep, should be doable without too much effort. On Sun, Jun 4, 2017 at 9:54 PM, Xiao Li wrote: > @NathanHowell <https://github.com/nathanhowell> It sounds like we also > can provide multi-line support for JSON too. For example, in a single JSON > file > > {"a": 1, > "b": 1.1} > {"a": 2, "b": 1.1} > {"a": 3, "b": 1.1} > > When using the wholeFile mode, we only parse the first JSON record {"a": > 1, "b": 1.1} but ignore the following records. It sounds like we should > also parse them too and rename wholeFile to multiLine?
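For illustration, a minimal sketch of the behavior being discussed, assuming a file laid out like the example in the quoted message (the first record spans two lines); the path is illustrative only:

```scala
// wholeFile (proposed rename: multiLine) hands each input file to the JSON
// parser as a single stream instead of treating every line as one record.
val df = spark.read
  .option("wholeFile", true)
  .json("/path/to/records.json")

df.show()
// With the current behavior described above, only the first record
// ({"a": 1, "b": 1.1}) is parsed; the proposal is to also parse the
// records that follow it in the same file.
```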
[GitHub] spark issue #12217: [WIP][SPARK-14408][CORE] Changed RDD.treeAggregate to us...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/12217 Nothing looks obviously broken, their combiner looks fine. Rerunning the tests would help. On Jun 2, 2017 07:02, "Hyukjin Kwon" wrote: > Hi @jkbradley <https://github.com/jkbradley> and @srowen > <https://github.com/srowen>, could we retest this just to see the error > messages? It looks the last test results are not accessible (to me).
[GitHub] spark pull request #17255: [SPARK-19918][SQL] Use TextFileFormat in implemen...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/17255#discussion_r105942833 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala --- @@ -40,18 +40,11 @@ private[sql] object JsonInferSchema { json: RDD[T], configOptions: JSONOptions, createParser: (JsonFactory, T) => JsonParser): StructType = { -require(configOptions.samplingRatio > 0, - s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0") val shouldHandleCorruptRecord = configOptions.permissive val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord -val schemaData = if (configOptions.samplingRatio > 0.99) { - json -} else { - json.sample(withReplacement = false, configOptions.samplingRatio, 1) -} --- End diff -- Yah, perhaps the RDD->Dataset changes should be done under a separate issue. I think it can be done across the board (removing most/all RDD references) but I'm not sure what other implications it would have.
[GitHub] spark issue #17255: [SPARK-19918][SQL] Use TextFileFormat in implementation ...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/17255 Would there be any additional benefit of replacing more (or all?) of the uses of `RDD` with the equivalent `Dataset` operations?
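A hedged sketch of the direction being asked about: reading the input through the text source as a `Dataset[String]` instead of hand-building an RDD, with the sampling step from `JsonInferSchema` expressed on the Dataset. The method name, paths, and sampling wiring here are illustrative, not this PR's actual code:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Read the raw JSON lines via the text source (TextFileFormat) as a Dataset
// and optionally sample them before schema inference.
def createBaseDataset(
    spark: SparkSession,
    paths: Seq[String],
    samplingRatio: Double): Dataset[String] = {
  val lines = spark.read.textFile(paths: _*)
  if (samplingRatio < 1.0) {
    lines.sample(withReplacement = false, samplingRatio, 1)
  } else {
    lines
  }
}
```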
[GitHub] spark pull request #17255: [SPARK-19918][SQL] Use TextFileFormat in implemen...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/17255#discussion_r105563532 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -23,24 +23,25 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser} import com.google.common.io.ByteStreams import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus -import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} -import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.{AnalysisException, Encoders, SparkSession} --- End diff -- Is the `Encoders` import still necessary?
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102668816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] --- End diff -- Sounds good to me. Reducing code duplication between the JSON and CSV parsers would be great. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102667812 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] + + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + protected def makeSafeHeader( + row: Array[String], + caseSensitive: Boolean, + options: CSVOptions): Array[String] = { +if (options.headerFlag) { + val duplicates = { +val headerNames = row.filter(_ != null) + .map(name => if (caseSensitive) name else name.toLowerCase) +headerNames.diff(headerNames.distinct).distinct + } + + row.zipWithIndex.map { case (value, index) => +if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. + s"_c$index" +} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { + // When there are case-insensitive duplicates, put the index as the suffix. + s"$value$index" +} else if (duplicates.contains(value)) { + // When there are duplicates, put the index as the suffix. 
+ s"$value$index" +} else { + value +} + } +} else { + row.zipWithIndex.map { case (_, index) => +// Uses default column names, "_c#" where # is its position of fields +// when header option is disabled. +s"_c$index" + } +} + } +} + +object CSVDataSource { + def apply(options: CSVOptions): CSVDataSource = { +if (options.wholeFile) { + WholeFileCSVDataSource +} else { + TextInputCSVDataSource +} + } +} + +object TextInputCSVDataSource extends CSVDataSource { + override val isSplitable: Boolean = true + + override def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + pars
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102665872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] + + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + protected def makeSafeHeader( + row: Array[String], + caseSensitive: Boolean, + options: CSVOptions): Array[String] = { +if (options.headerFlag) { + val duplicates = { +val headerNames = row.filter(_ != null) + .map(name => if (caseSensitive) name else name.toLowerCase) +headerNames.diff(headerNames.distinct).distinct + } + + row.zipWithIndex.map { case (value, index) => +if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. + s"_c$index" +} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { + // When there are case-insensitive duplicates, put the index as the suffix. + s"$value$index" +} else if (duplicates.contains(value)) { + // When there are duplicates, put the index as the suffix. 
+ s"$value$index" +} else { + value +} + } +} else { + row.zipWithIndex.map { case (_, index) => +// Uses default column names, "_c#" where # is its position of fields +// when header option is disabled. +s"_c$index" + } +} + } +} + +object CSVDataSource { + def apply(options: CSVOptions): CSVDataSource = { +if (options.wholeFile) { + WholeFileCSVDataSource +} else { + TextInputCSVDataSource +} + } +} + +object TextInputCSVDataSource extends CSVDataSource { + override val isSplitable: Boolean = true + + override def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + pars
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102665619 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] --- End diff -- Both subclasses return `Some(...)` so it would be more clear to change this to `def infer(...): StructType` and do the option wrapping only in `CSVFileFormat.inferSchema` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
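A hedged sketch of the suggested shape only (signatures, not this PR's code): `infer` returns a bare `StructType` and the `Option` wrapping happens once, at the `CSVFileFormat.inferSchema` call site.

```scala
// Signature sketch: both concrete CSV sources always produce a schema,
// so return it directly instead of Option[StructType].
def infer(
    sparkSession: SparkSession,
    inputPaths: Seq[FileStatus],
    parsedOptions: CSVOptions): StructType

// CSVFileFormat.inferSchema then wraps exactly once:
//   Some(CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions))
```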
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102663016 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala --- @@ -43,23 +37,26 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { override def shortName(): String = "csv" - override def toString: String = "CSV" - - override def hashCode(): Int = getClass.hashCode() - - override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat] + override def isSplitable( + sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = { +val parsedOptions = + new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone) +val csvDataSource = CSVDataSource(parsedOptions) +csvDataSource.isSplitable && super.isSplitable(sparkSession, options, path) + } override def inferSchema( sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { require(files.nonEmpty, "Cannot infer schema from an empty set of files") --- End diff -- Should this return `None` instead of throwing an exception? The JSON parser does, though I'm not sure which approach is recommended. They should be consistent though.
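A hedged sketch of the alternative raised here: return `None` for an empty input instead of failing the `require`, mirroring the JSON source. The `CSVOptions`/`CSVDataSource` wiring follows the diff above and is illustrative, not this PR's final code:

```scala
override def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  if (files.isEmpty) {
    // Mirror the JSON data source: no input files means no inferred schema.
    None
  } else {
    val parsedOptions =
      new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
    CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions)
  }
}
```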
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102662637 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] + + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + protected def makeSafeHeader( + row: Array[String], + caseSensitive: Boolean, + options: CSVOptions): Array[String] = { +if (options.headerFlag) { + val duplicates = { +val headerNames = row.filter(_ != null) + .map(name => if (caseSensitive) name else name.toLowerCase) +headerNames.diff(headerNames.distinct).distinct + } + + row.zipWithIndex.map { case (value, index) => +if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. + s"_c$index" +} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { + // When there are case-insensitive duplicates, put the index as the suffix. + s"$value$index" +} else if (duplicates.contains(value)) { + // When there are duplicates, put the index as the suffix. 
+ s"$value$index" +} else { + value +} + } +} else { + row.zipWithIndex.map { case (_, index) => +// Uses default column names, "_c#" where # is its position of fields +// when header option is disabled. +s"_c$index" + } +} + } +} + +object CSVDataSource { + def apply(options: CSVOptions): CSVDataSource = { +if (options.wholeFile) { + WholeFileCSVDataSource +} else { + TextInputCSVDataSource +} + } +} + +object TextInputCSVDataSource extends CSVDataSource { + override val isSplitable: Boolean = true + + override def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + pars
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102662258 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] + + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + protected def makeSafeHeader( + row: Array[String], + caseSensitive: Boolean, + options: CSVOptions): Array[String] = { +if (options.headerFlag) { + val duplicates = { +val headerNames = row.filter(_ != null) + .map(name => if (caseSensitive) name else name.toLowerCase) +headerNames.diff(headerNames.distinct).distinct + } + + row.zipWithIndex.map { case (value, index) => +if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. + s"_c$index" +} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { + // When there are case-insensitive duplicates, put the index as the suffix. + s"$value$index" +} else if (duplicates.contains(value)) { + // When there are duplicates, put the index as the suffix. 
+ s"$value$index" +} else { + value +} + } +} else { + row.zipWithIndex.map { case (_, index) => +// Uses default column names, "_c#" where # is its position of fields +// when header option is disabled. +s"_c$index" + } +} + } +} + +object CSVDataSource { + def apply(options: CSVOptions): CSVDataSource = { +if (options.wholeFile) { + WholeFileCSVDataSource +} else { + TextInputCSVDataSource +} + } +} + +object TextInputCSVDataSource extends CSVDataSource { + override val isSplitable: Boolean = true + + override def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + pars
[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16976#discussion_r102662330 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import java.io.InputStream +import java.nio.charset.{Charset, StandardCharsets} + +import com.univocity.parsers.csv.{CsvParser, CsvParserSettings} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{Dataset, Encoders, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.text.TextFileFormat +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing CSV files + */ +abstract class CSVDataSource extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into [[InternalRow]] instances. + */ + def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + parsedOptions: CSVOptions): Iterator[InternalRow] + + /** + * Infers the schema from `inputPaths` files. + */ + def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: CSVOptions): Option[StructType] + + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + protected def makeSafeHeader( + row: Array[String], + caseSensitive: Boolean, + options: CSVOptions): Array[String] = { +if (options.headerFlag) { + val duplicates = { +val headerNames = row.filter(_ != null) + .map(name => if (caseSensitive) name else name.toLowerCase) +headerNames.diff(headerNames.distinct).distinct + } + + row.zipWithIndex.map { case (value, index) => +if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. + s"_c$index" +} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { + // When there are case-insensitive duplicates, put the index as the suffix. + s"$value$index" +} else if (duplicates.contains(value)) { + // When there are duplicates, put the index as the suffix. 
+ s"$value$index" +} else { + value +} + } +} else { + row.zipWithIndex.map { case (_, index) => +// Uses default column names, "_c#" where # is its position of fields +// when header option is disabled. +s"_c$index" + } +} + } +} + +object CSVDataSource { + def apply(options: CSVOptions): CSVDataSource = { +if (options.wholeFile) { + WholeFileCSVDataSource +} else { + TextInputCSVDataSource +} + } +} + +object TextInputCSVDataSource extends CSVDataSource { + override val isSplitable: Boolean = true + + override def readFile( + conf: Configuration, + file: PartitionedFile, + parser: UnivocityParser, + pars
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r101671453 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,117 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + assert(new File(path).listFiles().exists(_.getName.endsWith(".gz"))) + + val jsonDF = spark.read.option("wholeFile", true).json(path) --- End diff -- I rewrote these tests. Please take a look @gatorsmile and @cloud-fan.
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 @cloud-fan When implementing tests for the other modes I've uncovered an existing bug in schema inference in `DROPMALFORMED` mode: https://issues.apache.org/jira/browse/SPARK-19641. Since it is not introduced in this set of patches I will open a new pull request once this one is merged. You can inspect the fix here: https://github.com/NathanHowell/spark/commit/e233fd03346a73b3b447fa4c24f3b12c8b2e53ae
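For context, a minimal sketch of how the parse mode involved in that inference bug is selected when reading JSON; the path is illustrative only:

```scala
// DROPMALFORMED silently drops records the parser cannot handle, which is the
// code path where the schema-inference bug referenced above (SPARK-19641) shows up.
val df = spark.read
  .option("mode", "DROPMALFORMED")
  .json("/path/to/records.json")
```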
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 @cloud-fan I just pushed a few more changes to address some of your comments. I'll be back later next week to continue work.
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653879 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle corrupt documents") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords +.toDF("value") +.repartition(corruptRecordCount * 4) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) --- End diff -- Probably, but I'm out of time for today. I'll be out for a few days and can pick this back up on Thursday or Friday next week. 
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653835 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle corrupt documents") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords +.toDF("value") +.repartition(corruptRecordCount * 4) --- End diff -- Probably, but I'm out of time for today. I'll be out for a few days and can pick this back up on Thursday or Friday next week. 
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653757 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. 
+ +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle corrupt documents") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords +.toDF("value") +.repartition(corruptRecordCount * 4) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + assert(jsonDF.count() === corruptRecordCount) + assert(jsonDF.schema === new StructType() +.add("_corrupt_record", StringType) +.add("dummy", StringType)) + val counts = jsonDF
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100653580 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempPath { dir => + val path = dir.getCanonicalPath + spark +.createDataFrame(Seq(Tuple1("{}{invalid}"))) +.coalesce(1) +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + // no corrupt record column should be created + assert(jsonDF.schema === StructType(Seq())) + // only the first object should be read + assert(jsonDF.count() === 1) +} + } + + test("SPARK-18352: Handle corrupt documents") { +withTempPath { dir => + val path = dir.getCanonicalPath + val corruptRecordCount = additionalCorruptRecords.count().toInt + assert(corruptRecordCount === 5) + + additionalCorruptRecords +.toDF("value") +.repartition(corruptRecordCount * 4) --- End diff -- Yep, I'll fix. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100652445 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.json + +import java.io.InputStream + +import scala.reflect.ClassTag + +import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import com.google.common.io.ByteStreams +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.execution.datasources.{CodecStreams, HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +/** + * Common functions for parsing JSON files + * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or [[String]] + */ +abstract class JsonDataSource[T] extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances + */ + def readFile( +conf: Configuration, +file: PartitionedFile, +parser: JacksonParser): Iterator[InternalRow] + + /** + * Create an [[RDD]] that handles the preliminary parsing of [[T]] records + */ + protected def createBaseRdd( +sparkSession: SparkSession, +inputPaths: Seq[FileStatus]): RDD[T] + + /** + * A generic wrapper to invoke the correct [[JsonFactory]] method to allocate a [[JsonParser]] + * for an instance of [[T]] + */ + def createParser(jsonFactory: JsonFactory, value: T): JsonParser + + final def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: JSONOptions): Option[StructType] = { +if (inputPaths.nonEmpty) { + val jsonSchema = JsonInferSchema.infer( +createBaseRdd(sparkSession, inputPaths), +parsedOptions, +createParser) + checkConstraints(jsonSchema) + Some(jsonSchema) +} else { + None +} + } + + /** Constraints to be imposed on schema to be stored. */ + private def checkConstraints(schema: StructType): Unit = { --- End diff -- IIRC this was a check added because some of the backends (maybe parquet?) 
were writing corrupt files... if this is checked globally now it should be fine to remove --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100652259 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType --- End diff -- Right, it's a single value that spans multiple lines. The Python test is reusing some Python specific test data. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
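For readers following this thread, a minimal usage sketch of the mode being tested here — it assumes an existing `SparkSession` named `spark`, and the directory path is illustrative:

```scala
// Read a directory where each file holds a single JSON value that may span multiple lines.
// `wholeFile` is the option name used in this PR.
val jsonDF = spark.read
  .option("wholeFile", true)
  .json("/path/to/json/dir")

jsonDF.printSchema()
```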
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100652192 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read --- End diff -- I'm not sure what you mean? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651990 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") --- End diff -- Right --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651910 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempPath { dir => + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => --- End diff -- For a reason that is no longer relevant, I'll switch this --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100651706 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala --- @@ -79,7 +80,7 @@ private[sql] object JsonInferSchema { private[this] val structFieldComparator = new Comparator[StructField] { override def compare(o1: StructField, o2: StructField): Int = { - o1.name.compare(o2.name) + o1.name.compareTo(o2.name) --- End diff -- `.compare` is a very expensive way of comparing two strings. `compare`:
```
public int compare(org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField);
  Code:
     0: new           #14 // class scala/collection/immutable/StringOps
     3: dup
     4: getstatic     #20 // Field scala/Predef$.MODULE$:Lscala/Predef$;
     7: aload_1
     8: invokevirtual #26 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
    11: invokevirtual #30 // Method scala/Predef$.augmentString:(Ljava/lang/String;)Ljava/lang/String;
    14: invokespecial #34 // Method scala/collection/immutable/StringOps."<init>":(Ljava/lang/String;)V
    17: aload_2
    18: invokevirtual #26 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
    21: invokevirtual #37 // Method scala/collection/immutable/StringOps.compare:(Ljava/lang/String;)I
    24: ireturn
```
`compareTo`:
```
public int compare(org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField);
  Code:
     0: aload_1
     1: invokevirtual #18 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
     4: aload_2
     5: invokevirtual #18 // Method org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
     8: invokevirtual #24 // Method java/lang/String.compareTo:(Ljava/lang/String;)I
    11: ireturn
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
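For reference, the cheaper form discussed above as a self-contained snippet — the comparator is the same one shown in the diff, with its imports spelled out:

```scala
import java.util.Comparator
import org.apache.spark.sql.types.StructField

// Calls String.compareTo directly, so no scala.collection.immutable.StringOps
// wrapper is allocated on every comparison.
val structFieldComparator = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int =
    o1.name.compareTo(o2.name)
}
```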
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100650450 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -394,36 +447,32 @@ class JacksonParser( } /** - * Parse the string JSON input to the set of [[InternalRow]]s. + * Parse the JSON input to the set of [[InternalRow]]s. + * + * @param recordLiteral an optional function that will be used to generate + * the corrupt record text instead of record.toString */ - def parse(input: String): Seq[InternalRow] = { -if (input.trim.isEmpty) { - Nil -} else { - try { -Utils.tryWithResource(factory.createParser(input)) { parser => - parser.nextToken() - rootConverter.apply(parser) match { -case null => failedRecord(input) -case row: InternalRow => row :: Nil -case array: ArrayData => - // Here, as we support reading top level JSON arrays and take every element - // in such an array as a row, this case is possible. - if (array.numElements() == 0) { -Nil - } else { -array.toArray[InternalRow](schema) - } -case _ => - failedRecord(input) + def parse[T]( + record: T, + createParser: (JsonFactory, T) => JsonParser, + recordLiteral: PartialFunction[T, UTF8String] = PartialFunction.empty): Seq[InternalRow] = { --- End diff -- Passing a function in instead of a closure saves an allocation that will be held for the duration of parsing, and is likely to be promoted to a later GC generation. If we went the closure route the function signature should be this: ```scala def parse( createParser: JsonFactory => JsonParser, recordLiteral: => UTF8String): Seq[InternalRow] ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
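A rough sketch of the allocation trade-off being described, with simplified signatures that are not the exact Spark API:

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

val factory = new JsonFactory()

// Variant used here: the record and a shared (JsonFactory, T) => JsonParser function are
// passed separately, so no per-record closure has to be captured and kept alive.
def parseWithFunction[T](record: T, createParser: (JsonFactory, T) => JsonParser): Unit = {
  val parser = createParser(factory, record)
  try { /* ... walk tokens ... */ } finally parser.close()
}

// Closure variant from the comment above: each record needs a fresh closure capturing it.
def parseWithClosure(createParser: JsonFactory => JsonParser): Unit = {
  val parser = createParser(factory)
  try { /* ... walk tokens ... */ } finally parser.close()
}

val stringToParser: (JsonFactory, String) => JsonParser =
  (f, s) => f.createParser(s)

parseWithFunction("""{"a": 1}""", stringToParser)      // one function value reused for every record
parseWithClosure(f => f.createParser("""{"a": 1}""")) // a new closure allocated per record
```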
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649836 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,110 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + + @transient + private[this] var isWarningPrinted: Boolean = false + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private def printWarningForMalformedRecord(record: () => UTF8String): Unit = { +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + } + + @transient + private def printWarningIfWholeFile(): Unit = { +if (options.wholeFile && corruptFieldIndex.isDefined) { + logWarning( +s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result + |in very large allocations or OutOfMemoryExceptions being raised. + | + """.stripMargin) +} + } /** * This function deals with the cases it fails to parse. This function will be called * when exceptions are caught during converting. This functions also deals with `mode` option. */ - private def failedRecord(record: String): Seq[InternalRow] = { -// create a row even if no corrupt record column is present -if (options.failFast) { - throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record") -} -if (options.dropMalformed) { - if (!isWarningPrintedForMalformedRecord) { -logWarning( - s"""Found at least one malformed records (sample: $record). The JSON reader will drop - |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which - |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE - |mode and use the default inferred schema. 
- | - |Code example to print all malformed records (scala): - |=== - |// The corrupted record exists in column ${columnNameOfCorruptRecord} - |val parsedJson = spark.read.json("/path/to/json/file/test.json") - | -
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100649635 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -31,10 +31,17 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( -@transient private val parameters: CaseInsensitiveMap) +@transient private val parameters: CaseInsensitiveMap, +defaultColumnNameOfCorruptRecord: String) extends Logging with Serializable { - def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters)) + def this( + parameters: Map[String, String], + defaultColumnNameOfCorruptRecord: String = "") = { --- End diff -- Yes, it's really not a good solution, but it doesn't make sense to have a corrupt column name in all use cases. Picking another sentinel could inadvertently conflict with a real column. It should be `Option[String] = None` but this winds up being a large change that deserves a separate pull request. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
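A toy illustration of the `Option[String] = None` alternative mentioned here — simplified names, not the actual Spark class:

```scala
// With an Option there is no empty-string sentinel that could collide with a real column name.
class ToyJsonOptions(
    parameters: Map[String, String],
    defaultColumnNameOfCorruptRecord: Option[String] = None) extends Serializable {

  // The user-supplied option wins; otherwise fall back to the caller's default, if any.
  val columnNameOfCorruptRecord: Option[String] =
    parameters.get("columnNameOfCorruptRecord").orElse(defaultColumnNameOfCorruptRecord)
}
```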
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100648524 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -394,36 +447,32 @@ class JacksonParser( } /** - * Parse the string JSON input to the set of [[InternalRow]]s. + * Parse the JSON input to the set of [[InternalRow]]s. + * + * @param recordLiteral an optional function that will be used to generate + * the corrupt record text instead of record.toString */ - def parse(input: String): Seq[InternalRow] = { -if (input.trim.isEmpty) { - Nil -} else { - try { -Utils.tryWithResource(factory.createParser(input)) { parser => - parser.nextToken() - rootConverter.apply(parser) match { -case null => failedRecord(input) -case row: InternalRow => row :: Nil -case array: ArrayData => - // Here, as we support reading top level JSON arrays and take every element - // in such an array as a row, this case is possible. - if (array.numElements() == 0) { -Nil - } else { -array.toArray[InternalRow](schema) - } -case _ => - failedRecord(input) + def parse[T]( + record: T, + createParser: (JsonFactory, T) => JsonParser, + recordLiteral: PartialFunction[T, UTF8String] = PartialFunction.empty): Seq[InternalRow] = { +try { + Utils.tryWithResource(createParser(factory, record)) { parser => +// a null first token is equivalent to testing for input.trim.isEmpty +// but it works on any token stream and not just strings +parser.nextToken() match { + case null => Nil + case _ => rootConverter.apply(parser) match { +case null => throw new SparkSQLJsonProcessingException("Root converter returned null") +case rows => rows } } - } catch { -case _: JsonProcessingException => - failedRecord(input) -case _: SparkSQLJsonProcessingException => - failedRecord(input) } +} catch { + case (_: JsonProcessingException) | (_: SparkSQLJsonProcessingException) => +failedRecord(() => recordLiteral.applyOrElse[T, UTF8String]( --- End diff -- When I do that I usually get review comments to make call by name parameters explicit... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100646497 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,98 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + + @transient + private val printWarningForMalformedRecord = ExecuteOnce[() => UTF8String] { record => +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + } + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ => +if (options.wholeFile && corruptFieldIndex.isDefined) { + logWarning( +s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result + |in very large allocations or OutOfMemoryExceptions being raised. + | + """.stripMargin) +} + } /** * This function deals with the cases it fails to parse. This function will be called * when exceptions are caught during converting. This functions also deals with `mode` option. */ - private def failedRecord(record: String): Seq[InternalRow] = { -// create a row even if no corrupt record column is present -if (options.failFast) { - throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record") -} -if (options.dropMalformed) { - if (!isWarningPrintedForMalformedRecord) { -logWarning( - s"""Found at least one malformed records (sample: $record). The JSON reader will drop - |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which - |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE - |mode and use the default inferred schema. 
- | - |Code example to print all malformed records (scala): - |=== - |// The corrupted record exists in column ${columnNameOfCorruptRecord} - |val parsedJson = spark.read.json("/path/to/json/file/test.json") - | - """.stripMargin) -isWarningPrintedForMa
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100640620 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,98 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + + @transient + private val printWarningForMalformedRecord = ExecuteOnce[() => UTF8String] { record => +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + } + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ => +if (options.wholeFile && corruptFieldIndex.isDefined) { + logWarning( +s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result + |in very large allocations or OutOfMemoryExceptions being raised. + | + """.stripMargin) +} + } /** * This function deals with the cases it fails to parse. This function will be called * when exceptions are caught during converting. This functions also deals with `mode` option. */ - private def failedRecord(record: String): Seq[InternalRow] = { -// create a row even if no corrupt record column is present -if (options.failFast) { - throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record") -} -if (options.dropMalformed) { - if (!isWarningPrintedForMalformedRecord) { -logWarning( - s"""Found at least one malformed records (sample: $record). The JSON reader will drop - |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which - |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE - |mode and use the default inferred schema. 
- | - |Code example to print all malformed records (scala): - |=== - |// The corrupted record exists in column ${columnNameOfCorruptRecord} - |val parsedJson = spark.read.json("/path/to/json/file/test.json") - | - """.stripMargin) -isWarningPrintedForMa
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100610662 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,98 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + + @transient + private val printWarningForMalformedRecord = ExecuteOnce[() => UTF8String] { record => +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + } + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ => +if (options.wholeFile && corruptFieldIndex.isDefined) { + logWarning( +s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result + |in very large allocations or OutOfMemoryExceptions being raised. + | + """.stripMargin) +} + } /** * This function deals with the cases it fails to parse. This function will be called * when exceptions are caught during converting. This functions also deals with `mode` option. */ - private def failedRecord(record: String): Seq[InternalRow] = { -// create a row even if no corrupt record column is present -if (options.failFast) { - throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record") -} -if (options.dropMalformed) { - if (!isWarningPrintedForMalformedRecord) { -logWarning( - s"""Found at least one malformed records (sample: $record). The JSON reader will drop - |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which - |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE - |mode and use the default inferred schema. 
- | - |Code example to print all malformed records (scala): - |=== - |// The corrupted record exists in column ${columnNameOfCorruptRecord} - |val parsedJson = spark.read.json("/path/to/json/file/test.json") - | - """.stripMargin) -isWarningPrintedForMa
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Jenkins, retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100474809 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,102 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private var printWarningForMalformedRecord: (() => UTF8String) => Unit = { record => +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + +printWarningForMalformedRecord = _ => () --- End diff -- @cloud-fan please take a look at aafe7bded6e614dddaed74d15cecfb8b1a78a639, I hope this is more clear --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16199: [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16199 @HyukjinKwon Good idea, I'll take another stab and try to revive the original pull request. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100344879 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,102 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private var printWarningForMalformedRecord: (() => UTF8String) => Unit = { record => +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + +printWarningForMalformedRecord = _ => () --- End diff -- I can wrap it up into a helper class, this is to avoid having to keep multiple variables in sync.. `wholeFile` adds another warning, and I'm sure there will be additional warnings added in the future. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
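For context, a hedged sketch of what a small `ExecuteOnce` helper could look like — the class used in the later revision of this diff (quoted in the messages above) may differ in detail:

```scala
// Wraps a side-effecting function of T and runs it at most once per instance,
// replacing the "did we already warn?" boolean flags with a single helper.
class ExecuteOnce[T](body: T => Unit) extends (T => Unit) with Serializable {
  @transient private[this] var executed = false

  override def apply(arg: T): Unit = {
    if (!executed) {
      executed = true
      body(arg)
    }
  }
}

object ExecuteOnce {
  def apply[T](body: T => Unit): ExecuteOnce[T] = new ExecuteOnce(body)
}

// Usage mirroring the diff: the warning body runs only for the first malformed record.
val warnOnce = ExecuteOnce[String] { sample => println(s"Found a malformed record: $sample") }
warnOnce("{bad json}") // prints the warning
warnOnce("{bad json}") // no-op
```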
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100344282 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -48,69 +47,102 @@ class JacksonParser( // A `ValueConverter` is responsible for converting a value from `JsonParser` // to a value in a field for `InternalRow`. - private type ValueConverter = (JsonParser) => Any + private type ValueConverter = JsonParser => AnyRef // `ValueConverter`s for the root schema for all fields in the schema - private val rootConverter: ValueConverter = makeRootConverter(schema) + private val rootConverter = makeRootConverter(schema) private val factory = new JsonFactory() options.setJacksonOptions(factory) private val emptyRow: Seq[InternalRow] = Seq(new GenericInternalRow(schema.length)) + private val corruptFieldIndex = schema.getFieldIndex(options.columnNameOfCorruptRecord) + @transient - private[this] var isWarningPrintedForMalformedRecord: Boolean = false + private var printWarningForMalformedRecord: (() => UTF8String) => Unit = { record => +def sampleRecord: String = { + if (options.wholeFile) { +"" + } else { +s"Sample record: ${record()}\n" + } +} + +def footer: String = { + s"""Code example to print all malformed records (scala): + |=== + |// The corrupted record exists in column ${options.columnNameOfCorruptRecord}. + |val parsedJson = spark.read.json("/path/to/json/file/test.json") + | + """.stripMargin +} + +if (options.permissive) { + logWarning( +s"""Found at least one malformed record. The JSON reader will replace + |all malformed records with placeholder null in current $PERMISSIVE_MODE parser mode. + |To find out which corrupted records have been replaced with null, please use the + |default inferred schema instead of providing a custom schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} else if (options.dropMalformed) { + logWarning( +s"""Found at least one malformed record. The JSON reader will drop + |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which + |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE + |mode and use the default inferred schema. + | + |${sampleRecord ++ footer} + | + """.stripMargin) +} + +printWarningForMalformedRecord = _ => () + } + + @transient + private var printWarningIfWholeFile: () => Unit = { () => +if (options.wholeFile && corruptFieldIndex.isDefined) { + logWarning( +s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result + |in very large allocations or OutOfMemoryExceptions being raised. + | + """.stripMargin) +} + +printWarningIfWholeFile = () => () + } /** * This function deals with the cases it fails to parse. This function will be called * when exceptions are caught during converting. This functions also deals with `mode` option. */ - private def failedRecord(record: String): Seq[InternalRow] = { -// create a row even if no corrupt record column is present -if (options.failFast) { - throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record") -} -if (options.dropMalformed) { - if (!isWarningPrintedForMalformedRecord) { -logWarning( - s"""Found at least one malformed records (sample: $record). The JSON reader will drop - |all malformed records in current $DROP_MALFORMED_MODE parser mode. To find out which - |corrupted records have been dropped, please switch the parser mode to $PERMISSIVE_MODE - |mode and use the default inferred schema. 
- | - |Code example to print all malformed records (scala): - |=== - |// The corrupted record exists in column ${columnNameOfCorruptRecord} - |val parsedJson = spark.read.json("/path/to/json/file/test.jso
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100219153 --- Diff: core/src/main/scala/org/apache/spark/input/PortableDataStream.scala --- @@ -194,5 +195,8 @@ class PortableDataStream( } def getPath(): String = path + + @Since("2.2.0") --- End diff -- Done, pushed in f71a465cf07fb9c043b2ccd86fa57e8e8ea9dc00 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Rebased again to pickup the build break hotfix in c618ccdbe9ac103dfa3182346e2a14a1e7fca91a --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 I rebased to master and hopefully addressed all of your comments @cloud-fan, please have another look. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100104738 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -298,22 +312,22 @@ class JacksonParser( // Here, we pass empty `PartialFunction` so that this case can be // handled as a failed conversion. It will throw an exception as // long as the value is not null. -parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, Any]) +parseJsonToken[AnyRef](parser, dataType)(PartialFunction.empty[JsonToken, AnyRef]) } /** * This method skips `FIELD_NAME`s at the beginning, and handles nulls ahead before trying * to parse the JSON token using given function `f`. If the `f` failed to parse and convert the * token, call `failedConversion` to handle the token. */ - private def parseJsonToken( + private def parseJsonToken[R >: Null]( --- End diff -- It states that `R` must be a nullable type. This enables `null: R` to compile and is preferable to the runtime cast `null.asInstanceOf[R]` because it is verified at compile time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
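A minimal illustration of the lower bound, outside of Spark:

```scala
// `R >: Null` constrains R to types that have Null as a subtype, i.e. nullable reference
// types, so `null: R` type-checks at compile time instead of needing a runtime cast.
def parseOrNull[R >: Null](parse: => R): R =
  try parse catch { case _: NumberFormatException => null }

parseOrNull(java.lang.Long.valueOf("42"))    // 42
parseOrNull(java.lang.Long.valueOf("oops"))  // null
// parseOrNull[Int](java.lang.Integer.parseInt("7")) // does not compile: Int fails the Null lower bound
```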
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100103739 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala --- @@ -227,66 +267,71 @@ class JacksonParser( } case TimestampType => - (parser: JsonParser) => parseJsonToken(parser, dataType) { + (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) { case VALUE_STRING => + val stringValue = parser.getText // This one will lose microseconds parts. // See https://issues.apache.org/jira/browse/SPARK-10681. - Try(options.timestampFormat.parse(parser.getText).getTime * 1000L) -.getOrElse { - // If it fails to parse, then tries the way used in 2.0 and 1.x for backwards - // compatibility. - DateTimeUtils.stringToTime(parser.getText).getTime * 1000L -} + Long.box { --- End diff -- This is needed to satisfy the type checker. The other approach is to explicitly specify the type in two locations: `Try[java.lang.Long](...).getOrElse[java.lang.Long](...)`. I found explicitly boxing to be more readable than the alternative. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
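A minimal, self-contained illustration of the inference issue — the two parse functions are hypothetical stand-ins, not the Spark ones:

```scala
import scala.util.Try

// The converter has to return a boxed java.lang.Long (an AnyRef), but
// Try(...).getOrElse(...) infers scala.Long. A single Long.box at the end reads better
// than annotating both the Try and the getOrElse with java.lang.Long.
def toMicros(text: String, primary: String => Long, fallback: String => Long): java.lang.Long =
  Long.box {
    Try(primary(text) * 1000L).getOrElse(fallback(text) * 1000L)
  }
```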
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100101464 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -160,7 +164,17 @@ public void writeTo(OutputStream out) throws IOException { throw new ArrayIndexOutOfBoundsException(); } - out.write(bytes, (int) arrayOffset, numBytes); + return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes); +} else { + return null; --- End diff -- It will allocate an extra object but would simplify the calling code... since it would be a short lived allocation it's probably fine to do this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
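To make the trade-off concrete, a hedged sketch of the simplified caller — it assumes the accessor in this diff ends up as a `getByteBuffer`-style method returning a `ByteBuffer` for array-backed strings and `null` otherwise; the name and exact shape are assumptions, not necessarily the PR's final API:

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.spark.unsafe.types.UTF8String

// If the UTF8String is backed by an on-heap array, hand the bytes straight to Jackson;
// otherwise fall back to getBytes, which copies. The extra ByteBuffer is short-lived.
def createParser(factory: JsonFactory, record: UTF8String): JsonParser = {
  val buffer = record.getByteBuffer // assumed accessor, per the diff above
  if (buffer != null && buffer.hasArray) {
    factory.createParser(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining())
  } else {
    factory.createParser(record.getBytes)
  }
}
```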
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100100641 --- Diff: core/src/main/scala/org/apache/spark/input/PortableDataStream.scala --- @@ -194,5 +195,8 @@ class PortableDataStream( } def getPath(): String = path + + @Since("2.2.0") --- End diff -- This is a public class so I thought adding a `since` tag would benefit the documentation. If it's not desired I can certainly remove it. As for making the lazy val public vs private: I'm following the style used already in the class. There are public get methods for each private field. I'm not partial to either approach but prefer to keep it consistent. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100099791 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala --- @@ -31,10 +31,17 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]]. */ private[sql] class JSONOptions( -@transient private val parameters: CaseInsensitiveMap) +@transient private val parameters: CaseInsensitiveMap, +defaultColumnNameOfCorruptRecord: String) --- End diff -- Previously the `JSONOptions` instance was always passed around with a `columnNameOfCorruptRecord` value. This just makes it a field in `JSONOptions` instead, to put all options in one place. Since it's a required option, it made more sense to use a field instead of making an entry in the `CaseInsensitiveMap`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100098008 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -1764,4 +1769,125 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { val df2 = spark.read.option("PREfersdecimaL", "true").json(records) assert(df2.schema == schema) } + + test("SPARK-18352: Parse normal multi-line JSON files (compressed)") { +withTempDir { dir => + dir.delete() + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.option("compression", "GzIp") +.text(path) + + new File(path).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".gz"))) + } + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.option("compression", "gZiP") +.save(jsonDir) + + new File(jsonDir).listFiles() match { +case compressedFiles => + assert(compressedFiles.exists(_.getName.endsWith(".json.gz"))) + } + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") { +withTempDir { dir => + dir.delete() + val path = dir.getCanonicalPath + primitiveFieldAndType +.toDF("value") +.write +.text(path) + + val jsonDF = spark.read.option("wholeFile", true).json(path) + val jsonDir = new File(dir, "json").getCanonicalPath + jsonDF.coalesce(1).write +.format("json") +.save(jsonDir) + + val compressedFiles = new File(jsonDir).listFiles() + assert(compressedFiles.exists(_.getName.endsWith(".json"))) + + val jsonCopy = spark.read +.format("json") +.load(jsonDir) + + assert(jsonCopy.count === jsonDF.count) + val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean") + val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean") + checkAnswer(jsonCopySome, jsonDFSome) +} + } + + test("SPARK-18352: Expect one JSON document per file") { +// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token. +// this might not be the optimal behavior but this test verifies that only the first value +// is parsed and the rest are discarded. + +// alternatively the parser could continue parsing following objects, which may further reduce +// allocations by skipping the line reader entirely + +withTempDir { dir => + dir.delete() + val path = dir.getCanonicalPath + primitiveFieldAndType +.flatMap(Iterator.fill(3)(_) ++ Iterator("\n{invalid}")) --- End diff -- sure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r100097749 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.json + +import java.io.InputStream + +import scala.reflect.ClassTag + +import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import com.google.common.io.ByteStreams +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.execution.datasources.{CodecStreams, HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils + +/** + * Common functions for parsing JSON files + * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or [[String]] + */ +abstract class JsonDataSource[T] extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances + */ + def readFile( +conf: Configuration, +file: PartitionedFile, +parser: JacksonParser): Iterator[InternalRow] + + /** + * Create an [[RDD]] that handles the preliminary parsing of [[T]] records + */ + protected def createBaseRdd( +sparkSession: SparkSession, +inputPaths: Seq[FileStatus]): RDD[T] + + /** + * A generic wrapper to invoke the correct [[JsonFactory]] method to allocate a [[JsonParser]] + * for an instance of [[T]] + */ + def createParser(jsonFactory: JsonFactory, value: T): JsonParser + + final def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: JSONOptions): Option[StructType] = { +if (inputPaths.nonEmpty) { + val jsonSchema = InferSchema.infer( +createBaseRdd(sparkSession, inputPaths), +parsedOptions, +createParser) + checkConstraints(jsonSchema) + Some(jsonSchema) +} else { + None +} + } + + /** Constraints to be imposed on schema to be stored. 
*/ + private def checkConstraints(schema: StructType): Unit = { +if (schema.fieldNames.length != schema.fieldNames.distinct.length) { + val duplicateColumns = schema.fieldNames.groupBy(identity).collect { +case (x, ys) if ys.length > 1 => "\"" + x + "\"" + }.mkString(", ") + throw new AnalysisException(s"Duplicate column(s) : $duplicateColumns found, " + +s"cannot save to JSON format") +} + } +} + +object JsonDataSource { + def apply(options: JSONOptions): JsonDataSource[_] = { +if (options.wholeFile) { + WholeFileJsonDataSource +} else { + TextInputJsonDataSource +} + } + + /** + * Create a new [[RDD]] via the supplied callback if there is at least one file to process, + * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned. + */ + def createBaseRddConf[T : ClassTag]( --- End diff -- Habit from working with languages that do
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Any other comments? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Jenkins, retest this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Can someone kick off the tests again? The last failure was in another module (Kafka). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 @HyukjinKwon I just pushed a change that makes the corrupt record handling consistent: if a corrupt record column is defined, it will always receive the JSON text for failed records. If `wholeFile` is enabled, a warning is emitted. I think more discussion is needed to figure out the best way to handle corrupt records and exceptions; perhaps that can be shelved for now and picked up later under another ticket? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
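A minimal usage sketch of the behavior described above, assuming the option and column names from this PR discussion; the input path is hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("wholeFileJson").getOrCreate()

// In permissive mode, records that fail to parse land in the corrupt record
// column; with `wholeFile` enabled a warning is emitted because a single
// failed record can be an entire (possibly very large) file.
val df = spark.read
  .option("wholeFile", "true")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/data/multiline-json") // hypothetical input path

df.filter(df("_corrupt_record").isNotNull).show(false)
```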
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 The tests failed for an unrelated reason, looks to be running out of heap space in SBT somewhere. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 @HyukjinKwon I agree that overloading the corrupt record column is undesirable and `F.input_file_name` is a better way to fetch the filename. It would be nice to extend this concept further and provide new functions (like `F.json_exception`) to retrieve exceptions and their locations, and this would work for the base case (parsing a string) as well as `wholeFile`. Plumbing this type of change through appears to require thread-local storage (unfortunately), but otherwise doesn't look too bad. The question then is what to put in the corrupt record column, if one is defined, when in `wholeFile` mode. To retain consistency with the string paths, we should really put the entire file in the column. This is problematic for large files (>2GB) since Spark SQL doesn't have blob support... so the allocations will fail (along with the task) and there is no way for the end user to work around this limitation. Functions like `substr` are applied to byte arrays and not file streams. Perhaps it's good enough to issue a warning (along the lines of "don't define a corrupt record column in wholeFile mode") and hope for the best? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
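A hedged sketch of the `F.input_file_name` alternative mentioned above; the corrupt-record column name and the path are illustrative assumptions:

```scala
import org.apache.spark.sql.functions.input_file_name

// Identify which files produced corrupt records without ever storing the
// whole file body in the corrupt record column.
val corruptFiles = spark.read
  .option("wholeFile", "true")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/data/multiline-json") // hypothetical input path
  .where("_corrupt_record is not null")
  .select(input_file_name().as("file"))
  .distinct()

corruptFiles.show(false)
```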
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r93970059 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala --- @@ -36,29 +31,31 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration +object JsonFileFormat { + def parseJsonOptions(sparkSession: SparkSession, options: Map[String, String]): JSONOptions = { --- End diff -- I just removed the method entirely since all it did was fetch the value of `columnNameOfCorruptRecord`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16386#discussion_r93969732 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala --- @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.json + +import java.io.InputStream + +import scala.reflect.ClassTag + +import com.fasterxml.jackson.core.{JsonFactory, JsonParser} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.io.{LongWritable, Text} +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat} + +import org.apache.spark.TaskContext +import org.apache.spark.input.{PortableDataStream, StreamInputFormat} +import org.apache.spark.rdd.{BinaryFileRDD, RDD} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.execution.datasources.{CodecStreams, HadoopFileLinesReader, PartitionedFile} +import org.apache.spark.sql.types.StructType + +/** + * Common functions for parsing JSON files + * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or [[String]] + */ +abstract class JsonDataSource[T] extends Serializable { + def isSplitable: Boolean + + /** + * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances + */ + def readFile( +conf: Configuration, +file: PartitionedFile, +parser: JacksonParser): Iterator[InternalRow] + + /** + * Create an [[RDD]] that handles the preliminary parsing of [[T]] records + */ + protected def createBaseRdd( +sparkSession: SparkSession, +inputPaths: Seq[FileStatus]): RDD[T] + + /** + * A generic wrapper to invoke the correct [[JsonFactory]] method to allocate a [[JsonParser]] + * for an instance of [[T]] + */ + def createParser(jsonFactory: JsonFactory, value: T): JsonParser + + final def infer( + sparkSession: SparkSession, + inputPaths: Seq[FileStatus], + parsedOptions: JSONOptions): Option[StructType] = { +val jsonSchema = InferSchema.infer( + createBaseRdd(sparkSession, inputPaths), + parsedOptions, + createParser) +checkConstraints(jsonSchema) + +if (jsonSchema.fields.nonEmpty) { --- End diff -- Yes, it was a regression that caused a test failure. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 @srowen It is functionally the same as what you're suggesting. The question is how (or if) it should be first class in the `DataFrameReader` API. If we agree that it should be exposed, either via a new `FileFormat` or an option to `JsonFileFormat`, some abstraction is necessary to support reading from different RDD classes. This PR just pushes that boundary a little further and lets the inference and parser code work over more types, not just `String`. This may make parsing more efficient in the line-oriented codepath by avoiding a conversion from `Text` and `UTF8String` (in `JsonToStruct`) to `String`, and also lets us parse an `InputStream` without requiring all of the data to be in memory. For small files it's not likely to have a benefit (if the file is smaller than 4k it will be read entirely anyway), but as the file size increases this reduces the amount of memory required for parsing, is friendlier (in theory) on the GC and lets us consume files larger than 2GB. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
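A minimal sketch of the Jackson side of that abstraction, assuming only the standard `JsonFactory.createParser` overloads; the wrapper object and method names are hypothetical:

```scala
import java.io.InputStream

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text

// The parser code can stay generic over the unparsed record type because
// Jackson can start a parser from a String, raw bytes, or a stream; the
// line-oriented and wholeFile code paths only differ in this one spot.
object ParserSketch {
  def fromString(factory: JsonFactory, s: String): JsonParser =
    factory.createParser(s)

  def fromText(factory: JsonFactory, t: Text): JsonParser =
    factory.createParser(t.getBytes, 0, t.getLength) // no copy to String

  def fromStream(factory: JsonFactory, in: InputStream): JsonParser =
    factory.createParser(in) // streams the file; no full in-memory copy
}
```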
[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16386 Hello recent JacksonGenerator.scala committers, please take a look. cc/ @rxin @hvanhovell @clockfly @hyukjinkwon @cloud-fan --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...
GitHub user NathanHowell opened a pull request: https://github.com/apache/spark/pull/16386 [SPARK-18352][SQL] Support parsing multiline json files ## What changes were proposed in this pull request? If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory. Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired. I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one. ## How was this patch tested? New and existing unit tests. No performance or load tests have been run. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NathanHowell/spark SPARK-18352 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16386.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16386 commit 740620210b30ef02e280d161d6b08088d07300fa Author: Nathan Howell Date: 2016-12-22T22:16:49Z [SPARK-18352][SQL] Support parsing multiline json files commit 7902255a79fc2581214a09ccd38437cebd19d862 Author: Nathan Howell Date: 2016-12-22T00:27:19Z JacksonParser.parseJsonToken should be explicit about nulls and boxing commit 149418647c9831e88af866d44d31496940c02162 Author: Nathan Howell Date: 2016-12-21T23:49:37Z Increase type safety of makeRootConverter, remove runtime type tests commit 7ad5d5be0c7b41112f9f6ad3cb0cf9055de62695 Author: Nathan Howell Date: 2016-12-23T02:13:59Z Field converter lookups should be O(1) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
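A minimal usage sketch of the `wholeFile` option described above, assuming a directory where each file contains a single (possibly pretty-printed, multi-line) JSON document; the path is hypothetical:

```scala
// Each input file is parsed as one JSON value instead of line by line.
val people = spark.read
  .option("wholeFile", "true")
  .json("/data/people") // hypothetical directory of multi-line JSON files

people.printSchema()
people.show()
```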
[GitHub] spark pull request #16375: [SPARK-18963] o.a.s.unsafe.types.UTF8StringSuite....
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16375#discussion_r93460507 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java --- @@ -591,7 +591,11 @@ public void writeToOutputStreamIntArray() throws IOException { // verify that writes work on objects that are not byte arrays final ByteBuffer buffer = StandardCharsets.UTF_8.encode("大千世界"); buffer.position(0); -buffer.order(ByteOrder.LITTLE_ENDIAN); + --- End diff -- Yeah, that's probably the right approach. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16199: [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16199 Hello @HyukjinKwon, can you take a look at this one? I am unsure if we should be accepting lowercased values like `nan` (versus strictly testing for `NaN`) but I think this PR matches the original intent of the code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16199: [SPARK-18772][SQL] NaN/Infinite float parsing in ...
GitHub user NathanHowell opened a pull request: https://github.com/apache/spark/pull/16199 [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is inconsistent ## What changes were proposed in this pull request? This relaxes the parsing of `Float` and `Double` columns to properly support mixed case values of `NaN` and (+/-)`Infinity`, as well as properly supporting (+/-)`Inf`. Currently a string literal of `Nan` or `InfinitY` will cause a task to fail instead of placing the record in the corrupt record column, and `Inf` causes a failure instead of being a valid double. ## How was this patch tested? Additional unit tests have been added You can merge this pull request into a Git repository by running: $ git pull https://github.com/NathanHowell/spark SPARK-18772 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16199.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16199 commit 11ac4438e12dc01ba252304da8793077280f3067 Author: Nathan Howell Date: 2016-12-07T23:32:14Z [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is inconsistent --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
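A hedged sketch of the behavior this PR targets; the path and file contents are hypothetical, and the exact set of accepted spellings is what the change addresses:

```scala
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Records with quoted special float values should parse into a Double column
// instead of failing the task, e.g.:
//   {"d": "NaN"}
//   {"d": "Infinity"}
//   {"d": "-Inf"}
val schema = StructType(Seq(StructField("d", DoubleType)))

val df = spark.read
  .schema(schema)
  .json("/data/special-floats") // hypothetical input path

df.show()
```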
[GitHub] spark issue #16107: SPARK-18677: Fix parsing ['key'] in JSON path expression...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16107 I wrote the buggy version, doh... but this LGTM. Thanks for the fix. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90566162 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -147,6 +147,17 @@ public void writeTo(ByteBuffer buffer) { buffer.position(pos + numBytes); } + public void writeTo(OutputStream out) throws IOException { --- End diff -- I've added a few tests for this method. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90521381 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala --- @@ -245,24 +230,12 @@ private[csv] class CsvOutputWriter( override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") override protected[sql] def writeInternal(row: InternalRow): Unit = { -csvWriter.writeRow(rowToString(row), records == 0L && params.headerFlag) -records += 1 -if (records % FLUSH_BATCH_SIZE == 0) { - flush() -} - } - - private def flush(): Unit = { -val lines = csvWriter.flush() -if (lines.nonEmpty) { - text.set(lines) - recordWriter.write(NullWritable.get(), text) -} +csvWriter.writeRow(rowToString(row), printHeader) --- End diff -- The uniVocity CSV writer converts every column to a `String` before writing so it's (probably?) not possible to further optimize this without doing a whole bunch of work. I only did a quick scan through their code though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90509459 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -194,4 +194,8 @@ private[sql] class JacksonGenerator( writeFields(row, schema, rootFieldWriters) } } + + def writeLineEnding(): Unit = { +gen.writeRaw('\n') --- End diff -- That is my assumption. I'm also assuming that writing a single byte is slightly more efficient than writing an array of a single byte. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16089 @steveloughran Spark is handling the output committing somewhere further up the stack. The path being passed in to `OutputWriterFactory.newInstance` is to a temporary file, such as `/private/var/folders/sq/vmncyd7506q_ch43llrwr8sn6zfknl/T/spark-3db2844b-1f3c-45c2-8bf4-8a3c81440e38/_temporary/0/_temporary/attempt_20161201081833__m_00_0/part-0-8dd44cea-c01e-4bfe-ab03-641ebce18afb.txt`. I'll make a pass through the existing tests to see if anything obvious is missing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90503024 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{OutputStream, OutputStreamWriter} +import java.nio.charset.{Charset, StandardCharsets} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.compress._ +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.util.ReflectionUtils + +object CodecStreams { + private def getCompressionCodec( + context: JobContext, + file: Option[Path] = None): Option[CompressionCodec] = { +if (FileOutputFormat.getCompressOutput(context)) { + val compressorClass = FileOutputFormat.getOutputCompressorClass( +context, +classOf[GzipCodec]) + + Some(ReflectionUtils.newInstance(compressorClass, context.getConfiguration)) +} else { + file.flatMap { path => +val compressionCodecs = new CompressionCodecFactory(context.getConfiguration) +Option(compressionCodecs.getCodec(path)) + } +} + } + + /** + * Create a new file and open it for writing. + * If compression is enabled in the [[JobContext]] the stream will write compressed data to disk. + * An exception will be thrown if the file already exists. --- End diff -- Is this a problem with Hadoop in general? The FileSystem docs also specify this behavior: ``` /** * Create an FSDataOutputStream at the indicated Path. * @param f the file to create * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an exception will be thrown. */ ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90502343 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java --- @@ -147,6 +147,17 @@ public void writeTo(ByteBuffer buffer) { buffer.position(pos + numBytes); } + public void writeTo(OutputStream out) throws IOException { --- End diff -- Agreed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90501927 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -194,4 +194,8 @@ private[sql] class JacksonGenerator( writeFields(row, schema, rootFieldWriters) } } + + def writeLineEnding(): Unit = { +gen.writeRaw('\n') --- End diff -- 7-bit ASCII is a subset of UTF-8, `\n` is the same in both. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90488454 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala --- @@ -132,39 +128,17 @@ class TextOutputWriter( context: TaskAttemptContext) extends OutputWriter { - private[this] val buffer = new Text() - - private val recordWriter: RecordWriter[NullWritable, Text] = { -new TextOutputFormat[NullWritable, Text]() { - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { -new Path(path) - } -}.getRecordWriter(context) - } + private val writer = CodecStreams.getOutputStream(context, new Path(path)) override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") override protected[sql] def writeInternal(row: InternalRow): Unit = { val utf8string = row.getUTF8String(0) -buffer.set(utf8string.getBytes) -recordWriter.write(NullWritable.get(), buffer) +writer.write(utf8string.getBytes) --- End diff -- Done, but I'm not 100% sure about the implementation. Can you have someone more familiar with `UTF8String`'s internals double check it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90468858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala --- @@ -132,39 +128,17 @@ class TextOutputWriter( context: TaskAttemptContext) extends OutputWriter { - private[this] val buffer = new Text() - - private val recordWriter: RecordWriter[NullWritable, Text] = { -new TextOutputFormat[NullWritable, Text]() { - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { -new Path(path) - } -}.getRecordWriter(context) - } + private val writer = CodecStreams.getOutputStream(context, new Path(path)) override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") override protected[sql] def writeInternal(row: InternalRow): Unit = { val utf8string = row.getUTF8String(0) -buffer.set(utf8string.getBytes) -recordWriter.write(NullWritable.get(), buffer) +writer.write(utf8string.getBytes) --- End diff -- It is creating a new array, I'll pass the internal one through instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90468563 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{OutputStream, OutputStreamWriter} +import java.nio.charset.{Charset, StandardCharsets} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.compress._ +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.util.ReflectionUtils + +object CodecStreams { + private def getCompressionCodec( + context: JobContext, + file: Option[Path] = None): Option[CompressionCodec] = { +if (FileOutputFormat.getCompressOutput(context)) { + val compressorClass = FileOutputFormat.getOutputCompressorClass( +context, +classOf[GzipCodec]) + + Some(ReflectionUtils.newInstance(compressorClass, context.getConfiguration)) +} else { + file.flatMap { path => +val compressionCodecs = new CompressionCodecFactory(context.getConfiguration) +Option(compressionCodecs.getCodec(path)) + } +} + } + + /** Create a new file and open it for writing. + * If compression is enabled in the [[JobContext]] the stream will write compressed data to disk. + * An exception will be thrown if the file already exists. + */ + def getOutputStream(context: JobContext, file: Path): OutputStream = { +val fs = file.getFileSystem(context.getConfiguration) +val outputStream: OutputStream = fs.create(file, false) + +getCompressionCodec(context, Some(file)).fold(outputStream) { codec => --- End diff -- Yah, it's a terrible name (and it's not a fold). I'll replace them with `.map(...).getOrElse`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
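A small sketch of the replacement described above, assuming the codec lookup from the diff; `createOutputStream` is the standard Hadoop `CompressionCodec` method:

```scala
import java.io.OutputStream

import org.apache.hadoop.io.compress.CompressionCodec

// `codecOpt` and `rawStream` stand in for the values computed in the diff above.
// map(...).getOrElse reads more naturally than fold on an Option here.
def wrapStream(codecOpt: Option[CompressionCodec], rawStream: OutputStream): OutputStream =
  codecOpt
    .map(codec => codec.createOutputStream(rawStream))
    .getOrElse(rawStream)
```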
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90385252 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala --- @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{OutputStream, OutputStreamWriter} +import java.nio.charset.{Charset, StandardCharsets} + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.compress._ +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat +import org.apache.hadoop.util.ReflectionUtils + +private[spark] object CodecStreams { --- End diff -- Looks that way, I've removed it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16089 Doh, forgot to run the Hive tests. Should be fixed now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/16089#discussion_r90380594 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala --- @@ -132,39 +128,17 @@ class TextOutputWriter( context: TaskAttemptContext) extends OutputWriter { - private[this] val buffer = new Text() - - private val recordWriter: RecordWriter[NullWritable, Text] = { -new TextOutputFormat[NullWritable, Text]() { - override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = { -new Path(path) - } -}.getRecordWriter(context) - } + private val writer = CodecStreams.getOutputStream(context, new Path(path)) override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") override protected[sql] def writeInternal(row: InternalRow): Unit = { val utf8string = row.getUTF8String(0) -buffer.set(utf8string.getBytes) -recordWriter.write(NullWritable.get(), buffer) +writer.write(utf8string.getBytes) +writer.write('\n') --- End diff -- This mirrors what Hadoop code does, see https://github.com/apache/hadoop/blob/f67237cbe7bc48a1b9088e990800b37529f1db2a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java#L48-L49 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16089 Yep. It uses the Hadoop `FileSystem` class to open files, just like `TextOutputFormat` does. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16089 This touches a fair number of components. I also haven't done any performance testing to see what the impact of this is. Curious what your thoughts are? cc/ @marmbrus @rxin @JoshRosen --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...
GitHub user NathanHowell opened a pull request: https://github.com/apache/spark/pull/16089 [SPARK-18658][SQL] Write text records directly to a FileOutputStream ## What changes were proposed in this pull request? This replaces uses of `TextOutputFormat` with an `OutputStream`, which will either write directly to the filesystem or indirectly via a compressor (if so configured). This avoids intermediate buffering. The inverse of this (reading directly from a stream) is necessary for streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep the read and write paths symmetric. ## How was this patch tested? Existing unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NathanHowell/spark SPARK-18658 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16089.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16089 commit 66e02959dd5f750579d29c8d79b577844df58c0c Author: Nathan Howell Date: 2016-11-30T22:39:53Z [SPARK-18658][SQL] Write text records directly to a FileOutputStream --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
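For context, a minimal sketch of the user-facing write path this change sits under; the output path is hypothetical. With a compression codec configured, records now flow through a compressor stream instead of an intermediate `Text` buffer:

```scala
// Write string records as a gzip-compressed text dataset.
spark.range(0, 10)
  .selectExpr("cast(id as string) as value")
  .write
  .option("compression", "gzip")
  .text("/tmp/spark-18658-out") // hypothetical output path
```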
[GitHub] spark issue #16084: [SPARK-18654][SQL] Remove unreachable patterns in makeRo...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/16084 cc/ @HyukjinKwon --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16084: [SPARK-18654][SQL] Remove unreachable patterns in...
GitHub user NathanHowell opened a pull request: https://github.com/apache/spark/pull/16084 [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter ## What changes were proposed in this pull request? `makeRootConverter` is only called with a `StructType` value. By making this method less general we can remove pattern matches, which are never actually hit outside of the test suite. ## How was this patch tested? The existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/NathanHowell/spark SPARK-18654 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16084.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #16084 commit 3535acf4f84a9057f4bbb88a81e4fff5f5167c0d Author: Nathan Howell Date: 2016-11-30T18:47:16Z [SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15813: [SPARK-18362][SQL] Use TextFileFormat in JsonFileFormat ...
Github user NathanHowell commented on the issue: https://github.com/apache/spark/pull/15813 Any thoughts on modifying `JsonToStruct` to support arrays (and options)? Then parsing could be something like:

```
dataset.select(
  Column(Inline(
    JsonToValue(
      ArrayType(schema),
      options,
      Column("value").expr))))
```

Likely out of scope for this pull request, but if there is a push to migrate from `RDD[T]` to `Dataset[T]` it would clean things up a bit. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/12750#discussion_r61526935 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala --- @@ -246,12 +263,39 @@ private[sql] object InferSchema { } case (StructType(fields1), StructType(fields2)) => - val newFields = (fields1 ++ fields2).groupBy(field => field.name).map { -case (name, fieldTypes) => - val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType) - StructField(name, dataType, nullable = true) + // Both fields1 and fields2 should be sorted by name, since inferField performs sorting. + // Therefore, we can take advantage of the fact that we're merging sorted lists and skip + // building a hash map or performing additional sorting. + val newFields = new mutable.ArrayBuffer[StructField]() + + var f1Idx = 0 + var f2Idx = 0 + + while (f1Idx < fields1.length && f2Idx < fields2.length) { +val f1Name = fields1(f1Idx).name +val f2Name = fields2(f2Idx).name +if (f1Name == f2Name) { + val dataType = compatibleType(fields1(f1Idx).dataType, fields2(f2Idx).dataType) + newFields += StructField(f1Name, dataType, nullable = true) + f1Idx += 1 + f2Idx += 1 +} else if (f1Name < f2Name) { + newFields += fields1(f1Idx) + f1Idx += 1 +} else { // f1Name > f2Name + newFields += fields2(f2Idx) + f2Idx += 1 +} + } + while (f1Idx < fields1.length) { +newFields += fields1(f1Idx) +f1Idx += 1 + } + while (f2Idx < fields2.length) { +newFields += fields2(f2Idx) +f2Idx += 1 } - StructType(newFields.toSeq.sortBy(_.name)) + StructType(newFields.toSeq) --- End diff -- `StructType(Seq[StructField])` just calls `toArray` on the fields... so I would use `toArray` instead of `toSeq` to reduce one allocation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/12750#discussion_r61526900 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala --- @@ -246,12 +263,39 @@ private[sql] object InferSchema { } case (StructType(fields1), StructType(fields2)) => - val newFields = (fields1 ++ fields2).groupBy(field => field.name).map { -case (name, fieldTypes) => - val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType) - StructField(name, dataType, nullable = true) + // Both fields1 and fields2 should be sorted by name, since inferField performs sorting. + // Therefore, we can take advantage of the fact that we're merging sorted lists and skip + // building a hash map or performing additional sorting. + val newFields = new mutable.ArrayBuffer[StructField]() --- End diff -- Consider calling `newFields.sizeHint(fields1.length max fields2.length)` here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/12750#discussion_r61526786 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala --- @@ -76,6 +78,15 @@ private[sql] object InferSchema { } } + private def sortFieldsInPlace(fields: Array[StructField]): Unit = { +// Note: other code relies on this sorting for correctness, so don't remove it! +java.util.Arrays.sort(fields, new Comparator[StructField] { --- End diff -- Should the `Comparator` instance be cached? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
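A minimal sketch of the suggestion, reusing the `sortFieldsInPlace` helper from the diff above; hoisting the comparator into a `val` avoids allocating a new anonymous instance on every call (the enclosing object name is hypothetical):

```scala
import java.util.Comparator

import org.apache.spark.sql.types.StructField

object FieldSorting {
  // Allocated once and reused across all sort calls.
  private val structFieldComparator = new Comparator[StructField] {
    override def compare(o1: StructField, o2: StructField): Int = o1.name.compareTo(o2.name)
  }

  def sortFieldsInPlace(fields: Array[StructField]): Unit = {
    // Note: other code relies on this sorting for correctness, so don't remove it!
    java.util.Arrays.sort(fields, structFieldComparator)
  }
}
```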
[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/12750#issuecomment-215607825 Alright, here are a few ideas that will at least reduce allocations by a bit. Your version with the merge sort is likely better than the insertion sort here, but I thought I'd try something different :trollface: Any chance you can run it through your benchmark? I'll put my other comments inline; generally this looks good to me.

``` scala
  /**
   * Combine two sorted arrays of StructFields into a new StructType
   */
  private def compatibleFields(
      fields1: Array[StructField],
      fields2: Array[StructField]): StructType = {
    // perform an insertion sort of the smaller struct into the larger one
    val (bigger, smaller) = if (fields1.length > fields2.length) {
      (fields1.toBuffer, fields2)
    } else {
      (fields2.toBuffer, fields1)
    }

    var biggerIdx = 0
    var smallerIdx = 0

    while (biggerIdx < bigger.length && smallerIdx < smaller.length) {
      val biggerVal = bigger(biggerIdx)
      val smallerVal = smaller(smallerIdx)
      val comp = biggerVal.name.compareTo(smallerVal.name)
      if (comp == 0) {
        if (biggerVal.dataType != smallerVal.dataType) {
          val merged = compatibleType(biggerVal.dataType, smallerVal.dataType)
          // test to see if the merged type is equivalent to one of the existing
          // StructField instances, reuse will reduce GC pressure
          if (smallerVal.dataType == merged) {
            bigger.update(biggerIdx, smallerVal)
          } else if (biggerVal.dataType == merged) {
            // do nothing, the bigger struct already has the correct field
          } else {
            // we can't reuse an existing field so allocate a new one
            bigger.update(biggerIdx, biggerVal.copy(dataType = merged))
          }
        }
        biggerIdx += 1
        smallerIdx += 1
      } else if (comp > 0) {
        bigger.insert(biggerIdx, smallerVal)
        // bump both indexes, the bigger struct has grown
        biggerIdx += 1
        smallerIdx += 1
      } else { // comp < 0
        // advance to the next field on the bigger struct
        // nothing else to do here
        biggerIdx += 1
      }
    }
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/12750#issuecomment-215585269 Would Guava's `Iterables.mergeSorted[T]` help out here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12182][ML] Distributed binning for tree...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/10231#discussion_r56742884 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala --- @@ -956,7 +956,7 @@ private[ml] object RandomForest extends Logging { valueCounts.map(_._1) } else { // stride between splits -val stride: Double = featureSamples.length.toDouble / (numSplits + 1) +val stride: Double = featureSamples.size.toDouble / (numSplits + 1) --- End diff -- This will do a second pass over the `Iterable`. Would it be preferable to combine this into the `foldLeft` above so it only does a single pass? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
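A hedged sketch of the single-pass idea, with simplified types standing in for the surrounding `RandomForest` code; the real fold builds the value counts, and here the element count is accumulated in the same traversal instead of a separate `.size` call:

```scala
// One pass over the Iterable: per-value counts plus the total sample count.
def valueCountsAndSize(featureSamples: Iterable[Double]): (Map[Double, Int], Int) =
  featureSamples.foldLeft((Map.empty[Double, Int], 0)) {
    case ((counts, n), x) =>
      (counts.updated(x, counts.getOrElse(x, 0) + 1), n + 1)
  }

// val (valueCounts, numSamples) = valueCountsAndSize(featureSamples)
// val stride: Double = numSamples.toDouble / (numSplits + 1)
```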
[GitHub] spark pull request: [SPARK-12182][ML] Distributed binning for tree...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/10231#issuecomment-169393380 @sethah looks good to me. :+1: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-12182][ML] Distributed binning for tree...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/10231#issuecomment-163422959 Yeah, I can take a look tonight or tomorrow. On Dec 9, 2015 14:25, "Seth Hendrickson" wrote: > @NathanHowell <https://github.com/NathanHowell> would you be able to > review this? > > cc @jkbradley <https://github.com/jkbradley> > > Reply to this email directly or view it on GitHub > <https://github.com/apache/spark/pull/10231#issuecomment-163419168>. > --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/8246#issuecomment-146348263 There were already tests for the returned split lengths, so I just removed the metadata checks. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/8246#issuecomment-146009085 I'll have time tomorrow --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/7946#discussion_r40846432 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala --- @@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: Expression) } } } + +case class JsonTuple(children: Seq[Expression]) + extends Expression with CodegenFallback { + + import SharedFactory._ + + override def nullable: Boolean = { +// a row is always returned +false + } + + // if processing fails this shared value will be returned + @transient private lazy val nullRow: InternalRow = +new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + + // the json body is the first child + @transient private lazy val jsonExpr: Expression = children.head + + // the fields to query are the remaining children + @transient private lazy val fieldExpressions: Seq[Expression] = children.tail + + // eagerly evaluate any foldable the field names + @transient private lazy val foldableFieldNames: IndexedSeq[String] = { +fieldExpressions.map { + case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString + case _ => null +}.toIndexedSeq + } + + // and count the number of foldable fields, we'll use this later to optimize evaluation + @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) + + override lazy val dataType: StructType = { +val fields = fieldExpressions.zipWithIndex.map { + case (_, idx) => StructField( +name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize +dataType = StringType, +nullable = true) +} + +StructType(fields) + } + + override def prettyName: String = "json_tuple" + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length < 2) { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two arguments") +} else if (children.forall(child => StringType.acceptsType(child.dataType))) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all arguments are strings") +} + } + + override def eval(input: InternalRow): InternalRow = { +try { + val json = jsonExpr.eval(input).asInstanceOf[UTF8String] + if (json == null) { +return nullRow + } + + val parser = jsonFactory.createParser(json.getBytes) --- End diff -- I think the answer is no, but it's probably better to do it anyways. `GetJsonObject` doesn't do this, for example... might be worth fixing there too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
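As a sketch of "doing it anyway", the parser could be closed in a finally block (or an equivalent resource-management helper). The `handleObject` callback below is a hypothetical stand-in for the field-extraction loop in the diff:
```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser, JsonToken}

// Sketch only: make sure the Jackson parser is always closed, even when we
// bail out early on non-object input or when parsing throws.
def parseObject[T](factory: JsonFactory, bytes: Array[Byte], onFailure: => T)(
    handleObject: JsonParser => T): T = {
  val parser = factory.createParser(bytes)
  try {
    if (parser.nextToken() != JsonToken.START_OBJECT) {
      onFailure
    } else {
      handleObject(parser)
    }
  } finally {
    parser.close()
  }
}
```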
[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/7946#issuecomment-144529990 Alright, I think I've addressed all your comments @yhuai. I haven't run the tests though :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/7946#discussion_r40810618 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala --- @@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: Expression) } } } + +case class JsonTuple(children: Seq[Expression]) + extends Expression with CodegenFallback { + + import SharedFactory._ + + override def nullable: Boolean = { +// a row is always returned +false + } + + // if processing fails this shared value will be returned + @transient private lazy val nullRow: InternalRow = +new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + + // the json body is the first child + @transient private lazy val jsonExpr: Expression = children.head + + // the fields to query are the remaining children + @transient private lazy val fieldExpressions: Seq[Expression] = children.tail + + // eagerly evaluate any foldable the field names + @transient private lazy val foldableFieldNames: IndexedSeq[String] = { +fieldExpressions.map { + case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString + case _ => null +}.toIndexedSeq + } + + // and count the number of foldable fields, we'll use this later to optimize evaluation + @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) + + override lazy val dataType: StructType = { +val fields = fieldExpressions.zipWithIndex.map { + case (_, idx) => StructField( +name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize +dataType = StringType, +nullable = true) +} + +StructType(fields) + } + + override def prettyName: String = "json_tuple" + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length < 2) { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two arguments") +} else if (children.forall(child => StringType.acceptsType(child.dataType))) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all arguments are strings") +} + } + + override def eval(input: InternalRow): InternalRow = { +try { + val json = jsonExpr.eval(input).asInstanceOf[UTF8String] + if (json == null) { +return nullRow + } + + val parser = jsonFactory.createParser(json.getBytes) + + // only objects are supported + if (parser.nextToken() != JsonToken.START_OBJECT) { +return nullRow + } + + // evaluate the field names as String rather than UTF8String to + // optimize lookups from the json token, which is also a String + val fieldNames = if (constantFields == fieldExpressions.length) { +// typically the user will provide the field names as foldable expressions +// so we can use the cached copy +foldableFieldNames + } else if (constantFields == 0) { +// none are foldable so all field names need to be evaluated from the input row + fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString) + } else { +// if there is a mix of constant and non-constant expressions +// prefer the cached copy when available +foldableFieldNames.zip(fieldExpressions).map { + case (null, expr) => expr.eval(input).asInstanceOf[UTF8String].toString + case (fieldName, _) => fieldName +} + } + + val row = Array.ofDim[Any](fieldNames.length) + + // start reading through the token stream, looking for any requested field names + while (parser.nextToken() != JsonToken.END_OBJECT) { +if (parser.getCurrentToken == JsonToken.FIELD_NAME) { + // check to see if this field is desired in the output + val idx = 
fieldNames.indexOf(parser.getCurrentName) + if (idx >= 0) { +// it is, copy the child tree to the correct location in the output row +val output = new ByteArrayOutputStream() + +// write the output directly to UTF8 encoded byte array +val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) --- End diff -- I'm going to move the generator construction into the `if` block, it's not used otherwise. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
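A sketch of the restructuring described here, using the stock Jackson `JsonGenerator.copyCurrentStructure(JsonParser)` rather than the PR's helper; the point is simply that the `ByteArrayOutputStream` and generator are only allocated once the value is known to be non-null. The null-return behaviour is an assumption based on the surrounding diff:
```scala
import java.io.ByteArrayOutputStream

import com.fasterxml.jackson.core.{JsonEncoding, JsonFactory, JsonParser, JsonToken}
import org.apache.spark.unsafe.types.UTF8String

// Sketch only: copy the parser's current subtree into a UTF-8 encoded byte
// array, allocating the stream and generator only when the value is non-null.
def copyValueAsUTF8(factory: JsonFactory, parser: JsonParser): UTF8String = {
  if (parser.nextToken() != JsonToken.VALUE_NULL) {
    val output = new ByteArrayOutputStream()
    val generator = factory.createGenerator(output, JsonEncoding.UTF8)
    try {
      generator.copyCurrentStructure(parser)
    } finally {
      generator.close()
    }
    UTF8String.fromBytes(output.toByteArray)
  } else {
    null
  }
}
```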
[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/7946#discussion_r40810335 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala --- @@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: Expression) } } } + +case class JsonTuple(children: Seq[Expression]) + extends Expression with CodegenFallback { + + import SharedFactory._ + + override def nullable: Boolean = { +// a row is always returned +false + } + + // if processing fails this shared value will be returned + @transient private lazy val nullRow: InternalRow = +new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + + // the json body is the first child + @transient private lazy val jsonExpr: Expression = children.head + + // the fields to query are the remaining children + @transient private lazy val fieldExpressions: Seq[Expression] = children.tail + + // eagerly evaluate any foldable the field names + @transient private lazy val foldableFieldNames: IndexedSeq[String] = { +fieldExpressions.map { + case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString + case _ => null +}.toIndexedSeq + } + + // and count the number of foldable fields, we'll use this later to optimize evaluation + @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) + + override lazy val dataType: StructType = { +val fields = fieldExpressions.zipWithIndex.map { + case (_, idx) => StructField( +name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize +dataType = StringType, +nullable = true) +} + +StructType(fields) + } + + override def prettyName: String = "json_tuple" + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length < 2) { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two arguments") +} else if (children.forall(child => StringType.acceptsType(child.dataType))) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all arguments are strings") +} + } + + override def eval(input: InternalRow): InternalRow = { +try { + val json = jsonExpr.eval(input).asInstanceOf[UTF8String] + if (json == null) { +return nullRow + } + + val parser = jsonFactory.createParser(json.getBytes) + + // only objects are supported + if (parser.nextToken() != JsonToken.START_OBJECT) { +return nullRow + } + + // evaluate the field names as String rather than UTF8String to + // optimize lookups from the json token, which is also a String + val fieldNames = if (constantFields == fieldExpressions.length) { +// typically the user will provide the field names as foldable expressions +// so we can use the cached copy +foldableFieldNames + } else if (constantFields == 0) { +// none are foldable so all field names need to be evaluated from the input row + fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString) + } else { +// if there is a mix of constant and non-constant expressions +// prefer the cached copy when available +foldableFieldNames.zip(fieldExpressions).map { + case (null, expr) => expr.eval(input).asInstanceOf[UTF8String].toString + case (fieldName, _) => fieldName +} + } + + val row = Array.ofDim[Any](fieldNames.length) + + // start reading through the token stream, looking for any requested field names + while (parser.nextToken() != JsonToken.END_OBJECT) { +if (parser.getCurrentToken == JsonToken.FIELD_NAME) { + // check to see if this field is desired in the output + val idx = 
fieldNames.indexOf(parser.getCurrentName) + if (idx >= 0) { +// it is, copy the child tree to the correct location in the output row +val output = new ByteArrayOutputStream() + +// write the output directly to UTF8 encoded byte array +val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) +if (parser.nextToken() != JsonToken.VALUE_NULL) { + copyCurrentStructure(generator, parser) + generator.close() +
[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/7946#discussion_r40809995 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala --- @@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: Expression) } } } + +case class JsonTuple(children: Seq[Expression]) + extends Expression with CodegenFallback { + + import SharedFactory._ + + override def nullable: Boolean = { +// a row is always returned +false + } + + // if processing fails this shared value will be returned + @transient private lazy val nullRow: InternalRow = +new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length)) + + // the json body is the first child + @transient private lazy val jsonExpr: Expression = children.head + + // the fields to query are the remaining children + @transient private lazy val fieldExpressions: Seq[Expression] = children.tail + + // eagerly evaluate any foldable the field names + @transient private lazy val foldableFieldNames: IndexedSeq[String] = { +fieldExpressions.map { + case expr if expr.foldable => expr.eval().asInstanceOf[UTF8String].toString + case _ => null +}.toIndexedSeq + } + + // and count the number of foldable fields, we'll use this later to optimize evaluation + @transient private lazy val constantFields: Int = foldableFieldNames.count(_ != null) + + override lazy val dataType: StructType = { +val fields = fieldExpressions.zipWithIndex.map { + case (_, idx) => StructField( +name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize +dataType = StringType, +nullable = true) +} + +StructType(fields) + } + + override def prettyName: String = "json_tuple" + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.length < 2) { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two arguments") +} else if (children.forall(child => StringType.acceptsType(child.dataType))) { + TypeCheckResult.TypeCheckSuccess +} else { + TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all arguments are strings") +} + } + + override def eval(input: InternalRow): InternalRow = { +try { + val json = jsonExpr.eval(input).asInstanceOf[UTF8String] + if (json == null) { +return nullRow + } + + val parser = jsonFactory.createParser(json.getBytes) + + // only objects are supported + if (parser.nextToken() != JsonToken.START_OBJECT) { +return nullRow + } + + // evaluate the field names as String rather than UTF8String to + // optimize lookups from the json token, which is also a String + val fieldNames = if (constantFields == fieldExpressions.length) { +// typically the user will provide the field names as foldable expressions +// so we can use the cached copy +foldableFieldNames + } else if (constantFields == 0) { +// none are foldable so all field names need to be evaluated from the input row + fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString) + } else { +// if there is a mix of constant and non-constant expressions +// prefer the cached copy when available +foldableFieldNames.zip(fieldExpressions).map { + case (null, expr) => expr.eval(input).asInstanceOf[UTF8String].toString + case (fieldName, _) => fieldName +} + } + + val row = Array.ofDim[Any](fieldNames.length) + + // start reading through the token stream, looking for any requested field names + while (parser.nextToken() != JsonToken.END_OBJECT) { +if (parser.getCurrentToken == JsonToken.FIELD_NAME) { + // check to see if this field is desired in the output + val idx = 
fieldNames.indexOf(parser.getCurrentName) + if (idx >= 0) { +// it is, copy the child tree to the correct location in the output row +val output = new ByteArrayOutputStream() + +// write the output directly to UTF8 encoded byte array +val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) +if (parser.nextToken() != JsonToken.VALUE_NULL) { + copyCurrentStructure(generator, parser) + generator.close() +
[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/7946#issuecomment-15101 @yhuai I'll see what I can do, running some larger jobs today so I may have a long enough gap to fix this up. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...
Github user NathanHowell commented on the pull request: https://github.com/apache/spark/pull/8246#issuecomment-139792166 I tend to rebase out of habit to prevent merge-build failures. I'll look at the test failures on Monday; they were all passing at one point. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...
Github user NathanHowell commented on a diff in the pull request: https://github.com/apache/spark/pull/8246#discussion_r39216747 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala --- @@ -1056,6 +988,70 @@ object DecisionTree extends Serializable with Logging { } } + private def findSplitsBinsBySorting( + input: RDD[LabeledPoint], + metadata: DecisionTreeMetadata, + continuousFeatures: IndexedSeq[Int]): (Array[Array[Split]], Array[Array[Bin]]) = { +def findSplits( +featureIndex: Int, +featureSamples: Iterable[Double]): (Int, (Array[Split], Array[Bin])) = { + val splits = { +val featureSplits = findSplitsForContinuousFeature( + featureSamples.toArray, + metadata, + featureIndex) +logDebug(s"featureIndex = $featureIndex, numSplits = ${featureSplits.length}") + +featureSplits.map(threshold => new Split(featureIndex, threshold, Continuous, Nil)) + } + + val bins = { +val lowSplit = new DummyLowSplit(featureIndex, Continuous) +val highSplit = new DummyHighSplit(featureIndex, Continuous) +(lowSplit +: splits.toSeq :+ highSplit).sliding(2).map { + case Seq(lhs, right) => new Bin(lhs, right, Continuous, Double.MinValue) +}.toArray + } + + (featureIndex, (splits, bins)) +} + +val continuousSplits = input + .flatMap(point => continuousFeatures.map(idx => (idx, point.features(idx + .groupByKey(numPartitions = math.min(continuousFeatures.length, input.partitions.length)) --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org