Re: Joining a RDD to a Dataframe

2016-05-08 Thread Ashish Dubey
Is there any reason you don't want to convert it? I don't think a join between
an RDD and a DataFrame is supported.
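A minimal sketch of that conversion (Spark 1.6-era API; `streamRdd` and the `InputId` case class are hypothetical stand-ins for the streaming RDD described below):

```scala
import org.apache.spark.sql.functions.explode

// Hypothetical element type for the streaming RDD of ids.
case class InputId(id: String)

import sqlContext.implicits._                    // assumes the usual SQLContext
val inputDF = streamRdd.map(InputId(_)).toDF()   // RDD[String] -> DataFrame[id: string]

// Flatten the addresses array so the join condition becomes a plain column
// equality, instead of array_contains with a Column argument.
val exploded = df.withColumn("addr", explode(df("addresses")))
val enriched = exploded.join(inputDF, exploded("addr.id") === inputDF("id"))
```

Since explode produces one row per address, you may want to deduplicate on the top-level id afterwards.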

On Sat, May 7, 2016 at 11:41 PM, Cyril Scetbon 
wrote:

> Hi,
>
> I have an RDD built during a Spark Streaming job and I'd like to join it to
> a DataFrame (E/S input) to enrich it.
> It seems that I can't join the RDD and the DF without first converting the
> RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:
>
> scala> df
> res32: org.apache.spark.sql.DataFrame = [f1: string, addresses:
> array>, id: string]
>
> scala> df_input
> res33: org.apache.spark.sql.DataFrame = [id: string]
>
> scala> df_input.collect
> res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])
>
> I can get the ids I want if I know the value to look for in addresses.id,
> using:
>
> scala> df.filter(array_contains(df("addresses.id"),
> "idaddress2")).select("id").collect
> res35: Array[org.apache.spark.sql.Row] = Array([], [YY])
>
> However, when I try to join df_input and df, using the previous filter
> as the join condition, I get an exception:
>
> scala> df.join(df_input, array_contains(df("addresses.id"),
> df_input("id")))
> java.lang.RuntimeException: Unsupported literal type class
> org.apache.spark.sql.Column id
> at
> org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
> at
> org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
> ...
>
> It seems that array_contains only supports literal arguments and does not
> substitute a sql.Column with its value.
>
> What's the best way to achieve what I want to do (also in terms of
> performance)?
>
> Thanks
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: sqlCtx.read.parquet yields lots of small tasks

2016-05-08 Thread Ashish Dubey
I see the behavior: it always launches the minimum total number of tasks
possible under your settings (num-executors * num-cores). If you read a huge
amount of data you will see more tasks, so there seems to be some kind of
lower bound on the number of tasks. It may require some digging; other
formats did not seem to have this issue.
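One way to start digging, sketched here (assumes a Spark 1.6 shell with sqlContext and sc in scope; the path is illustrative): compare the DataFrame's own partition count against the task count the stage launches.

```scala
// Hypothetical digging sketch: how many partitions does the parquet DF itself
// report, versus the ~5-tasks-per-executor the stage shows in the UI?
val df = sqlContext.read.parquet("/path/to/single_file.parquet")
println(s"df partitions:         ${df.rdd.partitions.length}")
println(s"sc.defaultParallelism: ${sc.defaultParallelism}")
```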

On Sun, May 8, 2016 at 12:10 AM, Johnny W. <jzw.ser...@gmail.com> wrote:

> The file size is very small (< 1M). The stage launches every time I call:
> --
> sqlContext.read.parquet(path_to_file)
>
> These are the parquet specific configurations I set:
> --
> spark.sql.parquet.filterPushdown: true
> spark.sql.parquet.mergeSchema: true
>
> Thanks,
> J.
>
> On Sat, May 7, 2016 at 4:20 PM, Ashish Dubey <ashish@gmail.com> wrote:
>
>> How big is your file and can you also share the code snippet
>>
>>
>> On Saturday, May 7, 2016, Johnny W. <jzw.ser...@gmail.com> wrote:
>>
>>> hi spark-user,
>>>
>>> I am using Spark 1.6.0. When I call sqlCtx.read.parquet to create a
>>> dataframe from a parquet data source with a single parquet file, it yields
>>> a stage with lots of small tasks. It seems the number of tasks depends on
>>> how many executors I have instead of how many parquet files/partitions I
>>> have. Actually, it launches 5 tasks on each executor.
>>>
>>> This behavior is quite strange, and it may cause problems if there is
>>> a slow executor. What is this "parquet" stage for, and why does it launch
>>> 5 tasks on each executor?
>>>
>>> Thanks,
>>> J.
>>>
>>
>


Re: BlockManager crashing applications

2016-05-08 Thread Ashish Dubey
Caused by: java.io.IOException: Failed to connect to ip-10-12-46-235.us-west-2.compute.internal/10.12.46.235:55681
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)


The above message indicates that there used to be an executor at that address,
and by the time another executor tried to read from it, it no longer existed.
You may be able to confirm this by looking at the Spark application UI, where
you may find dead executors.
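If the failures only appear under parallel actions, one hedged workaround (plain Scala here, no Spark needed to illustrate it) is to bound how many jobs are in flight at once instead of using .par's default pool; `runJob` below is a hypothetical stand-in for `df.filter(expression).count()`:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for df.filter(expression).count().
def runJob(expression: String): Long = expression.length.toLong

val filterExpressions = Seq("a = 1", "b = 2", "c = 3")

// Bound concurrency to 2 in-flight jobs instead of .par's default pool size.
val pool = Executors.newFixedThreadPool(2)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

// Future.sequence preserves input order in the result.
val counts = Await.result(
  Future.sequence(filterExpressions.map(e => Future(runJob(e)))),
  1.minute)

pool.shutdown()
```

Lowering the fan-out does not fix a missing retry in doGetRemote, but it can reduce the pressure that kills executors in the first place.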

On Sun, May 8, 2016 at 6:02 PM, Brandon White <bwwintheho...@gmail.com>
wrote:

> I'm not quite sure how this is a memory problem. There are no OOM
> exceptions, and the job only breaks when actions are run in parallel,
> submitted to the scheduler by different threads.
>
> The issue is that the doGetRemote function does not retry when it is
> denied access to a cache block.
> On May 8, 2016 5:55 PM, "Ashish Dubey" <ashish@gmail.com> wrote:
>
> Brandon,
>
> How much memory are you giving your executors? Did you check whether there
> were dead executors in your application logs? Most likely you need more
> memory for the executors.
>
> Ashish
>
> On Sun, May 8, 2016 at 1:01 PM, Brandon White <bwwintheho...@gmail.com>
> wrote:
>
>> Hello all,
>>
>> I am running a Spark application which schedules multiple Spark jobs.
>> Something like:
>>
>> val df  = sqlContext.read.parquet("/path/to/file")
>>
>> filterExpressions.par.foreach { expression =>
>>   df.filter(expression).count()
>> }
>>
>> When the block manager fails to fetch a block, it throws an exception
>> which eventually kills the application: http://pastebin.com/2ggwv68P
>>
>> This code works when I run it on one thread with:
>>
>> filterExpressions.foreach { expression =>
>>   df.filter(expression).count()
>> }
>>
>> But I really need the parallel execution of the jobs. Is there any way
>> around this? It seems like a bug in the BlockManager's doGetRemote function.
>> I have tried the HTTP Block Manager as well.
>>
>
>


Re: BlockManager crashing applications

2016-05-08 Thread Ashish Dubey
Brandon,

How much memory are you giving your executors? Did you check whether there
were dead executors in your application logs? Most likely you need more
memory for the executors.

Ashish

On Sun, May 8, 2016 at 1:01 PM, Brandon White 
wrote:

> Hello all,
>
> I am running a Spark application which schedules multiple Spark jobs.
> Something like:
>
> val df  = sqlContext.read.parquet("/path/to/file")
>
> filterExpressions.par.foreach { expression =>
>   df.filter(expression).count()
> }
>
> When the block manager fails to fetch a block, it throws an exception
> which eventually kills the application: http://pastebin.com/2ggwv68P
>
> This code works when I run it on one thread with:
>
> filterExpressions.foreach { expression =>
>   df.filter(expression).count()
> }
>
> But I really need the parallel execution of the jobs. Is there any way
> around this? It seems like a bug in the BlockManager's doGetRemote function.
> I have tried the HTTP Block Manager as well.
>


Re: Parse Json in Spark

2016-05-08 Thread Ashish Dubey
This limit is due to the underlying InputFormat implementation. You can always
write your own InputFormat and then use Spark's newAPIHadoopFile API to pass
your InputFormat class. You will have to place the jar file in the /lib
location on all the nodes.
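A hedged sketch of the wholeTextFiles route mentioned below (Spark 1.6 API; the input path is illustrative): each file becomes one (path, content) record, and the JSON reader accepts an RDD[String], parsing each element as a single document.

```scala
// Hypothetical sketch: parse multi-line JSON files by reading each file whole.
// Assumes a Spark 1.6 shell with sc and sqlContext in scope.
val whole = sc.wholeTextFiles("hdfs:///data/json/")    // RDD[(path, fileContent)]
val docs  = whole.map { case (_, content) => content } // one JSON document per element
val df    = sqlContext.read.json(docs)                 // RDD[String] overload
df.printSchema()
```

Note that wholeTextFiles loads each file entirely into memory, so this works best with many small files rather than a few huge ones.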

Ashish

On Sun, May 8, 2016 at 4:02 PM, Hyukjin Kwon  wrote:

>
> I remember this Jira, https://issues.apache.org/jira/browse/SPARK-7366.
> Parsing multi-line records is not supported in the JSON data source.
>
> Instead this can be done by sc.wholeTextFiles(). I found some examples
> here,
> http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files
>
> Although this reads each file as a whole record, it should work.
>
> Thanks!
> On 9 May 2016 7:20 a.m., "KhajaAsmath Mohammed" 
> wrote:
>
>> Hi,
>>
>> I am working on parsing JSON in Spark, but most of the information
>> available online states that I need to have the entire JSON in a single
>> line.
>>
>> In my case, the JSON file is delivered in a complex structure and not in a
>> single line. Does anyone know how to process this in Spark?
>>
>> I used the Jackson jar to process JSON and was able to do it when the JSON
>> is in a single line. Any ideas?
>>
>> Thanks,
>> Asmath
>>
>


Re: How to verify if spark is using kryo serializer for shuffle

2016-05-07 Thread Ashish Dubey
The driver maintains the complete metadata of the application (scheduling of
executors and the messaging that controls execution), and this code seems to
be failing in exactly that code path. That said, there is JVM overhead on the
driver proportional to the number of executors, stages, and tasks in your app.
Do you know your driver heap size and application structure (number of stages
and tasks)?

Ashish
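For context on the knobs involved: spark.serializer governs data and shuffle serialization, while Akka's control-plane messages (where this stack trace originates) go through Akka's own JavaSerializer regardless, so the relevant knob for this failure is the driver heap. A hedged spark-submit sketch; sizes and class names are placeholders, not recommendations:

```shell
spark-submit \
  --driver-memory 4g \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=com.myapp.spark.jobs.conf.SparkSerializerRegistrator \
  --class com.myapp.Main \
  myapp.jar
```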
On Saturday, May 7, 2016, Nirav Patel  wrote:

> Right, but these logs are from the Spark driver, and the driver seems to use
> Akka.
>
> ERROR [sparkDriver-akka.actor.default-dispatcher-17]
> akka.actor.ActorSystemImpl: Uncaught fatal error from thread
> [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down
> ActorSystem [sparkDriver]
>
> I saw following logs before above happened.
>
> 2016-05-06 09:49:17,813 INFO
> [sparkDriver-akka.actor.default-dispatcher-17]
> org.apache.spark.MapOutputTrackerMasterEndpoint: Asked to send map output
> locations for shuffle 1 to hdn6.xactlycorporation.local:44503
>
>
> As far as I know, the driver just coordinates the shuffle operation and is
> not actually doing anything within its own JVM that should cause a memory
> issue. Can you explain in what circumstances I could see this error in the
> driver logs? I don't do any collect or any other driver-side operation that
> would cause this. It fails when doing an aggregateByKey operation, but that
> should happen in the executor JVM, NOT in the driver JVM.
>
>
> Thanks
>
>> On Sat, May 7, 2016 at 11:58 AM, Ted Yu wrote:
>
>> bq.   at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>
>> It was Akka which uses JavaSerializer
>>
>> Cheers
>>
>> On Sat, May 7, 2016 at 11:13 AM, Nirav Patel wrote:
>>> Hi,
>>>
>>> I thought I was using the Kryo serializer for shuffle. I could verify it
>>> from the Spark UI Environment tab:
>>> spark.serializer org.apache.spark.serializer.KryoSerializer
>>> spark.kryo.registrator
>>> com.myapp.spark.jobs.conf.SparkSerializerRegistrator
>>>
>>>
>>> But when I see the following error in the driver logs, it looks like Spark
>>> is using the JavaSerializer:
>>>
>>> 2016-05-06 09:49:26,490 ERROR
>>> [sparkDriver-akka.actor.default-dispatcher-17] akka.actor.ActorSystemImpl:
>>> Uncaught fatal error from thread
>>> [sparkDriver-akka.remote.default-remote-dispatcher-6] shutting down
>>> ActorSystem [sparkDriver]
>>>
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> at java.util.Arrays.copyOf(Arrays.java:2271)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>>>
>>> at
>>> java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>>>
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
>>>
>>> at
>>> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129)
>>>
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>>
>>> at
>>> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129)
>>>
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>
>>> at
>>> akka.serialization.JavaSerializer.toBinary(Serializer.scala:129)
>>>
>>> at
>>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>>>
>>> at
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>>
>>> at
>>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>>>
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>
>>> at
>>> akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
>>>
>>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
>>>
>>> at
>>> akka.remote.EndpointWriter$$anonfun$2.applyOrElse(Endpoint.scala:718)
>>>
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>
>>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>>>
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>
>>> at
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>
>>> at
>>> 

Re: sqlCtx.read.parquet yields lots of small tasks

2016-05-07 Thread Ashish Dubey
How big is your file and can you also share the code snippet

On Saturday, May 7, 2016, Johnny W.  wrote:

> hi spark-user,
>
> I am using Spark 1.6.0. When I call sqlCtx.read.parquet to create a
> dataframe from a parquet data source with a single parquet file, it yields
> a stage with lots of small tasks. It seems the number of tasks depends on
> how many executors I have instead of how many parquet files/partitions I
> have. Actually, it launches 5 tasks on each executor.
>
> This behavior is quite strange, and it may cause problems if there is a
> slow executor. What is this "parquet" stage for, and why does it launch 5
> tasks on each executor?
>
> Thanks,
> J.
>