[ 
https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131102#comment-15131102
 ] 

Apache Spark commented on SPARK-13116:
--------------------------------------

User 'ahshahid' has created a pull request for this issue:
https://github.com/apache/spark/pull/11058

> TungstenAggregate though it is supposedly capable of all processing unsafe & 
> safe rows, fails if the input is safe rows
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13116
>                 URL: https://issues.apache.org/jira/browse/SPARK-13116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Asif Hussain Shahid
>
> TungstenAggregate though it is supposedly capable of all processing unsafe & 
> safe rows, fails if the input is safe rows.
> If the input to TungstenAggregateIterator is a SafeRow, while the target is 
> an UnsafeRow ,  the current code will try to set the fields in the UnsafeRow 
> using the update method in UnSafeRow. 
> This method is called via TunsgtenAggregateIterator on the 
> InterpretedMutableProjection. The target row in the 
> InterpretedMutableProjection is an UnsafeRow, while the current row is a 
> SafeRow.
> In the InterpretedMutableProjection's apply method, it invokes
>  mutableRow(i) = exprArray(i).eval(input)
> Now for UnsafeRow, the update method throws UnsupportedOperationException.
> The proposed fix I did for our forked branch , on the class 
> InterpretedProjection is:
> +  private var targetUnsafe = false
>  +  type UnsafeSetter = (UnsafeRow,  Any ) => Unit
>  +  private var setters : Array[UnsafeSetter] = _
>     private[this] val exprArray = expressions.toArray
>     private[this] var mutableRow: MutableRow = new 
> GenericMutableRow(exprArray.length)
>     def currentValue: InternalRow = mutableRow
>   
>  +
>     override def target(row: MutableRow): MutableProjection = {
>       mutableRow = row
>  +    targetUnsafe = row match {
>  +      case _:UnsafeRow =>{
>  +        if(setters == null) {
>  +          setters = Array.ofDim[UnsafeSetter](exprArray.length)
>  +          for(i <- 0 until exprArray.length) {
>  +            setters(i) = exprArray(i).dataType match {
>  +              case IntegerType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setInt(i,value.asInstanceOf[Int])
>  +              case LongType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setLong(i,value.asInstanceOf[Long])
>  +              case DoubleType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setDouble(i,value.asInstanceOf[Double])
>  +              case FloatType => (target: UnsafeRow, value: Any ) =>
>  +                target.setFloat(i,value.asInstanceOf[Float])
>  +
>  +              case NullType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setNullAt(i)
>  +
>  +              case BooleanType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setBoolean(i,value.asInstanceOf[Boolean])
>  +
>  +              case ByteType => (target: UnsafeRow,  value: Any ) =>
>  +                target.setByte(i,value.asInstanceOf[Byte])
>  +              case ShortType => (target: UnsafeRow, value: Any ) =>
>  +                target.setShort(i,value.asInstanceOf[Short])
>  +
>  +            }
>  +          }
>  +        }
>  +        true
>  +      }
>  +      case _ => false
>  +    }
>  +
>       this
>     }
>   
>     override def apply(input: InternalRow): InternalRow = {
>       var i = 0
>       while (i < exprArray.length) {
>  -      mutableRow(i) = exprArray(i).eval(input)
>  +      if(targetUnsafe) {
>  +        setters(i)(mutableRow.asInstanceOf[UnsafeRow], 
> exprArray(i).eval(input))
>  +      }else {
>  +        mutableRow(i) = exprArray(i).eval(input)
>  +      }
>         i += 1
>       }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to