[ https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15134988#comment-15134988 ]

Asif Hussain Shahid commented on SPARK-13116:
---------------------------------------------

It shows up if you throw an exception at the place mentioned below.

In SparkPlan.newMutableProjection, if an exception makes the code fall back
to creating an InterpretedMutableProjection, the issue shows up.
To simulate this, just add

throw new UnsupportedOperationException("XXXX")

right after the call to GenerateMutableProjection.generate.


At this point I can only reproduce it artificially, by throwing an exception
in the following SparkPlan source code:
protected def newMutableProjection(
    expressions: Seq[Expression],
    inputSchema: Seq[Attribute],
    useSubexprElimination: Boolean = false): () => MutableProjection = {
  log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
  try {
    GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
    // Injected only to simulate a code-generation failure; remove after testing.
    throw new UnsupportedOperationException("TEST")
  } catch {
    case e: Exception =>
      if (isTesting) {
        throw e
      } else {
        // Fallback path: this is where InterpretedMutableProjection gets used.
        log.error("Failed to generate mutable projection, fallback to interpreted", e)
        () => new InterpretedMutableProjection(expressions, inputSchema)
      }
  }
}
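The effect of the injected throw can be modeled without Spark. The sketch below is a hypothetical, self-contained stand-in for the try/catch in newMutableProjection: `generate`, `CompiledProjection`, `InterpretedProjection` and the `simulateFailure` flag are illustrative names, not Spark APIs. It shows that outside of testing the failure is silent: the caller still receives a projection factory, but one that builds the interpreted implementation, so a different code path runs at execution time.

```scala
// Hypothetical, self-contained model of the codegen-with-fallback pattern
// in SparkPlan.newMutableProjection. None of these types are Spark APIs.
object FallbackSketch {
  sealed trait Projection { def name: String }
  final class CompiledProjection extends Projection { val name = "compiled" }
  final class InterpretedProjection extends Projection { val name = "interpreted" }

  // Stands in for GenerateMutableProjection.generate; `simulateFailure`
  // plays the role of the injected `throw new UnsupportedOperationException`.
  def generate(simulateFailure: Boolean): () => Projection = {
    if (simulateFailure) throw new UnsupportedOperationException("TEST")
    () => new CompiledProjection
  }

  def newProjection(simulateFailure: Boolean, isTesting: Boolean): () => Projection =
    try generate(simulateFailure)
    catch {
      case e: Exception =>
        if (isTesting) throw e // tests surface the codegen failure
        else () => new InterpretedProjection // otherwise fall back silently
    }

  def main(args: Array[String]): Unit = {
    println(newProjection(simulateFailure = false, isTesting = false)().name) // compiled
    println(newProjection(simulateFailure = true, isTesting = false)().name)  // interpreted
  }
}
```

Because the fallback only logs an error, a query can switch to the interpreted projection without any visible failure until a row type the interpreted path cannot handle reaches it.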



> TungstenAggregate though it is supposedly capable of all processing unsafe & 
> safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13116
>                 URL: https://issues.apache.org/jira/browse/SPARK-13116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Asif Hussain Shahid
>         Attachments: SPARK_13116_Test.scala
>
>
> TungstenAggregate, though it is supposedly capable of processing both unsafe
> and safe rows, fails if the input is safe rows.
> If the input to TungstenAggregationIterator is a safe row while the target is
> an UnsafeRow, the current code tries to set the fields in the UnsafeRow
> using UnsafeRow's update method.
> The call reaches that method from TungstenAggregationIterator through the
> InterpretedMutableProjection. The target row in the
> InterpretedMutableProjection is an UnsafeRow, while the current row is a
> safe row.
> InterpretedMutableProjection's apply method invokes
>  mutableRow(i) = exprArray(i).eval(input)
> For UnsafeRow, the update method throws UnsupportedOperationException.
> The fix I made in our forked branch, in the class
> InterpretedMutableProjection, is:
> +  private var targetUnsafe = false
> +  type UnsafeSetter = (UnsafeRow, Any) => Unit
> +  private var setters: Array[UnsafeSetter] = _
>    private[this] val exprArray = expressions.toArray
>    private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
>    def currentValue: InternalRow = mutableRow
>
> +
>    override def target(row: MutableRow): MutableProjection = {
>      mutableRow = row
> +    targetUnsafe = row match {
> +      case _: UnsafeRow => {
> +        if (setters == null) {
> +          setters = Array.ofDim[UnsafeSetter](exprArray.length)
> +          for (i <- 0 until exprArray.length) {
> +            setters(i) = exprArray(i).dataType match {
> +              case IntegerType => (target: UnsafeRow, value: Any) =>
> +                target.setInt(i, value.asInstanceOf[Int])
> +              case LongType => (target: UnsafeRow, value: Any) =>
> +                target.setLong(i, value.asInstanceOf[Long])
> +              case DoubleType => (target: UnsafeRow, value: Any) =>
> +                target.setDouble(i, value.asInstanceOf[Double])
> +              case FloatType => (target: UnsafeRow, value: Any) =>
> +                target.setFloat(i, value.asInstanceOf[Float])
> +
> +              case NullType => (target: UnsafeRow, value: Any) =>
> +                target.setNullAt(i)
> +
> +              case BooleanType => (target: UnsafeRow, value: Any) =>
> +                target.setBoolean(i, value.asInstanceOf[Boolean])
> +
> +              case ByteType => (target: UnsafeRow, value: Any) =>
> +                target.setByte(i, value.asInstanceOf[Byte])
> +              case ShortType => (target: UnsafeRow, value: Any) =>
> +                target.setShort(i, value.asInstanceOf[Short])
> +
> +            }
> +          }
> +        }
> +        true
> +      }
> +      case _ => false
> +    }
> +
>      this
>    }
>
>    override def apply(input: InternalRow): InternalRow = {
>      var i = 0
>      while (i < exprArray.length) {
> -      mutableRow(i) = exprArray(i).eval(input)
> +      if (targetUnsafe) {
> +        setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
> +      } else {
> +        mutableRow(i) = exprArray(i).eval(input)
> +      }
>        i += 1
>      }
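To make the failure mode and the workaround concrete, here is a small self-contained Scala model. It does not use Spark: `FixedWidthRow`, `GenericRow`, `DataKind`, `Expr` and `InterpretedProjection` are hypothetical stand-ins, with `FixedWidthRow` mimicking UnsafeRow by rejecting the generic `update(i, value)` and accepting only typed setters. Like the patch, the projection builds one typed setter per column, once, when the target is fixed-width. One deliberate difference: each setter here sends `null` to `setNullAt`, whereas in the patch a null input for an IntegerType column would be unboxed by `asInstanceOf[Int]` to 0.

```scala
// Toy model of the SPARK-13116 failure and of the typed-setter workaround.
// All names are hypothetical; FixedWidthRow only imitates UnsafeRow's behavior.
object TypedSetterSketch {
  sealed trait DataKind
  case object IntKind extends DataKind
  case object LongKind extends DataKind
  case object DoubleKind extends DataKind

  trait MutableRow { def update(i: Int, value: Any): Unit }

  // "Safe" row: stores boxed values, so the generic update works.
  final class GenericRow(n: Int) extends MutableRow {
    private val values = new Array[Any](n)
    def update(i: Int, value: Any): Unit = values(i) = value
    def get(i: Int): Any = values(i)
  }

  // "Unsafe"-like row: the generic update throws, only typed setters work.
  final class FixedWidthRow(n: Int) extends MutableRow {
    private val bits = new Array[Long](n)
    private val nulls = new Array[Boolean](n)
    def update(i: Int, value: Any): Unit = throw new UnsupportedOperationException
    def setInt(i: Int, v: Int): Unit = { nulls(i) = false; bits(i) = v.toLong }
    def setLong(i: Int, v: Long): Unit = { nulls(i) = false; bits(i) = v }
    def setDouble(i: Int, v: Double): Unit = {
      nulls(i) = false; bits(i) = java.lang.Double.doubleToLongBits(v)
    }
    def setNullAt(i: Int): Unit = nulls(i) = true
    def isNullAt(i: Int): Boolean = nulls(i)
    def getInt(i: Int): Int = bits(i).toInt
    def getDouble(i: Int): Double = java.lang.Double.longBitsToDouble(bits(i))
  }

  final case class Expr(kind: DataKind, eval: Seq[Any] => Any)

  final class InterpretedProjection(exprs: Array[Expr]) {
    private type Setter = (FixedWidthRow, Any) => Unit
    private var target: MutableRow = new GenericRow(exprs.length)
    private var setters: Array[Setter] = null // built lazily, once

    // One typed setter per column; nulls go to setNullAt instead of unboxing to 0.
    private def setterFor(i: Int, kind: DataKind): Setter = kind match {
      case IntKind    => (r, v) => if (v == null) r.setNullAt(i) else r.setInt(i, v.asInstanceOf[Int])
      case LongKind   => (r, v) => if (v == null) r.setNullAt(i) else r.setLong(i, v.asInstanceOf[Long])
      case DoubleKind => (r, v) => if (v == null) r.setNullAt(i) else r.setDouble(i, v.asInstanceOf[Double])
    }

    def withTarget(row: MutableRow): this.type = {
      target = row
      if (row.isInstanceOf[FixedWidthRow] && setters == null)
        setters = Array.tabulate(exprs.length)(i => setterFor(i, exprs(i).kind))
      this
    }

    def apply(input: Seq[Any]): MutableRow = {
      var i = 0
      while (i < exprs.length) {
        val v = exprs(i).eval(input)
        target match {
          case u: FixedWidthRow => setters(i)(u, v) // typed path
          case other            => other(i) = v     // generic update
        }
        i += 1
      }
      target
    }
  }

  def main(args: Array[String]): Unit = {
    val exprs = Array(Expr(IntKind, in => in(0)), Expr(DoubleKind, in => in(1)))
    val unsafeLike = new FixedWidthRow(2)
    val proj = new InterpretedProjection(exprs).withTarget(unsafeLike)
    proj(Seq(7, null))
    println(unsafeLike.getInt(0))   // 7
    println(unsafeLike.isNullAt(1)) // true
  }
}
```

The sealed `DataKind` keeps the setter match exhaustive. In the patch as written, the inner `dataType match` has no default case, so a column of a type it does not list (StringType, for example) would raise a MatchError when `target` is called.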



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
