Re: org.apache.spark.shuffle.FetchFailedException: Too large frame:
I am performing a join operation. If I convert the reduce-side join to a map-side join (no shuffle will happen), I assume this error shouldn't come up. Let me know if this understanding is correct.

On Tue, May 1, 2018 at 9:37 PM, Ryan Blue wrote:
> This is usually caused by skew. Sometimes you can work around it by
> increasing the number of partitions like you tried, but when that doesn't
> work you need to change the partitioning that you're using.
>
> If you're aggregating, try adding an intermediate aggregation. For
> example, if your query is select sum(x), a from t group by a, then try
> select sum(partial), a from (select sum(x) as partial, a, b from t group by a, b)
> group by a.
>
> rb
>
> On Tue, May 1, 2018 at 4:21 AM, Pralabh Kumar wrote:
>
>> Hi
>>
>> I am getting the above error in Spark SQL. I have increased the number of
>> partitions (using 5000) but am still getting the same error.
>>
>> My data most probably is skewed.
>>
>> org.apache.spark.shuffle.FetchFailedException: Too large frame: 4247124829
>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:419)
>>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:349)
>
> --
> Ryan Blue
> Software Engineer
> Netflix
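For anyone reading along, the two-level aggregation Ryan suggests can be sketched outside Spark with plain Python dictionaries (toy, made-up data). The point is that the partial aggregation spreads a hot key `a` across many `(a, b)` groups, so no single reducer has to absorb all of the skewed key's rows:

```python
from collections import defaultdict

# Toy rows of (a, b, x). The key a == "hot" is heavily skewed.
rows = [("hot", i % 10, 1) for i in range(1000)] + [("cold", 0, 2)] * 5

# Step 1: partial aggregation by the composite key (a, b).
# In Spark this is "group by a, b" and spreads "hot" over many reducers.
partial = defaultdict(int)
for a, b, x in rows:
    partial[(a, b)] += x

# Step 2: final aggregation by a alone, over the much smaller partial result.
final = defaultdict(int)
for (a, b), s in partial.items():
    final[a] += s

print(dict(final))  # {'hot': 1000, 'cold': 10} -- same totals as one "group by a"
```

The second pass only sees one row per `(a, b)` pair, which is why the shuffle frames stay small even when one `a` value dominates the data.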
MappingException - org.apache.spark.mllib.classification.LogisticRegressionModel.load
Hi,

I used PySpark to create a logistic regression model, train on my training data, and evaluate my test data using the ML API. However, to use the model in my program, I saved it, and when I tried to load it in PySpark using

sameModel = LogisticRegressionModel.load(sc, path)

it throws the error below:

An error occurred while calling z:org.apache.spark.mllib.classification.LogisticRegressionModel.load.
: org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String

Is there a way to load the model with ML instead of MLlib? Your input is appreciated.

Best regards,
Mina
AccumulatorV2 vs AccumulableParam (V1)
Hello guys,

I've started to migrate my Spark jobs that use Accumulators V1 to AccumulatorV2 and ran into the following issues:

1. LegacyAccumulatorWrapper now requires the result type of an AccumulableParam to implement equals. Otherwise the AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper, will fail with an AssertionError (SPARK-23697 [1]).

2. Existing AccumulatorV2 classes are difficult to extend correctly (SPARK-24154 [2]) because of the "copy" method, which is called during serialization and usually loses the type information of descendant classes that don't override "copy" (it's easier to implement an accumulator from scratch than to override it correctly).

3. The same instance of AccumulatorV2 cannot be used with the same SparkContext multiple times (unlike AccumulableParam); it fails with "IllegalStateException: Cannot register an Accumulator twice" even after the "reset" method is called. So it's impossible to unregister an already registered accumulator from user code.

4. AccumulableParam (V1) implementations are usually more or less stateless, while AccumulatorV2 implementations are almost always stateful, leading to (unnecessary?) type checks. For example, a typical "merge" method of AccumulatorV2 has to check whether the current accumulator is of an appropriate type, like here [3].

5. AccumulatorV2 is more difficult to implement correctly than AccumulableParam. With AccumulableParam I have to implement just 3 methods (addAccumulator, addInPlace, zero), with AccumulatorParam just 2 methods (addInPlace, zero), but with AccumulatorV2, 6 methods (isZero, copy, reset, add, merge, value).

6. AccumulatorV2 classes can hardly be anonymous classes, because their "copy" and "merge" methods typically require a concrete class to perform the type check.
I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but I'm just wondering whether there is a way to simplify the AccumulatorV2 API to address the points above and make it less error prone?

[1] https://issues.apache.org/jira/browse/SPARK-23697
[2] https://issues.apache.org/jira/browse/SPARK-24154
[3] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
[4] https://issues.apache.org/jira/browse/SPARK-14654

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
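To make point 5 concrete, the six-method contract can be sketched in plain Python (this mirrors the Scala AccumulatorV2 method names for illustration only; it is not Spark's actual class). Note how copy() uses type(self) rather than a hard-coded class, which is exactly the detail a subclass gets wrong in the SPARK-24154 scenario:

```python
class ListAccumulator:
    """Plain-Python sketch of the AccumulatorV2 contract
    (isZero, copy, reset, add, merge, value)."""

    def __init__(self):
        self._items = []

    def isZero(self):
        return not self._items

    def copy(self):
        # type(self) keeps subclasses working; a copy() that returns the
        # parent type silently drops subclass state during serialization.
        acc = type(self)()
        acc._items = list(self._items)
        return acc

    def reset(self):
        self._items = []

    def add(self, v):
        self._items.append(v)

    def merge(self, other):
        # The type check that point 4 complains about.
        if not isinstance(other, type(self)):
            raise TypeError("cannot merge %r" % type(other))
        self._items.extend(other._items)

    @property
    def value(self):
        return list(self._items)

a, b = ListAccumulator(), ListAccumulator()
a.add(1); b.add(2); b.add(3)
a.merge(b.copy())
print(a.value)  # [1, 2, 3]
```

Compared to V1's two or three methods, even this minimal sketch has to get copy-vs-reference semantics and the merge type check right, which is the extra surface for bugs the mail describes.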
Re: Uncaught exception in thread heartbeat-receiver-event-loop-thread
And reading through the comments in that issue (https://issues.apache.org/jira/browse/SPARK-20977), it looks like it was just ignored but marked resolved.

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
ConcurrentModificationException
I have encountered the exception below running Spark 2.1.0 on EMR. The exception is the same as reported in "Serialization of accumulators in heartbeats is not thread-safe" (https://issues.apache.org/jira/browse/SPARK-17463). Pull requests were made and merged and that issue was marked as resolved, but someone named Sunil said in the comments that they were still encountering the problem with Spark 2.0.2 on EMR. I am too. Should this issue be reopened?

18/04/30 22:54:15 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(4229,[Lscala.Tuple2;@5e8fe6a5,BlockManagerId(4229, ip-172-23-229-187.ec2.internal, 35905, None))] in 1 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:538)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:567)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:567)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:567)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
    at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at
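For context, the race behind SPARK-17463 is one thread serializing a list (the accumulator updates in the heartbeat) while another thread mutates it, which is what ArrayList.writeObject is complaining about above. The general defensive fix can be sketched in plain Python (this is a language-neutral illustration of the snapshot-under-lock pattern, not Spark's actual patch):

```python
import pickle
import threading

updates = []                 # shared list, mutated by worker threads
lock = threading.Lock()

def record(v):
    with lock:
        updates.append(v)

def heartbeat_payload():
    # Serialize a snapshot taken under the lock, never the live list.
    # Pickling the live list while another thread appends to it is the
    # same kind of race as the ArrayList.writeObject failure above.
    with lock:
        snapshot = list(updates)
    return pickle.dumps(snapshot)

threads = [threading.Thread(target=record, args=(i,)) for i in range(100)]
for t in threads:
    t.start()
payload = heartbeat_payload()   # safe to call at any point
for t in threads:
    t.join()
```

The snapshot costs an extra copy per heartbeat, but it makes the serialized bytes immune to concurrent appends.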
Re: Uncaught exception in thread heartbeat-receiver-event-loop-thread
I have also encountered the NullPointerException in CollectionAccumulator. It looks like there is an issue filed for this: https://issues.apache.org/jira/browse/SPARK-20977.
Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception
Hi,
Sorted. I just replaced s3 with s3a. I think I recall similar issues in the past with the AWS libraries. Thanks anyway for getting back.
Kr

On Wed, May 2, 2018, 4:57 PM Paul Tremblay wrote:
> I would like to see the full error. However, S3 can give misleading
> messages if you don't have the correct permissions.
>
> On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni wrote:
>
>> Hi all,
>> I am using the following code for persisting data into S3 (the AWS keys are
>> already stored in the environment variables):
>>
>> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>>
>> However, I keep on receiving an exception that the file does not exist.
>>
>> Here's what comes from the logs:
>>
>> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file: s3://ec2-bucket-mm-spark/form4-results-2404.results
>> Exception in thread "main" java.io.IOException: /form4-results-2404.results doesn't exist
>>
>> It seems that Spark expects the file to be there before writing, which
>> seems bizarre?
>>
>> I have even tried to remove the coalesce, but still got the same exception.
>> Could anyone help, please?
>> Kind regards,
>> Marco
Running apps over a VPN
My setup is that I have a Spark master (using the Spark scheduler) and 32 workers registered with it, but they are on a private network. I can connect to that private network via OpenVPN. I would like to be able to run Spark applications from a local IntelliJ (on my desktop) but have them use the remote master/workers. I thought this would allow that:

sparkConf.set("spark.submit.deployMode", "cluster")

but when my job runs it still complains that there are not enough resources/workers. Connecting to the master, it shows that workers have been assigned and are in the RUNNING state. My local Spark app doesn't agree; it's like the workers were assigned but the PC end doesn't know. I can use spark-submit.sh, but I was really hoping to be able to run Spark applications directly from IDEA. Possible?
Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception
I would like to see the full error. However, S3 can give misleading messages if you don't have the correct permissions.

On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni wrote:
> Hi all,
> I am using the following code for persisting data into S3 (the AWS keys are
> already stored in the environment variables):
>
> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>
> However, I keep on receiving an exception that the file does not exist.
>
> Here's what comes from the logs:
>
> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file: s3://ec2-bucket-mm-spark/form4-results-2404.results
> Exception in thread "main" java.io.IOException: /form4-results-2404.results doesn't exist
>
> It seems that Spark expects the file to be there before writing, which
> seems bizarre?
>
> I have even tried to remove the coalesce, but still got the same exception.
> Could anyone help, please?
> Kind regards,
> Marco
how to trace sparkDriver context creation for pyspark
I have a Python Jupyter notebook set up to create a Spark context by default, and sometimes this fails with the following error:

18/04/30 18:03:27 WARN Utils: Service 'sparkDriver' could not bind on port 0. Attempting port 1.
18/04/30 18:03:27 ERROR SparkContext: Error initializing SparkContext.
java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 100 retries! Consider explicitly setting the appropriate port for the service 'sparkDriver' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.

I have tracked it down to two possible settings that may cause this in Spark 2.0.2, client mode, standalone cluster setup, running in Kubernetes:

- spark.driver.port: we don't set it, so it should be random
- spark.ui.port: we set spark.ui.enabled=false, so it should not try to bind to this port

The short story is that I do not know which one Spark gets confused about, and looking at the Spark code it is not clear how spark.ui.port could cause this, even if the error message lists it as a possible cause.

Question 1: have you seen this before?

Question 2: how do I trace the Spark driver process? It seems that I can only set sc.logLevel after the Spark context is created, but I need to trace before the Spark context is created. I created a log4j.properties file in the spark/conf directory and set it to TRACE, but that only gets picked up when I run a Scala Jupyter notebook, not when I run a Python Jupyter notebook, and I haven't been able to find out how to turn on the same level of tracing for a spark-driver process started via a Python Jupyter notebook.
Some things I looked at:

SPARK_PRINT_LAUNCH_COMMAND=1 /usr/local/spark-2.0.2-bin-hadoop2.7/bin/pyspark

Spark Command: python2.7
Python 2.7.13 |Anaconda custom (64-bit)| (default, Dec 20 2016, 23:09:15)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/jars/* -Xmx1g org.apache.spark.deploy.SparkSubmit --name PySparkShell pyspark-shell

 PPID   PID  PGID   SID TTY  TPGID STAT  UID  TIME COMMAND
    0  1308  1308  1308 ?     1416 Ss      0  0:00 bash
 1308  1416  1416  1308 ?     1416 R+      0  0:00  \_ ps axjf
    0  1151  1151  1151 ?     1151 Ss+     0  0:00 bash
    0     1     1     1 ?       -1 Ss      0  0:00 /bin/bash /usr/local/bin/start-dsx-notebook.sh
    1  1014     1     1 ?       -1 S       0  0:00 /bin/sh /user-home/.scripts/publishing-startup-scripts/nbexec_py_startup.sh
 1014  1026     1     1 ?       -1 S       0  0:06  \_ python /user-home/.scripts/system/publishing-api/py2http.py
    1  1017     1     1 ?       -1 S       0  0:00 su -l 1001 /usr/local/bin/start-user-notebook.sh spark-master-svc:7077 dsx /user-home/1001/DSX_Projects/imagemgmt 1523893891668 imagemgmt Bearer eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InRlc3QiLCJzdWIiOiJ0ZXN0IiwiaXNzIjoiS05PWFNTTyIsImF1ZCI6IkRTWCIsInJvbGUiOiJBZG1pbiIsInVpZCI6IjEwMDEiLCJpYXQiOjE1MjQ2MDIxMTd9.jHyjakD4G7XlOJ3Q1e5We3agHy_dtao_U98rZcLuTNBgGaETYKfHO2PC-94HG_nxIcTjDxymefWHItiwO7QcTIg_sIkP4uPSfQMTFthrMWNUucR0xRWJxFPcYgLlKo3T2P8JmA_LslVWqFD_MMjmYHI3UukVRj319_MSsRTW3Md3quF5mmv3OZMVjuI8faKMQF7zt_17W_QbNZAT91F0AboXJ7iazz71vcsuZZx0OxnSzJzcW3AEYb8JFWz3opbRwpc3dswbLco8TJ6I4DtacBq7syv3zg0bLIIcHSCp-LBwHrTyCWV7uJ0a3m-MSdvwdZ35WYE6_8LRwadKfW6hiw
 1017  1018  1018  1018 ?       -1 Ss   1001  0:00  \_ -su /usr/local/bin/start-user-notebook.sh spark-master-svc:7077 dsx /user-home/1001/DSX_Projects/imagemgmt 1523893891668 imagemgmt Bearer (same JWT as above)
 1018  1025  1018  1018 ?       -1 Sl   1001  0:51      \_ /opt/conda/bin/python /opt/conda/bin/jupyter-notebook --NotebookApp.token= --port= --no-browser
 1025  1033  1033  1033 ?       -1 Ssl  1001  0:03          \_ python -m ipykernel_launcher -f
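One detail that may explain the Scala-vs-Python difference in the tracing question above: pyspark starts the driver JVM through spark-submit (visible in the Spark Command line), so the JVM only sees conf/log4j.properties if that conf directory is on the launched classpath. A sketch of forcing a TRACE-level configuration onto that JVM (the paths below are assumptions for this install, adjust them):

```
# /usr/local/spark/conf/log4j.properties -- TRACE everything to stderr
log4j.rootCategory=TRACE, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
```

and then pin the location explicitly for the notebook-launched driver before the kernel starts:

```
export PYSPARK_SUBMIT_ARGS='--driver-java-options "-Dlog4j.configuration=file:/usr/local/spark/conf/log4j.properties" pyspark-shell'
```

Because -Dlog4j.configuration is read at JVM startup, this traces everything from before the SparkContext is created, which sc.setLogLevel cannot do.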
Re: ML Linear and Logistic Regression - Poor Performance
You may want to think about reducing the number of iterations. Right now you have it set to 500.

Thank You,
Irving Duran

On Fri, Apr 27, 2018 at 7:15 PM Thodoris Zois wrote:
> I am on CentOS 7 and I use Spark 2.3.0. Below I have posted my code.
> Logistic regression took 85 minutes and linear regression 127 seconds…
>
> My dataset, as I said, is 128 MB and contains 1000 features and ~100 classes.
>
> #SparkSession
> ss = SparkSession.builder.getOrCreate()
>
> start = time.time()
>
> #Read data
> trainData = ss.read.format("csv").option("inferSchema", "true").load(file)
>
> #Calculate features
> assembler = VectorAssembler(inputCols=trainData.columns[1:], outputCol="features")
> trainData = assembler.transform(trainData)
>
> #Drop columns
> dropColumns = trainData.columns
> dropColumns = [e for e in dropColumns if e not in ('_c0', 'features')]
> trainData = trainData.drop(*dropColumns)
>
> #Rename column from _c0 to label
> trainData = trainData.withColumnRenamed("_c0", "label")
>
> #Logistic regression
> lr = LogisticRegression(maxIter=500, regParam=0.3, elasticNetParam=0.8)
> lrModel = lr.fit(trainData)
>
> #Output coefficients
> print("Coefficients: " + str(lrModel.coefficientMatrix))
>
> - Thodoris
>
> On 27 Apr 2018, at 22:50, Irving Duran wrote:
>
> Are you reformatting the data correctly for logistic regression (meaning 0s and 1s)
> before modeling? What OS and Spark version are you using?
>
> Thank You,
> Irving Duran
>
> On Fri, Apr 27, 2018 at 2:34 PM Thodoris Zois wrote:
>
>> Hello,
>>
>> I am running an experiment to test logistic and linear regression on
>> Spark using MLlib.
>>
>> My dataset is only 128 MB and something weird happens. Linear regression
>> takes about 127 seconds either with 1 or 500 iterations. On the other hand,
>> logistic regression most of the time does not manage to finish even with
>> 1 iteration. I usually get a memory heap error.
>>
>> In both cases I use the default cores and memory for the driver, and I spawn 1
>> executor with 1 core and 2 GB of memory.
>>
>> Apart from that, I get a warning about NativeBLAS. I searched on the Internet
>> and found that I have to install libgfortran. Even after I did, the
>> warning remains.
>>
>> Any ideas about the above?
>>
>> Thank you,
>> - Thodoris
Re: spark.executor.extraJavaOptions inside application code
You need to set the config before creating the session:

val conf = new SparkConf()
// Sets a JVM system property on the executors:
conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword")
// These two are equivalent ways to set an executor environment variable:
conf.set("spark.executorEnv.basicauth", "myuser:mypassword")
conf.setExecutorEnv("basicauth", "myuser:mypassword")
val spark = SparkSession.builder().config(conf).appName("…").getOrCreate()

On Wed, May 2, 2018 at 6:59 AM, Agostino Calamita <agostino.calam...@gmail.com> wrote:
> Hi all,
> I wrote an application that needs an environment variable. I can set this
> variable with
>
> --conf 'spark.executor.extraJavaOptions=-Dbasicauth=myuser:mypwd'
>
> in spark-submit and it works well in standalone cluster mode.
>
> But I want to set it inside the application code, because the variable
> contains a password.
>
> How can I do that?
>
> I tried with:
>
> SparkSession spark = SparkSession
>     .builder()
>     .appName("Java Spark Solr ETL")
>     .getOrCreate();
>
> spark.sparkContext().conf().setExecutorEnv("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword");
>
> but it doesn't work.
>
> Thanks.

--
Sent from my iPhone
Re: smarter way to "forget" DataFrame definition and stick to its values
There is a trade-off involved here. If you have a Spark application with a complicated logical graph, you can either cache data at certain points in the DAG, or not cache it. The side effect of caching data is higher memory usage. The side effect of not caching data is higher CPU usage and, perhaps, IO. Ultimately, you can increase both memory and CPU by adding more workers to your cluster, and adding workers costs money. So your caching choices are reflected in the overall cost of running your application. You need to do some analysis to determine the caching configuration that will result in the lowest cost. Usually, being selective about which dataframes to cache results in a good balance between memory usage and CPU usage.

I would not write data back to S3 and read it back in as a practice. Essentially, you are using S3 as a "cache". However, reading and writing from S3 is not a scalable solution, because it results in higher IO, and IO doesn't scale up as easily as CPU and memory. The only time I would use S3 as a cache is when my cached data is in the terabyte+ range. If you are caching gigabytes of data, then you are better off caching in memory. This is 2018. Memory is cheap but limited.

From: Valery Khamenya
Date: Tuesday, May 1, 2018 at 9:17 AM
To: "user@spark.apache.org"
Subject: smarter way to "forget" DataFrame definition and stick to its values

hi all

a short example before the long story:

var accumulatedDataFrame = ... // initialize

for (i <- 1 to 100) {
  val myTinyNewData = ... // my slowly calculated new data portion in tiny amounts
  accumulatedDataFrame = accumulatedDataFrame.union(myTinyNewData)
  // how to stick here to the values of accumulatedDataFrame only and forget definitions?!
}

this kind of stuff is likely to get slower and slower on each iteration even if myTinyNewData is quite compact. Usually I write accumulatedDataFrame to S3 and then re-load it back to clear the definition history. It makes the code ugly though. Is there any smarter way?

It happens very often that a DataFrame is created via complex definitions. The DataFrame is then re-used in several places and sometimes it gets recalculated, triggering a heavy cascade of operations. Of course one could use the .persist or .cache modifiers, but the result is unfortunately not transparent and instead of speeding things up it results in slow-downs or even lost jobs if storage resources are not enough.

Any advice?

best regards
--
Valery
Dataset Caching and Unpersisting
Hi all,

I am having trouble with caching and unpersisting a dataset. I have a loop that filters my dataset at each iteration. I noticed that caching every x steps (e.g., 50 steps) gives good performance. However, after a certain number of caching operations, it seems that the memory used for caching fills up, so I think I have to unpersist the old cached dataset. This is my code:

I tried to use an external variable to cache and unpersist it, but it doesn't seem to solve the problem (maybe I used it in the wrong way). Do you kindly have any suggestions?

Thank you for your support!

---
Daniele
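In case a sketch of the usual ordering helps: keep a handle to the previously cached dataset, materialize the new cache with an action, and only then unpersist the old one. The code below uses a stub class standing in for a Dataset so it runs anywhere; in Spark you would use the real .cache(), .count(), and .unpersist() on actual Datasets:

```python
class StubDataset:
    """Stand-in for a Spark Dataset, only to show the call ordering."""

    def __init__(self, name):
        self.name = name
        self.cached = False

    def filter(self, step):
        # Each filter produces a new dataset with a longer lineage.
        return StubDataset("%s|f%d" % (self.name, step))

    def cache(self):
        self.cached = True
        return self

    def count(self):
        return 0  # in Spark: an action that actually materializes the cache

    def unpersist(self):
        self.cached = False
        return self

ds = StubDataset("input")
prev_cached = None
snapshots = []                      # bookkeeping for this sketch only

for step in range(1, 201):
    ds = ds.filter(step)
    if step % 50 == 0:
        ds.cache()
        ds.count()                  # materialize BEFORE dropping the old cache
        if prev_cached is not None:
            prev_cached.unpersist() # safe now: new data no longer needs it
        prev_cached = ds
        snapshots.append(ds)
```

The key point is the order: if you unpersist the old dataset before the new one is materialized, Spark may have to recompute the whole lineage from the start; materialize first, then only one cached copy stays in memory at a time.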
what is the query language used for graphX?
Hi All,

What is the query language used for GraphX? Are there any plans to introduce Gremlin, or is that idea being dropped in favor of Spark SQL?

Thanks!
spark.executor.extraJavaOptions inside application code
Hi all,
I wrote an application that needs an environment variable. I can set this variable with

--conf 'spark.executor.extraJavaOptions=-Dbasicauth=myuser:mypwd'

in spark-submit and it works well in standalone cluster mode.

But I want to set it inside the application code, because the variable contains a password. How can I do that?

I tried with:

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark Solr ETL")
    .getOrCreate();

spark.sparkContext().conf().setExecutorEnv("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword");

but it doesn't work.

Thanks.
[Spark scheduling] Spark schedules single task although rdd has 48 partitions?
(Please note this question was previously posted to https://stackoverflow.com/questions/49943655/spark-schedules-single-task-although-rdd-has-48-partitions)

We are running Spark 2.3 / Python 3.5.2. For a job we run the following code (please note that the input txt files are just a simplified example; in fact these are large binary files and sc.binaryFiles(...) runs out of memory loading the content into memory, therefore only the filenames are parallelized and the executors open/read the content):

files = [u'foo.txt', u'bar.txt', u'baz.txt', etc]  # len(files) == 155

def func(filename):
    from app import generate_rows
    return list(generate_rows(filename))

rdd = sc.parallelize(files, numSlices=48)
rdd2 = rdd.flatMap(func)
rdd3 = rdd2.map(lambda d: Row(**d))
df = spark.createDataFrame(rdd3)
df.write.mode(u'append').partitionBy(u'foo').parquet(output_path)

where app is a Python module (added to Spark using --py-files app.egg) whose simplified code is like this:

def generate_rows(filename):
    yield OrderedDict([
        (u'filename', filename),
        (u'item1', u'item1'),
        etc
    ])

We notice that the cluster is not fully utilized during the first stages, which we don't understand, and we are looking for ways to control this behavior:

Job0  Stage0  1 task    1 min  parallelize
Job1  Stage1  1 task    2 min  parallelize
Job2  Stage2  1 task    1 min  parallelize
Job3  Stage3  48 tasks  5 min  parallelize|mappartitions|map|mappartitions|existingRDD|sort

What are the first 3 jobs? And why isn't there one job/stage with the 48 tasks (as expected given the second parameter of parallelize set to 48)?

Excerpt from DEBUG logging:

18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
...
18/05/02 10:09:58 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:10:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:10:02 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
...
18/05/02 10:12:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:12:06 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
...
18/05/02 10:12:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
18/05/02 10:13:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
...
18/05/02 10:17:16 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:17 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:18 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
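Two observations that may help here. First, the 48-task Job 3 comes straight from numSlices; the three single-task jobs before it are consistent with spark.createDataFrame sampling the RDD to infer a schema (passing an explicit schema to createDataFrame should avoid them), though that is an educated guess from the timeline, not something visible in the logs above. Second, the way parallelize splits the 155 filenames into 48 slices can be sketched in plain Python (this mirrors the boundary formula in the open-source ParallelCollectionRDD, as an approximation):

```python
def slice_bounds(length, num_slices):
    """Approximate Spark's ParallelCollectionRDD slice boundaries:
    slice i covers [i*length // num_slices, (i+1)*length // num_slices)."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]

files = ["file_%03d.txt" % i for i in range(155)]
slices = [files[start:end] for start, end in slice_bounds(len(files), 48)]

print(len(slices))                                   # 48
print(min(map(len, slices)), max(map(len, slices)))  # 3 4 -- nearly even
```

So every one of the 48 tasks in Job 3 gets 3 or 4 filenames; the earlier single-task jobs are separate actions, not a failure of the partitioning.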
Re: [Spark Streaming]: Does DStream workload run over Spark SQL engine?
No, DStream is built on top of RDDs, so it will not leverage any Spark SQL-related features. I think you should use Structured Streaming instead, which is based on Spark SQL.

On Wed, May 2, 2018 at 4:51 PM, Khaled Zaouk wrote:
> Hi,
>
> I have a question regarding the execution engine of Spark Streaming
> (the DStream API): do Spark Streaming jobs run over the Spark SQL engine?
>
> For example, if I change a configuration parameter related to Spark SQL
> (like spark.sql.streaming.minBatchesToRetain or
> spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this
> make any difference when I run a Spark Streaming job (using the DStream API)?
>
> Thank you!
>
> Khaled
[Spark Streaming]: Does DStream workload run over Spark SQL engine?
Hi,

I have a question regarding the execution engine of Spark Streaming (the DStream API): do Spark Streaming jobs run over the Spark SQL engine?

For example, if I change a configuration parameter related to Spark SQL (like spark.sql.streaming.minBatchesToRetain or spark.sql.objectHashAggregate.sortBased.fallbackThreshold), does this make any difference when I run a Spark Streaming job (using the DStream API)?

Thank you!

Khaled