Re: How to control Spark Executors from getting Lost when using YARN client mode?
Hi all, any help will be much appreciated. My Spark job runs fine, but in the middle it starts losing executors because of a MetadataFetchFailedException saying the shuffle was not found at the location, since the executor is lost.

On Jul 31, 2015 11:41 PM, Umesh Kacha umesh.ka...@gmail.com wrote:

Hi, thanks for the response. It looks like the YARN container is getting killed, but I don't know why. I see the shuffle MetadataFetchFailedException mentioned in the following SO link. I have enough memory: 8 nodes, with 8 cores and 30 GB of memory each. Because of this MetadataFetchFailedException YARN is killing the container running the executor; how can it overrun memory? I tried giving each executor 25 GB and it is still not sufficient, and the job fails. Please guide me, I don't understand what is going on. I am using Spark 1.4.0, with spark.shuffle.memory set to 0.0 and spark.storage.memory set to 0.5. I have almost all the optimal properties, like the Kryo serializer; I have kept the akka frame size at 500 and akka threads at 20. I am stuck; I have been trying to recover from this issue for two days.

http://stackoverflow.com/questions/29850784/what-are-the-likely-causes-of-org-apache-spark-shuffle-metadatafetchfailedexcept

On Thu, Jul 30, 2015 at 9:56 PM, Ashwin Giridharan ashwin.fo...@gmail.com wrote:

What is your cluster configuration (size and resources)? If you do not have enough resources, then your executor will not run. Moreover, allocating 8 cores to an executor is too much. If you have a cluster with four nodes running NodeManagers, each equipped with 4 cores and 8GB of memory, then an optimal configuration would be:

    --num-executors 8 --executor-cores 2 --executor-memory 2G

Thanks, Ashwin

On Thu, Jul 30, 2015 at 12:08 PM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have one Spark job which runs fine locally with less data, but when I schedule it on YARN I keep getting the following ERROR, and slowly all executors get removed from the UI and my job fails:

    15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 8 on myhost1.com: remote Rpc client disassociated
    15/07/30 10:18:13 ERROR cluster.YarnScheduler: Lost executor 6 on myhost2.com: remote Rpc client disassociated

I use the following command to schedule the Spark job in yarn-client mode:

    ./spark-submit --class com.xyz.MySpark --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M --driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master yarn-client --executor-memory 2G --executor-cores 8 --num-executors 12 /home/myuser/myspark-1.0.jar

I don't know what the problem is, please guide. I am new to Spark. Thanks in advance.

-- Thanks and regards, Ashwin Giridharan
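As a rough illustration of the sizing arithmetic in Ashwin's reply, the sketch below works out how many executors of a given shape fit on one node. The function name, the 10% overhead fraction and the 384 MB floor are assumptions made for illustration, not values taken from this thread; it also assumes every core and all memory of a node are available to YARN, which is usually not quite true.

```python
def executors_per_node(node_cores, node_mem_mb, exec_cores, exec_mem_mb,
                       overhead_fraction=0.10, overhead_min_mb=384):
    """Return (executors that fit on one node, container size in MB).

    overhead_fraction and overhead_min_mb are assumed defaults; check the
    YARN memory-overhead setting of the Spark version actually in use.
    """
    # YARN sizes the container as executor heap plus an off-heap overhead.
    overhead_mb = max(overhead_min_mb, int(exec_mem_mb * overhead_fraction))
    container_mb = exec_mem_mb + overhead_mb
    # An executor needs both its cores and its full container memory.
    by_cores = node_cores // exec_cores
    by_mem = node_mem_mb // container_mb
    return min(by_cores, by_mem), container_mb


if __name__ == "__main__":
    # The example from the reply: 4 cores / 8 GB nodes, 2-core / 2 GB executors.
    print(executors_per_node(4, 8192, 2, 2048))
    # 8-core executors on 8-core / 30 GB nodes.
    print(executors_per_node(8, 30720, 8, 2048))
```

Under these assumptions the reply's example gives 2 executors per node, so 8 across four nodes, which matches the suggested --num-executors 8. By the same arithmetic, 8-core executors on 8-core nodes leave room for only one executor per node.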
How to calculate standard deviation of grouped data in a DataFrame?
I have user logs that I have taken from a csv and converted into a DataFrame in order to leverage the SparkSQL querying features. A single user will create numerous entries per hour, and I would like to gather some basic statistical information for each user; really just the count of the user instances, the average, and the standard deviation of numerous columns.

I was able to quickly get the mean and count information by using groupBy($"user") and the aggregator with SparkSQL functions for count and avg:

    val meanData = selectedData.groupBy($"user").agg(count($"logOn"), avg($"transaction"), avg($"submit"), avg($"submitsPerHour"), avg($"replies"), avg($"repliesPerHour"), avg($"duration"))

However, I cannot seem to find an equally elegant way to calculate the standard deviation. So far I can only calculate it by mapping a (String, Double) pair and using the StatCounter().stdev utility:

    val stdevduration = duration.groupByKey().mapValues(value => org.apache.spark.util.StatCounter(value).stdev)

This returns an RDD, however, and I would like to keep it all in a DataFrame so that further queries are possible on the returned data. Is there a similarly simple method for calculating the standard deviation, like there is for the mean and count?
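One way to stay with plain aggregates is the identity Var(x) = E[x^2] - (E[x])^2, since both averages are ordinary per-group aggregations. The sketch below only checks that arithmetic in plain Python on made-up data; the function name and the sample rows are hypothetical. In a DataFrame the same idea would presumably be two avg columns per field followed by a square root, which has not been verified here against Spark 1.4.

```python
import math
from collections import defaultdict


def grouped_stats(rows):
    """rows: iterable of (key, value) pairs.

    Returns {key: (count, mean, population standard deviation)} using only
    a running count, sum and sum of squares per key.
    """
    acc = defaultdict(lambda: [0, 0.0, 0.0])  # count, sum, sum of squares
    for key, value in rows:
        entry = acc[key]
        entry[0] += 1
        entry[1] += value
        entry[2] += value * value
    result = {}
    for key, (n, total, total_sq) in acc.items():
        mean = total / n
        # Clamp at zero: rounding can push the difference slightly negative.
        variance = max(total_sq / n - mean * mean, 0.0)
        result[key] = (n, mean, math.sqrt(variance))
    return result


if __name__ == "__main__":
    sample = [("alice", v) for v in (2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)]
    sample.append(("bob", 3.0))
    print(grouped_stats(sample))
```

This yields the population standard deviation, the same quantity StatCounter's stdev reports. The identity can lose precision when the mean is large compared with the spread, so it is a convenience rather than a numerically robust method.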
large scheduler delay in pyspark
Hi,

Recently I ran into a problem with scheduler delay in pyspark. I worked on it for several days without success, so I am asking here for help.

I have a key-value pair RDD like rdd[(key, list[dict])], and I tried to merge the values by adding two lists. If I do reduceByKey as follows:

    rdd.reduceByKey(lambda a, b: a+b)

it works fine; the scheduler delay is less than 10s. However, if I do reduceByKey like this:

    def f(a, b):
        for i in b:
            if i not in a:
                a.append(i)
        return a

    rdd.reduceByKey(f)

it causes a very large scheduler delay, about 15-20 minutes. (The data I deal with is about 300 MB, and I use 5 machines with 32GB of memory.)

I know the second code is not the same as the first. In fact, my purpose is to implement the second, but it does not work, so I tried the first one. I don't know whether this is related to the data (with long strings) or to Spark on YARN, but the first code works fine on the same data.

Is there any way to find the log when Spark stalls in scheduler delay? Or any ideas about this problem?

Thanks a lot in advance for your help.

Cheers
Gen
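A possible factor in the slow version is that `i not in a` scans the whole list and compares dicts one by one, so each merge costs roughly len(a) * len(b) comparisons. Whether that explains the scheduler delay is not established in this thread. The sketch below is one hedged alternative that keeps first-seen order but tracks already-seen items in a set. Since dicts are not hashable, it assumes the dict contents are JSON-serializable and uses a hypothetical helper, `_key`, to turn each dict into a canonical string.

```python
import json


def _key(d):
    # Hypothetical helper: canonical string form of a dict, so it can go in a set.
    # Assumes the dict is JSON-serializable.
    return json.dumps(d, sort_keys=True)


def merge_unique(a, b):
    """Return a plus the items of b not already present, without mutating a."""
    seen = {_key(d) for d in a}
    merged = list(a)
    for d in b:
        k = _key(d)
        if k not in seen:
            seen.add(k)
            merged.append(d)
    return merged


if __name__ == "__main__":
    left = [{"x": 1}, {"y": 2}]
    right = [{"y": 2}, {"z": 3}, {"z": 3}]
    print(merge_unique(left, right))
```

It would be passed to the same call as before, `rdd.reduceByKey(merge_unique)`. Two caveats: the set is rebuilt on every merge, so the cost is linear per call rather than constant, and JSON equality is not identical to Python dict equality (for example, 1 and 1.0 compare equal in Python but serialize differently).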
Re: Cannot Import Package (spark-csv)
Hi, there was this issue for Scala 2.11:

https://issues.apache.org/jira/browse/SPARK-7944

It should be fixed on the master branch. You may be hitting that.

Best, Burak

On Sun, Aug 2, 2015 at 9:06 PM, Ted Yu yuzhih...@gmail.com wrote:

I tried the following command on master branch:

    bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 --jars ../spark-csv_2.10-1.0.3.jar --master local

I didn't reproduce the error with your command. FYI

On Sun, Aug 2, 2015 at 8:57 PM, Bill Chambers wchamb...@ischool.berkeley.edu wrote:

Sure, the commands are:

    scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("cars.csv")

and I get the following error:

    java.lang.RuntimeException: Failed to load class for data source: com.databricks.spark.csv
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.sources.ResolvedDataSource$.lookupDataSource(ddl.scala:220)
        at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:233)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:104)
        ... 49 elided

On Sun, Aug 2, 2015 at 8:56 PM, Ted Yu yuzhih...@gmail.com wrote:

The command you ran and the error you got were not visible. Mind sending them again? Cheers

On Sun, Aug 2, 2015 at 8:33 PM, billchambers wchamb...@ischool.berkeley.edu wrote:

I am trying to import the spark-csv package while using the Scala Spark shell. Spark 1.4.1, Scala 2.11. I am starting the shell with:

    bin/spark-shell --packages com.databricks:spark-csv_2.11:1.1.0 --jars ../sjars/spark-csv_2.11-1.1.0.jar --master local

I then try and run and get the following error:

What am I doing wrong?

-- Bill Chambers http://billchambers.me/ Email wchamb...@ischool.berkeley.edu | LinkedIn http://linkedin.com/in/wachambers | Twitter https://twitter.com/b_a_chambers | Github https://github.com/anabranch
Re: Cannot Import Package (spark-csv)
In addition, you do not need to use --jars with --packages. --packages will get the jar for you.

Best, Burak

On Mon, Aug 3, 2015 at 9:01 AM, Burak Yavuz brk...@gmail.com wrote:

Hi, there was this issue for Scala 2.11. https://issues.apache.org/jira/browse/SPARK-7944 It should be fixed on master branch. You may be hitting that. Best, Burak
Re: HiveQL to SparkSQL
Did anybody try to convert HiveQL queries to SparkSQL? If so, would you share the experience, pros and cons, please? Thank you.

On Thu, Jul 30, 2015 at 10:37 AM, Bigdata techguy bigdatatech...@gmail.com wrote:

Thanks Jorn for the response and for the pointer questions to Hive optimization tips. I believe I have done the possible applicable things to improve Hive query performance, including but not limited to: running on TEZ, using partitioning, bucketing, using explain to make sure partition pruning is happening, using compression, using the best data types for join columns, denormalizing, etc. I am using Hive version 0.13.

The idea behind this POC is to find the strengths of SparkSQL over HiveQL and identify the use cases where SparkSQL can perform better than HiveQL, other than the iterative use cases. In general, what would be the SparkSQL use scenarios? I am pretty sure someone has tried this before and compared performance... Any responses would be much appreciated. Thank you.

On Wed, Jul 29, 2015 at 1:57 PM, Jörn Franke jornfra...@gmail.com wrote:

What Hive version are you using? Do you run it on TEZ? Are you using the ORC format? Do you use compression? Snappy? Do you use Bloom filters? Do you insert the data sorted on the right columns? Do you use partitioning? Did you increase the replication factor for often used tables or partitions? Do you use bucketing? Is your data model appropriate (join columns as int, use numeric data types where appropriate, dates as int...)? Did you calculate statistics? Did you use indexes (compressed, ORC format?) Do you provide mapjoin hints? Did you do any other Hive optimization? Did you use explain to verify that only selected partitions, indexes, Bloom filters had been used? Did you verify that no other application has taken resources? What is the CPU level on namenode, hiveserver2? If it is high then you need more memory there!

First rule is to get Hive right before you think about in-memory. Caching will only help for iterative stuff. You may think about denormalizing the model even more to avoid joins as much as possible.

Bigdata techguy bigdatatech...@gmail.com wrote on Wed, 29.07.2015, 18:49:

Hi All,

I have a fairly complex HiveQL data processing job which I am trying to convert to SparkSQL to improve performance. Below is what it does:

Select around 100 columns, including aggregates, from a FACT_TABLE, joined to the summary of the same FACT_TABLE, joined to 2 smaller DIMENSION tables.

The data processing currently takes around an hour to complete. This is what I have tried so far:

1. Use hiveContext to query the DIMENSION tables, store them as DataFrames and registerTempTable.
2. Use hiveContext to query the summary of FACT_TABLE, store it as a DataFrame and registerTempTable.
3. Use the temp tables from the above 2 steps to get the final RecordSet into another DataFrame.
4. Save the DataFrame from step 3 to Hive with InsertOverwrite using saveAsTable.

Below are my questions. Any response would be much appreciated. Thanks.

A. Is there a better approach?
B. Is breaking down the big Hive query into multiple steps with multiple DataFrames expected to give better performance?
C. Is there an opportunity to intermix RDDs with SparkSQL in this case?
D. Can caching a DataFrame improve performance?
E. Are there other suggestions to improve performance?

Thank you for your time.
Re: NullPointException Help while using accumulators
Can you show the related code in DriverAccumulator.java? Which Spark release do you use? Cheers

On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal anubha...@gmail.com wrote:

Hi, I am trying to modify my code to use HDFS and multiple nodes. The code works fine when I run it locally on a single machine with a single worker. I have been trying to modify it and I get the following error. Any hint would be helpful.

    java.lang.NullPointerException
        at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
        at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
        at org.apache.spark.Accumulable.add(Accumulators.scala:73)
        at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
        at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
        at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
        at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    failed in write bolt execute null
    failed in write bolt execute null
    java.lang.NullPointerException
Re: Contributors group and starter task
Once you submit a pull request for some JIRA, the JIRA would be assigned to you. Cheers On Mon, Aug 3, 2015 at 3:50 PM, Namit Katariya katariya.na...@gmail.com wrote: My username on the Apache JIRA is katariya.namit. Could one of the admins please add me to the contributors group so that I can have a starter task assigned to myself? Thanks, Namit
Re: SparkR broadcast variables
I think I just answered my own question. The privatization of the RDD API might have resulted in my error, because this worked:

    randomMatBr <- SparkR:::broadcast(sc, randomMat)

On Mon, Aug 3, 2015 at 4:59 PM, Deborah Siegel deborah.sie...@gmail.com wrote:

Hello,

In looking at the SparkR codebase, it seems as if broadcast variables ought to be working based on the tests. I have tried the following in the sparkR shell, and similar code in RStudio, but in both cases got the same message:

    randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
    randomMatBr <- broadcast(sc, randomMat)
    Error: could not find function broadcast

Does someone know how to use broadcast variables in SparkR?

Thanks, Deb
Safe to write to parquet at the same time?
I think this question applies regardless if I have two completely separate Spark jobs or tasks on different machines, or two cores that are part of the same task on the same machine. If two jobs/tasks/cores/stages both save to the same parquet directory in parallel like this: df1.write.mode(SaveMode.Append).partitionBy(a, b).parquet(dir) df2.write.mode(SaveMode.Append).partitionBy(a, b).parquet(dir) Will the result be equivalent to this? df1.unionAll(df2).write.mode(SaveMode.Append).partitionBy(a, b).parquet(dir) What if we ensure that 'dir' does not exist first? - Philip
shutdown local hivecontext?
We are using a local hive context in order to run unit tests. Our unit tests run perfectly fine if we run them one by one using sbt, as in the next example:

    sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
    sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala

However, if we try to run them as:

    sbt test-only com.company.pipeline.*

we start to run into issues. It appears that the hive context is not properly shut down after finishing the first test. Does anyone know how to attack this problem? The test part of my build.sbt file looks like:

    libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
    parallelExecution in Test := false,
    fork := true,
    javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

We are working under Spark 1.3.0.

Thanks
-- Cesar Flores
Re: Contributors group and starter task
Hi Namit, There's no need to assign a bug to yourself to say you're working on it. The recommended way is to just post a PR on github - the bot will update the bug saying that you have a patch open to fix the issue. On Mon, Aug 3, 2015 at 3:50 PM, Namit Katariya katariya.na...@gmail.com wrote: My username on the Apache JIRA is katariya.namit. Could one of the admins please add me to the contributors group so that I can have a starter task assigned to myself? Thanks, Namit -- Marcelo
SparkR broadcast variables
Hello,

In looking at the SparkR codebase, it seems as if broadcast variables ought to be working based on the tests. I have tried the following in the sparkR shell, and similar code in RStudio, but in both cases got the same message:

    randomMat <- matrix(nrow=10, ncol=10, data=rnorm(100))
    randomMatBr <- broadcast(sc, randomMat)
    Error: could not find function broadcast

Does someone know how to use broadcast variables in SparkR?

Thanks, Deb
NullPointException Help while using accumulators
Hi, I am trying to modify my code to use HDFS and multiple nodes. The code works fine when I run it locally on a single machine with a single worker. I have been trying to modify it and I get the following error. Any hint would be helpful.

    java.lang.NullPointerException
        at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
        at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
        at org.apache.spark.Accumulable.add(Accumulators.scala:73)
        at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
        at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
        at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
        at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
        at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    failed in write bolt execute null
    failed in write bolt execute null
    java.lang.NullPointerException
How does DataFrame except work?
Hello, I'm planning to use DF1.except(DF2) to get the difference between two DataFrames. I'd like to know how exactly this API works. Both explain() and the Spark UI show except as an operation of its own. Internally, does it do a hash partition of both DataFrames? If so, will it do an auto broadcast if the second DataFrame is small enough?

Srikanth
Re: shutdown local hivecontext?
TestHive takes care of creating a temporary directory for each invocation so that multiple test runs won't conflict.

On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores ces...@gmail.com wrote:

We are using a local hive context in order to run unit tests. Our unit tests run perfectly fine if we run them one by one using sbt, as in the next example:

    sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
    sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala

However, if we try to run them as:

    sbt test-only com.company.pipeline.*

we start to run into issues. It appears that the hive context is not properly shut down after finishing the first test. Does anyone know how to attack this problem? The test part of my build.sbt file looks like:

    libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
    parallelExecution in Test := false,
    fork := true,
    javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

We are working under Spark 1.3.0.

Thanks
-- Cesar Flores
Multiple UpdateStateByKey Functions in the same job?
Hi,

Can I use multiple updateStateByKey functions in the same Streaming job? Suppose I need to maintain the state of the user session in the form of JSON, plus counts of various other metrics which have different keys. Can I use multiple updateStateByKey functions to maintain the state for different keys with different return values?

Thanks, Swetha
Topology.py -- Cannot run on Spark Gateway on Cloudera 5.4.4.
Hi,

I recently installed Cloudera CDH 5.4.4. Spark comes shipped with this version. I created Spark gateways, but I get the following error when I run the Spark shell from the gateway. Does anyone have any similar experience? If so, please share the solution. Google suggests copying the conf files from the data nodes to the gateway nodes, but I highly doubt that is the right fix.

Thanks
Upender

    etc/hadoop/conf.cloudera.yarn/topology.py
    java.io.IOException: Cannot run program /etc/hadoop/conf.cloudera.yarn/topology.py
Re: Writing to HDFS
Is your data skewed? What happens if you do rdd.count()?

On 4 Aug 2015 05:49, Jasleen Kaur jasleenkaur1...@gmail.com wrote:

I am executing a Spark job on a cluster in yarn-client mode (yarn-cluster is not an option due to permission issues).

- num-executors 800
- spark.akka.frameSize=1024
- spark.default.parallelism=25600
- driver-memory=4G
- executor-memory=32G
- My input size is around 1.5TB.

My problem is that when I execute rdd.saveAsTextFile(outputPath, classOf[org.apache.hadoop.io.compress.SnappyCodec]), I get a heap space issue. (Saving as avro is also not an option; I have tried saveAsSequenceFile with GZIP and saveAsNewAPIHadoopFile with the same result.) On the other hand, if I execute rdd.take(1), I get no such issue. So I am assuming that the issue is due to the write.
Unable to compete with performance of single-threaded Scala application
Hello,

I am running Spark 1.4.0 on Mesos 0.22.1, and usually I run my jobs in coarse-grained mode. I have written some single-threaded standalone Scala applications for a problem that I am working on, and I am unable to get a Spark solution that comes close to the performance of this application. My hope was to sacrifice some performance to get an easily scalable solution, but I'm finding that the single-threaded implementations consistently outperform Spark even with a couple dozen cores, and I'm having trouble getting Spark to scale linearly.

All files are binary files with fixed-width records, ranging from about 40 bytes to 200 bytes per record depending on the type. The files are already partitioned by 3 keys, with one file for each combination. Basically the layout is /customer/day/partition_number. The ultimate goal is to read time series events, join in some smaller tables when processing those events, and write the result to parquet. For this discussion, I'm focusing on just a simple problem: reading and aggregating the events.

I started with a simple experiment to walk over all the events and sum the value of an integer field. I implemented two standalone solutions and a Spark solution:

1) For each file, use a BufferedInputStream to iterate over each fixed-width row, copy the row to an Array[Byte], and then parse the one field out of that array. This can process events at about 30 million/second.

2) Memory-map each file to a java.nio.MappedByteBuffer. Calculate the sum by directly selecting the integer field while iterating over the rows. This solution can process about 100-300 million events/second.

3) Use SparkContext.binaryRecords, map over the RDD[Array[Byte]] to parse or select the field, and then call sum on that.

Although performance is understandably much better when I use a memory-mapped byte buffer, I would expect my Spark solution to get the same per-core throughput as solution #1 above, where the record type is Array[Byte] and I'm using the same approach to pull out the integer field from that byte array. However, the Spark solution achieves only 1-2 million events/second on 1 core, 4 million events/second on 2 nodes with 4 cores each, and 8 million events/second on 6 nodes with 4 cores each. So not only is the performance a fraction of my standalone application, but it can't even scale linearly to 6 nodes.

- Philip
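For reference, the per-record work in all three variants is small: pick a 4-byte integer out of a fixed-width record and add it to a running total. The sketch below shows that operation in plain Python with the standard `struct` module. The 40-byte record width, the field offset of 8 and the big-endian layout are made-up values for illustration, not the poster's actual format, and `sum_int_field` is a hypothetical name.

```python
import struct

RECORD_WIDTH = 40                  # assumed record size in bytes
FIELD_OFFSET = 8                   # assumed offset of the integer field
INT_FIELD = struct.Struct(">i")    # assumed big-endian signed 32-bit integer


def sum_int_field(buf):
    """Sum one integer field over a buffer of fixed-width records."""
    if len(buf) % RECORD_WIDTH != 0:
        raise ValueError("buffer length is not a multiple of the record width")
    total = 0
    for start in range(0, len(buf), RECORD_WIDTH):
        # Read the field in place rather than copying out the whole record.
        (value,) = INT_FIELD.unpack_from(buf, start + FIELD_OFFSET)
        total += value
    return total


def make_record(value):
    # Build one zero-filled record carrying value in the integer field.
    record = bytearray(RECORD_WIDTH)
    INT_FIELD.pack_into(record, FIELD_OFFSET, value)
    return bytes(record)


if __name__ == "__main__":
    data = b"".join(make_record(v) for v in (1, 2, 3, -10))
    print(sum_int_field(data))
```

Reading the field directly from the buffer, as here, is closer in spirit to variant 2 than to variant 1, which first copies each row into its own array.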
Re: NullPointException Help while using accumulators
The code was written in 1.4 but I am compiling it and running it with 1.3.

    import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
    import org.apache.spark.AccumulableParam;
    import scala.Tuple4;
    import thomsonreuters.trailblazer.operation.DriverCalc;
    import thomsonreuters.trailblazer.operation.StepAccumulator;

    //Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
    class DriverAccumulator implements AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>, Tuple4<String, String, String, Double>> {
        private static final Object _lockObj = new Object();

        public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap, Tuple4<String, String, String, Double> value) {
            if (value == null) return stepAccumulatorMap;
            synchronized (_lockObj) {
                StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
                if (stepAcc == null) {
                    stepAcc = new StepAccumulator();
                    stepAccumulatorMap.put(value._1(), stepAcc);
                }
                DriverCalc dc = stepAcc.stepRows.get(value._2());
                if (dc == null) {
                    dc = new DriverCalc();
                    dc._denominator = value._4();
                    if (value._3() != null) dc._numerator.put(value._3(), value._4());
                    stepAcc.stepRows.put(value._2(), dc);
                } else {
                    dc._denominator = dc._denominator + value._4();
                    if (value._3() != null) {
                        Double val = dc._numerator.get(value._3());
                        dc._numerator.put(value._3(), new Double(val != null ? val + value._4() : value._4()));
                    }
                }
            }
            return stepAccumulatorMap;
        }

        public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(Object2ObjectOpenHashMap<String, StepAccumulator> r1, Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
            r2.forEach((k,v) -> r1.merge(k, v, this::mergeAcc));
            return r1;
        }

        private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
            source2.stepRows.forEach((k,v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
            return source1;
        }

        private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
            driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
            driverCalc2._numerator.forEach((k,v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
            return driverCalc1;
        }

        private Double numeratorMerge(Double d1, Double d2) {
            return d1 + d2;
        }

        public Object2ObjectOpenHashMap<String, StepAccumulator> zero(Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
            return null;
        }
    }

On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu yuzhih...@gmail.com wrote:

Can you show related code in DriverAccumulator.java ? Which Spark release do you use ? Cheers
Re: Topology.py -- Cannot run on Spark Gateway on Cloudera 5.4.4.
That should not be a fatal error; it's just a noisy exception. Anyway, it should go away if you add YARN gateways to those nodes (in addition to the Spark gateways).

On Mon, Aug 3, 2015 at 7:10 PM, Upen N ukn...@gmail.com wrote:
Hi, I recently installed Cloudera CDH 5.4.4. Spark comes shipped with this version. I created Spark gateways, but I get the following error when I run the Spark shell from the gateway. Has anyone had a similar experience? If so, please share the solution. Googling suggests copying the conf files from the data nodes to the gateway nodes, but I highly doubt that is the right fix.
Thanks
Upender

etc/hadoop/conf.cloudera.yarn/topology.py
java.io.IOException: Cannot run program /etc/hadoop/conf.cloudera.yarn/topology.py

-- Marcelo
Re: Topology.py -- Cannot run on Spark Gateway on Cloudera 5.4.4.
Hi Upen,
Did you deploy the client configs after assigning the gateway roles? You should be able to do this from Cloudera Manager. Can you try this and let us know what you see when you run spark-shell?
Guru Medasani gdm...@gmail.com

On Aug 3, 2015, at 9:10 PM, Upen N ukn...@gmail.com wrote:
Hi, I recently installed Cloudera CDH 5.4.4. Spark comes shipped with this version. I created Spark gateways, but I get the following error when I run the Spark shell from the gateway. Has anyone had a similar experience? If so, please share the solution. Googling suggests copying the conf files from the data nodes to the gateway nodes, but I highly doubt that is the right fix.
Thanks
Upender

etc/hadoop/conf.cloudera.yarn/topology.py
java.io.IOException: Cannot run program /etc/hadoop/conf.cloudera.yarn/topology.py
Re: NullPointException Help while using accumulators
Putting your code in a file, I find the following on line 17:

    stepAcc = new StepAccumulator();

However, I don't think that was where the NPE was thrown. Another thing I don't understand is that there were two addAccumulator() calls at the top of the stack trace, while in your code I don't see addAccumulator() calling itself. FYI

On Mon, Aug 3, 2015 at 3:22 PM, Anubhav Agarwal anubha...@gmail.com wrote:
The code was written in 1.4 but I am compiling it and running it with 1.3.

import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import org.apache.spark.AccumulableParam;
import scala.Tuple4;
import thomsonreuters.trailblazer.operation.DriverCalc;
import thomsonreuters.trailblazer.operation.StepAccumulator;

//Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
class DriverAccumulator implements AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>, Tuple4<String, String, String, Double>> {
    private static final Object _lockObj = new Object();

    public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap, Tuple4<String, String, String, Double> value) {
        if (value == null) return stepAccumulatorMap;
        synchronized (_lockObj) {
            StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
            if (stepAcc == null) {
                stepAcc = new StepAccumulator();
                stepAccumulatorMap.put(value._1(), stepAcc);
            }
            DriverCalc dc = stepAcc.stepRows.get(value._2());
            if (dc == null) {
                dc = new DriverCalc();
                dc._denominator = value._4();
                if (value._3() != null) dc._numerator.put(value._3(), value._4());
                stepAcc.stepRows.put(value._2(), dc);
            } else {
                dc._denominator = dc._denominator + value._4();
                if (value._3() != null) {
                    Double val = dc._numerator.get(value._3());
                    dc._numerator.put(value._3(), new Double(val != null ? val + value._4() : value._4()));
                }
            }
        }
        return stepAccumulatorMap;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(Object2ObjectOpenHashMap<String, StepAccumulator> r1, Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
        r2.forEach((k,v) -> r1.merge(k, v, this::mergeAcc));
        return r1;
    }

    private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
        source2.stepRows.forEach((k,v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
        return source1;
    }

    private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
        driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
        driverCalc2._numerator.forEach((k,v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
        return driverCalc1;
    }

    private Double numeratorMerge(Double d1, Double d2) {
        return d1 + d2;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        return null;
    }
}

On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu yuzhih...@gmail.com wrote:
Can you show related code in DriverAccumulator.java ? Which Spark release do you use ? Cheers

On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal anubha...@gmail.com wrote:
Hi, I am trying to modify my code to use HDFS and multiple nodes. The code works fine when I run it locally on a single machine with a single worker. I have been trying to modify it and I get the following error. Any hint would be helpful.
java.lang.NullPointerException
    at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
    at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
    at org.apache.spark.Accumulable.add(Accumulators.scala:73)
    at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
    at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
    at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
    at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
    at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
    at
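
One possible explanation for the trace above, offered as a hypothesis rather than a confirmed diagnosis: the zero() method in the posted class returns null. In Spark 1.x each task's copy of an Accumulable starts from the value returned by zero(), so the first add() on an executor would hand a null map to addAccumulator(), and the stepAccumulatorMap.get(...) call would then throw. The second addAccumulator frame may simply be the compiler-generated bridge method for the generic interface. The sketch below is plain Java with no Spark dependency; MapAccumulatorSketch and its method names are made up for illustration and only mirror the roles of zero, addAccumulator and addInPlace.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an AccumulableParam over a map; not Spark API.
final class MapAccumulatorSketch {

    // Role of zero(): the starting value for each task. Empty, never null.
    static Map<String, Double> zero() {
        return new HashMap<>();
    }

    // Role of addAccumulator(): fold one (key, value) pair into the running map.
    static Map<String, Double> add(Map<String, Double> acc, String key, double value) {
        acc.merge(key, value, Double::sum);
        return acc;
    }

    // Role of addInPlace(): merge the partial map of one task into another.
    static Map<String, Double> merge(Map<String, Double> left, Map<String, Double> right) {
        right.forEach((k, v) -> left.merge(k, v, Double::sum));
        return left;
    }

    public static void main(String[] args) {
        // Two "tasks", each starting from zero(), then merged as the driver would.
        Map<String, Double> task1 = add(add(zero(), "step1", 2.0), "step1", 3.0);
        Map<String, Double> task2 = add(zero(), "step2", 1.5);
        System.out.println(merge(task1, task2));
    }
}
```

If this is indeed the cause, the corresponding change in the posted class would be for zero() to return an empty Object2ObjectOpenHashMap instead of null.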
Contributors group and starter task
My username on the Apache JIRA is katariya.namit. Could one of the admins please add me to the contributors group so that I can have a starter task assigned to myself? Thanks, Namit
Re: Spark-Submit error
Hi Guru,
I am executing this on a DataStax Enterprise Spark node. The ~/.dserc file exists and contains the Cassandra credentials, but I am still getting the error. Below is the command:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Please find the error log details below:

INFO 2015-07-31 05:15:17 org.apache.spark.executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
INFO 2015-07-31 05:15:18 org.apache.spark.SecurityManager: Changing view acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:18 org.apache.spark.SecurityManager: Changing modify acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:18 org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cassandra, missingmerch); users with modify permissions: Set(cassandra, missingmerch)
INFO 2015-07-31 05:15:22 akka.event.slf4j.Slf4jLogger: Slf4jLogger started
INFO 2015-07-31 05:15:22 Remoting: Starting remoting
INFO 2015-07-31 05:15:22 Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.246.43.14:48952]
INFO 2015-07-31 05:15:22 org.apache.spark.util.Utils: Successfully started service 'driverPropsFetcher' on port 48952.
INFO 2015-07-31 05:15:24 akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
INFO 2015-07-31 05:15:24 akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing view acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing modify acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cassandra, missingmerch); users with modify permissions: Set(cassandra, missingmerch)
INFO 2015-07-31 05:15:24 akka.event.slf4j.Slf4jLogger: Slf4jLogger started
INFO 2015-07-31 05:15:24 akka.remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
INFO 2015-07-31 05:15:24 Remoting: Starting remoting
INFO 2015-07-31 05:15:24 Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@10.246.43.14:56358]
INFO 2015-07-31 05:15:24 org.apache.spark.util.Utils: Successfully started service 'sparkExecutor' on port 56358.
INFO 2015-07-31 05:15:24 org.apache.spark.executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkdri...@tstl400029.wal-mart.com:60525/user/CoarseGrainedScheduler
INFO 2015-07-31 05:15:24 org.apache.spark.deploy.worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.246.43.14:51552/user/Worker
INFO 2015-07-31 05:15:24 org.apache.spark.deploy.worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.246.43.14:51552/user/Worker
INFO 2015-07-31 05:15:24 org.apache.spark.executor.CoarseGrainedExecutorBackend: Successfully registered with driver
INFO 2015-07-31 05:15:24 org.apache.spark.executor.Executor: Starting executor ID 0 on host 10.246.43.14
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing view acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing modify acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cassandra, missingmerch); users with modify permissions: Set(cassandra, missingmerch)
INFO 2015-07-31 05:15:24 org.apache.spark.util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkdri...@tstl400029.wal-mart.com:60525/user/MapOutputTracker
INFO 2015-07-31 05:15:25 org.apache.spark.util.AkkaUtils: Connecting to BlockManagerMaster: akka.tcp://sparkdri...@tstl400029.wal-mart.com:60525/user/BlockManagerMaster
INFO 2015-07-31 05:15:25 org.apache.spark.storage.DiskBlockManager: Created local directory at /var/lib/spark/rdd/spark-2ab3a70e-2bdd-4ab5-9aff-fa48224f14d4/spark-c1430eaf-272d-41d5-acd1-ac88dbca4698/spark-b102f621-e107-4684-96cd-910ed4f2c0cf/spark-7e5ab6f6-3433-4e69-adcc-318083c37c4a
INFO 2015-07-31 05:15:25 org.apache.spark.storage.MemoryStore: MemoryStore started with capacity 265.4 MB
INFO 2015-07-31 05:15:31 org.apache.spark.network.netty.NettyBlockTransferService: Server created on 60707
INFO 2015-07-31 05:15:31 org.apache.spark.storage.BlockManagerMaster: Trying to register BlockManager
INFO 2015-07-31 05:15:31 org.apache.spark.storage.BlockManagerMaster:
Re: Spark-Submit error
Hi Satish,
Can you add more of the error or log output to the email?
Guru Medasani gdm...@gmail.com

On Jul 31, 2015, at 1:06 AM, satish chandra j jsatishchan...@gmail.com wrote:
Hi, I have submitted a Spark job with the options jars, class, and master as local, but I am getting the error below:

dse spark-submit spark error exception in thread main java.io.ioexception: Invalid Request Exception(Why you have not logged in)

Note: I am submitting on a DataStax Spark node. Please let me know if anybody has a solution for this issue.
Regards, Saish Chandra
Re: Data from PostgreSQL to Spark
Here is the solution; this looks perfect for me. Thanks for all your help.
http://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/

On 28 July 2015 at 23:27, Jörn Franke jornfra...@gmail.com wrote:
Can you put some transparent cache in front of the database? Or some JDBC proxy?

On Tue, 28 Jul 2015 at 19:34, Jeetendra Gangele gangele...@gmail.com wrote:
"Can the source write to Kafka/Flume/HBase in addition to Postgres?"
No, it can't. Many applications produce this PostgreSQL data, and I can't really ask all the teams to start writing to some other destination. The velocity of the application is too high.

On 28 July 2015 at 21:50, santosh...@gmail.com wrote:
Sqoop's incremental data fetch will reduce the amount of data you need to pull from the source, but by the time that incremental fetch completes, will it not be stale again if the velocity of the data is high? Maybe you can put a trigger in Postgres to send data to the big data cluster as soon as changes are made. Or, as I was saying in another email, can the source write to Kafka/Flume/HBase in addition to Postgres?
Sent from Windows Mail

*From:* Jeetendra Gangele gangele...@gmail.com
*Sent:* Tuesday, July 28, 2015 5:43 AM
*To:* santosh...@gmail.com
*Cc:* ayan guha guha.a...@gmail.com, felixcheun...@hotmail.com, user@spark.apache.org
I am trying to do that, but there will always be a data mismatch, since by the time Sqoop has finished fetching, the main database will have received many more updates. There is something called incremental data fetch in Sqoop, but that hits the database rather than reading the WAL.

On 28 July 2015 at 02:52, santosh...@gmail.com wrote:
Why can't you bulk pre-fetch the data to HDFS (for example using Sqoop) instead of hitting Postgres multiple times?
Sent from Windows Mail

*From:* ayan guha guha.a...@gmail.com
*Sent:* Monday, July 27, 2015 4:41 PM
*To:* Jeetendra Gangele gangele...@gmail.com
*Cc:* felixcheun...@hotmail.com, user@spark.apache.org
You can open the DB connection once per partition. Please have a look at the design patterns for the foreach construct in the documentation. How big is your data in the DB? How often does that data change? You would be better off if the data were in Spark already.

On 28 Jul 2015 04:48, Jeetendra Gangele gangele...@gmail.com wrote:
Thanks for your reply. In parallel I will be making around 6000 calls to PostgreSQL, which is not good; my database will die. These calls to the database will keep increasing. Handling millions of requests is not an issue with HBase/NoSQL. Any other alternative?

On 27 July 2015 at 23:18, felixcheun...@hotmail.com wrote:
You can have Spark read from PostgreSQL through the data access API. Do you have any concern with that approach, since you mention copying that data into HBase?

From: Jeetendra Gangele
Sent: Monday, July 27, 6:00 AM
Subject: Data from PostgreSQL to Spark
To: user
Hi All,
I have a use case where I am consuming events from RabbitMQ using Spark Streaming. Each event has some fields on which I want to query PostgreSQL, bring back the data, join the event data with the PostgreSQL data, and put the aggregated data into HDFS, so that I can run analytics queries over this data using Spark SQL.
My concern is that the PostgreSQL data is production data, so I don't want to hit it too many times. In any given second I may have 3000 events, which means I need to fire 3000 parallel queries at PostgreSQL, and this load keeps growing, so my database will go down.
I can't migrate this PostgreSQL data since lots of systems use it, but I can copy the data to some NoSQL store like HBase and query HBase. The issue then is: how can I make sure that HBase has up-to-date data?
Can anyone suggest the best approach or method to handle this case?
Regards
Jeetendra
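
The "one connection per partition" suggestion in the thread above can be combined with batching the lookups, so that a burst of events becomes a handful of queries rather than one query per event. The following is a minimal sketch of just the batching step, in plain Java with no Spark or JDBC dependency. BatchedLookupSketch, the accounts table, and the id and payload columns are all hypothetical; in a real job this logic would presumably run inside a per-partition function with a single connection opened for the whole partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: groups the keys seen in one partition into a few batched lookups.
final class BatchedLookupSketch {

    // Splits the keys into chunks of at most batchSize elements.
    static <T> List<List<T>> chunk(List<T> keys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            out.add(new ArrayList<>(keys.subList(i, Math.min(i + batchSize, keys.size()))));
        }
        return out;
    }

    // Builds one parameterized statement for a chunk of keyCount keys.
    // Table and column names are placeholders, not taken from the thread.
    static String lookupSql(int keyCount) {
        String placeholders = String.join(", ", Collections.nCopies(keyCount, "?"));
        return "SELECT id, payload FROM accounts WHERE id IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // 3000 event keys in one second, looked up 500 at a time.
        List<Integer> keys = new ArrayList<>();
        for (int i = 0; i < 3000; i++) {
            keys.add(i);
        }
        List<List<Integer>> batches = chunk(keys, 500);
        System.out.println(batches.size() + " queries instead of " + keys.size());
        System.out.println(lookupSql(3));
    }
}
```

The batch size of 500 is arbitrary; the right value depends on the database and driver limits.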
Re: Spark-Submit error
Thanks Satish. I only see the INFO messages and don’t see any error messages in the output you pasted. Can you paste the log with the error messages?
Guru Medasani gdm...@gmail.com

On Aug 3, 2015, at 11:12 PM, satish chandra j jsatishchan...@gmail.com wrote:
Hi Guru,
I am executing this on a DataStax Enterprise Spark node. The ~/.dserc file exists and contains the Cassandra credentials, but I am still getting the error. Below is the command:

dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars ///home/missingmerch/postgresql-9.4-1201.jdbc41.jar ///home/missingmerch/etl-0.0.1-SNAPSHOT.jar

Please find the error log details below:

INFO 2015-07-31 05:15:17 org.apache.spark.executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
INFO 2015-07-31 05:15:18 org.apache.spark.SecurityManager: Changing view acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:18 org.apache.spark.SecurityManager: Changing modify acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:18 org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cassandra, missingmerch); users with modify permissions: Set(cassandra, missingmerch)
INFO 2015-07-31 05:15:22 akka.event.slf4j.Slf4jLogger: Slf4jLogger started
INFO 2015-07-31 05:15:22 Remoting: Starting remoting
INFO 2015-07-31 05:15:22 Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@10.246.43.14:48952]
INFO 2015-07-31 05:15:22 org.apache.spark.util.Utils: Successfully started service 'driverPropsFetcher' on port 48952.
INFO 2015-07-31 05:15:24 akka.remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
INFO 2015-07-31 05:15:24 akka.remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing view acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing modify acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cassandra, missingmerch); users with modify permissions: Set(cassandra, missingmerch)
INFO 2015-07-31 05:15:24 akka.event.slf4j.Slf4jLogger: Slf4jLogger started
INFO 2015-07-31 05:15:24 akka.remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
INFO 2015-07-31 05:15:24 Remoting: Starting remoting
INFO 2015-07-31 05:15:24 Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@10.246.43.14:56358]
INFO 2015-07-31 05:15:24 org.apache.spark.util.Utils: Successfully started service 'sparkExecutor' on port 56358.
INFO 2015-07-31 05:15:24 org.apache.spark.executor.CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://sparkdri...@tstl400029.wal-mart.com:60525/user/CoarseGrainedScheduler
INFO 2015-07-31 05:15:24 org.apache.spark.deploy.worker.WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@10.246.43.14:51552/user/Worker
INFO 2015-07-31 05:15:24 org.apache.spark.deploy.worker.WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@10.246.43.14:51552/user/Worker
INFO 2015-07-31 05:15:24 org.apache.spark.executor.CoarseGrainedExecutorBackend: Successfully registered with driver
INFO 2015-07-31 05:15:24 org.apache.spark.executor.Executor: Starting executor ID 0 on host 10.246.43.14
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing view acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: Changing modify acls to: cassandra,missingmerch
INFO 2015-07-31 05:15:24 org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cassandra, missingmerch); users with modify permissions: Set(cassandra, missingmerch)
INFO 2015-07-31 05:15:24 org.apache.spark.util.AkkaUtils: Connecting to MapOutputTracker: akka.tcp://sparkdri...@tstl400029.wal-mart.com:60525/user/MapOutputTracker
INFO
Repartition question
Hi All,
I am running the Wikipedia parsing example from the Advanced Analytics with Spark book.
https://github.com/sryza/aas/blob/d3f62ef3ed43a59140f4ae8afbe2ef81fc643ef2/ch06-lsa/src/main/scala/com/cloudera/datascience/lsa/ParseWikipedia.scala#l112
The partitions of the RDD returned by the readFile function (linked above) are 32 MB each. So if my file is 100 MB, the RDD is created with 4 partitions of approximately 32 MB.
I am running this on a standalone Spark cluster and everything is working fine; I am only a little confused about the number of partitions and their size. I want to increase the number of partitions of the RDD to make better use of the cluster. Is calling repartition() afterwards the only option, or can I pass something to the above method to get more partitions?
Please let me know. Thanks.
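
The 4 partitions for a 100 MB file are consistent with how Hadoop's FileInputFormat counts splits, assuming the input is read through a FileInputFormat-based input format with an effective block size of 32 MB. The following is a minimal sketch of that arithmetic in plain Java. SplitCountSketch is a made-up name, and the 1.1 slop factor and the max/min formula follow Hadoop's new-API FileInputFormat as far as I know, so treat them as assumptions to check against your Hadoop version.

```java
// Hypothetical helper reproducing the split-count arithmetic; not Hadoop API.
final class SplitCountSketch {

    // A trailing remainder of up to 10% of a split is folded into the last split.
    private static final double SPLIT_SLOP = 1.1;

    // Effective split size: the block size clamped between the min and max split sizes.
    static long splitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits (and so initial partitions) for a file of fileBytes bytes.
    static int countSplits(long fileBytes, long splitSize) {
        int splits = 0;
        long remaining = fileBytes;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1L << 20;
        // 100 MB file, 32 MB blocks, no effective min/max constraint.
        long defaultSize = splitSize(32 * mb, 1, Long.MAX_VALUE);
        System.out.println(countSplits(100 * mb, defaultSize));
        // Same file with the maximum split size capped at 16 MB.
        long cappedSize = splitSize(32 * mb, 1, 16 * mb);
        System.out.println(countSplits(100 * mb, cappedSize));
    }
}
```

Under these assumptions, lowering the maximum split size in the Hadoop configuration passed to the read (in Hadoop 2 the property is mapreduce.input.fileinputformat.split.maxsize) should yield more, smaller partitions at load time; calling repartition() afterwards remains the other option.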
Re: Unable to query existing hive table from spark sql 1.3.0
Which database is your table in: default or result? By default Spark will look for the table in the default database. If the table exists in the result database, try prefixing the table name with the database name, as in

select * from result.salarytest

or set the database first by executing

use <database name>

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-query-existing-hive-table-from-spark-sql-1-3-0-tp24108p24121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark streaming max receiver rate doubts
1. In Spark 1.3 (non-receiver, direct approach): if my batch interval is 1 sec and I don't set spark.streaming.kafka.maxRatePerPartition, is the default behaviour to bring in all messages from Kafka from the last offset to the current offset? Say the number of messages was large and it took 5 sec to process them: will the jobs for the intervals from 2 to 5 sec be queued and run afterwards, or will they not be created at all, since all messages for those intervals have already been processed?

2. In Spark Streaming 1.2 (receiver-based): if I don't set spark.streaming.receiver.maxRate, will it consume all messages from the last offset, or will it just consume whatever it can within this batch interval of 1 sec?
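
For the first question, it may help to see what setting the limit would imply. spark.streaming.kafka.maxRatePerPartition is documented as a number of records per second per Kafka partition, so the cap on a single batch works out as rate times batch duration times partition count. The following is a minimal sketch of that arithmetic; BatchCapSketch and the example numbers are made up for illustration.

```java
// Hypothetical helper: upper bound on records per batch for the direct Kafka stream.
final class BatchCapSketch {

    // maxRatePerPartition is in records per second per Kafka partition.
    static long maxMessagesPerBatch(long maxRatePerPartition, int kafkaPartitions, double batchSeconds) {
        long perPartition = (long) (maxRatePerPartition * batchSeconds);
        return perPartition * kafkaPartitions;
    }

    public static void main(String[] args) {
        // 1000 records/sec per partition, 4 partitions, 1-second batches.
        System.out.println(maxMessagesPerBatch(1000, 4, 1.0));
    }
}
```

Without the setting there is no such cap, which is why a backlog can land in a single batch.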
spark --files permission error
Is there any setting that allows --files to copy a jar from the driver to the executor nodes? When I pass jar files to the executors using --files and add them to the executor class path, it throws a FileNotFoundException:

15/08/03 07:59:50 WARN TaskSetManager: Lost task 8.0 in stage 0.0 (TID 8, ip): java.io.FileNotFoundException: ./jar (Permission denied)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    at org.spark-project.guava.common.io.Files$FileByteSink.openStream(Files.java:223)

I am running the program as:

spark-submit --class classname --files externaljarname.jar --driver-class-path externaljarname.jar --conf spark.executor.extraClassPath=externaljarname.jar mainjar.jar
Re: spark cluster setup
Are you sitting behind a firewall and accessing a remote master machine? In that case, have a look at http://spark.apache.org/docs/latest/configuration.html#networking; you might want to set a few properties such as spark.driver.host, spark.driver.port, etc.
Thanks
Best Regards

On Mon, Aug 3, 2015 at 7:46 AM, Angel Angel areyouange...@gmail.com wrote:
Hello Sir,
I have installed Spark. The local spark-shell is working fine, but whenever I try the master configuration I get errors. When I run this command:

MASTER=spark://hadoopm0:7077 spark-shell

I get errors like:

15/07/27 21:17:26 INFO AppClient$ClientActor: Connecting to master spark://hadoopm0:7077...
15/07/27 21:17:46 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/07/27 21:17:46 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
15/07/27 21:17:46 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.

I have also attached a screenshot of the Master UI. I have also tested with the telnet command; it shows that hadoopm0 is reachable. Can you please give me some references or documentation, or tell me how to solve this issue?
Thanks in advance.
Thanking You,
Re: Checkpoint file not found
Hi,
It's an application that maintains some state from the DStream using the updateStateByKey() operation. It then selects some of the records from the current batch, using criteria over the current values and the state, and carries the remaining values over to the next batch. Following is the pseudo code:

var pending = emptyRDD
val dstream = kafkaStream
val stateStream = dstream.updateStateByKey(myfunc, partitioner, initialState)
val joinedStream = dstream.transformWith(sumstream, transformer(pending) _)
val toUpdate = joinedStream.filter(myfilter).saveToES()
val toNotUpdate = joinedStream.filter(notFilter).checkpoint(interval)
toNotUpdate.foreachRDD(rdd => pending = rdd)

Thanks

On 3 August 2015 at 13:09, Tathagata Das t...@databricks.com wrote:
Can you tell us more about the streaming app? Which DStream operations are you using?

On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya anand.na...@gmail.com wrote:
Hi,
I'm writing a streaming application in Spark 1.3. After running for some time, I'm getting the following exception. I'm sure that no other process is modifying the HDFS file. Any idea what might be the cause of this?
15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
java.io.FileNotFoundException: File does not exist: hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
    at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
    at org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
    at
Re: About memory leak in spark 1.4.1
Sea, it exists, trust me. We have Spark in production under YARN. If you want more control, use YARN if you can. At least it kills the executor if it hogs memory.
I am explicitly setting spark.yarn.executor.memoryOverhead to the same size as the heap for one of our processes. For example:

spark.executor.memory 4g
spark.yarn.executor.memoryOverhead 4000

Try the following config:

spark.executor.memory 25g
spark.storage.memoryFraction 0.2

(This is more for safety; hopefully you will get a lot of GC and a plain Java OOM instead of just memory overuse by some off-heap magic.) Then check memory usage. It should give you a feel for the off-heap memory consumption of your application. If it still dies because the machine's memory gets completely filled up, perhaps there is a memory leak in Spark 1.4.

On Mon, Aug 3, 2015 at 4:58 AM Sea 261810...@qq.com wrote:
"spark uses a lot more than heap memory, it is the expected behavior."
It didn't exist in Spark 1.3.x. What does "a lot more than" mean? It means that I lose control of it! I tried to allocate 31g, but it still grows to 55g and continues to grow! That is the point! I have tried setting memoryFraction to 0.2, but it didn't help. I don't know whether it will still exist in the next release, 1.5; I hope not.

-- Original Message --
*From:* Barak Gitsis;bar...@similarweb.com;
*Sent:* Sunday, August 2, 2015, 9:55 PM
*To:* Sea261810...@qq.com; Ted Yuyuzhih...@gmail.com;
*Cc:* user@spark.apache.orguser@spark.apache.org; rxin r...@databricks.com; joshrosenjoshro...@databricks.com; davies dav...@databricks.com;
*Subject:* Re: About memory leak in spark 1.4.1
Spark uses a lot more than heap memory; it is the expected behavior. In 1.4, off-heap memory usage is supposed to grow in comparison to 1.3. Better to use as little memory as you can for the heap, and since you are not utilizing it already, it is safe for you to reduce it. memoryFraction helps you optimize heap usage for your data/application profile while keeping it tight.

On Sun, Aug 2, 2015 at 12:54 PM Sea 261810...@qq.com wrote:
spark.storage.memoryFraction applies to heap memory, but my situation is that the memory used is more than the heap memory! Is anyone else using Spark 1.4.1 in production?

-- Original Message --
*From:* Ted Yu;yuzhih...@gmail.com;
*Sent:* Sunday, August 2, 2015, 5:45 PM
*To:* Sea261810...@qq.com;
*Cc:* Barak Gitsisbar...@similarweb.com; user@spark.apache.org user@spark.apache.org; rxinr...@databricks.com; joshrosen joshro...@databricks.com; daviesdav...@databricks.com;
*Subject:* Re: About memory leak in spark 1.4.1
http://spark.apache.org/docs/latest/tuning.html does mention spark.storage.memoryFraction in two places. One is under the Cache Size Tuning section. FYI

On Sun, Aug 2, 2015 at 2:16 AM, Sea 261810...@qq.com wrote:
Hi, Barak
It is OK with Spark 1.3.0; the problem is with Spark 1.4.1. I don't think spark.storage.memoryFraction will make any difference, because it still applies to heap memory.

-- Original Message --
*From:* Barak Gitsis;bar...@similarweb.com;
*Sent:* Sunday, August 2, 2015, 4:11 PM
*To:* Sea261810...@qq.com; useruser@spark.apache.org;
*Cc:* rxinr...@databricks.com; joshrosenjoshro...@databricks.com; daviesdav...@databricks.com;
*Subject:* Re: About memory leak in spark 1.4.1
Hi,
Reducing spark.storage.memoryFraction did the trick for me. The heap doesn't get filled because it is reserved. My reasoning is: I give the executor all the memory I can give it, so that makes it a boundary. From there I try to make the best use of memory I can. storage.memoryFraction is in a sense user data space; the rest can be used by the system. If you don't have so much data that you MUST store it in memory for performance, better to give Spark more space. I ended up setting it to 0.3. All that said, it is on Spark 1.3 on a cluster. Hope that helps.

On Sat, Aug 1, 2015 at 5:43 PM Sea 261810...@qq.com wrote:
Hi, all
I upgraded Spark to 1.4.1, and many applications failed. I find that the heap memory is not full, but the CoarseGrainedExecutorBackend process takes more memory than I expect, and it increases as time goes on. Finally it exceeds the maximum limit of the server and the worker dies. Can anyone help?

Mode: standalone
spark.executor.memory 50g

25583 xiaoju 20 0 75.5g 55g 28m S 1729.3 88.1 2172:52 java

55g is more than the 50g I requested.

-- *-Barak*
-- *-Barak*
-- *-Barak*
Running multiple batch jobs in parallel using Spark on Mesos
Hello *,

We are trying to build some batch jobs using Spark on Mesos. Mesos offers two main modes of deployment for a Spark job:

1. Fine-grained
2. Coarse-grained

When we run Spark jobs in fine-grained mode, Spark uses the maximum amount of offers from Mesos to run the job. Running batch jobs in this mode can easily starve the high-priority jobs in the cluster, and one job can easily use a large part of the cluster. There is no way to specify a maximum limit on the resources that one particular framework may use.

The problem with the coarse-grained mode is that the cluster reserves the given amount of resources at start and then runs the Spark job on those resources. This becomes a problem because we have to reserve more resources than the job might need so that it never fails. This leads to wasted resources and gives us static partitioning of resources on the Mesos cluster.

Can anyone share their experience in managing multiple batch Spark jobs on a Mesos cluster?

-- Regards, Akash Mishra. "It's not our abilities that make us, but our decisions." --Albus Dumbledore
Re: spark cluster setup
Your master log files will be in the Spark home folder, under logs, on the master machine. Do they show an error?

Best Regards, Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/
http://in.linkedin.com/in/sonalgoyal

On Mon, Aug 3, 2015 at 9:27 AM, Angel Angel areyouange...@gmail.com wrote:

Hi, I have attached a snapshot of the console. Actually, I don't know how to see the master logs. Still, I have attached my master web UI, and these are the errors from the log file:

2015-07-23 17:00:59,977 ERROR org.apache.spark.scheduler.ReplayListenerBus: Malformed line: not started
2015-07-23 17:01:00,096 INFO org.eclipse.jetty.server.Server: jetty-8.y.z-SNAPSHOT
2015-07-23 17:01:00,138 INFO org.eclipse.jetty.server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:18088
2015-07-23 17:01:00,138 INFO org.apache.spark.util.Utils: Successfully started service on port 18088.
2015-07-23 17:01:00,140 INFO org.apache.spark.deploy.history.HistoryServer: Started HistoryServer at http://hadoopm0:18088
2015-07-24 11:36:18,148 INFO org.apache.spark.SecurityManager: Changing view acls to: spark
2015-07-24 11:36:18,148 INFO org.apache.spark.SecurityManager: Changing modify acls to: spark
2015-07-24 11:36:18,148 INFO org.apache.spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark); users with modify permissions: Set(spark)
2015-07-24 11:36:18,367 INFO org.apache.spark.SecurityManager: Changing acls enabled to: false
2015-07-24 11:36:18,367 INFO org.apache.spark.SecurityManager: Changing admin acls to:
2015-07-24 11:36:18,368 INFO org.apache.spark.SecurityManager: Changing view acls to: root

Thanks.

On Mon, Aug 3, 2015 at 11:52 AM, Sonal Goyal sonalgoy...@gmail.com wrote:

What do the master logs show?

Best Regards, Sonal
Founder, Nube Technologies http://www.nubetech.co
Check out Reifier at Spark Summit 2015 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/
http://in.linkedin.com/in/sonalgoyal

On Mon, Aug 3, 2015 at 7:46 AM, Angel Angel areyouange...@gmail.com wrote:

Hello Sir, I have installed Spark. The local spark-shell is working fine, but whenever I try the master configuration I get errors. When I run this command:

MASTER=spark://hadoopm0:7077 spark-shell

I get errors like:

15/07/27 21:17:26 INFO AppClient$ClientActor: Connecting to master spark://hadoopm0:7077...
15/07/27 21:17:46 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/07/27 21:17:46 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
15/07/27 21:17:46 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.

I have also attached a screenshot of the master UI. I have also tested with the telnet command; it shows that hadoopm0 is connected. Can you please give me some references or documentation, or tell me how to solve this issue?

Thanks in advance.
RE: SparkLauncher not notified about finished job - hangs infinitely.
Reading from the input stream and the error stream (in separate threads) indeed unblocked the launcher and it exited properly. Thanks for your responses!

Best regards, Tomasz

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Friday, July 31, 2015 19:20
To: Elkhan Dadashov
Cc: Tomasz Guziałek; user@spark.apache.org
Subject: Re: SparkLauncher not notified about finished job - hangs infinitely.

Tomasz: Please take a look at the Redirector class inside ./launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java

FYI

On Fri, Jul 31, 2015 at 10:02 AM, Elkhan Dadashov elkhan8...@gmail.com wrote:

Hi Tomasz,

Answer to your 1st question: clear/read the error (spark.getErrorStream()) and output (spark.getInputStream()) stream buffers before you call spark.waitFor(). It would be better to clear/read them with 2 different threads. Then it should work fine.

The Spark job is launched as a subprocess, and according to the Oracle documentation (https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html):

"By default, the created subprocess does not have its own terminal or console. All its standard I/O (i.e. stdin, stdout, stderr) operations will be redirected to the parent process, where they can be accessed via the streams obtained using the methods getOutputStream(), getInputStream(), and getErrorStream(). The parent process uses these streams to feed input to and get output from the subprocess. Because some native platforms only provide limited buffer size for standard input and output streams, failure to promptly write the input stream or read the output stream of the subprocess may cause the subprocess to block, or even deadlock."

On Fri, Jul 31, 2015 at 2:45 AM, Tomasz Guziałek tomasz.guzia...@humaninference.com wrote:

I am trying to submit a JAR with a Spark job to the YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example:

Process spark = new SparkLauncher()
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi")
    .setMaster("yarn-cluster")
    .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code: " + exitCode);

There are two problems:

1. While submitting in yarn-cluster mode, the application is successfully submitted to YARN and executes successfully (it is visible in the YARN UI, reported as SUCCESS, and the PI value is printed in the output). However, the submitting application is never notified that processing is finished; it hangs indefinitely after printing "Waiting for finish...". The log of the container can be found here: http://pastebin.com/LscBjHQc

2. While submitting in yarn-client mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish...". When the hanging code is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (the PI value is not printed). The log of the container can be found here: http://pastebin.com/9KHi81r4

I tried to execute the submitting application with both Oracle Java 8 and 7. Any hints as to what might be wrong?

Best regards, Tomasz

-- Best regards, Elkhan Dadashov
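The fix described in this thread (read stdout and stderr on separate threads, then call waitFor()) can be sketched with the plain java.lang.Process API. This is only an illustration under assumptions: the class name DrainedProcess and its helper methods are hypothetical, and the child process here is a stand-in `java -version` call rather than the Process returned by SparkLauncher.launch().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class DrainedProcess {

    // Starts a daemon thread that reads the stream line by line until EOF,
    // so the child process cannot block on a full pipe buffer.
    static Thread drain(InputStream stream, List<String> sink) {
        Thread t = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(stream, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    sink.add(line);
                }
            } catch (IOException e) {
                // The stream was closed underneath us; nothing more to read.
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Drains stdout and stderr concurrently, then waits for the exit code.
    static int waitForDrained(Process process, List<String> out, List<String> err)
            throws InterruptedException {
        Thread outThread = drain(process.getInputStream(), out);
        Thread errThread = drain(process.getErrorStream(), err);
        int exitCode = process.waitFor();
        outThread.join();
        errThread.join();
        return exitCode;
    }

    public static void main(String[] args) throws Exception {
        // Stand-in child process (an assumption for this sketch); in the thread
        // above this would be the Process returned by SparkLauncher.launch().
        String javaBin = System.getProperty("java.home") + "/bin/java";
        Process child = new ProcessBuilder(javaBin, "-version").start();

        List<String> out = new CopyOnWriteArrayList<>();
        List<String> err = new CopyOnWriteArrayList<>();
        int exitCode = waitForDrained(child, out, err);
        System.out.println("Finished! Exit code: " + exitCode);
        err.forEach(System.out::println);
    }
}
```

Collecting the lines into lists is a choice made for the sketch; a long-running job would more likely log each line as it arrives so that memory use stays bounded.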
Re: Checkpoint file not found
Can you tell us more about your streaming app? Which DStream operations are you using?

On Sun, Aug 2, 2015 at 9:14 PM, Anand Nalya anand.na...@gmail.com wrote:

Hi, I'm writing a streaming application in Spark 1.3. After running for some time, I'm getting the following exception. I'm sure that no other process is modifying the HDFS file. Any idea what might be the cause of this?

15/08/02 21:24:13 ERROR scheduler.DAGSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop failed; shutting down SparkContext
java.io.FileNotFoundException: File does not exist: hdfs://node16:8020/user/anandnalya/tiered-original/e6794c2c-1c9f-414a-ae7e-e58a8f874661/rdd-5112/part-0
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1132)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1124)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1124)
at org.apache.spark.rdd.CheckpointRDD.getPreferredLocations(CheckpointRDD.scala:66)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$1.apply(RDD.scala:230)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1324)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply(DAGScheduler.scala:1333)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1333)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1331)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1331)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$2.apply$mcVI$sp(DAGScheduler.scala:1334)
at
Is it possible to disable AM page proxy in Yarn client mode?
In YARN client mode, the Spark driver URL is redirected to the YARN web proxy server, but I don't want to use this dynamic name. Is it possible to still use host:port, as in standalone mode?
How do I Process Streams that span multiple lines?
All examples of Spark Stream programming that I see assume streams of lines that are then tokenised and acted upon (like the WordCount example). How do I process Streams that span multiple lines? Are there examples that I can use?
EOFException when transmitting a class that extends Externalizable
Hi, I am having a problem serializing a custom partitioner that I have written that extends Externalizable. The partitioner wraps a java TreeSet which stores table splits. There are thousands of splits. I noticed earlier that my spark job was taking over 30 seconds just to transmit a task to each worker, which is what motivated me to optimize the serialization of the partitioner I wrote. I read in your configuration page that Kryo can only be used to serialize data in RDDs, but spark cannot use Kryo to transmit the task closure to each executor. That is why I chose to use Externalizable for my custom partitioner. I have unit tests that confirm my externalizable class can be serialized using java serialization. Unfortunately when it comes to the cluster though, it stops working. I added some logging to this and discovered that it had serialized about 2000 splits out of 4000 before it encountered an EOFException. This is happening consistently on every node of my cluster. I have no idea what could cause this or even how to get more information. Would somebody please tell me what could possibly cause this or how to troubleshoot it? Mike Knapp
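The post does not include the partitioner's code, so the cause cannot be pinned down from here. One thing that may be worth ruling out is a readExternal that does not consume exactly the bytes writeExternal produced, for example by calling read(byte[]), which may return fewer bytes than requested, instead of readFully. The sketch below shows a symmetric pair with an explicit count and length prefixes. The class name SplitSet, the choice of String for the splits, and the roundTrip helper are all assumptions made for illustration, not the poster's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.TreeSet;

class SplitSet implements Externalizable {
    private static final long serialVersionUID = 1L;

    // Hypothetical payload: table splits represented as strings.
    private TreeSet<String> splits = new TreeSet<>();

    // Externalizable requires a public no-arg constructor.
    public SplitSet() {}

    void add(String split) { splits.add(split); }
    int size() { return splits.size(); }
    boolean contains(String split) { return splits.contains(split); }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // Write the element count first so the reader knows when to stop.
        out.writeInt(splits.size());
        for (String split : splits) {
            byte[] bytes = split.getBytes(StandardCharsets.UTF_8);
            out.writeInt(bytes.length);
            out.write(bytes);
        }
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        int count = in.readInt();
        TreeSet<String> loaded = new TreeSet<>();
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[in.readInt()];
            // readFully blocks until the whole array is filled;
            // read(bytes) is allowed to return after a partial read.
            in.readFully(bytes);
            loaded.add(new String(bytes, StandardCharsets.UTF_8));
        }
        splits = loaded;
    }

    // Serializes and deserializes in memory using Java serialization.
    static SplitSet roundTrip(SplitSet original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(original);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(buffer.toByteArray()))) {
            return (SplitSet) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SplitSet original = new SplitSet();
        for (int i = 0; i < 4000; i++) {
            original.add("split-" + i);
        }
        System.out.println("Splits after round trip: " + roundTrip(original).size());
    }
}
```

An in-memory round trip like this only exercises plain Java serialization; if it passes while the cluster still fails, the difference is more likely in how the bytes are transported or in which classes are on the executor classpath.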
Re: Standalone Cluster Local Authentication
Looks like related work is in progress, e.g. SPARK-5158.

Cheers

On Mon, Aug 3, 2015 at 10:05 AM, MrJew kouz...@gmail.com wrote:

Hello, Similar to other cluster systems, e.g. ZooKeeper and Hazelcast, Spark has the problem that it is protected from the outside world, but anyone with access to the host can run a Spark node without authentication. Currently we are using Spark 1.3.1. Is there a way to enable authentication so that only users who have the secret can run a node? Our current solution involves configuring the job via an environment variable, but anyone running the 'ps' command can see it.

Regards, George

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-Cluster-Local-Authentication-tp24116.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How do I Process Streams that span multiple lines?
Are you looking for RDD.wholeTextFiles? On 3 August 2015 at 10:57, Spark Enthusiast sparkenthusi...@yahoo.in wrote: All examples of Spark Stream programming that I see assume streams of lines that are then tokenised and acted upon (like the WordCount example). How do I process Streams that span multiple lines? Are there examples that I can use?
Re: How do I Process Streams that span multiple lines?
Sorry. SparkContext.wholeTextFiles Not sure about streams. On 3 August 2015 at 14:50, Michal Čizmazia mici...@gmail.com wrote: Are you looking for RDD.wholeTextFiles? On 3 August 2015 at 10:57, Spark Enthusiast sparkenthusi...@yahoo.in wrote: All examples of Spark Stream programming that I see assume streams of lines that are then tokenised and acted upon (like the WordCount example). How do I process Streams that span multiple lines? Are there examples that I can use?
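Neither reply settles the streaming case. Independently of Spark, the step that turns a sequence of lines into multi-line records can be sketched as below and applied to the lines of one batch or partition. The blank-line record separator is an assumption (the question does not say how records are delimited), the class name MultiLineRecords is hypothetical, and records that straddle a batch boundary are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MultiLineRecords {

    // Groups consecutive non-blank lines into one record; a blank line
    // (assumed separator) ends the current record.
    static List<String> group(Iterable<String> lines) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : lines) {
            if (line.trim().isEmpty()) {
                if (current.length() > 0) {
                    records.add(current.toString());
                    current.setLength(0);
                }
            } else {
                if (current.length() > 0) {
                    current.append('\n');
                }
                current.append(line);
            }
        }
        // Flush the last record if the input did not end with a separator.
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("id: 1", "body: first", "", "id: 2", "body: second");
        for (String record : group(lines)) {
            System.out.println("[" + record.replace("\n", " | ") + "]");
        }
    }
}
```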
Standalone Cluster Local Authentication
Hello, Similar to other cluster systems, e.g. ZooKeeper and Hazelcast, Spark has the problem that it is protected from the outside world, but anyone with access to the host can run a Spark node without authentication. Currently we are using Spark 1.3.1. Is there a way to enable authentication so that only users who have the secret can run a node? Our current solution involves configuring the job via an environment variable, but anyone running the 'ps' command can see it.

Regards, George

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standalone-Cluster-Local-Authentication-tp24116.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Is it possible to disable AM page proxy in Yarn client mode?
The redirect is there for security reasons: in a Kerberos-enabled cluster the RM proxy does the authentication, then forwards the requests to the running application.

There's no obvious way to disable it in the Spark application master, and I wouldn't recommend doing this anyway, as it only gets you into a situation where your code works until you flip the security bit on.

The Spark Web UI installs a new filter (AmIpFilter), which 302's all HTTP requests coming in from anywhere other than the host running the RM proxy.

1. If you make requests from that host (curl, browser, whatever), then they go through without the redirect.
2. If you don't have an RM proxy (why not?), then you can configure the Spark AM to treat your client IP address as the proxy, and again, no redirect.

YARN-2031 covers the ongoing work to have that proxy/IP filter handle REST APIs properly. Currently it only handles GET operations and assumes a human visiting the application in a web browser.

On 3 Aug 2015, at 01:52, Rex Xiong bycha...@gmail.com wrote:

In Yarn client mode, Spark driver URL will be redirected to Yarn web proxy server, but I don't want to use this dynamic name, is it possible to still use host:port as standalone mode?
Re: Package Release Annoucement: Spark SQL on HBase Astro
When I tried to compile against HBase 1.1.1, I got:

[ERROR] /home/hbase/ssoh/src/main/scala/org/apache/spark/sql/hbase/SparkSqlRegionObserver.scala:124: overloaded method next needs result type
[ERROR] override def next(result: java.util.List[Cell], limit: Int) = next(result)

Is there a plan to support HBase 1.x?

Thanks

On Wed, Jul 22, 2015 at 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote:

We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release. http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase

The main features in this package, dubbed “Astro”, include:

· Systematic and powerful handling of data pruning and intelligent scan, based on partial evaluation technique
· HBase pushdown capabilities like custom filters and coprocessor to support ultra low latency processing
· SQL, Data Frame support
· More SQL capabilities made possible (Secondary index, bloom filter, Primary Key, Bulk load, Update)
· Joins with data from other sources
· Python/Java/Scala support
· Support latest Spark 1.4.0 release

The tests by the Huawei team and community contributors covered the following areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; custom filtering; DML; complex filtering on keys and non-keys; join/union with non-HBase data; Data Frame; multi-column family test.

We will post the test results, including performance tests, in the middle of August.

You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of the settings, extensive Data Frame tests, complex join/union tests and extensive performance tests. Please use the “Issues” and “Pull Requests” links at the package homepage if you want to report bugs, improvements or feature requests.

Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors and Databricks. Databricks has been providing great assistance from the design to the release.

“Astro”, the Spark SQL on HBase package, will be useful for ultra low latency query and analytics of large scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base. Your comments and suggestions are greatly appreciated.

Yan Zhou / Bing Xiao
Huawei Big Data team
Does RDD.cartesian involve shuffling?
Does RDD.cartesian involve shuffling? Thanks!
Re: How to increase parallelism of a Spark cluster?
@Silvio: the mapPartitions instantiates an HttpSolrServer, then for each query string in the partition, sends the query to Solr using SolrJ and gets back the top N results. It then reformats the result data into one long string and returns the key-value pair as (query string, result string).

@Igor: Thanks for the parameter suggestions. I will check --num-executors, and whether there is a way to set the number of cores per executor, with my Databricks admin and update here if I find it. From the Databricks console, it appears that the number of executors per box is 1. This seems normal though, per the diagram on this page: http://spark.apache.org/docs/latest/cluster-overview.html where it seems that there is 1 executor per box, and each executor can spawn multiple threads to take care of multiple tasks (see bullet #1, copied below).

"Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs)."

Regarding hitting the max number of requests, thanks for the link. I am using the default client. I just peeked at the Solr code, and the default (if no HttpClient instance is supplied in the ctor) is to use DefaultHttpClient (from HttpComponents), whose settings are as follows:

- Version: HttpVersion.HTTP_1_1
- ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
- NoTcpDelay: true
- SocketBufferSize: 8192
- UserAgent: Apache-HttpClient/release (java 1.5)

In addition, the Solr code sets the following additional config parameters on the DefaultHttpClient:

params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

Since all my connections are coming out of 2 worker boxes, it looks like I could get 32x2 = 64 clients hitting Solr, right?

@Steve: Thanks for the link to the HttpClient config. I was thinking about using a thread pool (or better, a PoolingHttpClientConnectionManager per the docs), but it probably won't help since it is still being fed one request at a time.

@Abhishek: my observations agree with what you said. In the past I have had success with repartition to reduce the partition size, especially when groupBy operations were involved. But I believe an executor should be able to handle multiple tasks in parallel, from what I understand about Akka, on which Spark is built: the worker is essentially an ActorSystem which can contain multiple Actors, and each actor works on a queue of tasks. Within an Actor everything is sequential, but the ActorSystem is responsible for farming out the tasks it gets to each of its Actors. It is possible, though, that I am generalizing incorrectly from my limited experience with Akka.

Thanks again for all your help. Please let me know if something jumps out and/or if there is some configuration I should check.

-sujit

On Sun, Aug 2, 2015 at 6:13 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote:

I don't know if (your assertion/expectation that) workers will process things (multiple partitions) in parallel is really valid. Or if having more partitions than workers will necessarily help (unless you are memory bound, so that partitioning essentially helps your work size rather than execution parallelism).

[Disclaimer: I am no authority on Spark, but wanted to throw in my spin based on my own understanding.] Nothing official about it :)

-abhishek-

On Jul 31, 2015, at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote:

Hello, I am trying to run a Spark job that hits an external webservice to get back some information. The cluster is 1 master + 4 workers; each worker has 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server, and is accessed using code similar to that shown below.

def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
  val solr = new HttpSolrClient()
  initializeSolrParameters(solr)
  keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
}

myRDD.repartition(10)
  .mapPartitions(keyValues => getResults(keyValues))

The mapPartitions does some initialization of the SolrJ client per partition and then hits it for each record in the partition via the getResults() call. I repartitioned in the hope that this would result in 10 clients hitting Solr simultaneously (I would like to go up to maybe 30-40 simultaneous clients if I can). However, I counted the number of open connections using netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr has a constant 4 clients (i.e., equal to the number of workers) over the lifetime of the run. My observation leads me to
Re: Standalone Cluster Local Authentication
On 3 Aug 2015, at 10:05, MrJew kouz...@gmail.com wrote:

Hello, Similar to other cluster systems e.g Zookeeper,

Actually, ZooKeeper supports SASL authentication of your Kerberos tokens. https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zookeeper+and+SASL

Hazelcast. Spark has the problem that is protected from the outside world however anyone having access to the host can run a spark node without the need for authentication. Currently we are using Spark 1.3.1. Is there a way to enable authentication so only users that have the secret can run a node. Current solution involves configuring the job via env variable however anyone running 'ps' command can see it. Regards, George

This is where YARN and its Kerberos support have the edge over standalone: set up Kerberos properly in your Hadoop cluster and you get HDFS locked down, your Spark applications running as a different user from other applications, and web access managed via the RM proxy. There's a terrifying amount of complexity going on to achieve that.

If you want to lock down a standalone cluster, then you'll have to isolate the cluster and rely on SSH tunnelling to only let your trusted users in. Some organisations do that for their Hadoop clusters anyway.

(ASF sponsored advert: I am giving a talk, "Hadoop and Kerberos: the madness beyond the gate", at ApacheCon Big Data EU: https://apachebigdata2015.sched.org/event/a10da43d16686f049ee6e25640ee3e8b)
Combine code for RDD and DStream
Hello! I am developing a Spark program that uses both batch and streaming (separately). They are pretty much the exact same program, except that the inputs come from different sources. Unfortunately, RDDs and DStreams define all of their transformations in their own files, so I have two different files with pretty much the exact same code. If I make a change to a transformation in one program, I have to make the exact same change in the other program.

It would be nice to have a third file that holds all of my transformations. The batch program and the streaming program could then both reference this third file to know which transformations to perform on the data. Does anyone know a good way of doing this? I want to be able to keep the exact same syntax (...rdd.filter({i: Int => i*2}).map(...)...) in this third file. With this method, any change I make to the transformations would apply to both the batch AND streaming processes.

I tried a couple of ideas to no avail.

Thanks in advance, Sidd
Re: Extremely poor predictive performance with RF in mllib
Hi, I've run into some poor RF behavior, although not as pronounced as yours. It would be great to get more insight into this one.

Thanks!

On Mon, Aug 3, 2015 at 8:21 AM pkphlam pkph...@gmail.com wrote:

Hi, This might be a long shot, but has anybody run into very poor predictive performance using RandomForest with MLlib? Here is what I'm doing:

- Spark 1.4.1 with PySpark
- Python 3.4.2
- ~30,000 Tweets of text
- 12289 1s and 15956 0s
- Whitespace tokenization and then hashing trick for feature selection using 10,000 features
- Run RF with 100 trees and maxDepth of 4 and then predict using the features from all the 1s observations.

So in theory, I should get predictions of close to 12289 1s (especially if the model overfits). But I'm getting exactly 0 1s, which sounds ludicrous to me and makes me suspect something is wrong with my code or I'm missing something. I notice similar behavior (although not as extreme) if I play around with the settings. But I'm getting normal behavior with other classifiers, so I don't think it's my setup that's the problem. For example:

lrm = LogisticRegressionWithSGD.train(lp, iterations=10)
logit_predict = lrm.predict(predict_feat)
logit_predict.sum()
9077

nb = NaiveBayes.train(lp)
nb_predict = nb.predict(predict_feat)
nb_predict.sum()
10287.0

rf = RandomForest.trainClassifier(lp, numClasses=2, categoricalFeaturesInfo={}, numTrees=100, seed=422)
rf_predict = rf.predict(predict_feat)
rf_predict.sum()
0.0

This code was all run back to back, so I didn't change anything in between. Does anybody have a possible explanation for this?

Thanks!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Extremely-poor-predictive-performance-with-RF-in-mllib-tp24112.html Sent from the Apache Spark User List mailing list archive at Nabble.com.

-- Barak
Re: About memory leak in spark 1.4.1
In general, what is your configuration? Use --conf spark.logConf=true.

We have 1.4.1 in a production standalone cluster and haven't experienced what you are describing. Can you verify in the web UI that Spark indeed got your 50g per executor limit? I mean on the configuration page.

Might you be using off-heap storage (Tachyon)?

On 3 August 2015 at 04:58, Sea 261810...@qq.com wrote:

"spark uses a lot more than heap memory, it is the expected behavior." It didn't exist in Spark 1.3.x. What does "a lot more than" mean? It means that I lose control of it! I tried to request 31g, but it still grows to 55g and continues to grow! That is the point! I have tried setting memoryFraction to 0.2, but it didn't help. I don't know whether it will still exist in the next release, 1.5; I hope not.

-- Original Message --
From: Barak Gitsis; bar...@similarweb.com
Sent: Sunday, August 2, 2015, 9:55 PM
To: Sea 261810...@qq.com; Ted Yu yuzhih...@gmail.com
Cc: user@spark.apache.org; rxin r...@databricks.com; joshrosen joshro...@databricks.com; davies dav...@databricks.com
Subject: Re: About memory leak in spark 1.4.1

Spark uses a lot more than heap memory; it is the expected behavior. In 1.4, off-heap memory usage is supposed to grow in comparison to 1.3. Better to use as little memory as you can for the heap, and since you are not utilizing it already, it is safe for you to reduce it. memoryFraction helps you optimize heap usage for your data/application profile while keeping it tight.

On Sun, Aug 2, 2015 at 12:54 PM Sea 261810...@qq.com wrote:

spark.storage.memoryFraction applies to heap memory, but in my situation the memory used is more than the heap memory! Does anyone else use Spark 1.4.1 in production?
-- Original Message --
From: Ted Yu; yuzhih...@gmail.com;
Sent: Sunday, August 2, 2015, 5:45 PM
To: Sea 261810...@qq.com;
Cc: Barak Gitsis bar...@similarweb.com; user@spark.apache.org user@spark.apache.org; rxin r...@databricks.com; joshrosen joshro...@databricks.com; davies dav...@databricks.com;
Subject: Re: About memory leak in spark 1.4.1

http://spark.apache.org/docs/latest/tuning.html does mention spark.storage.memoryFraction in two places. One is under the Cache Size Tuning section. FYI

On Sun, Aug 2, 2015 at 2:16 AM, Sea 261810...@qq.com wrote:

Hi, Barak. It is OK with Spark 1.3.0; the problem is with Spark 1.4.1. I don't think spark.storage.memoryFraction will make any difference, because it still applies to heap memory.

-- Original Message --
From: Barak Gitsis; bar...@similarweb.com;
Sent: Sunday, August 2, 2015, 4:11 PM
To: Sea 261810...@qq.com; user user@spark.apache.org;
Cc: rxin r...@databricks.com; joshrosen joshro...@databricks.com; davies dav...@databricks.com;
Subject: Re: About memory leak in spark 1.4.1

Hi, reducing spark.storage.memoryFraction did the trick for me. The heap doesn't get filled because it is reserved. My reasoning is: I give the executor all the memory I can give it, so that makes it a boundary. From there I try to make the best use of memory I can. storage.memoryFraction is in a sense user data space. The rest can be used by the system. If you don't have so much data that you MUST store in memory for performance, better to give Spark more space. I ended up setting it to 0.3. All that said, this is on Spark 1.3 on a cluster. Hope that helps.

On Sat, Aug 1, 2015 at 5:43 PM Sea 261810...@qq.com wrote:

Hi all, I upgraded Spark to 1.4.1 and many applications failed. I find that the heap memory is not full, but the CoarseGrainedExecutorBackend process takes more memory than I expect, and it increases as time goes on until it finally exceeds the server's maximum limit and the worker dies. Can anyone help?
Mode: standalone
spark.executor.memory 50g

      PID USER     PR  NI   VIRT   RES   SHR  S    %CPU  %MEM     TIME+  COMMAND
    25583 xiaoju   20   0  75.5g   55g   28m  S  1729.3  88.1   2172:52  java

The resident size of 55g is more than the 50g I applied for.
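The numbers quoted in this thread can be put side by side with a little arithmetic. The sketch below is only an illustration: the helper name storage_region_gb is hypothetical, and the 0.9 safety factor is an assumption based on the Spark 1.x default of spark.storage.safetyFraction. The heap the JVM actually reports is also slightly smaller than the configured value, which this sketch ignores.

```python
def storage_region_gb(executor_heap_gb, memory_fraction, safety_fraction=0.9):
    """Approximate heap reserved for cached blocks (hypothetical helper)."""
    return executor_heap_gb * memory_fraction * safety_fraction


heap_gb = 50        # spark.executor.memory from the original post
resident_gb = 55    # RES column of the top output

cache_gb = storage_region_gb(heap_gb, 0.3)   # memoryFraction suggested above
overshoot_gb = resident_gb - heap_gb         # memory held outside the heap limit

print("cache region (GB):", cache_gb)
print("resident minus heap (GB):", overshoot_gb)
```

Lowering memoryFraction only moves the boundary inside the heap; it does not cap the part of the resident size that lies outside the heap, which is the point Sea raises.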
Fwd: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit
Hi Everyone, I have been using Apache Spark for 2 weeks, and as of now I am querying Hive tables using the Spark Java API. It works fine in Hadoop single-node mode, but when I tried the same code on a multi-node Hadoop cluster it throws:

org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit

This is the Java code I tried on the single-node cluster:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.hive.HiveContext;

    // "local" runs Spark inside this JVM; path is the Spark home directory
    SparkConf sparkConf = new SparkConf().setAppName("Hive").setMaster("local").setSparkHome(path);
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    HiveContext sqlContext = new HiveContext(ctx.sc());
    org.apache.spark.sql.Row[] result = sqlContext.sql("Select * from tablename").collect();

But on the multi-node cluster I changed local to yarn-cluster. Can anyone help me with this?
Re: spark streaming program failed on Spark 1.4.1
Just to be clear, did you rebuild your job against Spark 1.4.1 as well as upgrading the cluster?

On Mon, Aug 3, 2015 at 8:36 AM, Netwaver wanglong_...@163.com wrote:

Hi All, I have a Spark Streaming + Kafka program written in Scala. It works well on Spark 1.3.1, but after I migrated my Spark cluster to 1.4.1 and reran the program, I hit the exception below:

    ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer
        at kafka.consumer.Consumer$.create(ConsumerConnector.scala:94)
        at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:100)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:125)
        at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:109)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:308)
        at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:300)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.serialize.ZkSerializer
        at java.net.URLClassLoader$1.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        at java.lang.ClassLoader.loadClass(Unknown Source)
        ... 14 more

I did some web searching and tried adding the zkclient-0.3 jar to the classpath, but I still get the same issue. Can anyone share their experience solving this issue? Thanks in advance.
Re: How to increase parallelism of a Spark cluster?
Hi Sujit,

From experimenting with Spark (and other documentation), my understanding is as follows:

1. Each Application consists of one or more Jobs.
2. Each Job has one or more Stages.
3. Each Stage creates one or more Tasks (normally, one Task per Partition).
4. The Master allocates one Executor per Worker (that contains a Partition) per Application.
5. The Executor stays up for the lifetime of the Application (and dies when the Application ends).
6. Each Executor can run multiple Tasks in parallel (normally, the parallelism depends on the number of cores per Executor).
7. The Scheduler schedules only one Task from each Stage to one Executor.
8. If there are multiple Stages (from a Job) and these Stages could be run asynchronously (i.e., in parallel), one Task from each Stage could be scheduled on the same Executor (thus this Executor runs multiple Tasks in parallel: see #6 above).

Of course, there could be many exceptions/exclusions to what I explained above. I expect that the Spark community will confirm or correct my observations/understanding above.

Now, let's come back to your situation. You have a cluster of 4 Workers with 10 Partitions. All of these 10 Partitions are distributed among these 4 Workers. Also, from the information you provided, your Application has just one Job with two Stages (repartition and mapPartition). The mapPartition Stage will have 10 Tasks. Assuming my observations/understanding are correct, by virtue of #7 above, only 4 Tasks can be executed in parallel. The subsequent Jobs will have to wait. However, if you had 10 or more Workers, all Tasks would have been executed in parallel. BTW, I believe you can have multiple Workers on one physical node. So, one solution to your problem would be to increase the number of Workers.

Having said that, I believe #7 above is the bottleneck. If there is no good reason for keeping this bottleneck, this could be a good area of improvement (and needs to be addressed by the Spark community). I will wait for the community response, and if needed, I will open a JIRA item.

I hope it helps.

Regards,
Ajay

On Mon, Aug 3, 2015 at 1:16 PM, Sujit Pal sujitatgt...@gmail.com wrote:

@Silvio: the mapPartitions instantiates a HttpSolrServer, then for each query string in the partition, sends the query to Solr using SolrJ, and gets back the top N results. It then reformats the result data into one long string and returns the key value pair as (query string, result string).

@Igor: Thanks for the parameter suggestions. I will check the --num-executors and whether there is a way to set the number of cores/executor with my Databricks admin and update here if I find it, but from the Databricks console, it appears that the number of executors per box is 1. This seems normal though, per the diagram on this page:

http://spark.apache.org/docs/latest/cluster-overview.html

where it seems that there is 1 executor per box, and each executor can spawn multiple threads to take care of multiple tasks (see bullet #1 copied below).

"Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads. This has the benefit of isolating applications from each other, on both the scheduling side (each driver schedules its own tasks) and executor side (tasks from different applications run in different JVMs)."

Regarding hitting the max number of requests, thanks for the link. I am using the default client. I just peeked at the Solr code, and the default setting (if no HttpClient instance is supplied in the ctor) is to use DefaultHttpClient (from HttpComponents), whose settings are as follows:

- Version: HttpVersion.HTTP_1_1
- ContentCharset: HTTP.DEFAULT_CONTENT_CHARSET
- NoTcpDelay: true
- SocketBufferSize: 8192
- UserAgent: Apache-HttpClient/release (java 1.5)

In addition, the Solr code sets the following additional config parameters on the DefaultHttpClient:

    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 128);
    params.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 32);
    params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, followRedirects);

Since all my connections are coming out of 2 worker boxes, it looks like I could get 32x2 = 64 clients hitting Solr, right?

@Steve: Thanks for the link to the HttpClient config. I was thinking about using a thread pool (or better, using a PoolingHttpClientManager per the docs), but it probably won't help since it's still being fed one request at a time.

@Abhishek: my observations agree with what you said. In the past I have had success with repartition to reduce the partition size, especially when groupBy operations were involved. But I believe an executor should be able to handle multiple tasks in parallel, from what I understand about Akka, on which Spark is built - the worker is essentially an ActorSystem which
Re: how to ignore MatchError then processing a large json file in spark-sql
This sounds like a bug. What version of Spark are you using? And can you provide the stack trace?

On Sun, Aug 2, 2015 at 11:27 AM, fuellee lee lifuyu198...@gmail.com wrote:

I'm trying to process a bunch of large JSON log files with Spark, but it fails every time with `scala.MatchError`, whether I give it a schema or not. I just want to skip lines that do not match the schema, but I can't find out how in the Spark docs. I know that writing a JSON parser and mapping it over the JSON file RDD would get things done, but I want to use `sqlContext.read.schema(schema).json(fileNames).selectExpr(...)` because it's much easier to maintain. Thanks
Re: Combine code for RDD and DStream
DStream's transform function helps me solve this issue elegantly. Thanks!

On Mon, Aug 3, 2015 at 1:42 PM, Sidd S ssinga...@gmail.com wrote:

Hello! I am developing a Spark program that uses both batch and streaming (separately). They are pretty much the exact same program, except that the inputs come from different sources. Unfortunately, RDDs and DStreams define all of their transformations in their own files, and so I have two different files with pretty much the exact same code. If I make a change to a transformation in one program, I have to make the exact same change to the other program.

It would be nice to have a third file that holds all of my transformations. The batch program and the streaming program could then both reference this third file to know what transformations to perform on the data. Does anyone know a good way of doing this? I want to be able to keep the exact same syntax (..rdd.filter({i:Int => i*2}.map(...).) in this third file. With this method, any change I make to the transformations will apply to both the batch AND streaming processes.

I tried a couple of ideas to no avail.

Thanks in advance,
Sidd
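The approach Sidd settled on can be sketched as follows: keep the transformations in one function that takes an RDD and returns an RDD, call it directly in the batch job, and hand it to transform in the streaming job. The sketch is in Python rather than the Scala used in the thread, and FakeRDD is a hypothetical stand-in that exposes only filter, map and collect, so the example runs without a Spark installation. The concrete filter and map are made up for illustration.

```python
class FakeRDD:
    """Hypothetical stand-in for an RDD, for illustration only."""

    def __init__(self, items):
        self.items = list(items)

    def filter(self, predicate):
        return FakeRDD(x for x in self.items if predicate(x))

    def map(self, func):
        return FakeRDD(func(x) for x in self.items)

    def collect(self):
        return list(self.items)


def shared_pipeline(rdd):
    """The single place where the transformations live."""
    return rdd.filter(lambda i: i % 2 == 0).map(lambda i: i * 2)


# Batch job: apply the function to the RDD directly.
result = shared_pipeline(FakeRDD(range(6))).collect()
print(result)

# Streaming job (needs Spark, so shown only as a comment):
#   transformed = dstream.transform(shared_pipeline)
```

Because transform applies an RDD-to-RDD function to every batch of the stream, a change made inside shared_pipeline reaches both programs.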
Re: how to convert a sequence of TimeStamp to a dataframe
In general it needs to be a Seq of Tuples for the implicit toDF to work (which is a little tricky when there is only one column).

    scala> Seq(Tuple1(new java.sql.Timestamp(System.currentTimeMillis))).toDF("a")
    res3: org.apache.spark.sql.DataFrame = [a: timestamp]

or with multiple columns

    scala> Seq(("1", new java.sql.Timestamp(System.currentTimeMillis))).toDF("a", "b")
    res4: org.apache.spark.sql.DataFrame = [a: string, b: timestamp]

On Fri, Jul 31, 2015 at 2:50 PM, Joanne Contact joannenetw...@gmail.com wrote:

Hi Guys, I have struggled for a while with this seemingly simple thing: I have a sequence of timestamps (Seq[java.sql.Timestamp]) and want to create a dataframe with 1 column.

    //import collection.breakOut
    var seqTimestamp = scala.collection.Seq(listTs:_*)
    seqTimestamp: Seq[java.sql.Timestamp] = List(2015-07-22 16:52:00.0, 2015-07-22 16:53:00.0, ...)

I tried a lot of ways to create a dataframe, and below is another failed attempt:

    import sqlContext.implicits._
    var rddTs = sc.parallelize(seqTimestamp)
    rddTs.toDF("minInterval")
    <console>:108: error: value toDF is not a member of org.apache.spark.rdd.RDD[java.sql.Timestamp]
           rddTs.toDF("minInterval")

So, could any guru please tell me how to do this? I am not familiar with Scala or Spark. I wonder if learning Scala will help with this at all? It just sounds like a lot of time spent on trial and error and googling. Docs like

https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/DataFrame.html
https://spark.apache.org/docs/1.3.0/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(scala.collection.Seq , scala.reflect.api.TypeTags.TypeTag)

do not help. Btw, I am using Spark 1.4.

Thanks in advance,
J
Writing to HDFS
I am executing a Spark job on a cluster in yarn-client mode (yarn-cluster is not an option due to permission issues).

- num-executors 800
- spark.akka.frameSize=1024
- spark.default.parallelism=25600
- driver-memory=4G
- executor-memory=32G
- My input size is around 1.5TB.

My problem is that when I execute rdd.saveAsTextFile(outputPath, classOf[org.apache.hadoop.io.compress.SnappyCodec]), I get a heap space issue. (Saving as Avro is also not an option. I have tried saveAsSequenceFile with GZIP and saveAsNewAPIHadoopFile, with the same result.) On the other hand, if I execute rdd.take(1), I get no such issue. So I am assuming that the issue is due to the write.
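As a rough sanity check on the settings listed above, the average partition size and the number of partitions per executor can be worked out directly. This sketch assumes that 1.5TB means 1.5 * 1024^4 bytes and that the data is spread evenly across partitions, neither of which the post states. It says nothing about the cause of the heap error.

```python
input_bytes = 1.5 * 1024 ** 4   # assumption: "1.5TB" read as binary terabytes
partitions = 25600              # spark.default.parallelism
executors = 800                 # num-executors

# Average data per partition, in MiB, if the split is perfectly even.
mib_per_partition = input_bytes / partitions / 1024 ** 2

# How many partitions each executor has to process over the whole job.
partitions_per_executor = partitions / executors

print(round(mib_per_partition, 2), partitions_per_executor)
```

With an even split each task would handle tens of megabytes, far below the 32G executor heap, so a skewed partition or the write path itself would be the natural things to inspect next.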
Re: How to increase parallelism of a Spark cluster?
Hi Sujit, can you spin it up with 4 (servers) * 4 (cores) = 16 cores, i.e. there should be 16 cores in your cluster, and try to use the same number of partitions? Also look at http://apache-spark-user-list.1001560.n3.nabble.com/No-of-Task-vs-No-of-Executors-td23824.html
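The sizing discussion in this thread comes down to simple arithmetic. The sketch below rests on points 3 and 6 of Ajay's list (one Task per Partition, and concurrency bounded by the cores available to the Executors); the helper name task_waves is hypothetical, and the value of one core per Executor in the first call is an assumption chosen to reproduce the "only 4 Tasks in parallel" observation, not a figure reported in the thread.

```python
import math


def task_waves(num_partitions, num_executors, cores_per_executor):
    """Return (task slots, number of waves needed to run one stage)."""
    slots = num_executors * cores_per_executor
    waves = math.ceil(num_partitions / slots)
    return slots, waves


# 10 partitions on 4 executors; one core each is an assumption.
slots, waves = task_waves(10, 4, 1)

# The suggested layout: 4 servers * 4 cores with 16 partitions.
slots16, waves16 = task_waves(16, 4, 4)

# Sujit's estimate: 32 connections per host from each of 2 worker boxes.
max_solr_connections = 32 * 2

print(slots, waves, slots16, waves16, max_solr_connections)
```

Matching the partition count to the number of task slots lets a stage finish in a single wave, which is the reasoning behind using the same number of partitions as cores.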
Re: Python, Spark and HBase
I wanted to confirm whether this is now supported, for example in Spark v1.3.0. I've read varying info online and just thought I'd verify. Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Python-Spark-and-HBase-tp6142p24117.html