Thanks everybody for the advice on this.
I attached YourKit and found that the CPU time split was about 70% in
Parquet/LZO reading and 30% applying the filter predicate. I guess those
are reasonable things for it to be spending time on, and so it really could
just be a case of needing more
Hi,
Recently I gave a talk on how to create Spark data sources from scratch.
A screencast is available on YouTube along with the slides and code. Please
have a look if you are interested.
http://blog.madhukaraphatak.com/anatomy-of-spark-datasource-api/
--
Regards,
Madhukara Phatak
Have a look at the StageInfo class:
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.scheduler.StageInfo
It has a stageFailed method you could make use of. I don't understand the
point of restarting the entire application.
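As a rough sketch of that idea (this assumes a listener-based approach and a live SparkContext `sc`; it is not the original poster's setup), you could watch for failed stages and react without restarting the whole application:

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Watch completed stages; StageInfo.failureReason is non-empty when the stage failed.
sc.addSparkListener(new SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    info.failureReason.foreach { reason =>
      println(s"Stage ${info.stageId} failed: $reason")
      // hypothetical hook: resubmit just the affected work here instead of the whole app
    }
  }
})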
Thanks
Best Regards
On Tue, Jun 30, 2015 at
Is this 3 the number of parallel consumer threads per receiver, meaning in total
we have 2*3 = 6 consumers in the same consumer group consuming from all 300
partitions?
3 is just the parallelism on the same receiver, and the recommendation is to use 1 per
receiver, since consuming from Kafka is not CPU bound, rather
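For reference, a rough sketch of the receiver-based API being discussed (the ssc, host, and topic names are made up): the per-topic value is the number of consumer threads inside that one receiver, not extra receivers.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver; the value 3 means 3 consumer threads inside this single receiver.
// Creating this stream twice is what gives 2 receivers * 3 threads = 6 consumers.
val kafkaStream = KafkaUtils.createStream(
  ssc,                          // an existing StreamingContext
  "zkhost:2181",                // ZooKeeper quorum
  "my-consumer-group",
  Map("my-topic" -> 3),
  StorageLevel.MEMORY_AND_DISK_SER_2)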
How much memory do you have on that machine? You can increase the heap space
by *export _JAVA_OPTIONS=-Xmx2g*
Thanks
Best Regards
On Tue, Jun 30, 2015 at 11:00 AM, Chintan Bhatt
chintanbhatt...@charusat.ac.in wrote:
I am facing the following error message while performing sbt/sbt assembly:
Error
I'm using 50 servers, 35 executors per server, 140GB memory per server
35 executors *per server* sounds kind of odd to me.
With 35 executors per server and each server having 140GB, each executor
is going to get only 4GB, and that 4GB is further divided into the
shuffle/storage memory fractions...
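To put rough numbers on it (a sketch assuming the Spark 1.x legacy memory manager defaults of spark.storage.memoryFraction = 0.6 with its 0.9 safety factor and spark.shuffle.memoryFraction = 0.2 with its 0.8 safety factor): 140GB / 35 executors is about 4GB of heap each, of which roughly 4GB * 0.6 * 0.9 ≈ 2.2GB is usable for cached blocks and 4GB * 0.2 * 0.8 ≈ 0.64GB for shuffle buffers, before any memory for user objects.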
I am running a spark application in standalone cluster on windows 7
environment.
Following are the details.
spark version = 1.4.0
Windows/Standalone mode
I built Hadoop 2.6.0 on Windows and set the env params like so:
HADOOP_HOME = E:\hadooptar260\hadoop-2.6.0
HADOOP_CONF_DIR
I wonder if this could be a side effect of SPARK-3928. Does ending the path
with *.parquet work?
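Something along these lines is what is being suggested (the bucket and path are hypothetical):

val myDF = sqlContext.read.parquet("s3n://myBucket/myPath/*.parquet")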
-------- Original message --------
From: Exie tfind...@prodevelop.com.au
Date: 06/30/2015 9:20 PM (GMT-05:00)
To: user@spark.apache.org
Subject: 1.4.0
So
Thanks Ak,
This problem has been solved. I used nmon to monitor the system I/O
and CPU pressure and found there was a very sharp peak. After that peak many
processes stopped running, so I corrected my code and this issue is gone.
The previous code looked like this:
Hi Debasish:
We have the same dataset running on SybaseIQ and after the caches are warm
the queries come back in about 300ms. We're looking at options to relieve
overutilization and to bring down licensing costs. I realize that Spark
may not be the best fit for this use case but I'm interested
Not sure if this helps, but the options I set are slightly different:
val hadoopConf=sc.hadoopConfiguration
hadoopConf.set("fs.s3n.awsAccessKeyId", key)
hadoopConf.set("fs.s3n.awsSecretAccessKey", secret)
Try setting them to s3n as opposed to just s3
Good luck!
Minor correction:
It should be sc._jsc
Cheers
On Tue, Jun 30, 2015 at 4:23 PM, ayan guha guha.a...@gmail.com wrote:
There is a sc._jsc_ which you can access to get/set hadoop conf.
On Wed, Jul 1, 2015 at 7:41 AM, Richard Ding pigu...@gmail.com wrote:
Hi,
I noticed that, in Scala API,
Just to add to this, here's some more info:
val myDF = hiveContext.read.parquet("s3n://myBucket/myPath/")
Produces these...
2015-07-01 03:25:50,450 INFO [pool-14-thread-4]
(org.apache.hadoop.fs.s3native.NativeS3FileSystem) - Opening
's3n://myBucket/myPath/part-r-00339.parquet' for reading
That
FWIW, I had some trouble getting Spark running on a Pi.
My core problem was using snappy for compression as it comes as a pre-made
binary for i386 and I couldn't find one for ARM.
So to work around it there was an option to use LZO instead, then everything
worked.
Off the top of my head, it was
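If the option in question was the internal compression codec, a guess at what that workaround looks like (an assumption on my part, not the author's actual recollection) is switching spark.io.compression.codec from native snappy to the pure-Java lzf implementation:

// Assumed workaround: avoid the native snappy binary by using the pure-Java lzf codec.
val conf = new org.apache.spark.SparkConf()
  .set("spark.io.compression.codec", "lzf")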
So I was delighted with Spark 1.3.1 using Parquet 1.6.0 which would
partition data into folders. So I set up some parquet data partitioned by
date. This enabled us to reference a single day/month/year, minimizing how
much data was scanned.
eg:
val myDataFrame =
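For context, a minimal sketch of producing and reading back such a date-partitioned layout (the column and path names are made up; DataFrameWriter.partitionBy is available from Spark 1.4):

// Write parquet partitioned into year=/month=/day= folders.
df.write.partitionBy("year", "month", "day").parquet("hdfs:///data/events")

// Read it back; filtering on the partition columns prunes which folders get scanned.
val myDataFrame = sqlContext.read.parquet("hdfs:///data/events")
  .filter("year = 2015 AND month = 6 AND day = 30")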
Thanks -- you do an excellent job of getting into the nitty-gritty of Spark's
behind-the-scenes functioning.
- Jordan
From: madhu phatak [mailto:phatak@gmail.com]
Sent: Tuesday, June 30, 2015 3:17 AM
To: user@spark.apache.org
Subject: Talk on Deep dive into Spark Data source API
Hi,
Hi Klaus, you can use the new ml API with DataFrames:
import org.apache.spark.ml.classification.LogisticRegression
val model = new LogisticRegression().setFeaturesCol("features")
  .setProbabilityCol("probability").setPredictionCol("prediction").fit(data)
Thanks,
Peter Rudenko
On 2015-06-30 14:00, Klaus Schaefers wrote:
Hello,
is there a way to get, during the
We finally managed to find the problem, the s3 files were located in
Frankfurt, which only supports the *v4* signature.
*Surprising* is the fact that the Spark core textFile method does not
support that!
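For what it's worth, here is a sketch of the kind of workaround commonly used for V4-only regions (an assumption on my part; it relies on the s3a connector from hadoop-aws and a V4-capable AWS SDK rather than the s3n path the thread was using):

// Point s3a at the Frankfurt endpoint. V4 signing must also be enabled in the AWS SDK,
// e.g. -Dcom.amazonaws.services.s3.enableV4=true on both driver and executors.
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
val data = sc.textFile("s3a://myBucket/myPath/")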
You can't use different versions of spark in your application vs your
cluster.
For the direct stream, it's not 60 partitions per executor, it's 300
partitions, and executors work on them as they are scheduled. Yes, if you
have no messages you will get an empty partition. It's up to you whether
Hi, Spark Users
I'm trying to update a registered DataFrame temp table by invoking
DataFrame.registerTempTable again and again.
Assume I have a DataFrame temp table table1; below is the concurrent logic:
sqlContext.table("table1").filter(***).unionAll(dummy1DF).registerTempTable("table1")
On 24 Jun 2015, at 18:56, Kevin Liu kevin...@fb.commailto:kevin...@fb.com
wrote:
Continuing this thread beyond standalone - onto clusters, does anyone have
experience successfully running any Spark cluster on IPv6 only (not dual stack)
machines? More companies are moving to IPv6 and some such
Hi, Spark users,
The following images are copied from the Spark Streaming UI. I observed for about 30
minutes and saw that the Processed records (438768 at the moment I copied the
image) always lag behind the Received records (480783) by about 40k records.
Since the number of waiting batches is 1 and the
Thanks for that.
One more doubt: how do I perform different logic/operations over a
DStream that has two types of stream ids in it?
*It's like using stream-id in Storm with a fork (different logic for
both edges at the same time) without using 2 separate filters.*
On Tue, Jun 30, 2015
Hi,
We are using Spark 1.4.0 on hadoop using yarn-cluster mode via
spark-submit. We are facing a parquet write issue after doing dataframe joins.
We have a full data set and then incremental data. We are reading them
as dataframes, joining them, and then writing the data to the hdfs system
in
Thanks Salih. :)
The output of the groupby is as below.
2015-01-14 SEC Inquiry
2015-01-16 Re: SEC Inquiry
2015-01-18 Fwd: Re: SEC Inquiry
And subsequently, we would like to aggregate all messages with a particular
reference subject.
For instance the question we are trying to
The API exported in the 1.4 release is different from the one used in the
2014 demo. Please see the latest documentation at
http://people.apache.org/~pwendell/spark-releases/latest/sparkr.html or
Chris's demo from Spark Summit at
I have a dataset (trimmed and simplified) with 2 columns as below.
Date         Subject
2015-01-14 SEC Inquiry
2014-02-12 Happy birthday
2014-02-13 Re: Happy birthday
2015-01-16 Re: SEC Inquiry
2015-01-18 Fwd: Re: SEC Inquiry
I have imported the same in a
Hi Suraj, what will be your output after the group by? GroupBy is for
aggregations like sum, count, etc.
If you want to count the 2015 records, then that is possible. Kind Regards
Salih Oztop
From: Suraj Shetiya surajshet...@gmail.com
To: user@spark.apache.org
Sent: Tuesday, June 30, 2015
Hi all,
I'm running a spark standalone cluster with one master and one slave
(different machines and both in version 1.4.0), the thing is I have a spark
streaming job that gets data from Kafka and then just prints it.
To configure the cluster I just started the master and then the slaves
Hi,
In your build.sbt file, list all the dependencies you have (hopefully there
are not too many; they just have a lot of transitive dependencies), for example:
```
libraryDependencies += "org.apache.hbase" % "hbase" % "1.1.1"
libraryDependencies += "junit" % "junit" % "x"
resolvers += "Some other repo" at
Hi folks, running into a pretty strange issue:
I'm setting
spark.executor.extraClassPath
spark.driver.extraClassPath
to point to some external JARs. If I set them in spark-defaults.conf
everything works perfectly.
However, if I remove spark-defaults.conf and just create a SparkConf and
call
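For reference, a minimal sketch of the programmatic variant (the jar locations are hypothetical). One caveat: spark.driver.extraClassPath has to be known before the driver JVM starts, so setting it in a SparkConf from an already-running driver generally cannot take effect for the driver itself, which may be exactly the difference from spark-defaults.conf:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("extra-classpath-test")
  .set("spark.executor.extraClassPath", "/opt/ext/libs/*")   // hypothetical paths
  .set("spark.driver.extraClassPath", "/opt/ext/libs/*")
val sc = new SparkContext(conf)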
Hi all,
I have a problem where I have a RDD of elements:
Item1 Item2 Item3 Item4 Item5 Item6 ...
and I want to run a function over them to decide which runs of elements to
group together:
[Item1 Item2] [Item3] [Item4 Item5 Item6] ...
Technically, I could use aggregate to do this, but I would
I have a RDD of type (String,
Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])]
Here the String is the key, with a list of tuples for that key. I got the above RDD after
doing a groupByKey. I later want to compute
Thanks Shivaram. I watched your talk and the plan to use ML APIs with R
flavor looks exciting.
Is there a different venue where I would be able to follow the SparkR API
progress?
Thanks
Pradeep
On Mon, Jun 29, 2015 at 1:12 PM, Shivaram Venkataraman
shiva...@eecs.berkeley.edu wrote:
The RDD
The Spark JIRA and the user, dev mailing lists are the best place to follow
the progress.
Shivaram
On Tue, Jun 30, 2015 at 9:52 AM, Pradeep Bashyal prad...@bashyal.com
wrote:
Thanks Shivaram. I watched your talk and the plan to use ML APIs with R
flavor looks exciting.
Is there a different
I have a Spark program which exhibits increasing resource usage. Spark
Streaming (https://spark.apache.org/streaming/) is used to provide the data
source. The Spark Driver class receives events by querying a MongoDB in a
custom JavaReceiverInputDStream. These events are then transformed via
Could you give more information on the operations that you are using? The
code outline?
And what do you mean by the Spark Driver receiving events? If the driver is
receiving events, how are they being sent to the executors?
BTW, for memory usage, I strongly recommend using jmap -histo:live to see
what
I'm running reduceByKey in spark. My program is the simplest example of
spark:
val counts = textFile.flatMap(line => line.split(" "))
  .repartition(2)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
counts.saveAsTextFile("hdfs://...")
but it always runs out of
This brings up another question/issue - there doesn't seem to be a way to
partition cached tables in the same way you can partition, say a Hive
table. For example, we would like to partition the overall dataset (233m
rows, 9.2Gb) by (product, coupon) so when we run one of these queries
Try mapPartitions, which gives you an iterator, and you can produce an
iterator back.
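A minimal sketch of that idea (the grouping predicate is hypothetical); note it only groups runs within each partition, so runs straddling partition boundaries still need the second pass discussed elsewhere in this thread:

// Group consecutive elements of each partition into runs. startsNewRun(prev, next)
// is a made-up predicate saying whether `next` should begin a new group.
def groupRuns[T](rdd: org.apache.spark.rdd.RDD[T])(startsNewRun: (T, T) => Boolean)
    : org.apache.spark.rdd.RDD[List[T]] =
  rdd.mapPartitions { iter =>
    val buffered = iter.buffered
    new Iterator[List[T]] {
      def hasNext: Boolean = buffered.hasNext
      def next(): List[T] = {
        var run = List(buffered.next())              // run is kept in reverse order
        while (buffered.hasNext && !startsNewRun(run.head, buffered.head))
          run = buffered.next() :: run
        run.reverse
      }
    }
  }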
On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling rnowl...@gmail.com wrote:
Hi all,
I have a problem where I have a RDD of elements:
Item1 Item2 Item3 Item4 Item5 Item6 ...
and I want to run a function over
hello, I'm using spark 1.4.2-SNAPSHOT
I'm running in yarn mode :-)
I wonder if spark.shuffle.memoryFraction or spark.shuffle.manager work?
How do I set these parameters?
On July 1, 2015, at 1:32 AM, Ted Yu yuzhih...@gmail.com wrote:
Which Spark release are you using ?
Are you running in standalone
Is there a nice way of going from a Spark DataFrame to an EdgeRDD without
hardcoding types in the Scala code? The examples I've seen use case classes
to define the type of the EdgeRDD.
Let's assume that our Spark DataFrame has StructField("dstID", LongType,
false)
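One way to avoid the case class (a sketch; the column names "srcID", "dstID", and "weight" are assumptions about the schema) is to map the DataFrame's Rows straight to GraphX Edge objects:

import org.apache.spark.graphx.{Edge, Graph}

// Build an RDD[Edge[Double]] directly from the DataFrame rows.
val edges = df.rdd.map { row =>
  Edge(row.getAs[Long]("srcID"), row.getAs[Long]("dstID"), row.getAs[Double]("weight"))
}
val graph = Graph.fromEdges(edges, defaultValue = 0.0)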
Which Spark release are you using ?
Are you running in standalone mode ?
Cheers
On Tue, Jun 30, 2015 at 10:03 AM, hotdog lisend...@163.com wrote:
I'm running reduceByKey in spark. My program is the simplest example of
spark:
val counts = textFile.flatMap(line => line.split(
Hi,
I'm creating scalatest tests for my Spark programs. I typically read data
from Amazon S3. When I run them using master=local everything works.
However, if I start an Amazon EC2 cluster and use that as the master, I get
EOFExceptions.
e.g.
mvn test -Dsuites=package.MyTest
Are you using the SparkR from the latest Spark 1.4 release ? The function
was not available in the older AMPLab version
Shivaram
On Tue, Jun 30, 2015 at 1:43 PM, Nicholas Sharkey nicholasshar...@gmail.com
wrote:
Any idea why I can't get the sparkRSQL.init function to work? The other
parts of
That's an interesting idea! I hadn't considered that. However, looking at
the Partitioner interface, I would need to make the decision from a single
key, which doesn't fit my case, unfortunately. For my case, I need to
compare successive pairs of keys. (I'm trying to re-join lines that were
split
Thanks, Reynold. I still need to handle incomplete groups that fall
between partition boundaries. So, I need a two-pass approach. I came up
with a somewhat hacky way to handle those using the partition indices and
key-value pairs as a second pass after the first.
OCaml's std library provides a
could you use a custom partitioner to preserve boundaries such that all related
tuples end up on the same partition?
On Jun 30, 2015, at 12:00 PM, RJ Nowling rnowl...@gmail.com wrote:
Thanks, Reynold. I still need to handle incomplete groups that fall between
partition boundaries. So, I
Hi,
I'm running Spark 1.4.0 without Hadoop. I'm using the binary
spark-1.4.0-bin-hadoop2.6.
I start the spark-shell as :
spark-shell --master local[2] --packages
com.databricks:spark-csv_2.11:1.1.0 --executor-memory 2G --conf
spark.local.dir=C:/Users/Sourav.
Then I run :
val df =
Hi,
I'm looking for a way to estimate the amount of memory that will be needed
for a task looking at the size of its input data. It clearly depends on
what the task is doing, but is there a place to look in the logs exported
by Spark to see this information?
Thanks
Well, the scheduling delay is the time a batch has to wait for getting
resources. So even if there is no backlog in processing and scheduling
delay is 0, there is one batch that is being processed at any point of
time, which explains the difference.
On Tue, Jun 30, 2015 at 2:42 AM,
If the number of items is very large, have you considered using
probabilistic counting? The HyperLogLogPlus
https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java
class from stream-lib https://github.com/addthis/stream-lib
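Related note: Spark's own RDD API also exposes a HyperLogLog++-based estimator, which avoids using stream-lib directly (the 0.01 relative accuracy below is just an example value):

// Approximate distinct count with roughly 1% relative standard deviation.
val approxDistinct = rdd.countApproxDistinct(relativeSD = 0.01)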
Hi Debashish -
No, there are actually 14 columns any of which can be specified at runtime
by the user. There is a UI which allows the user to specify predicates on
any of the 14 columns. They submit this form and we generate
a filter like the one below:
val fnm303 = spp.filter(product = 'FNM30'
How many receivers do you have in the streaming program? You have to have
more cores reserved by your Spark application than the number
of receivers. That would explain seeing the received output only after stopping.
TD
On Tue, Jun 30, 2015 at 7:59 AM, Borja Garrido Bear kazebo...@gmail.com
You can run
hadoop checknative -a
and see if bzip2 is detected correctly.
--
Ruslan Dautkhanov
On Fri, Jun 26, 2015 at 10:18 AM, Marcelo Vanzin van...@cloudera.com
wrote:
What master are you using? If this is not a local master, you'll need to
set LD_LIBRARY_PATH on the executors also
Hi Burak,
Is the `--packages` flag only available for Maven coordinates, with no sbt support?
On Tue, Jun 30, 2015 at 2:26 PM Burak Yavuz brk...@gmail.com wrote:
You can pass `--packages your:comma-separated:maven-dependencies` to spark
submit if you have Spark 1.3 or greater.
Best regards,
Burak
On Mon, Jun
This:
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[30 seconds]
Could happen for many reasons, one of them could be because of insufficient
memory. Are you running all 20 apps on the same node? How are you
submitting the apps? (with spark-submit?). I see you have
Try this way:
val data = sc.textFile("s3n://ACCESS_KEY:SECRET_KEY@mybucket/temp/")
Thanks
Best Regards
On Mon, Jun 29, 2015 at 11:59 PM, didi did...@gmail.com wrote:
Hi
*Can't read text file from S3 to create RDD*
after setting the configuration
val
You can pass `--packages your:comma-separated:maven-dependencies` to spark
submit if you have Spark 1.3 or greater.
Best regards,
Burak
On Mon, Jun 29, 2015 at 10:46 PM, SLiZn Liu sliznmail...@gmail.com wrote:
Hey Spark Users,
I'm writing a demo with Spark and HBase. What I've done is
Hello,
is there a way to get, during the predict() phase, also the class
probabilities like I would get in sklearn?
Cheers,
Klaus
--
--
Klaus Schaefers
Senior Optimization Manager
Ligatus GmbH
Hohenstaufenring 30-32
D-50674 Köln
Tel.: +49 (0) 221 / 56939 -784
Fax: +49 (0) 221 / 56
I got good runtime improvement from hive partitioning, caching the dataset,
and increasing the cores through repartition... I think for your case
generating mysql-style indexing will help further... it is not supported in
Spark SQL yet...
I know the dataset might be too big for 1 node mysql but do
There is a sc._jsc_ which you can access to get/set hadoop conf.
On Wed, Jul 1, 2015 at 7:41 AM, Richard Ding pigu...@gmail.com wrote:
Hi,
I noticed that, in Scala API, one can call hadoopConfiguration on
SparkContext to retrieve the hadoop configuration object which is very
handy in
I am trying to find the correct way to programmatically check for
null values in the rows of a dataframe. For example, below is the code using
pyspark and sql:
df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, "a"), (3,
"b"), (4, None)]))
df.where('_2 is not null').count()
However,
Hi,
I noticed that, in Scala API, one can call hadoopConfiguration on
SparkContext to retrieve the hadoop configuration object which is very
handy in modifying certain hadoop properties at runtime. But there is no
corresponding method in Python API.
Will this method be added to Python API in a
I modified to
detailInputsToGroup.map {
  case (detailInput, dataRecord) =>
    val key: StringBuilder = new StringBuilder
    dimensions.foreach { dimension =>
      key ++= {
I am guessing one of the two things might work.
1. Either define the pattern SPACE inside the process()
2. Mark streamingContext field and inputStream field as transient.
The problem is that functions like PairFunction need to be serialized
to be sent to the tasks. And the whole closure of
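A minimal Scala sketch of those two suggestions (the field and class names are made up for illustration):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

class WordCounter extends Serializable {
  // 2. Driver-side handles marked @transient so they are not dragged into task closures.
  @transient private var ssc: StreamingContext = _

  def process(lines: DStream[String]): DStream[(String, Int)] =
    lines.flatMap { line =>
      // 1. The pattern is defined inside the function, so nothing non-serializable
      //    from the enclosing class has to be shipped to the executors.
      val SPACE = java.util.regex.Pattern.compile(" ")
      SPACE.split(line)
    }.map(word => (word, 1)).reduceByKey(_ + _)
}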