[ https://issues.apache.org/jira/browse/SPARK-45714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Satyam Raj updated SPARK-45714: ------------------------------- Description: h2. When I run the following code inside my listener class, then open the Streaming Query tab in the UI and click Sort by Latest Batch, I get the error below. {code:java} class MyListener extends StreamingQueryListener { private val kafkaAdminClient = try { Some(Admin.create(getKafkaProp())) } catch { case _: Throwable => None } private val registeredConsumerGroups = kafkaAdminClient.get.listConsumerGroups().all().get() override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { val eventName = event.progress.name commitConsumerOffset(event.progress.sources.head.endOffset, eventName) } private def getKafkaProp(eventName: String = "dummy-admin"): Properties = { val props: Properties = new Properties() props.put("bootstrap.servers", Configuration.configurationMap("kafka.broker")) props.put("group.id", eventName) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") props.put("security.protocol", Configuration.configurationMap("kafka.security.protocol")) props.put("sasl.mechanism", Configuration.configurationMap("kafka.sasl.mechanism")) props.put("sasl.jaas.config", Configuration.configurationMap("kafka.sasl.jaas.config")) props } private def getTopicPartitionMap(topic: String, jsonOffsets: Map[String, Map[String, Long]]) = { val offsets = jsonOffsets.head._2 val topicPartitionMap = new util.HashMap[TopicPartition, OffsetAndMetadata]() offsets.keys.foreach(partition => { val tp = new TopicPartition(topic, partition.toInt) val oam = new OffsetAndMetadata(offsets(partition).asInstanceOf[Number].longValue()) topicPartitionMap.put(tp, oam) }) topicPartitionMap } private def commitConsumerOffset(endOffset: String, eventName: String) = { val jsonOffsets = 
mapper.readValue(endOffset, classOf[Map[String, Map[String, Long]]]) val topicPartitionMap = getTopicPartitionMap(eventName, jsonOffsets) if (!registeredConsumerGroups.contains(eventName)) { print("topic consumer not created! creating") val kafkaConsumer = new KafkaConsumer[String, String](getKafkaProp(eventName)) kafkaConsumer.commitSync(topicPartitionMap) kafkaConsumer.close() } else { print("committing cg offsets using admin") kafkaAdminClient.get.alterConsumerGroupOffsets(eventName, topicPartitionMap).all().get() } } }{code} h2. HTTP ERROR 500 java.lang.NullPointerException ||URI:|/StreamingQuery/active| ||STATUS:|500| ||MESSAGE:|java.lang.NullPointerException| ||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd| ||CAUSED BY:|java.lang.NullPointerException| h3. Caused by: {code:java} java.lang.NullPointerException at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at java.base/java.util.TimSort.binarySort(TimSort.java:296) at java.base/java.util.TimSort.sort(TimSort.java:239) at java.base/java.util.Arrays.sort(Arrays.java:1441) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.<init>(StreamingQueryPage.scala:223) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) at 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at 
org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.base/java.lang.Thread.run(Thread.java:829) {code} was: h2. HTTP ERROR 500 java.lang.NullPointerException ||URI:|/StreamingQuery/active| ||STATUS:|500| ||MESSAGE:|java.lang.NullPointerException| ||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd| ||CAUSED BY:|java.lang.NullPointerException| h3. 
Caused by: {code:java} java.lang.NullPointerException at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at java.base/java.util.TimSort.binarySort(TimSort.java:296) at java.base/java.util.TimSort.sort(TimSort.java:239) at java.base/java.util.Arrays.sort(Arrays.java:1441) at scala.collection.SeqLike.sorted(SeqLike.scala:659) at scala.collection.SeqLike.sorted$(SeqLike.scala:647) at scala.collection.AbstractSeq.sorted(Seq.scala:45) at org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.<init>(StreamingQueryPage.scala:223) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) at org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) at org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at 
org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185) at org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) at org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) at org.sparkproject.jetty.server.Server.handle(Server.java:516) at org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at 
org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386) at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) at java.base/java.lang.Thread.run(Thread.java:829) {code} > Spark UI throws 500 error when StructuredStreaming query filter is selected > --------------------------------------------------------------------------- > > Key: SPARK-45714 > URL: https://issues.apache.org/jira/browse/SPARK-45714 > Project: Spark > Issue Type: Bug > Components: Java API > Affects Versions: 3.4.0 > Reporter: Satyam Raj > Priority: Major > > h2. > When I'm trying to do the following inside the listener class, and go to the > UI to check the streaming query tab and click on Sort by Latest Batch, I get > the below error. 
> > {code:java} > class MyListener extends StreamingQueryListener { > private val kafkaAdminClient = try { > Some(Admin.create(getKafkaProp())) > } catch { > case _: Throwable => None > } > private val registeredConsumerGroups = > kafkaAdminClient.get.listConsumerGroups().all().get() > override def onQueryProgress(event: > StreamingQueryListener.QueryProgressEvent): Unit = { > val eventName = event.progress.name > commitConsumerOffset(event.progress.sources.head.endOffset, eventName) > } > private def getKafkaProp(eventName: String = "dummy-admin"): Properties = { > val props: Properties = new Properties() > props.put("bootstrap.servers", > Configuration.configurationMap("kafka.broker")) > props.put("group.id", eventName) > props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringDeserializer") > props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, > "org.apache.kafka.common.serialization.StringDeserializer") > props.put("security.protocol", > Configuration.configurationMap("kafka.security.protocol")) > props.put("sasl.mechanism", > Configuration.configurationMap("kafka.sasl.mechanism")) > props.put("sasl.jaas.config", > Configuration.configurationMap("kafka.sasl.jaas.config")) > props > } > private def getTopicPartitionMap(topic: String, jsonOffsets: Map[String, > Map[String, Long]]) = { > val offsets = jsonOffsets.head._2 > val topicPartitionMap = new util.HashMap[TopicPartition, > OffsetAndMetadata]() > offsets.keys.foreach(partition => { > val tp = new TopicPartition(topic, partition.toInt) > val oam = new > OffsetAndMetadata(offsets(partition).asInstanceOf[Number].longValue()) > topicPartitionMap.put(tp, oam) > }) > topicPartitionMap > } > private def commitConsumerOffset(endOffset: String, eventName: String) = { > val jsonOffsets = mapper.readValue(endOffset, classOf[Map[String, > Map[String, Long]]]) > val topicPartitionMap = getTopicPartitionMap(eventName, jsonOffsets) > if 
(!registeredConsumerGroups.contains(eventName)) { > print("topic consumer not created! creating") > val kafkaConsumer = new KafkaConsumer[String, > String](getKafkaProp(eventName)) > kafkaConsumer.commitSync(topicPartitionMap) > kafkaConsumer.close() > } else { > print("committing cg offsets using admin") > kafkaAdminClient.get.alterConsumerGroupOffsets(eventName, > topicPartitionMap).all().get() > } > } > }{code} > > > h2. HTTP ERROR 500 java.lang.NullPointerException > ||URI:|/StreamingQuery/active| > ||STATUS:|500| > ||MESSAGE:|java.lang.NullPointerException| > ||SERVLET:|org.apache.spark.ui.JettyUtils$$anon$1-522a82dd| > ||CAUSED BY:|java.lang.NullPointerException| > h3. Caused by: > > {code:java} > java.lang.NullPointerException at > org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9(StreamingQueryPage.scala:258) > at > org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.$anonfun$ordering$9$adapted(StreamingQueryPage.scala:258) > at scala.math.Ordering$$anon$5.compare(Ordering.scala:253) at > java.base/java.util.TimSort.binarySort(TimSort.java:296) at > java.base/java.util.TimSort.sort(TimSort.java:239) at > java.base/java.util.Arrays.sort(Arrays.java:1441) at > scala.collection.SeqLike.sorted(SeqLike.scala:659) at > scala.collection.SeqLike.sorted$(SeqLike.scala:647) at > scala.collection.AbstractSeq.sorted(Seq.scala:45) at > org.apache.spark.sql.streaming.ui.StreamingQueryDataSource.<init>(StreamingQueryPage.scala:223) > at > org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.dataSource(StreamingQueryPage.scala:149) > at org.apache.spark.ui.PagedTable.table(PagedTable.scala:101) at > org.apache.spark.ui.PagedTable.table$(PagedTable.scala:100) at > org.apache.spark.sql.streaming.ui.StreamingQueryPagedTable.table(StreamingQueryPage.scala:114) > at > org.apache.spark.sql.streaming.ui.StreamingQueryPage.queryTable(StreamingQueryPage.scala:101) > at > 
org.apache.spark.sql.streaming.ui.StreamingQueryPage.generateStreamingQueryTable(StreamingQueryPage.scala:60) > at > org.apache.spark.sql.streaming.ui.StreamingQueryPage.render(StreamingQueryPage.scala:38) > at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:90) at > org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:81) at > javax.servlet.http.HttpServlet.service(HttpServlet.java:503) at > javax.servlet.http.HttpServlet.service(HttpServlet.java:590) at > org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:799) > at > org.sparkproject.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626) > at > org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95) > at > org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) > at > org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) > at > org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.doFilter(AmIpFilter.java:185) > at > org.sparkproject.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193) > at > org.sparkproject.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601) > at > org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548) > at > org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233) > at > org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1434) > at > org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188) > at > org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501) > at > org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186) > at > org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1349) > at > org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > 
org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:763) > at > org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234) > at > org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127) > at org.sparkproject.jetty.server.Server.handle(Server.java:516) at > org.sparkproject.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388) > at org.sparkproject.jetty.server.HttpChannel.dispatch(HttpChannel.java:633) > at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:380) at > org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:277) > at > org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311) > at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:105) at > org.sparkproject.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104) at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338) > at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315) > at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173) > at > org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131) > at > org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:386) > at > org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883) > at > org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034) > at java.base/java.lang.Thread.run(Thread.java:829) > > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org