[ 
https://issues.apache.org/jira/browse/SPARK-37821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

muhong updated SPARK-37821:
---------------------------
    Description: 
This problem occurs in long-running Spark applications, such as the Thrift server.

Because there is only one SparkContext instance in the Thrift server driver, if the concurrency of SQL requests is high or the queries are complicated (each creating many RDDs), RDDs are generated very quickly and RDD IDs are consumed just as fast. After a few months the counter (SparkContext.scala#nextRddId: [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala]) overflows, and newRddId() starts returning negative numbers. However, an RDD's block ID must be positive (the block ID format is {{val RDD = "rdd_([0-9]+)_([0-9]+)".r}}: [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockId.scala]), so this leads to the exception "Failed to parse rdd_-2123452330_2 into block ID". Data can then no longer be exchanged during SQL execution, and the SQL statement fails.
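The wrap-around can be reproduced outside Spark in a few lines. The object below is a hypothetical stand-in (RddIdOverflowDemo, newRddId, and isParsableBlockId are names invented for illustration): it uses the same AtomicInteger counter pattern as SparkContext and the same block-ID regex as BlockId.scala, with the counter started near Int.MaxValue to simulate months of allocations.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo of the overflow; this is NOT Spark code, just the same
// counter pattern and the same block-ID regex as BlockId.scala.
object RddIdOverflowDemo {
  // Counter started at Int.MaxValue to simulate months of RDD allocations.
  private val nextRddId = new AtomicInteger(Int.MaxValue)

  // Same pattern as the RDD block-ID regex in BlockId.scala.
  private val RDD = "rdd_([0-9]+)_([0-9]+)".r

  def newRddId(): Int = nextRddId.getAndIncrement()

  // A negative ID produces "rdd_-..._..." which the regex cannot match.
  def isParsableBlockId(id: Int): Boolean =
    RDD.pattern.matcher(s"rdd_${id}_0").matches()

  def main(args: Array[String]): Unit = {
    val lastGood = newRddId() // returns Int.MaxValue, still positive
    val wrapped = newRddId()  // wraps around to Int.MinValue, negative
    println(s"rdd_${lastGood}_0 parses: " + isParsableBlockId(lastGood))
    println(s"rdd_${wrapped}_0 parses: " + isParsableBlockId(wrapped))
  }
}
```

Once the wrapped ID is handed out, every block of that RDD carries an unparsable name, which matches the "Failed to parse ... into block ID" failure reported above.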

 

If the RDD ID has overflowed, the error occurs as soon as an operation such as rdd.mapPartitions executes.

 

How can this problem be fixed?

SparkContext.scala

 
{code:java}
...
...
  private val nextShuffleId = new AtomicInteger(0)

  private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()

  private val nextRddId = new AtomicInteger(0)

  /** Register a new RDD, returning its RDD ID */
  private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

  /**
   * Registers listeners specified in spark.extraListeners, then starts the listener bus.
   * This should be called after all internal listeners have been registered with the listener bus
   * (e.g. after the web UI and event logging listeners have been registered).
   */
  private def setupAndStartListenerBus(): Unit = {
    try {
      conf.get(EXTRA_LISTENERS).foreach { classNames =>
        val listeners = Utils.loadExtensions(classOf[SparkListenerInterface], classNames, conf)
        listeners.foreach { listener =>
          listenerBus.addToSharedQueue(listener)
          logInfo(s"Registered listener ${listener.getClass().getName()}")
        }
      }
    } catch {
      case e: Exception =>
        try {
          stop()
        } finally {
          throw new SparkException(s"Exception when registering SparkListener", e)
        }
    }
    listenerBus.start(this, _env.metricsSystem)
    _listenerBusStarted = true
  }
...
...{code}
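One possible mitigation, sketched below, is to fail fast with a clear error once the counter goes negative, instead of handing out an ID that BlockId can no longer parse. This is only an illustration under stated assumptions, not an actual Spark patch: RddIdAllocator is a hypothetical class, and its start parameter exists purely so the overflow path can be exercised.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical variant of newRddId() that refuses to hand out negative IDs.
// A long-running driver would then fail with a clear message instead of the
// confusing "Failed to parse rdd_-..._... into block ID" error downstream.
// The `start` parameter is for demonstration only.
class RddIdAllocator(start: Int = 0) {
  private val nextRddId = new AtomicInteger(start)

  def newRddId(): Int = {
    val id = nextRddId.getAndIncrement()
    if (id < 0) {
      // The counter wrapped past Int.MaxValue; surface the real cause.
      throw new IllegalStateException(
        s"RDD ID counter overflowed (got $id); the application has " +
        "exhausted the 32-bit ID space and should be restarted")
    }
    id
  }
}
```

An alternative would be to widen the counter (and the block-ID format) to Long, but that touches the on-the-wire block naming, so failing fast is the smaller change.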

> spark thrift server RDD ID overflow lead sql execute failed
> -----------------------------------------------------------
>
>                 Key: SPARK-37821
>                 URL: https://issues.apache.org/jira/browse/SPARK-37821
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: muhong
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
