[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-09-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22232


---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-28 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22232#discussion_r213543748
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceSuite.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class FileSourceSuite extends QueryTest with SharedSQLContext with PredicateHelper {
+
+  test("[SPARK-25237] remove updateBytesReadWithFileSize in FileScanRdd") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).selectExpr("id AS c0", "rand() AS c1").repartition(10).write.csv(path)
--- End diff --

I think a single partition is ok for this test.


---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-28 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/22232#discussion_r213483961
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceSuite.scala ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.test.SharedSQLContext
+
+
+class FileSourceSuite extends QueryTest with SharedSQLContext with PredicateHelper {
+
+  test("[SPARK-25237] remove updateBytesReadWithFileSize in FileScanRdd") {
+    withTempPath { p =>
+      val path = p.getAbsolutePath
+      spark.range(1000).selectExpr("id AS c0", "rand() AS c1").repartition(10).write.csv(path)
+      val df = spark.read.csv(path).limit(1)
+
+      val bytesReads = new ArrayBuffer[Long]()
+      val bytesReadListener = new SparkListener() {
+        override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+          bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
+        }
+      }
+      // Avoid receiving earlier taskEnd events
+      spark.sparkContext.listenerBus.waitUntilEmpty(500)
+
+      spark.sparkContext.addSparkListener(bytesReadListener)
+
+      df.collect()
+
+      spark.sparkContext.listenerBus.waitUntilEmpty(500)
+      spark.sparkContext.removeSparkListener(bytesReadListener)
+
+      assert(bytesReads.sum < 3000)
--- End diff --

The data above could be made deterministic so that you can assert the bytes 
read more exactly. I wonder if it's important to make sure the bytes read are 
exact, rather than just close, given that the change above would change the 
metric only a little I think.

You can just track the sum rather than all values written, but it doesn't 
matter.
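
For illustration, a minimal sketch of the deterministic variant suggested above (the replacement expression and the exact expected byte count are hypothetical; `spark`, `path`, and `bytesReads` refer to the names in the quoted test):

    // Replace the non-deterministic rand() column with a fixed expression so the
    // generated CSV files have the same size on every run.
    spark.range(1000).selectExpr("id AS c0", "id % 10 AS c1")
      .repartition(10).write.csv(path)

    // With stable file sizes the test could assert an exact figure instead of an
    // upper bound, e.g. assert(bytesReads.sum === expectedBytes), where
    // expectedBytes is measured once from the deterministic output.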


---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-25 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22232#discussion_r212796191
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
 
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

aha, I see.


---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-25 Thread dujunling
Github user dujunling commented on a diff in the pull request:

https://github.com/apache/spark/pull/22232#discussion_r212793909
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
 
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

When there is more than one file in the partition, the inputMetrics is wrong if the updateBytesReadWithFileSize call at line 142 still exists.


---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-25 Thread dujunling
Github user dujunling commented on a diff in the pull request:

https://github.com/apache/spark/pull/22232#discussion_r212793866
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
 
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

Yes, before SPARK-19464 only one of updateBytesRead and updateBytesReadWithFileSize ever took effect. On Hadoop 2.5 or earlier, updateBytesReadWithFileSize was used; on Hadoop 2.6 or later, updateBytesRead was used.
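
A rough, self-contained sketch of that pre-SPARK-19464 dispatch (the names below are hypothetical stand-ins, not the real FileScanRDD members):

    object BytesReadSketch {
      // Minimal stand-in for Spark's input metrics; only the two setters needed here.
      trait InputMetricsLike {
        def setBytesRead(v: Long): Unit
        def incBytesRead(v: Long): Unit
      }

      // On Hadoop 2.6+ a FileSystem-statistics callback is available; on 2.5 and
      // earlier it is not, so the file length served as a coarse fallback.
      // Exactly one of the two branches ran for a given Hadoop version.
      def updateOnClose(
          bytesReadCallback: Option[() => Long],  // Some(...) on Hadoop 2.6+, None on <= 2.5
          currentFileLength: Long,
          metrics: InputMetricsLike): Unit = bytesReadCallback match {
        case Some(readBytes) => metrics.setBytesRead(readBytes())        // accurate FS statistics
        case None            => metrics.incBytesRead(currentFileLength)  // whole-file fallback
      }
    }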



---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-25 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22232#discussion_r212793049
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala ---
@@ -208,7 +199,6 @@ class FileScanRDD(
 
     override def close(): Unit = {
       updateBytesRead()
-      updateBytesReadWithFileSize()
--- End diff --

If we just remove this `updateBytesReadWithFileSize`, does that solve the issue in the description? Don't we also need to remove the `updateBytesReadWithFileSize` call at line 142?


---




[GitHub] spark pull request #22232: [SPARK-25237][SQL]remove updateBytesReadWithFileS...

2018-08-24 Thread dujunling
GitHub user dujunling opened a pull request:

https://github.com/apache/spark/pull/22232

[SPARK-25237][SQL] remove updateBytesReadWithFileSize because we use Hadoop FileSystem statistics to update the inputMetrics

## What changes were proposed in this pull request?

In FileScanRDD, we update inputMetrics's bytesRead via updateBytesRead every 1000 rows and when the iterator is closed.

However, when the iterator is closed we also invoke updateBytesReadWithFileSize, which increases bytesRead by the whole file's length.

As a result, bytesRead is wrong for queries with a limit, such as `select * from table limit 1`.

Since Hadoop 2.5 and earlier are no longer supported, we always get bytesRead from the Hadoop FileSystem statistics rather than from the file's length, so updateBytesReadWithFileSize can be removed.
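
A back-of-the-envelope sketch of the double counting (the numbers below are made up purely for illustration; updateBytesRead sets bytesRead from the FileSystem statistics, and updateBytesReadWithFileSize then adds the file length on top):

    // What the FileSystem statistics report for a `limit 1` scan: only a small
    // part of the file is actually read before the task finishes.
    val bytesFromFsStatistics = 64L * 1024            // ~64 KB actually read
    val fileLength            = 100L * 1024 * 1024    // ~100 MB total file size

    // Before this change, close() effectively applied both updates:
    val reportedBefore = bytesFromFsStatistics + fileLength  // ~100 MB reported for a tiny read

    // After removing updateBytesReadWithFileSize, only the FS statistics remain:
    val reportedAfter  = bytesFromFsStatistics               // ~64 KB, matching the actual read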

## How was this patch tested?

manual test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dujunling/spark fileScanRddInput

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22232.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22232


commit 0f75257b50a611e069d406da8d72225bb4e73b51
Author: dujunling 
Date:   2018-08-25T06:20:35Z

remove updateBytesReadWithFileSize because we use Hadoop FileSystem statistics to update the inputMetrics




---
