[jira] [Updated] (SPARK-40608) `V2ScanRelationPushDown` supports extracting predicates in the output set
[ https://issues.apache.org/jira/browse/SPARK-40608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-40608: -- Description: Currently Spark does not push down `(ID = 20220921) OR (ID = 20220910)`; if `ID` is a partition field, it makes sense to push this predicate down {code:java} SELECT * FROM h2.test.people WHERE ID = 20220921 OR (ID =20220910 and split(NAME, ',')[0] == 'fred') {code} h3. before {code:java} == Analyzed Logical Plan == NAME: string, ID: int Project [NAME#5, ID#6] +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- SubqueryAlias h2.test.people +- RelationV2[NAME#5, ID#6] h2.test.people test.people == Optimized Logical Plan == Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- RelationV2[NAME#5, ID#6] test.people == Physical Plan == *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@56c42964 [NAME#5,ID#6] PushedFilters: [], ReadSchema: struct {code} h3. after {code:java} == Analyzed Logical Plan == NAME: string, ID: int Project [NAME#5, ID#6] +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- SubqueryAlias h2.test.people +- RelationV2[NAME#5, ID#6] h2.test.people test.people == Optimized Logical Plan == Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- RelationV2[NAME#5, ID#6] test.people == Physical Plan == *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@56c42964 [NAME#5,ID#6] PushedFilters: [(ID = 20220921) OR (ID = 20220910)], ReadSchema: struct{code} was: Currently it is not possible to push down `(ID = 20220921) OR (ID = 20220910)`; if `ID` is a partition field, it makes sense to push this predicate down {code:java} SELECT * FROM h2.test.people WHERE ID = 20220921 OR (ID =20220910 and split(NAME, ',')[0] == 'fred') {code} h3. before {code:java} == Analyzed Logical Plan == NAME: string, ID: int Project [NAME#5, ID#6] +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- SubqueryAlias h2.test.people +- RelationV2[NAME#5, ID#6] h2.test.people test.people == Optimized Logical Plan == Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- RelationV2[NAME#5, ID#6] test.people == Physical Plan == *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@56c42964 [NAME#5,ID#6] PushedFilters: [], ReadSchema: struct {code} h3.
after {code:java} == Analyzed Logical Plan == NAME: string, ID: int Project [NAME#5, ID#6] +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- SubqueryAlias h2.test.people +- RelationV2[NAME#5, ID#6] h2.test.people test.people == Optimized Logical Plan == Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- RelationV2[NAME#5, ID#6] test.people == Physical Plan == *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@56c42964 [NAME#5,ID#6] PushedFilters: [(ID = 20220921) OR (ID = 20220910)], ReadSchema: struct{code} > `V2ScanRelationPushDown` supports extracting predicates in the output set > - > > Key: SPARK-40608 > URL: https://issues.apache.org/jira/browse/SPARK-40608 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.0 >Reporter: wang-zhun >Priority: Major > > Currently does not push down `(ID = 20220921) OR (ID = 20220910)`, if `ID` is > a partition field, it makes sense to implement push down > {code:java} > SELECT * FROM h2.test.people WHERE ID = 20220921 OR (ID =20220910 and > split(NAME, ',')[0] == 'fred') {code} > h3. before > {code:java} > == Analyzed Logical Plan == > NAME: string, ID: int > Project [NAME#5, ID#6] > +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, > -1)[0] = fred))) > +- SubqueryAlias h2.test.people > +- RelationV2[NAME#5, ID#6] h2.test.people test.people > == Optimized Logical Plan == > Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] > = fred))) > +- RelationV2[NAME#5, ID#6] test.people > == Physical Plan == > *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, > -1)[0] = fred))) > +- *(1) Scan > org.apach
[jira] [Created] (SPARK-40608) `V2ScanRelationPushDown` supports extracting predicates in the output set
wang-zhun created SPARK-40608: - Summary: `V2ScanRelationPushDown` supports extracting predicates in the output set Key: SPARK-40608 URL: https://issues.apache.org/jira/browse/SPARK-40608 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.0 Reporter: wang-zhun Currently it is not possible to push down `(ID = 20220921) OR (ID = 20220910)`, if `ID` is a partition field, it makes sense to implement push down {code:java} SELECT * FROM h2.test.people WHERE ID = 20220921 OR (ID =20220910 and split(NAME, ',')[0] == 'fred') {code} h3. before {code:java} == Analyzed Logical Plan == NAME: string, ID: int Project [NAME#5, ID#6] +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- SubqueryAlias h2.test.people +- RelationV2[NAME#5, ID#6] h2.test.people test.people == Optimized Logical Plan == Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- RelationV2[NAME#5, ID#6] test.people == Physical Plan == *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@56c42964 [NAME#5,ID#6] PushedFilters: [], ReadSchema: struct {code} h3. after {code:java} == Analyzed Logical Plan == NAME: string, ID: int Project [NAME#5, ID#6] +- Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- SubqueryAlias h2.test.people +- RelationV2[NAME#5, ID#6] h2.test.people test.people == Optimized Logical Plan == Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- RelationV2[NAME#5, ID#6] test.people == Physical Plan == *(1) Filter ((ID#6 = 20220921) OR ((ID#6 = 20220910) AND (split(NAME#5, ,, -1)[0] = fred))) +- *(1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$1@56c42964 [NAME#5,ID#6] PushedFilters: [(ID = 20220921) OR (ID = 20220910)], ReadSchema: struct{code} -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
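What "extracting predicates in the output set" means in practice can be illustrated with a small, self-contained sketch: from an arbitrary filter, derive a weaker predicate that references only columns the source can evaluate (here the partition column ID), so that `(ID = 20220921) OR (ID = 20220910 AND split(NAME, ',')[0] = 'fred')` yields the pushable residue `(ID = 20220921) OR (ID = 20220910)`. The expression ADT and `extractWithin` below are made-up names for illustration, not Spark's internal API.
{code:java}
// Hedged sketch: extract the sub-predicate that only references a given column set.
// The ADT and function names are illustrative; they are not Spark's Catalyst classes.
sealed trait Expr { def references: Set[String] }
case class Col(name: String)                extends Expr { def references: Set[String] = Set(name) }
case class Lit(value: Any)                  extends Expr { def references: Set[String] = Set.empty }
case class EqualTo(left: Expr, right: Expr) extends Expr { def references: Set[String] = left.references ++ right.references }
case class And(left: Expr, right: Expr)     extends Expr { def references: Set[String] = left.references ++ right.references }
case class Or(left: Expr, right: Expr)      extends Expr { def references: Set[String] = left.references ++ right.references }
case class Udf(children: Seq[Expr])         extends Expr { def references: Set[String] = children.flatMap(_.references).toSet }

/** Returns a predicate implied by `cond` that references only `allowed` columns, if one exists. */
def extractWithin(cond: Expr, allowed: Set[String]): Option[Expr] = cond match {
  case And(l, r) =>
    // For AND it is enough that either side is convertible; keep whatever we can.
    (extractWithin(l, allowed), extractWithin(r, allowed)) match {
      case (Some(a), Some(b)) => Some(And(a, b))
      case (a, b)             => a.orElse(b)
    }
  case Or(l, r) =>
    // For OR both sides must yield something, otherwise the result would be stronger than cond.
    for { a <- extractWithin(l, allowed); b <- extractWithin(r, allowed) } yield Or(a, b)
  case other if other.references.subsetOf(allowed) => Some(other)
  case _ => None
}

// (ID = 20220921) OR (ID = 20220910 AND split(NAME, ',')[0] = 'fred')
val cond = Or(
  EqualTo(Col("ID"), Lit(20220921)),
  And(EqualTo(Col("ID"), Lit(20220910)),
      EqualTo(Udf(Seq(Col("NAME"))), Lit("fred"))))

// Yields Some((ID = 20220921) OR (ID = 20220910)), which is safe to push to the source
// because the full filter is still re-applied on top of the scan.
println(extractWithin(cond, Set("ID")))
{code}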
[jira] [Commented] (SPARK-39921) SkewJoin--Stream side skew in BroadcastJoin
[ https://issues.apache.org/jira/browse/SPARK-39921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17575175#comment-17575175 ] wang-zhun commented on SPARK-39921: --- Yes, PR is in preparation > SkewJoin--Stream side skew in BroadcastJoin > --- > > Key: SPARK-39921 > URL: https://issues.apache.org/jira/browse/SPARK-39921 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.2 >Reporter: wang-zhun >Priority: Major > Attachments: 1.png, 2.png, 3.png, 4.png > > > There is data skew in BroadcastJoin. Since LocalShuffle cannot cover all > cases, we can refer to the solution of SortMergeJoin skew to consider solving > the data skew problem in BroadcastJoin. > h3. senairo > !1.png! > !2.png! > h3. Effect > !4.png! > !3.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39921) SkewJoin--Stream side skew in BroadcastJoin
[ https://issues.apache.org/jira/browse/SPARK-39921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-39921: -- Description: There is data skew in BroadcastJoin. Since LocalShuffle cannot cover all cases, we can refer to the solution of SortMergeJoin skew to consider solving the data skew problem in BroadcastJoin. h3. Scenario !1.png! !2.png! h3. Effect !4.png! !3.png! was: There is data skew in BroadcastJoin. Since LocalShuffle cannot cover all cases, we can refer to the solution of SortMergeJoin skew to consider solving the data skew problem in BroadcastJoin. h3. Scenario !https://git.n.xiaomi.com/computing/spark/uploads/ee0bf49e1c2a378592c6885c1a37a14c/image.png! h3. !image-2022-07-29-14-52-10-659.png! h3. Effect !image-2022-07-29-14-53-27-021.png! !image-2022-07-29-14-52-58-810.png! > SkewJoin--Stream side skew in BroadcastJoin > --- > > Key: SPARK-39921 > URL: https://issues.apache.org/jira/browse/SPARK-39921 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.2 >Reporter: wang-zhun >Priority: Major > Fix For: 3.1.2, 3.3.0 > > Attachments: 1.png, 2.png, 3.png, 4.png > > > There is data skew in BroadcastJoin. Since LocalShuffle cannot cover all > cases, we can refer to the solution of SortMergeJoin skew to consider solving > the data skew problem in BroadcastJoin. > h3. Scenario > !1.png! > !2.png! > h3. Effect > !4.png! > !3.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-39921) SkewJoin--Stream side skew in BroadcastJoin
[ https://issues.apache.org/jira/browse/SPARK-39921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-39921: -- Attachment: 1.png 2.png 3.png 4.png > SkewJoin--Stream side skew in BroadcastJoin > --- > > Key: SPARK-39921 > URL: https://issues.apache.org/jira/browse/SPARK-39921 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.2 >Reporter: wang-zhun >Priority: Major > Fix For: 3.1.2, 3.3.0 > > Attachments: 1.png, 2.png, 3.png, 4.png > > > There is data skew in BroadcastJoin. Since LocalShuffle cannot cover all > cases, we can refer to the solution of SortMergeJoin skew to consider solving > the data skew problem in BroadcastJoin. > h3. senairo > !https://git.n.xiaomi.com/computing/spark/uploads/ee0bf49e1c2a378592c6885c1a37a14c/image.png! > h3. !image-2022-07-29-14-52-10-659.png! > h3. Effect > !image-2022-07-29-14-53-27-021.png! > !image-2022-07-29-14-52-58-810.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-39921) SkewJoin--Stream side skew in BroadcastJoin
wang-zhun created SPARK-39921: - Summary: SkewJoin--Stream side skew in BroadcastJoin Key: SPARK-39921 URL: https://issues.apache.org/jira/browse/SPARK-39921 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.2 Reporter: wang-zhun Fix For: 3.3.0, 3.1.2 There is data skew in BroadcastJoin. Since LocalShuffle cannot cover all cases, we can refer to the solution of SortMergeJoin skew to consider solving the data skew problem in BroadcastJoin. h3. Scenario !https://git.n.xiaomi.com/computing/spark/uploads/ee0bf49e1c2a378592c6885c1a37a14c/image.png! h3. !image-2022-07-29-14-52-10-659.png! h3. Effect !image-2022-07-29-14-53-27-021.png! !image-2022-07-29-14-52-58-810.png! -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
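The idea referenced above mirrors what AQE already does for SortMergeJoin skew: because the broadcast side is replicated to every task, a skewed stream-side partition can safely be split into several slices that are read by separate tasks. Below is a minimal sketch of only the splitting heuristic; the parameter names and thresholds are made up and this is not the rule from any attached patch.
{code:java}
// Hedged sketch: decide how many slices each stream-side partition should be split into.
// A partition counts as skewed when it exceeds both an absolute threshold and a multiple
// of the median partition size, the same shape of heuristic AQE's skew-join handling uses.
def plannedSlices(partitionSizes: Seq[Long],
                  skewFactor: Double = 5.0,
                  skewThreshold: Long = 256L << 20,
                  targetSize: Long = 64L << 20): Map[Int, Int] = {
  val sorted = partitionSizes.sorted
  val median = if (sorted.isEmpty) 0L else sorted(sorted.length / 2)
  partitionSizes.zipWithIndex.collect {
    case (size, idx) if size > math.max(skewThreshold, (median * skewFactor).toLong) =>
      // Split the skewed partition into ceil(size / targetSize) slices,
      // each handled by its own task; correctness holds because the build
      // side is broadcast to every task anyway.
      idx -> math.max(1, math.ceil(size.toDouble / targetSize).toInt)
  }.toMap
}

// Example: partition 2 (1 GiB) is skewed and gets split into 16 slices.
// plannedSlices(Seq(10L << 20, 12L << 20, 1L << 30)) == Map(2 -> 16)
{code}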
[jira] [Updated] (SPARK-37595) DatasourceV2 `exists ... select *` column push down
[ https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-37595: -- Issue Type: Improvement (was: Wish) > DatasourceV2 `exists ... select *` column push down > --- > > Key: SPARK-37595 > URL: https://issues.apache.org/jira/browse/SPARK-37595 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.1.2, 3.2.0 >Reporter: wang-zhun >Priority: Major > > The datasourcev2 table is very slow when executing TPCDS, because `exists ... > select *` will not push down the cropped columns to the data source > > Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` > {code:java} > test("datasourcev2 exists") { > val t1 = s"${catalogAndNamespace}t1" > withTable(t1) { > sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") > val t2 = s"${catalogAndNamespace}t2" > withTable(t2) { > sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") > val query = sql(s"select * from $t1 where not exists" + > s"(select * from $t2 where t1.col1=t2.col1)").queryExecution > // scalastyle:off println > println(query.executedPlan) > } > } > } > AdaptiveSparkPlan isFinalPlan=false > +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false > :- Project [col1#17, col2#18] > : +- BatchScan[col1#17, col2#18] class > org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan > RuntimeFilters: [] > +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, > true]),false), [id=#28] > +- Project [col1#19] > +- BatchScan[col1#19, col2#20] class > org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan > RuntimeFilters: [] > Expectation is `BatchScan[col1#19] class > org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan > RuntimeFilters: []` {code} > Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` > is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not > support `FileSourceStrategy` -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-37595) DatasourceV2 `exists ... select *` column push down
[ https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-37595: -- Description: The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` {code:java} test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } } AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` {code} Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` was: The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` {code:java} test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } }AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` {code} Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` > DatasourceV2 `exists ... select *` column push down > --- > > Key: SPARK-37595 > URL: https://issues.apache.org/jira/browse/SPARK-37595 > Project: Spark > Issue Type: Wish > Components: SQL >Affects Versions: 3.1.2, 3.2.0 >Reporter: wang-zhun >Priority: Major > > The datasourcev2 table is very slow when executing TPCDS, because `exists ... 
> select *` will not push down the cropped columns to the data source > > Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` > {code:java} > test("datasourcev2 exists") { > val t1 = s"${catalogAndNamespace}t1" > withTable(t1) { > sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") > val t2 = s"${catalogAndNamespace}t2" > withTable(t2) { > sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") > val query = sql(s"select * from $t1 where not exists" + > s"(select * from $t2 where t1.col1=t2.col1)").queryExecution > // scalastyle:off println > println(query.executedPlan) > } > } > } > AdaptiveSparkPlan isFinalPlan=false > +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false > :- Project [col1#17, col2#18] > : +- BatchScan[col1#17, col2#18] class > org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan > RuntimeFilters: [] > +- BroadcastExchange HashedRelat
[jira] [Updated] (SPARK-37595) DatasourceV2 `exists ... select *` column push down
[ https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-37595: -- Description: The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` {code:java} test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } }AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` {code} Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` was: The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` {code:java} test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } } AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` {code} Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` > DatasourceV2 `exists ...
select *` column push down > --- > > Key: SPARK-37595 > URL: https://issues.apache.org/jira/browse/SPARK-37595 > Project: Spark > Issue Type: Wish > Components: SQL >Affects Versions: 3.1.2, 3.2.0 >Reporter: wang-zhun >Priority: Major > > The datasourcev2 table is very slow when executing TPCDS, because `exists ... > select *` will not push down the cropped columns to the data source > > Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` > {code:java} > test("datasourcev2 exists") { > val t1 = s"${catalogAndNamespace}t1" > withTable(t1) { > sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") > val t2 = s"${catalogAndNamespace}t2" > withTable(t2) { > sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") > val query = sql(s"select * from $t1 where not exists" + > s"(select * from $t2 where t1.col1=t2.col1)").queryExecution > // scalastyle:off println > println(query.executedPlan) > } > } > }AdaptiveSparkPlan isFinalPlan=false > +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, fal
[jira] [Updated] (SPARK-37595) DatasourceV2 `exists ... select *` column push down
[ https://issues.apache.org/jira/browse/SPARK-37595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] wang-zhun updated SPARK-37595: -- Description: The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` {code:java} test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } }AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` {code} Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` was: The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` ``` test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } } AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` ``` Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` > DatasourceV2 `exists ...
select *` column push down > --- > > Key: SPARK-37595 > URL: https://issues.apache.org/jira/browse/SPARK-37595 > Project: Spark > Issue Type: Wish > Components: SQL >Affects Versions: 3.1.2, 3.2.0 >Reporter: wang-zhun >Priority: Major > > The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source > Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` > {code:java} > test("datasourcev2 exists") { > val t1 = s"${catalogAndNamespace}t1" > withTable(t1) { > sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") > val t2 = s"${catalogAndNamespace}t2" > withTable(t2) { > sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") > val query = sql(s"select * from $t1 where not exists" + > s"(select * from $t2 where t1.col1=t2.col1)").queryExecution > // scalastyle:off println > println(query.executedPlan)
[jira] [Created] (SPARK-37595) DatasourceV2 `exists ... select *` column push down
wang-zhun created SPARK-37595: - Summary: DatasourceV2 `exists ... select *` column push down Key: SPARK-37595 URL: https://issues.apache.org/jira/browse/SPARK-37595 Project: Spark Issue Type: Wish Components: SQL Affects Versions: 3.2.0, 3.1.2 Reporter: wang-zhun The datasourcev2 table is very slow when executing TPCDS, because `exists ... select *` will not push down the cropped columns to the data source Add test in `org.apache.spark.sql.connector.DataSourceV2SQLSuite` ``` test("datasourcev2 exists") { val t1 = s"${catalogAndNamespace}t1" withTable(t1) { sql(s"CREATE TABLE $t1 (col1 string, col2 string) USING $v2Format") val t2 = s"${catalogAndNamespace}t2" withTable(t2) { sql(s"CREATE TABLE $t2 (col1 string, col2 string) USING $v2Format") val query = sql(s"select * from $t1 where not exists" + s"(select * from $t2 where t1.col1=t2.col1)").queryExecution // scalastyle:off println println(query.executedPlan) } } } AdaptiveSparkPlan isFinalPlan=false +- BroadcastHashJoin [col1#17], [col1#19], LeftSemi, BuildRight, false :- Project [col1#17, col2#18] : +- BatchScan[col1#17, col2#18] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]),false), [id=#28] +- Project [col1#19] +- BatchScan[col1#19, col2#20] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: [] Expectation is `BatchScan[col1#19] class org.apache.spark.sql.connector.catalog.InMemoryTable$InMemoryBatchScan RuntimeFilters: []` ``` Reason `Batch("Early Filter and Projection Push-Down" V2ScanRelationPushDown` is executed before `Batch("RewriteSubquery"`, parallel datasourceV2 does not support `FileSourceStrategy` -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
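The "Expectation is ..." line in the description can also be phrased as an assertion rather than a println. Below is a hedged sketch of such a check inside DataSourceV2SQLSuite: `$t1`, `$t2`, `sql` and the other helpers come from the test above, while the class and field names reflect Spark 3.x and should be verified against the target branch. On affected versions the assertion fails, which is exactly what this ticket describes.
{code:java}
// Hedged sketch: assert column pruning on the subquery side instead of printing the plan.
// Collecting from the optimized logical plan avoids unwrapping AdaptiveSparkPlanExec.
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

val qe = sql(
  s"SELECT * FROM $t1 WHERE NOT EXISTS (SELECT * FROM $t2 WHERE t1.col1 = t2.col1)"
).queryExecution

val scanOutputs = qe.optimizedPlan.collect {
  case r: DataSourceV2ScanRelation => r.output.map(_.name)
}
// The outer scan keeps both columns; the subquery-side scan should be pruned down
// to col1 once pruning also runs after the subquery rewrite.
assert(scanOutputs.contains(Seq("col1")))
{code}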
[jira] [Comment Edited] (SPARK-37239) Avoid unnecessary `setReplication` in Yarn mode
[ https://issues.apache.org/jira/browse/SPARK-37239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440329#comment-17440329 ] wang-zhun edited comment on SPARK-37239 at 11/8/21, 9:50 AM: - [~LuciferYang] Yes, use the default value {code:java} org.apache.spark.deploy.yarn.Client ``` val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) .getOrElse(fs.getDefaultReplication(destDir)) FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) destFs.setReplication(destPath, replication) ``` replication: Short -> replication: Option[Short] ``` val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) replication.foreach(destFs.setReplication(destPath, _)) ```{code} was (Author: wangzhun): [~LuciferYang] Yes, use the default value {code:java} org.apache.spark.deploy.yarn.Client ``` val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) .getOrElse(fs.getDefaultReplication(destDir)) FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) destFs.setReplication(destPath, replication) ``` replication: Short -> replication: Option[Short] ``` val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) replication.map(destFs.setReplication(destPath, _)) ```{code} > Avoid unnecessary `setReplication` in Yarn mode > --- > > Key: SPARK-37239 > URL: https://issues.apache.org/jira/browse/SPARK-37239 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 3.1.2 >Reporter: wang-zhun >Priority: Major > > We found a large number of replication logs in hdfs server > ``` > 2021-11-04,17:22:13,065 INFO > org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains > unchanged at 3 for > xxx/.sparkStaging/application_1635470728320_1144379/__spark_libs__303253482044663796.zip > 2021-11-04,17:22:13,069 INFO > org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains > unchanged at 3 for > xxx/.sparkStaging/application_1635470728320_1144383/__spark_libs__4747402134564993861.zip > 2021-11-04,17:22:13,070 INFO > org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains > unchanged at 3 for > xxx/.sparkStaging/application_1635470728320_1144373/__spark_libs__4377509773730188331.zip > ``` > https://github.com/apache/hadoop/blob/6f7b965808f71f44e2617c50d366a6375fdfbbfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java#L2439 > > `setReplication` needs to acquire write lock, we should reduce this > unnecessary operation -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-37239) Avoid unnecessary `setReplication` in Yarn mode
[ https://issues.apache.org/jira/browse/SPARK-37239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17440329#comment-17440329 ] wang-zhun commented on SPARK-37239: --- [~LuciferYang] Yes, use the default value {code:java} org.apache.spark.deploy.yarn.Client ``` val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) .getOrElse(fs.getDefaultReplication(destDir)) FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) destFs.setReplication(destPath, replication) ``` replication: Short -> replication: Option[Short] ``` val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort) FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) replication.map(destFs.setReplication(destPath, _)) ```{code} > Avoid unnecessary `setReplication` in Yarn mode > --- > > Key: SPARK-37239 > URL: https://issues.apache.org/jira/browse/SPARK-37239 > Project: Spark > Issue Type: Improvement > Components: YARN >Affects Versions: 3.1.2 >Reporter: wang-zhun >Priority: Major > > We found a large number of replication logs in hdfs server > ``` > 2021-11-04,17:22:13,065 INFO > org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains > unchanged at 3 for > xxx/.sparkStaging/application_1635470728320_1144379/__spark_libs__303253482044663796.zip > 2021-11-04,17:22:13,069 INFO > org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains > unchanged at 3 for > xxx/.sparkStaging/application_1635470728320_1144383/__spark_libs__4747402134564993861.zip > 2021-11-04,17:22:13,070 INFO > org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains > unchanged at 3 for > xxx/.sparkStaging/application_1635470728320_1144373/__spark_libs__4377509773730188331.zip > ``` > https://github.com/apache/hadoop/blob/6f7b965808f71f44e2617c50d366a6375fdfbbfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java#L2439 > > `setReplication` needs to acquire write lock, we should reduce this > unnecessary operation -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-37239) Avoid unnecessary `setReplication` in Yarn mode
wang-zhun created SPARK-37239: - Summary: Avoid unnecessary `setReplication` in Yarn mode Key: SPARK-37239 URL: https://issues.apache.org/jira/browse/SPARK-37239 Project: Spark Issue Type: Improvement Components: YARN Affects Versions: 3.1.2 Reporter: wang-zhun We found a large number of replication logs in hdfs server ``` 2021-11-04,17:22:13,065 INFO org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains unchanged at 3 for xxx/.sparkStaging/application_1635470728320_1144379/__spark_libs__303253482044663796.zip 2021-11-04,17:22:13,069 INFO org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains unchanged at 3 for xxx/.sparkStaging/application_1635470728320_1144383/__spark_libs__4747402134564993861.zip 2021-11-04,17:22:13,070 INFO org.apache.hadoop.hdfs.server.namenode.FSDirectory: Replication remains unchanged at 3 for xxx/.sparkStaging/application_1635470728320_1144373/__spark_libs__4377509773730188331.zip ``` https://github.com/apache/hadoop/blob/6f7b965808f71f44e2617c50d366a6375fdfbbfa/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java#L2439 `setReplication` needs to acquire write lock, we should reduce this unnecessary operation -- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
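A minimal sketch of the guard this issue asks for is shown below. It mirrors the Option[Short] snippet in the comment above rather than any merged patch; the helper function and its parameters are illustrative, and the only real names used are the Hadoop FileSystem/FileUtil APIs and the existing spark.yarn.submit.file.replication setting.
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

// Hedged sketch: only issue the extra setReplication RPC (which takes the namenode
// write lock and logs "Replication remains unchanged") when the user explicitly
// configured spark.yarn.submit.file.replication.
def copyToStaging(conf: Configuration,
                  srcPath: Path,
                  destPath: Path,
                  configuredReplication: Option[Short]): Unit = {
  val srcFs = srcPath.getFileSystem(conf)
  val destFs = destPath.getFileSystem(conf)
  FileUtil.copy(srcFs, srcPath, destFs, destPath, /* deleteSource = */ false, conf)
  // Without an explicit setting, keep the filesystem default and skip the call entirely.
  configuredReplication.foreach(r => destFs.setReplication(destPath, r))
}
{code}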
[jira] [Commented] (SPARK-37051) The filter operator gets wrong results in ORC's char type
[ https://issues.apache.org/jira/browse/SPARK-37051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17434086#comment-17434086 ] wang-zhun commented on SPARK-37051: --- We have the same problem, TPC-DS q7 does not work. The cd_education_status field of the customer_demographics table in parquet format is char(20). {code:java} select count(*) from tpcds.customer_demographics where cd_education_status ='College'{code} Spark 3.1.2 returns 0 rows, while Spark 2.3.4 / Hive / Presto return about 270,000 rows > The filter operator gets wrong results in ORC's char type > - > > Key: SPARK-37051 > URL: https://issues.apache.org/jira/browse/SPARK-37051 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.1.2, 3.2.1, 3.3.0 > Environment: Spark 3.1.2 > Scala 2.12 / Java 1.8 >Reporter: frankli >Priority: Critical > > When I try the following sample SQL on the TPCDS data, the filter operator > returns an empty row set (shown in web ui). > _select * from item where i_category = 'Music' limit 100;_ > The table is in ORC format, and i_category is char(50) type. > Data is inserted by hive, and queried by Spark. > I guess that the char(50) type retains redundant blanks after the actual > word. > It will affect the boolean value of "x.equals(Y)", and leads to wrong > results. > Luckily, the varchar type is OK. > > This bug can be reproduced by a few steps. > >>> desc t2_orc; > ++---+++ > |col_name|data_type|comment| > ++---+++ > |a|string |NULL| > |b|char(50) |NULL| > |c|int |NULL| > ++---++--+ > >>> select * from t2_orc where a='a'; > +-+---++--+ > |a|b|c| > +-+---++--+ > |a|b|1| > |a|b|2| > |a|b|3| > |a|b|4| > |a|b|5| > +-+---++-+ > >>> select * from t2_orc where b='b'; > +-+---++--+ > |a|b|c| > +-+---++--+ > +-+---++--+ > > By the way, Spark's tests should add more cases on the char type.
> > == Physical Plan == > CollectLimit (3) > +- Filter (2) > +- Scan orc tpcds_bin_partitioned_orc_2.item (1) > (1) Scan orc tpcds_bin_partitioned_orc_2.item > Output [22]: [i_item_sk#0L, i_item_id#1, i_rec_start_date#2, > i_rec_end_date#3, i_item_desc#4, i_current_price#5, i_wholesale_cost#6, > i_brand_id#7, i_brand#8, i_class_id#9, i_class#10, i_category_id#11, > i_category#12, i_manufact_id#13, i_manufact#14, i_size#15, i_formulation#16, > i_color#17, i_units#18, i_container#19, i_manager_id#20, > i_product_name#21|#0L, i_item_id#1, i_rec_start_date#2, i_rec_end_date#3, > i_item_desc#4, i_current_price#5, i_wholesale_cost#6, i_brand_id#7, > i_brand#8, i_class_id#9, i_class#10, i_category_id#11, i_category#12, > i_manufact_id#13, i_manufact#14, i_size#15, i_formulation#16, i_color#17, > i_units#18, i_container#19, i_manager_id#20, i_product_name#21] > Batched: false > Location: InMemoryFileIndex [hdfs://tpcds_bin_partitioned_orc_2.db/item] > PushedFilters: [IsNotNull(i_category), +EqualTo(i_category,+Music > )] > ReadSchema: > struct > (2) Filter > Input [22]: [i_item_sk#0L, i_item_id#1, i_rec_start_date#2, > i_rec_end_date#3, i_item_desc#4, i_current_price#5, i_wholesale_cost#6, > i_brand_id#7, i_brand#8, i_class_id#9, i_class#10, i_category_id#11, > i_category#12, i_manufact_id#13, i_manufact#14, i_size#15, i_formulation#16, > i_color#17, i_units#18, i_container#19, i_manager_id#20, > i_product_name#21|#0L, i_item_id#1, i_rec_start_date#2, i_rec_end_date#3, > i_item_desc#4, i_current_price#5, i_wholesale_cost#6, i_brand_id#7, > i_brand#8, i_class_id#9, i_class#10, i_category_id#11, i_category#12, > i_manufact_id#13, i_manufact#14, i_size#15, i_formulation#16, i_color#17, > i_units#18, i_container#19, i_manager_id#20, i_product_name#21] > Condition : (isnotnull(i_category#12) AND +(i_category#12 = Music ))+ > (3) CollectLimit > Input [22]: [i_item_sk#0L, i_item_id#1, i_rec_start_date#2, > i_rec_end_date#3, i_item_desc#4, i_current_price#5, i_wholesale_cost#6, > i_brand_id#7, i_brand#8, i_class_id#9, i_class#10, i_category_id#11, > i_category#12, i_manufact_id#13, i_manufact#14, i_size#15, i_formulation#16, > i_color#17, i_units#18, i_container#19, i_manager_id#20, > i_product_name#21|#0L, i_item_id#1, i_rec_start_date#2, i_rec_end_date#3, > i_item_desc#4, i_current_price#5, i_wholesale_cost#6, i_brand_id#7, > i_brand#8, i_class_id#9, i_class#10, i_category_id#11, i_category#12, > i_manufact_id#13, i_manufact#14, i_size#15, i_formulation#16, i_color#17, > i_units#18, i_container#19, i_manager_id#20, i_product_name#21] > Arguments: 100 > -- This message was sent by Atlassian Jira (v8.3.4#803005) ---
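The mismatch described in this thread comes from Hive padding CHAR(n) values with trailing spaces while the pushed-down literal is not padded. A small reproduction sketch follows; the table and column names come from the report, the write is assumed to happen on the Hive side, the session is assumed to be available as `spark`, and the rtrim workaround is illustrative and should be verified on the exact version in use.
{code:java}
// Hedged reproduction sketch of the CHAR padding mismatch (Spark 3.1.x reading a
// Hive-written ORC table).
//
//   -- executed from Hive:
//   CREATE TABLE t2_orc (a STRING, b CHAR(50), c INT) STORED AS ORC;
//   INSERT INTO t2_orc VALUES ('a', 'b', 1);   -- b is stored padded to 50 characters

spark.sql("SELECT * FROM t2_orc WHERE a = 'a'").show()        // the row is returned
spark.sql("SELECT * FROM t2_orc WHERE b = 'b'").show()        // empty on 3.1.2: pushed literal 'b' is unpadded
spark.sql("SELECT * FROM t2_orc WHERE rtrim(b) = 'b'").show() // trimming the padding returns the row again
{code}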