[jira] [Updated] (SPARK-45714) Spark UI throws 500 error when StructuredStreaming query filter is selected
[ https://issues.apache.org/jira/browse/SPARK-45714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satyam Raj updated SPARK-45714: --- Description: h2. When I'm trying to do the following inside the listener class, and go to the UI to check the streaming query tab and click on Sort by Latest Batch, I get the below error. {code:java} class MyListener extends StreamingQueryListener { private val kafkaAdminClient = try { Some(Admin.create(getKafkaProp())) } catch { case _: Throwable => None } private val registeredConsumerGroups = kafkaAdminClient.get.listConsumerGroups().all().get() override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { val eventName = event.progress.name commitConsumerOffset(event.progress.sources.head.endOffset, eventName) } private def getKafkaProp(eventName: String = "dummy-admin"): Properties = { val props: Properties = new Properties() props.put("bootstrap.servers", Configuration.configurationMap("kafka.broker")) props.put("group.id", eventName) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") props.put("security.protocol", Configuration.configurationMap("kafka.security.protocol")) props.put("sasl.mechanism", Configuration.configurationMap("kafka.sasl.mechanism")) props.put("sasl.jaas.config", Configuration.configurationMap("kafka.sasl.jaas.config")) props } private def getTopicPartitionMap(topic: String, jsonOffsets: Map[String, Map[String, Long]]) = { val offsets = jsonOffsets.head._2 val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]() offsets.keys.foreach(partition => { val tp = new TopicPartition(topic, partition.toInt) val oam = new OffsetAndMetadata(offsets(partition).asInstanceOf[Number].longValue()) topicPartitionMap.put(tp, oam) }) topicPartitionMap } private def commitConsumerOffset(endOffset: String, eventName: String) = { val jsonOffsets = mapper.readValue(endOffset, classOf[Map[String, Map[String, Long]]]) val topicPartitionMap = getTopicPartitionMap(eventName, jsonOffsets) if (!registeredConsumerGroups.contains(eventName)) { print("topic consumer not created! creating") val kafkaConsumer = new KafkaConsumer[String, String](getKafkaProp(eventName)) kafkaConsumer.commitSync(topicPartitionMap) kafkaConsumer.close() } else { print("committing cg offsets using admin") kafkaAdminClient.get.alterConsumerGroupOffsets(eventName, topicPartitionMap).all().get() } } }{code} h2. HTTP ERROR 500 java.lang.NullPointerException ||URI:|/StreamingQuery/active| ||STATUS:|500| ||MESSAGE:|java.lang.NullPointerException| ||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd| ||CAUSED BY:|java.lang.NullPointerException| h3. Caused by: {code:java} java.lang.NullPointerException at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at java.base/java.util.TimSort.binarySort(TimSort.java:296) at java.base/java.util.TimSort.sort(TimSort.java:239) at java.base/java.util.Arrays.sort(Arrays.java:1441) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.(StreamingQueryPage.scala:223) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) at
[jira] [Updated] (SPARK-45714) Spark UI throws 500 error when StructuredStreaming query filter is selected
[ https://issues.apache.org/jira/browse/SPARK-45714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satyam Raj updated SPARK-45714: --- Description: h2. HTTP ERROR 500 java.lang.NullPointerException ||URI:|/StreamingQuery/active| ||STATUS:|500| ||MESSAGE:|java.lang.NullPointerException| ||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd| ||CAUSED BY:|java.lang.NullPointerException| h3. Caused by: {code:java} java.lang.NullPointerException at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at java.base/java.util.TimSort.binarySort(TimSort.java:296) at java.base/java.util.TimSort.sort(TimSort.java:239) at java.base/java.util.Arrays.sort(Arrays.java:1441) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.(StreamingQueryPage.scala:223) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386) at