OK Thanks

On Sat, 27 Jun 2020, 17:36 Sean Owen, <sro...@gmail.com> wrote:
> It does not return a DataFrame. It returns Dataset[Long].
> You do not need to collect(). See my email.
>
> On Sat, Jun 27, 2020, 11:33 AM Anwar AliKhan <anwaralikhan...@gmail.com> wrote:
>
>> So the range function actually returns BigInt (the Spark SQL type),
>> and the fact that Dataset[Long] and printSchema are displaying
>> (toString()) Long instead of BigInt needs looking into.
>>
>> Putting that to one side:
>>
>> My issue with using collect() to get around the casting of elements
>> returned by range is that I read some literature which says collect()
>> returns all the data to the driver and so can cause an out-of-memory
>> error.
>>
>> Question:
>> Is it correct that collect() behaves that way and can cause an
>> out-of-memory error?
>>
>> Obviously it would be better to use .map for the casting, because then
>> the work is being done by the workers:
>>
>> spark.range(10).map(_.toLong).reduce(_+_)
>>
>> <http://www.backbutton.co.uk/>
>>
>> On Sat, 27 Jun 2020, 15:42 Sean Owen, <sro...@gmail.com> wrote:
>>
>>> There are several confusing things going on here. I think this is part
>>> of the explanation, not 100% sure:
>>>
>>> 'bigint' is the Spark SQL type of an 8-byte long. 'long' is the type
>>> of a JVM primitive. Both are the same, conceptually, but represented
>>> differently internally, as they are logically somewhat different ideas.
>>>
>>> The first thing I'm not sure about is why the toString of
>>> Dataset[Long] reports 'bigint' while printSchema() reports 'long'.
>>> That might be a (cosmetic) bug.
>>>
>>> Second, in Scala 2.12, its SAM support causes calls to reduce() and
>>> other methods that use an Object type to be ambiguous, because Spark
>>> has long since had Java-friendly overloads that support a SAM
>>> interface for Java callers. Those weren't removed, to avoid breakage,
>>> at the cost of having to explicitly tell it which overload you want.
>>> (They are equivalent.)
>>>
>>> This is triggered because range() returns java.lang.Longs, not long
>>> primitives (i.e. scala.Long). I assume that is to make it versatile
>>> enough to use in Java too, and because it's hard to write an overload
>>> (you would have to rename it).
>>>
>>> But that means you trigger the SAM overload issue.
>>>
>>> Anything you do that makes this a Dataset[scala.Long] resolves it, as
>>> it is no longer ambiguous (the Java-friendly, Object-friendly overload
>>> does not apply). For example:
>>>
>>> spark.range(10).map(_.toLong).reduce(_+_)
>>>
>>> If you collect(), you still have an Array[java.lang.Long]. But Scala
>>> implicits and conversions make .reduce(_+_) work fine on that; there
>>> is no "Java-friendly" overload in the way.
>>>
>>> Normally all of this just works and you can ignore these differences.
>>> This is a good example of a corner case in which it's inconvenient,
>>> because of the old Java-friendly overloads. This is by design, though.
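For reference, a minimal spark-shell sketch of the two explicit fixes
described above, as I understand them (written against Spark 3.0 with
Scala 2.12; the res numbers and exact REPL output are illustrative):

// Does not compile: _+_ infers (java.lang.Long, java.lang.Long) => scala.Long,
// which matches neither reduce overload, hence the error shown below.
// spark.range(10).reduce(_ + _)

// Fix 1 (Sean's suggestion): make it a Dataset[scala.Long] first, so
// only the Scala-function overload applies.
spark.range(10).map(_.toLong).reduce(_ + _)
// res0: Long = 45

// Fix 2: pick the Java-friendly overload explicitly by passing the SAM
// interface it was written for.
import org.apache.spark.api.java.function.ReduceFunction
spark.range(10).reduce(new ReduceFunction[java.lang.Long] {
  override def call(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
})
// res1: java.lang.Long = 45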
>>> On Sat, Jun 27, 2020 at 8:29 AM Anwar AliKhan <anwaralikhan...@gmail.com> wrote:
>>>
>>>> As you know, I have been puzzling over this issue:
>>>> how come spark.range(100).reduce(_+_) worked in earlier Spark
>>>> versions but not with the most recent versions?
>>>>
>>>> Well,
>>>>
>>>> when you first create a dataset, by default the column "id" datatype
>>>> is [BigInt]. It is a bit like a coin: Long on one side and bigint on
>>>> the other side.
>>>>
>>>> scala> val myrange = spark.range(1,100)
>>>> myrange: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>>>>
>>>> The Spark framework error message after parsing the reduce(_+_)
>>>> method confirms this, and moreover stresses its constraint of
>>>> expecting data of type long as its parameter argument(s).
>>>>
>>>> scala> myrange.reduce(_+_)
>>>> <console>:26: error: overloaded method value reduce with alternatives:
>>>>   (func: org.apache.spark.api.java.function.ReduceFunction[java.lang.Long])java.lang.Long <and>
>>>>   (func: (java.lang.Long, java.lang.Long) => java.lang.Long)java.lang.Long
>>>>  cannot be applied to ((java.lang.Long, java.lang.Long) => scala.Long)
>>>>        myrange.reduce(_+_)
>>>>                ^
>>>>
>>>> But if you ask the printSchema method, it disagrees with both of the
>>>> above and says the column "id" data is long.
>>>>
>>>> scala> range100.printSchema()
>>>> root
>>>>  |-- id: long (nullable = false)
>>>>
>>>> If I ask the collect() method, it agrees with printSchema() that the
>>>> datatype of column "id" is Long and not BigInt.
>>>>
>>>> scala> range100.collect()
>>>> res10: Array[Long] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
>>>> 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
>>>> 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46,
>>>> 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63,
>>>> 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80,
>>>> 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97,
>>>> 98, 99)
>>>>
>>>> To settle the dispute between the methods, and to get collect() to
>>>> "show me the money", I called collect() to pass its return type to
>>>> reduce(_+_).
>>>>
>>>> "Here is the money":
>>>>
>>>> scala> range100.collect().reduce(_+_)
>>>> res11: Long = 4950
>>>>
>>>> The collect() and printSchema methods could be implying there is no
>>>> difference between a Long and a BigInt.
>>>>
>>>> Question: These return type differentials, are they by design or an
>>>> oversight (bug)?
>>>> Question: Why the change from the earlier version to the later
>>>> versions?
>>>> Question: Will you be updating the reduce(_+_) method?
>>>>
>>>> When it comes to creating a dataset using toDS() there is no dispute:
>>>> all the methods agree that it is neither a BigInt nor a Long but an
>>>> Int (integer).
>>>>
>>>> scala> val dataset = Seq(1, 2, 3).toDS()
>>>> dataset: org.apache.spark.sql.Dataset[Int] = [value: int]
>>>>
>>>> scala> dataset.collect()
>>>> res29: Array[Int] = Array(1, 2, 3)
>>>>
>>>> scala> dataset.printSchema()
>>>> root
>>>>  |-- value: integer (nullable = false)
>>>>
>>>> scala> dataset.show()
>>>> +-----+
>>>> |value|
>>>> +-----+
>>>> |    1|
>>>> |    2|
>>>> |    3|
>>>> +-----+
>>>>
>>>> scala> dataset.reduce(_+_)
>>>> res7: Int = 6
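On the collect() question upthread: yes, collect() materializes the
entire Dataset in the driver JVM, which is why it can cause an
out-of-memory error on large data; reduce() and the SQL aggregate
functions do the work on the executors and send back only the final
value. A minimal sketch of an executor-side sum, assuming spark-shell
on Spark 3.x (sum comes from org.apache.spark.sql.functions):

import org.apache.spark.sql.functions.sum

// The aggregation runs on the executors; only the single result row
// comes back to the driver, so driver memory is not a concern.
spark.range(10).agg(sum("id")).first().getLong(0)
// Long = 45

// The same expression works for ranges that would never fit in driver memory:
spark.range(1000000000L).agg(sum("id")).first().getLong(0)
// Long = 499999999500000000

This route also sidesteps the java.lang.Long vs scala.Long overload
question entirely, since no Scala lambda is involved in the sum.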