Hi all, I have a problem where a Spark DataFrame ends up with null columns
for attributes that are not present in the incoming JSON. A detailed
explanation follows:

*Use case:* Convert JSON objects into a DataFrame for further processing.

*Case - 1:* Without specifying a schema for the JSON:

records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd.count() > 0) {
            JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
            sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
            DataFrame df = sqlContext.read().json(filteredRDD);
            df.show();
        }
    }
});

In the code sample above, filteredRDD is an RDD of Strings, each holding one JSON object.

*Sample JSON Record:*
{"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","queue_id":1234,"disposition":"O","created":"2017-06-02 23:49:10.410","assigned":"2017-06-02 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02 23:49:10.410"}

*DataFrame Output:*

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8407/Screenshot_at_Sep_07_11-36-27.png>
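Without a schema, Spark samples the records and infers one, so only the
attributes that actually occur in the JSON become columns. A quick way to
verify what was inferred is below; the commented output is my assumption
based on the sample record above, given that default inference sorts fields
alphabetically and reads whole numbers as long:

    df.printSchema();
    // Assumed output for the sample record (not the actual screenshot):
    // root
    //  |-- assigned: string (nullable = true)
    //  |-- created: string (nullable = true)
    //  |-- datetime: string (nullable = true)
    //  |-- disposition: string (nullable = true)
    //  |-- final_review_status: string (nullable = true)
    //  |-- org_id: string (nullable = true)
    //  |-- queue_id: long (nullable = true)
    //  |-- request_id: string (nullable = true)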
 

*Case - 2:* With the schema specified for the JSON:

records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd.count() > 0) {
            JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
            sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
            DataFrame df = sqlContext.read().schema(SchemaBuilder.buildSchema()).json(filteredRDD);
            df.show();
        }
    }
});

As before, filteredRDD is an RDD of Strings, each holding one JSON object.

*Schema Definition:*

public static StructType buildSchema() {
    StructType schema = new StructType(new StructField[] {
            DataTypes.createStructField("request_id", DataTypes.StringType, false),
            DataTypes.createStructField("org_id", DataTypes.StringType, false),
            DataTypes.createStructField("queue_id", DataTypes.IntegerType, true),
            DataTypes.createStructField("owner", DataTypes.StringType, true),
            DataTypes.createStructField("disposition", DataTypes.StringType, true),
            DataTypes.createStructField("created", DataTypes.TimestampType, true),
            DataTypes.createStructField("created_user", DataTypes.StringType, true),
            DataTypes.createStructField("assigned", DataTypes.TimestampType, true),
            DataTypes.createStructField("assigned_user", DataTypes.StringType, true),
            DataTypes.createStructField("notes", DataTypes.StringType, true),
            DataTypes.createStructField("final_review_status", DataTypes.StringType, true),
            DataTypes.createStructField("event_tag", DataTypes.StringType, true),
            DataTypes.createStructField("additional_data", DataTypes.StringType, true),
            DataTypes.createStructField("datetime", DataTypes.TimestampType, true),
            DataTypes.createStructField("dc", DataTypes.StringType, true),
            DataTypes.createStructField("case_id", DataTypes.StringType, true),
            DataTypes.createStructField("case_status", DataTypes.StringType, true)
    });
    return schema;
}

*Sample JSON Record:*
{"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","queue_id":1234,"disposition":"O","created":"2017-06-02 23:49:10.410","assigned":"2017-06-02 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02 23:49:10.410"}

*DataFrame Output:*
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8407/sample.png> 
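For reference, with the fixed schema every declared field becomes a column,
so df.show() prints something along these lines (a reconstruction from the
sample record, trimmed to the first few columns; an assumption, not the
actual screenshot):

    // +--------------------+--------+--------+-----+-----------+ ...
    // |          request_id|  org_id|queue_id|owner|disposition| ...
    // +--------------------+--------+--------+-----+-----------+ ...
    // |f791e831f71e4918b...|y08e7p9g|    1234| null|          O| ...
    // +--------------------+--------+--------+-----+-----------+ ...

The owner column, like every other field missing from the JSON, is null.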

As you can see, when the schema is defined, the columns that are not present
in the JSON come back as null. Is there a workaround to get the result from
the first image (without the null columns) while still using a schema? I
need this to perform updates into a Kudu table: because the absent columns
are assigned NULL, they overwrite the existing values in Kudu with null,
which is not desired.
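
One possible workaround, sketched below and untested (castToTarget is a
hypothetical helper name; the rest reuses the names from the code above):
read the JSON without a schema, so attributes absent from the records never
become columns, then cast just the columns that were inferred to the types
declared in buildSchema(). Kudu then only sees the columns that were
actually present:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical helper: cast the inferred columns to the types from the
    // target schema. Fields missing from the JSON are never created, so no
    // null columns reach the Kudu update.
    public static DataFrame castToTarget(DataFrame df, StructType target) {
        List<String> present = Arrays.asList(df.columns());
        for (StructField field : target.fields()) {
            if (present.contains(field.name())) {
                df = df.withColumn(field.name(),
                        df.col(field.name()).cast(field.dataType()));
            }
        }
        return df;
    }

Inside call() it would be used as:

    DataFrame df = castToTarget(sqlContext.read().json(filteredRDD),
            SchemaBuilder.buildSchema());
    df.show();

The trade-off is that inference makes an extra pass over the data to derive
the schema. Keeping the fixed schema and dropping the all-null columns
afterwards would also work, but that cannot distinguish an attribute that is
absent from one that was explicitly null in the JSON.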

Thanks,
Ravi


