[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-06-06 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Yep, should be doable without too much effort.

On Sun, Jun 4, 2017 at 9:54 PM, Xiao Li  wrote:

> @NathanHowell <https://github.com/nathanhowell> It sounds like we also
> can provide multi-line support for JSON too. For example, in a single JSON
> file
>
> {"a": 1,
> "b": 1.1}
> {"a": 2, "b": 1.1}
> {"a": 3, "b": 1.1}
>
> When using the wholeFile mode, we only parse the first JSON record {"a": 1,
> "b": 1.1} but ignore the following records. It sounds like we should also
> parse them too and rename wholeFile to multiLine?

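For context, here is a minimal sketch of the reader call under discussion, using the `wholeFile` option as it appears in this PR's tests (later proposed to be renamed `multiLine`). The `SparkSession` setup and the input path are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup; any active SparkSession works.
val spark = SparkSession.builder().appName("whole-file-json").getOrCreate()

// Default behavior: line-delimited JSON, one record per line.
val lineDelimited = spark.read.json("/tmp/people.json")

// Whole-file mode added by this PR: each file is treated as a single JSON
// document that may span multiple lines. As discussed above, only the first
// document in a file is parsed; any trailing records are currently ignored.
val wholeFile = spark.read.option("wholeFile", true).json("/tmp/people.json")
wholeFile.printSchema()
```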





[GitHub] spark issue #12217: [WIP][SPARK-14408][CORE] Changed RDD.treeAggregate to us...

2017-06-02 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/12217
  
Nothing looks obviously broken; their combiner looks fine. Rerunning the
tests would help.

On Jun 2, 2017 07:02, "Hyukjin Kwon"  wrote:

> Hi @jkbradley <https://github.com/jkbradley> and @srowen
> <https://github.com/srowen>, could we retest this just to see the error
> messages? It looks like the last test results are not accessible (to me).






[GitHub] spark pull request #17255: [SPARK-19918][SQL] Use TextFileFormat in implemen...

2017-03-14 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/17255#discussion_r105942833
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
 ---
@@ -40,18 +40,11 @@ private[sql] object JsonInferSchema {
   json: RDD[T],
   configOptions: JSONOptions,
   createParser: (JsonFactory, T) => JsonParser): StructType = {
-require(configOptions.samplingRatio > 0,
-  s"samplingRatio (${configOptions.samplingRatio}) should be greater 
than 0")
 val shouldHandleCorruptRecord = configOptions.permissive
 val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord
-val schemaData = if (configOptions.samplingRatio > 0.99) {
-  json
-} else {
-  json.sample(withReplacement = false, configOptions.samplingRatio, 1)
-}
--- End diff --

Yeah, perhaps the RDD->Dataset changes should be done under a separate
issue. I think it can be done across the board (removing most or all RDD
references), but I'm not sure what other implications it would have.





[GitHub] spark issue #17255: [SPARK-19918][SQL] Use TextFileFormat in implementation ...

2017-03-12 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/17255
  
Would there be any additional benefit of replacing more (or all?) of the 
uses of `RDD` with the equivalent `Dataset` operations?
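To make the question concrete, here is a rough sketch of the kind of substitution being discussed: producing the input lines as a `Dataset[String]` (via the text source) rather than an `RDD[String]`. The `SparkSession` setup and path are assumptions for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical setup.
val spark = SparkSession.builder().getOrCreate()

// RDD-based input, the older shape of the code:
val linesRdd = spark.sparkContext.textFile("/tmp/input.json")

// Dataset-based input via the text source, the direction this PR moves in:
val linesDs: Dataset[String] = spark.read.textFile("/tmp/input.json")

// Most RDD operations used here have direct Dataset equivalents, e.g. sampling:
val sampled = linesDs.sample(withReplacement = false, fraction = 0.1, seed = 1)
```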





[GitHub] spark pull request #17255: [SPARK-19918][SQL] Use TextFileFormat in implemen...

2017-03-12 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/17255#discussion_r105563532
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -23,24 +23,25 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
-import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 
 import org.apache.spark.TaskContext
 import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Encoders, SparkSession}
--- End diff --

Is the `Encoders` import still necessary?





[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102668816
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.io.InputStream
+import java.nio.charset.{Charset, StandardCharsets}
+
+import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common functions for parsing CSV files
+ */
+abstract class CSVDataSource extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into [[InternalRow]] instances.
+   */
+  def readFile(
+  conf: Configuration,
+  file: PartitionedFile,
+  parser: UnivocityParser,
+  parsedOptions: CSVOptions): Iterator[InternalRow]
+
+  /**
+   * Infers the schema from `inputPaths` files.
+   */
+  def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: CSVOptions): Option[StructType]
--- End diff --

Sounds good to me. Reducing code duplication between the JSON and CSV 
parsers would be great.





[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102667812
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.io.InputStream
+import java.nio.charset.{Charset, StandardCharsets}
+
+import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common functions for parsing CSV files
+ */
+abstract class CSVDataSource extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into [[InternalRow]] instances.
+   */
+  def readFile(
+  conf: Configuration,
+  file: PartitionedFile,
+  parser: UnivocityParser,
+  parsedOptions: CSVOptions): Iterator[InternalRow]
+
+  /**
+   * Infers the schema from `inputPaths` files.
+   */
+  def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: CSVOptions): Option[StructType]
+
+  /**
+   * Generates a header from the given row which is null-safe and duplicate-safe.
+   */
+  protected def makeSafeHeader(
+  row: Array[String],
+  caseSensitive: Boolean,
+  options: CSVOptions): Array[String] = {
+if (options.headerFlag) {
+  val duplicates = {
+val headerNames = row.filter(_ != null)
+  .map(name => if (caseSensitive) name else name.toLowerCase)
+headerNames.diff(headerNames.distinct).distinct
+  }
+
+  row.zipWithIndex.map { case (value, index) =>
+if (value == null || value.isEmpty || value == options.nullValue) {
+  // When there are empty strings or the values set in `nullValue`, put the
+  // index as the suffix.
+  s"_c$index"
+} else if (!caseSensitive && duplicates.contains(value.toLowerCase)) {
+  // When there are case-insensitive duplicates, put the index as the suffix.
+  s"$value$index"
+} else if (duplicates.contains(value)) {
+  // When there are duplicates, put the index as the suffix.
+  s"$value$index"
+} else {
+  value
+}
+  }
+} else {
+  row.zipWithIndex.map { case (_, index) =>
+// Uses default column names, "_c#" where # is its position of fields
+// when header option is disabled.
+s"_c$index"
+  }
+}
+  }
+}
+
+object CSVDataSource {
+  def apply(options: CSVOptions): CSVDataSource = {
+if (options.wholeFile) {
+  WholeFileCSVDataSource
+} else {
+  TextInputCSVDataSource
+}
+  }
+}
+
+object TextInputCSVDataSource extends CSVDataSource {
+  override val isSplitable: Boolean = true
+
+  override def readFile(
+  conf: Configuration,
+  file: PartitionedFile,
+  parser: UnivocityParser,
+  pars

[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102665872
  

[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102665619
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.csv
+
+import java.io.InputStream
+import java.nio.charset.{Charset, StandardCharsets}
+
+import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common functions for parsing CSV files
+ */
+abstract class CSVDataSource extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into [[InternalRow]] instances.
+   */
+  def readFile(
+  conf: Configuration,
+  file: PartitionedFile,
+  parser: UnivocityParser,
+  parsedOptions: CSVOptions): Iterator[InternalRow]
+
+  /**
+   * Infers the schema from `inputPaths` files.
+   */
+  def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: CSVOptions): Option[StructType]
--- End diff --

Both subclasses return `Some(...)`, so it would be clearer to change this
to `def infer(...): StructType` and do the Option wrapping only in
`CSVFileFormat.inferSchema`.

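As a sketch of the suggested shape (type names follow the quoted diff; `CSVOptions` lives in the same package, so this is not standalone code, just an illustration of the signature change):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

// Sketch only: infer returns the schema directly...
abstract class CSVDataSource extends Serializable {
  def infer(
      sparkSession: SparkSession,
      inputPaths: Seq[FileStatus],
      parsedOptions: CSVOptions): StructType
}

// ...and the Option wrapping happens once, at the call site:
//   override def inferSchema(...): Option[StructType] =
//     Some(CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions))
```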




[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102663016
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
 ---
@@ -43,23 +37,26 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def shortName(): String = "csv"
 
-  override def toString: String = "CSV"
-
-  override def hashCode(): Int = getClass.hashCode()
-
-  override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
+  override def isSplitable(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  path: Path): Boolean = {
+val parsedOptions =
+  new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
+val csvDataSource = CSVDataSource(parsedOptions)
+csvDataSource.isSplitable && super.isSplitable(sparkSession, options, path)
+  }
 
   override def inferSchema(
   sparkSession: SparkSession,
   options: Map[String, String],
   files: Seq[FileStatus]): Option[StructType] = {
  require(files.nonEmpty, "Cannot infer schema from an empty set of files")
--- End diff --

Should this return `None` instead of throwing an exception? The JSON parser 
does, though I'm not sure which approach is recommended. They should be 
consistent though.
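For comparison, a hedged sketch of the `None`-returning alternative, reusing the names from the quoted diff (this assumes the surrounding `CSVFileFormat` context and is not a drop-in patch):

```scala
override def inferSchema(
    sparkSession: SparkSession,
    options: Map[String, String],
    files: Seq[FileStatus]): Option[StructType] = {
  if (files.isEmpty) {
    // Mirror the JSON reader: no input files means no schema, rather than an exception.
    None
  } else {
    val parsedOptions =
      new CSVOptions(options, sparkSession.sessionState.conf.sessionLocalTimeZone)
    CSVDataSource(parsedOptions).infer(sparkSession, files, parsedOptions)
  }
}
```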





[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102662637
  

[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102662258
  

[GitHub] spark pull request #16976: [SPARK-19610][SQL] Support parsing multiline CSV ...

2017-02-23 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16976#discussion_r102662330
  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-16 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r101671453
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,117 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

I rewrote these tests. Please take a look @gatorsmile and @cloud-fan.





[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-16 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
@cloud-fan While implementing tests for the other modes I've uncovered an
existing bug in schema inference in `DROPMALFORMED` mode:
https://issues.apache.org/jira/browse/SPARK-19641. Since it is not introduced
by this set of patches I will open a new pull request once this one is merged.
You can inspect the fix here:
https://github.com/NathanHowell/spark/commit/e233fd03346a73b3b447fa4c24f3b12c8b2e53ae

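For reference, a minimal sketch of how the mode in question is selected from the reader API; an active `SparkSession` named `spark` and the input path are assumptions for illustration.

```scala
// DROPMALFORMED silently drops records that fail to parse; the linked JIRA
// concerns how schema inference behaves under this mode.
val dropped = spark.read
  .option("mode", "DROPMALFORMED")
  .option("wholeFile", true) // the multi-line support added in this PR
  .json("/tmp/records.json")
```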




[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
@cloud-fan I just pushed a few more changes to address some of your 
comments. I'll be back later next week to continue work.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653879
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

Probably, but I'm out of time for today. I'll be out for a few days and can 
pick this back up on Thursday or Friday next week.



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653835
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

Probably, but I'm out of time for today. I'll be out for a few days and can 
pick this back up on Thursday or Friday next week.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653757
  

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653580
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

Yep, I'll fix.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652445
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = JsonInferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
--- End diff --

IIRC this check was added because some of the backends (maybe Parquet?) 
were writing corrupt files... if this is checked globally now, it should be fine 
to remove.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652259
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
--- End diff --

Right, it's a single value that spans multiple lines. The Python test is 
reusing some Python-specific test data.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652192
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
--- End diff --

I'm not sure what you mean?





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651990
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
--- End diff --

Right





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651910
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
--- End diff --

This was done for a reason that is no longer relevant; I'll switch it.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651706
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
 ---
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
 
   private[this] val structFieldComparator = new Comparator[StructField] {
 override def compare(o1: StructField, o2: StructField): Int = {
-  o1.name.compare(o2.name)
+  o1.name.compareTo(o2.name)
--- End diff --

`.compare` is a very expensive way of comparing two strings.

`compare`:
```
  public int compare(org.apache.spark.sql.types.StructField, 
org.apache.spark.sql.types.StructField);
Code:
   0: new   #14 // class 
scala/collection/immutable/StringOps
   3: dup
   4: getstatic #20 // Field 
scala/Predef$.MODULE$:Lscala/Predef$;
   7: aload_1
   8: invokevirtual #26 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
  11: invokevirtual #30 // Method 
scala/Predef$.augmentString:(Ljava/lang/String;)Ljava/lang/String;
  14: invokespecial #34 // Method 
scala/collection/immutable/StringOps."":(Ljava/lang/String;)V
  17: aload_2
  18: invokevirtual #26 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
  21: invokevirtual #37 // Method 
scala/collection/immutable/StringOps.compare:(Ljava/lang/String;)I
  24: ireturn
```

`compareTo`:
```
  public int compare(org.apache.spark.sql.types.StructField, 
org.apache.spark.sql.types.StructField);
Code:
   0: aload_1
   1: invokevirtual #18 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
   4: aload_2
   5: invokevirtual #18 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
   8: invokevirtual #24 // Method 
java/lang/String.compareTo:(Ljava/lang/String;)I
  11: ireturn
```
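
To make the difference concrete, here is a minimal sketch (illustrative only, not part of 
the diff) of the two comparator variants in source form. The first goes through the implicit 
`Predef.augmentString` conversion and allocates a `StringOps` wrapper on every call, as the 
bytecode above shows; the second calls `java.lang.String.compareTo` directly:

```scala
import java.util.Comparator
import org.apache.spark.sql.types.StructField

// allocates a StringOps wrapper per comparison via the implicit augmentString conversion
val viaStringOps: Comparator[StructField] = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int =
    o1.name.compare(o2.name)
}

// calls java.lang.String.compareTo directly, no wrapper allocation
val direct: Comparator[StructField] = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int =
    o1.name.compareTo(o2.name)
}
```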





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650450
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

Passing a function in instead of a closure saves an allocation that would otherwise be 
held for the duration of parsing and would likely be promoted to a later GC generation.

If we went the closure route, the function signature would look like this:

```scala
def parse(
  createParser: JsonFactory => JsonParser,
  recordLiteral: => UTF8String): Seq[InternalRow]
```
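
As a rough illustration of the allocation difference (the names below are hypothetical 
stand-ins, not the PR's helpers): with the function-argument form the record travels as a 
plain argument and a single function value can be reused for every record, while the 
closure form captures the record and allocates a fresh function object per record:

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

val factory = new JsonFactory()

// hypothetical stand-in for the per-type parser constructors
def makeParser(f: JsonFactory, record: String): JsonParser = f.createParser(record)

// function-argument form: one long-lived function value, nothing captured per record
val sharedCreateParser: (JsonFactory, String) => JsonParser = makeParser

// closure form: each record allocates a JsonFactory => JsonParser that captures it and
// stays reachable for as long as that record is being parsed
def closureFor(record: String): JsonFactory => JsonParser = f => makeParser(f, record)
```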





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649836
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
- 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable  {
 
-  def this(parameters: Map[String, String]) = this(new 
CaseInsensitiveMap(parameters))
+  def this(
+  parameters: Map[String, String],
+  defaultColumnNameOfCorruptRecord: String = "") = {
--- End diff --

Yes, it's really not a good solution, but it doesn't make sense to have a 
corrupt column name in all use cases. Picking another sentinel could 
inadvertently conflict with a real column. It should be `Option[String] = None`, 
but this winds up being a large change that deserves a separate pull request.
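
For what it's worth, a rough sketch of the `Option`-based shape (hypothetical class and 
field names, not what this PR does), where the absence of a corrupt-record column is 
explicit rather than signalled by an empty-string sentinel:

```scala
// hypothetical sketch only; names mirror the discussion above, not the actual class
class JSONOptionsSketch(
    parameters: Map[String, String],
    defaultColumnNameOfCorruptRecord: Option[String] = None) {

  // None means "no corrupt-record column", so it can never collide with a real column name
  val columnNameOfCorruptRecord: Option[String] =
    parameters.get("columnNameOfCorruptRecord").orElse(defaultColumnNameOfCorruptRecord)
}
```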





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648524
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
+try {
+  Utils.tryWithResource(createParser(factory, record)) { parser =>
+// a null first token is equivalent to testing for 
input.trim.isEmpty
+// but it works on any token stream and not just strings
+parser.nextToken() match {
+  case null => Nil
+  case _ => rootConverter.apply(parser) match {
+case null => throw new SparkSQLJsonProcessingException("Root 
converter returned null")
+case rows => rows
   }
 }
-  } catch {
-case _: JsonProcessingException =>
-  failedRecord(input)
-case _: SparkSQLJsonProcessingException =>
-  failedRecord(input)
   }
+} catch {
+  case (_: JsonProcessingException) | (_: 
SparkSQLJsonProcessingException) =>
+failedRecord(() => recordLiteral.applyOrElse[T, UTF8String](
--- End diff --

When I do that I usually get review comments asking to make call-by-name 
parameters explicit...





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100646497
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMa

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100640620
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMa

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100610662
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,98 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private val printWarningForMalformedRecord = ExecuteOnce[() => 
UTF8String] { record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private val printWarningIfWholeFile = ExecuteOnce[Unit] { _ =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMa

[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Jenkins, retest this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100474809
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
--- End diff --

@cloud-fan please take a look at aafe7bded6e614dddaed74d15cecfb8b1a78a639; 
I hope this is clearer.





[GitHub] spark issue #16199: [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16199
  
@HyukjinKwon Good idea, I'll take another stab and try to revive the 
original pull request.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100344879
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
--- End diff --

I can wrap it up into a helper class; this is to avoid having to keep 
multiple variables in sync. `wholeFile` adds another warning, and I'm sure 
additional warnings will be added in the future.
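
For reference, a minimal sketch of the kind of run-once helper described here (illustrative 
only; not necessarily what the `ExecuteOnce` in this branch looks like):

```scala
// invoke the wrapped body at most once, no matter how many malformed records trip it
class ExecuteOnce[T](body: T => Unit) extends (T => Unit) with Serializable {
  @transient private[this] var executed = false

  override def apply(arg: T): Unit = {
    if (!executed) {
      executed = true
      body(arg)
    }
  }
}

object ExecuteOnce {
  def apply[T](body: T => Unit): ExecuteOnce[T] = new ExecuteOnce(body)
}

// usage: the warning body runs for the first bad record only
val warnOnce = ExecuteOnce[String] { sample => println(s"Found malformed record: $sample") }
warnOnce("bad json")   // prints the warning
warnOnce("also bad")   // no-op
```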





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-09 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100344282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,102 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private var printWarningForMalformedRecord: (() => UTF8String) => Unit = 
{ record =>
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+
+printWarningForMalformedRecord = _ => ()
+  }
+
+  @transient
+  private var printWarningIfWholeFile: () => Unit = { () =>
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+
+printWarningIfWholeFile = () => ()
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.jso

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100219153
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

Done, pushed in f71a465cf07fb9c043b2ccd86fa57e8e8ea9dc00





[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Rebased again to pick up the build break hotfix in 
c618ccdbe9ac103dfa3182346e2a14a1e7fca91a





[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
I rebased onto master and hopefully addressed all of your comments, 
@cloud-fan; please have another look.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100104738
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -298,22 +312,22 @@ class JacksonParser(
 // Here, we pass empty `PartialFunction` so that this case can be
 // handled as a failed conversion. It will throw an exception as
 // long as the value is not null.
-parseJsonToken(parser, dataType)(PartialFunction.empty[JsonToken, 
Any])
+parseJsonToken[AnyRef](parser, 
dataType)(PartialFunction.empty[JsonToken, AnyRef])
   }
 
   /**
* This method skips `FIELD_NAME`s at the beginning, and handles nulls 
ahead before trying
* to parse the JSON token using given function `f`. If the `f` failed 
to parse and convert the
* token, call `failedConversion` to handle the token.
*/
-  private def parseJsonToken(
+  private def parseJsonToken[R >: Null](
--- End diff --

It states that `R` must be a nullable type. This enables `null: R` to 
compile and is preferable to the runtime cast `null.asInstanceOf[R]` because it 
is verified at compile time.
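
A tiny, self-contained illustration of what the bound buys (hypothetical method names):

```scala
// without the bound this would not compile, because an arbitrary R need not be nullable:
// def withoutBound[R](): R = null
def withBound[R >: Null](): R = null   // ok: R is constrained to nullable (reference) types

val ok: java.lang.Long = withBound[java.lang.Long]()  // null flows through without a cast
// val bad: Int = withBound[Int]()     // rejected at compile time: Int is not >: Null
```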





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100103739
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -227,66 +267,71 @@ class JacksonParser(
   }
 
 case TimestampType =>
-  (parser: JsonParser) => parseJsonToken(parser, dataType) {
+  (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, 
dataType) {
 case VALUE_STRING =>
+  val stringValue = parser.getText
   // This one will lose microseconds parts.
   // See https://issues.apache.org/jira/browse/SPARK-10681.
-  Try(options.timestampFormat.parse(parser.getText).getTime * 
1000L)
-.getOrElse {
-  // If it fails to parse, then tries the way used in 2.0 and 
1.x for backwards
-  // compatibility.
-  DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
-}
+  Long.box {
--- End diff --

This is needed to satisfy the type checker. The other approach is to 
explicitly specify the type in two locations: 
`Try[java.lang.Long](...).getOrElse[java.lang.Long](...)`. I found explicit 
boxing to be more readable than the alternative.
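
For illustration, the two equivalent shapes side by side; `text` and `fallback` are 
stand-ins for the parser calls in the diff, not the actual code:

```scala
import scala.util.Try

// option 1: box once at the end, let the types inside the block be inferred
def parseMillis(text: String, fallback: String => Long): java.lang.Long =
  Long.box {
    Try(java.lang.Long.parseLong(text) * 1000L)
      .getOrElse(fallback(text) * 1000L)
  }

// option 2: spell out java.lang.Long twice so Try/getOrElse unify on the boxed type
def parseMillisVerbose(text: String, fallback: String => Long): java.lang.Long =
  Try[java.lang.Long](java.lang.Long.parseLong(text) * 1000L)
    .getOrElse[java.lang.Long](fallback(text) * 1000L)
```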





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100101464
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -160,7 +164,17 @@ public void writeTo(OutputStream out) throws 
IOException {
 throw new ArrayIndexOutOfBoundsException();
   }
 
-  out.write(bytes, (int) arrayOffset, numBytes);
+  return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes);
+} else {
+  return null;
--- End diff --

It will allocate an extra object but would simplify the calling code... 
since it would be a short-lived allocation, it's probably fine to do this.
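
A small sketch of the trade-off (illustrative only): the `ByteBuffer` wrapper is one 
short-lived object, but it lets callers work against a standard API without copying the 
underlying bytes:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

val bytes = "example".getBytes(StandardCharsets.UTF_8)

// wraps the existing array: one small, short-lived allocation, no byte copy
val buffer: ByteBuffer = ByteBuffer.wrap(bytes, 0, bytes.length)

// callers can hand `buffer` to anything that accepts a ByteBuffer, e.g. a CharsetDecoder
val decoded = StandardCharsets.UTF_8.decode(buffer).toString
```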





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100100641
  
--- Diff: 
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---
@@ -194,5 +195,8 @@ class PortableDataStream(
   }
 
   def getPath(): String = path
+
+  @Since("2.2.0")
--- End diff --

This is a public class so I thought adding a `since` tag would benefit the 
documentation. If it's not desired I can certainly remove it.

As for making the lazy val public vs private: I'm following the style used 
already in the class. There are public get methods for each private field. I'm 
not partial to either approach but prefer to keep it consistent.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100099791
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
--- End diff --

Previously the `JSONOptions` instance was always passed around with a 
`columnNameOfCorruptRecord` value. This just makes it a field in `JSONOptions` 
instead, to put all options in one place. Since it's a required option, it made 
more sense to use a field instead of making an entry in the `CaseInsensitiveMap`.





[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100098008
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,125 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempDir { dir =>
+  dir.delete()
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.flatMap(Iterator.fill(3)(_) ++ Iterator("\n{invalid}"))
--- End diff --

sure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-08 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100097749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = InferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
+if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
+  val duplicateColumns = schema.fieldNames.groupBy(identity).collect {
+case (x, ys) if ys.length > 1 => "\"" + x + "\""
+  }.mkString(", ")
+  throw new AnalysisException(s"Duplicate column(s) : 
$duplicateColumns found, " +
+s"cannot save to JSON format")
+}
+  }
+}
+
+object JsonDataSource {
+  def apply(options: JSONOptions): JsonDataSource[_] = {
+if (options.wholeFile) {
+  WholeFileJsonDataSource
+} else {
+  TextInputJsonDataSource
+}
+  }
+
+  /**
+   * Create a new [[RDD]] via the supplied callback if there is at least 
one file to process,
+   * otherwise an [[org.apache.spark.rdd.EmptyRDD]] will be returned.
+   */
+  def createBaseRddConf[T : ClassTag](
--- End diff --

Habit from working with languages that do

[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-01-23 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Any other comments?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-01-10 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Jenkins, retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-01-10 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Can someone kick off the tests again? The last failure was in another 
module (Kafka).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2016-12-29 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
@HyukjinKwon I just pushed a change that makes the corrupt record handling 
consistent: if a corrupt record column is defined, it will always get the raw 
JSON text for failed records. If `wholeFile` is enabled, a warning is emitted.

I think more discussion is needed to figure out the best way to handle 
corrupt records and exceptions; perhaps that can be shelved for now and we can 
pick it up later under another ticket?
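
For reference, a minimal sketch of the resulting behavior from the caller's 
side (the path is a placeholder and `spark` is assumed to be an active 
`SparkSession`):

```
val df = spark.read
  .option("wholeFile", "true")                             // parse each file as one JSON value
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/json")                                   // placeholder path

// Records that failed to parse surface as raw JSON text in the corrupt column.
df.where(df("_corrupt_record").isNotNull).show(truncate = false)
```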


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2016-12-27 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
The tests failed for an unrelated reason; it looks like SBT is running out of 
heap space somewhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2016-12-27 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
@HyukjinKwon I agree that overloading the corrupt record column is 
undesirable and `F.input_file_name` is a better way to fetch the filename. It 
would be nice to extend this concept further and provide new functions (like 
`F.json_exception`) to retrieve exceptions and their locations, and this would 
work for the base case (parsing a string) as well as `wholeFile`. Plumbing this 
type of change through appears to require thread-local storage (unfortunately) 
but otherwise doesn't look too bad.

The question then is what to put in the corrupt record column, if one is 
defined, when in `wholeFile` mode. To retain consistency with the string paths 
we should really put the entire file in the column. This is problematic for 
large files (>2GB) since Spark SQL doesn't have blob support... so the 
allocations will fail (along with the task) and there is no way for the end 
user to work around this limitation. Functions like `substr` are applied to 
byte arrays and not file streams. Perhaps it's good enough to issue a warning 
(along the lines of "don't define a corrupt record column in wholeFile mode") 
and hope for the best?
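
As a sketch of the `input_file_name` route mentioned above (placeholder path, 
`spark` assumed to be an active `SparkSession`):

```
import org.apache.spark.sql.functions.input_file_name

val parsed = spark.read
  .option("wholeFile", "true")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/json")

// Recover the source file of a failed record without overloading the corrupt column.
parsed
  .where(parsed("_corrupt_record").isNotNull)
  .select(input_file_name().as("source_file"), parsed("_corrupt_record"))
  .show(truncate = false)
```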


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-27 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93970059
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -36,29 +31,31 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+object JsonFileFormat {
+  def parseJsonOptions(sparkSession: SparkSession, options: Map[String, 
String]): JSONOptions = {
--- End diff --

I just removed the method entirely since all it did was fetch the value of 
`columnNameOfCorruptRecord`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-27 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r93969732
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+val jsonSchema = InferSchema.infer(
+  createBaseRdd(sparkSession, inputPaths),
+  parsedOptions,
+  createParser)
+checkConstraints(jsonSchema)
+
+if (jsonSchema.fields.nonEmpty) {
--- End diff --

Yes, it was a regression that caused a test failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2016-12-23 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
@srowen It is functionally the same as what you're suggesting. The question 
is how (or if) it should be first class in the `DataFrameReader` API. If we 
agree that it should be exposed, either via a new `FileFormat` or an option to 
`JsonFileFormat`, some abstraction is necessary to support reading from 
different RDD classes.

This PR just pushes that boundary a little further and lets the inference 
and parser code work over more types, not just `String`. This may make parsing 
more efficient in the line-oriented codepath by avoiding a conversion from 
`Text` and `UTF8String` (in `JsonToStruct`) to `String`, and it also lets us 
parse an `InputStream` without requiring all of the data to be in memory. For 
small files this is unlikely to have a benefit (if the file is smaller than 4k 
it will be read entirely anyway), but as the file size increases this reduces 
the amount of memory required for parsing, is friendlier (in theory) on the GC 
and lets us consume files larger than 2GB.
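
To make the "more types, not just `String`" point concrete, here is a rough 
sketch (simplified signatures, not the actual code in this PR) of how a parser 
can be allocated from whichever representation is at hand, including a stream:

```
import java.io.InputStream

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text

def fromString(factory: JsonFactory, record: String): JsonParser =
  factory.createParser(record)

def fromText(factory: JsonFactory, record: Text): JsonParser =
  factory.createParser(record.getBytes, 0, record.getLength)   // avoids a String copy

def fromStream(factory: JsonFactory, record: InputStream): JsonParser =
  factory.createParser(record)                                 // no full buffering required
```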


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2016-12-22 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
Hello recent JacksonGenerator.scala committers, please take a look.

cc/ @rxin @hvanhovell @clockfly @hyukjinkwon @cloud-fan


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2016-12-22 Thread NathanHowell
GitHub user NathanHowell opened a pull request:

https://github.com/apache/spark/pull/16386

[SPARK-18352][SQL] Support parsing multiline json files

## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true`, the JSON reader will parse 
each file (instead of a single line) as a value. This is done with Jackson 
streaming, and it should be capable of parsing very large documents, assuming 
each row fits in memory.

Because the file is not buffered in memory, the corrupt record handling is 
also slightly different when `wholeFile` is enabled: the corrupt column will 
contain the filename instead of the literal JSON if there is a parsing failure. 
It would be easy to extend this to add the parser location (line, column and 
byte offsets) to the output if desired.
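
A minimal usage sketch (placeholder path, `spark` assumed to be an active 
`SparkSession`): with `wholeFile` enabled each file is treated as a single JSON 
value, so a file whose top-level value is an array yields one row per element.

```
val people = spark.read
  .option("wholeFile", "true")
  .json("/data/people.json")   // e.g. file contents: [{"name": "a"}, {"name": "b"}]

people.printSchema()
people.show()
```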

I've also included a few other changes that generate slightly better 
bytecode and (IMO) make it more obvious when and where boxing is occurring in 
the parser. These are included as separate commits; let me know if they should 
be flattened into this PR or moved to a new one.

## How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NathanHowell/spark SPARK-18352

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16386


commit 740620210b30ef02e280d161d6b08088d07300fa
Author: Nathan Howell 
Date:   2016-12-22T22:16:49Z

[SPARK-18352][SQL] Support parsing multiline json files

commit 7902255a79fc2581214a09ccd38437cebd19d862
Author: Nathan Howell 
Date:   2016-12-22T00:27:19Z

JacksonParser.parseJsonToken should be explicit about nulls and boxing

commit 149418647c9831e88af866d44d31496940c02162
Author: Nathan Howell 
Date:   2016-12-21T23:49:37Z

Increase type safety of makeRootConverter, remove runtime type tests

commit 7ad5d5be0c7b41112f9f6ad3cb0cf9055de62695
Author: Nathan Howell 
Date:   2016-12-23T02:13:59Z

Field converter lookups should be O(1)




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16375: [SPARK-18963] o.a.s.unsafe.types.UTF8StringSuite....

2016-12-21 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16375#discussion_r93460507
  
--- Diff: 
common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java 
---
@@ -591,7 +591,11 @@ public void writeToOutputStreamIntArray() throws 
IOException {
 // verify that writes work on objects that are not byte arrays
 final ByteBuffer buffer = 
StandardCharsets.UTF_8.encode("大千世界");
 buffer.position(0);
-buffer.order(ByteOrder.LITTLE_ENDIAN);
+
--- End diff --

Yeah, that's probably the right approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16199: [SPARK-18772][SQL] NaN/Infinite float parsing in JSON is...

2016-12-07 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16199
  
Hello @HyukjinKwon, can you take a look at this one? I am unsure if we 
should be accepting lowercased values like `nan` (versus strictly testing for 
`NaN`) but I think this PR matches the original intent of the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16199: [SPARK-18772][SQL] NaN/Infinite float parsing in ...

2016-12-07 Thread NathanHowell
GitHub user NathanHowell opened a pull request:

https://github.com/apache/spark/pull/16199

[SPARK-18772][SQL] NaN/Infinite float parsing in JSON is inconsistent

## What changes were proposed in this pull request?

This relaxes the parsing of `Float` and `Double` columns to properly 
support mixed case values of `NaN` and (+/-)`Infinity`, as well as properly 
supporting (+/-)`Inf`. Currently a string literal of `Nan` or `InfinitY` will 
cause a task to fail instead of placing the record in the corrupt record 
column, and `Inf` causes a failure instead of being a valid double.
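
For illustration, the kinds of inputs this affects (made-up records, `spark` 
assumed to be an active `SparkSession`):

```
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val schema = StructType(Seq(StructField("value", DoubleType)))
val records = spark.sparkContext.parallelize(Seq(
  """{"value": "NaN"}""",
  """{"value": "Infinity"}""",
  """{"value": "-Inf"}"""))   // "-Inf" previously failed the task instead of parsing

spark.read.schema(schema).json(records).show()
```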

## How was this patch tested?

Additional unit tests have been added

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NathanHowell/spark SPARK-18772

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16199


commit 11ac4438e12dc01ba252304da8793077280f3067
Author: Nathan Howell 
Date:   2016-12-07T23:32:14Z

[SPARK-18772][SQL] NaN/Infinite float parsing in JSON is inconsistent




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16107: SPARK-18677: Fix parsing ['key'] in JSON path expression...

2016-12-02 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16107
  
I wrote the buggy version, doh... but this LGTM. Thanks for the fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90566162
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -147,6 +147,17 @@ public void writeTo(ByteBuffer buffer) {
 buffer.position(pos + numBytes);
   }
 
+  public void writeTo(OutputStream out) throws IOException {
--- End diff --

I've added a few tests for this method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90521381
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
 ---
@@ -245,24 +230,12 @@ private[csv] class CsvOutputWriter(
   override def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
-csvWriter.writeRow(rowToString(row), records == 0L && 
params.headerFlag)
-records += 1
-if (records % FLUSH_BATCH_SIZE == 0) {
-  flush()
-}
-  }
-
-  private def flush(): Unit = {
-val lines = csvWriter.flush()
-if (lines.nonEmpty) {
-  text.set(lines)
-  recordWriter.write(NullWritable.get(), text)
-}
+csvWriter.writeRow(rowToString(row), printHeader)
--- End diff --

The uniVocity CSV writer converts every column to a `String` before writing 
so it's (probably?) not possible to further optimize this without doing a whole 
bunch of work. I only did a quick scan through their code though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90509459
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -194,4 +194,8 @@ private[sql] class JacksonGenerator(
   writeFields(row, schema, rootFieldWriters)
 }
   }
+
+  def writeLineEnding(): Unit = {
+gen.writeRaw('\n')
--- End diff --

That is my assumption. I'm also assuming that writing a single byte is 
slightly more efficient than writing an array of a single byte.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16089
  
@steveloughran Spark is handling the output committing somewhere further up 
the stack. The path passed to `OutputWriterFactory.newInstance` points to a 
temporary file, such as 
`/private/var/folders/sq/vmncyd7506q_ch43llrwr8sn6zfknl/T/spark-3db2844b-1f3c-45c2-8bf4-8a3c81440e38/_temporary/0/_temporary/attempt_20161201081833__m_00_0/part-0-8dd44cea-c01e-4bfe-ab03-641ebce18afb.txt`.

I'll make a pass through the existing tests to see if anything obvious is 
missing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90503024
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{OutputStream, OutputStreamWriter}
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress._
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.util.ReflectionUtils
+
+object CodecStreams {
+  private def getCompressionCodec(
+  context: JobContext,
+  file: Option[Path] = None): Option[CompressionCodec] = {
+if (FileOutputFormat.getCompressOutput(context)) {
+  val compressorClass = FileOutputFormat.getOutputCompressorClass(
+context,
+classOf[GzipCodec])
+
+  Some(ReflectionUtils.newInstance(compressorClass, 
context.getConfiguration))
+} else {
+  file.flatMap { path =>
+val compressionCodecs = new 
CompressionCodecFactory(context.getConfiguration)
+Option(compressionCodecs.getCodec(path))
+  }
+}
+  }
+
+  /**
+   * Create a new file and open it for writing.
+   * If compression is enabled in the [[JobContext]] the stream will write 
compressed data to disk.
+   * An exception will be thrown if the file already exists.
--- End diff --

Is this a problem with Hadoop in general? The FileSystem docs also specify 
this behavior:

```
  /**
   * Create an FSDataOutputStream at the indicated Path.
   * @param f the file to create
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an exception will be 
thrown.
   */
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90502343
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -147,6 +147,17 @@ public void writeTo(ByteBuffer buffer) {
 buffer.position(pos + numBytes);
   }
 
+  public void writeTo(OutputStream out) throws IOException {
--- End diff --

Agreed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90501927
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -194,4 +194,8 @@ private[sql] class JacksonGenerator(
   writeFields(row, schema, rootFieldWriters)
 }
   }
+
+  def writeLineEnding(): Unit = {
+gen.writeRaw('\n')
--- End diff --

7-bit ASCII is a subset of UTF-8, `\n` is the same in both.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90488454
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
 ---
@@ -132,39 +128,17 @@ class TextOutputWriter(
 context: TaskAttemptContext)
   extends OutputWriter {
 
-  private[this] val buffer = new Text()
-
-  private val recordWriter: RecordWriter[NullWritable, Text] = {
-new TextOutputFormat[NullWritable, Text]() {
-  override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-new Path(path)
-  }
-}.getRecordWriter(context)
-  }
+  private val writer = CodecStreams.getOutputStream(context, new 
Path(path))
 
   override def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
 val utf8string = row.getUTF8String(0)
-buffer.set(utf8string.getBytes)
-recordWriter.write(NullWritable.get(), buffer)
+writer.write(utf8string.getBytes)
--- End diff --

Done, but I'm not 100% sure about the implementation. Can you have someone 
more familiar with `UTF8String`'s internals double check it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90468858
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
 ---
@@ -132,39 +128,17 @@ class TextOutputWriter(
 context: TaskAttemptContext)
   extends OutputWriter {
 
-  private[this] val buffer = new Text()
-
-  private val recordWriter: RecordWriter[NullWritable, Text] = {
-new TextOutputFormat[NullWritable, Text]() {
-  override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-new Path(path)
-  }
-}.getRecordWriter(context)
-  }
+  private val writer = CodecStreams.getOutputStream(context, new 
Path(path))
 
   override def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
 val utf8string = row.getUTF8String(0)
-buffer.set(utf8string.getBytes)
-recordWriter.write(NullWritable.get(), buffer)
+writer.write(utf8string.getBytes)
--- End diff --

It is creating a new array; I'll pass the internal one through instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-12-01 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90468563
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{OutputStream, OutputStreamWriter}
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress._
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.util.ReflectionUtils
+
+object CodecStreams {
+  private def getCompressionCodec(
+  context: JobContext,
+  file: Option[Path] = None): Option[CompressionCodec] = {
+if (FileOutputFormat.getCompressOutput(context)) {
+  val compressorClass = FileOutputFormat.getOutputCompressorClass(
+context,
+classOf[GzipCodec])
+
+  Some(ReflectionUtils.newInstance(compressorClass, 
context.getConfiguration))
+} else {
+  file.flatMap { path =>
+val compressionCodecs = new 
CompressionCodecFactory(context.getConfiguration)
+Option(compressionCodecs.getCodec(path))
+  }
+}
+  }
+
+  /** Create a new file and open it for writing.
+   * If compression is enabled in the [[JobContext]] the stream will write 
compressed data to disk.
+   * An exception will be thrown if the file already exists.
+   */
+  def getOutputStream(context: JobContext, file: Path): OutputStream = {
+val fs = file.getFileSystem(context.getConfiguration)
+val outputStream: OutputStream = fs.create(file, false)
+
+getCompressionCodec(context, Some(file)).fold(outputStream) { codec =>
--- End diff --

Yah, it's a terrible name (and it's not a fold). I'll replace them with 
`.map(...).getOrElse`.
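
For example (illustrative values only), the two forms are equivalent but the 
second reads in application order:

```
val codec: Option[String] = Some("gzip")

// Option.fold: the default comes first, which reads backwards to many reviewers.
val a = codec.fold("plain")(name => s"wrap($name)")

// The suggested replacement: transform, then fall back.
val b = codec.map(name => s"wrap($name)").getOrElse("plain")

assert(a == b)
```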


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-11-30 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90385252
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.{OutputStream, OutputStreamWriter}
+import java.nio.charset.{Charset, StandardCharsets}
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress._
+import org.apache.hadoop.mapreduce.JobContext
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.util.ReflectionUtils
+
+private[spark] object CodecStreams {
--- End diff --

Looks that way; I've removed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...

2016-11-30 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16089
  
Doh, forgot to run the Hive tests. Should be fixed now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-11-30 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16089#discussion_r90380594
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
 ---
@@ -132,39 +128,17 @@ class TextOutputWriter(
 context: TaskAttemptContext)
   extends OutputWriter {
 
-  private[this] val buffer = new Text()
-
-  private val recordWriter: RecordWriter[NullWritable, Text] = {
-new TextOutputFormat[NullWritable, Text]() {
-  override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-new Path(path)
-  }
-}.getRecordWriter(context)
-  }
+  private val writer = CodecStreams.getOutputStream(context, new 
Path(path))
 
   override def write(row: Row): Unit = throw new 
UnsupportedOperationException("call writeInternal")
 
   override protected[sql] def writeInternal(row: InternalRow): Unit = {
 val utf8string = row.getUTF8String(0)
-buffer.set(utf8string.getBytes)
-recordWriter.write(NullWritable.get(), buffer)
+writer.write(utf8string.getBytes)
+writer.write('\n')
--- End diff --

This mirrors what Hadoop code does, see 
https://github.com/apache/hadoop/blob/f67237cbe7bc48a1b9088e990800b37529f1db2a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java#L48-L49


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...

2016-11-30 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16089
  
Yep. It uses the Hadoop `FileSystem` class to open files, just like 
`TextOutputFormat` does.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16089: [SPARK-18658][SQL] Write text records directly to a File...

2016-11-30 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16089
  
This touches a fair number of components. I also haven't done any 
performance testing to see what the impact of this is. Curious what your 
thoughts are?

cc/ @marmbrus @rxin @JoshRosen


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16089: [SPARK-18658][SQL] Write text records directly to...

2016-11-30 Thread NathanHowell
GitHub user NathanHowell opened a pull request:

https://github.com/apache/spark/pull/16089

[SPARK-18658][SQL] Write text records directly to a FileOutputStream

## What changes were proposed in this pull request?

This replaces uses of `TextOutputFormat` with an `OutputStream`, which will 
either write directly to the filesystem or indirectly via a compressor (if so 
configured). This avoids intermediate buffering.

The inverse of this (reading directly from a stream) is necessary for 
streaming large JSON records (when `wholeFile` is enabled) so I wanted to keep 
the read and write paths symmetric.

## How was this patch tested?

Existing unit tests.
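
As a rough sketch of the write path this describes (a simplified stand-in, not 
the actual `CodecStreams` code): open the file through the Hadoop `FileSystem`, 
optionally wrap the stream in a compression codec, and write bytes directly.

```
import java.io.OutputStream

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.util.ReflectionUtils

def openOutput(conf: Configuration, file: Path, compress: Boolean): OutputStream = {
  val raw = file.getFileSystem(conf).create(file, false)   // fail if the file already exists
  if (compress) {
    ReflectionUtils.newInstance(classOf[GzipCodec], conf).createOutputStream(raw)
  } else {
    raw
  }
}
```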

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NathanHowell/spark SPARK-18658

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16089


commit 66e02959dd5f750579d29c8d79b577844df58c0c
Author: Nathan Howell 
Date:   2016-11-30T22:39:53Z

[SPARK-18658][SQL] Write text records directly to a FileOutputStream




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16084: [SPARK-18654][SQL] Remove unreachable patterns in makeRo...

2016-11-30 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16084
  
cc/ @HyukjinKwon 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16084: [SPARK-18654][SQL] Remove unreachable patterns in...

2016-11-30 Thread NathanHowell
GitHub user NathanHowell opened a pull request:

https://github.com/apache/spark/pull/16084

[SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter

## What changes were proposed in this pull request?

`makeRootConverter` is only called with a `StructType` value. By making 
this method less general we can remove pattern matches, which are never 
actually hit outside of the test suite.

## How was this patch tested?

The existing tests.
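
An illustrative sketch of the general idea, with made-up types (not Spark's): 
narrowing the parameter type removes branches the compiler would otherwise 
force you to keep even though they can never be reached.

```
sealed trait JsonType
case class JsonStruct(fields: Map[String, JsonType]) extends JsonType
case object JsonString extends JsonType

// Before: accepts any JsonType, so a non-struct branch must exist even if unreachable.
def rootFieldCountOld(tpe: JsonType): Int = tpe match {
  case JsonStruct(fields) => fields.size
  case _                  => 0   // never hit outside the test suite
}

// After: the signature states the real contract and the dead branch disappears.
def rootFieldCount(tpe: JsonStruct): Int = tpe.fields.size
```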


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NathanHowell/spark SPARK-18654

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16084.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16084


commit 3535acf4f84a9057f4bbb88a81e4fff5f5167c0d
Author: Nathan Howell 
Date:   2016-11-30T18:47:16Z

[SPARK-18654][SQL] Remove unreachable patterns in makeRootConverter




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15813: [SPARK-18362][SQL] Use TextFileFormat in JsonFileFormat ...

2016-11-22 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/15813
  
Any thoughts on modifying `JsonToStruct` to support arrays (and options)? 
Parsing could then be something like:

```
dataset.select(
  Column(Inline(
    JsonToValue(
      ArrayType(schema),
      options,
      Column("value").expr))))
```

Likely out of scope for this pull request, but if there is a push to 
migrate from `RDD[T]` to `Dataset[T]` it would clean things up a bit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...

2016-04-28 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/12750#discussion_r61526935
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
 ---
@@ -246,12 +263,39 @@ private[sql] object InferSchema {
   }
 
 case (StructType(fields1), StructType(fields2)) =>
-  val newFields = (fields1 ++ fields2).groupBy(field => 
field.name).map {
-case (name, fieldTypes) =>
-  val dataType = 
fieldTypes.view.map(_.dataType).reduce(compatibleType)
-  StructField(name, dataType, nullable = true)
+  // Both fields1 and fields2 should be sorted by name, since 
inferField performs sorting.
+  // Therefore, we can take advantage of the fact that we're 
merging sorted lists and skip
+  // building a hash map or performing additional sorting.
+  val newFields = new mutable.ArrayBuffer[StructField]()
+
+  var f1Idx = 0
+  var f2Idx = 0
+
+  while (f1Idx < fields1.length && f2Idx < fields2.length) {
+val f1Name = fields1(f1Idx).name
+val f2Name = fields2(f2Idx).name
+if (f1Name == f2Name) {
+  val dataType = compatibleType(fields1(f1Idx).dataType, 
fields2(f2Idx).dataType)
+  newFields += StructField(f1Name, dataType, nullable = true)
+  f1Idx += 1
+  f2Idx += 1
+} else if (f1Name < f2Name) {
+  newFields += fields1(f1Idx)
+  f1Idx += 1
+} else { // f1Name > f2Name
+  newFields += fields2(f2Idx)
+  f2Idx += 1
+}
+  }
+  while (f1Idx < fields1.length) {
+newFields += fields1(f1Idx)
+f1Idx += 1
+  }
+  while (f2Idx < fields2.length) {
+newFields += fields2(f2Idx)
+f2Idx += 1
   }
-  StructType(newFields.toSeq.sortBy(_.name))
+  StructType(newFields.toSeq)
--- End diff --

`StructType(Seq[StructField])` just calls `toArray` on the fields... so I 
would use `toArray` instead of `toSeq` to reduce one allocation.
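
For reference, a sketch of the suggested change, assuming `newFields` is the 
`ArrayBuffer[StructField]` built in the diff above:

```scala
// StructType(Seq[StructField]) converts the fields to an Array internally,
// so building the Array directly skips the intermediate Seq step.
StructType(newFields.toArray)
```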


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...

2016-04-28 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/12750#discussion_r61526900
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
 ---
@@ -246,12 +263,39 @@ private[sql] object InferSchema {
   }
 
 case (StructType(fields1), StructType(fields2)) =>
-  val newFields = (fields1 ++ fields2).groupBy(field => 
field.name).map {
-case (name, fieldTypes) =>
-  val dataType = 
fieldTypes.view.map(_.dataType).reduce(compatibleType)
-  StructField(name, dataType, nullable = true)
+  // Both fields1 and fields2 should be sorted by name, since 
inferField performs sorting.
+  // Therefore, we can take advantage of the fact that we're 
merging sorted lists and skip
+  // building a hash map or performing additional sorting.
+  val newFields = new mutable.ArrayBuffer[StructField]()
--- End diff --

Consider calling `newFields.sizeHint(fields1.length max fields2.length)` 
here.
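
A sketch of where that call would sit, assuming the `fields1`/`fields2` arrays 
from the surrounding diff:

```scala
val newFields = new mutable.ArrayBuffer[StructField]()
// Pre-size the buffer: the merged result has at least
// max(fields1.length, fields2.length) entries, so this avoids some growth copies.
newFields.sizeHint(fields1.length max fields2.length)
```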


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...

2016-04-28 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/12750#discussion_r61526786
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
 ---
@@ -76,6 +78,15 @@ private[sql] object InferSchema {
 }
   }
 
+  private def sortFieldsInPlace(fields: Array[StructField]): Unit = {
+// Note: other code relies on this sorting for correctness, so don't 
remove it!
+java.util.Arrays.sort(fields, new Comparator[StructField] {
--- End diff --

Should the `Comparator` instance be cached?
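
One possible shape of that, assuming the comparator simply orders fields by 
name as the surrounding sort-by-name logic suggests:

```scala
// Hoist the comparator into a shared instance instead of allocating a new
// anonymous class on every call to sortFieldsInPlace.
private val structFieldComparator = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int = o1.name.compareTo(o2.name)
}

private def sortFieldsInPlace(fields: Array[StructField]): Unit = {
  // Note: other code relies on this sorting for correctness, so don't remove it!
  java.util.Arrays.sort(fields, structFieldComparator)
}
```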


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...

2016-04-28 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/12750#issuecomment-215607825
  
Alright, here are a few ideas that will at least reduce allocations a bit. 
Your version with the merge sort is likely better than the insertion sort here, 
but I thought I'd try something different :trollface: Any chance you can run it 
through your benchmark?

I'll put my other comments inline, generally looks good to me.

``` scala
  /**
   * Combine two sorted arrays of StructFields into a new StructType
   */
  private def compatibleFields(
      fields1: Array[StructField],
      fields2: Array[StructField]): StructType = {
    // perform an insertion sort of the smaller struct into the larger one
    val (bigger, smaller) = if (fields1.length > fields2.length) {
      (fields1.toBuffer, fields2)
    } else {
      (fields2.toBuffer, fields1)
    }

    var biggerIdx = 0
    var smallerIdx = 0
    while (biggerIdx < bigger.length && smallerIdx < smaller.length) {
      val biggerVal = bigger(biggerIdx)
      val smallerVal = smaller(smallerIdx)
      val comp = biggerVal.name.compareTo(smallerVal.name)
      if (comp == 0) {
        if (biggerVal.dataType != smallerVal.dataType) {
          val merged = compatibleType(biggerVal.dataType, smallerVal.dataType)
          // test to see if the merged type is equivalent to one of the existing
          // StructField instances, reuse will reduce GC pressure
          if (smallerVal.dataType == merged) {
            bigger.update(biggerIdx, smallerVal)
          } else if (biggerVal.dataType == merged) {
            // do nothing, the bigger struct already has the correct field
          } else {
            // we can't reuse an existing field so allocate a new one
            bigger.update(biggerIdx, biggerVal.copy(dataType = merged))
          }
        }
        biggerIdx += 1
        smallerIdx += 1
      } else if (comp > 0) {
        bigger.insert(biggerIdx, smallerVal)
        // bump both indexes, the bigger struct has grown
        biggerIdx += 1
        smallerIdx += 1
      } else { // comp < 0
        // advance to the next field on the bigger struct
        // nothing else to do here
        biggerIdx += 1
      }
    }

    // (completing the sketch) append any fields remaining in the smaller
    // struct; they sort after everything already merged
    while (smallerIdx < smaller.length) {
      bigger += smaller(smallerIdx)
      smallerIdx += 1
    }

    StructType(bigger.toArray)
  }
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-14972] Improve performance of JSON sche...

2016-04-28 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/12750#issuecomment-215585269
  
Would Guava's `Iterables.mergeSorted[T]` help out here?
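
A rough sketch of how that could be applied here, assuming Guava is on the 
classpath and both arrays are already sorted by name; adjacent entries with 
equal names would still have to be combined with `compatibleType` afterwards:

```scala
import java.util.Arrays
import scala.collection.JavaConverters._
import com.google.common.collect.Iterables
import org.apache.spark.sql.types.StructField

def mergeSortedByName(
    fields1: Array[StructField],
    fields2: Array[StructField]): Iterator[StructField] = {
  // Guava performs the k-way merge; duplicate names still need a second pass.
  val merged = Iterables.mergeSorted[StructField](
    Arrays.asList(fields1.toSeq.asJava, fields2.toSeq.asJava),
    Ordering.by[StructField, String](_.name))
  merged.iterator().asScala
}
```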


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12182][ML] Distributed binning for tree...

2016-03-18 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/10231#discussion_r56742884
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala ---
@@ -956,7 +956,7 @@ private[ml] object RandomForest extends Logging {
 valueCounts.map(_._1)
   } else {
 // stride between splits
-val stride: Double = featureSamples.length.toDouble / (numSplits + 
1)
+val stride: Double = featureSamples.size.toDouble / (numSplits + 1)
--- End diff --

This will do a second pass over the `Iterable`. Would it be preferable to 
combine this into the `foldLeft` above so it only does a single pass?
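
A sketch of that idea with illustrative names (`featureSamples` and `numSplits` 
follow the quoted code, but this is not the actual patch):

```scala
// Count the samples in the same pass that builds the value counts,
// so the Iterable is only traversed once.
val (valueCountMap, numSamples) =
  featureSamples.foldLeft((Map.empty[Double, Int], 0L)) {
    case ((counts, n), v) =>
      (counts.updated(v, counts.getOrElse(v, 0) + 1), n + 1)
  }
val stride: Double = numSamples.toDouble / (numSplits + 1)
```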


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12182][ML] Distributed binning for tree...

2016-01-06 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/10231#issuecomment-169393380
  
@sethah looks good to me. :+1: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-12182][ML] Distributed binning for tree...

2015-12-09 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/10231#issuecomment-163422959
  
Yeah I can take a look tonight or tomorrow
On Dec 9, 2015 14:25, "Seth Hendrickson"  wrote:
    
> @NathanHowell <https://github.com/NathanHowell> would you be able to
> review this?
>
> cc @jkbradley <https://github.com/jkbradley>
>
> —
> Reply to this email directly or view it on GitHub
> <https://github.com/apache/spark/pull/10231#issuecomment-163419168>.
>



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...

2015-10-07 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/8246#issuecomment-146348263
  
There were already tests for the returned split lengths, so I just removed 
the metadata checks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...

2015-10-06 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/8246#issuecomment-146009085
  
I'll have time tomorrow


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple

2015-09-30 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7946#discussion_r40846432
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala
 ---
@@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: 
Expression)
 }
   }
 }
+
+case class JsonTuple(children: Seq[Expression])
+  extends Expression with CodegenFallback {
+
+  import SharedFactory._
+
+  override def nullable: Boolean = {
+// a row is always returned
+false
+  }
+
+  // if processing fails this shared value will be returned
+  @transient private lazy val nullRow: InternalRow =
+new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+
+  // the json body is the first child
+  @transient private lazy val jsonExpr: Expression = children.head
+
+  // the fields to query are the remaining children
+  @transient private lazy val fieldExpressions: Seq[Expression] = 
children.tail
+
+  // eagerly evaluate any foldable the field names
+  @transient private lazy val foldableFieldNames: IndexedSeq[String] = {
+fieldExpressions.map {
+  case expr if expr.foldable => 
expr.eval().asInstanceOf[UTF8String].toString
+  case _ => null
+}.toIndexedSeq
+  }
+
+  // and count the number of foldable fields, we'll use this later to 
optimize evaluation
+  @transient private lazy val constantFields: Int = 
foldableFieldNames.count(_ != null)
+
+  override lazy val dataType: StructType = {
+val fields = fieldExpressions.zipWithIndex.map {
+  case (_, idx) => StructField(
+name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
+dataType = StringType,
+nullable = true)
+}
+
+StructType(fields)
+  }
+
+  override def prettyName: String = "json_tuple"
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length < 2) {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two 
arguments")
+} else if (children.forall(child => 
StringType.acceptsType(child.dataType))) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all 
arguments are strings")
+}
+  }
+
+  override def eval(input: InternalRow): InternalRow = {
+try {
+  val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
+  if (json == null) {
+return nullRow
+  }
+
+  val parser = jsonFactory.createParser(json.getBytes)
--- End diff --

I think the answer is no, but it's probably better to do it anyway. 
`GetJsonObject` doesn't do this, for example... might be worth fixing there too.
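
Assuming the question here is whether the parser should be closed, a minimal 
self-contained sketch of that pattern (illustrative helper, not the PR's code):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// Close the parser even when an exception interrupts reading the token stream.
def firstFieldName(jsonFactory: JsonFactory, json: Array[Byte]): Option[String] = {
  val parser = jsonFactory.createParser(json)
  try {
    if (parser.nextToken() != JsonToken.START_OBJECT) {
      None
    } else if (parser.nextToken() == JsonToken.FIELD_NAME) {
      Option(parser.getCurrentName)
    } else {
      None
    }
  } finally {
    parser.close()
  }
}
```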


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple

2015-09-30 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/7946#issuecomment-144529990
  
Alright, I think I've addressed all your comments @yhuai. I haven't run the 
tests though :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple

2015-09-30 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7946#discussion_r40810618
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala
 ---
@@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: 
Expression)
 }
   }
 }
+
+case class JsonTuple(children: Seq[Expression])
+  extends Expression with CodegenFallback {
+
+  import SharedFactory._
+
+  override def nullable: Boolean = {
+// a row is always returned
+false
+  }
+
+  // if processing fails this shared value will be returned
+  @transient private lazy val nullRow: InternalRow =
+new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+
+  // the json body is the first child
+  @transient private lazy val jsonExpr: Expression = children.head
+
+  // the fields to query are the remaining children
+  @transient private lazy val fieldExpressions: Seq[Expression] = 
children.tail
+
+  // eagerly evaluate any foldable the field names
+  @transient private lazy val foldableFieldNames: IndexedSeq[String] = {
+fieldExpressions.map {
+  case expr if expr.foldable => 
expr.eval().asInstanceOf[UTF8String].toString
+  case _ => null
+}.toIndexedSeq
+  }
+
+  // and count the number of foldable fields, we'll use this later to 
optimize evaluation
+  @transient private lazy val constantFields: Int = 
foldableFieldNames.count(_ != null)
+
+  override lazy val dataType: StructType = {
+val fields = fieldExpressions.zipWithIndex.map {
+  case (_, idx) => StructField(
+name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
+dataType = StringType,
+nullable = true)
+}
+
+StructType(fields)
+  }
+
+  override def prettyName: String = "json_tuple"
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length < 2) {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two 
arguments")
+} else if (children.forall(child => 
StringType.acceptsType(child.dataType))) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all 
arguments are strings")
+}
+  }
+
+  override def eval(input: InternalRow): InternalRow = {
+try {
+  val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
+  if (json == null) {
+return nullRow
+  }
+
+  val parser = jsonFactory.createParser(json.getBytes)
+
+  // only objects are supported
+  if (parser.nextToken() != JsonToken.START_OBJECT) {
+return nullRow
+  }
+
+  // evaluate the field names as String rather than UTF8String to
+  // optimize lookups from the json token, which is also a String
+  val fieldNames = if (constantFields == fieldExpressions.length) {
+// typically the user will provide the field names as foldable 
expressions
+// so we can use the cached copy
+foldableFieldNames
+  } else if (constantFields == 0) {
+// none are foldable so all field names need to be evaluated from 
the input row
+
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString)
+  } else {
+// if there is a mix of constant and non-constant expressions
+// prefer the cached copy when available
+foldableFieldNames.zip(fieldExpressions).map {
+  case (null, expr) => 
expr.eval(input).asInstanceOf[UTF8String].toString
+  case (fieldName, _) => fieldName
+}
+  }
+
+  val row = Array.ofDim[Any](fieldNames.length)
+
+  // start reading through the token stream, looking for any requested 
field names
+  while (parser.nextToken() != JsonToken.END_OBJECT) {
+if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
+  // check to see if this field is desired in the output
+  val idx = fieldNames.indexOf(parser.getCurrentName)
+  if (idx >= 0) {
+// it is, copy the child tree to the correct location in the 
output row
+val output = new ByteArrayOutputStream()
+
+// write the output directly to UTF8 encoded byte array
+val generator = jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)
--- End diff --

I'm going to move the generator construction into the `if` block; it's not 
used otherwise.
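
A self-contained sketch of the resulting shape (illustrative helper, not the 
PR's code): the generator and its output buffer are only allocated on the 
branch that actually copies a value.

```scala
import java.io.ByteArrayOutputStream
import com.fasterxml.jackson.core.{JsonEncoding, JsonFactory, JsonParser, JsonToken}

// Advance to the field's value; allocate the generator only when the value is
// non-null and therefore needs to be copied out.
def copyValueIfNotNull(factory: JsonFactory, parser: JsonParser): Option[Array[Byte]] = {
  if (parser.nextToken() != JsonToken.VALUE_NULL) {
    val output = new ByteArrayOutputStream()
    val generator = factory.createGenerator(output, JsonEncoding.UTF8)
    try {
      generator.copyCurrentStructure(parser)
    } finally {
      generator.close()
    }
    Some(output.toByteArray)
  } else {
    None
  }
}
```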


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple

2015-09-30 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7946#discussion_r40810335
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala
 ---
@@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: 
Expression)
 }
   }
 }
+
+case class JsonTuple(children: Seq[Expression])
+  extends Expression with CodegenFallback {
+
+  import SharedFactory._
+
+  override def nullable: Boolean = {
+// a row is always returned
+false
+  }
+
+  // if processing fails this shared value will be returned
+  @transient private lazy val nullRow: InternalRow =
+new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+
+  // the json body is the first child
+  @transient private lazy val jsonExpr: Expression = children.head
+
+  // the fields to query are the remaining children
+  @transient private lazy val fieldExpressions: Seq[Expression] = 
children.tail
+
+  // eagerly evaluate any foldable the field names
+  @transient private lazy val foldableFieldNames: IndexedSeq[String] = {
+fieldExpressions.map {
+  case expr if expr.foldable => 
expr.eval().asInstanceOf[UTF8String].toString
+  case _ => null
+}.toIndexedSeq
+  }
+
+  // and count the number of foldable fields, we'll use this later to 
optimize evaluation
+  @transient private lazy val constantFields: Int = 
foldableFieldNames.count(_ != null)
+
+  override lazy val dataType: StructType = {
+val fields = fieldExpressions.zipWithIndex.map {
+  case (_, idx) => StructField(
+name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
+dataType = StringType,
+nullable = true)
+}
+
+StructType(fields)
+  }
+
+  override def prettyName: String = "json_tuple"
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length < 2) {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two 
arguments")
+} else if (children.forall(child => 
StringType.acceptsType(child.dataType))) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all 
arguments are strings")
+}
+  }
+
+  override def eval(input: InternalRow): InternalRow = {
+try {
+  val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
+  if (json == null) {
+return nullRow
+  }
+
+  val parser = jsonFactory.createParser(json.getBytes)
+
+  // only objects are supported
+  if (parser.nextToken() != JsonToken.START_OBJECT) {
+return nullRow
+  }
+
+  // evaluate the field names as String rather than UTF8String to
+  // optimize lookups from the json token, which is also a String
+  val fieldNames = if (constantFields == fieldExpressions.length) {
+// typically the user will provide the field names as foldable 
expressions
+// so we can use the cached copy
+foldableFieldNames
+  } else if (constantFields == 0) {
+// none are foldable so all field names need to be evaluated from 
the input row
+
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString)
+  } else {
+// if there is a mix of constant and non-constant expressions
+// prefer the cached copy when available
+foldableFieldNames.zip(fieldExpressions).map {
+  case (null, expr) => 
expr.eval(input).asInstanceOf[UTF8String].toString
+  case (fieldName, _) => fieldName
+}
+  }
+
+  val row = Array.ofDim[Any](fieldNames.length)
+
+  // start reading through the token stream, looking for any requested 
field names
+  while (parser.nextToken() != JsonToken.END_OBJECT) {
+if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
+  // check to see if this field is desired in the output
+  val idx = fieldNames.indexOf(parser.getCurrentName)
+  if (idx >= 0) {
+// it is, copy the child tree to the correct location in the 
output row
+val output = new ByteArrayOutputStream()
+
+// write the output directly to UTF8 encoded byte array
+val generator = jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)
+if (parser.nextToken() != JsonToken.VALUE_NULL) {
+  copyCurrentStructure(generator, parser)
+  generator.close()
+

[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple

2015-09-30 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/7946#discussion_r40809995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonFunctions.scala
 ---
@@ -307,3 +308,140 @@ case class GetJsonObject(json: Expression, path: 
Expression)
 }
   }
 }
+
+case class JsonTuple(children: Seq[Expression])
+  extends Expression with CodegenFallback {
+
+  import SharedFactory._
+
+  override def nullable: Boolean = {
+// a row is always returned
+false
+  }
+
+  // if processing fails this shared value will be returned
+  @transient private lazy val nullRow: InternalRow =
+new GenericInternalRow(Array.ofDim[Any](fieldExpressions.length))
+
+  // the json body is the first child
+  @transient private lazy val jsonExpr: Expression = children.head
+
+  // the fields to query are the remaining children
+  @transient private lazy val fieldExpressions: Seq[Expression] = 
children.tail
+
+  // eagerly evaluate any foldable the field names
+  @transient private lazy val foldableFieldNames: IndexedSeq[String] = {
+fieldExpressions.map {
+  case expr if expr.foldable => 
expr.eval().asInstanceOf[UTF8String].toString
+  case _ => null
+}.toIndexedSeq
+  }
+
+  // and count the number of foldable fields, we'll use this later to 
optimize evaluation
+  @transient private lazy val constantFields: Int = 
foldableFieldNames.count(_ != null)
+
+  override lazy val dataType: StructType = {
+val fields = fieldExpressions.zipWithIndex.map {
+  case (_, idx) => StructField(
+name = s"c$idx", // mirroring GenericUDTFJSONTuple.initialize
+dataType = StringType,
+nullable = true)
+}
+
+StructType(fields)
+  }
+
+  override def prettyName: String = "json_tuple"
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (children.length < 2) {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires at least two 
arguments")
+} else if (children.forall(child => 
StringType.acceptsType(child.dataType))) {
+  TypeCheckResult.TypeCheckSuccess
+} else {
+  TypeCheckResult.TypeCheckFailure(s"$prettyName requires that all 
arguments are strings")
+}
+  }
+
+  override def eval(input: InternalRow): InternalRow = {
+try {
+  val json = jsonExpr.eval(input).asInstanceOf[UTF8String]
+  if (json == null) {
+return nullRow
+  }
+
+  val parser = jsonFactory.createParser(json.getBytes)
+
+  // only objects are supported
+  if (parser.nextToken() != JsonToken.START_OBJECT) {
+return nullRow
+  }
+
+  // evaluate the field names as String rather than UTF8String to
+  // optimize lookups from the json token, which is also a String
+  val fieldNames = if (constantFields == fieldExpressions.length) {
+// typically the user will provide the field names as foldable 
expressions
+// so we can use the cached copy
+foldableFieldNames
+  } else if (constantFields == 0) {
+// none are foldable so all field names need to be evaluated from 
the input row
+
fieldExpressions.map(_.eval(input).asInstanceOf[UTF8String].toString)
+  } else {
+// if there is a mix of constant and non-constant expressions
+// prefer the cached copy when available
+foldableFieldNames.zip(fieldExpressions).map {
+  case (null, expr) => 
expr.eval(input).asInstanceOf[UTF8String].toString
+  case (fieldName, _) => fieldName
+}
+  }
+
+  val row = Array.ofDim[Any](fieldNames.length)
+
+  // start reading through the token stream, looking for any requested 
field names
+  while (parser.nextToken() != JsonToken.END_OBJECT) {
+if (parser.getCurrentToken == JsonToken.FIELD_NAME) {
+  // check to see if this field is desired in the output
+  val idx = fieldNames.indexOf(parser.getCurrentName)
+  if (idx >= 0) {
+// it is, copy the child tree to the correct location in the 
output row
+val output = new ByteArrayOutputStream()
+
+// write the output directly to UTF8 encoded byte array
+val generator = jsonFactory.createGenerator(output, 
JsonEncoding.UTF8)
+if (parser.nextToken() != JsonToken.VALUE_NULL) {
+  copyCurrentStructure(generator, parser)
+  generator.close()
+

[GitHub] spark pull request: [SPARK-9617] [SQL] Implement json_tuple

2015-09-30 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/7946#issuecomment-15101
  
@yhuai I'll see what I can do; I'm running some larger jobs today, so I may have 
a long enough gap to fix this up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...

2015-09-12 Thread NathanHowell
Github user NathanHowell commented on the pull request:

https://github.com/apache/spark/pull/8246#issuecomment-139792166
  
I tend to rebase out of habit to prevent merge-build failures. I'll look at 
the test failure on Monday; the tests were all passing at one point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-10064] [ML] Parallelize decision tree b...

2015-09-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8246#discussion_r39216747
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -1056,6 +988,70 @@ object DecisionTree extends Serializable with Logging 
{
 }
   }
 
+  private def findSplitsBinsBySorting(
+  input: RDD[LabeledPoint],
+  metadata: DecisionTreeMetadata,
+  continuousFeatures: IndexedSeq[Int]): (Array[Array[Split]], 
Array[Array[Bin]]) = {
+def findSplits(
+featureIndex: Int,
+featureSamples: Iterable[Double]): (Int, (Array[Split], 
Array[Bin])) = {
+  val splits = {
+val featureSplits = findSplitsForContinuousFeature(
+  featureSamples.toArray,
+  metadata,
+  featureIndex)
+logDebug(s"featureIndex = $featureIndex, numSplits = 
${featureSplits.length}")
+
+featureSplits.map(threshold => new Split(featureIndex, threshold, 
Continuous, Nil))
+  }
+
+  val bins = {
+val lowSplit = new DummyLowSplit(featureIndex, Continuous)
+val highSplit = new DummyHighSplit(featureIndex, Continuous)
+(lowSplit +: splits.toSeq :+ highSplit).sliding(2).map {
+  case Seq(lhs, right) => new Bin(lhs, right, Continuous, 
Double.MinValue)
+}.toArray
+  }
+
+  (featureIndex, (splits, bins))
+}
+
+val continuousSplits = input
+  .flatMap(point => continuousFeatures.map(idx => (idx, 
point.features(idx
+  .groupByKey(numPartitions = math.min(continuousFeatures.length, 
input.partitions.length))
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


