[jira] [Updated] (SPARK-45714) Spark UI throws 500 error when StructuredStreaming query filter is selected

2023-10-27 Thread Satyam Raj (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satyam Raj updated SPARK-45714:
---
Description: 

When I run the following code inside my StreamingQueryListener implementation, then 
open the Streaming Query tab in the Spark UI and click "Sort by Latest Batch", I get 
the error below.

 
{code:java}
class MyListener extends StreamingQueryListener {

  private val kafkaAdminClient = try {
Some(Admin.create(getKafkaProp()))
  } catch {
case _: Throwable => None
  } 
  private val registeredConsumerGroups = 
kafkaAdminClient.get.listConsumerGroups().all().get()

  override def onQueryProgress(event: 
StreamingQueryListener.QueryProgressEvent): Unit = {
    val eventName = event.progress.name
    commitConsumerOffset(event.progress.sources.head.endOffset, eventName)
  }

  private def getKafkaProp(eventName: String = "dummy-admin"): Properties = {
val props: Properties = new Properties()
props.put("bootstrap.servers", 
Configuration.configurationMap("kafka.broker"))
props.put("group.id", eventName)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringDeserializer")
props.put("security.protocol", 
Configuration.configurationMap("kafka.security.protocol"))
props.put("sasl.mechanism", 
Configuration.configurationMap("kafka.sasl.mechanism"))
props.put("sasl.jaas.config", 
Configuration.configurationMap("kafka.sasl.jaas.config"))
props
  }

  private def getTopicPartitionMap(topic: String, jsonOffsets: Map[String, 
Map[String, Long]]) = {
val offsets = jsonOffsets.head._2
val topicPartitionMap = new util.HashMap[TopicPartition, 
OffsetAndMetadata]()
offsets.keys.foreach(partition => {
  val tp = new TopicPartition(topic, partition.toInt)
  val oam = new 
OffsetAndMetadata(offsets(partition).asInstanceOf[Number].longValue())
  topicPartitionMap.put(tp, oam)
})
topicPartitionMap
  }


  private def commitConsumerOffset(endOffset: String, eventName: String) = {
val jsonOffsets = mapper.readValue(endOffset, classOf[Map[String, 
Map[String, Long]]])
val topicPartitionMap = getTopicPartitionMap(eventName, jsonOffsets)
if (!registeredConsumerGroups.contains(eventName)) {
  print("topic consumer not created! creating")
  val kafkaConsumer = new KafkaConsumer[String, 
String](getKafkaProp(eventName))
  kafkaConsumer.commitSync(topicPartitionMap)
  kafkaConsumer.close()
} else {
  print("committing cg offsets using admin")
  kafkaAdminClient.get.alterConsumerGroupOffsets(eventName, 
topicPartitionMap).all().get()
}
  }
}{code}
 

 
h2. HTTP ERROR 500 java.lang.NullPointerException
||URI:|/StreamingQuery/active|
||STATUS:|500|
||MESSAGE:|java.lang.NullPointerException|
||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd|
||CAUSED BY:|java.lang.NullPointerException|
h3. Caused by:

 
{code:java}
java.lang.NullPointerException at 
org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258)
 at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at 
java.base/java.util.TimSort.binarySort(TimSort.java:296) at 
java.base/java.util.TimSort.sort(TimSort.java:239) at 
java.base/java.util.Arrays.sort(Arrays.java:1441) at 
scala.collection.SeqLike.sorted(SeqLike.scala:659) at 
scala.collection.SeqLike.sorted$(SeqLike.scala:647) at 
scala.collection.AbstractSeq.sorted(Seq.scala:45) at 
org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.(StreamingQueryPage.scala:223)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149)
 at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at 
org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at 
org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38)
 at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at 
org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at 
javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at 
javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at 
org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at 
org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626)
 at 

[jira] [Updated] (SPARK-45714) Spark UI throws 500 error when StructuredStreaming query filter is selected

2023-10-27 Thread Satyam Raj (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satyam Raj updated SPARK-45714:
---
Description: 
h2. HTTP ERROR 500 java.lang.NullPointerException
||URI:|/StreamingQuery/active|
||STATUS:|500|
||MESSAGE:|java.lang.NullPointerException|
||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd|
||CAUSED BY:|java.lang.NullPointerException|
h3. Caused by:

 
{code:java}
java.lang.NullPointerException at 
org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258)
 at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at 
java.base/java.util.TimSort.binarySort(TimSort.java:296) at 
java.base/java.util.TimSort.sort(TimSort.java:239) at 
java.base/java.util.Arrays.sort(Arrays.java:1441) at 
scala.collection.SeqLike.sorted(SeqLike.scala:659) at 
scala.collection.SeqLike.sorted$(SeqLike.scala:647) at 
scala.collection.AbstractSeq.sorted(Seq.scala:45) at 
org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.(StreamingQueryPage.scala:223)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149)
 at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at 
org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at 
org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60)
 at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38)
 at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at 
org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at 
javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at 
javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at 
org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at 
org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626)
 at 
org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at 
org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at 
org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
 at 
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185)
 at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) 
at 
org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
 at 
org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) 
at 
org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
 at 
org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434)
 at 
org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
 at 
org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) 
at 
org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
 at 
org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349)
 at 
org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
 at 
org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763)
 at 
org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
 at 
org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
 at org.sparkproject.jetty.server.Server.handle(Server.java:516) at 
org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) 
at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) at 
org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at 
org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
 at 
org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
 at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at 
org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
 at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
 at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
 at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
 at 
org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386)
 at