AJ Bousquet created SPARK-38059:
-----------------------------------

             Summary: Incorrect query ordering with flatMap() and distinct()
                 Key: SPARK-38059
                 URL: https://issues.apache.org/jira/browse/SPARK-38059
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.0.2
            Reporter: AJ Bousquet


I have a Dataset of non-unique identifiers, and I use {{Dataset::flatMap()}}
to expand each id into multiple rows, one per sub-identifier. When I run the
code below, the optimized logical plan places the {{limit(2)}} call _after_
the call to {{flatMap()}} instead of before it. As a result the query returns
only 2 rows, when I would expect 6: the limit should first select 2 distinct
ids, and {{flatMap()}} should then expand each of them.

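{code:java}
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType idSchema = DataTypes.createStructType(List.of(
    DataTypes.createStructField("id", DataTypes.LongType, false)
));
StructType flatMapSchema = DataTypes.createStructType(List.of(
    DataTypes.createStructField("id", DataTypes.LongType, false),
    DataTypes.createStructField("subId", DataTypes.LongType, false)
));

// Input ids 0 through 4; "context" is an application object not shown here.
Dataset<Row> inputDataset = context.sparkSession().createDataset(
    LongStream.range(0, 5)
        .mapToObj((id) -> RowFactory.create(id))
        .collect(Collectors.toList()),
    RowEncoder.apply(idSchema)
);

return inputDataset
    .distinct()
    .limit(2)
    .flatMap((Row row) -> {
        Long id = row.getLong(row.fieldIndex("id"));
        // The range end is exclusive, so this emits sub-ids 6, 7 and 8.
        return LongStream.range(6, 9)
            .mapToObj((subId) -> RowFactory.create(id, subId))
            .iterator();
    }, RowEncoder.apply(flatMapSchema));
{code}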
When run, the above code produces something like:
||id||subId||
|0|6|
|0|7|

But I would expect something like:
||id||subId||
|1|6|
|1|7|
|1|8|
|0|6|
|0|7|
|0|8|
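
To make the two orderings concrete without a Spark cluster, here is a minimal sketch that models them with plain {{java.util.stream}}. It is only an analogy, not Spark code: the class {{LimitOrdering}} and its methods are hypothetical names, and it assumes each id expands to sub-ids 6 through 8, as in the expected table above. Java streams keep encounter order, which Spark does not guarantee, so only the row counts are meant to carry over.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class LimitOrdering {
    // Expands one id into {id, subId} pairs for subId 6, 7, 8 (assumed range).
    static Stream<long[]> expand(long id) {
        return LongStream.rangeClosed(6, 8).mapToObj(subId -> new long[] {id, subId});
    }

    // Ordering as written in the query: distinct, then limit(2), then flatMap.
    static List<long[]> limitThenFlatMap(List<Long> ids) {
        return ids.stream()
            .distinct()
            .limit(2)
            .flatMap(LimitOrdering::expand)
            .collect(Collectors.toList());
    }

    // Ordering described for the optimized plan: the limit applied after flatMap.
    static List<long[]> flatMapThenLimit(List<Long> ids) {
        return ids.stream()
            .distinct()
            .flatMap(LimitOrdering::expand)
            .limit(2)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> ids = List.of(0L, 1L, 1L, 2L, 3L, 4L);
        System.out.println("limit before flatMap: " + limitThenFlatMap(ids).size() + " rows");
        System.out.println("limit after flatMap:  " + flatMapThenLimit(ids).size() + " rows");
    }
}
```

With the limit applied first, two ids survive and each expands to three rows; with the limit applied last, only the first two expanded rows remain, which matches the shape of the output reported above.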



