[GitHub] spark pull request #21805: [SPARK-24850][SQL] fix str representation of Cach...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21805

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204379093

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

```scala
@@ -207,4 +207,7 @@ case class InMemoryRelation(
   }

   override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
+
+  override def simpleString: String = s"InMemoryRelation(${output}, ${cacheBuilder.storageLevel})"
+
```

nit: remove the blank line
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204378903

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,20 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString ==
+      s"InMemoryRelation(${inMemoryRelation.output}," +
+      " StorageLevel(memory, deserialized, 1 replicas))")
+  }
```

How about just comparing explain results?

```scala
val df = Seq((1, 2)).toDF("a", "b").cache
val outputStream = new java.io.ByteArrayOutputStream()
Console.withOut(outputStream) {
  df.explain(false)
}
assert(outputStream.toString.replaceAll("#\\d+", "#x").contains(
  "InMemoryRelation [a#x, b#x], StorageLevel(disk, memory, deserialized, 1 replicas)"))
```
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204378696

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

```scala
@@ -207,4 +207,7 @@ case class InMemoryRelation(
   }

   override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache)
+
+  override def simpleString: String = s"InMemoryRelation(${output}, ${cacheBuilder.storageLevel})"
```

How about `s"InMemoryRelation [${Utils.truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}"`?
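For context, a rough standalone sketch of what a `truncatedString`-style helper does (this is an illustration, not Spark's actual `Utils.truncatedString`, which is more configurable and logs a warning on truncation): render at most a fixed number of fields and summarize the rest, so `simpleString` stays bounded even for very wide schemas.

```scala
// Illustrative sketch only -- not Spark's real Utils.truncatedString.
def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int = 25): String =
  if (seq.length <= maxFields) {
    seq.mkString(sep)
  } else {
    // Render the first maxFields entries, then summarize how many were dropped.
    seq.take(maxFields).mkString(sep) + sep + s"... ${seq.length - maxFields} more fields"
  }

// A narrow schema prints in full; a 100-column schema is summarized.
println(truncatedString(Seq("a#1", "b#2"), ", "))  // a#1, b#2
println(truncatedString((1 to 100).map(i => s"c$i#$i"), ", "))
```

The point of the suggestion is that interpolating `${output}` directly renders every attribute, while a truncating helper keeps one-line plan nodes readable on wide relations.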
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204212287

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

```scala
@@ -50,6 +50,8 @@ case class CachedRDDBuilder(
     tableName: Option[String])(
     @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null) {

+  override def toString: String = s"CachedRDDBuilder($useCompression, $batchSize, $storageLevel)"
```

yea, I think the output should be the same as the one in v2.3;

```
scala> val df = Seq((1, 2), (3, 4)).toDF("a", "b")
scala> val testDf = df.join(df, "a").join(df, "a").cache
scala> testDf.groupBy("a").count().explain
== Physical Plan ==
*(2) HashAggregate(keys=[a#309], functions=[count(1)])
+- Exchange hashpartitioning(a#309, 200)
   +- *(1) HashAggregate(keys=[a#309], functions=[partial_count(1)])
      +- *(1) InMemoryTableScan [a#309]
            +- InMemoryRelation [a#309, b#310, b#314, b#319], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(3) Project [a#60, b#61, b#212, b#217]
                     +- *(3) BroadcastHashJoin [a#60], [a#216], Inner, BuildRight
                        :- *(3) Project [a#60, b#61, b#212]
                        :  +- *(3) BroadcastHashJoin [a#60], [a#211], Inner, BuildRight
                        :     :- *(3) InMemoryTableScan [a#60, b#61]
                        :     :     +- InMemoryRelation [a#60, b#61], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
                        :     :           +- LocalTableScan [a#15, b#16]
                        :     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                        :        +- *(1) InMemoryTableScan [a#211, b#212]
                        :              +- InMemoryRelation [a#211, b#212], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
                        :                    +- LocalTableScan [a#15, b#16]
                        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
                           +- *(2) InMemoryTableScan [a#216, b#217]
                                 +- InMemoryRelation [a#216, b#217], true, 1, StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- LocalTableScan [a#15, b#16]
```

The output of this current pr is still different, so can you fix that way? @onursatici
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204123221

Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

```scala
@@ -50,6 +50,8 @@ case class CachedRDDBuilder(
     tableName: Option[String])(
     @transient private var _cachedColumnBuffers: RDD[CachedBatch] = null) {

+  override def toString: String = s"CachedRDDBuilder($useCompression, $batchSize, $storageLevel)"
```

My major point is whether we need to output `$useCompression, $batchSize`. How useful are they? Our explain output is already pretty long. Maybe we can skip them?
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204032968

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString.contains(
+      "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
+  }
```

yea, but you don't fill `useCompression` and `batchSize` in the test case.
Github user onursatici commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204001527

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString.contains(
+      "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
+  }
```

@gatorsmile tried to keep this close to its default value, maybe we can do something like `CachedRDDBuilder(useCompression = true, batchSize = 1000, ...)`? But that will break the consistency across logging case classes.
Github user onursatici commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r204001546

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString.contains(
+      "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
+  }
```

@maropu wouldn't that be testing the same thing, as explain calls `plan.treeString` which calls `elem.simpleString` for every child? I think testing for `InMemoryRelation.simpleString` covers other possible places where a `plan.treeString` is logged. Happy to change if you have concerns
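The argument about rendering paths can be illustrated with a toy tree (the names here are hypothetical; Spark's `TreeNode.treeString` machinery is considerably more elaborate): every rendering of a tree is assembled from each node's `simpleString`, so fixing `simpleString` on one node type fixes every place the tree gets printed or logged.

```scala
// Toy plan tree: treeString is built from each node's simpleString,
// so any fix to simpleString shows up in every rendered tree.
case class Node(name: String, children: Seq[Node] = Nil) {
  def simpleString: String = name
  def treeString: String = render("")
  private def render(indent: String): String =
    indent + simpleString +
      children.map(c => "\n" + c.render(indent + "  ")).mkString
}

val plan = Node("InMemoryTableScan",
  Seq(Node("InMemoryRelation", Seq(Node("LocalTableScan")))))
println(plan.treeString)
// InMemoryTableScan
//   InMemoryRelation
//     LocalTableScan
```

This is why a unit test on `simpleString` alone can cover `explain`, log output, and the SQL UI at once, whereas a test on one `explain` call only exercises that single path.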
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r203949905

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString.contains(
+      "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
+  }
```

How about just comparing explain output results like the query in this pr description?
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r203945646

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString.contains(
+      "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
+  }
```

Or we might not need the batch size in the plan.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21805#discussion_r203945605

Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala

```scala
@@ -206,4 +206,19 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-24850 InMemoryRelation string representation does not include cached plan") {
+    val dummyQueryExecution = spark.range(0, 1).toDF().queryExecution
+    val inMemoryRelation = InMemoryRelation(
+      true,
+      1000,
+      StorageLevel.MEMORY_ONLY,
+      dummyQueryExecution.sparkPlan,
+      Some("test-relation"),
+      dummyQueryExecution.logical)
+
+    assert(!inMemoryRelation.simpleString.contains(dummyQueryExecution.sparkPlan.toString))
+    assert(inMemoryRelation.simpleString.contains(
+      "CachedRDDBuilder(true, 1000, StorageLevel(memory, deserialized, 1 replicas))"))
+  }
```

`true` and `1000` look confusing to end users. Can we improve it?
GitHub user onursatici opened a pull request: https://github.com/apache/spark/pull/21805

[SPARK-24850][SQL] fix str representation of CachedRDDBuilder

## What changes were proposed in this pull request?

As of https://github.com/apache/spark/pull/21018, InMemoryRelation includes its cacheBuilder when logging query plans. This PR changes the string representation of the CachedRDDBuilder to not include the cached spark plan.

## How was this patch tested?

spark-shell, query:

```
var df_cached = spark.read.format("csv").option("header", "true").load("test.csv").cache()
0 to 1 foreach { _ =>
  df_cached = df_cached.join(spark.read.format("csv").option("header", "true").load("test.csv"), "A").cache()
}
df_cached.explain
```

as of master results in:

```
== Physical Plan ==
InMemoryTableScan [A#10, B#11, B#35, B#87]
   +- InMemoryRelation [A#10, B#11, B#35, B#87], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35, B#87]
+- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
:  +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
:     +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(2) Project [A#10, B#11, B#35]
+- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:- *(2) Filter isnotnull(A#10)
:  +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
:     +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
:           +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *(1) Filter isnotnull(A#34)
      +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
            +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
                  +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
:  +- *(2) Project [A#10, B#11, B#35]
:     +- *(2) BroadcastHashJoin [A#10], [A#34], Inner, BuildRight
:        :- *(2) Filter isnotnull(A#10)
:        :  +- InMemoryTableScan [A#10, B#11], [isnotnull(A#10)]
:        :     +- InMemoryRelation [A#10, B#11], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
:        :           +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
:        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
:           +- *(1) Filter isnotnull(A#34)
:              +- InMemoryTableScan [A#34, B#35], [isnotnull(A#34)]
:                    +- InMemoryRelation [A#34, B#35], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
:                          +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
   +- *(1) Filter isnotnull(A#86)
      +- InMemoryTableScan [A#86, B#87], [isnotnull(A#86)]
            +- InMemoryRelation [A#86, B#87], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
                  +- *(1) FileScan csv [A#10,B#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:test.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct ,None)
+- *(2) Project [A#10, B#11, B#35, B#87]
   +- *(2) BroadcastHashJoin [A#10], [A#86], Inner, BuildRight
      :- *(2) Filter isnotnull(A#10)
      :  +- InMemoryTableScan [A#10, B#11, B#35], [isnotnull(A#10)]
      :     +- InMemoryRelation [A#10, B#11, B#35], CachedRDDBuilder(true,1,StorageLevel(disk, memory,
```
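The fix itself boils down to a standard Scala pattern, sketched here with hypothetical names (`Builder` and `Plan` are illustrative stand-ins, not Spark's `CachedRDDBuilder` or `SparkPlan`): a case class's synthesized `toString` prints every constructor argument, so a builder holding an entire plan drags that plan's full tree into each log line; overriding `toString` keeps only the small, human-relevant fields.

```scala
// Hypothetical stand-ins for SparkPlan / CachedRDDBuilder.
case class Plan(treeString: String)

case class Builder(useCompression: Boolean, batchSize: Int,
                   storageLevel: String, plan: Plan) {
  // Without this override, the synthesized case-class toString would embed
  // plan.toString -- i.e. the whole cached plan -- in every log message.
  override def toString: String =
    s"Builder($useCompression, $batchSize, $storageLevel)"
}

val b = Builder(true, 1000, "MEMORY_ONLY", Plan("*(1) FileScan csv [A#10,B#11] ..."))
println(b)  // Builder(true, 1000, MEMORY_ONLY)
```

Note that only the first constructor parameter list participates in the synthesized `toString`, so the override is what guarantees the nested plan never leaks into explain output, whichever parameter list it lives in.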