[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22232
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r213543748

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceSuite.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class FileSourceSuite extends QueryTest with SharedSQLContext with PredicateHelper {
+
+  test("[SPARK-25237] remove updateBytesReadWithFileSize in FileScanRdd") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).selectExpr("id AS c0", "rand() AS c1").repartition(10).write.csv(path)
--- End diff --

I think a single partition is ok for this test.
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r213483961

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceSuite.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class FileSourceSuite extends QueryTest with SharedSQLContext with PredicateHelper {
+
+  test("[SPARK-25237] remove updateBytesReadWithFileSize in FileScanRdd") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).selectExpr("id AS c0", "rand() AS c1").repartition(10).write.csv(path)
+      val df = spark.read.csv(path).limit(1)
+
+      val bytesReads = new ArrayBuffer[Long]()
+      val bytesReadListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+        }
+      }
+      // Avoid receiving earlier taskEnd events
+      spark.sparkContext.listenerBus.waitUntilEmpty(500)
+
+      spark.sparkContext.addSparkListener(bytesReadListener)
+
+      df.collect()
+
+      spark.sparkContext.listenerBus.waitUntilEmpty(500)
+      spark.sparkContext.removeSparkListener(bytesReadListener)
+
+      assert(bytesReads.sum < 3000)
--- End diff --

The data above could be made deterministic so that you can assert the bytes read more exactly. I wonder if it's important to make sure the bytes read are exact, rather than just close, given that the change above would change the metric only a little I think. You can just track the sum rather than all values written, but it doesn't matter.
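A minimal sketch of what that suggestion could look like, assuming the same SharedSQLContext test helpers as the file above. The test name, the deterministic `id % 10` column, the single-partition write, and the comparison against the on-disk file size are illustrative choices, not code from this PR:

    // Sketch only: deterministic rows and a single output file, so the size on disk
    // is stable and the assertion can be tied to it instead of a magic number.
    test("[SPARK-25237] bytesRead is not inflated by file size (sketch)") {
      withTempPath { p =>
        val path = p.getAbsolutePath
        spark.range(1000).selectExpr("id AS c0", "id % 10 AS c1").coalesce(1).write.csv(path)
        val df = spark.read.csv(path).limit(1)

        // Track only the running sum of bytes read, rather than every per-task value.
        var bytesRead = 0L
        val listener = new SparkListener() {
          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
            bytesRead += taskEnd.taskMetrics.inputMetrics.bytesRead
          }
        }
        // Avoid receiving earlier taskEnd events
        spark.sparkContext.listenerBus.waitUntilEmpty(500)
        spark.sparkContext.addSparkListener(listener)

        df.collect()

        spark.sparkContext.listenerBus.waitUntilEmpty(500)
        spark.sparkContext.removeSparkListener(listener)

        // With updateBytesReadWithFileSize removed, a `limit 1` scan cannot report more
        // bytes than the file actually contains; with it, the file length is added on
        // close and the sum exceeds the file size.
        val fileSize = new java.io.File(path).listFiles()
          .filter(_.getName.endsWith(".csv")).map(_.length()).sum
        assert(bytesRead <= fileSize)
      }
    }

Whether the bound should be exact or only an upper bound is the open question in the comment above; the sketch takes the looser option.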
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r212796191

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

aha, I see.
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user dujunling commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r212793909

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

When there is more than one file in the partition, the inputMetrics are wrong as long as the updateBytesReadWithFileSize call at line 142 still exists.
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user dujunling commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r212793866

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

Yes. Before SPARK-19464, only one of updateBytesRead and updateBytesReadWithFileSize took effect: with Hadoop 2.5 or earlier, updateBytesReadWithFileSize was used; with Hadoop 2.6 or later, updateBytesRead was used.
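For readers who have not followed SPARK-19464, a self-contained toy model of the accounting described here may help. None of the names below come from Spark; they only illustrate that exactly one of the two update paths contributes, depending on whether FileSystem read statistics are available:

    // Toy model only: hypothetical names, not FileScanRDD internals.
    object BytesReadAccountingSketch {
      final case class Metrics(var bytesRead: Long = 0L)

      // Called when the file iterator is closed.
      def onClose(metrics: Metrics, fsStatsBytesRead: Option[Long], fileLength: Long): Unit =
        fsStatsBytesRead match {
          // Hadoop 2.6+: FileSystem statistics are available, use the exact count.
          case Some(n) => metrics.bytesRead = n
          // Hadoop 2.5 and earlier: no statistics, fall back to the whole file length.
          case None    => metrics.bytesRead += fileLength
        }

      def main(args: Array[String]): Unit = {
        val newer = Metrics()
        onClose(newer, fsStatsBytesRead = Some(128L), fileLength = 10000L)
        println(s"Hadoop 2.6+   -> bytesRead = ${newer.bytesRead}")  // 128

        val older = Metrics()
        onClose(older, fsStatsBytesRead = None, fileLength = 10000L)
        println(s"Hadoop <= 2.5 -> bytesRead = ${older.bytesRead}")  // 10000
      }
    }

Once Hadoop 2.5 support was dropped, the fallback branch can never be taken, which is why the PR removes updateBytesReadWithFileSize entirely.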
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22232#discussion_r212793049

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

If we just remove this `updateBytesReadWithFileSize`, does that solve the issue in the description? Don't we need to remove the `updateBytesReadWithFileSize` at line 142, too?
[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...
GitHub user dujunling opened a pull request:

    https://github.com/apache/spark/pull/22232

    [SPARK-25237][SQL] Remove updateBytesReadWithFileSize because we use Hadoop FileSystem statistics to update the inputMetrics

    ## What changes were proposed in this pull request?

    In FileScanRDD, we update inputMetrics's bytesRead using updateBytesRead every 1000 rows and when the iterator is closed. However, when the iterator is closed we also invoke updateBytesReadWithFileSize, which increases bytesRead by the file's length. As a result, bytesRead is wrong when running a query with a limit, such as `select * from table limit 1`. Because Hadoop 2.5 and earlier are no longer supported, we always get bytesRead from the Hadoop FileSystem statistics rather than from the file's length.

    ## How was this patch tested?

    Manual test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dujunling/spark fileScanRddInput

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22232.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22232

commit 0f75257b50a611e069d406da8d72225bb4e73b51
Author: dujunling
Date:   2018-08-25T06:20:35Z

    remove updateBytesReadWithFileSize because we use Hadoop FileSystem statistics to update the inputMetrics
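For reference, a manual check of the metric along the lines described above could look like this in spark-shell. The path, data set, and wait are illustrative and not a procedure taken from the PR:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    // Illustrative location and data; any CSV data set works.
    val path = "/tmp/spark-25237-check"
    spark.range(1000000).selectExpr("id AS c0", "id % 7 AS c1").write.mode("overwrite").csv(path)
    val df = spark.read.csv(path).limit(1)

    var bytesRead = 0L
    val listener = new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        bytesRead += taskEnd.taskMetrics.inputMetrics.bytesRead
      }
    }
    spark.sparkContext.addSparkListener(listener)

    df.collect()

    Thread.sleep(1000)  // crude wait for listener events to be delivered
    val largestFile = new java.io.File(path).listFiles()
      .filter(_.getName.endsWith(".csv")).map(_.length()).max
    println(s"bytesRead = $bytesRead, largest csv part file = $largestFile bytes")
    // Without the fix, bytesRead also includes the full length of each file that was
    // opened, so it is at least one whole part file; with the fix it reflects only the
    // bytes actually fetched from the FileSystem for the single returned row.
    spark.sparkContext.removeSparkListener(listener)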