RE: get corrupted rows using columnNameOfCorruptRecord
OK, got it. The destination column must already exist in the data frame's schema; I thought it would create a new column automatically. Thank you for your help.

Yehuda

*From:* Hyukjin Kwon [mailto:gurwls...@gmail.com]
*Sent:* Wednesday, December 07, 2016 12:19 PM
*To:* Yehuda Finkelstein
*Cc:* Michael Armbrust; user
*Subject:* Re: get corrupted rows using columnNameOfCorruptRecord

[...]
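For reference, the takeaway in one compact snippet: the corrupt-record column has to be appended to whatever schema is passed to the reader before PERMISSIVE mode can populate it. The base schema, field names, and file path below are illustrative only, and a Spark 2.x spark-shell session with spark in scope is assumed:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// illustrative base schema -- stands in for whatever schema you already have
val baseSchema = StructType(Seq(
  StructField("auctionid", StringType, nullable = true),
  StructField("timestamp", StringType, nullable = true)
))

// the corrupt-record column must be part of the schema handed to the reader,
// otherwise PERMISSIVE mode has nowhere to put the raw malformed line
val schemaWithCorrupt =
  StructType(baseSchema.fields :+ StructField("xxx", StringType, nullable = true))

val df = spark.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .schema(schemaWithCorrupt)
  .json("/tmp/x")                              // illustrative path

val corrupt = df.filter("xxx is not null")     // only the rows that failed to parse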
Re: get corrupted rows using columnNameOfCorruptRecord
Let me please just extend the suggestion a bit more verbosely. I think you could try something like this, maybe:

val jsonDF = spark.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .schema(StructType(schema.fields :+ StructField("xxx", StringType, true)))
  .json(corruptRecords)

val malformed = jsonDF.filter("xxx is not null").select("xxx")
malformed.show()

This prints something like the output below:

+------------+
|         xxx|
+------------+
|           {|
|{"a":1, b:2}|
|{"a":{, b:3}|
|           ]|
+------------+

If the schema is not specified, the inferred schema includes the malformed-record column automatically, but when the schema is specified explicitly, I believe it has to be added manually.

2016-12-07 18:06 GMT+09:00 Yehuda Finkelstein:

> Hi
>
> I tried it already, but it says that this column doesn't exist.
>
> scala> var df = spark.sqlContext.read.
>      |   option("columnNameOfCorruptRecord","xxx").
>      |   option("mode","PERMISSIVE").
>      |   schema(df_schema.schema).json(f)
> df: org.apache.spark.sql.DataFrame = [auctionid: string, timestamp: string ... 37 more fields]
>
> scala> df.select("xxx").show
> org.apache.spark.sql.AnalysisException: cannot resolve '`xxx`' given input columns: […];;
> [...]
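To make the suggestion above copy-pasteable, here is a self-contained sketch along the same lines, assuming a spark-shell session where spark and sc are in scope; the sample records and the a/b field names are made up to mirror the output shown above:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// a small mix of valid and malformed records, invented to mirror the output above
val records = sc.parallelize(Seq(
  """{"a": 1, "b": 2}""",    // parses cleanly
  """{""",                   // malformed: truncated object
  """{"a":1, b:2}""",        // malformed: unquoted key
  """{"a":{, b:3}""",        // malformed: broken nested object
  """]"""                    // malformed: stray bracket
))

val schema = StructType(Seq(
  StructField("a", IntegerType, nullable = true),
  StructField("b", IntegerType, nullable = true)
))

// append the corrupt-record column, exactly as in the snippet above
val jsonDF = spark.read
  .option("columnNameOfCorruptRecord", "xxx")
  .option("mode", "PERMISSIVE")
  .schema(StructType(schema.fields :+ StructField("xxx", StringType, nullable = true)))
  .json(records)

jsonDF.filter("xxx is null").drop("xxx").show()             // the rows that parsed
jsonDF.filter("xxx is not null").select("xxx").show(false)  // the raw malformed lines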
RE: get corrupted rows using columnNameOfCorruptRecord
Hi

I tried it already, but it says that this column doesn't exist.

scala> var df = spark.sqlContext.read.
     |   option("columnNameOfCorruptRecord","xxx").
     |   option("mode","PERMISSIVE").
     |   schema(df_schema.schema).json(f)
df: org.apache.spark.sql.DataFrame = [auctionid: string, timestamp: string ... 37 more fields]

scala> df.select
select   selectExpr

scala> df.select("xxx").show
org.apache.spark.sql.AnalysisException: cannot resolve '`xxx`' given input columns: […];;
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:74)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:58)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:969)
  at org.apache.spark.sql.Dataset.select(Dataset.scala:987)
  ... 48 elided

scala>

*From:* Michael Armbrust [mailto:mich...@databricks.com]
*Sent:* Tuesday, December 06, 2016 10:26 PM
*To:* Yehuda Finkelstein
*Cc:* user
*Subject:* Re: get corrupted rows using columnNameOfCorruptRecord

.where("xxx IS NOT NULL") will give you the rows that couldn't be parsed.

On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein <yeh...@veracity-group.com> wrote:

[...]
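The AnalysisException above is the symptom of the schema issue: df_schema.schema was built from a plain select, so it contains no "xxx" field, and, as noted in the reply above, the reader does not add one automatically when an explicit schema is supplied. A sketch of the one change that should make select("xxx") resolve, continuing the scala> session above (so df_schema and f are assumed to exist as defined there):

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// append the corrupt-record column to the schema inferred from the SQL query
val schemaWithCorrupt =
  StructType(df_schema.schema.fields :+ StructField("xxx", StringType, nullable = true))

val df = spark.sqlContext.read.
  option("columnNameOfCorruptRecord", "xxx").
  option("mode", "PERMISSIVE").
  schema(schemaWithCorrupt).
  json(f)

df.select("xxx").where("xxx is not null").show(false)   // now resolves; shows the raw bad rows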
Re: get corrupted rows using columnNameOfCorruptRecord
.where("xxx IS NOT NULL") will give you the rows that couldn't be parsed. On Tue, Dec 6, 2016 at 6:31 AM, Yehuda Finkelstein < yeh...@veracity-group.com> wrote: > Hi all > > > > I’m trying to parse json using existing schema and got rows with NULL’s > > //get schema > > val df_schema = spark.sqlContext.sql("select c1,c2,…cn t1 limit 1") > > //read json file > > val f = sc.textFile("/tmp/x") > > //load json into data frame using schema > > var df = spark.sqlContext.read.option("columnNameOfCorruptRecord"," > xxx").option("mode","PERMISSIVE").schema(df_schema.schema).json(f) > > > > in documentation it say that you can query the corrupted rows by this > columns à columnNameOfCorruptRecord > > o“columnNameOfCorruptRecord (default is the value specified in > spark.sql.columnNameOfCorruptRecord): allows renaming the new field > having malformed string created by PERMISSIVE mode. This overrides > spark.sql.columnNameOfCorruptRecord.” > > > > The question is how to fetch those corrupted rows ? > > > > > > Thanks > > Yehuda > > > > >