Re: FlatMapGroupsWithStateFunction is called thrice - Production use case.
Hi Jungtaek,

Thanks for looking into it. We use spark-2.4.3. I removed most of our code and pasted just enough here to show the flow. Sorry for the delay; I will try to provide a simple reproducer when I find time, but this is really hurting us.

Another observation: the function is called multiple times only if I add a value to the outputEvents list in FlatMapIdFedGroupFunction; otherwise it is called just once.

Could you give me any pointers on how to debug this, and any guesses as to what could be wrong, so that I can focus my debugging on the right path?

thanks
Robin Kuttaiah

On Fri, Mar 12, 2021 at 8:43 AM Jungtaek Lim wrote:

> Hi,
>
> Could you please provide the Spark version?
>
> Also it would be very helpful if you could provide a simple reproducer, e.g. a project that can simply be built (mvn, gradle, or sbt) placed in your GitHub repository, plus the set of input data needed to see the behavior. It is worth knowing that others aren't interested in your own code even if they are interested in the problematic behavior itself. It would be nice if you could minimize the hurdle of debugging.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin wrote:
>
>> Hello,
>>
>> I have a use case where I need to read events (non-correlated) from a source Kafka topic, then correlate them and push them forward to another target topic.
>>
>> I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction along with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I apply some correlation logic on the group's events and push the correlated events forward to another topic (via foreachBatch). Non-correlated events are stored in the state until they are correlated by a future set of events.
>>
>> With this scenario, when I push a single event to the source topic, I see it arrive three times in the FlatMapGroupsWithStateFunction (at separate timestamps) but only once in the foreachBatch processor (which is good).
>>
>> The same event arriving thrice in the FlatMapGroupsWithStateFunction is a problem, as it causes issues with my correlation logic.
>>
>> Can someone help me understand why this is seen thrice in the FlatMapGroupsWithStateFunction?
>>
>> Code snippets are shown below.
>> Please let me know what is missing and how I can solve this.
>>
>> thanks,
>> Robin Kuttaiah
FlatMapGroupsWithStateFunction is called thrice - Production use case.
Hello,

I have a use case where I need to read events (non-correlated) from a source Kafka topic, then correlate them and push them forward to another target topic.

I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction along with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I apply some correlation logic on the group's events and push the correlated events forward to another topic (via foreachBatch). Non-correlated events are stored in the state until they are correlated by a future set of events.

With this scenario, when I push a single event to the source topic, I see it arrive three times in the FlatMapGroupsWithStateFunction (at separate timestamps) but only once in the foreachBatch processor (which is good).

The same event arriving thrice in the FlatMapGroupsWithStateFunction is a problem, as it causes issues with my correlation logic.

Can someone help me understand why this is seen thrice in the FlatMapGroupsWithStateFunction? Code snippets are shown below. Please let me know what is missing and how I can solve this.

thanks,
Robin Kuttaiah

StreamQuery:

    Dataset<MilestoneEvent> sessionUpdates = null;
    FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent>
        idstateUpdateFunction = new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
    try {
      sessionUpdates = idFedKafkaEvents
          .groupByKey(
              new MapFunction<Row, String>() {
                private static final long serialVersionUID = -797571731893988577L;
                @Override
                public String call(Row event) {
                  return event.getAs("EVENT_MODEL_ID_COL");
                }
              },
              Encoders.STRING())
          .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
              Encoders.bean(IdentifierConnector.class),
              Encoders.bean(MilestoneEvent.class),
              GroupStateTimeout.ProcessingTimeTimeout());
    } catch (Exception oException) {
      // log and throw back exception
    }

    ForeachBatchProcessor oForeachBatch =
        new ForeachBatchProcessor(m_InsightDeployment, m_InsightEvent, m_strQueryName);
    DataStreamWriter<MilestoneEvent> events = sessionUpdates.writeStream()
        .queryName(queryName)
        .outputMode("append")
        .trigger(Trigger.ProcessingTime("5 seconds"))
        .option("checkpointLocation", checkpointLocation)
        .foreachBatch(oForeachBatch);

FlatMapGroupsWithStateFunction:

    public class FlatMapIdFedGroupFunction
        implements FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {

      private InsightEvent insightEvent;

      public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
        this.insightEvent = iEvent;
      }

      @Override
      public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
          GroupState<IdentifierConnector> state) throws Exception {
        List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
        IdentifierConnector session = null;
        IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + " "
            + System.currentTimeMillis()); // Called thrice
        if (!state.exists()) {
          session = new IdentifierConnector();
        } else {
          session = state.get();
        }
        while (events.hasNext()) {
          Row event = events.next();
          MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
          outputEvents.add(mEventCurr);
          IdFederationUtil.write(".." + mEventCurr.getMilestoneId()); // Called thrice
          break;
        }
        return outputEvents.iterator();
      }
    }

ForEachBatchFunction:

    public class ForeachBatchProcessor
        implements VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {

      private static final long serialVersionUID = 1L;

      public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
          InsightEvent in_oInsightEvent, String in_strQueryName) {
      }

      @Override
      public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID) throws Exception {
        if (in_Rows.count() == 0L) {
          return;
        }
        IdFederationUtil.write("Processing batch " + in_lBatchID + " " + in_Rows.count());
        List<MilestoneEvent> events = in_Rows.collectAsList();
        for (MilestoneEvent m : events) {
          IdFederationUtil.write("..BATCH " + m.getMilestoneId());
        }
      }
    }
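On the debugging question raised in the reply above: one low-effort probe (an assumption on my part, not something from the thread) is to log the task context inside the state function; stage, partition, and attempt numbers distinguish genuine multiple invocations within a micro-batch from task retries or a recomputed stage. A minimal sketch:

    import org.apache.spark.TaskContext;

    // Inside FlatMapIdFedGroupFunction.call(): log enough context to tell
    // whether the three invocations come from different stages, retries of
    // the same task, or repeated evaluation within one micro-batch.
    TaskContext tc = TaskContext.get();
    IdFederationUtil.write("invoked key=" + key
        + " stage=" + tc.stageId()
        + " partition=" + tc.partitionId()
        + " attempt=" + tc.attemptNumber()
        + " at=" + System.currentTimeMillis());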
How to handle Spark state which is growing too big even with a timeout set.
Hello,

I have a use case where I need to read events (non-correlated) from a Kafka topic, then correlate them and push them forward to another topic.

I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction along with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I do some correlation on the group's events and push the correlated events forward to another topic. Non-correlated events are stored in the state until they are correlated by a future set of events.

There are times when there are many non-correlated events, and the size of the non-correlated state grows too big. Does anyone know how to handle this use case, or will Spark take care of the state when it grows big?

Thanks in advance.

regards,
Robin Kuttaiah
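For context: Spark does not drop flatMapGroupsWithState state by itself; when the timeout fires, the function is invoked with hasTimedOut() == true and an empty iterator, and it is up to the function to call state.remove(). A minimal sketch of that eviction pattern, which is the usual guard against unbounded state growth (the EventState bean, the EVENT_ID column, and the "10 minutes" duration are illustrative assumptions, and it assumes the query registers GroupStateTimeout.ProcessingTimeTimeout() as above):

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.GroupState;

    public class CorrelateFunction
        implements FlatMapGroupsWithStateFunction<String, Row, CorrelateFunction.EventState, Row> {

      // Illustrative state bean: only the event ids still waiting for correlation.
      public static class EventState implements Serializable {
        private ArrayList<String> pendingIds = new ArrayList<>();
        public ArrayList<String> getPendingIds() { return pendingIds; }
        public void setPendingIds(ArrayList<String> ids) { this.pendingIds = ids; }
      }

      @Override
      public Iterator<Row> call(String key, Iterator<Row> events, GroupState<EventState> state) {
        List<Row> correlated = new ArrayList<>();
        if (state.hasTimedOut()) {
          // Timeout fired with no new events: emit or discard the leftovers,
          // then remove the state so it stops occupying the state store.
          state.remove();
          return correlated.iterator();
        }
        EventState buffer = state.exists() ? state.get() : new EventState();
        while (events.hasNext()) {
          Row event = events.next();
          buffer.getPendingIds().add(event.getAs("EVENT_ID")); // buffer non-correlated events
        }
        // ... correlation logic would move matched events into `correlated` ...
        state.update(buffer);
        state.setTimeoutDuration("10 minutes"); // re-arm the processing-time timeout
        return correlated.iterator();
      }
    }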
Spark Listeners for getting dataset partition information in streaming application
Hello,

Is there a way a Spark streaming application can get to know the start and the end of the data read from a dataset partition? I want to create a partition-specific cache at the start and delete it once the partition has been read completely.

Thanks for your help in advance.

regards,
Robin Kuttaiah
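One approach that may fit (an assumption, not an established answer from the list): mapPartitions gives a natural start-of-partition and end-of-partition hook, since its body runs exactly once per partition. PartitionCache, createFor, enrich, and destroy below are hypothetical user-defined names:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.function.MapPartitionsFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;

    Dataset<Row> out = input.mapPartitions(
        (MapPartitionsFunction<Row, Row>) rows -> {
          int pid = TaskContext.get().partitionId();
          PartitionCache cache = PartitionCache.createFor(pid); // start-of-partition hook
          List<Row> result = new ArrayList<>();
          while (rows.hasNext()) {
            result.add(cache.enrich(rows.next()));              // per-row work using the cache
          }
          cache.destroy();                                      // end-of-partition hook
          return result.iterator();
        },
        RowEncoder.apply(input.schema()));

Materializing the partition into a list keeps the teardown simple at the cost of memory; a lazier variant wraps the iterator and tears the cache down when hasNext() first returns false.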
How to use Dataset foreachPartition and groupByKey together
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4. I have a Spark streaming application which listens to a Kafka topic, does some transformation, and writes to an Oracle database using a JDBC client.

Step 1. Read events from Kafka as shown below:

    Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
        .option("kafka.bootstrap.servers", strKafkaAddress)
        .option("assign", strSubscription)
        .option("maxOffsetsPerTrigger", "10")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .load()
        .filter(strFilter)
        .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
        .select("events.*");

I do groupByKey, and then for each group I use the set of events obtained for that group to create a JDBC connection/PreparedStatement, insert, and then close the connection. I am using Oracle JDBC along with flatMapGroupsWithState.

Step 2. groupByKey and flatMapGroupsWithState:

    Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
        .groupByKey(
            new MapFunction<Row, String>() {
              @Override
              public String call(Row event) {
                return event.getAs(m_InsightRawEvent.getPrimaryKey());
              }
            },
            Encoders.STRING())
        .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
            Encoders.bean(InsightEventInfo.class),
            Encoders.bean(InsightEventUpdate.class),
            GroupStateTimeout.ProcessingTimeTimeout());

This has a drawback: it creates a connection and inserts into the DB for each group. I need to do it per partition, so that only one connection and one batch insert are needed for all the new events read from the partition. Can somebody point me to how I can achieve this? Basically I am looking for the below:

1. Read from Kafka as said above.
2. kafkaEvents.foreachPartition - create one connection here.
3. groupByKey and flatMapGroupsWithState.

thanks
Robin Kuttaiah
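A streaming Dataset does not support foreachPartition directly, but a ForeachWriter sink gives the same per-partition lifecycle: open() and close() run once per partition per trigger, so one connection and one batch can serve every row of the partition. A hedged sketch, assuming the output has been mapped to a Dataset<Row>, with placeholder JDBC URL, credentials, SQL, and column names:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    import org.apache.spark.sql.ForeachWriter;
    import org.apache.spark.sql.Row;

    public class OraclePartitionWriter extends ForeachWriter<Row> {
      private transient Connection conn;
      private transient PreparedStatement stmt;

      @Override
      public boolean open(long partitionId, long version) {
        try {
          // One connection per partition per trigger.
          conn = DriverManager.getConnection("jdbc:oracle:thin:@//host:1521/svc", "user", "pwd");
          stmt = conn.prepareStatement("INSERT INTO EVENTS (ID, PAYLOAD) VALUES (?, ?)");
          return true;
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void process(Row row) {
        try {
          stmt.setString(1, row.getAs("id"));
          stmt.setString(2, row.getAs("payload"));
          stmt.addBatch(); // accumulate; no per-row round trip
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public void close(Throwable errorOrNull) {
        try {
          if (errorOrNull == null && stmt != null) {
            stmt.executeBatch(); // one batch insert per partition
          }
          if (conn != null) {
            conn.close();
          }
        } catch (SQLException e) {
          throw new RuntimeException(e);
        }
      }
    }

After mapping the output to rows, this would be wired in with .writeStream().foreach(new OraclePartitionWriter()) in place of the per-group insert inside the state function.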
Redeploying Spark streaming application aborts because of checkpoint issue
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4. I have a Spark streaming application which listens to a Kafka topic, does some transformation, and writes to an Oracle database using a JDBC client.

Events are read from Kafka as shown below:

    m_oKafkaEvents = getSparkSession().readStream().format("kafka")
        .option("kafka.bootstrap.servers", strKafkaAddress)
        .option("assign", strSubscription)
        .option("maxOffsetsPerTrigger", "10")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .load()
        .filter(strFilter)
        .select(functions.from_json(functions.col("value").cast("string"), oSchema).alias("events"))
        .select("events.*");

The checkpoint is used as shown below:

    DataStreamWriter oMilestoneStream = oAggregation
        .writeStream()
        .queryName(strQueryName)
        .outputMode("update")
        .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
        .option("checkpointLocation", strCheckpointLocation)
        .foreach(oForeachWriter);

strCheckpointLocation is something like /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj. This is an HDFS location.

With this, when I redeploy the Spark application I get the exception below. The only workaround I have currently is to delete the checkpoint location and recreate the topic. I also see a couple of JIRA issues which are marked RESOLVED, but the problem is still seen:

https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262

Can someone help me with what the best solution for this is?

thanks,
Robin Kuttaiah

Exception
---------

    18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
    java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
        at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
        at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
        at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
        at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
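Until those issues are addressed, one workaround sometimes used in practice (an assumption on my part, not advice from this thread) is to tie the checkpoint directory to a deployment version, so that redeploying changed query logic starts from a fresh checkpoint instead of failing while restoring incompatible state:

    // Illustrative only: bump queryVersion whenever the query plan or
    // stateful logic changes between deployments. Note this abandons the
    // state and offsets stored under the previous checkpoint path, so the
    // query restarts from startingOffsets.
    String queryVersion = "v2";
    String strCheckpointLocation =
        "/insight/tenant-0001/instance_summary/checkpoint/" + strQueryName + "_" + queryVersion;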
Re: Recreate Dataset from list of Row in Spark streaming application.
Thanks Ryan. Yes, that is a ForeachWriter.

Can someone help me with how to solve this? Basically the output of the flatMapGroupsWithState function is Dataset<InsightEventUpdate> sessionUpdates; the InsightEventUpdate class contains a list of Spark Rows which I need to convert back to a Dataset, something like Dataset<Row> correlatedEvents = ...;. Please note this is a streaming application.

regards,
Robin.

On Fri, Oct 5, 2018 at 11:12 PM Shixiong(Ryan) Zhu wrote:

> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor is a ForeachWriter, right? You cannot use SparkSession in its process method, as it will run on executors.
>
> Best Regards,
> Ryan
>
> On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin wrote:
Recreate Dataset from list of Row in Spark streaming application.
Hello,

I have a Spark streaming application which reads from Kafka based on the given schema:

    Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
        .option("kafka.bootstrap.servers", strKafkaAddress)
        .option("assign", strSubscription)
        .option("maxOffsetsPerTrigger", "10")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        .load()
        .filter(strFilter)
        .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
        .select("events.*");

Now this dataset is grouped by one of the columns (InstanceId), which is the key for us, and then fed into a flatMapGroupsWithState function. This function does some correlation.

    Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
        new MapFunction<Row, String>() {
          @Override
          public String call(Row event) {
            return event.getAs("InstanceId");
          }
        }, Encoders.STRING())
        .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
            Encoders.bean(InsightEventInfo.class),
            Encoders.bean(InsightEventUpdate.class),
            GroupStateTimeout.ProcessingTimeTimeout());

The output dataset is of type InsightEventUpdate, which contains the list of Spark Rows related to the InstanceId.

Now I want to convert this back into a Dataset of type Row. Basically I have a list of Rows. I tried

    sparkSession.createDataFrame(listOfRows, schema);

and this gives me

    java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
        at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
        at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)

Can someone help me with the way to go ahead?

thanks
Robin Kuttaiah
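Following Ryan's point in the reply above, the NullPointerException comes from calling createDataFrame inside ForeachWriter.process(), which runs on executors where the SparkSession is not usable. A minimal sketch of the driver-side pattern (the method name is illustrative):

    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.StructType;

    // Must run on the driver (e.g. before writeStream, or in a sink that
    // hands data back to the driver), never inside ForeachWriter.process().
    public static Dataset<Row> rowsToDataset(SparkSession spark, List<Row> listOfRows,
        StructType schema) {
      return spark.createDataFrame(listOfRows, schema);
    }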
Re: Creating Spark Row from database values
Thanks, I'll check it out.

On Wed, Sep 26, 2018 at 6:25 PM Shahab Yunus wrote:

> Hi there. Have you seen this link?
> https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393
>
> It shows you multiple ways to manually create a dataframe.
>
> Hope it helps.
>
> Regards,
> Shahab
>
> On Wed, Sep 26, 2018 at 8:02 AM Kuttaiah Robin wrote:
Creating Spark Row from database values
Hello,

Currently I have an Oracle database table with the description shown below:

    Table INSIGHT_ID_FED_IDENTIFIERS
    --------------------------------
    CURRENT_INSTANCE_ID    VARCHAR2(100)
    PREVIOUS_INSTANCE_ID   VARCHAR2(100)

Sample values in the table, basically the output of select * from INSIGHT_ID_FED_IDENTIFIERS (for simplicity I have put only one row):

    CURRENT_INSTANCE_ID    PREVIOUS_INSTANCE_ID
    -------------------    --------------------
    curInstanceId1         prevInstanceId1

I have the Spark schema associated with it. Now I need to create a Spark row (org.apache.spark.sql.Row) out of it. Can someone help me understand how this can be achieved?

regards,
Robin Kuttaiah
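A minimal sketch of one way to do this with RowFactory, assuming the two VARCHAR2 columns map to StringType and the values come from a JDBC ResultSet (the literal values stand in for resultSet.getString(...) calls):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    // Schema matching the table: both columns are strings.
    StructType schema = new StructType()
        .add("CURRENT_INSTANCE_ID", DataTypes.StringType)
        .add("PREVIOUS_INSTANCE_ID", DataTypes.StringType);

    // Values as read from the table, e.g. resultSet.getString(1) / getString(2).
    Row row = RowFactory.create("curInstanceId1", "prevInstanceId1");

The Row itself does not carry the schema; the schema is supplied alongside the rows, e.g. when calling sparkSession.createDataFrame(rowList, schema).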
Spark FlatMapGroupsWithStateFunction throws "cannot resolve 'named_struct()' due to data type mismatch" in 'SerializeFromObject'
Hello,

I am using a FlatMapGroupsWithStateFunction in my Spark streaming application:

    FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate> idstateUpdateFunction =
        new FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>() { ... };

The SessionUpdate class runs into trouble when the milestones field (marked "newly added" below) is added, which throws the exception further below. The same attribute milestones, with setter/getter, has been added to SessionInfo (the input class), but it does not throw an exception there.

    public static class SessionUpdate implements Serializable {

      private static final long serialVersionUID = -3858977319192658483L;

      private String instanceId;
      private ArrayList<Row> milestones = new ArrayList<Row>(); // newly added
      private Timestamp processingTimeoutTimestamp;

      public SessionUpdate() {
        super();
      }

      public SessionUpdate(String instanceId, ArrayList<Row> milestones,
          Timestamp processingTimeoutTimestamp) {
        super();
        this.instanceId = instanceId;
        this.milestones = milestones; // newly added
        this.processingTimeoutTimestamp = processingTimeoutTimestamp;
      }

      public String getInstanceId() {
        return instanceId;
      }

      public void setInstanceId(String instanceId) {
        this.instanceId = instanceId;
      }

      public ArrayList<Row> getMilestones() { // newly added
        return milestones;
      }

      public void setMilestones(ArrayList<Row> milestones) { // newly added
        this.milestones = milestones;
      }

      public Timestamp getProcessingTimeoutTimestamp() {
        return processingTimeoutTimestamp;
      }

      public void setProcessingTimeoutTimestamp(Timestamp processingTimeoutTimestamp) {
        this.processingTimeoutTimestamp = processingTimeoutTimestamp;
      }
    }

Exception:

    ERROR cannot resolve 'named_struct()' due to data type mismatch: input to function named_struct requires at least one argument;;
    'SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getInstanceId, true, false) AS instanceId#62,
    mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), if (isnull(lambdavariable(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), true))) null else named_struct(), assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getMilestones, None) AS milestones#63,
    staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType, fromJavaTimestamp, assertnotnull(input[0, oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, true]).getProcessingTimeoutTimestamp, true, false) AS processingTimeoutTimestamp#64]
    +- FlatMapGroupsWithState , cast(value#54 as string).toString, createexternalrow(EventTime#23.toString, InstanceID#24.toString, Model#25.toString, Milestone#26.toString, Region#27.toString, SalesOrganization#28.toString, ProductName#29.toString, ReasonForQuoteReject#30.toString, ReasonforRejectionBy#31.toString, OpportunityAmount#32.toJavaBigDecimal, Discount#33.toJavaBigDecimal, TotalQuoteAmount#34.toJavaBigDecimal, NetQuoteAmount#35.toJavaBigDecimal, ApprovedDiscount#36.toJavaBigDecimal, TotalOrderAmount#37.toJavaBigDecimal, StructField(EventTime,StringType,true), StructField(InstanceID,StringType,true), StructField(Model,StringType,true), StructField(Milestone,StringType,true), StructField(Region,StringType,true), StructField(SalesOrganization,StringType,true), StructField(ProductName,StringType,true), StructField(ReasonForQuoteReject,StringType,true), StructField(ReasonforRejectionBy,StringType,true), ... 6 more fields), [value#54], [EventTime#23, InstanceID#24, Model#25, Milestone#26, Region#27, SalesOrganization#28, ProductName#29, ReasonForQuoteReject#30, ReasonforRejectionBy#31, OpportunityAmount#32, Discount#33, TotalQuoteAmount#34, NetQuoteAmount#35, ApprovedDiscount#36, TotalOrderAmount#37], obj#61: oracle.insight.spark.event_processor.EventProcessor$SessionUpdate, class[instanceId[0]: string, milestones[0]: array<struct<>>, processingTimeoutTimestamp[0]: timestamp], Append, false, ProcessingTimeTimeout

The schema looks like:

    {"Name":"EventTime","DataType":"TimestampType"},
    {"Name":"InstanceID","DataType":"STRING","Length":100},
    {"Name":"Model","DataType":"STRING","Length":100},
    {"Name":"Milestone","DataType":"STRING","Length":100},
    {"Name":"Region","DataType":"STRING","Length":100},
    {"Name":"SalesOrganization","DataType":"STRING","Length":100},
    {"Name":"ProductName","DataType":"STRING","Length":100},
    {"Name":"ReasonForQuoteReject","DataType":"STRING","Length":100},
    {"Name":"ReasonforRejectionBy","DataType":"STRING","Length":100},
    // Note: org.apache.spark.sql.types.DataTypes.createDecimalType(precision(), scale())
    {"Name":"OpportunityAmount","DataType":"DECIMAL","Precision":38,"Scale":2},
    {"Name":"Discount","DataType":"DECIMAL","Precision":38,"Scale":2},
    {"Name":"TotalQuoteAmount","DataType":"DECIMAL","Precision":38,"Scale":2},
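The empty named_struct() in the plan is the bean encoder failing to derive a schema for the generic Row elements of milestones (hence milestones[0]: array<struct<>>): a GenericRowWithSchema carries its schema only at runtime, so Encoders.bean cannot see any fields at analysis time. One possible workaround, sketched here as an assumption rather than a confirmed fix, is to use a concrete bean for the list elements instead of Row:

    import java.io.Serializable;

    // Illustrative element type: with a typed bean, Encoders.bean(SessionUpdate.class)
    // can derive a non-empty struct schema for the milestones array. Field names
    // here are placeholders.
    public static class Milestone implements Serializable {
      private String milestoneId;
      private String milestoneName;

      public String getMilestoneId() { return milestoneId; }
      public void setMilestoneId(String milestoneId) { this.milestoneId = milestoneId; }
      public String getMilestoneName() { return milestoneName; }
      public void setMilestoneName(String milestoneName) { this.milestoneName = milestoneName; }
    }

    // In SessionUpdate, the field would then become:
    // private ArrayList<Milestone> milestones = new ArrayList<Milestone>();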