Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment
Apologies for the long answer. Understanding partitioning at each stage of the RDD graph/lineage is important for efficient parallelism and balanced load. This applies to working with any source, streaming or static.

You have a tricky situation here: one Kafka source with 9 partitions and a static data set with 90 partitions. Before joining these, try to make the number of partitions equal for both RDDs: you can either repartition the Kafka source to 90 partitions, coalesce the flat-file RDD to 9 partitions, or pick something midway between 9 and 90.

In general, the number of tasks that can run in parallel equals the total number of cores the Spark job has (number of executors * number of cores per executor). As an example, if the flat file has 90 partitions and you set 4 executors each with 5 cores, for a total of 20 cores, then tasks get scheduled as 20+20+20+20+10. As you can see, the last wave has only 10 tasks even though you have 20 cores. Compare this with 6 executors each with 5 cores, for a total of 30 cores: then it would be 30+30+30. Ideally, the number of partitions for each RDD in the graph/lineage should be a multiple of the total number of cores available to the Spark job.

In terms of data locality, prefer process-local over node-local over rack-local. As an example, 5 executors with 4 cores and 4 executors with 5 cores both give 20 cores in total, but with 4 executors there is less shuffling and more process-local/node-local work.

You need to look at the RDD graph for this:
df = sqlContext.read.parquet(...) and RDD rdd = df.as[T].rdd

On your final question: you should be able to tune the static RDD without an external store by carefully looking at each batch's RDD lineage during the 30 minutes before the RDD gets refreshed again. If you would like to use an external system, Apache Ignite is something you can use as a cache.

Thanks
Vijay

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
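The "waves of tasks" arithmetic above can be sketched in plain Python (a minimal illustration, not Spark code; the helper name is made up):

```python
def task_waves(num_partitions, num_executors, cores_per_executor):
    """Return the size of each scheduling wave: at most
    (executors * cores per executor) tasks can run at once,
    so the final wave is short unless the partition count is
    a multiple of the total core count."""
    total_cores = num_executors * cores_per_executor
    waves = []
    remaining = num_partitions
    while remaining > 0:
        waves.append(min(remaining, total_cores))
        remaining -= total_cores
    return waves

print(task_waves(90, 4, 5))  # [20, 20, 20, 20, 10] -> last wave wastes 10 cores
print(task_waves(90, 6, 5))  # [30, 30, 30] -> every wave fully uses the cores
```

This is why 90 partitions pairs badly with 20 cores but cleanly with 30.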
sqoop import job not working when spark thrift server is running.
Hello,

I was trying to optimize my Spark cluster. I did it to some extent by making some changes in yarn-site.xml and spark-defaults.conf. Before the changes, the MapReduce import job was running fine alongside a slow Thrift server; after the changes, I have to kill the Thrift server to execute my sqoop import job. Following are the configurations:

*yarn-site.xml*
yarn.nodemanager.resource.pcores-vcores-multiplier  1.0
yarn.nodemanager.vmem-pmem-ratio  5
yarn.nodemanager.resource.cpu-vcores  4
yarn.scheduler.maximum-allocation-vcores  4

*spark-defaults.conf*
spark.master  yarn
spark.driver.memory  9g
spark.executor.memory  8570m
spark.yarn.executor.memoryOverhead  646m
spark.executor.instances  11
spark.executor.cores  3
spark.default.parallelism  30

SPARK_WORKER_MEMORY  10g
SPARK_WORKER_INSTANCES  1
SPARK_WORKER_CORES  4
SPARK_DRIVER_MEMORY  9g
SPARK_DRIVER_CORES  3
SPARK_MASTER_PORT  7077
SPARK_EXECUTOR_INSTANCES  11
SPARK_EXECUTOR_CORES  3
SPARK_EXECUTOR_MEMORY  8570m

*Resources in the cluster of 9 nodes:* 12 GB RAM and 6 cores on each node.

Thanks for your time.
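A quick sanity check on the numbers in this message (a sketch using the values quoted above; the helper is hypothetical): one executor container is executor memory plus the YARN memory overhead, and that total nearly fills a 12 GB node, which would leave little room for a concurrent MapReduce (sqoop) job.

```python
def container_mb(executor_mem_mb, overhead_mb):
    """Approximate YARN container size for one Spark executor."""
    return executor_mem_mb + overhead_mb

executor_mem = 8570   # spark.executor.memory (MB)
overhead = 646        # spark.yarn.executor.memoryOverhead (MB)
node_mem = 12 * 1024  # 12 GB RAM per node (MB)

size = container_mb(executor_mem, overhead)
print(size)                   # 9216 MB per executor container
print(size <= node_mem)       # True, but only one such container fits per node
print(node_mem - size)        # ~3 GB left per node for everything else
```

With 11 such executors spread over 9 nodes, most nodes have only ~3 GB of YARN capacity left, which is consistent with the sqoop job being starved while the Thrift server runs.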
Re: Does Pyspark Support Graphx?
When using the --jars option, we have to include it every time we submit a job. It seems that adding the jars to the classpath on every slave node is the only way to "install" Spark packages.

-- Original --
From: Nicholas Hakobian
Date: Tue, Feb 20, 2018 3:37 AM
To: xiaobo
Cc: Denny Lee, user@spark.apache.org
Subject: Re: Does Pyspark Support Graphx?

If you copy the jar file and all of its dependencies to the machines, you can manually add them to the classpath. If you are using YARN and HDFS, you can alternatively use --jars and point it to the HDFS locations of the jar files, and it will (in most cases) distribute them to the worker nodes at job submission time.

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Sun, Feb 18, 2018 at 7:24 PM, xiaobo wrote:
Another question is how to install graphframes permanently when the Spark nodes cannot connect to the internet.

-- Original --
From: Denny Lee
Date: Mon, Feb 19, 2018 10:23 AM
To: xiaobo
Cc: user@spark.apache.org
Subject: Re: Does Pyspark Support Graphx?

Note the --packages option works for both PySpark and Spark (Scala). For the SparkLauncher class, you should be able to include packages ala:

spark.addSparkArg("--packages", "graphframes:0.5.0-spark2.0-s_2.11")

On Sun, Feb 18, 2018 at 3:30 PM xiaobo wrote:
Hi Denny,
The pyspark script uses the --packages option to load the graphframes library; what about the SparkLauncher class?

-- Original --
From: Denny Lee
Date: Sun, Feb 18, 2018 11:07 AM
To: 94035420
Cc: user@spark.apache.org
Subject: Re: Does Pyspark Support Graphx?

That's correct - you can use GraphFrames though, as it does support PySpark.

On Sat, Feb 17, 2018 at 17:36 94035420 wrote:
I cannot find anything for the graphx module in the Python API document; does it mean it is not supported yet?
Re: [graphframes]how Graphframes Deal With BidirectionalRelationships
So the question becomes: does GraphFrames support bidirectional relationships natively, with only one edge?

-- Original --
From: Felix Cheung
Date: Tue, Feb 20, 2018 10:01 AM
To: xiaobo, user@spark.apache.org
Subject: Re: [graphframes]how Graphframes Deal With BidirectionalRelationships

Generally that would be the approach. But since you have effectively doubled the number of edges, this will likely affect the scale at which your job will run.

From: xiaobo
Sent: Monday, February 19, 2018 3:22:02 AM
To: user@spark.apache.org
Subject: [graphframes]how Graphframes Deal With Bidirectional Relationships

Hi,
To represent a bidirectional relationship, one solution is to insert two edges for the vertex pair. My question is: do the algorithms of GraphFrames still work when we do this?
Thanks
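The "insert two edges" approach discussed in this thread can be sketched in plain Python (not GraphFrames code; the function name is made up) — for every edge, also add the reverse edge unless it is already present:

```python
def make_bidirectional(edges):
    """edges: list of (src, dst) pairs; returns the edges plus the
    reverse of each edge that does not already have one."""
    seen = set(edges)
    out = list(edges)
    for src, dst in edges:
        if (dst, src) not in seen:
            out.append((dst, src))
            seen.add((dst, src))
    return out

print(make_bidirectional([("a", "b"), ("b", "c")]))
# [('a', 'b'), ('b', 'a'), ... i.e. four edges from the original two
```

As Felix notes above, this doubles the edge count, so directed algorithms see both directions at the cost of a larger graph.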
Errors when running unit tests
Hi,

I get errors like the one below when trying to run the Spark unit tests:

> zipPartitions(test.org.apache.spark.Java8RDDAPISuite)  Time elapsed: 2.212 sec  <<< ERROR!
> java.lang.IllegalStateException: failed to create a child event loop
>   at test.org.apache.spark.Java8RDDAPISuite.setUp(Java8RDDAPISuite.java:54)
> Caused by: io.netty.channel.ChannelException: failed to open a new selector
>   at test.org.apache.spark.Java8RDDAPISuite.setUp(Java8RDDAPISuite.java:54)
> Caused by: java.io.IOException: *Too many open files*
>   at test.org.apache.spark.Java8RDDAPISuite.setUp(Java8RDDAPISuite.java:54)

I am running the unit tests from version 2.2.1 with the following system config:

> $ sysctl fs.file-max
> fs.file-max = 10
> $ ulimit -u
> 65536

Command line used:

> build/mvn -Phadoop-2.6 -Pyarn -Phive -Phive-thriftserver -Pkinesis-asl -Pmesos --fail-at-end test

Any pointers on this will be helpful.

Thanks
Karuppayya
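One thing worth checking (a hedged config sketch, not from the thread): "Too many open files" is usually governed by the per-process open-file limit, `ulimit -n`, which is a different knob from both the system-wide `fs.file-max` and the process limit `ulimit -u` shown above. The value below is an example, not a recommendation:

```shell
# Show the per-process open-file limit (the one netty's selector hits):
ulimit -n

# Raise it for the current shell before running the tests
# (requires the hard limit to allow it; 65536 is just an example value):
ulimit -n 65536
```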
Re: [graphframes]how Graphframes Deal With Bidirectional Relationships
Generally that would be the approach. But since you have effectively doubled the number of edges, this will likely affect the scale at which your job will run.

From: xiaobo
Sent: Monday, February 19, 2018 3:22:02 AM
To: user@spark.apache.org
Subject: [graphframes]how Graphframes Deal With Bidirectional Relationships

Hi,
To represent a bidirectional relationship, one solution is to insert two edges for the vertex pair. My question is: do the algorithms of GraphFrames still work when we do this?
Thanks
Re: KafkaUtils.createStream(..) is removed for API
I can't speak for the committers, but my guess is that it's more likely for DStreams in general to stop being supported before that particular integration is removed.

On Sun, Feb 18, 2018 at 9:34 PM, naresh Goud wrote:
> Thanks Ted.
>
> I see createDirectStream is experimental, as it is annotated with
> "org.apache.spark.annotation.Experimental".
>
> Is it possible that this API will be removed in the future? We wanted to use
> this API in one of our production jobs and are afraid it will not be
> supported in the future.
>
> Thank you,
> Naresh
>
> On Sun, Feb 18, 2018 at 7:47 PM, Ted Yu wrote:
>> createStream() is still in
>> external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
>> but it is not in
>> external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>
>> FYI
>>
>> On Sun, Feb 18, 2018 at 5:17 PM, naresh Goud wrote:
>>> Hello Team,
>>>
>>> I see the "KafkaUtils.createStream()" method is not available in Spark 2.2.1.
>>>
>>> Can someone please confirm if these methods are removed?
>>>
>>> Below are my pom.xml entries:
>>>
>>> <properties>
>>>   <scala.tools.version>2.11</scala.tools.version>
>>>   2.11.8
>>> </properties>
>>>
>>> <dependencies>
>>>   <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-streaming_${scala.tools.version}</artifactId>
>>>     <version>2.2.1</version>
>>>     <scope>provided</scope>
>>>   </dependency>
>>>   <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>>>     <version>2.2.1</version>
>>>     <scope>provided</scope>
>>>   </dependency>
>>>   <dependency>
>>>     <groupId>org.apache.spark</groupId>
>>>     <artifactId>spark-core_2.11</artifactId>
>>>     <version>2.2.1</version>
>>>     <scope>provided</scope>
>>>   </dependency>
>>> </dependencies>
>>>
>>> Thank you,
>>> Naresh
Re: Does Pyspark Support Graphx?
If you copy the jar file and all of its dependencies to the machines, you can manually add them to the classpath. If you are using YARN and HDFS, you can alternatively use --jars and point it to the HDFS locations of the jar files, and it will (in most cases) distribute them to the worker nodes at job submission time.

Nicholas Szandor Hakobian, Ph.D.
Staff Data Scientist
Rally Health
nicholas.hakob...@rallyhealth.com

On Sun, Feb 18, 2018 at 7:24 PM, xiaobo wrote:
> Another question is how to install graphframes permanently when the Spark
> nodes cannot connect to the internet.
>
> -- Original --
> *From:* Denny Lee
> *Date:* Mon, Feb 19, 2018 10:23 AM
> *To:* xiaobo
> *Cc:* user@spark.apache.org
> *Subject:* Re: Does Pyspark Support Graphx?
>
> Note the --packages option works for both PySpark and Spark (Scala). For
> the SparkLauncher class, you should be able to include packages ala:
>
> spark.addSparkArg("--packages", "graphframes:0.5.0-spark2.0-s_2.11")
>
> On Sun, Feb 18, 2018 at 3:30 PM xiaobo wrote:
>
>> Hi Denny,
>> The pyspark script uses the --packages option to load the graphframes
>> library; what about the SparkLauncher class?
>>
>> -- Original --
>> *From:* Denny Lee
>> *Date:* Sun, Feb 18, 2018 11:07 AM
>> *To:* 94035420
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Does Pyspark Support Graphx?
>>
>> That's correct - you can use GraphFrames though, as it does support
>> PySpark.
>>
>> On Sat, Feb 17, 2018 at 17:36 94035420 wrote:
>>
>>> I cannot find anything for the graphx module in the Python API document;
>>> does it mean it is not supported yet?
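The two options above can be sketched as submit-time commands (a hedged example: the jar path, script name, and directory layout are illustrative, not from the thread):

```shell
# Option 1: keep the jar in HDFS and let YARN distribute it at submission
# time (no internet access needed on the worker nodes):
spark-submit \
  --master yarn \
  --jars hdfs:///libs/graphframes-0.5.0-spark2.0-s_2.11.jar \
  my_job.py

# Option 2: "install" the package permanently by copying the jar and its
# dependencies onto every node's Spark classpath, e.g. $SPARK_HOME/jars/,
# so no --jars or --packages flag is needed per job.
```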
Re: [Spark Streaming]: Non-deterministic uneven task-to-machine assignment
Hi Vijay,

Thank you very much for your reply. Setting the number of partitions explicitly in the join, and the influence of memory pressure on partitioning, were definitely very good insights. In the end, we avoided the issue of uneven load balancing completely by doing the following two things: a) reducing the number of executors while increasing the number of cores and executor memory, and b) increasing the batch interval from 15s to 30s. Here is a nice blog post that explains how to improve performance for Spark jobs in general: https://mapr.com/blog/performance-tuning-apache-kafkaspark-streaming-system/ .

@Vijay: And here are the responses to your questions:
1) Correct.
2) This is exactly what confuses us: there is nothing between the following lines: df = sqlContext.read.parquet(...) and RDD rdd = df.as[T].rdd. We saw that a separate query plan is executed on converting the DataFrame to an RDD (the .rdd method). Is it equivalent to repartition, coalesce, or something else?
3) Exactly.
4) We are caching the static RDD for 30 minutes. That is, we have a trait with a readLast method that returns the last read RDD, and once the RDD is more than 30 minutes old, we reload its content from disk using df = sqlContext.read.parquet(...).

---

My final question is the following: what would be the most efficient way (possibly including an external key-value store) to store, update, and retrieve final_rdd? The state may grow beyond 3 GB, and we want to maintain our scalability and latency. In fact, we have many Spark jobs that join the same RDD with different Kafka streams.

Thank you very much!
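The "readLast" pattern described in point 4 can be sketched in plain Python (a minimal illustration with made-up names; the load function stands in for sqlContext.read.parquet(...)):

```python
import time

class RefreshingCache:
    """Keep the last loaded value and reload it only when it is older
    than max_age_seconds (30 minutes in this thread)."""

    def __init__(self, load_fn, max_age_seconds):
        self._load = load_fn
        self._max_age = max_age_seconds
        self._value = None
        self._loaded_at = None

    def read_last(self, now=None):
        now = time.monotonic() if now is None else now
        if self._loaded_at is None or now - self._loaded_at > self._max_age:
            self._value = self._load()   # e.g. sqlContext.read.parquet(...)
            self._loaded_at = now
        return self._value

# Count how many times the loader actually runs:
loads = []
cache = RefreshingCache(lambda: loads.append(1) or len(loads), 30 * 60)
cache.read_last(now=0)      # first call loads
cache.read_last(now=60)     # within 30 minutes: served from cache
cache.read_last(now=1900)   # older than 30 minutes: reloads
print(len(loads))  # 2
```

One caveat from Vijay's reply applies here: the cached RDD's partitioning may not be preserved under memory pressure, so the partition count should be rechecked after each reload.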
On Wed, Jan 31, 2018 at 11:24 AM, vijay.bvp wrote:
> Summarizing:
>
> 1) The static data set, read from Parquet files in HDFS as a DataFrame, has an
> initial parallelism of 90 (based on the number of input files).
>
> 2) The static DataFrame is converted to an RDD, and the RDD has a parallelism
> of 18; this was not expected.
> dataframe.rdd is lazily evaluated, so there must be some operation you were
> doing that triggered the conversion from 90 to 18. This would be some operation
> that breaks the stage/requires shuffling, such as groupBy, reduceBy,
> repartition, or coalesce. If you are using coalesce, the second parameter
> (shuffle) is false by default, which means upstream parallelism is not
> preserved.
>
> 3) You have a DStream from a Kafka source with 9 partitions, and this is joined
> with the static data set above? When joining, have you tried setting
> numPartitions, an optional parameter, to provide the number of partitions
> required?
>
> 4) Your batch interval is 15 seconds, but you are caching the static data set
> for 30 minutes. What exactly do you mean by caching for 30 minutes?
>
> Note that when you cache data, depending on memory pressure, there is a chance
> that partitioning is not preserved.
>
> It would be useful to provide Spark UI screenshots for one complete batch,
> the DAG, and other details.
>
> thanks
> Vijay
Understand task timing
Using Spark 1.6.2, I want to understand what "Duration" really means (and why it is slow). Running a simple SELECT COUNT against a Parquet file stored in HDFS:

NODE_LOCAL  1 / DATA02  2018/02/19 09:54:27  5 s  30 ms  8.8 MB (hadoop) / 3010830  8 ms  77.2 KB / 1666

Does this mean "it took 5 seconds to read 8.8 MB from HDFS"?

Thomas Decaux