Re: FlatMapGroupsWithStateFunction is called thrice - Production use case.

2021-03-18 Thread Kuttaiah Robin
Hi Jungtaek,

Thanks for looking into it. We use spark-2.4.3.
I removed most of our code and pasted here just to understand the flow.
Sorry for the delay. I would try to provide a simple reproducer when I find
time, but this is really hurting us.

Another observation: the function is called multiple times only if I add a
value to the outputEvents List in FlatMapIdFedGroupFunction; otherwise it is
called only once.

Could you suggest how to debug this, and share any guesses about what could
be wrong, so that I can focus my debugging on the right path?

Thanks,
Robin Kuttaiah

On Fri, Mar 12, 2021 at 8:43 AM Jungtaek Lim 
wrote:

> Hi,
>
> Could you please provide the Spark version?
>
> Also it would be pretty much helpful if you could provide a simple
> reproducer, like placing your reproducer which can simply be built (mvn or
> gradle or sbt) into your Github repository, plus the set of input data to
> see the behavior. Worth to know that others aren't interested in your own
> code even if they are interested in the problematic behavior itself. It'd
> be nice if you can minimize the hurdle on debugging.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin  wrote:
>
>> Hello,
>>
>> I have a use case where I need to read events(non correlated) from a
>> source kafka topic, then correlate and push forward to another target topic.
>>
>> I use spark structured streaming with FlatMapGroupsWithStateFunction
>> along with   GroupStateTimeout.ProcessingTimeTimeout() . After each
>> timeout, I apply some correlation logic on group events and push forward
>> correlated events to another topic(via ForEachBatch). Non correlated events
>> are stored in the state until they are correlated in a future set of events.
>>
>> With this scenario, when I push a single event to source topic, I see it
>> comes three times to FlatMapGroupsWithStateFunction(In separate timestamp)
>> but only once in ForEachBatch processor(which is good).
>>
>> Same event coming thrice in FlatMapGroupsWithStateFunction is a problem
>> as it causes issues with my correlation logic.
>>
>> Can someone help me to understand why this is seen thrice
>> in FlatMapGroupsWithStateFunction?.
>>
>> Code snippets are shown below. Please let me know what is missing and how
>> can i solve this,
>>
>> thanks,
>> Robin Kuttaiah
>>

FlatMapGroupsWithStateFunction is called thrice - Production use case.

2021-03-10 Thread Kuttaiah Robin
Hello,

I have a use case where I need to read uncorrelated events from a source
Kafka topic, correlate them, and push them forward to another target topic.

I use Spark Structured Streaming with FlatMapGroupsWithStateFunction along
with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I apply
some correlation logic to the group's events and push the correlated events
forward to another topic (via ForEachBatch). Uncorrelated events are stored
in the state until they are correlated with a future set of events.

With this scenario, when I push a single event to the source topic, I see it
arrive three times in FlatMapGroupsWithStateFunction (at separate timestamps)
but only once in the ForEachBatch processor (which is good).

The same event arriving three times in FlatMapGroupsWithStateFunction is a
problem, as it causes issues with my correlation logic.

Can someone help me understand why it is seen three times in
FlatMapGroupsWithStateFunction?

Code snippets are shown below. Please let me know what is missing and how I
can solve this.

thanks,
Robin Kuttaiah

StreamQuery:

Dataset<MilestoneEvent> sessionUpdates = null;
FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
    new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
try {
  sessionUpdates = idFedKafkaEvents
      .groupByKey(
          new MapFunction<Row, String>() {
            private static final long serialVersionUID = -797571731893988577L;
            @Override public String call(Row event) {
              return event.getAs("EVENT_MODEL_ID_COL");
            }
          }, Encoders.STRING())
      .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
          Encoders.bean(IdentifierConnector.class),
          Encoders.bean(MilestoneEvent.class),
          GroupStateTimeout.ProcessingTimeTimeout());
} catch (Exception oException) {
  //log and throw back exception
}

ForeachBatchProcessor oForeachBatch = new ForeachBatchProcessor(
    m_InsightDeployment, m_InsightEvent, m_strQueryName);
DataStreamWriter<MilestoneEvent> events = sessionUpdates.writeStream()
    .queryName(queryName)
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .option("checkpointLocation", checkpointLocation)
    .foreachBatch(oForeachBatch);



FlatMapGroupsWithStateFunction:

public class FlatMapIdFedGroupFunction implements
    FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {

  public FlatMapIdFedGroupFunction(InsightEvent iEvent,
      InsightDeployment iDeployment) {
  }

  @Override
  public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
      GroupState<IdentifierConnector> state) throws Exception {
    List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
    IdentifierConnector session = null;
    IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + " "
        + System.currentTimeMillis()); //Called thrice
    if (!state.exists()) {
      session = new IdentifierConnector();
    } else {
      session = state.get();
    }
    while (events.hasNext()) {
      Row event = events.next();
      MilestoneEvent mEventCurr =
          IdFederationUtil.getMilestoneEvent(event, insightEvent);
      outputEvents.add(mEventCurr);
      IdFederationUtil.write(".." + mEventCurr.getMilestoneId()); //Called thrice
      break;
    }
    return outputEvents.iterator();
  }

}


ForEachBatchFunction:

public class ForeachBatchProcessor implements
    VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {

  private static final long serialVersionUID = 1L;

  public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
      InsightEvent in_oInsightEvent, String in_strQueryName) {
  }

  public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
      throws Exception {
    if (in_Rows.count() == 0L) {
      return;
    }
    IdFederationUtil.write("Processing batch " + in_lBatchID + "  "
        + in_Rows.count());
    List<MilestoneEvent> events = in_Rows.collectAsList();
    for (MilestoneEvent m : events) {
      IdFederationUtil.write("..BATCH " + m.getMilestoneId());
    }
  }

}
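
One possible explanation, not confirmed anywhere in this thread: the Dataset
passed to foreachBatch is evaluated lazily, so each action on it may re-run the
upstream plan, including the flatMapGroupsWithState function. The
ForeachBatchProcessor above runs three actions on a non-empty batch (count(),
count() again, collectAsList()) and only one when the first count() returns 0,
which would match "three times with output, once without". The plain-Java model
below uses no Spark at all; LazyBatch and its methods are hypothetical names
that only illustrate the effect and how caching removes it. In Spark the
analogous step would be to call persist() on the batch Dataset at the start of
call() and unpersist() at the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a lazily evaluated batch: every action re-runs
// the upstream function unless the result has been cached.
final class LazyBatch {
    private final Supplier<List<String>> upstream;
    private List<String> cached; // null until cache() is called

    LazyBatch(Supplier<List<String>> upstream) {
        this.upstream = upstream;
    }

    // Evaluate once and remember the rows (models the effect of persist()).
    LazyBatch cache() {
        cached = upstream.get();
        return this;
    }

    private List<String> rows() {
        return cached != null ? cached : upstream.get();
    }

    long count() {
        return rows().size();
    }

    List<String> collectAsList() {
        return new ArrayList<>(rows());
    }

    // Runs the same three actions as the ForeachBatchProcessor and returns
    // how many times the upstream function was invoked.
    static int invocations(boolean useCache) {
        int[] calls = {0};
        LazyBatch batch = new LazyBatch(() -> {
            calls[0]++; // models one call of the state function
            List<String> out = new ArrayList<>();
            out.add("milestone-1");
            return out;
        });
        if (useCache) {
            batch.cache();
        }
        if (batch.count() == 0L) {
            return calls[0];
        }
        batch.count();
        batch.collectAsList();
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println("without cache: " + invocations(false)); // 3
        System.out.println("with cache:    " + invocations(true));  // 1
    }
}
```

If this is indeed the cause, calling collectAsList() once and using the size
of the returned list instead of the two count() calls would also reduce the
number of evaluations.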


How to handle spark state which is growing too big even with timeout set.

2021-02-11 Thread Kuttaiah Robin
Hello,

I have a use case where I need to read uncorrelated events from a Kafka
topic, correlate them, and push them forward to another topic.

I use Spark Structured Streaming with FlatMapGroupsWithStateFunction along
with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I do
some correlation on the group's events and push the correlated events forward
to another topic. Uncorrelated events are stored in the state until they are
correlated with a future set of events.

At times there are many uncorrelated events, and the state holding them
grows too big.

Does anyone know how to handle this use case, or will Spark take care of
the state when it grows big?

Thanks in advance.

regards,
Robin Kuttaiah
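
A minimal sketch of one way to keep such state bounded, assuming it is
acceptable to drop uncorrelated events that are too old or too numerous (an
assumption, not something stated in this thread). As far as I know, a
processing-time timeout only causes the function to be called again with
hasTimedOut() set; the function itself has to shrink the state or call
state.remove(). PendingEvents, maxEvents and maxAgeMs are hypothetical names,
and the code is plain Java with no Spark dependency.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-key state holding uncorrelated events. It is bounded in
// two ways: by age (maxAgeMs) and by count (maxEvents).
final class PendingEvents {
    private static final class Entry {
        final String eventId;
        final long arrivedAtMs;

        Entry(String eventId, long arrivedAtMs) {
            this.eventId = eventId;
            this.arrivedAtMs = arrivedAtMs;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final int maxEvents;
    private final long maxAgeMs;

    PendingEvents(int maxEvents, long maxAgeMs) {
        this.maxEvents = maxEvents;
        this.maxAgeMs = maxAgeMs;
    }

    // Add an event, dropping the oldest ones if the cap is exceeded.
    void add(String eventId, long nowMs) {
        entries.addLast(new Entry(eventId, nowMs));
        while (entries.size() > maxEvents) {
            entries.removeFirst();
        }
    }

    // Drop events older than maxAgeMs; meant to run on every invocation,
    // including the one triggered by a timeout.
    void evictExpired(long nowMs) {
        while (!entries.isEmpty()
                && nowMs - entries.peekFirst().arrivedAtMs > maxAgeMs) {
            entries.removeFirst();
        }
    }

    int size() {
        return entries.size();
    }

    // True when nothing is left, i.e. the whole state entry could be removed.
    boolean isEmpty() {
        return entries.isEmpty();
    }

    // Small walk-through: returns the size after the adds and after eviction.
    static int[] demo() {
        PendingEvents pending = new PendingEvents(2, 1_000L);
        pending.add("e1", 0L);
        pending.add("e2", 100L);
        pending.add("e3", 200L);      // cap of 2 reached, e1 is dropped
        int afterAdds = pending.size();
        pending.evictExpired(1_150L); // e2 is older than 1000 ms, e3 is not
        return new int[] {afterAdds, pending.size()};
    }

    public static void main(String[] args) {
        int[] sizes = demo();
        System.out.println(sizes[0] + " then " + sizes[1]); // 2 then 1
    }
}
```

Inside the state function, isEmpty() would be the signal to remove the state
for that key instead of updating it.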


Spark Listeners for getting dataset partition information in streaming application

2018-11-02 Thread Kuttaiah Robin
Hello,

Is there a way for a Spark streaming application to be notified when reading
from a dataset partition starts and when it ends?
I want to create a partition-specific cache when the read starts and delete
it once the partition has been read completely.

Thanks for your help in advance.

regards,
Robin Kuttaiah


How to use Dataset forEachPartion and groupByKey together

2018-11-01 Thread Kuttaiah Robin
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark streaming application which listens to a Kafka topic, does some
transformation, and writes to an Oracle database using a JDBC client.


Step 1.
Read events from Kafka as shown below:
--------------------------------------
Dataset<Row> kafkaEvents = getSparkSession().readStream().format("kafka")
  .option("kafka.bootstrap.servers", strKafkaAddress)
  .option("assign", strSubscription)
  .option("maxOffsetsPerTrigger", "10")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false)
  .load()
  .filter(strFilter)
  .select(functions.from_json(functions.col("value").cast("string"),
      oSchema).alias("events"))
  .select("events.*");

I do a groupByKey and then, for each group, take the set of events obtained
for that group, create a JDBC connection and PreparedStatement, insert, and
then close the connection.
I am using Oracle JDBC along with flatMapGroupsWithState.


Step 2.
Group by key and flatMapGroupsWithState
---------------------------------------
Dataset<InsightEventUpdate> sessionUpdates = kafkaEvents
  .groupByKey(
      new MapFunction<Row, String>() {
        @Override public String call(Row event) {
          return event.getAs(m_InsightRawEvent.getPrimaryKey());
        }
      }, Encoders.STRING())
  .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
      Encoders.bean(InsightEventInfo.class),
      Encoders.bean(InsightEventUpdate.class),
      GroupStateTimeout.ProcessingTimeTimeout());


This has the drawback that it creates a connection and inserts into the DB
for each group.

I need to do this per partition, so that only one connection and one batch
insert are used for all the new events read from the partition.

Can somebody point me to how I can achieve this?

Basically, I am looking for the following:
1. Read from Kafka as described above.
2. kafkaEvents.foreachPartition - create one connection here.
3. Group by key and flatMapGroupsWithState.

thanks
Robin Kuttaiah
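
A rough sketch of the "one connection and one batch insert per partition"
idea, written as a plain-Java model with no Spark or JDBC dependency. It
mirrors an open/process/close lifecycle such as the one ForeachWriter exposes,
where the connection would be created in open(), rows added with
PreparedStatement.addBatch() in process(), and flushed with executeBatch() in
close(). PartitionBatchWriter and its counters are hypothetical and only count
what a real implementation would do; whether this fits the grouping step
described above is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of an open/process/close lifecycle: one "connection" and
// one batched insert per partition rather than per group.
final class PartitionBatchWriter {
    private int connectionsOpened = 0; // counts simulated connections
    private int batchesExecuted = 0;   // counts simulated batch inserts
    private int rowsWritten = 0;
    private List<String> pending;

    // Called once before the first row of a partition.
    void open() {
        connectionsOpened++;
        pending = new ArrayList<>();
    }

    // Called for every row: only buffers (models addBatch()).
    void process(String row) {
        pending.add(row);
    }

    // Called once after the last row: flush everything in a single batch
    // (models executeBatch()) and release the connection.
    void close() {
        if (!pending.isEmpty()) {
            batchesExecuted++;
            rowsWritten += pending.size();
        }
        pending = null;
    }

    // Drives the lifecycle for one partition and summarizes what happened.
    static String writePartition(List<String> rows) {
        PartitionBatchWriter writer = new PartitionBatchWriter();
        writer.open();
        for (String row : rows) {
            writer.process(row);
        }
        writer.close();
        return "rows=" + writer.rowsWritten
            + " connections=" + writer.connectionsOpened
            + " batches=" + writer.batchesExecuted;
    }

    public static void main(String[] args) {
        // prints: rows=3 connections=1 batches=1
        System.out.println(writePartition(Arrays.asList("a", "b", "c")));
    }
}
```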


Redeploying spark streaming application aborts because of checkpoint issue

2018-10-14 Thread Kuttaiah Robin
Hello all,

I am using spark-2.3.0 and hadoop-2.7.4.
I have a Spark streaming application which listens to a Kafka topic, does some
transformation, and writes to an Oracle database using a JDBC client.

Events are read from Kafka as shown below:

m_oKafkaEvents = getSparkSession().readStream().format("kafka")
  .option("kafka.bootstrap.servers", strKafkaAddress)
  .option("assign", strSubscription)
  .option("maxOffsetsPerTrigger", "10")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", false)
  .load()
  .filter(strFilter)
  .select(functions.from_json(functions.col("value").cast("string"),
      oSchema).alias("events"))
  .select("events.*");

The checkpoint is used as shown below:

DataStreamWriter oMilestoneStream = oAggregation
  .writeStream()
  .queryName(strQueryName)
  .outputMode("update")
  .trigger(Trigger.ProcessingTime(getInsightDeployment().getInstanceSummary().getTriggerInterval()))
  .option("checkpointLocation", strCheckpointLocation)
  .foreach(oForeachWriter);

strCheckpointLocation is something like
/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj.
This is an HDFS location.


With this setup, when I redeploy the Spark application I get the exception
shown below. The only workaround I currently have is to delete the checkpoint
location and recreate the topic.

I also see a couple of JIRA issues which are marked RESOLVED, but the problem
is still seen.
https://issues.apache.org/jira/browse/SPARK-20894
https://issues.apache.org/jira/browse/SPARK-22262

Can someone help me with the best solution for this?

thanks,
Robin Kuttaiah



Exception
---
18/10/14 03:19:16 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.IllegalStateException: Error reading delta file hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta of HDFSStateStoreProvider[id = (op=1,part=0),dir = hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0]: hdfs://hdfs-name:9000/insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta does not exist
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:371)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:333)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$loadMap$1.apply(HDFSBackedStateStoreProvider.scala:332)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:332)
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:196)
at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:369)
at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:74)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
Caused by: java.io.FileNotFoundException: File does not exist: /insight/tenant-0001/instance_summary/checkpoint/Opportunity_UHIgcccj/state/1/0/1.delta
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:71)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
at

Re: Recreate Dataset from list of Row in spark streaming application.

2018-10-06 Thread Kuttaiah Robin
Thanks Ryan.

Yes. That is ForeachWriter.

Can someone help me with how to solve this?

Basically, the output of the flatMapGroupsWithState function is
Dataset<InsightEventUpdate> sessionUpdates;

The InsightEventUpdate class contains a list of Spark Rows which I need to
convert back to a Dataset<Row>.

Something like

Dataset  correlatedEvents =  ;.

Please note this is a streaming application.

regards,
Robin.




On Fri, Oct 5, 2018 at 11:12 PM Shixiong(Ryan) Zhu 
wrote:

> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor
> is a ForeachWriter. Right? You can not use SparkSession in its process
> method as it will run in executors.
>
> Best Regards,
> Ryan
>
>
> On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin  wrote:
>
>> Hello,
>>
>> I have a spark streaming application which reads from Kafka based on the
>> given schema.
>>
>> Dataset  m_oKafkaEvents =
>> getSparkSession().readStream().format("kafka")
>> .option("kafka.bootstrap.servers", strKafkaAddress)
>> .option("assign", strSubscription)
>> .option("maxOffsetsPerTrigger", "10")
>> .option("startingOffsets", "latest")
>> .option("failOnDataLoss", false)
>> .load()
>> .filter(strFilter)
>>
>> .select(functions.from_json(functions.col("value").cast("string"),
>> schema).alias("events"))
>> .select("events.*");
>>
>>
>> Now this dataset is grouped by one of the column(InstanceId) which is the
>> key for us and then fed into flatMapGroupsWithState function. This function
>> does some correlation.
>>
>> Dataset sessionUpdates = m_oKafkaEvents.groupByKey(
>>   new MapFunction() {
>> @Override public String call(Row event) {
>> return event.getAs("InstanceId");
>> }
>>   }, Encoders.STRING())
>>   .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>>   Encoders.bean(InsightEventInfo.class),
>> Encoders.bean(InsightEventUpdate.class),
>>   GroupStateTimeout.ProcessingTimeTimeout());
>>
>>
>> The output dataset is of type InsightEventUpdate which contains List of
>> Spark Rows which is related to the InstanceId.
>>
>> Now I want to convert this back into of type Dataset. Basically I
>> have List of rows.
>>
>> I tried
>>
>> sparkSession.createDataFrame(listOfRows, schema);
>>
>> this gives me
>>
>> ava.lang.NullPointerException
>> at
>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
>> at
>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
>> at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
>> at
>> org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
>> at
>> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
>>
>> Can someone help me what is the way to go ahead?
>>
>> thanks
>> Robin Kuttaiah
>>
>>
>>
>>
>>


Recreate Dataset from list of Row in spark streaming application.

2018-10-05 Thread Kuttaiah Robin
Hello,

I have a Spark streaming application which reads from Kafka based on the
given schema.

Dataset<Row> m_oKafkaEvents =
  getSparkSession().readStream().format("kafka")
    .option("kafka.bootstrap.servers", strKafkaAddress)
    .option("assign", strSubscription)
    .option("maxOffsetsPerTrigger", "10")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", false)
    .load()
    .filter(strFilter)
    .select(functions.from_json(functions.col("value").cast("string"),
        schema).alias("events"))
    .select("events.*");


This dataset is then grouped by one of the columns (InstanceId), which is the
key for us, and fed into the flatMapGroupsWithState function. This function
does some correlation.

Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
    new MapFunction<Row, String>() {
      @Override public String call(Row event) {
        return event.getAs("InstanceId");
      }
    }, Encoders.STRING())
  .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
      Encoders.bean(InsightEventInfo.class),
      Encoders.bean(InsightEventUpdate.class),
      GroupStateTimeout.ProcessingTimeTimeout());


The output dataset is of type InsightEventUpdate, which contains a List of
Spark Rows related to the InstanceId.

Now I want to convert this back into a Dataset<Row>. Basically, I have a
List of Rows.

I tried

sparkSession.createDataFrame(listOfRows, schema);

which gives me

java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)

Can someone help me with the right way to go ahead?

thanks
Robin Kuttaiah
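
A small illustration of the underlying operation: going from "a collection of
updates, each holding a list of rows" to "one flat collection of rows" is a
flat-map. The sketch below is plain Java with streams, not Spark; Update is a
hypothetical stand-in for InsightEventUpdate and strings stand in for Rows. In
Spark, the corresponding transformation would presumably be a flatMap on the
sessionUpdates Dataset with an encoder for the row type, applied before the
sink, but that is an assumption and not something confirmed in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FlattenUpdates {
    // Hypothetical stand-in for InsightEventUpdate: one update carries many rows.
    static final class Update {
        final String instanceId;
        final List<String> rows;

        Update(String instanceId, List<String> rows) {
            this.instanceId = instanceId;
            this.rows = rows;
        }
    }

    // Turns a collection of updates into one flat collection of rows,
    // preserving the order of updates and of rows within each update.
    static List<String> flatten(List<Update> updates) {
        return updates.stream()
            .flatMap(update -> update.rows.stream())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Update> updates = Arrays.asList(
            new Update("i1", Arrays.asList("r1", "r2")),
            new Update("i2", Arrays.asList("r3")));
        System.out.println(flatten(updates)); // [r1, r2, r3]
    }
}
```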


Re: Creating spark Row from database values

2018-09-26 Thread Kuttaiah Robin
Thanks, I'll check it out.

On Wed, Sep 26, 2018 at 6:25 PM Shahab Yunus  wrote:

> Hi there. Have you seen this link?
> https://medium.com/@mrpowers/manually-creating-spark-dataframes-b14dae906393
>
>
> It shows you multiple ways to manually create a dataframe.
>
> Hope it helps.
>
> Regards,
> Shahab
>
> On Wed, Sep 26, 2018 at 8:02 AM Kuttaiah Robin  wrote:
>
>> Hello,
>>
>> Currently I have Oracle database table with description as shown below;
>>
>> Table INSIGHT_ID_FED_IDENTIFIERS
>>   -
>> CURRENT_INSTANCE_ID   VARCHAR2(100)
>> PREVIOUS_INSTANCE_ID  VARCHAR2(100)
>>
>>
>> Sample values in the table basically output of select * from
>> INSIGHT_ID_FED_IDENTIFIERS. For simplicity I have put only one row.
>>
>>
>> CURRENT_INSTANCE_ID   PREVIOUS_INSTANCE_ID
>> ---   ---
>> curInstanceId1  prevInstanceId1
>>
>>
>> I have the spark schema associated with it.
>>
>>
>> Now I need to create a Spark row(org.apache.spark.sql.Row) out of it.
>>
>> Can someone help me understanding on how this can be achieved?
>>
>> regards,
>> Robin Kuttaiah
>>
>


Creating spark Row from database values

2018-09-26 Thread Kuttaiah Robin
Hello,

I currently have an Oracle database table with the description shown below:

Table INSIGHT_ID_FED_IDENTIFIERS
--------------------------------
CURRENT_INSTANCE_ID   VARCHAR2(100)
PREVIOUS_INSTANCE_ID  VARCHAR2(100)


Sample values in the table, i.e. the output of select * from
INSIGHT_ID_FED_IDENTIFIERS. For simplicity I have included only one row.


CURRENT_INSTANCE_ID   PREVIOUS_INSTANCE_ID
-------------------   --------------------
curInstanceId1        prevInstanceId1


I have the Spark schema associated with it.


Now I need to create a Spark Row (org.apache.spark.sql.Row) out of it.

Can someone help me understand how this can be achieved?

regards,
Robin Kuttaiah
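
A minimal sketch of the preparatory step, assuming the database record has
already been read into a map of column name to value (for example from a JDBC
ResultSet). A Row holds its values positionally, so the values have to be put
in the same order as the schema fields; the resulting array is what could then
be handed to RowFactory.create(Object...) when Spark is on the classpath.
RowValues, inSchemaOrder and sample are hypothetical names, and the code below
is plain Java without Spark.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RowValues {
    // Picks the values out of one database record in schema order. A column
    // that is missing from the record becomes null.
    static Object[] inSchemaOrder(List<String> fieldNames,
                                  Map<String, Object> record) {
        Object[] values = new Object[fieldNames.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = record.get(fieldNames.get(i));
        }
        return values;
    }

    // Uses the single sample row from the table above.
    static Object[] sample() {
        List<String> fieldNames =
            Arrays.asList("CURRENT_INSTANCE_ID", "PREVIOUS_INSTANCE_ID");
        Map<String, Object> record = new LinkedHashMap<>();
        // Inserted in a different order on purpose: schema order wins.
        record.put("PREVIOUS_INSTANCE_ID", "prevInstanceId1");
        record.put("CURRENT_INSTANCE_ID", "curInstanceId1");
        return inSchemaOrder(fieldNames, record);
    }

    public static void main(String[] args) {
        // prints: [curInstanceId1, prevInstanceId1]
        System.out.println(Arrays.toString(sample()));
    }
}
```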


Spark FlatMapGroupsWithStateFunction throws cannot resolve 'named_struct()' due to data type mismatch 'SerializeFromObject"

2018-09-17 Thread Kuttaiah Robin
Hello,

I am using FlatMapGroupsWithStateFunction in my Spark streaming application.

FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>
idstateUpdateFunction =
  new FlatMapGroupsWithStateFunction<String, Row, SessionInfo, SessionUpdate>() { ... }


The SessionUpdate class runs into trouble once the lines marked "// added"
below are included; it then throws the exception shown further down. The same
milestones attribute, with its setter and getter, has been added to
SessionInfo (input class), but it does not throw an exception there.

public static class SessionUpdate implements Serializable {

  private static final long serialVersionUID = -3858977319192658483L;

  private String instanceId;

  private ArrayList<Row> milestones = new ArrayList<Row>();  // added

  private Timestamp processingTimeoutTimestamp;

  public SessionUpdate() {
    super();
  }

  public SessionUpdate(String instanceId,
      ArrayList<Row> milestones,  // added
      Timestamp processingTimeoutTimestamp) {
    super();
    this.instanceId = instanceId;
    this.milestones = milestones;  // added
    this.processingTimeoutTimestamp = processingTimeoutTimestamp;
  }

  public String getInstanceId() {
    return instanceId;
  }

  public void setInstanceId(String instanceId) {
    this.instanceId = instanceId;
  }

  public ArrayList<Row> getMilestones() {  // added
    return milestones;
  }

  public void setMilestones(ArrayList<Row> milestones) {  // added
    this.milestones = milestones;
  }

  public Timestamp getProcessingTimeoutTimestamp() {
    return processingTimeoutTimestamp;
  }

  public void setProcessingTimeoutTimestamp(Timestamp processingTimeoutTimestamp) {
    this.processingTimeoutTimestamp = processingTimeoutTimestamp;
  }

}

Exception:
ERROR  cannot resolve 'named_struct()' due to data type mismatch: input to
function named_struct requires at least one argument;;
'SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(input[0,
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
true]).getInstanceId, true, false) AS instanceId#62,
mapobjects(MapObjects_loopValue2, MapObjects_loopIsNull2, ObjectType(class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), if
(isnull(lambdavariable(MapObjects_loopValue2, MapObjects_loopIsNull2,
ObjectType(class
org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema), true)))
null else named_struct(), assertnotnull(input[0,
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
true]).getMilestones, None) AS milestones#63, staticinvoke(class
org.apache.spark.sql.catalyst.util.DateTimeUtils$, TimestampType,
fromJavaTimestamp, assertnotnull(input[0,
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
true]).getProcessingTimeoutTimestamp, true, false) AS
processingTimeoutTimestamp#64]
+- FlatMapGroupsWithState , cast(value#54 as string).toString,
createexternalrow(EventTime#23.toString, InstanceID#24.toString,
Model#25.toString, Milestone#26.toString, Region#27.toString,
SalesOrganization#28.toString, ProductName#29.toString,
ReasonForQuoteReject#30.toString, ReasonforRejectionBy#31.toString,
OpportunityAmount#32.toJavaBigDecimal, Discount#33.toJavaBigDecimal,
TotalQuoteAmount#34.toJavaBigDecimal, NetQuoteAmount#35.toJavaBigDecimal,
ApprovedDiscount#36.toJavaBigDecimal, TotalOrderAmount#37.toJavaBigDecimal,
StructField(EventTime,StringType,true),
StructField(InstanceID,StringType,true),
StructField(Model,StringType,true), StructField(Milestone,StringType,true),
StructField(Region,StringType,true),
StructField(SalesOrganization,StringType,true),
StructField(ProductName,StringType,true),
StructField(ReasonForQuoteReject,StringType,true),
StructField(ReasonforRejectionBy,StringType,true), ... 6 more fields),
[value#54], [EventTime#23, InstanceID#24, Model#25, Milestone#26,
Region#27, SalesOrganization#28, ProductName#29, ReasonForQuoteReject#30,
ReasonforRejectionBy#31, OpportunityAmount#32, Discount#33,
TotalQuoteAmount#34, NetQuoteAmount#35, ApprovedDiscount#36,
TotalOrderAmount#37], obj#61:
oracle.insight.spark.event_processor.EventProcessor$SessionUpdate,
class[instanceId[0]: string, milestones[0]: array>,
processingTimeoutTimestamp[0]: timestamp], Append, false,
ProcessingTimeTimeout


Schema looks like

{"Name":"EventTime","DataType":"TimestampType"},
{"Name":"InstanceID",   "DataType":"STRING",  "Length":100},
{"Name":"Model","DataType":"STRING",  "Length":100},
{"Name":"Milestone","DataType":"STRING",  "Length":100},
{"Name":"Region",   "DataType":"STRING",  "Length":100},
{"Name":"SalesOrganization","DataType":"STRING",  "Length":100},
{"Name":"ProductName",  "DataType":"STRING",  "Length":100},
{"Name":"ReasonForQuoteReject", "DataType":"STRING",  "Length":100},
{"Name":"ReasonforRejectionBy", "DataType":"STRING",  "Length":100},
//Note: org.apache.spark.sql.types.DataTypes.createDecimalType(precision(),
scale())
{"Name":"OpportunityAmount","DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"Discount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},
{"Name":"TotalQuoteAmount", "DataType":"DECIMAL",
"Precision":38,"Scale":2},