RE: get corrupted rows using columnNameOfCorruptRecord

2016-12-12 Thread Yehuda Finkelstein
Ok got it.

The destination column must already exist in the data frame.



I thought that it would create a new column in the data frame.
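
For the archives, here is a minimal sketch of the read once the column is appended to the schema (assuming the df_schema and f values from my original message further down, and importing the schema types):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Append the corrupt-record column to the existing schema before reading.
val df = spark.sqlContext.read.
  option("columnNameOfCorruptRecord", "xxx").
  option("mode", "PERMISSIVE").
  schema(StructType(df_schema.schema.fields :+ StructField("xxx", StringType, true))).
  json(f)

// Rows that failed to parse now show up with a non-null "xxx" value.
df.where("xxx is not null").show()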



Thank you for your help.

Yehuda







*From:* Hyukjin Kwon [mailto:gurwls...@gmail.com]
*Sent:* Wednesday, December 07, 2016 12:19 PM
*To:* Yehuda Finkelstein
*Cc:* Michael Armbrust; user
*Subject:* Re: get corrupted rows using columnNameOfCorruptRecord



Let me please just extend the suggestion a bit more verbosely.

I think you could try something like this maybe.

val jsonDF = spark.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .schema(StructType(schema.fields :+ StructField("xxx", StringType, true)))
  .json(corruptRecords)

val malformed = jsonDF.filter("xxx is not null").select("xxx")
malformed.show()

This prints something like the ones below:

+------------+
|         xxx|
+------------+
|           {|
|{"a":1, b:2}|
|{"a":{, b:3}|
|           ]|
+------------+



If the schema is not specified, the inferred schema includes the corrupt-record column automatically, but when a schema is specified explicitly, I believe this column has to be added to it manually.
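
For example, without specifying a schema, a sketch like the one below would already expose the corrupt rows under the default column name (the default comes from spark.sql.columnNameOfCorruptRecord, which is _corrupt_record unless overridden):

// Let Spark infer the schema; the corrupt-record column is added automatically.
val inferredDF = spark.read
  .option("mode", "PERMISSIVE")
  .json(corruptRecords)

inferredDF.filter("_corrupt_record is not null").show()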









2016-12-07 18:06 GMT+09:00 Yehuda Finkelstein :

Hi



I tried it already, but it says that this column doesn’t exist.



scala> var df = spark.sqlContext.read.
     | option("columnNameOfCorruptRecord","xxx").
     | option("mode","PERMISSIVE").
     | schema(df_schema.schema).json(f)
df: org.apache.spark.sql.DataFrame = [auctionid: string, timestamp: string ... 37 more fields]

scala> df.select
select   selectExpr

scala> df.select("xxx").show

org.apache.spark.sql.AnalysisException: cannot resolve '`xxx`' given input columns: […];;
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:987)
  ... 48 elided



scala>





*From:* Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Tuesday, December 06, 2016 10:26 PM
*To:* Yehuda Finkelstein
*Cc:* user
*Subject:* Re: get corrupted rows using columnNameOfCorruptRecord

Re: get corrupted rows using columnNameOfCorruptRecord

2016-12-06 Thread Michael Armbrust
.where("xxx IS NOT NULL") will give you the rows that couldn't be parsed.

On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein <
yeh...@veracity-group.com> wrote:

> Hi all
>
>
>
> I’m trying to parse JSON using an existing schema and I get rows with NULLs
>
> //get schema
>
> val df_schema = spark.sqlContext.sql("select c1,c2,…cn t1  limit 1")
>
> //read json file
>
> val f = sc.textFile("/tmp/x")
>
> //load json into data frame using schema
>
> var df = spark.sqlContext.read.option("columnNameOfCorruptRecord","
> xxx").option("mode","PERMISSIVE").schema(df_schema.schema).json(f)
>
>
>
> In the documentation it says that you can query the corrupted rows via this
> column -> columnNameOfCorruptRecord
>
> “columnNameOfCorruptRecord (default is the value specified in
> spark.sql.columnNameOfCorruptRecord): allows renaming the new field
> having malformed string created by PERMISSIVE mode. This overrides
> spark.sql.columnNameOfCorruptRecord.”
>
>
>
> The question is how to fetch those corrupted rows?
>
>
>
>
>
> Thanks
>
> Yehuda
>