HeartSaVioR commented on code in PR #38683:
URL: https://github.com/apache/spark/pull/38683#discussion_r1027509435
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala: ##########

@@ -600,7 +600,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       val df2 = spark.read.format("json")
         .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
       // Verify self-join results
-      assert(streamQuery2.lastProgress.numInputRows == 4L)
+      assert(streamQuery2.lastProgress.numInputRows == 2L)

Review Comment:

Off-topic: this is very interesting. Looks like fixing this "enables" ReusedExchange, which somehow makes ProgressReporter pick up the metric from a single leaf node instead of two.

> Before the fix

```
== Parsed Logical Plan ==
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], 77baa2ac-cc0b-4e01-94ff-ec20c98eb29b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2517 AS name#2339, age#2518 AS age#2340, info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2523 AS name#2504, age#2524 AS age#2505, info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json

== Analyzed Logical Plan ==
name: string, age: int, info: struct<id:bigint,university:string>, _metadata: struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], 77baa2ac-cc0b-4e01-94ff-ec20c98eb29b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2517 AS name#2339, age#2518 AS age#2340, info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2523 AS name#2504, age#2524 AS age#2505, info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json

== Optimized Logical Plan ==
Project [name#2517, age#2518, info#2519, _metadata#2529]
+- Join Inner, ((((name#2517 = name#2523) AND (age#2518 = age#2524)) AND (info#2519 = info#2525)) AND (_metadata#2529 = _metadata#2530))
   :- Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND isnotnull(info#2519)) AND isnotnull(_metadata#2529))
   :  +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
   +- Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525)) AND isnotnull(_metadata#2530))
      +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json

== Physical Plan ==
*(3) Project [name#2517, age#2518, info#2519, _metadata#2529]
+- StreamingSymmetricHashJoin [name#2517, age#2518, info#2519, _metadata#2529], [name#2523, age#2524, info#2525, _metadata#2530], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join/state, runId = b3233731-bee2-478f-9774-3322b2f88110, opId = 0, ver = 0, numPartitions = 5], 0, 0, state cleanup [ left = null, right = null ], 2
   :- Exchange hashpartitioning(name#2517, age#2518, info#2519, _metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
   :  +- *(1) Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND isnotnull(info#2519)) AND isnotnull(_metadata#2529))
   :     +- *(1) Project [name#2517, age#2518, info#2519, named_struct(file_path, file_path#2533, file_name, file_name#2534, file_size, file_size#2535L, file_modification_time, file_modification_time#2536) AS _metadata#2529]
   :        +- FileScan json [name#2517,age#2518,info#2519,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536] Batched: false, DataFilters: [isnotnull(name#2517), isnotnull(age#2518), isnotnull(info#2519), isnotnull(_metadata#2529)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b..., PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), IsNotNull(info)], ReadSchema: struct<name:string,age:int,info:struct<id:bigint,university:string>>
   +- Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2530, 5), ENSURE_REQUIREMENTS, [plan_id=2642]
      +- *(2) Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525)) AND isnotnull(_metadata#2530))
         +- *(2) Project [name#2523, age#2524, info#2525, named_struct(file_path, file_path#2537, file_name, file_name#2538, file_size, file_size#2539L, file_modification_time, file_modification_time#2540) AS _metadata#2530]
            +- FileScan json [name#2523,age#2524,info#2525,file_path#2537,file_name#2538,file_size#2539L,file_modification_time#2540] Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524), isnotnull(info#2525), isnotnull(_metadata#2530)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b..., PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), IsNotNull(info)], ReadSchema: struct<name:string,age:int,info:struct<id:bigint,university:string>>
```

> After the fix

```
== Parsed Logical Plan ==
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], d8c57232-267e-436b-ad82-4cf8b7f4849b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2523 AS name#2339, age#2524 AS age#2340, info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2517 AS name#2504, age#2518 AS age#2505, info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json

== Analyzed Logical Plan ==
name: string, age: int, info: struct<id:bigint,university:string>, _metadata: struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
WriteToMicroBatchDataSourceV1 FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], d8c57232-267e-436b-ad82-4cf8b7f4849b, [checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join, path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join], Append, 0
+- Project [name#2339, age#2340, info#2341, _metadata#2345]
   +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND (info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
      :- Project [name#2339, age#2340, info#2341, _metadata#2345]
      :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, _metadata#2345]
      :     +- Project [name#2523 AS name#2339, age#2524 AS age#2340, info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
      :        +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
      +- Project [name#2504, age#2505, info#2506, _metadata#2507]
         +- Project [_metadata#2507, name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [name#2517 AS name#2504, age#2518 AS age#2505, info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
               +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json

== Optimized Logical Plan ==
Project [name#2523, age#2524, info#2525, _metadata#2529]
+- Join Inner, ((((name#2523 = name#2517) AND (age#2524 = age#2518)) AND (info#2525 = info#2519)) AND (_metadata#2529 = _metadata#2530))
   :- Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525))
   :  +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
   +- Filter ((isnotnull(name#2517) AND isnotnull(age#2518)) AND isnotnull(info#2519))
      +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json

== Physical Plan ==
*(3) Project [name#2523, age#2524, info#2525, _metadata#2529]
+- StreamingSymmetricHashJoin [name#2523, age#2524, info#2525, _metadata#2529], [name#2517, age#2518, info#2519, _metadata#2530], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join/state, runId = 649e748e-fc6d-42c0-9acd-babc7809c621, opId = 0, ver = 0, numPartitions = 5], 0, 0, state cleanup [ left = null, right = null ], 2
   :- Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
   :  +- *(1) Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND isnotnull(info#2525))
   :     +- *(1) Project [name#2523, age#2524, info#2525, knownnotnull(named_struct(file_path, file_path#2533, file_name, file_name#2534, file_size, file_size#2535L, file_modification_time, file_modification_time#2536)) AS _metadata#2529]
   :        +- FileScan json [name#2523,age#2524,info#2525,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536] Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524), isnotnull(info#2525)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a..., PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), IsNotNull(info)], ReadSchema: struct<name:string,age:int,info:struct<id:bigint,university:string>>
   +- ReusedExchange [name#2517, age#2518, info#2519, _metadata#2530], Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
```

This is definitely an "improvement", but it also shows that the way we collect metrics with DSv1 in microbatch can be affected by physical planning, not only by optimization. It has always been somewhat fragile. Anyway, even if this happened with DSv2, the number of input rows would have been counted once, so I'd consider this "correct".
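To make the "single leaf node" point concrete, here is a rough sketch of the kind of aggregation that produces `numInputRows` (illustrative only, not the actual ProgressReporter code; `approxNumInputRows` and the name-based filter are made up for the example):

```scala
import org.apache.spark.sql.execution.SparkPlan

// Illustrative sketch, NOT the real ProgressReporter implementation:
// sum a per-scan row metric over the leaves of the executed physical plan.
// Before the fix the plan has two FileScan leaves (2 + 2 = 4 rows); after the
// fix the right side collapses into a ReusedExchange, leaving one scan leaf (2 rows).
def approxNumInputRows(executedPlan: SparkPlan): Long =
  executedPlan.collectLeaves()
    .filter(_.nodeName.contains("Scan"))        // keep file scans, skip ReusedExchange
    .flatMap(_.metrics.get("numOutputRows"))    // per-leaf row-count metric, if defined
    .map(_.value)
    .sum
```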
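For reference, a hypothetical, stripped-down version of the self-join scenario the test exercises (paths, schema, and names here are placeholders, not the suite's exact code; assumes an active SparkSession `spark`):

```scala
// Both join legs read the same files. Once the shared scan is planned as a
// ReusedExchange, it executes once, so numInputRows reports 2 instead of 4.
val stream = spark.readStream.format("json")
  .schema("name STRING, age INT, info STRUCT<id: BIGINT, university: STRING>")
  .load("/tmp/streaming-in")                    // placeholder input dir
  .select("name", "age", "info", "_metadata")   // file metadata struct column

val joined = stream.join(stream, Seq("name", "age", "info", "_metadata"))

val query = joined.writeStream
  .format("json")
  .option("checkpointLocation", "/tmp/checkpoint_join")  // placeholder
  .start("/tmp/new-streaming-data-join")                 // placeholder

query.processAllAvailable()
assert(query.lastProgress.numInputRows == 2L)  // would have been 4L before the fix
```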
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org