Thanks Ryan. Yes. That is ForeachWriter.
Can someone help me solve this? Basically the output of the flatMapGroupsWithState function is Dataset<InsightEventUpdate> sessionUpdates; the InsightEventUpdate class contains a list of Spark Rows which I need to convert back to a Dataset<Row>. Something like:

Dataset<Row> correlatedEvents = <do something here with sessionUpdates>;

Please note this is a streaming application.

regards,
Robin

On Fri, Oct 5, 2018 at 11:12 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor
> is a ForeachWriter, right? You cannot use SparkSession in its process
> method, as it will run in executors.
>
> Best Regards,
> Ryan
>
> On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <kutta...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a Spark streaming application which reads from Kafka based on the
>> given schema.
>>
>> Dataset<Row> m_oKafkaEvents = getSparkSession().readStream().format("kafka")
>>     .option("kafka.bootstrap.servers", strKafkaAddress)
>>     .option("assign", strSubscription)
>>     .option("maxOffsetsPerTrigger", "100000")
>>     .option("startingOffsets", "latest")
>>     .option("failOnDataLoss", false)
>>     .load()
>>     .filter(strFilter)
>>     .select(functions.from_json(functions.col("value").cast("string"), schema).alias("events"))
>>     .select("events.*");
>>
>> This dataset is then grouped by one of the columns (InstanceId), which is
>> the key for us, and fed into a flatMapGroupsWithState function. This
>> function does some correlation.
>>
>> Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
>>     new MapFunction<Row, String>() {
>>       @Override
>>       public String call(Row event) {
>>         return event.getAs("InstanceId");
>>       }
>>     }, Encoders.STRING())
>>     .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>>         Encoders.bean(InsightEventInfo.class),
>>         Encoders.bean(InsightEventUpdate.class),
>>         GroupStateTimeout.ProcessingTimeTimeout());
>>
>> The output dataset is of type InsightEventUpdate, which contains a list
>> of Spark Rows related to the InstanceId.
>>
>> Now I want to convert this back into a Dataset<Row>. Basically I have a
>> list of Rows.
>>
>> I tried
>>
>> sparkSession.createDataFrame(listOfRows, schema);
>>
>> which gives me
>>
>> java.lang.NullPointerException
>>     at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
>>     at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
>>     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
>>     at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
>>     at oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
>>
>> Can someone help me with the way to go ahead?
>>
>> thanks
>> Robin Kuttaiah
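As Ryan points out, the NullPointerException comes from calling sparkSession.createDataFrame inside ForeachWriter.process, which runs on executors where the driver's SparkSession (and its sessionState) is not available. One way to avoid the ForeachWriter entirely is to flatten the per-session row lists back into a Dataset<Row> inside the streaming query plan itself, using flatMap with a RowEncoder. The sketch below is not from the thread; it assumes Spark 2.x-era RowEncoder.apply(StructType), a hypothetical getRows() accessor on InsightEventUpdate returning List<Row>, and that schema is the StructType of the original events:

```java
// Sketch under assumptions (not the poster's confirmed code): flatten each
// InsightEventUpdate's buffered rows back into a streaming Dataset<Row>
// within the query plan, instead of calling createDataFrame inside a
// ForeachWriter (process() runs on executors, where the SparkSession is null).
// getRows() is a hypothetical accessor on InsightEventUpdate.
Dataset<Row> correlatedEvents = sessionUpdates.flatMap(
    (FlatMapFunction<InsightEventUpdate, Row>) update -> update.getRows().iterator(),
    RowEncoder.apply(schema));  // schema: StructType of the original events
```

correlatedEvents can then be written out with writeStream() as usual; the conversion happens per micro-batch on the driver-defined plan, so no executor ever needs the SparkSession.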