Thanks Ryan.

Yes. That is ForeachWriter.

Can someone help me on how to solve this?.

Basically the output of  flatMapGroupsWithState   function is
Dataset<InsightEventUpdate> sessionUpdates ;

InsightEventUpdate  class contains list of Spark Row which I need to
convert back to Dataset<Row>.

Something like

Dataset<Row>  correlatedEvents =  <do something here with sessionUpdates>;.

Please note this is a streaming application.

regards,
Robin.




On Fri, Oct 5, 2018 at 11:12 PM Shixiong(Ryan) Zhu <shixi...@databricks.com>
wrote:

> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor
> is a ForeachWriter. Right? You can not use SparkSession in its process
> method as it will run in executors.
>
> Best Regards,
> Ryan
>
>
> On Fri, Oct 5, 2018 at 6:54 AM Kuttaiah Robin <kutta...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a spark streaming application which reads from Kafka based on the
>> given schema.
>>
>> Dataset<Row>  m_oKafkaEvents =
>> getSparkSession().readStream().format("kafka")
>>             .option("kafka.bootstrap.servers", strKafkaAddress)
>>             .option("assign", strSubscription)
>>             .option("maxOffsetsPerTrigger", "100000")
>>             .option("startingOffsets", "latest")
>>             .option("failOnDataLoss", false)
>>             .load()
>>             .filter(strFilter)
>>
>> .select(functions.from_json(functions.col("value").cast("string"),
>> schema).alias("events"))
>>             .select("events.*");
>>
>>
>> Now this dataset is grouped by one of the column(InstanceId) which is the
>> key for us and then fed into flatMapGroupsWithState function. This function
>> does some correlation.
>>
>> Dataset<InsightEventUpdate> sessionUpdates = m_oKafkaEvents.groupByKey(
>>   new MapFunction<Row, String>() {
>>     @Override public String call(Row event) {
>>     return event.getAs("InstanceId");
>>     }
>>   }, Encoders.STRING())
>>   .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>>   Encoders.bean(InsightEventInfo.class),
>> Encoders.bean(InsightEventUpdate.class),
>>   GroupStateTimeout.ProcessingTimeTimeout());
>>
>>
>> The output dataset is of type InsightEventUpdate which contains List of
>> Spark Rows which is related to the InstanceId.
>>
>> Now I want to convert this back into of type Dataset<Row>. Basically I
>> have List of rows.
>>
>> I tried
>>
>> sparkSession.createDataFrame(listOfRows, schema);
>>
>> this gives me
>>
>> ava.lang.NullPointerException
>>         at
>> org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:139)
>>         at
>> org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:137)
>>         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:73)
>>         at
>> org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:376)
>>         at
>> oracle.insight.spark.identifier_processor.ForeachFederatedEventProcessor.process(ForeachFederatedEventProcessor.java:102)
>>
>> Can someone help me what is the way to go ahead?
>>
>> thanks
>> Robin Kuttaiah
>>
>>
>>
>>
>>

Reply via email to