AJ Bousquet created SPARK-38059: ----------------------------------- Summary: Incorrect query ordering with flatMap() and distinct() Key: SPARK-38059 URL: https://issues.apache.org/jira/browse/SPARK-38059 Project: Spark Issue Type: Bug Components: Optimizer Affects Versions: 3.0.2 Reporter: AJ Bousquet
I have a Dataset of non-unique identifiers that I can use with {{Dataset::flatMap()}} to create multiple rows with sub-identifiers for each id. When I run the code below, the {{limit(2)}} call is placed _after_ the call to {{flatMap()}} in the optimized logical plan. This unexpectedly yields only 2 rows, when I would expect it to yield 6. {code:java} StructType idSchema = DataTypes.createStructType(List.of(DataTypes.createStructField("id", DataTypes.LongType, false))); StructType flatMapSchema = DataTypes.createStructType(List.of( DataTypes.createStructField("id", DataTypes.LongType, false), DataTypes.createStructField("subId", DataTypes.LongType, false) ));Dataset<Row> inputDataset = context.sparkSession().createDataset( LongStream.range(0,5).mapToObj((id) -> RowFactory.create(id)).collect(Collectors.toList()), RowEncoder.apply(idSchema) ); return inputDataset .distinct() .limit(2) .flatMap((Row row) -> { Long id = row.getLong(row.fieldIndex("id")); return LongStream.range(6,8).mapToObj((subid) -> RowFactory.create(id, subid)).iterator(); }, RowEncoder.apply(flatMapSchema)); {code} When run, the above code produces something like: ||id||subID|| |0|6| |0|7| But I would expect something like: ||id||subID|| |1|6| |1|7| |1|8| |0|6| |0|7| |0|8| -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org