[ 
https://issues.apache.org/jira/browse/SPARK-37821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

muhong updated SPARK-37821:
---------------------------
    Description: 
This problem occurs in long-running Spark applications such as the Thrift server.

Because there is only one SparkContext instance on the Thrift server driver side, if the concurrency of SQL requests is high, or the queries are complex (each one creating many RDDs), RDDs are generated quickly and RDD ids 
(SparkContext.scala#nextRddId:[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala]) are consumed quickly. After a few months nextRddId overflows Int.MaxValue, and newRddId starts returning negative numbers. An RDD's block id must be positive, so this leads to the exception "Failed to parse rdd_-2123452330_2 into block ID" (the RDD block id format is {{val RDD = "rdd_([0-9]+)_([0-9]+)".r}}:[https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockId.scala]). Data can then no longer be exchanged during SQL execution, and the query fails.
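The failure mode can be sketched in isolation. The snippet below is a minimal illustration, not Spark code; it only borrows the AtomicInteger counter pattern and the block id regex from BlockId.scala. It shows that incrementing past Int.MaxValue wraps negative, and that the digits-only pattern can never match a negative id:

```scala
import java.util.concurrent.atomic.AtomicInteger

object RddIdOverflowSketch {
  // Same shape as the pattern in BlockId.scala: digits only, so a
  // leading minus sign can never match.
  private val RDD = "rdd_([0-9]+)_([0-9]+)".r

  def main(args: Array[String]): Unit = {
    // A counter that has already handed out Int.MaxValue ids wraps on the next call.
    val nextRddId = new AtomicInteger(Int.MaxValue)
    nextRddId.getAndIncrement()                  // returns Int.MaxValue, counter wraps
    val overflowed = nextRddId.getAndIncrement() // Int.MinValue = -2147483648
    assert(overflowed < 0)

    // The driver-side parse then has nothing to match, which is what
    // surfaces as "Failed to parse rdd_-2147483648_2 into block ID".
    val blockName = s"rdd_${overflowed}_2"
    val parsed = blockName match {
      case RDD(rddId, splitIndex) => Some((rddId, splitIndex))
      case _                      => None
    }
    assert(parsed.isEmpty)
    println(s"$blockName matches the RDD pattern: ${parsed.isDefined}")
  }
}
```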

If the RDD id overflows, the error occurs when rdd.mapPartitions executes. It surfaces on the driver side, when the driver deserializes the block id from the "block message" input stream.

When an executor runs rdd.mapPartitions, it calls the block manager to report block status. Since the block id is negative, when the message is sent back to the driver, the driver's regex fails to match and an exception is thrown.
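That driver-side failure path can be mimicked like this. This is a sketch of only the parsing step, assuming just the regex from BlockId.scala; parseBlockId is a hypothetical stand-in for the real BlockId.apply, which throws when no pattern matches a reported block name:

```scala
object BlockIdParseSketch {
  private val RDD = "rdd_([0-9]+)_([0-9]+)".r

  // Hypothetical stand-in for BlockId.apply: throw when no pattern matches,
  // as the driver does while decoding a reported block status.
  def parseBlockId(name: String): (Int, Int) = name match {
    case RDD(rddId, splitIndex) => (rddId.toInt, splitIndex.toInt)
    case _ =>
      throw new IllegalStateException(s"Failed to parse $name into block ID")
  }

  def main(args: Array[String]): Unit = {
    // A positive id parses fine.
    assert(parseBlockId("rdd_42_2") == (42, 2))
    // A negative id from an overflowed counter does not.
    try {
      parseBlockId("rdd_-2123452330_2")
      assert(false, "expected a parse failure")
    } catch {
      case e: IllegalStateException => println(e.getMessage)
    }
  }
}
```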

 

How could the problem be fixed? One possible change, in SparkContext.scala:

{code:java}
...
...
  private val nextShuffleId = new AtomicInteger(0)

  private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()

  // change: var instead of val, and @volatile so the reassignment in
  // newRddId() is visible to threads reading outside the synchronized block
  @volatile private var nextRddId = new AtomicInteger(0)

  /** Register a new RDD, returning its RDD ID */
  // change: detect overflow and reset the counter
  private[spark] def newRddId(): Int = {
    var id = nextRddId.getAndIncrement()
    if (id >= 0) {
      // fast path: no overflow (0 is a valid RDD id)
      return id
    }
    // the counter wrapped past Int.MaxValue; reset it under a lock
    this.synchronized {
      // re-read under the lock: another thread may have reset the counter already
      id = nextRddId.getAndIncrement()
      if (id < 0) {
        nextRddId = new AtomicInteger(0)
        id = nextRddId.getAndIncrement()
      }
    }
    id
  }
...
...{code}
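A standalone sketch of the wrap-around logic above, with the counter seeded at Int.MaxValue so the overflow is exercised immediately rather than after months of uptime:

```scala
import java.util.concurrent.atomic.AtomicInteger

object NewRddIdSketch {
  // Seeded just below the overflow point to hit the reset path quickly.
  @volatile private var nextRddId = new AtomicInteger(Int.MaxValue)

  def newRddId(): Int = {
    var id = nextRddId.getAndIncrement()
    if (id >= 0) {
      return id // fast path: no overflow
    }
    this.synchronized {
      // Re-read under the lock: another thread may have reset the counter already.
      id = nextRddId.getAndIncrement()
      if (id < 0) {
        nextRddId = new AtomicInteger(0)
        id = nextRddId.getAndIncrement()
      }
    }
    id
  }

  def main(args: Array[String]): Unit = {
    assert(newRddId() == Int.MaxValue) // last id before the wrap
    assert(newRddId() == 0)            // overflow detected, counter reset to 0
    assert(newRddId() == 1)            // normal incrementing resumes
    println("ids stay non-negative across the overflow")
  }
}
```

Note that resetting the counter reuses old ids, which is safe only if no blocks from the earlier RDDs that carried those ids are still registered.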

> spark thrift server RDD ID overflow lead sql execute failed
> -----------------------------------------------------------
>
>                 Key: SPARK-37821
>                 URL: https://issues.apache.org/jira/browse/SPARK-37821
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.2.0
>            Reporter: muhong
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
