Accelerating Spark SQL / Dataframe using GPUs & Alluxio
Hi Spark users, We have been working on GPU acceleration for Apache Spark SQL / Dataframe using the RAPIDS Accelerator for Apache Spark <https://www.nvidia.com/en-us/deep-learning-ai/solutions/data-science/apache-spark-3/> and open source project Alluxio <https://github.com/Alluxio/alluxio> without any code changes. Our preliminary results suggest 2x improvement in performance and 70% in ROI compared to a CPU-based cluster. Feel free to read the developer blog <https://bit.ly/2QkXjxo> for more details of the benchmark. If you are interested to discuss further with the authors, join our free online meetup <https://go.alluxio.io/community-alluxio-day-2021> next Tuesday morning (April 27) Pacific time. Best, - Bin Fan
Evaluating Apache Spark with Data Orchestration using TPC-DS
Dear Spark Users, I am sharing a whitepaper on “Evaluating Apache Spark and Alluxio for Data Analytics <https://bit.ly/2Pg2jms>” which talks about how to benchmark Spark on Alluxio to accelerate TPCDS benchmark results with details. Hope this helps. If you have any questions, feel free to reach out to me Best regards - Bin Fan
Bursting Your On-Premises Data Lake Analytics and AI Workloads on AWS
Hi everyone! I am sharing this article about running Spark / Presto workloads on AWS: Bursting On-Premise Datalake Analytics and AI Workloads on AWS <https://bit.ly/3qA1Tom> published on AWS blog. Hope you enjoy it. Feel free to discuss with me here <https://alluxio.io/slack>. - Bin Fan Powered by Alluxio <https://www.alluxio.io/powered-by-alluxio/> | Alluxio Slack Channel <https://alluxio.io/slack> | Data Orchestration Summit 2020 <https://www.alluxio.io/data-orchestration-summit-2020/>
Spark in hybrid cloud in AWS & GCP
Dear Spark users, If you are interested in running Spark in Hybrid Cloud? Checkout talks from AWS & GCP at the virtual Data Orchestration Summit <https://www.alluxio.io/data-orchestration-summit-2020/> on Dec. 8-9, 2020, register for free <https://www.alluxio.io/data-orchestration-summit-2020/>. The summit has speaker lineup spans creators and committers of Alluxio, Spark, Presto, Tensorflow, K8s to data engineers and software engineers building cloud-native data and AI platforms at Amazon, Alibaba, Comcast, Facebook, Google, ING Bank, Microsoft, Tencent, and more! - Bin Fan
Building High-performance Lake for Spark using OSS, Hudi, Alluxio
Hi Spark Users, Check out this blog on Building High-performance Data Lake using Apache Hudi, Spark and Alluxio at T3Go <https://bit.ly/373RYPi> <https://bit.ly/373RYPi> Cheers - Bin Fan
Re: Spark dataframe hdfs vs s3
Try to deploy Alluxio as a caching layer on top of S3, providing Spark a similar HDFS interface? Like in this article: https://www.alluxio.io/blog/accelerate-spark-and-hive-jobs-on-aws-s3-by-10x-with-alluxio-tiered-storage/ On Wed, May 27, 2020 at 6:52 PM Dark Crusader wrote: > Hi Randy, > > Yes, I'm using parquet on both S3 and hdfs. > > On Thu, 28 May, 2020, 2:38 am randy clinton, > wrote: > >> Is the file Parquet on S3 or is it some other file format? >> >> In general I would assume that HDFS read/writes are more performant for >> spark jobs. >> >> For instance, consider how well partitioned your HDFS file is vs the S3 >> file. >> >> On Wed, May 27, 2020 at 1:51 PM Dark Crusader < >> relinquisheddra...@gmail.com> wrote: >> >>> Hi Jörn, >>> >>> Thanks for the reply. I will try to create a easier example to reproduce >>> the issue. >>> >>> I will also try your suggestion to look into the UI. Can you guide on >>> what I should be looking for? >>> >>> I was already using the s3a protocol to compare the times. >>> >>> My hunch is that multiple reads from S3 are required because of improper >>> caching of intermediate data. And maybe hdfs is doing a better job at this. >>> Does this make sense? >>> >>> I would also like to add that we built an extra layer on S3 which might >>> be adding to even slower times. >>> >>> Thanks for your help. >>> >>> On Wed, 27 May, 2020, 11:03 pm Jörn Franke, >>> wrote: >>> Have you looked in Spark UI why this is the case ? S3 Reading can take more time - it depends also what s3 url you are using : s3a vs s3n vs S3. It could help after some calculation to persist in-memory or on HDFS. You can also initially load from S3 and store on HDFS and work from there . HDFS offers Data locality for the tasks, ie the tasks start on the nodes where the data is. Depending on what s3 „protocol“ you are using you might be also more punished with performance. Try s3a as a protocol (replace all s3n with s3a). You can also use s3 url but this requires a special bucket configuration, a dedicated empty bucket and it lacks some ineroperability with other AWS services. Nevertheless, it could be also something else with the code. Can you post an example reproducing the issue? > Am 27.05.2020 um 18:18 schrieb Dark Crusader < relinquisheddra...@gmail.com>: > > > Hi all, > > I am reading data from hdfs in the form of parquet files (around 3 GB) and running an algorithm from the spark ml library. > > If I create the same spark dataframe by reading data from S3, the same algorithm takes considerably more time. > > I don't understand why this is happening. Is this a chance occurence or are the spark dataframes created different? > > I don't understand how the data store would effect the algorithm performance. > > Any help would be appreciated. Thanks a lot. >>> >> >> -- >> I appreciate your time, >> >> ~Randy >> >
Re: What is directory "/path/_spark_metadata" for?
Hey Mark, I believe this is the name of the subdirectory that is used to store metadata about which files are valid, see comment in code https://github.com/apache/spark/blob/v2.3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala#L33 Do you see the exception as warnings or as errors in Alluxio master log? It will be helpful to post the stack trace if it is available. My hypothesis is that Spark in your case was testing creating such directory -Bin On Wed, Aug 28, 2019 at 1:59 AM Mark Zhao wrote: > Hey, > > When running Spark on Alluxio-1.8.2, I encounter the following exception: > “alluxio.exception.FileDoseNotExistException: Path > “/test-data/_spark_metadata” does not exist” in Alluxio master.log. What > exactly is the directory "_spark_metadata" used for? And how can I fix this > problem? > > Thanks. > > Mark >
How to avoid spark executor fetching jars in Local mode
Hello everyone, For spark local mode, I noticed the executor fetching jars from local, is there a way to avoid these steps, as these jars already on the local filesystem, it takes seconds to fetching all the jars, we want to reduce the time on bootstrap phase. we are using spark-submit with `--conf spark.jars` to define these libs. [2019-09-26 06:19:59,651] [INFO ] o.a.s.e.Executor - Fetching spark://10.2.1.5:43745/jars/xxx.jar with timestamp 1569478797007 [2019-09-26 06:19:59,729] [INFO ] o.a.s.n.c.TransportClientFactory - Successfully created connection to /10.2.1.5:43745 after 55 ms (0 ms spent in bootstraps) [2019-09-26 06:19:59,754] [INFO ] o.a.s.u.Utils - Fetching spark://10.2.1.5:43745/jars/xxx.jar to /tmp/spark-0365e48c-1747-4370-978f-7cd142ef0375/userFiles-3309dc5e-b6d0-4b76-a9aa-8e0a226ddab9/fetchFileTemp6346264674899227929.tmp [2019-09-26 06:19:59,801] [INFO ] o.a.s.e.Executor - Adding file:/tmp/spark-0365e48c-1747-4370-978f-7cd142ef0375/userFiles-3309dc5e-b6d0-4b76-a9aa-8e0a226ddab9/xxx.jar to class loader Thanks Chen Bin As a recipient of an email from Talend, your contact personal data will be on our systems. Please see our contacts privacy notice at Talend, Inc. <https://www.talend.com/contacts-privacy-policy/>
Re: Low cache hit ratio when running Spark on Alluxio
Depending on the Alluxio version you are running, e..g, for 2.0, the metrics of the local short-circuit read is not turned on by default. So I would suggest you to first turn on the metrics collecting local short-circuit reads by setting alluxio.user.metrics.collection.enabled=true Regarding the generic question to achieve high data locality when running Spark on Alluxio, can you read this article https://www.alluxio.io/blog/top-10-tips-for-making-the-spark-alluxio-stack-blazing-fast/ and follow the suggests there. E.g., things can be weird on running Spark on YARN for this case. If you need more detailed instructions, feel free to join Alluxio community channel https://slackin.alluxio.io <https://www.alluxio.io/slack> - Bin Fan alluxio.io <http://bit.ly/2JctWrJ> | powered by <http://bit.ly/2JdD0N2> | Data Orchestration Summit 2019 <https://www.alluxio.io/data-orchestration-summit-2019/> On Wed, Aug 28, 2019 at 1:49 AM Jerry Yan wrote: > Hi, > > We are running Spark jobs on an Alluxio Cluster which is serving 13 > gigabytes of data with 99% of the data is in memory. I was hoping to speed > up the Spark jobs by reading the in-memory data in Alluxio, but found > Alluxio local hit rate is only 1.68%, while Alluxio remote hit rate is > 98.32%. By monitoring the network IO across all worker nodes through > "dstat" command, I found that only two nodes had about 1GB of recv or send > in the whole precessand, and it is sending 1GB or receiving 1GB during > Spark Shuffle Stage. Is there any metrics I could check or configuration > to tune ? > > > Best, > > Jerry >
Re: Can I set the Alluxio WriteType in Spark applications?
Hi Mark, You can follow the instructions here: https://docs.alluxio.io/os/user/stable/en/compute/Spark.html#customize-alluxio-user-properties-for-individual-spark-jobs Something like this: $ spark-submit \--conf 'spark.driver.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \--conf 'spark.executor.extraJavaOptions=-Dalluxio.user.file.writetype.default=CACHE_THROUGH' \... Hope it helps - Bin On Tue, Sep 17, 2019 at 7:53 AM Mark Zhao wrote: > Hi, > > If Spark applications write data into alluxio, can WriteType be configured? > > Thanks, > Mark > >
Re: How to fix ClosedChannelException
Hi This *java.nio.channels.ClosedChannelException* is often caused by a connection timeout between your Spark executors and Alluxio workers. One simple and quick fix is to increase the timeout value to be larger alluxio.user.network.netty.timeout <https://docs.alluxio.io/os/user/stable/en/reference/Properties-List.html#alluxio.user.network.netty.timeout> in your Spark jobs. Checkout how to run Spark with customized alluxio properties <https://docs.alluxio.io/os/user/stable/en/compute/Spark.html?utm_source=spark&utm_medium=mailinglist> . - Bin On Thu, May 9, 2019 at 4:39 AM u9g wrote: > Hey, > > When I run Spark on Alluxio, I encounter the following error. How can I > fix this? Thanks > > Lost task 63.0 in stage 0.0 (TID 63, 172.28.172.165, executor 7): > java.io.lOException: java.util.concurrent.ExecutionExcep tion: > java.nio.channels.ClosedC hannelException > > Best, > Andy Li > > > >
Re: How to configure alluxio cluster with spark in yarn
hi Andy Assuming you are running Spark with YARN, then I would recommend deploying Alluxio in the same YARN cluster if you are looking for best performance. Alluxio can also be deployed separated as a standalone service, but in that case, you may need to transfer data from Alluxio cluster to your Spark/YARN cluster. Here is the documentation <https://docs.alluxio.io/os/user/1.8/en/deploy/Running-Alluxio-On-Yarn.html?utm_source=spark> about deploying Alluxio with YARN. - Bin On Thu, May 9, 2019 at 4:19 AM u9g wrote: > Hey, > > I want to speed up the Spark task running in the Yarn cluster through > Alluxio. Is Alluxio recommended to run in the same yarn cluster on the yarn > mode? Should I deploy Alluxio independently on the nodes of the yarn > cluster? Or deploy a cluster separately? > Best, > Andy Li > > > >
Re: cache table vs. parquet table performance
Hi Tomas, One option is to cache your table as Parquet files into Alluxio (which can serve as an in-memory distributed caching layer for Spark in your case). The code on Spark will be like > df.write.parquet("alluxio://master:19998/data.parquet")> df = > sqlContext.read.parquet("alluxio://master:19998/data.parquet") (See more details at the documentation http://www.alluxio.org/docs/1.8/en/compute/Spark.html <http://www.alluxio.org/docs/1.8/en/compute/Spark.html#cache-dataframe-into-alluxio?utm_source=spark> ) This would require running Alluxio as a separate service (ideally colocated with Spark servers), of course. But also enables data sharing across Spark jobs. - Bin On Tue, Jan 15, 2019 at 10:29 AM Tomas Bartalos wrote: > Hello, > > I'm using spark-thrift server and I'm searching for best performing > solution to query hot set of data. I'm processing records with nested > structure, containing subtypes and arrays. 1 record takes up several KB. > > I tried to make some improvement with cache table: > > cache table event_jan_01 as select * from events where day_registered = > 20190102; > > > If I understood correctly, the data should be stored in *in-memory > columnar* format with storage level MEMORY_AND_DISK. So data which > doesn't fit to memory will be spille to disk (I assume also in columnar > format (?)) > I cached 1 day of data (1 M records) and according to spark UI storage tab > none of the data was cached to memory and everything was spilled to disk. > The size of the data was *5.7 GB.* > Typical queries took ~ 20 sec. > > Then I tried to store the data to parquet format: > > CREATE TABLE event_jan_01_par USING parquet location "/tmp/events/jan/02" > as > > select * from event_jan_01; > > > The whole parquet took up only *178MB.* > And typical queries took 5-10 sec. > > Is it possible to tune spark to spill the cached data in parquet format ? > Why the whole cached table was spilled to disk and nothing stayed in > memory ? > > Spark version: 2.4.0 > > Best regards, > Tomas > >
Re: How shall I configure the Spark executor memory size and the Alluxio worker memory size on a machine?
oops, sorry for the confusion. I mean "20% of the size of your input data set" allocated to Alluxio as memory resource as the starting point. after that, you can checkout the cache hit ratio into Alluxio space based on the metrics collected in Alluxio web UI <http://www.alluxio.org/docs/1.8/en/basic/Web-Interface.html#master-metrics> . If you see lower hit ratio, increase Alluxio storage size and vice versa. Hope this helps, - Bin On Thu, Apr 4, 2019 at 9:29 PM Bin Fan wrote: > Hi Andy, > > It really depends on your workloads. I would suggest to allocate 20% of > the size of your input data set > as the starting point and see how it works. > > Also depending on your data source as the under store of Alluxio, if it is > remote (e.g., cloud storage like S3 or GCS), > you can perhaps use Alluxio to manage local disk or SSD storage resource > rather than memory resource. > In this case, the "local Alluxio storage" is still much faster compared to > reading from remote storage. > Check out the documentation on tiered storage configuration here ( > http://www.alluxio.org/docs/1.8/en/advanced/Alluxio-Storage-Management.html#configuring-alluxio-storage > ) > > - Bin > > On Thu, Mar 21, 2019 at 8:26 AM u9g wrote: > >> Hey, >> >> We have a cluster of 10 nodes each of which consists 128GB memory. We are >> about to running Spark and Alluxio on the cluster. We wonder how shall >> allocate the memory to the Spark executor and the Alluxio worker on a >> machine? Are there some recommendations? Thanks! >> >> Best, >> Andy Li >> >> >> >> >
Re: How shall I configure the Spark executor memory size and the Alluxio worker memory size on a machine?
Hi Andy, It really depends on your workloads. I would suggest to allocate 20% of the size of your input data set as the starting point and see how it works. Also depending on your data source as the under store of Alluxio, if it is remote (e.g., cloud storage like S3 or GCS), you can perhaps use Alluxio to manage local disk or SSD storage resource rather than memory resource. In this case, the "local Alluxio storage" is still much faster compared to reading from remote storage. Check out the documentation on tiered storage configuration here ( http://www.alluxio.org/docs/1.8/en/advanced/Alluxio-Storage-Management.html#configuring-alluxio-storage ) - Bin On Thu, Mar 21, 2019 at 8:26 AM u9g wrote: > Hey, > > We have a cluster of 10 nodes each of which consists 128GB memory. We are > about to running Spark and Alluxio on the cluster. We wonder how shall > allocate the memory to the Spark executor and the Alluxio worker on a > machine? Are there some recommendations? Thanks! > > Best, > Andy Li > > > >
Re: Questions about caching
Hi Andrew, Since you mentioned the alternative solution with Alluxio <http://alluxio.org>, here is a more comprehensive tutorial on caching Spark dataframes on Alluxio: https://www.alluxio.com/blog/effective-spark-dataframes-with-alluxio Namely, caching your dataframe is simply running df.write.parquet(alluxioFilePath) and your dataframes are stored in Alluxio as parquet files and you can share them with more users. One advantage with Alluxio here is you can manually free the cached data from memory tier or set the TTL for the cached data if you'd like more control on the data. - Bin On Tue, Dec 11, 2018 at 9:13 AM Andrew Melo wrote: > Greetings, Spark Aficionados- > > I'm working on a project to (ab-)use PySpark to do particle physics > analysis, which involves iterating with a lot of transformations (to > apply weights and select candidate events) and reductions (to produce > histograms of relevant physics objects). We have a basic version > working, but I'm looking to exploit some of Spark's caching behavior > to speed up the interactive computation portion of the analysis, > probably by writing a thin convenience wrapper. I have a couple > questions I've been unable to find definitive answers to, which would > help me design this wrapper an efficient way: > > 1) When cache()-ing a dataframe where only a subset of the columns are > used, is the entire dataframe placed into the cache, or only the used > columns. E.G. does "df2" end up caching only "a", or all three > columns? > > df1 = sc.read.load('test.parquet') # Has columns a, b, c > df2 = df1.cache() > df2.select('a').collect() > > 2) Are caches reference-based, or is there some sort of de-duplication > based on the logical/physical plans. So, for instance, does spark take > advantage of the fact that these two dataframes should have the same > content: > > df1 = sc.read.load('test.parquet').cache() > df2 = sc.read.load('test.parquet').cache() > > ...or are df1 and df2 totally independent WRT caching behavior? > > 2a) If the cache is reference-based, is it sufficient to hold a > weakref to the python object to keep the cache in-scope? > > 3) Finally, the spark.externalBlockStore.blockManager is intriguing in > our environment where we have multiple users concurrently analyzing > mostly the same input datasets. We have enough RAM in our clusters to > cache a high percentage of the very common datasets, but only if users > could somehow share their caches (which, conveniently, are the larger > datasets), We also have very large edge SSD cache servers we use to > cache trans-oceanic I/O we could throw at this as well. > > It looks, however, like that API was removed in 2.0.0 and there wasn't > a replacement. There are products like Alluxio, but they aren't > transparent, requiring the user to manually cache their dataframes by > doing save/loads to external files using "alluxio://" URIs. Is there > no way around this behavior now? > > Sorry for the long email, and thanks! > Andrew > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >
Re: off heap to alluxio/tachyon in Spark 2
Hi, If you are looking for how to run Spark on Alluxio (formerly Tachyon), here is the documentation from Alluxio doc site: http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html It still works for Spark 2.x. Alluxio team also published articles on when and why running Spark (2.x) with Alluxio may benefit performance: http://www.alluxio.com/2016/08/effective-spark-rdds-with-alluxio/ - Bin On Mon, Sep 19, 2016 at 7:56 AM, aka.fe2s wrote: > Hi folks, > > What has happened with Tachyon / Alluxio in Spark 2? Doc doesn't mention > it no longer. > > -- > Oleksiy Dyagilev >
PySpark read from HBase
Hi there, I have lots of raw data in several Hive tables where we built a workflow to "join" those records together and restructured into HBase. It was done using plain MapReduce to generate HFile, and then load incremental from HFile into HBase to guarantee the best performance. However, we need to do some time series analysis for each of the record in HBase, but the implementation was done in Python (pandas, scikit learn) which is pretty time-consuming to reproduce in Java, Scala. I am thinking PySpark is probably the best approach if it works. Can pyspark read from HFile directory? or can it read from HBase in parallel? I don't see that many examples out there so any help or guidance will be appreciated. Also, we are using Cloudera Hadoop so there might be a slight delay with the latest Spark release. Best regards, Bin
Re: Question About OFF_HEAP Caching
Here is one blog illustrating how to use Spark on Alluxio for this purpose. Hope it will help: http://www.alluxio.com/2016/04/getting-started-with-alluxio-and-spark/ On Mon, Jul 18, 2016 at 6:36 AM, Gene Pang wrote: > Hi, > > If you want to use Alluxio with Spark 2.x, it is recommended to write to > and read from Alluxio with files. You can save an RDD with saveAsObjectFile > with an Alluxio path (alluxio://host:port/path/to/file), and you can read > that file from any other Spark job. Here is additional information on how > to run Spark with Alluxio: > http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html > > Hope that helps, > Gene > > On Mon, Jul 18, 2016 at 12:11 AM, condor join > wrote: > >> Hi All, >> >> I have some questions about OFF_HEAP Caching. In Spark 1.X when we use >> *rdd.persist(StorageLevel.OFF_HEAP)*,that means rdd caching in >> Tachyon(Alluxio). However,in Spark 2.X,we can directly use OFF_HEAP For >> Caching >> >> ( >> https://issues.apache.org/jira/browse/SPARK-13992?jql=project%20%3D%20SPARK%20AND%20text%20~%20%22off-heap%20caching%22). >> I am confuse about this and I have follow questions: >> >> 1.In Spark 2.X, how should we use Tachyon for caching? >> >> 2.Is there any reason that must change in this way(I mean use off_heap >> directly instead of using Tachyon) >> >> Thanks a lot! >> >> >> >> >
Re: Possible to broadcast a function?
following this suggestion, Aaron, you may take a look at Alluxio as the off-heap in-memory data storage as input/output for Spark jobs if that works for you. See more intro on how to run Spark with Alluxio as data input / output. http://www.alluxio.org/documentation/en/Running-Spark-on-Alluxio.html - Bin On Wed, Jun 29, 2016 at 8:40 AM, Sonal Goyal wrote: > Have you looked at Alluxio? (earlier tachyon) > > Best Regards, > Sonal > Founder, Nube Technologies <http://www.nubetech.co> > Reifier at Strata Hadoop World > <https://www.youtube.com/watch?v=eD3LkpPQIgM> > Reifier at Spark Summit 2015 > <https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/> > > <http://in.linkedin.com/in/sonalgoyal> > > > > On Wed, Jun 29, 2016 at 7:30 PM, Aaron Perrin > wrote: > >> The user guide describes a broadcast as a way to move a large dataset to >> each node: >> >> "Broadcast variables allow the programmer to keep a read-only variable >> cached on each machine rather than shipping a copy of it with tasks. They >> can be used, for example, to give every node a copy of a large input >> dataset in an efficient manner." >> >> And the broadcast example shows it being used with a variable. >> >> But, is it somehow possible to instead broadcast a function that can be >> executed once, per node? >> >> My use case is the following: >> >> I have a large data structure that I currently create on each executor. >> The way that I create it is a hack. That is, when the RDD function is >> executed on the executor, I block, load a bunch of data (~250 GiB) from an >> external data source, create the data structure as a static object in the >> JVM, and then resume execution. This works, but it ends up costing me a >> lot of extra memory (i.e. a few TiB when I have a lot of executors). >> >> What I'd like to do is use the broadcast mechanism to load the data >> structure once, per node. But, I can't serialize the data structure from >> the driver. >> >> Any ideas? >> >> Thanks! >> >> Aaron >> >> >
Re: How to close connection in mapPartitions?
BTW, "lines" is a DStream. Bin Wang 于2015年10月23日周五 下午2:16写道: > I use mapPartitions to open connections to Redis, I write it like this: > > val seqs = lines.mapPartitions { lines => > val cache = new RedisCache(redisUrl, redisPort) > val result = lines.map(line => Parser.parseBody(line, cache)) > cache.redisPool.close > result > } > > But it seems the pool is closed before I use it. Am I doing anything > wrong? Here is the error: > > java.lang.IllegalStateException: Pool not open > at > org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140) > at > org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166) > at com.redis.RedisClientPool.withClient(Pool.scala:34) > at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17) > at > com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29) > at > com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26) > at > com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) > at > com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) > at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) > at > org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209) > at > org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) > at org.apache.spark.scheduler.Task.run(Task.scala:88) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > >
How to close connection in mapPartitions?
I use mapPartitions to open connections to Redis, I write it like this: val seqs = lines.mapPartitions { lines => val cache = new RedisCache(redisUrl, redisPort) val result = lines.map(line => Parser.parseBody(line, cache)) cache.redisPool.close result } But it seems the pool is closed before I use it. Am I doing anything wrong? Here is the error: java.lang.IllegalStateException: Pool not open at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140) at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166) at com.redis.RedisClientPool.withClient(Pool.scala:34) at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17) at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29) at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26) at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:88) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Re: Dose spark auto invoke StreamingContext.stop while receive kill signal?
Thanks for the explain. I've opened a PR at https://github.com/apache/spark/pull/8898 Tathagata Das 于2015年9月24日周四 上午2:44写道: > YEs, since 1.4.0, it shuts down streamingContext without gracefully from > shutdown hook. > You can make it shutdown gracefully in that hook by setting the SparkConf > "spark.streaming.stopGracefullyOnShutdown" to "true" > > Note to self, document this in the programming guide. > > On Wed, Sep 23, 2015 at 3:33 AM, Bin Wang wrote: > >> I'd like the spark application to be stopped gracefully while received >> kill signal, so I add these code: >> >> sys.ShutdownHookThread { >> println("Gracefully stopping Spark Streaming Application") >> ssc.stop(stopSparkContext = true, stopGracefully = true) >> println("Application stopped") >> } >> >> But the application is not stopped gracefully: >> >> 15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: >> SIGTERM >> ... >> 15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking >> stop(stopGracefully=false) from shutdown hook >> >> Dose spark auto invoke StreamingContext.stop for me? >> > >
Dose spark auto invoke StreamingContext.stop while receive kill signal?
I'd like the spark application to be stopped gracefully while received kill signal, so I add these code: sys.ShutdownHookThread { println("Gracefully stopping Spark Streaming Application") ssc.stop(stopSparkContext = true, stopGracefully = true) println("Application stopped") } But the application is not stopped gracefully: 15/09/23 17:44:38 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM ... 15/09/23 17:44:38 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook Dose spark auto invoke StreamingContext.stop for me?
Is it possible to merged delayed batches in streaming?
I'm using Spark Streaming and there maybe some delays between batches. I'd like to know is it possible to merge delayed batches into one batch to do processing? For example, the interval is set to 5 min but the first batch uses 1 hour, so there are many batches delayed. In the end of processing for each batch, I'll save the data into database. So if all the delayed batches are merged into a big one, it will save many resources. I'd like to know if it is possible. Thanks.
Re: How to recovery DStream from checkpoint directory?
Thanks Adrian, the hint of use updateStateByKey with initialRdd helps a lot! Adrian Tanase 于2015年9月17日周四 下午4:50写道: > This section in the streaming guide makes your options pretty clear > > http://spark.apache.org/docs/latest/streaming-programming-guide.html#upgrading-application-code > > >1. Use 2 versions in parallel, drain the queue up to a point and strat >fresh in the new version, only processing events from that point forward > 1. Note that “up to a point” is specific to you state management > logic, it might mean “user sessions stated after 4 am” NOT “events > received > after 4 am” >2. Graceful shutdown and saving data to DB, followed by checkpoint >cleanup / new checkpoint dir > 1. On restat, you need to use the updateStateByKey that takes an > initialRdd with the values preloaded from DB > 2. By cleaning the checkpoint in between upgrades, data is loaded > only once > > Hope this helps, > -adrian > > From: Bin Wang > Date: Thursday, September 17, 2015 at 11:27 AM > To: Akhil Das > Cc: user > Subject: Re: How to recovery DStream from checkpoint directory? > > In my understand, here I have only three options to keep the DStream state > between redeploys (yes, I'm using updateStateByKey): > > 1. Use checkpoint. > 2. Use my own database. > 3. Use both. > > But none of these options are great: > > 1. Use checkpoint: I cannot load it after code change. Or I need to keep > the structure of the classes, which seems to be impossible in a developing > project. > 2. Use my own database: there may be failure between the program read data > from Kafka and save the DStream to database. So there may have data lose. > 3. Use both: Will the data load two times? How can I know in which > situation I should use the which one? > > The power of checkpoint seems to be very limited. Is there any plan to > support checkpoint while class is changed, like the discussion you gave me > pointed out? > > > > Akhil Das 于2015年9月17日周四 下午3:26写道: > >> Any kind of changes to the jvm classes will make it fail. By >> checkpointing the data you mean using checkpoint with updateStateByKey? >> Here's a similar discussion happened earlier which will clear your doubts i >> guess >> http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E >> >> Thanks >> Best Regards >> >> On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang wrote: >> >>> And here is another question. If I load the DStream from database every >>> time I start the job, will the data be loaded when the job is failed and >>> auto restart? If so, both the checkpoint data and database data are loaded, >>> won't this a problem? >>> >>> >>> >>> Bin Wang 于2015年9月16日周三 下午8:40写道: >>> >>>> Will StreamingContex.getOrCreate do this work?What kind of code change >>>> will make it cannot load? >>>> >>>> Akhil Das 于2015年9月16日周三 20:20写道: >>>> >>>>> You can't really recover from checkpoint if you alter the code. A >>>>> better approach would be to use some sort of external storage (like a db >>>>> or >>>>> zookeeper etc) to keep the state (the indexes etc) and then when you >>>>> deploy >>>>> new code they can be easily recovered. >>>>> >>>>> Thanks >>>>> Best Regards >>>>> >>>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang wrote: >>>>> >>>>>> I'd like to know if there is a way to recovery dstream from >>>>>> checkpoint. >>>>>> >>>>>> Because I stores state in DStream, I'd like the state to be recovered >>>>>> when I restart the application and deploy new code. >>>>>> >>>>> >>>>> >>
Re: How to recovery DStream from checkpoint directory?
In my understand, here I have only three options to keep the DStream state between redeploys (yes, I'm using updateStateByKey): 1. Use checkpoint. 2. Use my own database. 3. Use both. But none of these options are great: 1. Use checkpoint: I cannot load it after code change. Or I need to keep the structure of the classes, which seems to be impossible in a developing project. 2. Use my own database: there may be failure between the program read data from Kafka and save the DStream to database. So there may have data lose. 3. Use both: Will the data load two times? How can I know in which situation I should use the which one? The power of checkpoint seems to be very limited. Is there any plan to support checkpoint while class is changed, like the discussion you gave me pointed out? Akhil Das 于2015年9月17日周四 下午3:26写道: > Any kind of changes to the jvm classes will make it fail. By checkpointing > the data you mean using checkpoint with updateStateByKey? Here's a similar > discussion happened earlier which will clear your doubts i guess > http://mail-archives.us.apache.org/mod_mbox/spark-user/201507.mbox/%3CCA+AHuK=xoy8dsdaobmgm935goqytaaqkpqsvdaqpmojottj...@mail.gmail.com%3E > > Thanks > Best Regards > > On Thu, Sep 17, 2015 at 10:01 AM, Bin Wang wrote: > >> And here is another question. If I load the DStream from database every >> time I start the job, will the data be loaded when the job is failed and >> auto restart? If so, both the checkpoint data and database data are loaded, >> won't this a problem? >> >> >> >> Bin Wang 于2015年9月16日周三 下午8:40写道: >> >>> Will StreamingContex.getOrCreate do this work?What kind of code change >>> will make it cannot load? >>> >>> Akhil Das 于2015年9月16日周三 20:20写道: >>> >>>> You can't really recover from checkpoint if you alter the code. A >>>> better approach would be to use some sort of external storage (like a db or >>>> zookeeper etc) to keep the state (the indexes etc) and then when you deploy >>>> new code they can be easily recovered. >>>> >>>> Thanks >>>> Best Regards >>>> >>>> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang wrote: >>>> >>>>> I'd like to know if there is a way to recovery dstream from checkpoint. >>>>> >>>>> Because I stores state in DStream, I'd like the state to be recovered >>>>> when I restart the application and deploy new code. >>>>> >>>> >>>> >
Re: How to recovery DStream from checkpoint directory?
And here is another question. If I load the DStream from database every time I start the job, will the data be loaded when the job is failed and auto restart? If so, both the checkpoint data and database data are loaded, won't this a problem? Bin Wang 于2015年9月16日周三 下午8:40写道: > Will StreamingContex.getOrCreate do this work?What kind of code change > will make it cannot load? > > Akhil Das 于2015年9月16日周三 20:20写道: > >> You can't really recover from checkpoint if you alter the code. A better >> approach would be to use some sort of external storage (like a db or >> zookeeper etc) to keep the state (the indexes etc) and then when you deploy >> new code they can be easily recovered. >> >> Thanks >> Best Regards >> >> On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang wrote: >> >>> I'd like to know if there is a way to recovery dstream from checkpoint. >>> >>> Because I stores state in DStream, I'd like the state to be recovered >>> when I restart the application and deploy new code. >>> >> >>
Re: How to recovery DStream from checkpoint directory?
Will StreamingContex.getOrCreate do this work?What kind of code change will make it cannot load? Akhil Das 于2015年9月16日周三 20:20写道: > You can't really recover from checkpoint if you alter the code. A better > approach would be to use some sort of external storage (like a db or > zookeeper etc) to keep the state (the indexes etc) and then when you deploy > new code they can be easily recovered. > > Thanks > Best Regards > > On Wed, Sep 16, 2015 at 3:52 PM, Bin Wang wrote: > >> I'd like to know if there is a way to recovery dstream from checkpoint. >> >> Because I stores state in DStream, I'd like the state to be recovered >> when I restart the application and deploy new code. >> > >
How to recovery DStream from checkpoint directory?
I'd like to know if there is a way to recovery dstream from checkpoint. Because I stores state in DStream, I'd like the state to be recovered when I restart the application and deploy new code.
Re: How to clear Kafka offset in Spark streaming?
I think I've found the reason. It seems that the the smallest offset is not 0 and I should not set the offset to 0. Bin Wang 于2015年9月14日周一 下午2:46写道: > Hi, > > I'm using spark streaming with kafka and I need to clear the offset and > re-compute all things. I deleted checkpoint directory in HDFS and reset > kafka offset with "kafka-run-class kafka.tools.ImportZkOffsets". I can > confirm the offset is set to 0 in kafka: > > ~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group > adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181 > Group Topic Pid Offset logSize > Lag Owner > adhoc_data_spark adhoc_data 0 0 > 5280743 5280743 none > > But when I restart spark streaming, the offset is reset to logSize, I > cannot figure out why is that, can anybody help? Thanks. >
How to clear Kafka offset in Spark streaming?
Hi, I'm using spark streaming with kafka and I need to clear the offset and re-compute all things. I deleted checkpoint directory in HDFS and reset kafka offset with "kafka-run-class kafka.tools.ImportZkOffsets". I can confirm the offset is set to 0 in kafka: ~ > kafka-run-class kafka.tools.ConsumerOffsetChecker --group adhoc_data_spark --topic adhoc_data --zookeeper szq1.appadhoc.com:2181 Group Topic Pid Offset logSize Lag Owner adhoc_data_spark adhoc_data 0 0 5280743 5280743 none But when I restart spark streaming, the offset is reset to logSize, I cannot figure out why is that, can anybody help? Thanks.
Re: Data lost in spark streaming
There is some error logs in the executor and I don't know if it is related: 15/09/11 10:54:05 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$Inv alidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01 15/09/11 10:54:05 WARN yarn.ApplicationMaster: Reporter thread fails 4 time(s) in a row. org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1440495451668_0258_01 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53) at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79) at sun.reflect.GeneratedMethodAccessor29.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy22.allocate(Unknown Source) at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278) at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:174) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:323) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01 at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy21.allocate(Unknown Source) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77) ... 9 more ... 15/09/11 10:54:10 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$Inv alidToken): Invalid AMRMToken from appattempt_1440495451668_0258_01 15/09/11 10:54:10 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 12, (reason: Exception was thrown 5 time(s) from Reporter thread.) 15/09/11 10:54:10 INFO streaming.StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 15/09/11 10:54:10 INFO scheduler.ReceiverTracker: Sent stop signal to all 1 receivers 15/09/11 10:54:12 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver Tathagata Das 于2015年9月13日周日 下午4:05写道: > Maybe the driver got restarted. See the log4j logs of the driver before it > restarted. > > On Thu, Sep 10, 2015 at 11:32 PM, Bin Wang wrote: > >> I'm using spark streaming 1.4.0 and have a DStream that have all the data >> it received. But today the history data in the DStream seems to be lost >> suddenly. 
And the application UI also lost the streaming process time and >> all the related data. Could any give some hint to debug this? Thanks. >> >> >> >
Data lost in spark streaming
I'm using spark streaming 1.4.0 and have a DStream that have all the data it received. But today the history data in the DStream seems to be lost suddenly. And the application UI also lost the streaming process time and all the related data. Could any give some hint to debug this? Thanks.
Re: Will multiple filters on the same RDD optimized to one filter?
What if I would use both rdd1 and rdd2 later? Raghavendra Pandey 于2015年7月16日周四 下午4:08写道: > If you cache rdd it will save some operations. But anyway filter is a lazy > operation. And it runs based on what you will do later on with rdd1 and > rdd2... > > Raghavendra > On Jul 16, 2015 1:33 PM, "Bin Wang" wrote: > >> If I write code like this: >> >> val rdd = input.map(_.value) >> val f1 = rdd.filter(_ == 1) >> val f2 = rdd.filter(_ == 2) >> ... >> >> Then the DAG of the execution may be this: >> >> -> Filter -> ... >> Map >> -> Filter -> ... >> >> But the two filters is operated on the same RDD, which means it could be >> done by just scan the RDD once. Does spark have this kind optimization for >> now? >> >
Will multiple filters on the same RDD optimized to one filter?
If I write code like this: val rdd = input.map(_.value) val f1 = rdd.filter(_ == 1) val f2 = rdd.filter(_ == 2) ... Then the DAG of the execution may be this: -> Filter -> ... Map -> Filter -> ... But the two filters is operated on the same RDD, which means it could be done by just scan the RDD once. Does spark have this kind optimization for now?
Re: Spark Streaming Hangs on Start
Thanks for the help. I set --executor-cores and it works now. I've used --total-executor-cores and don't realize it changed. Tathagata Das 于2015年7月10日周五 上午3:11写道: > 1. There will be a long running job with description "start()" as that is > the jobs that is running the receivers. It will never end. > > 2. You need to set the number of cores given to the Spark executors by the > YARN container. That is SparkConf spark.executor.cores, --executor-cores > in spark-submit. Since it is by default 1, your only container has one core > which is occupied by the receiver, leaving no cores to run the map tasks. > So the map stage is blocked > > 3. Note these log lines. Especially "15/07/09 18:29:00 INFO > receiver.ReceiverSupervisorImpl: Received stop signal" . I think somehow > your streaming context is being shutdown too early which is causing the > KafkaReceiver to stop. Something your should debug. > > > 15/07/09 18:27:13 INFO consumer.ConsumerFetcherThread: > [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], > Starting > 15/07/09 18:27:13 INFO consumer.ConsumerFetcherManager: > [ConsumerFetcherManager-1436437633199] Added fetcher for partitions > ArrayBuffer([[adhoc_data,0], initOffset 53 to broker > id:42,host:szq1.appadhoc.com,port:9092] ) > 15/07/09 18:27:13 INFO storage.MemoryStore: ensureFreeSpace(1680) called with > curMem=96628, maxMem=16669841817 > 15/07/09 18:27:13 INFO storage.MemoryStore: Block input-0-1436437633600 > stored as bytes in memory (estimated size 1680.0 B, free 15.5 GB) > 15/07/09 18:27:13 WARN storage.BlockManager: Block input-0-1436437633600 > replicated to only 0 peer(s) instead of 1 peers > 15/07/09 18:27:14 INFO receiver.BlockGenerator: Pushed block > input-0-1436437633600*15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: > Received stop signal > *15/07/09 18:29:00 INFO receiver.ReceiverSupervisorImpl: Stopping receiver > with message: Stopped by driver: > 15/07/09 18:29:00 INFO consumer.ZookeeperConsumerConnector: > [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], > ZKConsumerConnector shutting down > 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: > [ConsumerFetcherManager-1436437633199] Stopping leader finder thread > 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: > [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], > Shutting down > 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: > [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], > Stopped > 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: > [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-leader-finder-thread], > Shutdown completed > 15/07/09 18:29:00 INFO consumer.ConsumerFetcherManager: > [ConsumerFetcherManager-1436437633199] Stopping all fetchers > 15/07/09 18:29:00 INFO consumer.ConsumerFetcherThread: > [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], > Shutting down > 15/07/09 18:29:01 INFO consumer.SimpleConsumer: Reconnect due to socket > error: java.nio.channels.ClosedByInterruptException > 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: > [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], > Stopped > 15/07/09 18:29:01 INFO consumer.ConsumerFetcherThread: > [ConsumerFetcherThread-adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201-0-42], > Shutdown completed > 15/07/09 18:29:01 INFO 
consumer.ConsumerFetcherManager: > [ConsumerFetcherManager-1436437633199] All connections stopped > 15/07/09 18:29:01 INFO zkclient.ZkEventThread: Terminate ZkClient event > thread. > 15/07/09 18:29:01 INFO zookeeper.ZooKeeper: Session: 0x14e70eedca00315 closed > 15/07/09 18:29:01 INFO zookeeper.ClientCnxn: EventThread shut down > 15/07/09 18:29:01 INFO consumer.ZookeeperConsumerConnector: > [adhoc_data_spark_szq1.appadhoc.com-1436437633136-a84a7201], > ZKConsumerConnector shutdown completed in 74 ms > 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop > 15/07/09 18:29:01 INFO receiver.ReceiverSupervisorImpl: Deregistering > receiver 0 > > > >
Spark Streaming Hangs on Start
I'm using spark streaming with Kafka, and submit it to YARN cluster with mode "yarn-cluster". But it hangs at SparkContext.start(). The Kafka config is right since it can show some events in "Streaming" tab of web UI. The attached file is the screen shot of the "Jobs" tab of web UI. The code in the main class is: object StatCounter { val config = ConfigFactory.load() val redisUrl = config.getString("redis.url") val redisPort = config.getInt("redis.port") val zkQuorum = config.getString("kafka.zkQuorum") val group = config.getString("kafka.group") val topic = config.getString("kafka.topic") val threadNum = config.getInt("kafka.threadNum") val cache = new RedisCache(redisUrl, redisPort) def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName(config.getString("spark.name")) .set("spark.cassandra.connection.host", config.getString("cassandra.host")) val ssc = new StreamingContext(conf, Seconds(config.getInt("spark.interval"))) ssc.checkpoint(config.getString("spark.checkpoint")) val storage = new CassandraStorage("adhoc_data", ssc) val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> threadNum)).map(_._2) val logs = lines.flatMap(line => Parser.parseBody(line, cache)) Counter.count(logs, storage) sys.ShutdownHookThread { println("Gracefully stopping Spark Streaming Application") ssc.stop(stopSparkContext = true, stopGracefully = true) println("Application stopped") } ssc.start() ssc.awaitTermination() } } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to submit streaming application and exit
Thanks. Actually I've find the way. I'm using spark-submit to submit the job the a YARN cluster with --mater yarn-cluster (which spark-submit process is not the driver). So I can config "spark.yarn.submit.waitAppComplettion" to "false" so that the process will exit after the job is submitted. ayan guha 于2015年7月8日周三 下午12:26写道: > spark-submit is nothing but a process in your OS, so you should be able to > submit it in background and exit. However, your spark-submit process itself > is the driver for your spark streaming application, so it will not exit for > the lifetime of the streaming app. > > On Wed, Jul 8, 2015 at 1:13 PM, Bin Wang wrote: > >> I'm writing a streaming application and want to use spark-submit to >> submit it to a YARN cluster. I'd like to submit it in a client node and >> exit spark-submit after the application is running. Is it possible? >> > > > > -- > Best Regards, > Ayan Guha >
How to submit streaming application and exit
I'm writing a streaming application and want to use spark-submit to submit it to a YARN cluster. I'd like to submit it in a client node and exit spark-submit after the application is running. Is it possible?
Problem Run Spark Example HBase Code Using Spark-Submit
I am trying to run the Spark example code HBaseTest from command line using spark-submit instead run-example, in that case, I can learn more how to run spark code in general. However, it told me CLASS_NOT_FOUND about htrace since I am using CDH5.4. I successfully located the htrace jar file but I am having a hard time adding it to path. This is the final spark-submit command I have but still have the class not found error. Can anyone help me with this? #!/bin/bash export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark /bin/bash $SPARK_HOME/bin/spark-submit \ --master yarn-client \ --class org.apache.spark.examples.HBaseTest \ --driver-class-path /etc/hbase/conf:$SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar \ --jars $SPARK_HOME/examples/lib/*.jar:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/*.jar \ $SPARK_HOME/examples/lib/*.jar \ myhbasetablename Note: htrace-core-3.0.4.jar, htrace-core-3.1.0-incubating.jar, htrace-core.jar are all located under ' /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hbase/lib/'.
Re: Specify Python interpreter
Hi Felix and Tomoas, Thanks a lot for your information. I figured out the environment variable PYSPARK_PYTHON is the secret key. My current approach is to start iPython notebook on the namenode, export PYSPARK_PYTHON=/opt/local/anaconda/bin/ipython /opt/local/anaconda/bin/ipython notebook --profile=mypysparkprofile In my iPython notebook, I have the flexibility to manually start my SparkContext in a way like this: os.environ["YARN_CONF_DIR"] = "/etc/hadoop/conf" os.environ["JAVA_HOME"] = "/usr/lib/jvm/jre-1.7.0-openjdk.x86_64/" sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python") sys.path.append("/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip") from pyspark import SparkContext, SparkConf sconf = SparkConf() conf = (SparkConf().setMaster("spark://datafireball1:7077") .setAppName("SparkApplication") .set("spark.executor.memory", "16g") .set("spark.ui.killEnabled", "true")) sc = SparkContext(conf=conf) This really works out for me and I am using the lastest iPython notebook to interactively write Spark application. If you have a better Python solution will can offer a better workflow for interactive spark development. Please share. Bin On Tue, May 12, 2015 at 1:20 AM, Tomas Olsson wrote: > Hi, > You can try > > PYSPARK_DRIVER_PYTHON=/path/to/ipython > PYSPARK_DRIVER_PYTHON_OPTS="notebook” /path/to//pyspark > > > /Tomas > > > On 11 May 2015, at 22:17, Bin Wang wrote: > > > > Hey there, > > > > I have installed a python interpreter in certain location, say > "/opt/local/anaconda". > > > > Is there anything that I can specify the Python interpreter while > developing in iPython notebook? Maybe a property in the while creating the > Sparkcontext? > > > > > > I know that I can put "#!/opt/local/anaconda" at the top of my Python > code and use spark-submit to distribute it to the cluster. However, since I > am using iPython notebook, this is not available as an option. > > > > Best, > > > > Bin > >
Specify Python interpreter
Hey there, I have installed a Python interpreter in a certain location, say "/opt/local/anaconda". Is there any way to specify the Python interpreter while developing in an iPython notebook? Maybe a property to set while creating the SparkContext? I know that I can put "#!/opt/local/anaconda" at the top of my Python code and use spark-submit to distribute it to the cluster. However, since I am using an iPython notebook, this is not available as an option. Best, Bin
Spark on top of YARN: Compression in iPython notebook
I started an AWS cluster (1 master + 3 core nodes) and downloaded the prebuilt Spark binary. I downloaded the latest Anaconda Python and started an iPython notebook server by running the command below: ipython notebook --port --profile nbserver --no-browser Then I tried to develop a Spark application running on top of YARN interactively in the iPython notebook. Here is the code that I have written: import sys import os from pyspark import SparkContext, SparkConf sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python') sys.path.append('/home/hadoop/myuser/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip') os.environ["YARN_CONF_DIR"] = "/home/hadoop/conf" os.environ["SPARK_HOME"] = "/home/hadoop/bwang/spark-1.3.1-bin-hadoop2.4" conf = (SparkConf() .setMaster("yarn-client") .setAppName("Spark ML") .set("spark.executor.memory", "2g") ) sc = SparkContext(conf=conf) data = sc.textFile("hdfs:// ec2-xx.xx.xx..compute-1.amazonaws.com:8020/data/*") data.count() The code works all the way until the count, and then it shows "com.hadoop.compression.lzo.LzoCodec not found". Here <http://www.wepaste.com/sparkcompression/> is the full log. I did some searching, and it basically comes down to Spark not being able to access the LzoCodec library. I have tried to use os.environ to set SPARK_CLASSPATH and SPARK_LIBRARY_PATH to include hadoop-lzo.jar, which is located at "/home/hadoop/.versions/2.4.0-amzn-4/share/hadoop/common/lib/hadoop-lzo.jar" in AWS Hadoop. However, it is still not working. Can anyone show me how to solve this problem?
Using Pandas/Scikit-learn in PySpark
Hey there, I have a CDH cluster where the default Python installed on those Red Hat Linux nodes is Python 2.6. I am thinking about developing a Spark application using PySpark, and I want to be able to use the Pandas and Scikit-learn packages. The Anaconda Python interpreter has the most functionality out of the box; however, when I try to use Anaconda Python 2.7, the Spark job won't run properly and fails because the Python interpreter is not consistent across the cluster. Here are my questions: (1) I took a quick look at the source code of PySpark, and it looks like in the end it uses spark-submit. Doesn't that mean all the work will in the end be translated into Scala code and the workload distributed across the whole cluster? In that case, I should not have to worry about the Python interpreter beyond the master node, right? (2) If the Spark job needs a consistent set of Python libraries installed on every node, should I install Anaconda Python on all of them? If so, what is the modern way of managing the Python ecosystem on the cluster? I am a big fan of Python, so please guide me. Best regards, Bin
Anaconda iPython notebook working with CDH Spark
Hi there, I have a cluster with CDH 5.1 running on top of Red Hat 6.5, where the default Python version is 2.6. I am trying to set up a proper iPython notebook environment to develop Spark applications using PySpark. Here <http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/> is a tutorial that I have been following. However, it turned out that the author was using iPython 1, whereas we have the latest Anaconda Python 2.7 installed on our name node. When I finished following the tutorial, I can connect to the Spark cluster, but whenever I try to distribute the work, it errors out, and Google tells me it is due to the difference in Python versions across the cluster. Here are a few things that I am planning to try: (1) remove the Anaconda Python from the name node and install the iPython version that is compatible with Python 2.6, or (2) install Anaconda Python on every node and make it the default Python version across the whole cluster (however, I am not sure whether this plan will totally screw up the existing environment, since some running services are built on Python 2.6...). Let me know which would be the proper way to set up an iPython notebook environment. Best regards, Bin
Running time bottleneck on a few workers
Hi All, I ran into a problem where, for each stage, most workers finish fast (around 1 min), but a few workers take about 7 min to finish, which significantly slows down the process. The running time is very unevenly distributed across workers. I wonder whether this is normal? Is it related to the partition strategy? For now, I am using the default partition strategy. Looking for advice! Thanks very much! Best, Bin
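If the imbalance comes from how the data is partitioned rather than from a few inherently heavy keys, an explicit repartition before the slow stage is a cheap thing to try. A minimal Scala sketch, assuming a running SparkContext sc (as in spark-shell), a stand-in RDD, and an arbitrary target of 200 partitions:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._   // pair-RDD operations on older Spark releases

    // stand-in for whatever RDD feeds the slow stage
    val data = sc.parallelize(1 to 1000000)

    // full shuffle into roughly equal-sized partitions
    val balanced = data.repartition(200)

    // for key-value RDDs, an explicit partitioner controls how keys are spread
    val byKey = data.map(x => (x % 1000, x)).partitionBy(new HashPartitioner(200))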
[GraphX] Is it normal to shuffle write 15GB while the data is only 30MB?
Hi All, I am running a customized label propagation using Pregel. After a few iterations, the program becomes slow and wastes a lot of time in mapPartitions (at GraphImpl.scala:184 or VertexRDD.scala:318, or VertexRDD.scala:323). And the amount of shuffle write reaches 15GB, while the size of the raw data with (srcid, dstid, weight) is only 30MB. I wonder whether this is normal? Below please find my customized label propagation implementation. I have changed "map" into "mapValues" in line 183 to decrease shuffling, but it makes no difference: " 154 def adsorption(sc : SparkContext, graph : Graph[(Int, Map[VertexId, Double], String), Double], tol: Double) 155 : Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = 156 { 157 val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = graph 158 .outerJoinVertices(graph.inDegrees){ 159 case (vid, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3) 160 } 162 .cache() 167 168 def sendMessage(edge : EdgeTriplet[(Int, Map[VertexId, Double], Double, Int, String), Double])={ 175 val dstAttr = edge.dstAttr 176 177 if (dstAttr._3 >= tol && (dstAttr._1 == 1 || dstAttr._1 == 3)) 178 { 181 val indegree = dstAttr._4.toDouble 182 183 val mapToSend = edge.srcAttr._2.mapValues{_/indegree}.map(identity) 187 Iterator((edge.dstId, mapToSend)) 188 } 189 else 190 { 192 Iterator.empty 193 } 194 } 195 196 def mergeMessage(label1:Map[VertexId, Double], label2:Map[VertexId, Double]): Map[VertexId, Double] = 197 { 202 val mm = (label1.keySet ++ label2.keySet).map{i=> 203 val count1Val = label1.getOrElse(i, 0.0) 204 val count2Val = label2.getOrElse(i, 0.0) 205 i->(count1Val + count2Val) 206 }.toMap 211 } 212 213 def vertexProgram(vid: VertexId, attr: (Int, Map[VertexId, Double], Double, Int, String), message: Map[VertexId, Double])={ 218 219 if (message.isEmpty) attr 220 else 221 { 223 224 val oldlabel = attr._2 227 228 var accum = 0.0 229 230 message.foreach(x=> (accum = accum + x._2)) 233 234 val newlabel = message.map(x=>(x._1->x._2/accum)) 235 236 val diff = (newlabel.keySet--oldlabel.keySet).toSet.size.toDouble / oldlabel.keySet.size.toDouble 239 240 (attr._1, newlabel, diff, attr._4, attr._5) 241 } 242 } 243 244 // empty initial message 245 val initialMessage = Map[VertexId, Double]() 246 247 Pregel(adsorptionGraph, initialMessage, maxIterations = 3, activeDirection = EdgeDirection.In)( 248 vprog = vertexProgram, 249 sendMsg = sendMessage, 250 mergeMsg = mergeMessage 251 ) 252 } "
Re:[GraphX] Can't zip RDDs with unequal numbers of partitions
OK, I think I've figured it out. It seems to be a bug which has been reported at: https://issues.apache.org/jira/browse/SPARK-2823 and https://github.com/apache/spark/pull/1763. As it says: "If the users set "spark.default.parallelism" and the value is different with the EdgeRDD partition number, GraphX jobs will throw: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions" So my quick fix is to repartition the EdgeRDD to exactly match the parallelism setting. But I think this will lead to a lot of network communication. Is there any better solution? Thanks a lot! Best, Bin At 2014-08-06 04:54:39, "Bin" wrote: Hi All, Finally I found that the problem occurred when I called the graphx lib: " Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094) at org.apache.spark.rdd.RDD.foreach(RDD.scala:703) at adsorption$.adsorption(adsorption.scala:138) at adsorption$.main(adsorption.scala:64) at adsorption.main(adsorption.scala) " The source code: " 129 val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = graph 130 .outerJoinVertices(graph.inDegrees){ 131 case (vid, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3) 132 } 133 //.mapVertices((vid, v_attr) => (v_attr._1, v_attr._2, 1.0, 0)) 134 .cache() 135 136 println("After transforming into adsorption graph ") 137 138 adsorptionGraph.triplets.foreach(tri=>println()) " Any advice? Thanks a lot! Best, Bin
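For anyone hitting the same exception, here is a minimal sketch of the workaround described above, i.e. repartitioning the edges to match the configured parallelism before building the graph. The file path and parsing are placeholders, and a running SparkContext sc is assumed:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // defaultParallelism reflects spark.default.parallelism when it is set
    val parallelism = sc.defaultParallelism

    val edges: RDD[Edge[Double]] =
      sc.textFile("hdfs:///path/to/edges")        // placeholder path
        .map(_.split(","))
        .map(a => Edge(a(0).toLong, a(1).toLong, a(2).toDouble))
        .repartition(parallelism)                 // align edge partitions with the parallelism

    // build the vertices as before; with the partition counts aligned,
    // the zip inside GraphX no longer sees mismatched RDDs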
[GraphX] Can't zip RDDs with unequal numbers of partitions
Hi All, Finally I found that the problem occurred when I called the graphx lib: " Exception in thread "main" java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1094) at org.apache.spark.rdd.RDD.foreach(RDD.scala:703) at adsorption$.adsorption(adsorption.scala:138) at adsorption$.main(adsorption.scala:64) at adsorption.main(adsorption.scala) " The source code: " 129 val adsorptionGraph: Graph[(Int, Map[VertexId, Double], Double, Int, String), Double] = graph 130 .outerJoinVertices(graph.inDegrees){ 131 case (vid, u, inDegOpt) => (u._1, u._2, 1.0, inDegOpt.getOrElse(0), u._3) 132 } 133 //.mapVertices((vid, v_attr) => (v_attr._1, v_attr._2, 1.0, 0)) 134 .cache() 135 136 println("After transforming into adsorption graph ") 137 138 adsorptionGraph.triplets.foreach(tri=>println()) " Any advice? Thanks a lot! Best, Bin
Can't zip RDDs with unequal numbers of partitions
Hi All, I ran into the error in the subject line. The exception occurred in line 223, as shown below: 212 // read files 213 val lines = sc.textFile(path_edges).map(line=>line.split(",")).map(line=>((line(0), line(1)), line(2).toDouble)).reduceByKey(_+ _).cache 214 215 val lines_vertices = lines.map{line=>(line._1._1, Map(nameHash(line._1._2)->line._2))}.reduceByKey(_++_).cache 216 217 val name_shadow = "_shadow" 218 219 val nodes = 220 lines_vertices 221 .map{line=>(nameHash(line._1), (1, Map[VertexId,Double](), line._1))} ++ 222 lines_vertices 223 .map{line=>(nameHash(line._1 + name_shadow), (2,line._2, line._1 + name_shadow))} ++ 224 lines 225 .map{line=>(nameHash(line._1._2), (3, Map[VertexId,Double](), line._1._2))} Sorry for posting the source code, but I couldn't think of a better way. I am confused about why the partitions are unequal, and about how I can control the number of partitions of these RDDs. Can someone give me some advice on this problem? Thanks very much! Best, Bin
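On controlling the partition counts: textFile accepts a minimum number of partitions and reduceByKey accepts an explicit one, so the RDDs that are later combined can all be forced to the same value. A rough sketch along the lines of the snippet above, reusing its path_edges and nameHash and an arbitrary count of 100:

    val numPartitions = 100   // arbitrary; pick one value and use it consistently

    val lines = sc.textFile(path_edges, numPartitions)        // at least numPartitions splits
      .map(_.split(","))
      .map(l => ((l(0), l(1)), l(2).toDouble))
      .reduceByKey(_ + _, numPartitions)                      // exactly numPartitions after the shuffle
      .cache()

    val lines_vertices = lines
      .map { case ((src, dst), w) => (src, Map(nameHash(dst) -> w)) }
      .reduceByKey(_ ++ _, numPartitions)
      .cache()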
[GraphX] How spark parameters relate to Pregel implementation
Hi all, I wonder how Spark parameters, e.g., the level of parallelism, affect Pregel performance? Specifically, sendMessage, mergeMessage, and vertexProgram? I have tried label propagation on a graph with 300,000 edges, and I found that leaving the parallelism unset is much faster than setting it to 5 or 500. Looking for advice! Thanks a lot! Best, Bin
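In case it helps others experimenting with this: spark.default.parallelism can be set on the SparkConf before the SparkContext is created, so it is cheap to try values close to the number of partitions of the graph's vertex and edge RDDs. A minimal sketch with a made-up application name and value:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("LabelPropagation")              // made-up name
      .set("spark.default.parallelism", "16")      // try values near the graph's partition count
    val sc = new SparkContext(conf)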
Re:Re: Re:Re: [GraphX] The best way to construct a graph
Thanks for the advice. But since I am not the administrator of our Spark cluster, I can't do this. Is there any better solution based on the current Spark release? At 2014-08-01 02:38:15, "shijiaxin" wrote: >Have you tried to write another similar function like edgeListFile in the >same file, and then compile the project again? > > > >-- >View this message in context: >http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-The-best-way-to-construct-a-graph-tp11122p11138.html >Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re:Re: [GraphX] The best way to construct a graph
It seems that I cannot specify the weights. I have also tried to imitate GraphLoader.edgeListFile, but I can't call the methods and classes used in GraphLoader.edgeListFile. Have you successfully done this? At 2014-08-01 12:47:08, "shijiaxin" wrote: >I think you can try GraphLoader.edgeListFile, and then use join to associate >the attributes with each vertex > > > >-- >View this message in context: >http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-The-best-way-to-construct-a-graph-tp11122p11127.html >Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re:Re: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Hi Haiyang, Thanks, that really was the reason. Best, Bin At 2014-07-31 08:05:34, "Haiyang Fu" wrote: Have you tried to increase the driver memory? On Thu, Jul 31, 2014 at 3:54 PM, Bin wrote: Hi All, The data size of my task is about 30mb. It runs smoothly in local mode. However, when I submit it to the cluster, it throws the titled error (Please see below for the complete output). Actually, my output is almost the same with http://stackoverflow.com/questions/24080891/spark-program-hangs-at-job-finished-toarray-workers-throw-java-util-concurren. I also toArray my data, which was the reason of his case. However, how come it runs OK in local but not in the cluster? The memory of each worker is over 60g, and my run command is: "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.Client launch spark://10.196.135.101:7077 $jar_path $programname -Dspark.master=spark://10.196.135.101:7077 -Dspark.cores.max=300 -Dspark.executor.memory=20g -spark.jars=$jar_path -Dspark.default.parallelism=100 -Dspark.hadoop.hadoop.job.ugi=$username,$groupname -Dspark.app.name=$appname $in_path $scala_out_path" Looking for help and thanks a lot! Below please find the complete output: 14/07/31 15:06:53 WARN Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively 14/07/31 15:06:53 INFO SecurityManager: Changing view acls to: spark 14/07/31 15:06:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark) 14/07/31 15:06:53 INFO Slf4jLogger: Slf4jLogger started 14/07/31 15:06:53 INFO Remoting: Starting remoting 14/07/31 15:06:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446] 14/07/31 15:06:54 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@tdw-10-215-140-22:39446] 14/07/31 15:06:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@tdw-10-196-135-106:38502/user/CoarseGrainedScheduler 14/07/31 15:06:54 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker 14/07/31 15:06:54 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker 14/07/31 15:06:56 INFO CoarseGrainedExecutorBackend: Successfully registered with driver 14/07/31 15:06:56 INFO SecurityManager: Changing view acls to: spark 14/07/31 15:06:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark) 14/07/31 15:06:56 INFO Slf4jLogger: Slf4jLogger started 14/07/31 15:06:56 INFO Remoting: Starting remoting 14/07/31 15:06:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@tdw-10-215-140-22:56708] 14/07/31 15:06:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@tdw-10-215-140-22:56708] 14/07/31 15:06:56 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@tdw-10-196-135-106:38502/user/MapOutputTracker 14/07/31 15:06:58 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@tdw-10-196-135-106:38502/user/BlockManagerMaster 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data1/sparkenv/local/spark-local-20140731150659-3f12 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data2/sparkenv/local/spark-local-20140731150659-1602 14/07/31
15:06:59 INFO DiskBlockManager: Created local directory at /data3/sparkenv/local/spark-local-20140731150659-d213 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data4/sparkenv/local/spark-local-20140731150659-f42e 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data5/sparkenv/local/spark-local-20140731150659-63d0 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data6/sparkenv/local/spark-local-20140731150659-9003 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data7/sparkenv/local/spark-local-20140731150659-f260 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data8/sparkenv/local/spark-local-20140731150659-6334 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data9/sparkenv/local/spark-local-20140731150659-3af4 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data10/sparkenv/local/spark-local-20140731150659-133d 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data11/sparkenv/local/spark-local-20140731150659-ed08 14/07/31 15:06:59 INFO MemoryStore: MemoryStore started with capacity 11.5 GB. 14/07/31 15:06:59 INFO ConnectionManager: Bound socket to port 35127 with id = ConnectionManagerId(tdw-10-215-140-22,35127) 14/07/31 15:06:59 INFO BlockManagerMaster: Trying to register B
[GraphX] The best way to construct a graph
Hi All, I am wondering what the best way to construct a graph is? Say I have some attributes for each user, and a specific weight for each user pair. The way I am currently doing it is to first read the user information and edge triples into two arrays, then use sc.parallelize to create the vertex RDD and edge RDD, respectively, and then create the graph using Graph(vertices, edges). I wonder whether there is a better way to do this? Looking for advice! Thanks very much! Best, Bin
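For reference, the approach described above looks roughly like this in GraphX. The attribute types and sample values are invented for illustration, and a running SparkContext sc is assumed:

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // per-user attributes, keyed by a numeric vertex id
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))

    // a weighted edge for each user pair
    val edges: RDD[Edge[Double]] =
      sc.parallelize(Seq(Edge(1L, 2L, 0.8), Edge(2L, 3L, 0.3)))

    val graph: Graph[String, Double] = Graph(vertices, edges)

GraphLoader.edgeListFile is the other common entry point, but as noted in the replies it does not carry edge weights, so building the edge RDD yourself is the usual route when weights matter.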
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Hi All, The data size of my task is about 30mb. It runs smoothly in local mode. However, when I submit it to the cluster, it throws the titled error (Please see below for the complete output). Actually, my output is almost the same with http://stackoverflow.com/questions/24080891/spark-program-hangs-at-job-finished-toarray-workers-throw-java-util-concurren. I also toArray my data, which was the reason of his case. However, how come it runs OK in local but not in the cluster? The memory of each worker is over 60g, and my run command is: "$SPARK_HOME/bin/spark-class org.apache.spark.deploy.Client launch spark://10.196.135.101:7077 $jar_path $programname -Dspark.master=spark://10.196.135.101:7077 -Dspark.cores.max=300 -Dspark.executor.memory=20g -spark.jars=$jar_path -Dspark.default.parallelism=100 -Dspark.hadoop.hadoop.job.ugi=$username,$groupname -Dspark.app.name=$appname $in_path $scala_out_path" Looking for help and thanks a lot! Below please find the complete output: 14/07/31 15:06:53 WARN Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively 14/07/31 15:06:53 INFO SecurityManager: Changing view acls to: spark 14/07/31 15:06:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark) 14/07/31 15:06:53 INFO Slf4jLogger: Slf4jLogger started 14/07/31 15:06:53 INFO Remoting: Starting remoting 14/07/31 15:06:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446] 14/07/31 15:06:54 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@tdw-10-215-140-22:39446] 14/07/31 15:06:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@tdw-10-196-135-106:38502/user/CoarseGrainedScheduler 14/07/31 15:06:54 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker 14/07/31 15:06:54 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker 14/07/31 15:06:56 INFO CoarseGrainedExecutorBackend: Successfully registered with driver 14/07/31 15:06:56 INFO SecurityManager: Changing view acls to: spark 14/07/31 15:06:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark) 14/07/31 15:06:56 INFO Slf4jLogger: Slf4jLogger started 14/07/31 15:06:56 INFO Remoting: Starting remoting 14/07/31 15:06:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@tdw-10-215-140-22:56708] 14/07/31 15:06:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@tdw-10-215-140-22:56708] 14/07/31 15:06:56 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@tdw-10-196-135-106:38502/user/MapOutputTracker 14/07/31 15:06:58 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@tdw-10-196-135-106:38502/user/BlockManagerMaster 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data1/sparkenv/local/spark-local-20140731150659-3f12 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data2/sparkenv/local/spark-local-20140731150659-1602 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data3/sparkenv/local/spark-local-20140731150659-d213 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at 
/data4/sparkenv/local/spark-local-20140731150659-f42e 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data5/sparkenv/local/spark-local-20140731150659-63d0 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data6/sparkenv/local/spark-local-20140731150659-9003 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data7/sparkenv/local/spark-local-20140731150659-f260 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data8/sparkenv/local/spark-local-20140731150659-6334 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data9/sparkenv/local/spark-local-20140731150659-3af4 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data10/sparkenv/local/spark-local-20140731150659-133d 14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data11/sparkenv/local/spark-local-20140731150659-ed08 14/07/31 15:06:59 INFO MemoryStore: MemoryStore started with capacity 11.5 GB. 14/07/31 15:06:59 INFO ConnectionManager: Bound socket to port 35127 with id = ConnectionManagerId(tdw-10-215-140-22,35127) 14/07/31 15:06:59 INFO BlockManagerMaster: Trying to register BlockManager 14/07/31 15:07:00 INFO BlockManagerMaster: Registered BlockManager 14/07/31 15:07:00 INFO HttpFileServer: HTTP File server directory is /tmp/spark-0914d215-dd22-4d5e-9ec0-724937dbfd8b 14/07/31 15:
[GraphX] How to access a vertex via vertexId?
Hi All, I wonder how to access a vertex via its vertexId? I need to get a vertex's attributes after running a graph algorithm. Thanks very much! Best, Bin
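There is no direct index on vertex ids, but graph.vertices is an RDD of (VertexId, attribute) pairs, so a filter or a pair-RDD lookup retrieves the attribute. A minimal sketch with an arbitrary id of 42L, assuming a graph whose vertex attribute is a String:

    import org.apache.spark.SparkContext._   // pair-RDD operations such as lookup on older Spark releases
    import org.apache.spark.graphx._

    val targetId: VertexId = 42L

    // option 1: filter the vertex RDD and collect the (at most one) match
    val viaFilter: Array[(VertexId, String)] =
      graph.vertices.filter { case (id, _) => id == targetId }.collect()

    // option 2: lookup on the underlying pair RDD
    val viaLookup: Seq[String] = graph.vertices.lookup(targetId)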
Re: Scala vs Python performance differences
At least, Spark Streaming doesn't support Python at this moment, right? On Mon, Apr 14, 2014 at 6:48 PM, Andrew Ash wrote: > Hi Spark users, > > I've always done all my Spark work in Scala, but occasionally people ask > about Python and its performance impact vs the same algorithm > implementation in Scala. > > Has anyone done tests to measure the difference? > > Anecdotally I've heard Python is a 40% slowdown but that's entirely > hearsay. > > Cheers, > Andrew >
Re: Spark and HBase
First, I have not tried it myself. However, from what I have heard, it has some basic SQL features, so you can query your HBase table much like you query content on HDFS using Hive. So it is not just "querying a simple column"; I believe you can do joins and other SQL queries. Maybe you can spin up an EMR cluster with HBase preconfigured and give it a try. Sorry I cannot provide a more detailed explanation or help. On Tue, Apr 8, 2014 at 10:17 AM, Flavio Pompermaier wrote: > Thanks for the quick reply Bin. Phoenix is something I'm going to try for > sure but is seems somehow useless if I can use Spark. > Probably, as you said, since Phoenix use a dedicated data structure within > each HBase Table has a more effective memory usage but if I need to > deserialize data stored in a HBase cell I still have to read in memory that > object and thus I need Spark. From what I understood Phoenix is good if I > have to query a simple column of HBase but things get really complicated if > I have to add an index for each column in my table and I store complex > object within the cells. Is it correct? > > Best, > Flavio > > > > > On Tue, Apr 8, 2014 at 6:05 PM, Bin Wang wrote: > >> Hi Flavio, >> >> I happened to attend, actually attending the 2014 Apache Conf, I heard a >> project called "Apache Phoenix", which fully leverage HBase and suppose to >> be 1000x faster than Hive. And it is not memory bounded, in which case sets >> up a limit for Spark. It is still in the incubating group and the "stats" >> functions spark has already implemented are still on the roadmap. I am not >> sure whether it will be good but might be something interesting to check >> out. >> >> /usr/bin >> >> >> On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier >> wrote: >> >>> Hi to everybody, >>> >>> in these days I looked a bit at the recent evolution of the big data >>> stacks and it seems that HBase is somehow fading away in favour of >>> Spark+HDFS. Am I correct? >>> Do you think that Spark and HBase should work together or not? >>> >>> Best regards, >>> Flavio >>> >>
Re: Spark and HBase
Hi Flavio, I am actually attending the 2014 Apache Conf right now, where I heard about a project called "Apache Phoenix", which fully leverages HBase and is supposed to be 1000x faster than Hive. And it is not memory bound, whereas memory sets up a limit for Spark. It is still in the incubator, and the "stats" functions Spark has already implemented are still on its roadmap. I am not sure whether it will be good, but it might be something interesting to check out. /usr/bin On Tue, Apr 8, 2014 at 9:57 AM, Flavio Pompermaier wrote: > Hi to everybody, > > in these days I looked a bit at the recent evolution of the big data > stacks and it seems that HBase is somehow fading away in favour of > Spark+HDFS. Am I correct? > Do you think that Spark and HBase should work together or not? > > Best regards, > Flavio >
Spark Streaming Maven Build
Hi there, I tried the Kafka WordCount example; it works perfectly and the code is pretty straightforward to understand. Can anyone show me how to start my own Maven project around the KafkaWordCount example with minimum effort? 1. What should the pom file look like (including jar-plugin? assembly-plugin? ...etc.)? 2. mvn install, mvn clean install, or mvn install compile assembly:single? 3. After you have a jar file, how do you execute the jar file instead of using bin/run-example? To answer those who might ask what I have already done: here is a derivative of the KafkaWordCount example that I wrote to do an IP count, where the input data from Kafka is actually a JSON string: https://github.com/biwa7636/binwangREPO I had such bad luck getting it to work. So if anyone can copy the KafkaWordCount example code, build it using Maven, and get it working, please share your pom and the Maven commands you used; I would really appreciate it! Best regards, and let me know if you need further info. Bin
Re: Missing Spark URL after starting the master
Hi Mayur, I am using CDH4.6.0p0.26. And the latest Cloudera Spark parcel is Spark 0.9.0 CDH4.6.0p0.50. As I mentioned, somehow, the Cloudera Spark version doesn't contain the run-example shell scripts.. However, it is automatically configured and it is pretty easy to set up across the cluster... Thanks, Bin On Tue, Mar 4, 2014 at 10:59 AM, Mayur Rustagi wrote: > I have on cloudera vm > http://docs.sigmoidanalytics.com/index.php/How_to_Install_Spark_on_Cloudera_VM > which version are you trying to setup on cloudera.. also which cloudera > version are you using... > > > Mayur Rustagi > Ph: +1 (760) 203 3257 > http://www.sigmoidanalytics.com > @mayur_rustagi <https://twitter.com/mayur_rustagi> > > > > On Mon, Mar 3, 2014 at 4:29 PM, Bin Wang wrote: > >> Hi Ognen/Mayur, >> >> Thanks for the reply and it is good to know how easy it is to setup Spark >> on AWS cluster. >> >> My situation is a bit different from yours, our company already have a >> cluster and it really doesn't make that much sense not to use them. That is >> why I have been "going through" this. I really wish there are some >> tutorials teaching you how to set up Spark Cluster on baremetal CDH cluster >> or .. some way to tweak the CDH Spark distribution, so it is up to date. >> >> Ognen, of course it will be very helpful if you can 'history | grep >> spark... ' and document the work that you have done since you've already >> made it! >> >> Bin >> >> >> >> On Mon, Mar 3, 2014 at 2:06 PM, Ognen Duzlevski < >> og...@plainvanillagames.com> wrote: >> >>> I should add that in this setup you really do not need to look for the >>> printout of the master node's IP - you set it yourself a priori. If anyone >>> is interested, let me know, I can write it all up so that people can follow >>> some set of instructions. Who knows, maybe I can come up with a set of >>> scripts to automate it all... >>> >>> Ognen >>> >>> >>> >>> On 3/3/14, 3:02 PM, Ognen Duzlevski wrote: >>> >>> I have a Standalone spark cluster running in an Amazon VPC that I set up >>> by hand. All I did was provision the machines from a common AMI image (my >>> underlying distribution is Ubuntu), I created a "sparkuser" on each machine >>> and I have a /home/sparkuser/spark folder where I downladed spark. I did >>> this on the master only, I did sbt/sbt assemble and I set up the >>> conf/spark-env.sh to point to the master which is an IP address (in my case >>> 10.10.0.200, the port is the default 7077). I also set up the slaves file >>> in the same subdirectory to have all 16 ip addresses of the worker nodes >>> (in my case 10.10.0.201-216). After sbt/sbt assembly was done on master, I >>> then did cd ~/; tar -czf spark.tgz spark/ and I copied the resulting tgz >>> file to each worker using the same "sparkuser" account and unpacked the >>> .tgz on each slave (this will effectively replicate everything from master >>> to all slaves - you can script this so you don't do it by hand). >>> >>> Your AMI should have the distribution's version of Java and git >>> installed by the way. >>> >>> All you have to do then is sparkuser@spark-master> >>> spark/sbin/start-all.sh (for 0.9, in 0.8.1 it is spark/bin/start-all.sh) >>> and it will all automagically start :) >>> >>> All my Amazon nodes come with 4x400 Gb of ephemeral space which I have >>> set up into a 1.6TB RAID0 array on each node and I am pooling this into an >>> HDFS filesystem which is operated by a namenode outside the spark cluster >>> while all the datanodes are the same nodes as the spark workers. 
This >>> enables replication and extremely fast access since ephemeral is much >>> faster than EBS or anything else on Amazon (you can do even better with SSD >>> drives on this setup but it will cost ya). >>> >>> If anyone is interested I can document our pipeline set up - I came up >>> with it myself and do not have a clue as to what the industry standards are >>> since I could not find any written instructions anywhere online about how >>> to set up a whole data analytics pipeline from the point of ingestion to >>> the point of analytics (people don't want to share their secrets? or am I >>> just in the dark and incapable of using Google properly?). My requirement >>> was that I wanted this to run within a VPC for added security and >>>
Re: Missing Spark URL after starting the master
Hi Ognen/Mayur, Thanks for the reply and it is good to know how easy it is to setup Spark on AWS cluster. My situation is a bit different from yours, our company already have a cluster and it really doesn't make that much sense not to use them. That is why I have been "going through" this. I really wish there are some tutorials teaching you how to set up Spark Cluster on baremetal CDH cluster or .. some way to tweak the CDH Spark distribution, so it is up to date. Ognen, of course it will be very helpful if you can 'history | grep spark... ' and document the work that you have done since you've already made it! Bin On Mon, Mar 3, 2014 at 2:06 PM, Ognen Duzlevski wrote: > I should add that in this setup you really do not need to look for the > printout of the master node's IP - you set it yourself a priori. If anyone > is interested, let me know, I can write it all up so that people can follow > some set of instructions. Who knows, maybe I can come up with a set of > scripts to automate it all... > > Ognen > > > > On 3/3/14, 3:02 PM, Ognen Duzlevski wrote: > > I have a Standalone spark cluster running in an Amazon VPC that I set up > by hand. All I did was provision the machines from a common AMI image (my > underlying distribution is Ubuntu), I created a "sparkuser" on each machine > and I have a /home/sparkuser/spark folder where I downladed spark. I did > this on the master only, I did sbt/sbt assemble and I set up the > conf/spark-env.sh to point to the master which is an IP address (in my case > 10.10.0.200, the port is the default 7077). I also set up the slaves file > in the same subdirectory to have all 16 ip addresses of the worker nodes > (in my case 10.10.0.201-216). After sbt/sbt assembly was done on master, I > then did cd ~/; tar -czf spark.tgz spark/ and I copied the resulting tgz > file to each worker using the same "sparkuser" account and unpacked the > .tgz on each slave (this will effectively replicate everything from master > to all slaves - you can script this so you don't do it by hand). > > Your AMI should have the distribution's version of Java and git installed > by the way. > > All you have to do then is sparkuser@spark-master> > spark/sbin/start-all.sh (for 0.9, in 0.8.1 it is spark/bin/start-all.sh) > and it will all automagically start :) > > All my Amazon nodes come with 4x400 Gb of ephemeral space which I have set > up into a 1.6TB RAID0 array on each node and I am pooling this into an HDFS > filesystem which is operated by a namenode outside the spark cluster while > all the datanodes are the same nodes as the spark workers. This enables > replication and extremely fast access since ephemeral is much faster than > EBS or anything else on Amazon (you can do even better with SSD drives on > this setup but it will cost ya). > > If anyone is interested I can document our pipeline set up - I came up > with it myself and do not have a clue as to what the industry standards are > since I could not find any written instructions anywhere online about how > to set up a whole data analytics pipeline from the point of ingestion to > the point of analytics (people don't want to share their secrets? or am I > just in the dark and incapable of using Google properly?). My requirement > was that I wanted this to run within a VPC for added security and > simplicity, the Amazon security groups get really old quickly. Added bonus > is that you can use a VPN as an entry into the whole system and your > cluster instantly becomes "local" to you in terms of IPs etc. 
I use OpenVPN > since I don't like Cisco nor Juniper (the only two options Amazon provides > for their VPN gateways). > > Ognen > > > On 3/3/14, 1:00 PM, Bin Wang wrote: > > Hi there, > > I have a CDH cluster set up, and I tried using the Spark parcel come > with Cloudera Manager, but it turned out they even don't have the > run-example shell command in the bin folder. Then I removed it from the > cluster and cloned the incubator-spark into the name node of my cluster, > and built from source there successfully with everything as default. > > I ran a few examples and everything seems work fine in the local mode. > Then I am thinking about scale it to my cluster, which is what the > "DISTRIBUTE + ACTIVATE" command does in Cloudera Manager. I want to add all > the datanodes to the slaves and think I should run Spark in the standalone > mode. > > Say I am trying to set up Spark in the standalone mode following this > instruction: > https://spark.incubator.apache.org/docs/latest/spark-standalone.html > However, it says "Once started, the master will print out a > spark://HOST:PORT URL for its
Missing Spark URL after starting the master
Hi there, I have a CDH cluster set up, and I tried using the Spark parcel that comes with Cloudera Manager, but it turned out it doesn't even have the run-example shell command in the bin folder. Then I removed it from the cluster, cloned incubator-spark onto the name node of my cluster, and built it from source there successfully with everything at the defaults. I ran a few examples and everything seems to work fine in local mode. Now I am thinking about scaling it out to my cluster, which is what the "DISTRIBUTE + ACTIVATE" command does in Cloudera Manager. I want to add all the datanodes as slaves and think I should run Spark in standalone mode. So I am trying to set up Spark in standalone mode following these instructions: https://spark.incubator.apache.org/docs/latest/spark-standalone.html However, it says "Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers to it, or pass as the "master" argument to SparkContext. You can also find this URL on the master's web UI, which is http://localhost:8080 by default." After I started the master, there is no URL printed on the screen, and the web UI is not running either. Here is the output: [root@box incubator-spark]# ./sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /root/bwang_spark_new/incubator-spark/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-box.out First question: am I even in the ballpark by running Spark in standalone mode if I want to fully utilize my cluster? I saw there are four ways to launch Spark on a cluster: AWS EC2, Spark standalone, Apache Mesos, Hadoop YARN... so I guess standalone mode is the way to go? Second question: how do I get the Spark URL of the cluster, and why is the output not like what the instructions say? Best regards, Bin
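A small follow-up for anyone who hits this: once the master is actually up, the spark://HOST:PORT URL normally appears near the top of the master log file referenced in the start-master.sh output above, and connecting an application is then just a matter of passing that URL as the master. A minimal Scala sketch with a made-up host name and the default standalone port:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://namenode-host:7077")   // made-up host; 7077 is the default standalone port
      .setAppName("StandaloneSmokeTest")
    val sc = new SparkContext(conf)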