[ 
https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134929#comment-15134929
 ] 

Asif Hussain Shahid commented on SPARK-13116:
---------------------------------------------


I further debugged the issue & found the following 

The problem appears only if the SparkPlan.newMutableProjection ,  throws an 
exception in generating mutable projection through code generation. In which 
case, the code defaults to creating InterpretedMutableProjection.  This is what 
causes the problem. Looks like in our case, one of our  UnaryExpression had 
incorrect code generation which caused exception in bytes generation, resulting 
in InterpretedMutableProjection being created.
I am attaching a Test , which will fail, if  the generated mutable projection 
fails for some reason.

At this point I can only reproduce it artificially, by throwing an Exception in 
the code below in source code.

SparkPlan:
protected def newMutableProjection(
      expressions: Seq[Expression],
      inputSchema: Seq[Attribute],
      useSubexprElimination: Boolean = false): () => MutableProjection = {
    log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
    try {
      GenerateMutableProjection.generate(expressions, inputSchema, 
useSubexprElimination)
      throw new UnsupportedOperationException("TEST")
    } catch {
      case e: Exception =>
        if (isTesting) {
          throw e
        } else {
          log.error("Failed to generate mutable projection, fallback to 
interpreted", e)
          () => new InterpretedMutableProjection(expressions, inputSchema)
        }
    }
  }

I suppose it is not a high priority bug , or something which requires fix , 
just something which I came across in our development.   
Just to note: I found this issue to be present in the latest code of spark , 
too.

> TungstenAggregate though it is supposedly capable of all processing unsafe & 
> safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13116
>                 URL: https://issues.apache.org/jira/browse/SPARK-13116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Asif Hussain Shahid
>         Attachments: SPARK_13116_Test.scala
>
>
> TungstenAggregate though it is supposedly capable of all processing unsafe & 
> safe rows, fails if the input is safe rows.
> If the input to TungstenAggregateIterator is a SafeRow, while the target is 
> an UnsafeRow ,  the current code will try to set the fields in the UnsafeRow 
> using the update method in UnSafeRow. 
> This method is called via TunsgtenAggregateIterator on the 
> InterpretedMutableProjection. The target row in the 
> InterpretedMutableProjection is an UnsafeRow, while the current row is a 
> SafeRow.
> In the InterpretedMutableProjection's apply method, it invokes
>  mutableRow(i) = exprArray(i).eval(input)
> Now for UnsafeRow, the update method throws UnsupportedOperationException.
> The proposed fix I did for our forked branch , on the class 
> InterpretedProjection is:
> +  private var targetUnsafe = false
>  +  type UnsafeSetter = (UnsafeRow,  Any ) => Unit
>  +  private var setters : Array[UnsafeSetter] = _
>     private[this] val exprArray = expressions.toArray
>     private[this] var mutableRow: MutableRow = new 
> GenericMutableRow(exprArray.length)
>     def currentValue: InternalRow = mutableRow
>   
>  +
>     override def target(row: MutableRow): MutableProjection = {
>       mutableRow = row
>  +    targetUnsafe = row match {
>  +      case _:UnsafeRow =>{
>  +        if(setters == null) {
>  +          setters = Array.ofDim[UnsafeSetter](exprArray.length)
>  +          for(i <- 0 until exprArray.length) {
>  +            setters(i) = exprArray(i).dataType match {
>  +              case IntegerType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setInt(i,value.asInstanceOf[Int])
>  +              case LongType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setLong(i,value.asInstanceOf[Long])
>  +              case DoubleType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setDouble(i,value.asInstanceOf[Double])
>  +              case FloatType => (target: UnsafeRow, value: Any ) =>
>  +                target.setFloat(i,value.asInstanceOf[Float])
>  +
>  +              case NullType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setNullAt(i)
>  +
>  +              case BooleanType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setBoolean(i,value.asInstanceOf[Boolean])
>  +
>  +              case ByteType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setByte(i,value.asInstanceOf[Byte])
>  +              case ShortType => (target: UnsafeRow, value: Any ) =>
>  +                target.setShort(i,value.asInstanceOf[Short])
>  +
>  +            }
>  +          }
>  +        }
>  +        true
>  +      }
>  +      case _ => false
>  +    }
>  +
>       this
>     }
>   
>     override def apply(input: InternalRow): InternalRow = {
>       var i = 0
>       while (i < exprArray.length) {
>  -      mutableRow(i) = exprArray(i).eval(input)
>  +      if(targetUnsafe) {
>  +        setters(i)(mutableRow.asInstanceOf[UnsafeRow], 
> exprArray(i).eval(input))
>  +      }else {
>  +        mutableRow(i) = exprArray(i).eval(input)
>  +      }
>         i += 1
>       }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to