[Spark SQL]: DataFrame schema resulting in NullPointerException

2017-11-19 Thread Chitral Verma
Hey,

I'm working on a use case that involves converting DStreams to
DataFrames after some transformations. I've simplified my code into the
following snippet to reproduce the error, and I've listed my environment
settings below.

*Environment:*

Spark Version: 2.2.0
Java: 1.8
Execution mode: local/ IntelliJ


*Code:*

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

object Tests {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = ...
    import spark.implicits._

    val df = List(
        ("jim", "usa"),
        ("raj", "india"))
      .toDF("name", "country")

    df.rdd
      .map(x => x.toSeq)
      // df.schema is referenced directly inside the closure passed to map()
      .map(x => new GenericRowWithSchema(x.toArray, df.schema))
      .foreach(println)
  }
}


This results in a NullPointerException because I'm using df.schema directly
inside map().

What I don't understand is that if I use the following code (basically
storing the schema as a value before transforming), it works just fine.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

object Tests {

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = ...
    import spark.implicits._

    val df = List(
        ("jim", "usa"),
        ("raj", "india"))
      .toDF("name", "country")

    // capture the schema in a local val before the transformations
    val sc = df.schema

    df.rdd
      .map(x => x.toSeq)
      .map(x => new GenericRowWithSchema(x.toArray, sc))
      .foreach(println)
  }
}


I wonder why this is happening, as *df.rdd* is not an action and there is no
visible change in the state of the DataFrame just yet. What are your thoughts
on this?

Regards,
Chitral Verma


Kryo not registered class

2017-11-19 Thread Angel Francisco Orta
Hello, I'm using Spark 2.1.0 with Scala and I'm registering all classes with
Kryo, and I have a problem registering this class:

org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[]

I can't register it with
classOf[Array[Class.forName("org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation").type]]


I have also tried creating a Java registrator class and registering the
class as
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation[].class;
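
Maybe the way out is to look the array class up by its JVM binary name
instead of naming the private class in source. An untested sketch of what I
mean (the class name is the one from above):

import org.apache.spark.SparkConf

// Untested sketch: a JVM array class can be obtained via Class.forName using
// the binary name "[L<element class>;", which avoids referring to the
// package-private Spark class directly in source code.
val blockLocationArrayClass: Class[_] = Class.forName(
  "[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex" +
    "$SerializableFileStatus$SerializableBlockLocation;")

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(blockLocationArrayClass))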

Any clue is appreciated,

Thanks.


Re: Multiple transformations without recalculating or caching

2017-11-19 Thread Phillip Henry
A back-of-a-beermat calculation says if you have, say, 20 boxes, saving 1TB
should take approximately 15 minutes (with a replication factor of 1 since
you don't need it higher for ephemeral data that is relatively easy to
generate).
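
(Roughly: 1 TB spread over 20 boxes is 50 GB per box, and writing 50 GB in
about 15 minutes works out to a sustained ~55 MB/s per box, which is a
reasonable local-disk write rate.)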

This isn't much if the whole job takes hours. You get the added bonus that
you can inspect interim data to help understand how the results came to be.
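
In code, the pattern we used looks roughly like this (just a sketch; the
path, the column name and the replication setting are made-up placeholders):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of the save-then-reuse pattern: materialise the transformed data
// once, then read it back for each downstream step instead of recomputing.
def saveAndComputeStats(spark: SparkSession, transformed: DataFrame): Unit = {
  // Optional: replication factor 1 if the saved copy is only ephemeral,
  // easily regenerated data (HDFS only).
  spark.sparkContext.hadoopConfiguration.set("dfs.replication", "1")

  val outputPath = "/tmp/output"  // placeholder path

  // Task A: save the data (the only full computation of the lineage).
  transformed.write.mode("overwrite").parquet(outputPath)

  // Task B: read the saved copy back and compute the per-group statistics.
  val saved = spark.read.parquet(outputPath)
  saved.groupBy("group").count().show()  // "group" is a placeholder column
}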

This worked for us. As ever, YMMV.

Phillip


On 17 Nov 2017 11:12, "Fernando Pereira"  wrote:

> Notice that I have 1+ TB. If I didn't mind things being slow I
> wouldn't be using Spark.
>
> On 17 November 2017 at 11:06, Sebastian Piu 
> wrote:
>
>> If you don't want to recalculate you need to hold the results somewhere.
>> If you need to save it anyway, why don't you do that and then read it again
>> to get your stats?
>>
>> On Fri, 17 Nov 2017, 10:03 Fernando Pereira, 
>> wrote:
>>
>>> Dear Spark users
>>>
>>> Is it possible to take the output of a transformation (RDD/Dataframe)
>>> and feed it to two independent transformations without recalculating the
>>> first transformation and without caching the whole dataset?
>>>
>>> Consider the case of a very large dataset (1+ TB) which has gone through
>>> several transformations, and now we want to save it but also calculate
>>> some statistics per group.
>>> So the best way to process it would be: for each partition, do task A,
>>> then do task B.
>>>
>>> I don't see a way of instructing Spark to proceed that way without
>>> caching to disk, which seems unnecessarily heavy. And if we don't cache,
>>> Spark recalculates every partition all the way from the beginning. In
>>> either case huge file reads happen.
>>>
>>> Any ideas on how to avoid it?
>>>
>>> Thanks
>>>
>>> Fernando
>>>
>>
>