[jira] [Commented] (SPARK-7549) Support aggregating over nested fields
[ https://issues.apache.org/jira/browse/SPARK-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625704#comment-14625704 ] Chen Song commented on SPARK-7549: -- I prefer the former. I thought about using explode; it is a good way to implement nested aggregations, but I want to take advantage of codegen by implementing these directly. Support aggregating over nested fields -- Key: SPARK-7549 URL: https://issues.apache.org/jira/browse/SPARK-7549 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin It would be nice to be able to run sum, avg, min, max (and other numeric aggregate expressions) on arrays.
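For reference, a minimal sketch of the explode-based alternative mentioned in the comment, written against the Spark 1.x DataFrame API; the input data and column names are made up for illustration:
{code}
import org.apache.spark.sql.functions._

// Hypothetical input: each row carries an array column "values".
val df = sqlContext.createDataFrame(Seq(
  (1, Seq(1.0, 2.0, 3.0)),
  (2, Seq(4.0, 5.0))
)).toDF("id", "values")

// Flatten the array with explode, then aggregate the flattened column.
val exploded = df.explode("values", "value") { vs: Seq[Double] => vs }
exploded.groupBy("id")
  .agg(sum("value"), avg("value"), min("value"), max("value"))
  .show()
{code}
Native array aggregates, the subject of this ticket, would avoid materializing the flattened rows, which is where a codegen implementation could pay off.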
[jira] [Commented] (SPARK-7394) Add Pandas style cast (astype)
[ https://issues.apache.org/jira/browse/SPARK-7394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14530065#comment-14530065 ] Chen Song commented on SPARK-7394: -- OK, I'll work on this after I finish my day-to-day work. Add Pandas style cast (astype) -- Key: SPARK-7394 URL: https://issues.apache.org/jira/browse/SPARK-7394 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Labels: starter Fix For: 1.4.0 Basically alias astype == cast in Column for Python (and Python only).
[jira] [Commented] (SPARK-7327) DataFrame show() method doesn't like empty dataframes
[ https://issues.apache.org/jira/browse/SPARK-7327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14526191#comment-14526191 ] Chen Song commented on SPARK-7327: -- I am working on upgrading the show function. I'll run the empty-DataFrame test and fix it. DataFrame show() method doesn't like empty dataframes - Key: SPARK-7327 URL: https://issues.apache.org/jira/browse/SPARK-7327 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.3.1 Reporter: Olivier Girardot Priority: Minor For an empty DataFrame (for example after a filter), any call to show() ends up with:
{code}
java.util.MissingFormatWidthException: -0s
 at java.util.Formatter$FormatSpecifier.checkGeneral(Formatter.java:2906)
 at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2680)
 at java.util.Formatter.parse(Formatter.java:2528)
 at java.util.Formatter.format(Formatter.java:2469)
 at java.util.Formatter.format(Formatter.java:2423)
 at java.lang.String.format(String.java:2790)
 at org.apache.spark.sql.DataFrame$$anonfun$showString$2$$anonfun$apply$4.apply(DataFrame.scala:200)
 at org.apache.spark.sql.DataFrame$$anonfun$showString$2$$anonfun$apply$4.apply(DataFrame.scala:199)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.sql.DataFrame$$anonfun$showString$2.apply(DataFrame.scala:199)
 at org.apache.spark.sql.DataFrame$$anonfun$showString$2.apply(DataFrame.scala:198)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:198)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:314)
 at org.apache.spark.sql.DataFrame.show(DataFrame.scala:320)
{code}
If no one takes it by next Friday, I'll fix it. The problem seems to come from the colWidths method:
{code}
// Compute the width of each column
val colWidths = Array.fill(numCols)(0)
for (row <- rows) {
  for ((cell, i) <- row.zipWithIndex) {
    colWidths(i) = math.max(colWidths(i), cell.length)
  }
}
{code}
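A minimal sketch of one possible fix, seeding each column's width from its header so that an empty row set still yields a positive format width; fieldNames is an assumed handle on the column names, and this is an illustration rather than the committed patch:
{code}
// Seed widths with header lengths so zero rows still give a non-zero width,
// avoiding the "%-0s" format string that MissingFormatWidthException complains about.
val colWidths = Array.tabulate(numCols)(i => fieldNames(i).length)
for (row <- rows) {
  for ((cell, i) <- row.zipWithIndex) {
    colWidths(i) = math.max(colWidths(i), cell.length)
  }
}
{code}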
[jira] [Commented] (SPARK-7251) Perform sequential scan when iterating over entries in BytesToBytesMap
[ https://issues.apache.org/jira/browse/SPARK-7251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14522977#comment-14522977 ] Chen Song commented on SPARK-7251: -- Can the key be empty? If we allow zero-length keys, it is impossible to handle the unused memory at the end of a page, because I can't tell whether I've reached the end of the page or the key is simply empty. Of course I would have to setMemory it to zero. Perform sequential scan when iterating over entries in BytesToBytesMap -- Key: SPARK-7251 URL: https://issues.apache.org/jira/browse/SPARK-7251 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: Josh Rosen In BytesToBytesMap, {{iterator()}} iterates in order of set bits in the bitmap, but it would be faster to iterate directly through the data pages, since this will result in less random memory access.
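One way to answer the question in the comment is to reserve a sentinel length value, so that a zero-length key stays legal. A minimal sketch, modeling a data page as a ByteBuffer with records laid out as [key length][value length][key bytes][value bytes]; the END_OF_PAGE sentinel and the layout are assumptions for illustration, not BytesToBytesMap's actual format:
{code}
import java.nio.ByteBuffer

object PageScan {
  // Sentinel written into the key-length slot at the start of a page's unused tail.
  // A genuinely empty key writes 0, so the two cases remain distinguishable.
  val END_OF_PAGE: Int = -1

  def scan(page: ByteBuffer): Unit = {
    while (page.remaining() >= 4) {
      val keyLen = page.getInt()
      if (keyLen == END_OF_PAGE) return // the rest of the page is unused memory
      val valueLen = page.getInt()
      val key = new Array[Byte](keyLen) // keyLen == 0 is a valid empty key
      page.get(key)
      val value = new Array[Byte](valueLen)
      page.get(value)
      // ... hand (key, value) to the iterator's consumer ...
    }
  }
}
{code}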
[jira] [Created] (SPARK-7234) When codegen on DateType defaultPrimitive will throw type mismatch exception
Chen Song created SPARK-7234: Summary: When codegen on DateType defaultPrimitive will throw type mismatch exception Key: SPARK-7234 URL: https://issues.apache.org/jira/browse/SPARK-7234 Project: Spark Issue Type: Bug Components: SQL Reporter: Chen Song When codegen is on, the defaultPrimitive of DateType is null. For a query like select COUNT(a) from table, where a is a DateType column, this raises: type mismatch; found : Null(null) required: DateType.this.InternalType
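A hedged sketch of the shape of a fix: give DateType a typed default instead of null in the code generator's default-value mapping. The cases shown are abbreviated assumptions; the relevant fact is that DateType's internal representation in Spark SQL is an Int (days since epoch), so "0" is a valid typed default:
{code}
import org.apache.spark.sql.types._

// Illustrative stand-in for the codegen default-value mapping,
// returning the source snippet to emit for each type.
def defaultPrimitive(dt: DataType): String = dt match {
  case BooleanType => "false"
  case IntegerType => "0"
  case DateType    => "0"   // Int days-since-epoch, not null
  case DoubleType  => "0.0"
  case _           => "null"
}
{code}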
[jira] [Commented] (SPARK-7158) collect and take return different results
[ https://issues.apache.org/jira/browse/SPARK-7158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513623#comment-14513623 ] Chen Song commented on SPARK-7158: -- Should we execute the query eagerly in the cache method? collect and take return different results - Key: SPARK-7158 URL: https://issues.apache.org/jira/browse/SPARK-7158 Project: Spark Issue Type: Sub-task Components: SQL Reporter: Reynold Xin Priority: Blocker Reported by [~rams]
{code}
import java.util.UUID
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val rdd = sc.parallelize(List(1, 2, 3), 2)
val schema = StructType(List(StructField("index", IntegerType, true)))
val df = sqlContext.createDataFrame(rdd.map(p => Row(p)), schema)

def id: () => String = () => { UUID.randomUUID().toString() }
def square: Int => Int = (x: Int) => { x * x }

val dfWithId = df.withColumn("id", callUDF(id, StringType)).cache()
// expect the ID to have materialized at this point
dfWithId.collect()
// res0: Array[org.apache.spark.sql.Row] = Array([1,43c7b8e2-b4a3-43ee-beff-0bb4b7d6c1b1], [2,efd061be-e8cc-43fa-956e-cfd6e7355982], [3,79b0baab-627c-4761-af0d-8995b8c5a125])

val dfWithIdAndSquare = dfWithId.withColumn("square", callUDF(square, IntegerType, col("index")))
dfWithIdAndSquare.collect()
// res1: Array[org.apache.spark.sql.Row] = Array([1,a3b2e744-a0a1-40fe-8133-87a67660b4ab,1], [2,0a7052a0-6071-4ef5-a25a-2670248ea5cd,4], [3,209f269e-207a-4dfd-a186-738be5db2eff,9])

// why are the IDs in the two collect() results different?
{code}
The randomly generated IDs are the same if show (which uses take under the hood) is used instead of collect.
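As a usage-level illustration of the question in the comment: forcing the cached plan to materialize before deriving further columns makes subsequent actions read one consistent set of IDs. A sketch reusing the names from the repro above; count() is just a convenient action to populate the cache:
{code}
val dfWithId = df.withColumn("id", callUDF(id, StringType)).cache()
dfWithId.count() // run an action now, so the UDF executes once and the rows are cached

// Both reads now hit the materialized cache, so the IDs agree:
dfWithId.collect()
dfWithId.withColumn("square", callUDF(square, IntegerType, col("index"))).collect()
{code}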
[jira] [Created] (SPARK-6927) Sorting Error when codegen on
Chen Song created SPARK-6927: Summary: Sorting Error when codegen on Key: SPARK-6927 URL: https://issues.apache.org/jira/browse/SPARK-6927 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.3.0 Reporter: Chen Song Fix For: 1.4.0 When codegen is on, some unit tests in SQLQuerySuite fail:
{code}
test("sorting") {
  val before = conf.externalSortEnabled
  setConf(SQLConf.EXTERNAL_SORT, "false")
  sortTest()
  setConf(SQLConf.EXTERNAL_SORT, before.toString)
}

test("external sorting") {
  val before = conf.externalSortEnabled
  setConf(SQLConf.EXTERNAL_SORT, "true")
  sortTest()
  setConf(SQLConf.EXTERNAL_SORT, before.toString)
}
{code}
GenerateOrdering can't handle BinaryType.
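For context, ordering on BinaryType means comparing byte arrays lexicographically with unsigned byte semantics. A minimal sketch of such a comparator, as an illustration of what GenerateOrdering would need to emit rather than the actual generated code:
{code}
// Lexicographic, unsigned comparison of two byte arrays.
def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
  val len = math.min(x.length, y.length)
  var i = 0
  while (i < len) {
    // & 0xff turns the signed Byte into its unsigned int value
    val res = (x(i) & 0xff) - (y(i) & 0xff)
    if (res != 0) return res
    i += 1
  }
  x.length - y.length // a prefix sorts before its extensions
}
{code}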
[jira] [Created] (SPARK-6899) Type mismatch when user codegen
Chen Song created SPARK-6899: Summary: Type mismatch when user codegen Key: SPARK-6899 URL: https://issues.apache.org/jira/browse/SPARK-6899 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.3.0 Reporter: Chen Song When I run the tests in DataFrameSuite with codegen on, a type mismatch error occurred:
{code}
test("average") {
  checkAnswer(
    decimalData.agg(avg('a)),
    Row(new java.math.BigDecimal(2.0)))
}

type mismatch; found : Int(0) required: org.apache.spark.sql.types.DecimalType#JvmType
{code}
I think the original problem comes from GenerateMutableProjection.
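The error suggests the generated mutable projection initializes a Decimal slot with the Int literal 0. A hedged sketch of the idea in plain Scala; GenerateMutableProjection assembles source trees, so this only shows the typed-zero choice, not the generator itself:
{code}
import org.apache.spark.sql.types._

// What the generated initializer effectively does vs. what it needs:
//   var value: Decimal = 0          // type mismatch: Int(0) is not a Decimal
//   var value: Decimal = Decimal(0) // a typed zero compiles

// Sketch: pick a correctly typed default per data type.
def typedZero(dt: DataType): Any = dt match {
  case _: DecimalType => Decimal(0)
  case IntegerType    => 0
  case DoubleType     => 0.0
  case _              => null
}
{code}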
[jira] [Updated] (SPARK-6899) Type mismatch when using codegen
[ https://issues.apache.org/jira/browse/SPARK-6899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Song updated SPARK-6899: - Summary: Type mismatch when using codegen (was: Type mismatch when user codegen) Type mismatch when using codegen Key: SPARK-6899 URL: https://issues.apache.org/jira/browse/SPARK-6899 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.3.0 Reporter: Chen Song When I run the tests in DataFrameSuite with codegen on, a type mismatch error occurred:
{code}
test("average") {
  checkAnswer(
    decimalData.agg(avg('a)),
    Row(new java.math.BigDecimal(2.0)))
}

type mismatch; found : Int(0) required: org.apache.spark.sql.types.DecimalType#JvmType
{code}
I think the original problem comes from GenerateMutableProjection.
[jira] [Updated] (SPARK-6899) Type mismatch when using codegen
[ https://issues.apache.org/jira/browse/SPARK-6899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Song updated SPARK-6899: - Description: When I run the tests in DataFrameSuite with codegen on, a type mismatch error occurred:
{code}
test("average") {
  checkAnswer(
    decimalData.agg(avg('a)),
    Row(new java.math.BigDecimal(2.0)))
}

type mismatch; found : Int(0) required: org.apache.spark.sql.types.DecimalType#JvmType
{code}
was: When I run the tests in DataFrameSuite with codegen on, a type mismatch error occurred:
{code}
test("average") {
  checkAnswer(
    decimalData.agg(avg('a)),
    Row(new java.math.BigDecimal(2.0)))
}

type mismatch; found : Int(0) required: org.apache.spark.sql.types.DecimalType#JvmType
{code}
I think the original problem comes from GenerateMutableProjection. Type mismatch when using codegen Key: SPARK-6899 URL: https://issues.apache.org/jira/browse/SPARK-6899 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 1.3.0 Reporter: Chen Song When I run the tests in DataFrameSuite with codegen on, a type mismatch error occurred:
{code}
test("average") {
  checkAnswer(
    decimalData.agg(avg('a)),
    Row(new java.math.BigDecimal(2.0)))
}

type mismatch; found : Int(0) required: org.apache.spark.sql.types.DecimalType#JvmType
{code}
[jira] [Closed] (SPARK-6395) Rebuild the schema from a GenericRow
[ https://issues.apache.org/jira/browse/SPARK-6395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen Song closed SPARK-6395. Resolution: Not a Problem Rebuild the schema from a GenericRow Key: SPARK-6395 URL: https://issues.apache.org/jira/browse/SPARK-6395 Project: Spark Issue Type: Task Components: SQL Reporter: Chen Song Sometimes we need the schema of the row, but GenericRow doesn't contain schema information. So we need a method such as val schema = ScalaReflection.rebuildSchema(row)
[jira] [Created] (SPARK-6412) Add Char support in dataTypes.
Chen Song created SPARK-6412: Summary: Add Char support in dataTypes. Key: SPARK-6412 URL: https://issues.apache.org/jira/browse/SPARK-6412 Project: Spark Issue Type: Bug Components: SQL Reporter: Chen Song We can't get the schema of the case class PrimitiveData, because ScalaReflection.schemaFor and dataTypes don't support CharType:
{code}
case class PrimitiveData(
  charField: Char, // can't get the Char schema info
  intField: Int,
  longField: Long,
  doubleField: Double,
  floatField: Float,
  shortField: Short,
  byteField: Byte,
  booleanField: Boolean)
{code}
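A hedged sketch of where the support would go: ScalaReflection.schemaFor maps Scala types to Catalyst DataTypes through a type match, so Char needs a case there plus a CharType definition (CharType is hypothetical; it does not exist in dataTypes today, which is the point of this ticket). A workaround that compiles today is to model the field as a String:
{code}
// Hypothetical new case inside ScalaReflection.schemaFor's type match:
//   case t if t <:< typeOf[Char] => Schema(CharType, nullable = false)

// Workaround available now: use a single-character String.
case class PrimitiveDataCompat(
  charField: String, // stands in for Char
  intField: Int)
{code}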
[jira] [Created] (SPARK-6395) Rebuild the schema from a GenericRow
Chen Song created SPARK-6395: Summary: Rebuild the schema from a GenericRow Key: SPARK-6395 URL: https://issues.apache.org/jira/browse/SPARK-6395 Project: Spark Issue Type: Task Components: SQL Reporter: Chen Song Sometimes we need the schema of the row, but GenericRow doesn't contain schema information. So we need a method such as val schema = ScalaReflection.rebuildSchema(row)
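For illustration, a hedged sketch of what rebuilding a schema from a row's runtime values could look like, which also shows why this was later closed as Not a Problem: field names are gone, and null values or empty rows carry no type information, so inference from values is inherently lossy:
{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical helper: infer a StructType from a row's runtime values.
// Names are synthesized and nulls cannot be typed.
def rebuildSchema(row: Row): StructType = StructType(
  (0 until row.length).map { i =>
    val dt = row(i) match {
      case _: Int     => IntegerType
      case _: Long    => LongType
      case _: Double  => DoubleType
      case _: String  => StringType
      case _: Boolean => BooleanType
      case _          => NullType // null or unsupported: the type is unrecoverable
    }
    StructField(s"_c$i", dt, nullable = true)
  })
{code}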
[jira] [Commented] (SPARK-6382) withUDF(...) {...} for supporting temporary UDF definitions in the scope
[ https://issues.apache.org/jira/browse/SPARK-6382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365433#comment-14365433 ] Chen Song commented on SPARK-6382: -- If we don't change the global scope of UDF registration, we can change the way we obtain the analyzer: provide a temporary analyzer to analyze the SQL, which would give us temporary UDF support. withUDF(...) {...} for supporting temporary UDF definitions in the scope Key: SPARK-6382 URL: https://issues.apache.org/jira/browse/SPARK-6382 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 1.3.0, 1.3.1 Reporter: Jianshi Huang Currently the scope of UDF registration is global. That is unsuitable for libraries built on top of DataFrame, as many operations have to be done by registering a UDF first. Please provide a way to bind temporary UDFs, e.g.:
{code}
withUDF(("merge_map", (m1: Map[String, Double], m2: Map[String, Double]) => m1 ++ m2), ...) {
  sql("select merge_map(d1.map, d2.map) from d1, d2 where d1.id = d2.id")
}
{code}
Also, the UDF registry is a mutable HashMap; refactoring it into an immutable one makes more sense. Jianshi
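A hedged sketch of the register-then-restore pattern implied by the comment, written against a hypothetical registry interface; Spark's FunctionRegistry at this point doesn't publicly expose lookup or unregistration, so this is purely an illustration of the temporary-scope idea:
{code}
// Hypothetical registry interface, for illustration only.
trait Registry {
  def register(name: String, udf: AnyRef): Unit
  def lookup(name: String): Option[AnyRef]
  def unregister(name: String): Unit
}

// Register the UDFs, run the block, then restore the previous bindings,
// so the UDFs are visible only inside `body`.
def withUDF[T](registry: Registry, udfs: (String, AnyRef)*)(body: => T): T = {
  val saved = udfs.map { case (name, _) => name -> registry.lookup(name) }
  udfs.foreach { case (name, f) => registry.register(name, f) }
  try body
  finally saved.foreach {
    case (name, Some(prev)) => registry.register(name, prev) // restore a shadowed UDF
    case (name, None)       => registry.unregister(name)     // drop the temporary UDF
  }
}
{code}
An immutable registry, as the description suggests, would make this simpler still: the temporary scope becomes a copied map handed to a temporary analyzer, with nothing to restore.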
[jira] [Commented] (SPARK-6292) Add RDD methods to DataFrame to preserve schema
[ https://issues.apache.org/jira/browse/SPARK-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365404#comment-14365404 ] Chen Song commented on SPARK-6292: -- I got it. I think we can extend RDDApi and create a new interface, such as DataFrameApi, to provide the convenient versions of these methods. Add RDD methods to DataFrame to preserve schema --- Key: SPARK-6292 URL: https://issues.apache.org/jira/browse/SPARK-6292 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 1.3.0 Reporter: Joseph K. Bradley Users can use RDD methods on DataFrames, but they lose the schema and need to reapply it. For RDD methods which preserve the schema (such as randomSplit), DataFrame should provide versions of those methods which automatically preserve the schema.
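A minimal sketch of the schema-preserving shape under discussion, written as a standalone helper rather than the proposed DataFrameApi interface: run the RDD method on df.rdd, then reattach df.schema:
{code}
import org.apache.spark.sql.DataFrame

// randomSplit that hands back DataFrames with the original schema reattached.
def randomSplitDF(df: DataFrame, weights: Array[Double], seed: Long): Array[DataFrame] =
  df.rdd.randomSplit(weights, seed).map { rdd =>
    df.sqlContext.createDataFrame(rdd, df.schema) // reapply the schema
  }
{code}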
[jira] [Commented] (SPARK-6292) Add RDD methods to DataFrame to preserve schema
[ https://issues.apache.org/jira/browse/SPARK-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365370#comment-14365370 ] Chen Song commented on SPARK-6292: -- Could you explain this JIRA more precisely, or give an example describing the task? Add RDD methods to DataFrame to preserve schema --- Key: SPARK-6292 URL: https://issues.apache.org/jira/browse/SPARK-6292 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 1.3.0 Reporter: Joseph K. Bradley Users can use RDD methods on DataFrames, but they lose the schema and need to reapply it. For RDD methods which preserve the schema (such as randomSplit), DataFrame should provide versions of those methods which automatically preserve the schema.
[jira] [Commented] (SPARK-6293) SQLContext.implicits should provide automatic conversion for RDD[Row]
[ https://issues.apache.org/jira/browse/SPARK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365500#comment-14365500 ] Chen Song commented on SPARK-6293: -- Yeah, this is a problem. Maybe we can rebuild the schema from an existing row when the RDD doesn't carry schema data. SQLContext.implicits should provide automatic conversion for RDD[Row] - Key: SPARK-6293 URL: https://issues.apache.org/jira/browse/SPARK-6293 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 1.3.0 Reporter: Joseph K. Bradley When a DataFrame is converted to an RDD[Row], it should be easier to convert it back to a DataFrame via toDF. E.g.:
{code}
val df: DataFrame = myRDD.toDF("col1", "col2") // This works for types like RDD[scala.Tuple2[...]]
val splits = df.rdd.randomSplit(...)
val split0: RDD[Row] = splits(0)
val df0 = split0.toDF("col1", "col2") // This fails
{code}
The failure happens because SQLContext.implicits does not provide an automatic conversion for Rows. (It does handle Products, but Row does not implement Product.)
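Today's workaround is to go back through createDataFrame with an explicit schema; an implicit can only sugar over that, since Row values alone carry no type information. A sketch, assuming a hypothetical enrichment class that is not part of SQLContext.implicits:
{code}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.StructType

// Hypothetical enrichment: toDF for RDD[Row], given an explicit schema.
implicit class RowRDDOps(rdd: RDD[Row])(implicit sqlContext: SQLContext) {
  def toDF(schema: StructType): DataFrame = sqlContext.createDataFrame(rdd, schema)
}

// Usage for the example above: val df0 = split0.toDF(df.schema)
{code}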
[jira] [Commented] (SPARK-6293) SQLContext.implicits should provide automatic conversion for RDD[Row]
[ https://issues.apache.org/jira/browse/SPARK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14364340#comment-14364340 ] Chen Song commented on SPARK-6293: -- OK, I have created a pull request: https://github.com/apache/spark/pull/5040. I'm the user KAKA1992. SQLContext.implicits should provide automatic conversion for RDD[Row] - Key: SPARK-6293 URL: https://issues.apache.org/jira/browse/SPARK-6293 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 1.3.0 Reporter: Joseph K. Bradley When a DataFrame is converted to an RDD[Row], it should be easier to convert it back to a DataFrame via toDF. E.g.:
{code}
val df: DataFrame = myRDD.toDF("col1", "col2") // This works for types like RDD[scala.Tuple2[...]]
val splits = df.rdd.randomSplit(...)
val split0: RDD[Row] = splits(0)
val df0 = split0.toDF("col1", "col2") // This fails
{code}
The failure happens because SQLContext.implicits does not provide an automatic conversion for Rows. (It does handle Products, but Row does not implement Product.)
[jira] [Commented] (SPARK-6293) SQLContext.implicits should provide automatic conversion for RDD[Row]
[ https://issues.apache.org/jira/browse/SPARK-6293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14362824#comment-14362824 ] Chen Song commented on SPARK-6293: -- The schema is kept in the row; the RDD itself doesn't carry it. If the RDD is empty, we can't get the schema at all, and val df0 = split0.toDF("col1", "col2") will fail. SQLContext.implicits should provide automatic conversion for RDD[Row] - Key: SPARK-6293 URL: https://issues.apache.org/jira/browse/SPARK-6293 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 1.3.0 Reporter: Joseph K. Bradley When a DataFrame is converted to an RDD[Row], it should be easier to convert it back to a DataFrame via toDF. E.g.:
{code}
val df: DataFrame = myRDD.toDF("col1", "col2") // This works for types like RDD[scala.Tuple2[...]]
val splits = df.rdd.randomSplit(...)
val split0: RDD[Row] = splits(0)
val df0 = split0.toDF("col1", "col2") // This fails
{code}
The failure happens because SQLContext.implicits does not provide an automatic conversion for Rows. (It does handle Products, but Row does not implement Product.)
[jira] [Commented] (SPARK-3633) Fetches failure observed after SPARK-2711
[ https://issues.apache.org/jira/browse/SPARK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14163859#comment-14163859 ] Chen Song commented on SPARK-3633: -- Looks like we have addressed the fetch failures caused by "Too many open files". Does anyone have more insight on the timeout issue? The timeout happened during the transfer of a BufferAckMessage between the sender and receiver. To shed more light on this, I turned on DEBUG-level logging, which traces the life cycle of the event. * On the sender host, sending of the message seems healthy.
{noformat}
14/09/25 19:59:48 DEBUG ConnectionManager: Before Sending [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)] connectionid: sender_host_60072_260
14/09/25 19:59:48 DEBUG ConnectionManager: Sending [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 DEBUG SendingConnection: Added [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to outbox for sending to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 DEBUG SendingConnection: Starting to send [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending chunk from [BufferAckMessage(aid = 582, id = 1503, size = 9601)] to [ConnectionManagerId(receiver_host,52315)]
14/09/25 19:59:48 TRACE SendingConnection: Sending
[jira] [Commented] (SPARK-3633) Fetches failure observed after SPARK-2711
[ https://issues.apache.org/jira/browse/SPARK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14155415#comment-14155415 ] Chen Song commented on SPARK-3633: -- I tried increasing the timeout via the property spark.core.connection.ack.wait.timeout. I saw fewer fetch failures due to ack timeout, but they still exist. I also tried relaxing the following properties, but none of them seems to help: spark.core.connection.handler.threads.* spark.core.connection.io.threads.* spark.core.connection.connect.threads.* Fetches failure observed after SPARK-2711 - Key: SPARK-3633 URL: https://issues.apache.org/jira/browse/SPARK-3633 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 1.1.0 Reporter: Nishkam Ravi Running a variant of PageRank on a 6-node cluster with a 30 GB input dataset. Recently upgraded to Spark 1.1. The workload fails with the following error message(s):
{code}
14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com, 49612, 0), shuffleId=3, mapId=75, reduceId=120)
14/09/19 12:10:38 INFO DAGScheduler: Resubmitting failed stages
{code}
In order to identify the problem, I carried out change-set analysis. As I go back in time, the error message changes to:
{code}
14/09/21 12:56:54 WARN TaskSetManager: Lost task 35.0 in stage 3.0 (TID 519, c1706.halxg.cloudera.com): java.io.FileNotFoundException: /var/lib/jenkins/workspace/tmp/spark-local-20140921123257-68ee/1c/temp_3a1ade13-b48a-437a-a466-673995304034 (Too many open files)
 java.io.FileOutputStream.open(Native Method)
 java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:185)
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:197)
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:145)
 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:51)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
{code}
All the way until Aug 4th. Turns out the problem changeset is 4fde28c.
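For reference, a sketch of how such a timeout is raised; the value is an arbitrary example in seconds, not a recommendation:
{code}
val conf = new org.apache.spark.SparkConf()
  // Give straggling BufferAckMessages longer before the fetch is declared failed.
  .set("spark.core.connection.ack.wait.timeout", "600")
{code}
The same property can equally be set in spark-defaults.conf or via --conf on spark-submit.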
[jira] [Comment Edited] (SPARK-3633) Fetches failure observed after SPARK-2711
[ https://issues.apache.org/jira/browse/SPARK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14155415#comment-14155415 ] Chen Song edited comment on SPARK-3633 at 10/1/14 8:15 PM: --- I tried increasing the timeout via the property spark.core.connection.ack.wait.timeout. I saw fewer fetch failures due to ack timeout, but they still exist. I also tried relaxing the following properties, but none of them seems to help: spark.core.connection.handler.threads.* spark.core.connection.io.threads.* spark.core.connection.connect.threads.* My job only runs 500 parallel reduce tasks, and I don't see much GC activity on the sender/receiver executor JVMs when the timeout happens. was (Author: capricornius): I tried increasing the timeout via the property spark.core.connection.ack.wait.timeout. I saw fewer fetch failures due to ack timeout, but they still exist. I also tried relaxing the following properties, but none of them seems to help: spark.core.connection.handler.threads.* spark.core.connection.io.threads.* spark.core.connection.connect.threads.* Fetches failure observed after SPARK-2711 - Key: SPARK-3633 URL: https://issues.apache.org/jira/browse/SPARK-3633 Project: Spark Issue Type: Bug Components: Block Manager Affects Versions: 1.1.0 Reporter: Nishkam Ravi Running a variant of PageRank on a 6-node cluster with a 30 GB input dataset. Recently upgraded to Spark 1.1. The workload fails with the following error message(s):
{code}
14/09/19 12:10:38 WARN TaskSetManager: Lost task 51.0 in stage 2.1 (TID 552, c1705.halxg.cloudera.com): FetchFailed(BlockManagerId(1, c1706.halxg.cloudera.com, 49612, 0), shuffleId=3, mapId=75, reduceId=120)
14/09/19 12:10:38 INFO DAGScheduler: Resubmitting failed stages
{code}
In order to identify the problem, I carried out change-set analysis. As I go back in time, the error message changes to:
{code}
14/09/21 12:56:54 WARN TaskSetManager: Lost task 35.0 in stage 3.0 (TID 519, c1706.halxg.cloudera.com): java.io.FileNotFoundException: /var/lib/jenkins/workspace/tmp/spark-local-20140921123257-68ee/1c/temp_3a1ade13-b48a-437a-a466-673995304034 (Too many open files)
 java.io.FileOutputStream.open(Native Method)
 java.io.FileOutputStream.<init>(FileOutputStream.java:221)
 org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:117)
 org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:185)
 org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:197)
 org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:145)
 org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
 org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:51)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
{code}
All the way until Aug 4th. Turns out the problem changeset is 4fde28c.