[jira] [Assigned] (SPARK-13116) TungstenAggregate though it is supposedly capable of all processing unsafe & safe rows, fails if the input is safe rows
[ https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-13116:

    Assignee: Apache Spark

> TungstenAggregate though it is supposedly capable of all processing unsafe &
> safe rows, fails if the input is safe rows
> ---
>
>                 Key: SPARK-13116
>                 URL: https://issues.apache.org/jira/browse/SPARK-13116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Asif Hussain Shahid
>            Assignee: Apache Spark
>
> TungstenAggregate, though it is supposedly capable of processing both unsafe
> and safe rows, fails if the input is safe rows.
> If the input to TungstenAggregateIterator is a safe row while the target is
> an UnsafeRow, the current code tries to set the fields of the UnsafeRow
> through UnsafeRow's update method.
> This happens when TungstenAggregateIterator calls the
> InterpretedMutableProjection: the projection's target row is an UnsafeRow,
> while the input row is a safe row.
> In its apply method, InterpretedMutableProjection executes
>     mutableRow(i) = exprArray(i).eval(input)
> which Scala desugars to mutableRow.update(i, exprArray(i).eval(input)).
> For an UnsafeRow, however, update throws UnsupportedOperationException.
> The fix I applied to our forked branch, in the class
> InterpretedMutableProjection, is:
>
> + private var targetUnsafe = false
> + type UnsafeSetter = (UnsafeRow, Any) => Unit
> + private var setters: Array[UnsafeSetter] = _
>   private[this] val exprArray = expressions.toArray
>   private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length)
>   def currentValue: InternalRow = mutableRow
>
>   override def target(row: MutableRow): MutableProjection = {
>     mutableRow = row
> +   targetUnsafe = row match {
> +     case _: UnsafeRow =>
> +       if (setters == null) {
> +         setters = Array.ofDim[UnsafeSetter](exprArray.length)
> +         for (i <- 0 until exprArray.length) {
> +           setters(i) = exprArray(i).dataType match {
> +             case IntegerType => (target: UnsafeRow, value: Any) =>
> +               target.setInt(i, value.asInstanceOf[Int])
> +             case LongType => (target: UnsafeRow, value: Any) =>
> +               target.setLong(i, value.asInstanceOf[Long])
> +             case DoubleType => (target: UnsafeRow, value: Any) =>
> +               target.setDouble(i, value.asInstanceOf[Double])
> +             case FloatType => (target: UnsafeRow, value: Any) =>
> +               target.setFloat(i, value.asInstanceOf[Float])
> +             case NullType => (target: UnsafeRow, value: Any) =>
> +               target.setNullAt(i)
> +             case BooleanType => (target: UnsafeRow, value: Any) =>
> +               target.setBoolean(i, value.asInstanceOf[Boolean])
> +             case ByteType => (target: UnsafeRow, value: Any) =>
> +               target.setByte(i, value.asInstanceOf[Byte])
> +             case ShortType => (target: UnsafeRow, value: Any) =>
> +               target.setShort(i, value.asInstanceOf[Short])
> +           }
> +         }
> +       }
> +       true
> +     case _ => false
> +   }
>     this
>   }
>
>   override def apply(input: InternalRow): InternalRow = {
>     var i = 0
>     while (i < exprArray.length) {
> -     mutableRow(i) = exprArray(i).eval(input)
> +     if (targetUnsafe) {
> +       setters(i)(mutableRow.asInstanceOf[UnsafeRow], exprArray(i).eval(input))
> +     } else {
> +       mutableRow(i) = exprArray(i).eval(input)
> +     }
>       i += 1
>     }

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
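The failure path in the report can be reduced to a few lines. The sketch below is a Spark-free model, not Spark code: `RowModel`, `SafeRowModel`, `UnsafeRowModel` and `NaiveProjection` are hypothetical stand-ins for `MutableRow`, a generic safe row, `UnsafeRow` and `InterpretedMutableProjection`. As in the report, the projection writes through `row(i) = value`, which Scala desugars to `row.update(i, value)`, and the unsafe-row stand-in throws from `update` while only offering typed setters.

```scala
// Hypothetical, Spark-free model of the failure in SPARK-13116.
// None of these names are Spark classes; they only mirror the roles described.
trait RowModel {
  def update(ordinal: Int, value: Any): Unit
  def get(ordinal: Int): Any
}

// Stands in for a generic ("safe") mutable row: any value can be stored by ordinal.
final class SafeRowModel(n: Int) extends RowModel {
  private val values = new Array[Any](n)
  def update(ordinal: Int, value: Any): Unit = values(ordinal) = value
  def get(ordinal: Int): Any = values(ordinal)
}

// Stands in for UnsafeRow: the generic update(ordinal, value) throws,
// and writes are only possible through typed setters such as setLong.
final class UnsafeRowModel(n: Int) extends RowModel {
  private val longs = new Array[Long](n)
  def update(ordinal: Int, value: Any): Unit =
    throw new UnsupportedOperationException()
  def setLong(ordinal: Int, value: Long): Unit = longs(ordinal) = value
  def get(ordinal: Int): Any = longs(ordinal)
}

// Mirrors the unpatched loop: mutableRow(i) = exprArray(i).eval(input)
final class NaiveProjection(exprs: Array[Seq[Any] => Any]) {
  private var target: RowModel = new SafeRowModel(exprs.length)
  def setTarget(row: RowModel): this.type = { target = row; this }
  def apply(input: Seq[Any]): RowModel = {
    var i = 0
    while (i < exprs.length) {
      target(i) = exprs(i)(input) // desugars to target.update(i, ...)
      i += 1
    }
    target
  }
}
```

With the default safe target, projecting `Seq(1L, 2L)` through a summing expression stores `3L` at ordinal 0; after `setTarget(new UnsafeRowModel(1))`, the same call throws `UnsupportedOperationException`, which is the behaviour reported for TungstenAggregateIterator.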
[jira] [Assigned] (SPARK-13116) TungstenAggregate though it is supposedly capable of all processing unsafe & safe rows, fails if the input is safe rows
[ https://issues.apache.org/jira/browse/SPARK-13116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-13116:

    Assignee: (was: Apache Spark)
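The fix proposed in SPARK-13116 resolves one typed setter per column when `target` is given an UnsafeRow and then dispatches through those setters in `apply`. The sketch below shows that idea in isolation, assuming a hypothetical `TypedRowModel` in place of `UnsafeRow` and a hypothetical `ColType` enumeration in place of Spark's `DataType`s. It differs from the patch in one deliberate respect: the patch unboxes with `value.asInstanceOf[Int]` (and similar) without a null check, and in Scala `null.asInstanceOf[Int]` evaluates to `0`, so a null result would be stored as zero. The sketch checks for `null` first and calls `setNullAt`. A column type with no setter (here `StringCol`) is rejected when the setters are built, playing the role of the `MatchError` the patch's `match` would throw for an unlisted type such as a string or decimal column.

```scala
// Hypothetical, Spark-free sketch of per-ordinal setters resolved once and reused.
// ColType and TypedRowModel are illustrative stand-ins, not Spark APIs.
sealed trait ColType
case object IntCol extends ColType
case object LongCol extends ColType
case object DoubleCol extends ColType
case object BooleanCol extends ColType
case object StringCol extends ColType // deliberately has no setter below

// Fixed-width row that, like UnsafeRow, exposes only typed setters plus setNullAt.
final class TypedRowModel(n: Int) {
  private val slots = new Array[Long](n)    // one raw 8-byte slot per field
  private val nulls = new Array[Boolean](n) // one null flag per field
  def setNullAt(i: Int): Unit = nulls(i) = true
  def setInt(i: Int, v: Int): Unit = { nulls(i) = false; slots(i) = v.toLong }
  def setLong(i: Int, v: Long): Unit = { nulls(i) = false; slots(i) = v }
  def setDouble(i: Int, v: Double): Unit =
    { nulls(i) = false; slots(i) = java.lang.Double.doubleToLongBits(v) }
  def setBoolean(i: Int, v: Boolean): Unit =
    { nulls(i) = false; slots(i) = if (v) 1L else 0L }
  def isNullAt(i: Int): Boolean = nulls(i)
  def getInt(i: Int): Int = slots(i).toInt
  def getLong(i: Int): Long = slots(i)
}

object Setters {
  type Setter = (TypedRowModel, Any) => Unit

  // Resolve one setter per ordinal up front, as the patch does in target().
  def build(types: Seq[ColType]): Array[Setter] =
    types.zipWithIndex.map { case (t, i) =>
      val write: Setter = t match {
        case IntCol     => (row, v) => row.setInt(i, v.asInstanceOf[Int])
        case LongCol    => (row, v) => row.setLong(i, v.asInstanceOf[Long])
        case DoubleCol  => (row, v) => row.setDouble(i, v.asInstanceOf[Double])
        case BooleanCol => (row, v) => row.setBoolean(i, v.asInstanceOf[Boolean])
        case other =>
          throw new UnsupportedOperationException(s"no setter for $other at ordinal $i")
      }
      // Check for null before unboxing: null.asInstanceOf[Int] would silently yield 0.
      val guarded: Setter = (row, v) => if (v == null) row.setNullAt(i) else write(row, v)
      guarded
    }.toArray

  // Mirrors the patched apply(): evaluate each expression, write via its setter.
  def project(setters: Array[Setter], exprs: Array[Seq[Any] => Any],
              input: Seq[Any], row: TypedRowModel): TypedRowModel = {
    var i = 0
    while (i < exprs.length) {
      setters(i)(row, exprs(i)(input))
      i += 1
    }
    row
  }
}
```

Resolving the setters once keeps type dispatch out of the per-row loop, which is the point of building them in `target` rather than in `apply`; the added null check costs one comparison per field per row.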