Re: Kafka directsream receiving rate

2016-02-06 Thread Diwakar Dhanuskodi
Thanks, Cody, for trying to understand the issue. Sorry if I am not being clear.
The scenario is to process all messages at once in a single DStream block when the
source system publishes them. The source system publishes x messages once every 10
minutes.

By "events" I meant the total number of messages processed in each batch interval
(2000 ms in my case) by an executor (the web UI shows each block's processing as
events).

DirectStream is processing only 10 messages per batch. It is the same whether 100 or
1 million messages are published.

The xyz topic has 20 partitions.
I am using the Kafka producer API to publish messages.
Below is the code that I am using:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
  kafkaParams, topicSet)
k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  // print the current time and the number of partitions in this batch's RDD
  println(System.currentTimeMillis() + " " + dstreamToRDD.partitions.length)
  val accTran = dstreamToRDD.filter { ... }
  accTran.map { ... }
}
ssc.start()
ssc.awaitTermination()

I had tried using DirectStream with map, where I had an issue with offsetRange.
After your suggestion, the offset issue was resolved when I used the DirectStream
code above with the topic only.

The spark-submit settings I am using are in the mail chain below.
Is there any bottleneck I am hitting that prevents processing the maximum number of
messages in one batch interval using the direct-stream RDD?
If this is not clear, I can take this offline and explain the scenario in more
detail.
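
A note on the settings referenced below: for the direct stream,
spark.streaming.kafka.maxRatePerPartition is an upper bound on records per second per
partition, so the per-batch ceiling works out as sketched here (the numbers are simply
the ones mentioned in this chain; with spark.streaming.backpressure.enabled=true the
effective rate can sit well below that ceiling, which is why removing both settings,
as suggested elsewhere in this thread, is a reasonable first experiment):

// Back-of-the-envelope ceiling for one direct-stream batch, using values from this chain.
val maxRatePerPartition  = 100   // spark.streaming.kafka.maxRatePerPartition (records/sec/partition)
val batchIntervalSeconds = 2     // 2000 ms batch interval
val numPartitions        = 20    // partitions in the xyz topic

val maxRecordsPerBatch = maxRatePerPartition * batchIntervalSeconds * numPartitions
// = 4000 records per batch at most, no matter how many messages are waiting in Kafka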

Sent from Samsung Mobile.







 Original message From: Cody Koeninger 
 Date:06/02/2016  22:32  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

I am not at all clear on what you are saying.

"Yes , I am  printing  each  messages .  It is  processing all  messages under 
each  dstream block."  If it is processing all messages, what is the problem 
you are having?

"The issue is  with  Directsream processing 10 message per event. "  What 
distinction are you making between a message and an event?

"I am  expecting  Directsream to  process  1 million messages"   Your first 
email said you were publishing 100 messages but only processing 10.  Why are 
you now trying to process 1 million messages without understanding what is 
going on?  Make sure you can process a limited number of messages correctly 
first.  The first code examples you posted to the list had some pretty serious 
errors (ie only trying to process 1 partition, trying to process offsets that 
didn't exist).  Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with like 4gb of 
messages per partition; you shouldn't be hitting some kind of limit with 1 
million messages per batch.  You may of course hit executor resource issues 
depending on what you're trying to do with each message, but that doesn't sound 
like the case here.

If you want help, either clarify what you are saying, or post a minimal 
reproducible code example, with expected output vs actual output.






On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi 
 wrote:
Cody, 
Yes, I am printing each message. It is processing all messages under each DStream
block.

Source systems are publishing 1 million messages per 4 seconds, which is less than
the batch interval. The issue is with DirectStream processing 10 messages per event.
When the topic's partitions were increased to 20, DirectStream picks up only 200
messages (I guess 10 for each partition) at a time for processing. I have 16
executors running for streaming (both yarn client & cluster mode).
I am expecting DirectStream to process the 1 million messages published within the
batch interval.

Using createStream, it could batch 150K messages and process them. createStream is
better than DirectStream in this case. Again, why only 150K?

Any clarification on DirectStream processing millions per batch is much appreciated.




Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger 
Date:06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi 
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being 
processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi 
 wrote:
I am able to see the number of messages processed per event in the Spark Streaming
web UI. Also, I am counting the messages inside foreachRDD.
I removed the settings for backpressure but it is still the same.





Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger 
Date:06/02/2016 00:33 

Fwd: Question on how to access tuple values in spark

2016-02-06 Thread mdkhajaasmath

> Hi,
> 
> My requirement is to find the max value of revenue per customer, so I am using the 
> query below. I got this solution from a tutorial I found on Google but am not able to 
> understand how it returns the max in this scenario. Can anyone help?
> 
> revenuePerDayPerCustomerMap.reduceByKey((x, y) => (if(x._2 >= y._2) x else y))
> 
>  ((2013-12-27 00:00:00.0),(62962,199.98))
>  ((2013-12-27 00:00:00.0),(62962,299.98))
> 
> 
> Why doesn't the statement below work to get the max?
> 
> x._1 >= y._1 ? By the way, what are the values of x._1, x._2, y._1, and y._2 in this scenario?
> 
> Thanks and waiting for your responses.
> 
> Regards,
> Asmath
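
The tuple positions follow from the shape of the pair RDD: the key is the date and the
value is a (customerId, revenue) tuple, so inside reduceByKey x and y are two values
that share the same date. A minimal sketch, assuming a SparkContext named sc and the
sample rows quoted above:

val revenuePerDayPerCustomerMap = sc.parallelize(Seq(
  ("2013-12-27 00:00:00.0", (62962, 199.98)),
  ("2013-12-27 00:00:00.0", (62962, 299.98))))

// x._1 / y._1 are customer ids and x._2 / y._2 are revenues, so keeping the tuple with
// the larger second field keeps the max-revenue record for each date.
val maxPerDay = revenuePerDayPerCustomerMap.reduceByKey((x, y) => if (x._2 >= y._2) x else y)
maxPerDay.collect().foreach(println)  // (2013-12-27 00:00:00.0,(62962,299.98))
// Comparing x._1 >= y._1 would compare customer ids rather than revenue, which is why
// it does not give the maximum.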

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Question on how to access tuple values in spark

2016-02-06 Thread mdkhajaasmath


Sent from my iPhone

> On Feb 6, 2016, at 4:41 PM, KhajaAsmath Mohammed  
> wrote:
> 
> Hi,
> 
> My requirement is to find the max value of revenue per customer, so I am using the 
> query below. I got this solution from a tutorial I found on Google but am not able to 
> understand how it returns the max in this scenario. Can anyone help?
> 
> revenuePerDayPerCustomerMap.reduceByKey((x, y) => (if(x._2 >= y._2) x else y))
> 
>  ((2013-12-27 00:00:00.0),(62962,199.98))
>  ((2013-12-27 00:00:00.0),(62962,299.98))
> 
> 
> Why doesn't the statement below work to get the max?
> 
> x._1 >= y._1 ? By the way, what are the values of x._1, x._2, y._1, and y._2 in this scenario?
> 
> Thanks and waiting for your responses.
> 
> Regards,
> Asmath

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Writing to jdbc database from SparkR (1.5.2)

2016-02-06 Thread Andrew Holway
I'm managing to read data via JDBC using the following but I can't work out
how to write something back to the Database.


df <- read.df(sqlContext, source="jdbc",
url="jdbc:mysql://hostname:3306?user=user=pass",
dbtable="database.table")


Does this functionality exist in 1.5.2?


Thanks,


Andrew


Re: Writing to jdbc database from SparkR (1.5.2)

2016-02-06 Thread Andrew Holway
>
> df <- read.df(sqlContext, source="jdbc",
> url="jdbc:mysql://hostname:3306?user=user=pass",
> dbtable="database.table")
>

I got a bit further but am now getting the following error. This error is
being thrown without the database being touched. I tested this by making
the database unavailable.

> write.df(fooframe, path="NULL", source="jdbc", url="jdbc:mysql://
database.foo.eu-west-1.rds.amazonaws.com:3306?user=user=pass",
dbtable="db.table", mode="append")

16/02/06 19:05:43 ERROR RBackendHandler: save on 2 failed

Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :

  java.lang.RuntimeException:
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not
allow create table as select.

at scala.sys.package$.error(package.scala:27)

at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:200)

at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)

at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)

at
org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)

at
org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)

at io.netty.channel.SimpleChannelIn
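
If the SparkR path stays blocked, the JVM-side DataFrameWriter has exposed a jdbc()
method since Spark 1.4, so one option is to do the append from a small Scala job. A
sketch, with placeholder credentials and table name, assuming df is the DataFrame to
write:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "user")          // placeholder credentials
props.setProperty("password", "pass")
props.setProperty("driver", "com.mysql.jdbc.Driver")

// Append df's rows to db.table over JDBC; the URL keeps the host from the thread but
// moves the database name into the path.
df.write.mode("append")
  .jdbc("jdbc:mysql://database.foo.eu-west-1.rds.amazonaws.com:3306/db", "table", props)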


Re: Help needed in deleting a message posted in Spark User List

2016-02-06 Thread Corey Nolet
The whole purpose of Apache mailing lists is that the messages get indexed
all over the web so that discussions and questions/solutions can be
searched easily by google and other engines.

For this reason, and the messages being sent via email as Steve pointed
out, it's just not possible to retract the messages.

On Sat, Feb 6, 2016 at 10:21 AM, Steve Loughran 
wrote:

>
> > On 5 Feb 2016, at 17:35, Marcelo Vanzin  wrote:
> >
> > You don't... just send a new one.
> >
> > On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
> >  wrote:
> >> Hi,
> >>
> >> I want to edit/delete a message posted in Spark User List. How do I do
> that?
> >>
> >> Thanks!
> >
> >
> >
>
> it isn't technically possible
>
> http://apache.org/foundation/public-archives.html
>
> People do occasionally ask on the infrastructure mailing list to do
> this, but they aren't in a position to do anything about the copies that
> end up in the mailboxes of every subscriber.
>
> Don't worry about it; we've all done things like post internal stack
> traces, accidentally mail the wrong list, etc, etc.
>
> Now, accidentally breaking the nightly build of everything, that's
> somewhat embarrassing —but you haven't done that and it's been ~4 months
> since I've done that myself.
>
>
> -Steve


Spark Streaming with Druid?

2016-02-06 Thread unk1102
Hi, has anybody tried Spark Streaming with Druid as a low-latency store?
The combination seems powerful; is it worth trying them together? Please guide
and share your experience. I am aiming to build the best low-latency
streaming analytics.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help needed in deleting a message posted in Spark User List

2016-02-06 Thread Steve Loughran

> On 5 Feb 2016, at 17:35, Marcelo Vanzin  wrote:
> 
> You don't... just send a new one.
> 
> On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
>  wrote:
>> Hi,
>> 
>> I want to edit/delete a message posted in Spark User List. How do I do that?
>> 
>> Thanks!
> 
> 
> 

it isn't technically possible

http://apache.org/foundation/public-archives.html

People do occasionally ask on the infrastructure mailing list to do this, 
but they aren't in a position to do anything about the copies that end up in 
the mailboxes of every subscriber.

Don't worry about it; we've all done things like post internal stack traces, 
accidentally mail the wrong list, etc, etc. 

Now, accidentally breaking the nightly build of everything, that's somewhat 
embarrassing —but you haven't done that and it's been ~4 months since I've done 
that myself.


-Steve

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-06 Thread Udo Fholl
Sorry, I realized that I left out a bit in the last email.

This is the only BLOCKED thread in the dump. The Reference Handler is blocked,
most likely due to the GC running at the moment of the dump.

"Reference Handler" daemon prio=10 tid=2 BLOCKED
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
  at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)


On Fri, Feb 5, 2016 at 10:44 AM, Udo Fholl  wrote:

> It does not look like. Here is the output of "grep -A2 -i waiting
> spark_tdump.log"
>
> "RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "task-result-getter-1" daemon prio=5 tid=101 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at
> com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
> --
> "qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon
> prio=5 tid=193 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RecurringTimer - Kinesis Checkpointer - Worker
> localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89
> TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
> --
> "qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "Executor task launch worker-0" daemon prio=5 tid=84 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "pool-28-thread-1" prio=5 tid=92 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon
> prio=5 tid=185 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at
> scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
> --
> "Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> --
> "qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at 

Re: Kafka directsream receiving rate

2016-02-06 Thread Cody Koeninger
I am not at all clear on what you are saying.

"Yes , I am  printing  each  messages .  It is  processing all  messages
under each  dstream block."  If it is processing all messages, what is the
problem you are having?

"The issue is  with  Directsream processing 10 message per event. "  What
distinction are you making between a message and an event?

"I am  expecting  Directsream to  process  1 million messages"   Your first
email said you were publishing 100 messages but only processing 10.  Why
are you now trying to process 1 million messages without understanding what
is going on?  Make sure you can process a limited number of messages
correctly first.  The first code examples you posted to the list had some
pretty serious errors (ie only trying to process 1 partition, trying to
process offsets that didn't exist).  Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with like 4gb of
messages per partition; you shouldn't be hitting some kind of limit with 1
million messages per batch.  You may of course hit executor resource issues
depending on what you're trying to do with each message, but that doesn't
sound like the case here.

If you want help, either clarify what you are saying, or post a minimal
reproducible code example, with expected output vs actual output.






On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Cody,
> Yes , I am  printing  each  messages . It is  processing all  messages
> under each  dstream block.
>
> Source systems are   publishing  1 Million messages /4 secs which is less
> than batch interval. The issue is  with  Directsream processing 10 message
> per event. When partitions were  increased to  20 in topic, DirectStream
> picksup only 200 messages ( I guess 10 for  each partition ) at a time for
>  processing . I have  16 executors running for  streaming ( both  yarn
> client & cluster mode).
> I am  expecting  Directsream to  process  1 million messages which
>  published in topic < batch interval .
>
> Using  createStream , It could  batch 150K messages and process .
> createStream is  better than  Directsream in  this  case . Again why only
>  150K.
>
> Any  clarification is  much  appreciated  on directStream processing
> millions per batch .
>
>
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Cody Koeninger 
> Date:06/02/2016 01:30 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc: user@spark.apache.org
> Subject: Re: Kafka directsream receiving rate
>
> Have you tried just printing each message, to see which ones are being
> processed?
>
> On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> I am  able  to  see  no of  messages processed  per  event  in
>>  sparkstreaming web UI . Also  I am  counting  the  messages inside
>>  foreachRDD .
>> Removed  the  settings for  backpressure but still  the  same .
>>
>>
>>
>>
>>
>> Sent from Samsung Mobile.
>>
>>
>>  Original message 
>> From: Cody Koeninger 
>> Date:06/02/2016 00:33 (GMT+05:30)
>> To: Diwakar Dhanuskodi 
>> Cc: user@spark.apache.org
>> Subject: Re: Kafka directsream receiving rate
>>
>> How are you counting the number of messages?
>>
>> I'd go ahead and remove the settings for backpressure and
>> maxrateperpartition, just to eliminate that as a variable.
>>
>> On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>>
>>> I am  using  one  directsream. Below  is  the  call  to directsream:-
>>>
>>> val topicSet = topics.split(",").toSet
>>> val kafkaParams = Map[String,String]("bootstrap.servers" -> "
>>> datanode4.isdp.com:9092")
>>> val k =
>>> KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,
>>> kafkaParams, topicSet)
>>>
>>> When  I replace   DirectStream call  to  createStream,  all  messages
>>> were  read  by  one  Dstream block.:-
>>> val k = KafkaUtils.createStream(ssc, 
>>> "datanode4.isdp.com:2181","resp",topicMap
>>> ,StorageLevel.MEMORY_ONLY)
>>>
>>> I am  using   below  spark-submit to execute:
>>> ./spark-submit --master yarn-client --conf
>>> "spark.dynamicAllocation.enabled=true" --conf
>>> "spark.shuffle.service.enabled=true" --conf
>>> "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf
>>> "spark.sql.unsafe.enabled=false" --conf
>>> "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s"
>>> --conf "spark.shuffle.consolidateFiles=true"   --conf
>>> "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g
>>> --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files
>>> /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
>>> --jars
>>> 

Re: Shuffle memory woes

2016-02-06 Thread Corey Nolet
Igor,

Thank you for the response but unfortunately, the problem I'm referring to
goes beyond this. I have set the shuffle memory fraction to be 90% and set
the cache memory to be 0. Repartitioning the RDD helped a tad on the map
side but didn't do much for the spilling when there was no longer any
memory left for the shuffle. Also the new auto-memory management doesn't
seem like it'll have too much of an effect after i've already given most
the memory i've allocated to the shuffle. The problem I'm having is most
specifically related to the shuffle performance declining by several orders
of magnitude when it needs to spill multiple times (it ends up spilling
several hundred times for me when it can't fit stuff into memory).



On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman  wrote:

> Hi,
> usually you can solve this by 2 steps
> make rdd to have more partitions
> play with shuffle memory fraction
>
> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>
> On 5 February 2016 at 23:07, Corey Nolet  wrote:
>
>> I just recently had a discovery that my jobs were taking several hours to
>> complete because of excess shuffle spills. What I found was that when I
>> hit the high point where I didn't have enough memory for the shuffles to
>> store all of their file consolidations at once, it could spill so many
>> times that it causes my job's runtime to increase by orders of magnitude
>> (and sometimes fail altogether).
>>
>> I've played with all the tuning parameters I can find. To speed the
>> shuffles up, I tuned the akka threads to different values. I also tuned the
>> shuffle buffering a tad (both up and down).
>>
>> I feel like I see a weak point here. The mappers are sharing memory space
>> with reducers and the shuffles need enough memory to consolidate and pull
>> otherwise they will need to spill and spill and spill. What i've noticed
>> about my jobs is that this is a difference between them taking 30 minutes
>> and 4 hours or more. Same job- just different memory tuning.
>>
>> I've found that, as a result of the spilling, I'm better off not caching
>> any data in memory and lowering my storage fraction to 0 and still hoping I
>> was able to give my shuffles enough memory that my data doesn't
>> continuously spill. Is this the way it's supposed to be? It makes it hard
>> because it seems like it forces the memory limits on my job- otherwise it
>> could take orders of magnitude longer to execute.
>>
>>
>
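
For reference, the knobs being discussed map to these configuration properties; the
values below just mirror the "90% shuffle, 0 cache" setup described above and are not
a recommendation:

val conf = new org.apache.spark.SparkConf()
  // Legacy (pre-1.6) memory model: separate shuffle and storage fractions.
  .set("spark.shuffle.memoryFraction", "0.9")   // default 0.2
  .set("spark.storage.memoryFraction", "0.0")   // default 0.6
// Spark 1.6+ replaces these with the unified memory manager, configured through
// spark.memory.fraction (default 0.75) and spark.memory.storageFraction (default 0.5),
// where execution and storage borrow from each other automatically.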


Re: Slowness in Kmeans calculating fastSquaredDistance

2016-02-06 Thread Li Ming Tsai
Hi,


I did more investigation and found out that BLAS.scala is calling the pure-Java 
reference implementation (f2jBLAS) for level 1 routines.


I even patched it to use nativeBlas.ddot but it has no material impact.


https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala#L126


private def dot(x: DenseVector, y: DenseVector): Double = {

val n = x.size

f2jBLAS.ddot(n, x.values, 1, y.values, 1)

  }
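
The patch described above would look roughly like the following (nativeBLAS is the
netlib-java instance already declared in BLAS.scala). Note that the level-1 routines
are routed to the pure-Java f2j implementation deliberately, presumably because JNI
call overhead outweighs any gain for small vector-vector operations, which may be why
the patch showed no material impact:

  // Hypothetical patch: route the level-1 dot product through native BLAS instead of f2j.
  private def dot(x: DenseVector, y: DenseVector): Double = {
    val n = x.size
    nativeBLAS.ddot(n, x.values, 1, y.values, 1)
  }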


Maybe Xiangrui can comment on this?




From: Li Ming Tsai 
Sent: Friday, February 5, 2016 10:56 AM
To: user@spark.apache.org
Subject: Slowness in Kmeans calculating fastSquaredDistance


Hi,


I'm using INTEL MKL on Spark 1.6.0 which I built myself with the -Pnetlib-lgpl 
flag.


I am using spark local[4] mode and I run it like this:
# export LD_LIBRARY_PATH=/opt/intel/lib/intel64:/opt/intel/mkl/lib/intel64
# bin/spark-shell ...

I have also added the following to /opt/intel/mkl/lib/intel64:
lrwxrwxrwx 1 root root12 Feb  1 09:18 libblas.so -> libmkl_rt.so
lrwxrwxrwx 1 root root12 Feb  1 09:18 libblas.so.3 -> libmkl_rt.so
lrwxrwxrwx 1 root root12 Feb  1 09:18 liblapack.so -> libmkl_rt.so
lrwxrwxrwx 1 root root12 Feb  1 09:18 liblapack.so.3 -> libmkl_rt.so


I believe (???) that I'm using Intel MKL because the warnings went away:

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeSystemBLAS

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: 
com.github.fommil.netlib.NativeRefBLAS

After collectAsMap, there is no progress but I can observe that only 1 CPU is 
being utilised with the following stack trace:

"ForkJoinPool-3-worker-7" #130 daemon prio=5 os_prio=0 tid=0x7fbf30ab6000 
nid=0xbdc runnable [0x7fbf12205000]

   java.lang.Thread.State: RUNNABLE

at com.github.fommil.netlib.F2jBLAS.ddot(F2jBLAS.java:71)

at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:128)

at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:111)

at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:349)

at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:587)

at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:561)

at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:555)


These last few steps take more than half of the total time for a 1Mx100 dataset.


The code is just:

val clusters = KMeans.train(parsedData, 1000, 1)


Shouldn't it be utilising all the cores for the dot product? Is this a 
misconfiguration?


Thanks!
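
One experiment that may help narrow this down (an assumption on my part, not a
confirmed diagnosis): the default k-means|| initialization finishes with a
driver-local clustering step, and if that is the single-threaded phase in the stack
trace above, switching to random initialization should make it disappear:

import org.apache.spark.mllib.clustering.KMeans

val model = new KMeans()
  .setK(1000)
  .setMaxIterations(1)
  .setInitializationMode(KMeans.RANDOM)  // skip the k-means|| initialization
  .run(parsedData)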




Apache Spark data locality when integrating with Kafka

2016-02-06 Thread fanooos
Dears

If I use Kafka as a streaming source for some Spark jobs, is it advisable
to install Spark on the same nodes as the Kafka cluster?

What are the benefits and drawbacks of such a decision? 

regards 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Apache Spark data locality when integrating with Kafka

2016-02-06 Thread Diwakar Dhanuskodi
Yes, to reduce network latency.


Sent from Samsung Mobile.

 Original message From: fanooos 
 Date:07/02/2016  09:24  (GMT+05:30) 
To: user@spark.apache.org Cc:  Subject: Apache 
Spark data locality when integrating with Kafka 
Dears

If I will use Kafka as a streaming source to some spark jobs, is it advised
to install spark to the same nodes of kafka cluster? 

What are the benefits and drawbacks of such a decision? 

regards 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Apache Spark data locality when integrating with Kafka

2016-02-06 Thread Koert Kuipers
spark can benefit from data locality and will try to launch tasks on the
node where the kafka partition resides.

however i think in production many organizations run a dedicated kafka
cluster.
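
As a rough illustration of how that locality hint is implemented (paraphrased from the
0.8 direct-stream KafkaRDD, not verbatim source): each RDD partition advertises the
host of its Kafka leader as a preferred location, so the scheduler will favour
executors running on that broker when any exist.

// Paraphrased sketch of the locality hint in the direct-stream KafkaRDD.
override def getPreferredLocations(split: Partition): Seq[String] = {
  val part = split.asInstanceOf[KafkaRDDPartition]
  Seq(part.host)  // host of the leader broker for this topic-partition
}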

On Sat, Feb 6, 2016 at 11:27 PM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Yes . To  reduce  network  latency .
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: fanooos 
> Date:07/02/2016 09:24 (GMT+05:30)
> To: user@spark.apache.org
> Cc:
> Subject: Apache Spark data locality when integrating with Kafka
>
> Dears
>
> If I will use Kafka as a streaming source to some spark jobs, is it advised
> to install spark to the same nodes of kafka cluster?
>
> What are the benefits and drawbacks of such a decision?
>
> regards
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Imported CSV file content isn't identical to the original file

2016-02-06 Thread SLiZn Liu
Hi Spark Users Group,

I have a CSV file to analyze with Spark, but I'm having trouble importing it
as a DataFrame.

Here’s the minimal reproducible example. Suppose I’m having a
*10(rows)x2(cols)* *space-delimited csv* file, shown as below:

1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31

the  in column 2 represents sub-delimiter within that column, and this
file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv

I’m using *spark-csv* to import this file as Spark *DataFrame*:

sqlContext.read.format("com.databricks.spark.csv")
.option("header", "false") // Use first line of all files as header
.option("inferSchema", "false") // Automatically infer data types
.option("delimiter", " ")
.load("hdfs:///tmp/1.csv")
.show

Oddly, the output shows only a part of each column:

[image: Screenshot from 2016-02-07 15-27-51.png]

and even the boundary of the table wasn't shown correctly. I also tried the
other way of reading a csv file, sc.textFile(...).map(_.split(" ")) followed by
sqlContext.createDataFrame, and the result is the same. Can someone point
out where I went wrong?
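
One thing worth ruling out first (an assumption, since the screenshot does not come
through in the archive): show() truncates every cell to 20 characters by default,
which on its own can make columns look cut off and mis-aligned. On Spark 1.5+ you can
pass truncate = false to print the full values:

sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("delimiter", " ")
  .load("hdfs:///tmp/1.csv")
  .show(20, false)  // numRows = 20, truncate = false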

—
BR,
Todd Leo
​


Re: Bad Digest error while doing aws s3 put

2016-02-06 Thread Dhimant
Hi, I am getting the following error while reading huge data from S3 and,
after processing, writing the data back to S3.

Did you find any solution for this?

16/02/07 07:41:59 WARN scheduler.TaskSetManager: Lost task 144.2 in stage
3.0 (TID 169, ip-172-31-7-26.us-west-2.compute.internal):
java.io.IOException: exception in uploadSinglePart
at
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:248)
at
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:469)
at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:105)
at
org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:106)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at
org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:109)
at
org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:102)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1080)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: exception in putObject
at
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:149)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy26.storeFile(Unknown Source)
at
com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:245)
... 15 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The
Content-MD5 you specified did not match what we received. (Service: Amazon
S3; Status Code: 400; Error Code: BadDigest; Request ID: 5918216A5901FCC8),
S3 Extended Request ID:
QSxtYln/yXqHYpdr4BWosin/TAFsGlK1FlKfE5PcuJkNrgoblGzTNt74kEhuNcrJCRZ3mXq0oUo=
at
com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
at
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
at
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3796)
at
com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1482)
at
com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:140)
... 22 more





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036p26167.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: different behavior while using createDataFrame and read.df in SparkR

2016-02-06 Thread Devesh Raj Singh
Thank you, Rui Sun, for the observation! It helped.

A new problem has come up. When I create a small function for dummy
variable creation for a categorical column:

BDADummies <- function(dataframe, column) {
  cat.column <- vector(mode = "character", length = nrow(dataframe))
  cat.column <- collect(column)
  lev <- length(levels(as.factor(unlist(cat.column))))
  for (j in 1:lev) {
    dummy.df <- withColumn(dataframe,
                           paste0(colnames(cat.column), j),
                           ifelse(column[[1]] == levels(as.factor(unlist(cat.column)))[j], 1, 0))
    dataframe <- dummy.df
  }
  return(dataframe)
}

*and when I call the function using*

newdummy.df<-BDADummies(df1,column=select(df1,df1$Species))


I get the below error

Error in withColumn(dataframe, paste0(colnames(cat.column), j),
ifelse(column[[1]] ==  :
  error in evaluating the argument 'col' in selecting a method for function
'withColumn': Error in if (le > 0) paste0("[1:", paste(le), "]") else "(0)"
:
  argument is not interpretable as logical


*but when I use it without calling or creating a function, the statement*

dummy.df<-withColumn(dataframe,paste0(colnames(cat.column),j),ifelse(column[[1]]==levels(as.factor(unlist(cat.column)))[j],1,0)
)

gives me the new columns generating column names as desired.

Warm regards,
Devesh.

On Sat, Feb 6, 2016 at 7:09 AM, Sun, Rui  wrote:

> I guess this is related to
> https://issues.apache.org/jira/browse/SPARK-11976
>
>
>
> When calling createDataFrame on iris, the “.” Character in column names
> will be replaced with “_”.
>
> It seems that when you create a DataFrame from the CSV file, the “.”
> Character in column names are still there.
>
>
>
> *From:* Devesh Raj Singh [mailto:raj.deves...@gmail.com]
> *Sent:* Friday, February 5, 2016 2:44 PM
> *To:* user@spark.apache.org
> *Cc:* Sun, Rui
> *Subject:* different behavior while using createDataFrame and read.df in
> SparkR
>
>
>
>
> Hi,
>
>
>
> I am using Spark 1.5.1
>
>
>
> When I do this
>
>
>
> df <- createDataFrame(sqlContext, iris)
>
>
>
> #creating a new column for category "Setosa"
>
>
>
> df$Species1<-ifelse((df)[[5]]=="setosa",1,0)
>
>
>
> head(df)
>
>
>
> output: new column created
>
>
>
>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
>
> 1  5.1 3.5  1.4 0.2  setosa
>
> 2  4.9 3.0  1.4 0.2  setosa
>
> 3  4.7 3.2  1.3 0.2  setosa
>
> 4  4.6 3.1  1.5 0.2  setosa
>
> 5  5.0 3.6  1.4 0.2  setosa
>
> 6  5.4 3.9  1.7 0.4  setosa
>
>
>
> *but when I saved the iris dataset as a CSV file and try to read it and
> convert it to sparkR dataframe*
>
>
>
> df <-
> read.df(sqlContext,"/Users/devesh/Github/deveshgit2/bdaml/data/iris/",
>
>   source = "com.databricks.spark.csv",header =
> "true",inferSchema = "true")
>
>
>
> now when I try to create new column
>
>
>
> df$Species1<-ifelse((df)[[5]]=="setosa",1,0)
>
> I get the below error:
>
>
>
> 16/02/05 12:11:01 ERROR RBackendHandler: col on 922 failed
>
> Error in select(x, x$"*", alias(col, colName)) :
>
>   error in evaluating the argument 'col' in selecting a method for
> function 'select': Error in invokeJava(isStatic = FALSE, objId$id,
> methodName, ...) :
>
>   org.apache.spark.sql.AnalysisException: Cannot resolve column name
> "Sepal.Length" among (Sepal.Length, Sepal.Width, Petal.Length, Petal.Width,
> Species);
>
> at org.apache.spark.s
>
> --
>
> Warm regards,
>
> Devesh.
>



-- 
Warm regards,
Devesh.


Re: Kafka directsream receiving rate

2016-02-06 Thread Diwakar Dhanuskodi
Cody, 
Yes, I am printing each message. It is processing all messages under each DStream
block.

Source systems are publishing 1 million messages per 4 seconds, which is less than
the batch interval. The issue is with DirectStream processing 10 messages per event.
When the topic's partitions were increased to 20, DirectStream picks up only 200
messages (I guess 10 for each partition) at a time for processing. I have 16
executors running for streaming (both yarn client & cluster mode).
I am expecting DirectStream to process the 1 million messages published within the
batch interval.

Using createStream, it could batch 150K messages and process them. createStream is
better than DirectStream in this case. Again, why only 150K?

Any clarification on DirectStream processing millions per batch is much appreciated.




Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
 Date:06/02/2016  01:30  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

Have you tried just printing each message, to see which ones are being 
processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi 
 wrote:
I am able to see the number of messages processed per event in the Spark Streaming
web UI. Also, I am counting the messages inside foreachRDD.
I removed the settings for backpressure but it is still the same.





Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger 
Date:06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi 
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and maxrateperpartition, 
just to eliminate that as a variable.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi 
 wrote:
I am  using  one  directsream. Below  is  the  call  to directsream:-

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)

When  I replace   DirectStream call  to  createStream,  all  messages were  
read  by  one  Dstream block.:-
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap 
,StorageLevel.MEMORY_ONLY)

I am  using   below  spark-submit to execute:
./spark-submit --master yarn-client --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
 /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger 
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi 
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you have 
1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
 wrote:
Adding more info

Batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but it
receives them at a rate of 10 messages at a time. Am I missing some configuration
here? Any help appreciated.

Regards 
Diwakar.


Sent from Samsung Mobile.


 Original message 
From: Diwakar Dhanuskodi 
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct
stream receives only 10 messages per DStream. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at once.
 

Appreciate  any  help .

Regards 
Diwakar


Sent from Samsung Mobile.





Re: Shuffle memory woes

2016-02-06 Thread Igor Berman
Hi,
usually you can solve this by 2 steps
make rdd to have more partitions
play with shuffle memory fraction

in spark 1.6 cache vs shuffle memory fractions are adjusted automatically

On 5 February 2016 at 23:07, Corey Nolet  wrote:

> I just recently had a discovery that my jobs were taking several hours to
> complete because of excess shuffle spills. What I found was that when I
> hit the high point where I didn't have enough memory for the shuffles to
> store all of their file consolidations at once, it could spill so many
> times that it causes my job's runtime to increase by orders of magnitude
> (and sometimes fail altogether).
>
> I've played with all the tuning parameters I can find. To speed the
> shuffles up, I tuned the akka threads to different values. I also tuned the
> shuffle buffering a tad (both up and down).
>
> I feel like I see a weak point here. The mappers are sharing memory space
> with reducers and the shuffles need enough memory to consolidate and pull
> otherwise they will need to spill and spill and spill. What i've noticed
> about my jobs is that this is a difference between them taking 30 minutes
> and 4 hours or more. Same job- just different memory tuning.
>
> I've found that, as a result of the spilling, I'm better off not caching
> any data in memory and lowering my storage fraction to 0 and still hoping I
> was able to give my shuffles enough memory that my data doesn't
> continuously spill. Is this the way it's supposed to be? It makes it hard
> because it seems like it forces the memory limits on my job- otherwise it
> could take orders of magnitude longer to execute.
>
>