[ https://issues.apache.org/jira/browse/SPARK-38059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485550#comment-17485550 ]
Vu Tan commented on SPARK-38059:
--------------------------------

I ran your Java app above and got the result below (on Spark 3.2.0):
{code:java}
0,6
0,7
1,6
1,7
{code}
So I think it is working as expected.

> Incorrect query ordering with flatMap() and distinct()
> ------------------------------------------------------
>
>                 Key: SPARK-38059
>                 URL: https://issues.apache.org/jira/browse/SPARK-38059
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 3.0.2, 3.2.0
>            Reporter: AJ Bousquet
>            Priority: Major
>
> I have a Dataset of non-unique identifiers that I can use with
> {{Dataset::flatMap()}} to create multiple rows with sub-identifiers for each
> id. When I run the code below, the {{limit(2)}} call is placed _after_ the
> call to {{flatMap()}} in the optimized logical plan. This unexpectedly yields
> only 2 rows, when I would expect it to yield 6.
> {code:java}
> StructType idSchema = DataTypes.createStructType(List.of(
>     DataTypes.createStructField("id", DataTypes.LongType, false)));
> StructType flatMapSchema = DataTypes.createStructType(List.of(
>     DataTypes.createStructField("id", DataTypes.LongType, false),
>     DataTypes.createStructField("subId", DataTypes.LongType, false)
> ));
> Dataset<Row> inputDataset = context.sparkSession().createDataset(
>     LongStream.range(0, 5).mapToObj((id) -> RowFactory.create(id)).collect(Collectors.toList()),
>     RowEncoder.apply(idSchema)
> );
> return inputDataset
>     .distinct()
>     .limit(2)
>     .flatMap((Row row) -> {
>         Long id = row.getLong(row.fieldIndex("id"));
>         return LongStream.range(6, 8).mapToObj((subid) -> RowFactory.create(id, subid)).iterator();
>     }, RowEncoder.apply(flatMapSchema));
> {code}
> When run, the above code produces something like:
> ||id||subId||
> |0|6|
> |0|7|
> But I would expect something like:
> ||id||subId||
> |1|6|
> |1|7|
> |1|8|
> |0|6|
> |0|7|
> |0|8|

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
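Why the comment shows four rows: {{LongStream.range(start, end)}} excludes {{end}}, so {{range(6, 8)}} yields only the sub-ids 6 and 7, never 8. With {{limit(2)}} keeping two ids, the pipeline emits 2 × 2 = 4 rows, which matches the output in the comment. The sketch below reproduces that arithmetic in plain Java without Spark. The class name {{FlatMapRowCount}} is hypothetical, and java.util.stream's {{distinct()}}/{{limit()}} stand in for the Dataset operations as an assumption; Spark itself does not guarantee *which* two ids {{limit(2)}} keeps, only how many.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical plain-Java sketch (no Spark) of the row arithmetic in the reported pipeline.
// Assumption: java.util.stream distinct()/limit() stand in for Dataset.distinct()/limit().
// Unlike Spark, a sequential stream keeps the first two ids (0 and 1) deterministically.
public class FlatMapRowCount {

    // Returns the (id, subId) pairs the pipeline would emit.
    static List<long[]> buildRows() {
        // Mirrors LongStream.range(0, 5): ids 0..4 (upper bound is exclusive).
        List<Long> ids = LongStream.range(0, 5)
                .boxed()
                .distinct()   // the ids are already unique here
                .limit(2)     // keep two ids, as limit(2) does
                .collect(Collectors.toList());

        List<long[]> rows = new ArrayList<>();
        for (long id : ids) {
            // Mirrors LongStream.range(6, 8): yields 6 and 7 only, never 8.
            LongStream.range(6, 8).forEach(subId -> rows.add(new long[] {id, subId}));
        }
        return rows;
    }

    public static void main(String[] args) {
        // Prints one "id,subId" pair per line.
        for (long[] row : buildRows()) {
            System.out.println(row[0] + "," + row[1]);
        }
    }
}
```

Running {{main}} prints the same four pairs as the comment (0,6 / 0,7 / 1,6 / 1,7); a real Spark run may pick different ids but should still produce four rows.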