HeartSaVioR commented on code in PR #38683:
URL: https://github.com/apache/spark/pull/38683#discussion_r1027509435


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -600,7 +600,7 @@ class FileMetadataStructSuite extends QueryTest with 
SharedSparkSession {
       val df2 = spark.read.format("json")
         .load(dir.getCanonicalPath + "/target/new-streaming-data-join")
       // Verify self-join results
-      assert(streamQuery2.lastProgress.numInputRows == 4L)
+      assert(streamQuery2.lastProgress.numInputRows == 2L)

Review Comment:
   Off-topic: this is very interesting. It looks like fixing this "enables" ReusedExchange, which somehow makes ProgressReporter pick up the metric from a single leaf node instead of two.
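   
   For context, a minimal sketch of what the self-joined streaming query in this test roughly looks like (the schema, paths, and variable names below are illustrative, not the exact test code):
   
   ```scala
   // Illustrative sketch only: a stream-stream self-join over a JSON file source,
   // selecting the _metadata column so it participates in the join keys.
   val stream = spark.readStream
     .format("json")
     .schema("name STRING, age INT, info STRUCT<id: BIGINT, university: STRING>")
     .load(dir.getCanonicalPath + "/source")   // illustrative path
     .select("name", "age", "info", "_metadata")
   
   val joined = stream.join(stream, Seq("name", "age", "info", "_metadata"))
   
   val query = joined.writeStream
     .format("json")
     .option("checkpointLocation", dir.getCanonicalPath + "/checkpoint")
     .start(dir.getCanonicalPath + "/sink")     // illustrative path
   query.processAllAvailable()
   
   // Before the fix both scan legs were counted (2 + 2 = 4); once the second leg
   // becomes a ReusedExchange only the remaining leaf scan contributes, hence 2
   // (assuming a 2-row source).
   assert(query.lastProgress.numInputRows == 2L)
   ```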
   
   > Before the fix
   
   ```
   == Parsed Logical Plan ==
   WriteToMicroBatchDataSourceV1 
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
 77baa2ac-cc0b-4e01-94ff-ec20c98eb29b, 
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join,
 
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
 Append, 0
   +- Project [name#2339, age#2340, info#2341, _metadata#2345]
      +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND 
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
         :- Project [name#2339, age#2340, info#2341, _metadata#2345]
         :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, 
_metadata#2345]
         :     +- Project [name#2517 AS name#2339, age#2518 AS age#2340, 
info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
         :        +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
         +- Project [name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [_metadata#2507, name#2504, age#2505, info#2506, 
_metadata#2507]
               +- Project [name#2523 AS name#2504, age#2524 AS age#2505, 
info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
                  +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json
   
   == Analyzed Logical Plan ==
   name: string, age: int, info: struct<id:bigint,university:string>, 
_metadata: 
struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
   WriteToMicroBatchDataSourceV1 
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
 77baa2ac-cc0b-4e01-94ff-ec20c98eb29b, 
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join,
 
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/new-streaming-data-join],
 Append, 0
   +- Project [name#2339, age#2340, info#2341, _metadata#2345]
      +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND 
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
         :- Project [name#2339, age#2340, info#2341, _metadata#2345]
         :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, 
_metadata#2345]
         :     +- Project [name#2517 AS name#2339, age#2518 AS age#2340, 
info#2519 AS info#2341, _metadata#2529 AS _metadata#2345]
         :        +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
         +- Project [name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [_metadata#2507, name#2504, age#2505, info#2506, 
_metadata#2507]
               +- Project [name#2523 AS name#2504, age#2524 AS age#2505, 
info#2525 AS info#2506, _metadata#2530 AS _metadata#2507]
                  +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json
   
   == Optimized Logical Plan ==
   Project [name#2517, age#2518, info#2519, _metadata#2529]
   +- Join Inner, ((((name#2517 = name#2523) AND (age#2518 = age#2524)) AND 
(info#2519 = info#2525)) AND (_metadata#2529 = _metadata#2530))
      :- Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND 
isnotnull(info#2519)) AND isnotnull(_metadata#2529))
      :  +- Relation [name#2517,age#2518,info#2519,_metadata#2529] json
      +- Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND 
isnotnull(info#2525)) AND isnotnull(_metadata#2530))
         +- Relation [name#2523,age#2524,info#2525,_metadata#2530] json
   
   == Physical Plan ==
   *(3) Project [name#2517, age#2518, info#2519, _metadata#2529]
   +- StreamingSymmetricHashJoin [name#2517, age#2518, info#2519, 
_metadata#2529], [name#2523, age#2524, info#2525, _metadata#2530], Inner, 
condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], 
state info [ checkpoint = 
file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b56e426-a39c-4668-a51f-19bce04c2dd8/target/checkpoint_join/state,
 runId = b3233731-bee2-478f-9774-3322b2f88110, opId = 0, ver = 0, numPartitions 
= 5], 0, 0, state cleanup [ left = null, right = null ], 2
      :- Exchange hashpartitioning(name#2517, age#2518, info#2519, 
_metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
      :  +- *(1) Filter (((isnotnull(name#2517) AND isnotnull(age#2518)) AND 
isnotnull(info#2519)) AND isnotnull(_metadata#2529))
      :     +- *(1) Project [name#2517, age#2518, info#2519, 
named_struct(file_path, file_path#2533, file_name, file_name#2534, file_size, 
file_size#2535L, file_modification_time, file_modification_time#2536) AS 
_metadata#2529]
      :        +- FileScan json 
[name#2517,age#2518,info#2519,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536]
 Batched: false, DataFilters: [isnotnull(name#2517), isnotnull(age#2518), 
isnotnull(info#2519), isnotnull(_metadata#2529)], Format: JSON, Location: 
InMemoryFileIndex(1 
paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b...,
 PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), 
IsNotNull(info)], ReadSchema: 
struct<name:string,age:int,info:struct<id:bigint,university:string>>
      +- Exchange hashpartitioning(name#2523, age#2524, info#2525, 
_metadata#2530, 5), ENSURE_REQUIREMENTS, [plan_id=2642]
         +- *(2) Filter (((isnotnull(name#2523) AND isnotnull(age#2524)) AND 
isnotnull(info#2525)) AND isnotnull(_metadata#2530))
            +- *(2) Project [name#2523, age#2524, info#2525, 
named_struct(file_path, file_path#2537, file_name, file_name#2538, file_size, 
file_size#2539L, file_modification_time, file_modification_time#2540) AS 
_metadata#2530]
               +- FileScan json 
[name#2523,age#2524,info#2525,file_path#2537,file_name#2538,file_size#2539L,file_modification_time#2540]
 Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524), 
isnotnull(info#2525), isnotnull(_metadata#2530)], Format: JSON, Location: 
InMemoryFileIndex(1 
paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-3b...,
 PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), 
IsNotNull(info)], ReadSchema: 
struct<name:string,age:int,info:struct<id:bigint,university:string>>
   
   ```
   
   > After the fix
   
   ```
   == Parsed Logical Plan ==
   WriteToMicroBatchDataSourceV1 
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
 d8c57232-267e-436b-ad82-4cf8b7f4849b, 
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join,
 
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
 Append, 0
   +- Project [name#2339, age#2340, info#2341, _metadata#2345]
      +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND 
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
         :- Project [name#2339, age#2340, info#2341, _metadata#2345]
         :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, 
_metadata#2345]
         :     +- Project [name#2523 AS name#2339, age#2524 AS age#2340, 
info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
         :        +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
         +- Project [name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [_metadata#2507, name#2504, age#2505, info#2506, 
_metadata#2507]
               +- Project [name#2517 AS name#2504, age#2518 AS age#2505, 
info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
                  +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json
   
   == Analyzed Logical Plan ==
   name: string, age: int, info: struct<id:bigint,university:string>, 
_metadata: 
struct<file_path:string,file_name:string,file_size:bigint,file_modification_time:timestamp>
   WriteToMicroBatchDataSourceV1 
FileSink[/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
 d8c57232-267e-436b-ad82-4cf8b7f4849b, 
[checkpointLocation=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join,
 
path=/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/new-streaming-data-join],
 Append, 0
   +- Project [name#2339, age#2340, info#2341, _metadata#2345]
      +- Join Inner, ((((name#2339 = name#2504) AND (age#2340 = age#2505)) AND 
(info#2341 = info#2506)) AND (_metadata#2345 = _metadata#2507))
         :- Project [name#2339, age#2340, info#2341, _metadata#2345]
         :  +- Project [_metadata#2345, name#2339, age#2340, info#2341, 
_metadata#2345]
         :     +- Project [name#2523 AS name#2339, age#2524 AS age#2340, 
info#2525 AS info#2341, _metadata#2529 AS _metadata#2345]
         :        +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
         +- Project [name#2504, age#2505, info#2506, _metadata#2507]
            +- Project [_metadata#2507, name#2504, age#2505, info#2506, 
_metadata#2507]
               +- Project [name#2517 AS name#2504, age#2518 AS age#2505, 
info#2519 AS info#2506, _metadata#2530 AS _metadata#2507]
                  +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json
   
   == Optimized Logical Plan ==
   Project [name#2523, age#2524, info#2525, _metadata#2529]
   +- Join Inner, ((((name#2523 = name#2517) AND (age#2524 = age#2518)) AND 
(info#2525 = info#2519)) AND (_metadata#2529 = _metadata#2530))
      :- Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND 
isnotnull(info#2525))
      :  +- Relation [name#2523,age#2524,info#2525,_metadata#2529] json
      +- Filter ((isnotnull(name#2517) AND isnotnull(age#2518)) AND 
isnotnull(info#2519))
         +- Relation [name#2517,age#2518,info#2519,_metadata#2530] json
   
   == Physical Plan ==
   *(3) Project [name#2523, age#2524, info#2525, _metadata#2529]
   +- StreamingSymmetricHashJoin [name#2523, age#2524, info#2525, 
_metadata#2529], [name#2517, age#2518, info#2519, _metadata#2530], Inner, 
condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], 
state info [ checkpoint = 
file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a5a5839-1a1c-4f13-9a1c-2bd8f65b16ce/target/checkpoint_join/state,
 runId = 649e748e-fc6d-42c0-9acd-babc7809c621, opId = 0, ver = 0, numPartitions 
= 5], 0, 0, state cleanup [ left = null, right = null ], 2
      :- Exchange hashpartitioning(name#2523, age#2524, info#2525, 
_metadata#2529, 5), ENSURE_REQUIREMENTS, [plan_id=2637]
      :  +- *(1) Filter ((isnotnull(name#2523) AND isnotnull(age#2524)) AND 
isnotnull(info#2525))
      :     +- *(1) Project [name#2523, age#2524, info#2525, 
knownnotnull(named_struct(file_path, file_path#2533, file_name, file_name#2534, 
file_size, file_size#2535L, file_modification_time, 
file_modification_time#2536)) AS _metadata#2529]
      :        +- FileScan json 
[name#2523,age#2524,info#2525,file_path#2533,file_name#2534,file_size#2535L,file_modification_time#2536]
 Batched: false, DataFilters: [isnotnull(name#2523), isnotnull(age#2524), 
isnotnull(info#2525)], Format: JSON, Location: InMemoryFileIndex(1 
paths)[file:/private/var/folders/r0/34w92ww91n3_5htjqqx4_lrh0000gp/T/spark-1a...,
 PartitionFilters: [], PushedFilters: [IsNotNull(name), IsNotNull(age), 
IsNotNull(info)], ReadSchema: 
struct<name:string,age:int,info:struct<id:bigint,university:string>>
      +- ReusedExchange [name#2517, age#2518, info#2519, _metadata#2530], 
Exchange hashpartitioning(name#2523, age#2524, info#2525, _metadata#2529, 5), 
ENSURE_REQUIREMENTS, [plan_id=2637]
   ```
   
   This is definitely an "improvement", but it also shows that the way we collect metrics with DSv1 in microbatch can be affected by physical planning as well as by optimization. It has been somewhat fragile.
   
   Anyway, even if this happened with DSv2, the number of input rows would have been counted only once, so I'd consider this "correct".
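   
   If we want to keep an eye on this, the per-source breakdown in the progress should make it visible; roughly something like the following (sketch, reusing the test's streamQuery2):
   
   ```scala
   // Sketch: compare the aggregate input row count with the per-source breakdown.
   // StreamingQueryProgress.numInputRows is the sum of the sources' numInputRows.
   val progress = streamQuery2.lastProgress
   progress.sources.foreach { s =>
     println(s"${s.description}: ${s.numInputRows}")
   }
   assert(progress.numInputRows == progress.sources.map(_.numInputRows).sum)
   ```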


