[Spark] How to find which type of key is illegal during from_json() function

2023-03-08 Thread hueiyuan su
*Component*: Spark
*Level*: Advanced
*Scenario*: How-to

-
*Problem Description*
I have a nested JSON string in one field of a Spark DataFrame, and I would
like to use from_json() to parse it into a JSON object. In particular, if the
type of one of the keys does not match our predefined struct type, from_json()
returns null. Given that, is there a way to find out which key has the wrong
type? A related example follows:

*source dataframe:*
| original_json_string |
| -- |
| "{a:{b:"dn", c:"test"}}" |

P.S. We expect the value of b to be a double, so we predefined a struct type
for from_json() to use, but it just returns null directly:

*result dataframe after from_json:*
| original_json_string |
| -- |
| null |

In this sample, because the value of a has two keys, b and c, can we learn
that the value type of b is wrong rather than c? That would let me check the
data quickly instead of just getting null back.
If we want to achieve this, how should it be implemented?
If you have any ideas, I would appreciate them. Thank you.
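
As far as I know there is no built-in way for from_json() to report which key
failed, but one workaround is to probe each expected leaf key with its own
single-key schema: the probe whose result comes back null points at the
offending key. Below is a minimal sketch of that idea, assuming recent Spark,
properly quoted JSON, and illustrative schema/column names; it is not an
official Spark feature.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("probe-keys").getOrCreate()
import spark.implicits._

// Properly quoted JSON equivalent of the example above; b is expected to be a double.
val df = Seq("""{"a":{"b":"dn","c":"test"}}""").toDF("original_json_string")

// One probe schema per leaf key (labels and structure are illustrative).
val probes = Seq(
  "a.b" -> new StructType().add("a", new StructType().add("b", DoubleType)),
  "a.c" -> new StructType().add("a", new StructType().add("c", StringType))
)

val checked = probes.foldLeft(df) { case (acc, (label, schema)) =>
  // A null result from the single-key probe means this key failed to parse.
  acc.withColumn(s"ok_${label.replace('.', '_')}",
    from_json($"original_json_string", schema)("a").isNotNull)
}

checked.show(false)  // expected: ok_a_b = false, ok_a_c = true, i.e. key b is the culprit
```

Each extra probe re-parses the string, so this is best kept for debugging or
for a small sample of the offending rows rather than the full production path.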

-- 
Best Regards,

Mars Su
*Phone*: 0988-661-013
*Email*: hueiyua...@gmail.com


Re: Corrupt record handling in spark structured streaming and from_json function

2018-12-31 Thread Colin Williams
Dear spark user community,

I have received some insight regarding filtering separate dataframes in my
Spark Structured Streaming job. However, I wish to write each of the
dataframes mentioned in the Stack Overflow question with a parquet writer to
a separate location. My initial impression is that this requires multiple
sinks, but I'm being pressured against that. I think it might also be possible
using the foreach / foreachBatch writers, but I'm not sure how that works with
the parquet writer, or what the caveats of this approach are. Can some more
advanced users or developers suggest how to go about this, particularly
without using multiple streams?
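
A minimal sketch of the foreachBatch route, assuming Spark 2.4+ and
hypothetical paths and column names ("data" for the from_json result, "value"
for the raw Kafka string): one streaming query splits each micro-batch and
writes the two halves to separate parquet locations, so no second sink is
needed.

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.StreamingQuery

// parsed is assumed to carry both the raw Kafka string ("value") and the
// parsed struct ("data"); both names are illustrative.
def writeSplitBatches(parsed: DataFrame): StreamingQuery = {
  parsed.writeStream
    .foreachBatch { (batch: DataFrame, batchId: Long) =>
      // Rows that parsed cleanly go to one location...
      batch.filter(col("data").isNotNull)
        .write.mode("append").parquet("/data/events/valid")     // hypothetical path
      // ...and rows where from_json returned null keep their raw string elsewhere.
      batch.filter(col("data").isNull).select(col("value"))
        .write.mode("append").parquet("/data/events/corrupt")   // hypothetical path
    }
    .option("checkpointLocation", "/data/events/_checkpoint")   // hypothetical path
    .start()
}
```

The trade-off is that foreachBatch gives only at-least-once write guarantees;
deduplicating on batchId is up to the writer if exactly-once output is needed.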


On Wed, Dec 26, 2018 at 6:01 PM Colin Williams
 wrote:
>
> https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
>
> On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
>  wrote:
> >
> > From my initial impression it looks like I'd need to create my own
> > `from_json` using `jsonToStructs` as a reference but try to handle `
> > case : BadRecordException => null ` or similar to try to write the non
> > matching string to a corrupt records column
> >
> > On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
> >  wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to figure out how I can write records that don't match a
> > > json read schema via spark structred streaming to an output sink /
> > > parquet location. Previously I did this in batch via corrupt column
> > > features of batch. But in this spark structured streaming I'm reading
> > > from kafka a string and using from_json on the value of that string.
> > > If it doesn't match my schema then I from_json returns null for all
> > > the rows, and does not populate a corrupt record column. But I want to
> > > somehow obtain the source kafka string in a dataframe, and an write to
> > > a output sink / parquet location.
> > >
> > > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: 
> > > StructType) = {
> > >   val jsonDataFrame = 
> > > rawKafkaDataFrame.select(col("value").cast("string"))
> > >   jsonDataFrame.select(from_json(col("value"),
> > > schema)).select("jsontostructs(value).*")
> > > }




Re: Corrupt record handling in spark structured streaming and from_json function

2018-12-26 Thread Colin Williams
https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming

On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
 wrote:
>
> From my initial impression it looks like I'd need to create my own
> `from_json` using `jsonToStructs` as a reference but try to handle `
> case : BadRecordException => null ` or similar to try to write the non
> matching string to a corrupt records column
>
> On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
>  wrote:
> >
> > Hi,
> >
> > I'm trying to figure out how I can write records that don't match a
> > json read schema via spark structred streaming to an output sink /
> > parquet location. Previously I did this in batch via corrupt column
> > features of batch. But in this spark structured streaming I'm reading
> > from kafka a string and using from_json on the value of that string.
> > If it doesn't match my schema then I from_json returns null for all
> > the rows, and does not populate a corrupt record column. But I want to
> > somehow obtain the source kafka string in a dataframe, and an write to
> > a output sink / parquet location.
> >
> > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: 
> > StructType) = {
> >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> >   jsonDataFrame.select(from_json(col("value"),
> > schema)).select("jsontostructs(value).*")
> > }




Re: Corrupt record handling in spark structured streaming and from_json function

2018-12-26 Thread Colin Williams
From my initial impression it looks like I'd need to create my own `from_json`
using `jsonToStructs` as a reference, but try to handle `case _:
BadRecordException => null` or similar, to try to write the non-matching
string to a corrupt records column.

On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
 wrote:
>
> Hi,
>
> I'm trying to figure out how I can write records that don't match a
> json read schema via spark structred streaming to an output sink /
> parquet location. Previously I did this in batch via corrupt column
> features of batch. But in this spark structured streaming I'm reading
> from kafka a string and using from_json on the value of that string.
> If it doesn't match my schema then I from_json returns null for all
> the rows, and does not populate a corrupt record column. But I want to
> somehow obtain the source kafka string in a dataframe, and an write to
> a output sink / parquet location.
>
> def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) 
> = {
>   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
>   jsonDataFrame.select(from_json(col("value"),
> schema)).select("jsontostructs(value).*")
> }




Corrupt record handling in spark structured streaming and from_json function

2018-12-26 Thread Colin Williams
Hi,

I'm trying to figure out how I can write records that don't match a JSON read
schema, via Spark Structured Streaming, to an output sink / parquet location.
Previously I did this in batch jobs via the corrupt-record column feature of
the batch reader. But in this Structured Streaming job I'm reading a string
from Kafka and using from_json on the value of that string. If it doesn't
match my schema, from_json returns null for all the rows and does not populate
a corrupt record column. But I want to somehow obtain the source Kafka string
in a dataframe and write it to an output sink / parquet location.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame = {
  val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
  // "jsontostructs(value)" is the auto-generated column name in the Spark version used here.
  jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
}
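
A possible tweak, sketched below with assumed column names ("raw_value",
"data"): keep the raw Kafka string next to the parsed struct instead of
selecting it away, so rows where from_json came back null can still be routed
to a corrupt-records location (for example with the foreachBatch split shown
earlier in this thread).

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

// Sketch only: "raw_value" and "data" are illustrative names.
def getKafkaEventDataFrameWithRaw(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame = {
  rawKafkaDataFrame
    .select(col("value").cast("string").as("raw_value"))      // keep the source Kafka string
    .withColumn("data", from_json(col("raw_value"), schema))  // null here marks a non-matching record
}
```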

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: from_json function

2018-08-16 Thread dbolshak
Maxim, thanks for your reply.

I've left a comment in the following Jira issue:
https://issues.apache.org/jira/browse/SPARK-23194?focusedCommentId=16582025&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16582025







Re: from_json function

2018-08-15 Thread Maxim Gekk
Hello Denis,

The from_json function supports only the fail fast mode, see:
https://github.com/apache/spark/blob/e2ab7deae76d3b6f41b9ad4d0ece14ea28db40ce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L568

Your setting "mode" -> "PERMISSIVE" will be overwritten.
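
One workaround along those lines, sketched here under the assumption of a
spark-shell style session (an active SparkSession named spark with
spark.implicits._ available): parse the string column through the JSON
datasource via spark.read.json(Dataset[String]), which does honor PERMISSIVE
mode and columnNameOfCorruptRecord, so the malformed row keeps its raw text.

```
import org.apache.spark.sql.types._
import spark.implicits._  // assumes an active SparkSession named spark

val schema = new StructType()
  .add("number", IntegerType)
  .add("_corrupt_record", StringType)

// Same sample strings as in the quoted question below; single quotes are
// accepted because allowSingleQuotes defaults to true.
val data = Seq("{'number': 1}", "{'number': }").toDS()

val parsed = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json(data)

parsed.show(false)  // the second row should keep its raw text in _corrupt_record
```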

On Wed, Aug 15, 2018 at 4:52 PM dbolshak  wrote:

> Hello community,
>
> I can not manage to run from_json method with "columnNameOfCorruptRecord"
> option.
> ```
> import org.apache.spark.sql.functions._
>
> val data = Seq(
>   "{'number': 1}",
>   "{'number': }"
> )
>
> val schema = new StructType()
>   .add($"number".int)
>   .add($"_corrupt_record".string)
>
> val sourceDf = data.toDF("column")
>
> val jsonedDf = sourceDf
>   .select(from_json(
> $"column",
> schema,
> Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" ->
> "_corrupt_record")
>   ) as "data").selectExpr("data.number", "data._corrupt_record")
>
>   jsonedDf.show()
> ```
> Does anybody can help me get `_corrupt_record` non empty?
>
> Thanks in advance.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 

Maxim Gekk

Technical Solutions Lead

Databricks Inc.

maxim.g...@databricks.com

databricks.com



from_json function

2018-08-15 Thread dbolshak
Hello community,

I cannot manage to get the from_json method working with the
"columnNameOfCorruptRecord" option.
```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import spark.implicits._  // assumes an active SparkSession named spark

val data = Seq(
  "{'number': 1}",
  "{'number': }"
)

val schema = new StructType()
  .add($"number".int)
  .add($"_corrupt_record".string)

val sourceDf = data.toDF("column")

val jsonedDf = sourceDf
  .select(from_json(
    $"column",
    schema,
    Map("mode" -> "PERMISSIVE", "columnNameOfCorruptRecord" -> "_corrupt_record")
  ) as "data")
  .selectExpr("data.number", "data._corrupt_record")

jsonedDf.show()
```
Can anybody help me get `_corrupt_record` to be non-empty?

Thanks in advance.



