Re: Kafka directsream receiving rate
Thanks Cody for trying to understand the issue. Sorry if I was not clear. The scenario is to process all messages at once, in a single dstream block, whenever the source system publishes them. The source system publishes x messages once every 10 minutes. By "events" I meant the total number of messages processed in each batch interval (2000 ms in my case) by the executor (the web UI shows each block's processing as events). DirectStream is processing only 10 messages per batch, and it is the same whether 100 or 1 million messages are published. The "xyz" topic has 20 partitions. I am using the Kafka producer API to publish messages. Below is the code that I am using:

val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet)
k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  println(currentTime + " " + dstreamToRDD.partitions.length) // current time & partition count (pseudocode in original)
  val accTran = dstreamToRDD.filter { ... }
  accTran.map { ... }
}
ssc.start()
ssc.awaitTermination()

I tried using DirectStream with map, which had an issue with offsetRange. After your suggestion, the offset issue was resolved when I used the above DirectStream code with the topic only. The spark-submit settings I am using are in the mail chain below. Is there any bottleneck I am hitting that prevents processing the maximum number of messages in one batch interval using the direct stream RDD? If this is not clear, I can take this offline and explain the scenario in detail.

Sent from Samsung Mobile.

Original message
From: Cody Koeninger
Date: 06/02/2016 22:32 (GMT+05:30)
To: Diwakar Dhanuskodi
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

I am not at all clear on what you are saying. "Yes, I am printing each message. It is processing all messages under each dstream block." If it is processing all messages, what is the problem you are having? "The issue is with Directsream processing 10 messages per event.
" What distinction are you making between a message and an event? "I am expecting Directsream to process 1 million messages" Your first email said you were publishing 100 messages but only processing 10. Why are you now trying to process 1 million messages without understanding what is going on? Make sure you can process a limited number of messages correctly first. The first code examples you posted to the list had some pretty serious errors (i.e. only trying to process 1 partition, and trying to process offsets that didn't exist). Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with something like 4 GB of messages per partition, so you shouldn't be hitting some kind of limit with 1 million messages per batch. You may of course hit executor resource issues depending on what you're trying to do with each message, but that doesn't sound like the case here. If you want help, either clarify what you are saying, or post a minimal reproducible code example, with expected output vs actual output.

On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi wrote:

Cody,
Yes, I am printing each message. It is processing all messages under each dstream block.

Source systems are publishing 1 million messages / 4 secs, which is less than the batch interval. The issue is with DirectStream processing 10 messages per event. When the topic's partitions were increased to 20, DirectStream picks up only 200 messages (I guess 10 for each partition) at a time for processing. I have 16 executors running for streaming (both yarn client & cluster mode). I am expecting DirectStream to process the 1 million messages published to the topic within the batch interval.

Using createStream, it could batch 150K messages and process them. createStream is better than DirectStream in this case. Again, why only 150K? Any clarification is much appreciated on DirectStream processing millions per batch.

Sent from Samsung Mobile.
Original message
From: Cody Koeninger
Date: 06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi wrote:

I am able to see the number of messages processed per event in the Spark Streaming web UI. I am also counting the messages inside foreachRDD. I removed the settings for backpressure, but still see the same behaviour.

Sent from Samsung Mobile.

Original message
From: Cody Koeninger
Date: 06/02/2016 00:33
Fwd: Question on how to access tuple values in spark
> Hi,
>
> My requirement is to find the max value of revenue per customer, so I am using the below query. I got this solution from a tutorial on Google but am not able to understand how it returns the max in this scenario. Can anyone help?
>
> revenuePerDayPerCustomerMap.reduceByKey((x, y) => (if (x._2 >= y._2) x else y))
>
> ((2013-12-27 00:00:00.0),(62962,199.98))
> ((2013-12-27 00:00:00.0),(62962,299.98))
>
> Why doesn't the below statement work to get the max?
>
> x._1 >= y._1 ? By the way, what are the values of x._1, x._2, y._1, y._2 in this scenario?
>
> Thanks and waiting for your responses.
>
> Regards,
> Asmath

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Question on how to access tuple values in spark
Sent from my iPhone

> On Feb 6, 2016, at 4:41 PM, KhajaAsmath Mohammed wrote:
>
> Hi,
>
> My requirement is to find the max value of revenue per customer, so I am using the below query. I got this solution from a tutorial on Google but am not able to understand how it returns the max in this scenario. Can anyone help?
>
> revenuePerDayPerCustomerMap.reduceByKey((x, y) => (if (x._2 >= y._2) x else y))
>
> ((2013-12-27 00:00:00.0),(62962,199.98))
> ((2013-12-27 00:00:00.0),(62962,299.98))
>
> Why doesn't the below statement work to get the max?
>
> x._1 >= y._1 ? By the way, what are the values of x._1, x._2, y._1, y._2 in this scenario?
>
> Thanks and waiting for your responses.
>
> Regards,
> Asmath
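For readers who land on this thread later, here is a minimal sketch (plain Scala, no Spark required) of what the function passed to reduceByKey does. The sample values are taken from the question; the interpretation of the tuple fields as an id plus a revenue is an assumption based on the variable name.

```scala
// reduceByKey repeatedly combines pairs of values that share the same key.
// Here each value looks like (62962, 199.98): _1 is (presumably) an id and
// _2 is the revenue, so the function keeps whichever tuple has the larger
// revenue. That is why the expression computes the max revenue per key.
object MaxByRevenueSketch {
  def main(args: Array[String]): Unit = {
    val valuesForOneKey = Seq((62962, 199.98), (62962, 299.98))
    val maxByRevenue = valuesForOneKey.reduce((x, y) => if (x._2 >= y._2) x else y)
    println(maxByRevenue) // the tuple carrying revenue 299.98 survives
  }
}
```

Comparing x._1 >= y._1 instead would compare the first fields (the ids), which says nothing about which tuple has the larger revenue, so it would not compute the max.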
Writing to jdbc database from SparkR (1.5.2)
I'm managing to read data via JDBC using the following, but I can't work out how to write anything back to the database.

df <- read.df(sqlContext, source="jdbc", url="jdbc:mysql://hostname:3306?user=user=pass", dbtable="database.table")

Does this functionality exist in 1.5.2? Thanks, Andrew
Re: Writing to jdbc database from SparkR (1.5.2)
> > df <- read.df(sqlContext, source="jdbc", > url="jdbc:mysql://hostname:3306?user=user=pass", > dbtable="database.table") > I got a bit further but am now getting the following error. This error is being thrown without the database being touched. I tested this by making the database unavailable. > write.df(fooframe, path="NULL", source="jdbc", url="jdbc:mysql:// database.foo.eu-west-1.rds.amazonaws.com:3306?user=user=pass", dbtable="db.table", mode="append") 16/02/06 19:05:43 ERROR RBackendHandler: save on 2 failed Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select. at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:200) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79) at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38) at io.netty.channel.SimpleChannelIn
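For reference, the error above arises because in Spark 1.5 the generic jdbc data source does not support creating or appending tables through the save/write.df path (it is not a CreatableRelationProvider). On the Scala/Java side a dedicated JDBC writer does exist; a hedged sketch follows, where the connection details are placeholders and `df` stands for an existing DataFrame:

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SaveMode}

// Hedged sketch using the Scala DataFrameWriter.jdbc API (available since 1.4).
// SparkR 1.5 exposed no equivalent, which is why write.df(..., source="jdbc")
// fails with "does not allow create table as select".
def appendToMySql(df: DataFrame): Unit = {
  val props = new Properties()
  props.setProperty("user", "user")     // placeholder credentials
  props.setProperty("password", "pass")
  df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hostname:3306/db", "db.table", props)
}
```

One workaround from R at the time was to write the data out in a file format SparkR supports and load it into the database separately.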
Re: Help needed in deleting a message posted in Spark User List
The whole purpose of Apache mailing lists is that the messages get indexed all over the web, so that discussions and questions/solutions can be searched easily by Google and other engines. For this reason, and because the messages are sent via email as Steve pointed out, it's just not possible to retract them.

On Sat, Feb 6, 2016 at 10:21 AM, Steve Loughran wrote:
>
> > On 5 Feb 2016, at 17:35, Marcelo Vanzin wrote:
> >
> > You don't... just send a new one.
> >
> > On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy wrote:
> >> Hi,
> >>
> >> I want to edit/delete a message posted in Spark User List. How do I do that?
> >>
> >> Thanks!
>
> It isn't technically possible: http://apache.org/foundation/public-archives.html
>
> People do occasionally ask on the infrastructure mailing list to do this, but they aren't in a position to do anything about the copies that end up in the mailboxes of every subscriber.
>
> Don't worry about it; we've all done things like post internal stack traces, accidentally mail the wrong list, etc, etc.
>
> Now, accidentally breaking the nightly build of everything, that's somewhat embarrassing —but you haven't done that and it's been ~4 months since I've done that myself.
>
> -Steve
Spark Streaming with Druid?
Hi, did anybody try Spark Streaming with Druid as a low-latency store? The combination seems powerful; is it worth using them together? Please guide and share your experience. I am aiming to build the best possible low-latency streaming analytics.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Help needed in deleting a message posted in Spark User List
> On 5 Feb 2016, at 17:35, Marcelo Vanzin wrote:
>
> You don't... just send a new one.
>
> On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy wrote:
>> Hi,
>>
>> I want to edit/delete a message posted in Spark User List. How do I do that?
>>
>> Thanks!

It isn't technically possible: http://apache.org/foundation/public-archives.html

People do occasionally ask on the infrastructure mailing list to do this, but they aren't in a position to do anything about the copies that end up in the mailboxes of every subscriber.

Don't worry about it; we've all done things like post internal stack traces, accidentally mail the wrong list, etc, etc.

Now, accidentally breaking the nightly build of everything, that's somewhat embarrassing —but you haven't done that and it's been ~4 months since I've done that myself.

-Steve
Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
Sorry, I realized that I left out a bit of the last email. This is the only BLOCKED thread in the dump. The Reference Handler is blocked, most likely because the GC was running at the moment of the dump.

"Reference Handler" daemon prio=10 tid=2 BLOCKED
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)

On Fri, Feb 5, 2016 at 10:44 AM, Udo Fholl wrote:
> It does not look like it. Here is the output of "grep -A2 -i waiting spark_tdump.log":
>
> "RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "task-result-getter-1" daemon prio=5 tid=101 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> --
> "context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> --
> "cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
> at java.lang.Object.wait(Native Method)
> at com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
> --
> "qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon prio=5 tid=193 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> --
> "pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RecurringTimer - Kinesis Checkpointer - Worker localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89 TIMED_WAITING
> at java.lang.Thread.sleep(Native Method)
> at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
> --
> "qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "Executor task launch worker-0" daemon prio=5 tid=84 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "pool-28-thread-1" prio=5 tid=92 WAITING
> at sun.misc.Unsafe.park(Native Method)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon prio=5 tid=185 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
> --
> "Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
> at java.lang.Object.wait(Native Method)
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> --
> "qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> at
Re: Kafka directsream receiving rate
I am not at all clear on what you are saying.

"Yes, I am printing each message. It is processing all messages under each dstream block."

If it is processing all messages, what is the problem you are having?

"The issue is with Directsream processing 10 message per event."

What distinction are you making between a message and an event?

"I am expecting Directsream to process 1 million messages"

Your first email said you were publishing 100 messages but only processing 10. Why are you now trying to process 1 million messages without understanding what is going on? Make sure you can process a limited number of messages correctly first. The first code examples you posted to the list had some pretty serious errors (i.e. only trying to process 1 partition, and trying to process offsets that didn't exist). Make sure that is all fixed first.

To be clear, I use direct Kafka RDDs to process batches with something like 4 GB of messages per partition, so you shouldn't be hitting some kind of limit with 1 million messages per batch. You may of course hit executor resource issues depending on what you're trying to do with each message, but that doesn't sound like the case here.

If you want help, either clarify what you are saying, or post a minimal reproducible code example, with expected output vs actual output.

On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote:

> Cody,
> Yes, I am printing each message. It is processing all messages under each dstream block.
>
> Source systems are publishing 1 million messages / 4 secs, which is less than the batch interval. The issue is with DirectStream processing 10 messages per event. When the topic's partitions were increased to 20, DirectStream picks up only 200 messages (I guess 10 for each partition) at a time for processing. I have 16 executors running for streaming (both yarn client & cluster mode).
> I am expecting DirectStream to process the 1 million messages published to the topic within the batch interval.
> Using createStream, it could batch 150K messages and process them. createStream is better than DirectStream in this case. Again, why only 150K?
>
> Any clarification is much appreciated on DirectStream processing millions per batch.
>
> Sent from Samsung Mobile.
>
> Original message
> From: Cody Koeninger
> Date: 06/02/2016 01:30 (GMT+05:30)
> To: Diwakar Dhanuskodi
> Cc: user@spark.apache.org
> Subject: Re: Kafka directsream receiving rate
>
> Have you tried just printing each message, to see which ones are being processed?
>
> On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote:
>
>> I am able to see the number of messages processed per event in the Spark Streaming web UI. I am also counting the messages inside foreachRDD.
>> Removed the settings for backpressure, but still the same.
>>
>> Sent from Samsung Mobile.
>>
>> Original message
>> From: Cody Koeninger
>> Date: 06/02/2016 00:33 (GMT+05:30)
>> To: Diwakar Dhanuskodi
>> Cc: user@spark.apache.org
>> Subject: Re: Kafka directsream receiving rate
>>
>> How are you counting the number of messages?
>>
>> I'd go ahead and remove the settings for backpressure and maxrateperpartition, just to eliminate that as a variable.
>>
>> On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote:
>>
>>> I am using one directsream.
Below is the call to directsream:- >>> >>> val topicSet = topics.split(",").toSet >>> val kafkaParams = Map[String,String]("bootstrap.servers" -> " >>> datanode4.isdp.com:9092") >>> val k = >>> KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, >>> kafkaParams, topicSet) >>> >>> When I replace DirectStream call to createStream, all messages >>> were read by one Dstream block.:- >>> val k = KafkaUtils.createStream(ssc, >>> "datanode4.isdp.com:2181","resp",topicMap >>> ,StorageLevel.MEMORY_ONLY) >>> >>> I am using below spark-submit to execute: >>> ./spark-submit --master yarn-client --conf >>> "spark.dynamicAllocation.enabled=true" --conf >>> "spark.shuffle.service.enabled=true" --conf >>> "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf >>> "spark.sql.unsafe.enabled=false" --conf >>> "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" >>> --conf "spark.shuffle.consolidateFiles=true" --conf >>> "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g >>> --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files >>> /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml >>> --jars >>>
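Worth noting for anyone reading this thread in the archive: the spark-submit command above both enables backpressure and sets spark.streaming.kafka.maxRatePerPartition=100, and with the direct stream those settings cap (and can further throttle) the number of records per batch regardless of executor count. A back-of-the-envelope sketch, assuming the 2000 ms batch interval and 20 partitions mentioned earlier in the thread:

```scala
// Hedged sketch: with the direct stream, each batch reads at most
// maxRatePerPartition * batchIntervalSeconds records from every partition,
// and the backpressure estimator may throttle well below that cap while it
// warms up. Numbers are taken from the settings quoted in this thread.
object DirectStreamBatchCap {
  def main(args: Array[String]): Unit = {
    val maxRatePerPartition = 100 // spark.streaming.kafka.maxRatePerPartition
    val batchIntervalSec    = 2   // 2000 ms batch interval
    val partitions          = 20  // partition count of the "xyz" topic
    val maxRecordsPerBatch  = maxRatePerPartition * batchIntervalSec * partitions
    println(maxRecordsPerBatch) // at most 4000 records per batch
  }
}
```

So even in the best case a 1-million-message burst needs around 250 batches with these settings; removing the rate limit and disabling backpressure while testing (as suggested earlier in the thread) is the quickest way to confirm whether this is the bottleneck.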
Re: Shuffle memory woes
Igor, Thank you for the response, but unfortunately the problem I'm referring to goes beyond this. I have set the shuffle memory fraction to 90% and the cache memory fraction to 0. Repartitioning the RDD helped a tad on the map side, but didn't do much for the spilling once there was no longer any memory left for the shuffle. Also, the new automatic memory management doesn't seem like it'll have much of an effect after I've already given most of the allocated memory to the shuffle. The problem I'm having is, most specifically, the shuffle's performance declining by several orders of magnitude when it needs to spill multiple times (it ends up spilling several hundred times for me when it can't fit everything into memory).

On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman wrote:
> Hi,
> usually you can solve this in 2 steps:
> make the rdd have more partitions
> play with the shuffle memory fraction
>
> in spark 1.6 the cache vs shuffle memory fractions are adjusted automatically
>
> On 5 February 2016 at 23:07, Corey Nolet wrote:
>
>> I just recently discovered that my jobs were taking several hours to complete because of excess shuffle spills. What I found was that when I hit the point where I didn't have enough memory for the shuffles to store all of their file consolidations at once, they could spill so many times that it caused my job's runtime to increase by orders of magnitude (and sometimes fail altogether).
>>
>> I've played with all the tuning parameters I can find. To speed the shuffles up, I tuned the akka threads to different values. I also tuned the shuffle buffering a tad (both up and down).
>>
>> I feel like I see a weak point here. The mappers are sharing memory space with the reducers, and the shuffles need enough memory to consolidate and pull, otherwise they will need to spill and spill and spill. What I've noticed about my jobs is that this is the difference between them taking 30 minutes and 4 hours or more.
Same job, just different memory tuning.
>>
>> I've found that, as a result of the spilling, I'm better off not caching any data in memory and lowering my storage fraction to 0, while still hoping I was able to give my shuffles enough memory that my data doesn't continuously spill. Is this the way it's supposed to be? It makes it hard, because it seems like it forces memory limits on my job; otherwise it could take orders of magnitude longer to execute.
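For anyone tuning the same knobs on pre-1.6 releases, the settings discussed in this thread map onto a SparkConf roughly as in the sketch below (values are illustrative, not recommendations; Spark 1.6's unified memory manager replaces the two fractions):

```scala
import org.apache.spark.SparkConf

// Hedged sketch of the pre-1.6 memory knobs discussed above. The defaults
// were spark.shuffle.memoryFraction=0.2 and spark.storage.memoryFraction=0.6.
val conf = new SparkConf()
  .set("spark.shuffle.memoryFraction", "0.6") // more heap for shuffles before spilling
  .set("spark.storage.memoryFraction", "0.1") // shrink the cache region when little is cached
// Independently of the fractions, raising the partition count (for example
// rdd.repartition(4 * totalCores), or via spark.default.parallelism) shrinks
// each task's shuffle state, which often reduces spilling more than fraction
// tuning does.
```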
Re: Slowness in Kmeans calculating fastSquaredDistance
Hi, I did more investigation and found out that BLAS.scala calls the F2J Java reference implementation (f2jBLAS) for level 1 routines. I even patched it to use nativeBlas.ddot, but it had no material impact.

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala#L126

private def dot(x: DenseVector, y: DenseVector): Double = {
  val n = x.size
  f2jBLAS.ddot(n, x.values, 1, y.values, 1)
}

Maybe Xiangrui can comment on this?

From: Li Ming Tsai
Sent: Friday, February 5, 2016 10:56 AM
To: user@spark.apache.org
Subject: Slowness in Kmeans calculating fastSquaredDistance

Hi, I'm using Intel MKL on Spark 1.6.0, which I built myself with the -Pnetlib-lgpl flag. I am using spark local[4] mode and I run it like this:

# export LD_LIBRARY_PATH=/opt/intel/lib/intel64:/opt/intel/mkl/lib/intel64
# bin/spark-shell
...

I have also added the following to /opt/intel/mkl/lib/intel64:

lrwxrwxrwx 1 root root 12 Feb 1 09:18 libblas.so -> libmkl_rt.so
lrwxrwxrwx 1 root root 12 Feb 1 09:18 libblas.so.3 -> libmkl_rt.so
lrwxrwxrwx 1 root root 12 Feb 1 09:18 liblapack.so -> libmkl_rt.so
lrwxrwxrwx 1 root root 12 Feb 1 09:18 liblapack.so.3 -> libmkl_rt.so

I believe (???) that I'm using Intel MKL, because the warnings went away:

16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/02/01 07:49:38 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

After collectAsMap there is no progress, but I can observe that only 1 CPU is being utilised, with the following stack trace:

"ForkJoinPool-3-worker-7" #130 daemon prio=5 os_prio=0 tid=0x7fbf30ab6000 nid=0xbdc runnable [0x7fbf12205000]
java.lang.Thread.State: RUNNABLE
at com.github.fommil.netlib.F2jBLAS.ddot(F2jBLAS.java:71)
at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:128)
at org.apache.spark.mllib.linalg.BLAS$.dot(BLAS.scala:111)
at org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:349)
at org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:587)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:561)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:555)

These last few steps take more than half of the total time for a 1Mx100 dataset. The code is just:

val clusters = KMeans.train(parsedData, 1000, 1)

Shouldn't it be utilising all the cores for the dot product? Is this a misconfiguration? Thanks!
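For context on why that single ddot call dominates the profile: MLlib's fastSquaredDistance relies on the expansion ||x - y||^2 = ||x||^2 + ||y||^2 - 2 * (x . y), with extra precision safeguards in the real MLUtils implementation, so with the squared norms precomputed each point-to-center distance costs one dot product. A minimal sketch with made-up vectors:

```scala
// Sketch of the norm-expansion identity behind fastSquaredDistance.
object FastSquaredDistanceSketch {
  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (p, q) => p * q }.sum

  def main(args: Array[String]): Unit = {
    val x = Array(1.0, 2.0, 3.0) // made-up sample point
    val y = Array(4.0, 6.0, 8.0) // made-up sample center
    // One dot product per distance, once the norms are cached:
    val sqDistFast  = dot(x, x) + dot(y, y) - 2.0 * dot(x, y)
    val sqDistExact = x.zip(y).map { case (p, q) => (p - q) * (p - q) }.sum
    println(sqDistFast)  // 50.0
    println(sqDistExact) // 50.0
  }
}
```

This also bears on the single-threaded behaviour in the trace: the quoted BLAS.scala code routes the level-1 dot routine to the pure-Java f2jBLAS regardless of which native library is installed, so parallelism has to come from the RDD partitions rather than from a multithreaded BLAS.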
Apache Spark data locality when integrating with Kafka
Dears

If I use Kafka as a streaming source for some Spark jobs, is it advised to install Spark on the same nodes as the Kafka cluster?

What are the benefits and drawbacks of such a decision?

regards

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Apache Spark data locality when integrating with Kafka
Yes, to reduce network latency.

Sent from Samsung Mobile.

Original message
From: fanooos
Date: 07/02/2016 09:24 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Apache Spark data locality when integrating with Kafka

Dears

If I use Kafka as a streaming source for some Spark jobs, is it advised to install Spark on the same nodes as the Kafka cluster?

What are the benefits and drawbacks of such a decision?

regards
Re: Apache Spark data locality when integrating with Kafka
Spark can benefit from data locality and will try to launch tasks on the node where the Kafka partition resides. However, I think in production many organizations run a dedicated Kafka cluster.

On Sat, Feb 6, 2016 at 11:27 PM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote:

> Yes, to reduce network latency.
>
> Sent from Samsung Mobile.
>
> Original message
> From: fanooos
> Date: 07/02/2016 09:24 (GMT+05:30)
> To: user@spark.apache.org
> Cc:
> Subject: Apache Spark data locality when integrating with Kafka
>
> Dears
>
> If I use Kafka as a streaming source for some Spark jobs, is it advised to install Spark on the same nodes as the Kafka cluster?
>
> What are the benefits and drawbacks of such a decision?
>
> regards
Imported CSV file content isn't identical to the original file
Hi Spark Users Group,

I have a csv file to analyse with Spark, but I'm having trouble importing it as a DataFrame. Here's a minimal reproducible example. Suppose I have a *10(rows)x2(cols)* *space-delimited csv* file, shown below:

1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31

the in column 2 represents a sub-delimiter within that column, and this file is stored on HDFS; let's say the path is hdfs:///tmp/1.csv

I'm using *spark-csv* to import this file as a Spark *DataFrame*:

sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")      // no header line in the file
  .option("inferSchema", "false") // do not infer data types
  .option("delimiter", " ")
  .load("hdfs:///tmp/1.csv")
  .show

Oddly, the output shows only a part of each column:

[image: Screenshot from 2016-02-07 15-27-51.png]

and even the boundary of the table wasn't shown correctly. I also tried the other way to read a csv file, via sc.textFile(...).map(_.split(" ")) and sqlContext.createDataFrame, and the result is the same.

Can someone point out where I did it wrong?

—
BR,
Todd Leo
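A hedged workaround sketch for the file above: because the second column itself contains the delimiter, splitting on every space breaks it apart, while limiting the split to two fields keeps the rest of the line intact as column 2. The sample line below assumes the second column is a date and a time separated by a space (the exact sub-delimiter character was lost in the archive):

```scala
// java.lang.String.split(regex, limit): with limit = 2 the line is split at
// the FIRST space only, so any later spaces stay inside the second field.
// In Spark this would be used as sc.textFile(path).map(_.split(" ", 2)).
object SplitWithLimitSketch {
  def main(args: Array[String]): Unit = {
    val line = "1446566430 2015-11-04 00:00:30" // assumed layout of one row
    val fields = line.split(" ", 2)
    println(fields(0)) // 1446566430
    println(fields(1)) // 2015-11-04 00:00:30
  }
}
```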
Re: Bad Digest error while doing aws s3 put
Hi, I am getting the following error while reading huge data from S3 and, after processing, writing the data back to S3. Did you find any solution for this?

16/02/07 07:41:59 WARN scheduler.TaskSetManager: Lost task 144.2 in stage 3.0 (TID 169, ip-172-31-7-26.us-west-2.compute.internal): java.io.IOException: exception in uploadSinglePart
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:248)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:469)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:105)
at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:106)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at org.apache.hadoop.mapred.TextOutputFormat$LineRecordWriter.close(TextOutputFormat.java:109)
at org.apache.spark.SparkHadoopWriter.close(SparkHadoopWriter.scala:102)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1080)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: exception in putObject
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:149)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy26.storeFile(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:245)
... 15 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The Content-MD5 you specified did not match what we received. (Service: Amazon S3; Status Code: 400; Error Code: BadDigest; Request ID: 5918216A5901FCC8), S3 Extended Request ID: QSxtYln/yXqHYpdr4BWosin/TAFsGlK1FlKfE5PcuJkNrgoblGzTNt74kEhuNcrJCRZ3mXq0oUo=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3796)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1482)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:140)
... 22 more

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036p26167.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: different behavior while using createDataFrame and read.df in SparkR
Thank you, Rui Sun, for the observation! It helped. A new problem has come up. When I create a small function for dummy-variable creation for a categorical column:

BDADummies <- function(dataframe, column) {
  cat.column <- vector(mode = "character", length = nrow(dataframe))
  cat.column <- collect(column)
  lev <- length(levels(as.factor(unlist(cat.column))))
  for (j in 1:lev) {
    dummy.df <- withColumn(dataframe, paste0(colnames(cat.column), j),
                           ifelse(column[[1]] == levels(as.factor(unlist(cat.column)))[j], 1, 0))
    dataframe <- dummy.df
  }
  return(dataframe)
}

and when I call the function using

newdummy.df <- BDADummies(df1, column = select(df1, df1$Species))

I get the below error:

Error in withColumn(dataframe, paste0(colnames(cat.column), j), ifelse(column[[1]] == :
  error in evaluating the argument 'col' in selecting a method for function 'withColumn':
  Error in if (le > 0) paste0("[1:", paste(le), "]") else "(0)" :
  argument is not interpretable as logical

but when I run the same withColumn statement directly, without wrapping it in a function,

dummy.df <- withColumn(dataframe, paste0(colnames(cat.column), j),
                       ifelse(column[[1]] == levels(as.factor(unlist(cat.column)))[j], 1, 0))

it generates the new columns with the desired names.

Warm regards,
Devesh.

On Sat, Feb 6, 2016 at 7:09 AM, Sun, Rui wrote:
> I guess this is related to
> https://issues.apache.org/jira/browse/SPARK-11976
>
> When calling createDataFrame on iris, the "." character in column names
> will be replaced with "_".
>
> It seems that when you create a DataFrame from the CSV file, the "."
> character in column names is still there.
> *From:* Devesh Raj Singh [mailto:raj.deves...@gmail.com]
> *Sent:* Friday, February 5, 2016 2:44 PM
> *To:* user@spark.apache.org
> *Cc:* Sun, Rui
> *Subject:* different behavior while using createDataFrame and read.df in SparkR
>
> Hi,
>
> I am using Spark 1.5.1.
>
> When I do this:
>
> df <- createDataFrame(sqlContext, iris)
>
> # creating a new column for category "setosa"
> df$Species1 <- ifelse((df)[[5]] == "setosa", 1, 0)
>
> head(df)
>
> output: new column created
>
>   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
> 1          5.1         3.5          1.4         0.2  setosa
> 2          4.9         3.0          1.4         0.2  setosa
> 3          4.7         3.2          1.3         0.2  setosa
> 4          4.6         3.1          1.5         0.2  setosa
> 5          5.0         3.6          1.4         0.2  setosa
> 6          5.4         3.9          1.7         0.4  setosa
>
> but when I save the iris dataset as a CSV file and try to read it and
> convert it to a SparkR dataframe:
>
> df <- read.df(sqlContext, "/Users/devesh/Github/deveshgit2/bdaml/data/iris/",
>               source = "com.databricks.spark.csv", header = "true",
>               inferSchema = "true")
>
> now when I try to create the new column
>
> df$Species1 <- ifelse((df)[[5]] == "setosa", 1, 0)
>
> I get the below error:
>
> 16/02/05 12:11:01 ERROR RBackendHandler: col on 922 failed
> Error in select(x, x$"*", alias(col, colName)) :
>   error in evaluating the argument 'col' in selecting a method for
>   function 'select': Error in invokeJava(isStatic = FALSE, objId$id,
>   methodName, ...) :
>   org.apache.spark.sql.AnalysisException: Cannot resolve column name
>   "Sepal.Length" among (Sepal.Length, Sepal.Width, Petal.Length,
>   Petal.Width, Species);
>   at org.apache.spark.s
>
> --
> Warm regards,
> Devesh.
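The mismatch described in this thread comes down to SPARK-11976: createDataFrame sanitizes column names by replacing "." with "_", while read.df preserves the original names, so the same column reference resolves under one path and fails under the other. A minimal sketch of the sanitization rule (in Python, purely to illustrate the string substitution; the column names are the iris ones quoted above):

```python
# Column names as R's iris dataset provides them
r_names = ["Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species"]

# createDataFrame-style sanitization: each "." in a column name becomes "_"
sanitized = [name.replace(".", "_") for name in r_names]

print(sanitized)  # ['Sepal_Length', 'Sepal_Width', 'Petal_Length', 'Petal_Width', 'Species']
```

One workaround (an assumption on my part, not something verified in this thread) is to rename the columns immediately after read.df so that both code paths see the same, dot-free names.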
Re: Kafka directsream receiving rate
Cody,

Yes, I am printing each message. It is processing all messages under each dstream block. Source systems are publishing 1 million messages / 4 secs, which is less than the batch interval. The issue is with DirectStream processing 10 messages per event. When partitions were increased to 20 in the topic, DirectStream picks up only 200 messages (I guess 10 for each partition) at a time for processing. I have 16 executors running for streaming (both yarn client & cluster mode). I am expecting DirectStream to process the 1 million messages published to the topic within one batch interval. Using createStream, it could batch 150K messages and process them; createStream is better than DirectStream in this case. Again, why only 150K? Any clarification on directStream processing millions per batch is much appreciated.

Sent from Samsung Mobile.

Original message
From: Cody Koeninger
Date: 06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi wrote:

I am able to see the number of messages processed per event in the Spark Streaming web UI. Also I am counting the messages inside foreachRDD. Removed the settings for backpressure but still the same.

Sent from Samsung Mobile.

Original message
From: Cody Koeninger
Date: 06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages? I'd go ahead and remove the settings for backpressure and maxRatePerPartition, just to eliminate that as a variable.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi wrote:

I am using one directStream.
Below is the call to createDirectStream:

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

When I replace the createDirectStream call with createStream, all messages are read by one DStream block:

val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181", "resp", topicMap, StorageLevel.MEMORY_ONLY)

I am using the below spark-submit to execute:

./spark-submit --master yarn-client \
  --conf "spark.dynamicAllocation.enabled=true" \
  --conf "spark.shuffle.service.enabled=true" \
  --conf "spark.sql.tungsten.enabled=false" \
  --conf "spark.sql.codegen=false" \
  --conf "spark.sql.unsafe.enabled=false" \
  --conf "spark.streaming.backpressure.enabled=true" \
  --conf "spark.locality.wait=1s" \
  --conf "spark.shuffle.consolidateFiles=true" \
  --conf "spark.streaming.kafka.maxRatePerPartition=100" \
  --driver-memory 2g --executor-memory 1g \
  --class com.tcs.dime.spark.SparkReceiver \
  --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml \
  --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar \
  /root/Jars/sparkreceiver.jar

Sent from Samsung Mobile.

Original message
From: Cody Koeninger
Date: 05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers. Do you mean you have 1 executor? Can you post the relevant call to createDirectStream from your code, as well as any relevant Spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi wrote:

Adding more info: the batch interval is 2000 ms. I expect all 100 messages to go through one dstream from createDirectStream, but it receives them at a rate of 10 messages at a time. Am I missing some configuration here? Any help appreciated.

Regards,
Diwakar
Sent from Samsung Mobile.

Original message
From: Diwakar Dhanuskodi
Date: 05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Subject: Kafka directsream receiving rate

Hi,

Using Spark 1.5.1. I have a topic with 20 partitions. When I publish 100 messages, the Spark direct stream receives only 10 messages per dstream. I have only one receiver. When I used createStream, the receiver received the entire 100 messages at once. Appreciate any help.

Regards,
Diwakar

Sent from Samsung Mobile.
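One observation on the thread above: the spark-submit shown earlier sets spark.streaming.kafka.maxRatePerPartition=100, which caps each Kafka partition at that many messages per second, so a direct-stream batch can never contain more than rate × partitions × batch-interval messages no matter how many are published. A quick sketch of that arithmetic, using the figures from this thread (100 msgs/sec/partition, 20 partitions, 2000 ms batch interval):

```python
max_rate_per_partition = 100   # spark.streaming.kafka.maxRatePerPartition (msgs/sec per partition)
num_partitions = 20            # partitions in the "xyz" topic
batch_interval_sec = 2.0       # 2000 ms batch interval

# Upper bound on messages a single direct-stream batch can contain
max_msgs_per_batch = int(max_rate_per_partition * num_partitions * batch_interval_sec)
print(max_msgs_per_batch)  # 4000
```

So with these settings no batch can exceed 4000 messages regardless of publish volume, and with spark.streaming.backpressure.enabled=true the initial per-batch rate can be lower still while the rate estimator warms up, which would be consistent with the small batches reported. Removing both settings while testing (as suggested in the thread) removes the cap.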
Re: Shuffle memory woes
Hi, usually you can solve this in 2 steps:
1. repartition the RDD to have more partitions
2. play with the shuffle memory fraction

In Spark 1.6, the cache vs shuffle memory fractions are adjusted automatically.

On 5 February 2016 at 23:07, Corey Nolet wrote:
> I just recently had a discovery that my jobs were taking several hours to
> complete because of excess shuffle spills. What I found was that when I
> hit the high point where I didn't have enough memory for the shuffles to
> store all of their file consolidations at once, they could spill so many
> times that it caused my job's runtime to increase by orders of magnitude
> (and sometimes fail altogether).
>
> I've played with all the tuning parameters I can find. To speed the
> shuffles up, I tuned the akka threads to different values. I also tuned the
> shuffle buffering a tad (both up and down).
>
> I feel like I see a weak point here. The mappers are sharing memory space
> with the reducers, and the shuffles need enough memory to consolidate and
> pull; otherwise they will need to spill and spill and spill. What I've
> noticed about my jobs is that this is the difference between them taking 30
> minutes and 4 hours or more. Same job, just different memory tuning.
>
> I've found that, as a result of the spilling, I'm better off not caching
> any data in memory and lowering my storage fraction to 0, while still hoping
> I was able to give my shuffles enough memory that my data doesn't
> continuously spill. Is this the way it's supposed to be? It makes it hard
> because it seems like it forces memory limits on my job; otherwise it
> could take orders of magnitude longer to execute.
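The "more partitions" suggestion above is essentially arithmetic: for a fixed amount of shuffled data, more partitions means less data per task, so each task is more likely to fit inside its share of the shuffle memory instead of spilling. A rough back-of-the-envelope sketch (all numbers hypothetical; 0.2 is the pre-1.6 default for spark.shuffle.memoryFraction):

```python
shuffle_data_gb = 200.0   # total data being shuffled (hypothetical)
executor_mem_gb = 8.0     # memory per executor (hypothetical)
shuffle_fraction = 0.2    # spark.shuffle.memoryFraction (pre-1.6 default)
concurrent_tasks = 4      # cores per executor, i.e. tasks sharing that fraction

# Memory available to each concurrently running task for shuffle buffers
per_task_budget_gb = executor_mem_gb * shuffle_fraction / concurrent_tasks
print(per_task_budget_gb)  # roughly 0.4 GB per task

def will_spill(num_partitions):
    """Very rough test: does one partition's slice exceed the task's shuffle budget?"""
    return shuffle_data_gb / num_partitions > per_task_budget_gb

print(will_spill(200))    # 1 GB per partition: exceeds the budget, spills
print(will_spill(1000))   # 0.2 GB per partition: fits, no spill
```

This ignores serialization overhead and skew, but it shows why repartitioning to more, smaller partitions and raising the shuffle fraction both attack the same spill problem from different sides.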