Re: Null pointer exception while replying WAL

2024-02-12 Thread Mich Talebzadeh
OK

You are getting a NullPointerException while replaying the WAL. One possible
reason is that the messages RDD might contain null elements, and attempting
to read JSON from null values can result in an NPE. To handle this, you can
add a filter before processing the RDD to remove null elements.

msgs.foreachRDD { rdd =>
  if (rdd.take(1).nonEmpty) {
    val messages: RDD[String] = rdd
      .map { sr =>
        Option(sr).getOrElse("NO records found")
      }
      .filter(_ != "NO records found")

    try {
      val messagesJson = spark.read.json(messages)
      messagesJson.write.mode("append").parquet(data)
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
    }
  }
}

This modification uses Option to handle potential null values in the rdd:
nulls are mapped to the placeholder "NO records found", and the filter then
drops every element equal to that placeholder.
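
The same idea can be sketched without a placeholder string. The snippet below is a minimal illustration on a plain Scala Seq, so it runs without Spark; NullSafeMessages and dropNulls are hypothetical names, not part of the original job. Option(s) turns a null into None, and flatMap drops the None entries.

```scala
object NullSafeMessages {
  // Option(s) is None when s is null and Some(s) otherwise.
  // flatMap keeps only the Some values, so nulls disappear without
  // needing a sentinel such as "NO records found".
  def dropNulls(raw: Seq[String]): Seq[String] =
    raw.flatMap(s => Option(s))
}

object NullSafeMessagesDemo extends App {
  val raw = Seq("{\"id\":1}", null, "{\"id\":2}")
  val cleaned = NullSafeMessages.dropNulls(raw)
  println(cleaned.size) // two non-null messages remain
}
```

On an RDD[String], rdd.filter(_ != null) expresses the same filter in one step.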

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 12 Feb 2024 at 14:22, nayan sharma  wrote:

>
> Please find below code
>
>   def main(args: Array[String]): Unit = {
>     val config: Config = ConfigFactory.load()
>     val streamC = StreamingContext.getOrCreate(
>       checkpointDirectory,
>       () => functionToCreateContext(config, checkpointDirectory)
>     )
>
>     streamC.start()
>     streamC.awaitTermination()
>   }
>
>   def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {
>
>     val brokerUrl = config.getString("streaming.solace.brokerURL")
>     val username = config.getString("streaming.solace.userName")
>     val passwordSol = config.getString("streaming.solace.password")
>     val vpn = config.getString("streaming.solace.vpn")
>     val queue = config.getString("streaming.solace.queueName")
>     val connectionFactory = config.getString("streaming.solace.connectionFactory")
>
>     val spark = SparkSession
>       .builder()
>       .appName("rem-Streaming-Consumer")
>       .config("spark.streaming.receiver.writeAheadLog.enable", "true")
>       .config("spark.streaming.blockInterval", blockInterval)
>       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .config("spark.streaming.receiver.writeAheadLog.enable", "true")
>       .enableHiveSupport
>       .getOrCreate()
>     val sc = spark.sparkContext
>     val ssc = new StreamingContext(sc, Seconds(batchInterval))
>     ssc.checkpoint(checkpointDirectory)
>
>     val converter: Message => Option[String] = {
>       case msg: TextMessage =>
>         Some(msg.getText)
>       case _ =>
>         None
>     }
>
>     val props = new Properties()
>     props.setProperty(
>       Context.INITIAL_CONTEXT_FACTORY,
>       "com.solacesystems.jndi.SolJNDIInitialContextFactory"
>     )
>     props.setProperty(Context.PROVIDER_URL, brokerUrl)
>     props.setProperty(Context.SECURITY_PRINCIPAL, username)
>     props.setProperty(Context.SECURITY_PRINCIPAL, passwordSol)
>     props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)
>
>     val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
>       ssc,
>       JndiMessageConsumerFactory(
>         props,
>         QueueJmsDestinationInfo(queue),
>         connectionFactoryName = connectionFactory,
>         messageSelector = ""
>       ),
>       converter,
>       1000,
>       1.second,
>       10.second,
>       StorageLevel.MEMORY_AND_DISK_SER_2
>     )
>
>     msgs.foreachRDD(rdd =>
>       if (rdd.take(1).length > 0) {
>         val messages: RDD[String] = rdd.map { sr =>
>           if (sr == null) {
>             println("NO records found")
>             "NO records found"
>           } else {
>             println("Input Records from Solace queue : " + sr.toString)
>             sr.toString
>           }
>         }
>         Thread.sleep(12)
>         try {
>           // ===> getting NPE here after restarting using WAL
>           val messagesJson = spark.read.json(messages)
>           messagesJson.write.mode("append").parquet(data)
>         }
>         catch {
>           case ex => ex.printStackTrace()
>         }
>       })
>     ssc
>   }
> Thanks & Regards,
> Nayan Sharma
>  *+91-8095382952*

Re: Null pointer exception while replying WAL

2024-02-12 Thread nayan sharma
Please find below code

  def main(args: Array[String]): Unit = {
    val config: Config = ConfigFactory.load()
    val streamC = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => functionToCreateContext(config, checkpointDirectory)
    )

    streamC.start()
    streamC.awaitTermination()
  }

  def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {

    val brokerUrl = config.getString("streaming.solace.brokerURL")
    val username = config.getString("streaming.solace.userName")
    val passwordSol = config.getString("streaming.solace.password")
    val vpn = config.getString("streaming.solace.vpn")
    val queue = config.getString("streaming.solace.queueName")
    val connectionFactory = config.getString("streaming.solace.connectionFactory")

    val spark = SparkSession
      .builder()
      .appName("rem-Streaming-Consumer")
      .config("spark.streaming.receiver.writeAheadLog.enable", "true")
      .config("spark.streaming.blockInterval", blockInterval)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.streaming.receiver.writeAheadLog.enable", "true")
      .enableHiveSupport
      .getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    ssc.checkpoint(checkpointDirectory)

    val converter: Message => Option[String] = {
      case msg: TextMessage =>
        Some(msg.getText)
      case _ =>
        None
    }

    val props = new Properties()
    props.setProperty(
      Context.INITIAL_CONTEXT_FACTORY,
      "com.solacesystems.jndi.SolJNDIInitialContextFactory"
    )
    props.setProperty(Context.PROVIDER_URL, brokerUrl)
    props.setProperty(Context.SECURITY_PRINCIPAL, username)
    props.setProperty(Context.SECURITY_PRINCIPAL, passwordSol)
    props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

    val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
      ssc,
      JndiMessageConsumerFactory(
        props,
        QueueJmsDestinationInfo(queue),
        connectionFactoryName = connectionFactory,
        messageSelector = ""
      ),
      converter,
      1000,
      1.second,
      10.second,
      StorageLevel.MEMORY_AND_DISK_SER_2
    )

    msgs.foreachRDD(rdd =>
      if (rdd.take(1).length > 0) {
        val messages: RDD[String] = rdd.map { sr =>
          if (sr == null) {
            println("NO records found")
            "NO records found"
          } else {
            println("Input Records from Solace queue : " + sr.toString)
            sr.toString
          }
        }
        Thread.sleep(12)
        try {
          // ===> getting NPE here after restarting using WAL
          val messagesJson = spark.read.json(messages)
          messagesJson.write.mode("append").parquet(data)
        }
        catch {
          case ex => ex.printStackTrace()
        }
      })
    ssc
  }
Thanks & Regards,
Nayan Sharma
 *+91-8095382952*
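
One thing that may be worth ruling out, offered as a sketch rather than a diagnosis: the line marked as throwing the NPE uses the spark value captured when the context was first built. When the job restarts from the checkpoint, StreamingContext.getOrCreate does not call functionToCreateContext again, so that captured session may no longer be usable. The Spark Streaming programming guide recommends obtaining the SparkSession inside foreachRDD from the RDD's own SparkContext, as a lazily instantiated singleton, so that the job can be restarted after a driver failure. In the sketch below, ReplaySafeSink, writeJsonBatches and outputPath are hypothetical names, and it assumes Spark 2.2 or later is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

object ReplaySafeSink {
  // Hypothetical helper, not part of the original job.
  def writeJsonBatches(msgs: DStream[String], outputPath: String): Unit = {
    msgs.foreachRDD { rdd =>
      // Drop null records before handing the batch to the JSON reader.
      val messages = rdd.filter(_ != null)
      if (!messages.isEmpty()) {
        // Look the session up on every batch from the RDD's SparkContext
        // instead of capturing one from the enclosing scope. getOrCreate
        // returns the existing session if there is one and builds a new
        // one otherwise, for example after recovery from a checkpoint.
        val spark = SparkSession.builder
          .config(rdd.sparkContext.getConf)
          .getOrCreate()
        import spark.implicits._

        // json(Dataset[String]) replaces the deprecated json(RDD[String]).
        val messagesJson = spark.read.json(messages.toDS())
        messagesJson.write.mode("append").parquet(outputPath)
      }
    }
  }
}
```

A call such as ReplaySafeSink.writeJsonBatches(msgs, data) would stand in for the msgs.foreachRDD block inside functionToCreateContext. Whether this removes the NPE in this particular job is an assumption to verify against the stack trace.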





On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh 
wrote:

> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> Structured Streaming) that consumes messages from Solace?
> To address the *NullPointerException* in the context of the provided
> information, you need to review the part of the code where the exception is
> thrown. Identifying which object or method call is returning *null*, and
> checking the logs, will help the debugging process.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma 
> wrote:
>
>> Hi Users,
>>
>> I am trying to build a fault-tolerant Spark Solace consumer.
>>
>> Issue: we have to restart the job for several reasons, high load average
>> being one of them. When that happens, whatever Spark is processing, and
>> any batches in the queue, are lost. We can't replay them because we have
>> already sent the ack while calling store().
>>
>> Solution: I have tried implementing WAL and checkpointing in the
>> solution. The job is able to identify the lost batches, but the records
>> are not written to the log file and an NPE is thrown instead.
>>
>> We are creating the SparkContext using sc.getorcreate()
>>
>>
>> Thanks,
>> Nayan
>>
>