[Spark SQL]: DataFrame schema resulting in NullPointerException
Hey, I'm working on this use case that involves converting DStreams to Dataframes after some transformations. I've simplified my code into the following snippet so as to reproduce the error. Also, I've mentioned below my environment settings. *Environment:* Spark Version: 2.2.0 Java: 1.8 Execution mode: local/ IntelliJ *Code:* object Tests { def main(args: Array[String]): Unit = { val spark: SparkSession = ... import spark.implicits._ val df = List( ("jim", "usa"), ("raj", "india")) .toDF("name", "country") df.rdd .map(x => x.toSeq) .map(x => new GenericRowWithSchema(x.toArray, df.schema)) .foreach(println) } } This results in NullPointerException as I'm directly using df.schema in map(). What I don't understand is that if I use the following code (basically storing the schema as a value before transforming), it works just fine. object Tests { def main(args: Array[String]): Unit = { val spark: SparkSession = ... import spark.implicits._ val df = List( ("jim", "usa"), ("raj", "india")) .toDF("name", "country") val sc = df.schema df.rdd .map(x => x.toSeq) .map(x => new GenericRowWithSchema(x.toArray, sc)) .foreach(println) } } I wonder why this is happening as *df.rdd* is not an action and there is visible change in state of dataframe just yet. What are your thoughts on this? Regards, Chitral Verma
Kryo not registered class
Hello, I'm with spark 2.1.0 with scala and I'm registering all classes with kryo, and I have a problem registering this class, org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[] I can't register with classOf[Array[Class.forName("org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation").type]] I have tried as well creating a java class like register and registering the class as org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[].class; Any clue is appreciatted, Thanks.
Re: Multiple transformations without recalculating or caching
A back-of-a-beermat calculation says if you have, say, 20 boxes, saving 1TB should take approximately 15 minutes (with a replication factor of 1 since you don't need it higher for ephemeral data that is relatively easy to generate). This isn't much if the whole job takes hours. You get the added bonus that you can inspect interim data to help understand how the results came to be. This worked for us. As ever, YMMV. Phillip On 17 Nov 2017 11:12, "Fernando Pereira"wrote: > Notice the fact that I have 1+ TB. If I didn't mind things to be slow I > wouldn't be using spark. > > On 17 November 2017 at 11:06, Sebastian Piu > wrote: > >> If you don't want to recalculate you need to hold the results somewhere, >> of you need to save it why don't you so that and then read it again and get >> your stats? >> >> On Fri, 17 Nov 2017, 10:03 Fernando Pereira, >> wrote: >> >>> Dear Spark users >>> >>> Is it possible to take the output of a transformation (RDD/Dataframe) >>> and feed it to two independent transformations without recalculating the >>> first transformation and without caching the whole dataset? >>> >>> Consider the case of a very large dataset (1+TB) which suffered several >>> transformations and now we want to save it but also calculate some >>> statistics per group. >>> So the best processing way would for: for each partition: do task A, do >>> task B. >>> >>> I don't see a way of instructing spark how to proceed that way without >>> caching to disk, which seems unnecessarily heavy. And if we don't cache >>> spark recalculates every partition all the way from the beginning. In >>> either case huge file reads happen. >>> >>> Any ideas on how to avoid it? >>> >>> Thanks >>> >>> Fernando >>> >> >