Hello,

I am using Apache Spark 2.2.1 with Scala. I am trying to load the JSON below 
from Kafka and extract "JOBTYPE" and "LOADID" from the nested "after" JSON 
object. I need help with the extraction logic.

Code

// Schema handed to from_json: a top-level "after" struct that declares only
// the two string fields to extract, JOBTYPE and LOADID.
val afterFields = new StructType()
  .add("JOBTYPE", StringType)
  .add("LOADID", StringType)
val workRequests = new StructType().add("after", afterFields)

// Streaming DataFrame read from the Kafka topic "edgemw" on the listed brokers.
val kafkaBrokers =
  "zlp21299.vci.att.com:29092,zlp21301.vci.att.com:29093,zlp21318.vci.att.com:29094"
val dfn = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers)
  .option("subscribe", "edgemw")
  .load()


      // Parse the Kafka record value as JSON and project the two nested fields.
      //
      // The alias must be applied to the parsed struct *column* (Column.as,
      // inside the select). Applied after the select it aliases the Dataset
      // instead, and "data.*" then yields the single struct column rather
      // than its JOBTYPE / LOADID fields.
      val df2 = dfn
          .selectExpr("CAST(value AS STRING)")   // Kafka value is binary; cast to text
          .select(from_json($"value", workRequests).as("data"))
          // Reach through the "after" struct declared in workRequests; the
          // resulting columns are named JOBTYPE and LOADID.
          .select($"data.after.JOBTYPE", $"data.after.LOADID")

      // Start a console sink for df2 (untruncated values, numRows set to 10),
      // then block the calling thread until the streaming query terminates.
      val consoleQuery = df2.writeStream
          .format("console")
          .option("truncate", "false")
          .option("numRows", 10)
          .start()
      consoleQuery.awaitTermination()


Desired Output

JOBTYPE | LOADID
----------------------
Val 1      | Val 1
Val 2      | Val 2

JSON

{"table":"FORCEMW.XBFWR_T","op_type":"U","op_ts":"2018-04-03 
15:10:35.036322","current_ts":"2018-04-03T15:10:39.269000","pos":"00000296150084924838","before":{},"after":{"WRID":93694148,"LOADID":"OH_OHIO_I","SOURCETYPE":"RWG","EXTKEY1":"S0010","WORKTYPE":"RT","WORKTYPEQUALIFIER":"IN","TIMESTAMP":"2018-04-03:12:41:49","EXTKEY2":"NWCMOH49
                 S0010     00130    
","WRSTAT":"ASSIGN","WRSTATQUAL":null,"JOBTYPE":"S0010","ESTDURATION":"28","ESTSETUP":"0","PRIORITY":"55","LOADSTARTDATETIME":"2018-04-01:11:00:00","LOADENDDATETIME":"2018-04-11:03:59:00","ASSIGNDATE":"2018-04-03:00:00:00","COMMITMENTDATETIME":"2018-05-01:03:59:00","LATITUDE":40.273643,"LONGITUDE":-81.607832,"ASSIGNCOORDHORIZ":null,"ASSIGNCOORDVERT":null,"ACCESSBEGINDATETIME":null,"ACCESSENDDATETIME":null,"DISPATCH2AREA":"N","LATESTSTARTDATETIME":null,"TECHPREFERRED":null,"OWNERID":"Force","TECHASSIGNED":"MM4341","UNSCHEDULED":"N","UNLOCATED":"N","ACCESSCODE":null,"COMPCANDATETIME":null,"APPOINTMENTDATETIME":null,"CLOSEOUTVALIDATIONRQD":null,"CUSTADVISEDNAMERQD":null,"CUSTADVISEDREACHNBRRQD":null,"DISPATCHDATE":null,"DISPATCHDATECLASS":null,"DMNAENDDATETIME1":null,"DMNAENDDATETIME2":null,"DMNAFLAG1":null,"DMNAFLAG2":null,"DMNASTARTDATETIME1":null,"DMNASTARTDATETIME2":null,"ESTORIGDURATION":"28","MANUALLOCKDATETIME":null,"MANUALLOCKEDFLAG":"N","MISSINGDESIGNDATE":null,"PCTCOMPLETE":0,"TASKDURATIONAREANAME":null,"TRAVELTIMEAREANAME":null,"SUMMARYRQD":null,"TRAVELDATETIMEFACTOR":0,"UPDATEDWHILEDISPATCHED":"N","VISITFLAG":"Y","EXTDESTINATION":null,"LOCQUALIFIER":"001","ROUTETOCODE":null,"AUXSTATUS":null,"CURRENTSOURCESTATUS":null,"LASTDISPATCHSOURCESTATUS":null,"LASTSOURCESTATUSDATETIME":null,"APPOINTMENTWINDOWEND":null,"VERSIONNBR":"9","SEARCHDATE":null,"AUXSTATQUAL":null,"EXCEEDPRIORITYTHRES":"N","CANCELORDER":null,"CIRCUITWORKLOCCLLI":"NWCMOH49","JOBTYPEREQUIRESAPPROVAL":"N","APPROVEDFORDISPATCH":null,"WIT":"W","MINIMUMASSIGNTIME":null,"RTNTYPE":"PM","RTNBEGINTIME":null,"RTNENDTIME":null,"ASSIGNABLEFLAG":"Y","DUEDATEFLAG":"N","GROUPKEY":null,"PRIORITIZATIONAREA":null,"ORDDUEDATE":null,"SAMELOCATIONINDICATOR":"Y","ESTIMATEDSTARTDATETIME":"2018-04-03:18:40:45","ESTIMATEDCOMPLETIONDATETIME":"2018-04-03:18:54:45","OBJECTTIMEZONE":"EASTERN","LINKKEY":null,"CUSTNAME":null,"SERVICEADDRSTRING":null,"RESERVATIONID":null,"RESERVATIONSTATQUAL":null,"RSVTECHASSIGNED":null,"RSVAS
SIGNDATE":null,"RSVMATCHFLAG":null,"RSVCNTFLAG":null,"RSVOVRBKFLAG":"N","RSVDETAILS":null,"REISSUEFLAG":"Y","OOSINDICATOR":null,"DELAYDISPATCHFLAG":null,"INTOWFLAG":null,"HELPERFLAG":null,"NUMTASKSCOMPLETED":null,"HONORFUTUREACCESSFLAG":"N","CALLOUTASSIGNFLAG":null,"TECHLOADTYPE":null,"OVERRIDELOADPARAMSIND":"N","LOCATIONPROXIMITYCODE":null,"OOSMEMBERCOUNT":null,"PMAFLAG":null,"OVERRIDEEVALUATEIND":null,"REPORTEDDATETIME":null,"IMMEDDISPIND":null,"RELATEDTROUBLEREPORT":null,"DISPCOORDFLAG":null,"CUSTEMAIL":null,"CUSTNOTIFY":null,"ESTIMATEDARRIVALDATETIME":null,"DISPCOORDKEY":null,"DISPCOORDSEQ":null,"SOURCEQUALIFIER":null,"APDFIELDLOCKS":0,"RESTRICTACCESSCANDIDATEFLAG":null,"ALAPINDICATOR":null,"ENTRYDATETIME":"2018-03-19:18:02:29","JEPRELEASEDATE":null,"JOBLISTDATA":null,"STEPSEQNBR":null,"CMDCNTRLFLAGS":null}}
============================================================================================================================

Disclaimer:  This message and the information contained herein is proprietary 
and confidential and subject to the Tech Mahindra policy statement, you may 
review the policy at http://www.techmahindra.com/Disclaimer.html 
<http://www.techmahindra.com/Disclaimer.html> externally 
http://tim.techmahindra.com/tim/disclaimer.html 
<http://tim.techmahindra.com/tim/disclaimer.html> internally within 
TechMahindra.

============================================================================================================================

Reply via email to