Ah, I missed that bit of documentation, my bad :). That totally explains the behavior!
Thanks a lot!

-------
Regards,
Andy

On Sat, Jan 7, 2017 at 10:11 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

> Hi Andy,
>
> Thanks for sharing the code snippet.
>
> I am not sure if you missed something in the snippet, because some
> function signatures do not match, e.g.:
>
>     @Override
>     public StructType bufferSchema() {
>         return new UserDefineType(schema, unboundedEncoder);
>     }
>
> Maybe you define a class UserDefineType which extends StructType.
>
> Anyway, I noticed this line:
>
>     data.add(unboundedEncoder.toRow(input));
>
> If you read the comment on "toRow", you will find it says:
>
>     Note that multiple calls to toRow are allowed to return the same
>     actual [[InternalRow]] object. Thus, the caller should copy the
>     result before making another call if required.
>
> I think that is why you get a list of identical entries.
>
> So you may need to change it to:
>
>     data.add(unboundedEncoder.toRow(input).copy());
>
> Andy Dang wrote
> > Hi Liang-Chi,
> >
> > The snippet of code is below. If I bind the encoder early (the schema
> > doesn't change throughout the execution), the final result is a list
> > of the same entries.
> >
> > @RequiredArgsConstructor
> > public class UDAF extends UserDefinedAggregateFunction {
> >
> >     // Do not resolve and bind this expression encoder eagerly
> >     private final ExpressionEncoder<Row> unboundedEncoder;
> >     private final StructType schema;
> >
> >     @Override
> >     public StructType inputSchema() {
> >         return schema;
> >     }
> >
> >     @Override
> >     public StructType bufferSchema() {
> >         return new UserDefineType(schema, unboundedEncoder);
> >     }
> >
> >     @Override
> >     public DataType dataType() {
> >         return DataTypes.createArrayType(schema);
> >     }
> >
> >     @Override
> >     public void initialize(MutableAggregationBuffer buffer) {
> >         buffer.update(0, new InternalRow[0]);
> >     }
> >
> >     @Override
> >     public void update(MutableAggregationBuffer buffer, Row input) {
> >         UserDefineType data = buffer.getAs(0);
> >         data.add(unboundedEncoder.toRow(input));
> >         buffer.update(0, data);
> >     }
> >
> >     @Override
> >     public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
> >         // merge
> >         buffer1.update(0, data1);
> >     }
> >
> >     @Override
> >     public Object evaluate(Row buffer) {
> >         UserDefineType data = buffer.getAs(0);
> >         // need to return Row here instead of InternalRow
> >         return data.rows();
> >     }
> >
> >     static ExpressionEncoder<Row> resolveAndBind(ExpressionEncoder<Row> encoder) {
> >         val attributes = JavaConversions.asJavaCollection(encoder.schema().toAttributes())
> >                 .stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >         return encoder.resolveAndBind(ScalaUtils.scalaSeq(attributes),
> >                 SimpleAnalyzer$.MODULE$);
> >     }
> > }
> >
> > // Wrap around a list of InternalRow
> > class TopKDataType extends UserDefinedType<TopKDataType> {
> >     private final ExpressionEncoder<Row> unboundedEncoder;
> >     private final List<InternalRow> data;
> >
> >     public Row[] rows() {
> >         val encoder = resolveAndBind(this.unboundedEncoder);
> >         return data.stream().map(encoder::fromRow).toArray(Row[]::new);
> >     }
> > }
> >
> > -------
> > Regards,
> > Andy
> >
> > On Fri, Jan 6, 2017 at 3:48 AM, Liang-Chi Hsieh <viirya@> wrote:
> >
> >> Can you show how you use the encoder in your UDAF?
> >>
> >> Andy Dang wrote
> >> > One more question about the behavior of ExpressionEncoder<Row>.
> >> >
> >> > I have a UDAF that has ExpressionEncoder<Row> as a member variable.
> >> >
> >> > However, if I call resolveAndBind() eagerly on this encoder, it
> >> > appears to break the UDAF. Basically somehow the deserialized rows
> >> > are all the same during the merge step. Is this the expected
> >> > behavior of Encoders?
> >> >
> >> > -------
> >> > Regards,
> >> > Andy
> >> >
> >> > On Thu, Jan 5, 2017 at 10:55 AM, Andy Dang <namd88@> wrote:
> >> >
> >> >> Perfect. The API in Java is a bit clumsy, though.
> >> >>
> >> >> What I ended up doing in Java (the val is from lombok, if anyone's
> >> >> wondering):
> >> >>
> >> >>     val attributes = JavaConversions.asJavaCollection(schema.toAttributes())
> >> >>             .stream().map(Attribute::toAttribute).collect(Collectors.toList());
> >> >>     val encoder = RowEncoder.apply(schema).resolveAndBind(
> >> >>             ScalaUtils.scalaSeq(attributes), SimpleAnalyzer$.MODULE$);
> >> >>
> >> >> -------
> >> >> Regards,
> >> >> Andy
> >> >>
> >> >> On Thu, Jan 5, 2017 at 2:53 AM, Liang-Chi Hsieh <viirya@> wrote:
> >> >>
> >> >>> You need to resolve and bind the encoder:
> >> >>>
> >> >>>     ExpressionEncoder<Row> encoder = RowEncoder.apply(struct).resolveAndBind();
> >> >>>
> >> >>> Andy Dang wrote
> >> >>> > Hi all,
> >> >>> > (cc-ing dev since I've hit some developer API corner)
> >> >>> >
> >> >>> > What's the best way to convert an InternalRow to a Row if I've
> >> >>> > got an InternalRow and the corresponding schema?
> >> >>> >
> >> >>> > Code snippet:
> >> >>> >
> >> >>> >     @Test
> >> >>> >     public void foo() throws Exception {
> >> >>> >         Row row = RowFactory.create(1);
> >> >>> >         StructType struct = new StructType().add("id", DataTypes.IntegerType);
> >> >>> >         ExpressionEncoder<Row> encoder = RowEncoder.apply(struct);
> >> >>> >         InternalRow internalRow = encoder.toRow(row);
> >> >>> >         System.out.println("Internal row size: " + internalRow.numFields());
> >> >>> >         Row roundTrip = encoder.fromRow(internalRow);
> >> >>> >         System.out.println("Round trip: " + roundTrip.size());
> >> >>> >     }
> >> >>> >
> >> >>> > The code fails at the line encoder.fromRow() with the exception:
> >> >>> >
> >> >>> >     Caused by: java.lang.UnsupportedOperationException: Cannot
> >> >>> >     evaluate expression: getcolumnbyordinal(0, IntegerType)
> >> >>> >
> >> >>> > -------
> >> >>> > Regards,
> >> >>> > Andy
> >> >>>
> >> >>> -----
> >> >>> Liang-Chi Hsieh | @viirya
> >> >>> Spark Technology Center
> >> >>> http://www.spark.tc/
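[Editor's note] The toRow pitfall discussed at the top of the thread (one mutable object reused across calls) can be reproduced without any Spark dependency. Below is a minimal sketch; `ReusingEncoder` is a hypothetical stand-in, not a Spark class. Its toRow() overwrites and returns a single internal buffer, so entries collected without a defensive copy all end up aliasing the last value written:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an encoder whose toRow() is allowed to return
// the same mutable object on every call, mirroring the ExpressionEncoder
// contract quoted in the thread. Not a Spark class.
class ReusingEncoder {
    private final int[] buffer = new int[1]; // the single reused "row"

    int[] toRow(int value) {
        buffer[0] = value; // overwrite in place and hand back the same array
        return buffer;
    }
}

public class CopyPitfallDemo {
    // Collect rows WITHOUT copying: every list entry aliases one array.
    static List<int[]> collectWithoutCopy(ReusingEncoder enc, int... values) {
        List<int[]> rows = new ArrayList<>();
        for (int v : values) rows.add(enc.toRow(v));
        return rows;
    }

    // Collect rows WITH a defensive copy, analogous to the suggested
    // toRow(input).copy() fix.
    static List<int[]> collectWithCopy(ReusingEncoder enc, int... values) {
        List<int[]> rows = new ArrayList<>();
        for (int v : values) rows.add(enc.toRow(v).clone());
        return rows;
    }

    public static void main(String[] args) {
        List<int[]> bad = collectWithoutCopy(new ReusingEncoder(), 1, 2, 3);
        List<int[]> good = collectWithCopy(new ReusingEncoder(), 1, 2, 3);
        // Every entry of "bad" shows the last value written.
        System.out.println(bad.get(0)[0] + " " + bad.get(1)[0] + " " + bad.get(2)[0]);   // prints "3 3 3"
        System.out.println(good.get(0)[0] + " " + good.get(1)[0] + " " + good.get(2)[0]); // prints "1 2 3"
    }
}
```

Here `clone()` plays the role that `InternalRow.copy()` plays in the fix Liang-Chi suggests.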
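[Editor's note] The original question at the bottom of the thread, why fromRow() throws UnsupportedOperationException until the encoder is resolved and bound, can also be illustrated outside Spark. `LazyDecoder` below is a hypothetical analogue, not a Spark class: its fromRow() fails until bind() has attached field names, loosely mirroring why `RowEncoder.apply(struct)` alone cannot deserialize until `resolveAndBind()` is called:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical analogue of an unresolved encoder: fromRow() only works
// after bind() has produced a schema-bound instance. Not a Spark class.
class LazyDecoder {
    private String[] fieldNames; // null until bound

    // Returns a new, bound decoder (like encoder.resolveAndBind()).
    LazyDecoder bind(String... names) {
        LazyDecoder bound = new LazyDecoder();
        bound.fieldNames = names.clone();
        return bound;
    }

    Map<String, Object> fromRow(Object[] row) {
        if (fieldNames == null)
            throw new UnsupportedOperationException("decoder not bound to a schema");
        Map<String, Object> out = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) out.put(fieldNames[i], row[i]);
        return out;
    }
}

public class BindDemo {
    public static void main(String[] args) {
        LazyDecoder unbound = new LazyDecoder();
        try {
            unbound.fromRow(new Object[]{1}); // fails: never bound
        } catch (UnsupportedOperationException e) {
            System.out.println("unbound: " + e.getMessage());
        }
        // Binding first makes deserialization work.
        System.out.println(unbound.bind("id").fromRow(new Object[]{1})); // prints "{id=1}"
    }
}
```

The design point mirrored here is that binding is a separate, explicit step that returns a usable instance, which is why calling it eagerly versus lazily matters in the UDAF discussion above.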