Python's ReduceByKeyAndWindow DStream Keeps Growing
When I use reduceByKeyAndWindow with func and invFunc (in PySpark), the size of the window keeps growing. I am appending code that reproduces the issue; it prints the count() of the DStream, which goes up by 10 elements every batch. Is this a bug in the Python version relative to the Scala one, or is this expected behavior?

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')

ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2)
ds.pprint()
ds.count().pprint()

print 'Starting ssc'
ssc.start()

import itertools
import time
from distutils import dir_util

def batch_write(batch_data, batch_file_path):
    with open(batch_file_path, 'w') as batch_file:
        for element in batch_data:
            batch_file.write(str(element) + '\n')

def xrange_write(batch_size=5, batch_dir='input', batch_duration=1):
    '''Every batch_duration, write a file with batch_size numbers, forever.
    Start at 0 and keep incrementing. Intended for testing Spark Streaming code.'''
    dir_util.mkpath(batch_dir)
    for i in itertools.count():
        batch_data = xrange(batch_size * i, batch_size * (i + 1))
        batch_write(batch_data, batch_dir + '/' + str(i))
        time.sleep(batch_duration)

print 'Feeding data to app'
xrange_write()

ssc.awaitTermination()
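One thing worth checking (a hedged suggestion based on the reduceByKeyAndWindow API, not a confirmed diagnosis of the above): when an inverse function is supplied, keys whose counts have dropped to zero stay in the window state unless a filter function removes them, which would make count() grow exactly as described. Both the Scala and the PySpark signatures accept a filterFunc argument for this. A minimal Scala sketch, where events stands in for the input DStream[String]:

import org.apache.spark.streaming.Seconds

// filterFunc decides which (key, value) pairs are kept in the window state;
// dropping zero counts keeps the state (and count()) from growing without bound.
val counts = events.map(e => (e, 1)).reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,                 // reduce
  (a: Int, b: Int) => a - b,                 // inverse reduce
  Seconds(10),
  Seconds(2),
  numPartitions = 2,
  filterFunc = { case (_, count) => count > 0 }
)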
Re: Spark executor lost because of time out even after setting quite long time out value 1000 seconds
It could be stuck on a GC pause. Can you check a bit more in the executor logs and see what's going on? Also, from the driver UI you can see at which stage it is getting stuck. Thanks Best Regards

On Sun, Aug 16, 2015 at 11:45 PM, unk1102 umesh.ka...@gmail.com wrote:

Hi, I have written a Spark job which seems to be working fine for almost an hour, and after that executors start getting lost because of timeouts. I see the following log statement:

15/08/16 12:26:46 WARN spark.HeartbeatReceiver: Removing executor 10 with no recent heartbeats: 1051638 ms exceeds timeout 100 ms

I don't see any errors, but I see the above warning, and because of it the executor gets removed by YARN, and I see "Rpc client disassociated" errors, IOException connection refused, and FetchFailedException. After an executor gets removed, I see it getting added again and starting to work, and then some other executors fail again. My question is: is it normal for executors to get lost? What happens to the tasks those lost executors were working on? My Spark job keeps on running since it is long, around 4-5 hours. I have a very good cluster with 1.2 TB memory and a good number of CPU cores. To solve the above timeout issue I tried to increase the timeout spark.akka.timeout to 1000 seconds, but no luck. I am using the following command to run my Spark job. Please guide, I am new to Spark. I am using Spark 1.4.1. Thanks in advance.

/spark-submit --class com.xyz.abc.MySparkJob
  --conf spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
  --driver-java-options -XX:MaxPermSize=512m
  --driver-memory 4g
  --master yarn-client
  --executor-memory 25G
  --executor-cores 8
  --num-executors 5
  --jars /path/to/spark-job.jar
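A hedged sketch of where I would raise the limits (assuming Spark 1.4, where spark.network.timeout governs the heartbeat/RPC paths rather than the older Akka setting; the values are illustrative, mirroring the 1000 seconds tried above):

val conf = new SparkConf()
  .setAppName("MySparkJob")
  .set("spark.network.timeout", "1000s")           // covers heartbeat/RPC timeouts in 1.4+
  .set("spark.executor.heartbeatInterval", "60s")  // fewer heartbeats to miss under GC pressure
val sc = new SparkContext(conf)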
Re: Spark hangs on collect (stuck on scheduler delay)
You need to debug further and figure out the bottleneck. Why are you doing a collect? If the dataset is huge, that will likely hang the driver machine. It would be good if you could paste the sample code; without that it's really hard to understand the flow of your program. Thanks Best Regards

On Sun, Aug 16, 2015 at 1:14 PM, Sagi r stsa...@gmail.com wrote:

Hi, I'm building a Spark application in which I load some data from an Elasticsearch cluster (using the latest elasticsearch-hadoop connector) and continue to perform some calculations on the Spark cluster. In one case, I use collect on the RDD as soon as it is created (loaded from ES). However, it sometimes hangs on one (and sometimes more) node and doesn't continue. In the web UI, I can see that one node is stuck on scheduler delay and prevents the job from continuing (while others have finished). Do you have any idea what is going on here? The data that is being loaded is fairly small and only gets mapped once to domain objects before being collected. Thank you
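If the collect itself turns out to be the bottleneck, a hedged sketch of keeping the driver out of trouble (rdd stands in for the RDD loaded from ES):

val preview = rdd.take(100)   // bounded: only 100 rows come back to the driver
val total   = rdd.count()     // aggregate on the cluster instead of collecting
// call collect() only once the dataset is known to fit comfortably in driver memory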
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
Or can I generally create a new RDD from a transformation and enrich its partitions with some metadata, so that I could copy the OffsetRanges into my new RDD in the DStream?

On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com wrote:

Hi all, I need to transform a KafkaRDD into a new stream of deserialized case classes. I want to use the new stream to save it to file and to perform additional transformations on it. To save it I want to use offsets in the filenames, hence I need OffsetRanges in the transformed RDD. But KafkaRDD is private, hence I don't know how to do it. Alternatively, I could deserialize directly in messageHandler, before the KafkaRDD, but that seems to be a 1:1 transformation, while I need to drop bad messages (KafkaRDD => RDD would be a flatMap). Is there a way to do it using messageHandler, or is there another approach? Many thanks for any help. Petr
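A sketch of the pattern from the Kafka integration guide, which side-steps KafkaRDD's visibility: grab the ranges in the first transformation, while the RDD is still the one produced by the direct stream, and keep them in a driver-side variable for use when saving. Here directStream is the stream from KafkaUtils.createDirectStream and deserialize is a hypothetical parser returning Option of your case class:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

directStream.transform { rdd =>
  // must be the first operation: only the original KafkaRDD carries the ranges
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.flatMap(deserialize)        // dropping bad messages, hence flatMap rather than map
 .foreachRDD { rdd =>
   val suffix = offsetRanges.map(r => s"${r.topic}-${r.partition}-${r.fromOffset}").mkString("_")
   rdd.saveAsTextFile(s"output/batch-$suffix")   // filenames built from the offsets
 }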
Re: SparkPi is getting java.lang.NoClassDefFoundError: scala/collection/Seq
Yeah, lots of libraries need to be changed to compile in order to run the examples in IntelliJ. Thanks, Xiaohe

On Mon, Aug 17, 2015 at 10:01 AM, Jeff Zhang zjf...@gmail.com wrote:

Check the example module's dependencies (right click examples and click Open Module Settings). By default scala-library is provided; you need to change it to compile to run SparkPi in IntelliJ. As I remember, you also need to change the guava and jetty related libraries to compile too.

On Mon, Aug 17, 2015 at 2:14 AM, xiaohe lan zombiexco...@gmail.com wrote:

Hi, I am trying to run SparkPi in IntelliJ and getting NoClassDefFoundError. Has anyone else seen this issue before?

Exception in thread "main" java.lang.NoClassDefFoundError: scala/collection/Seq
    at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 6 more

Process finished with exit code 1

Thanks, Xiaohe

-- Best Regards Jeff Zhang
Re: Cannot cast to Tuple when running in cluster mode
That looks like a Scala version mismatch. Thanks Best Regards

On Fri, Aug 14, 2015 at 9:04 PM, saif.a.ell...@wellsfargo.com wrote:

Hi All, I have a working program in which I create two big Tuple2s out of the data. This seems to work locally, but when I switch over to cluster standalone mode, I get this error at the very beginning:

15/08/14 10:22:25 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 10, 162.101.194.44): java.lang.ClassCastException: scala.collection.Iterator$$anon$13 cannot be cast to scala.Tuple2
    at org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

The data comes from JDBC, but I also tried persisting it into memory to turn it into a collection, in case JDBC was the problem. Any advice? Saif
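A hedged sketch of pinning the versions in an sbt build (assuming sbt; stock Spark 1.4 binaries are built against Scala 2.10, so the application must compile against the same minor line):

// build.sbt
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.1" % "provided"
// %% appends the Scala binary version, so a 2.11 scalaVersion here would silently
// pull 2.11 artifacts and can produce exactly this kind of ClassCastException at runtime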
Re: Too many files/dirs in hdfs
In Spark Streaming you can simply check whether your RDD contains any records, and save only when records are there:

dstream.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    // SAVE YOUR STUFF
  }
}

This will not create unnecessary files of 0 bytes.

On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Currently, Spark Streaming creates a new directory for every batch and stores the data in it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which does the clean up for you. Thanks Best Regards

On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote:

Spark Streaming seems to be creating 0 byte files even when there is no data. Also, I have 2 concerns here: 1) extra unnecessary files are being created in the output, and 2) Hadoop doesn't work really well with too many files, and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing the files, maybe some kind of append mechanism where one doesn't have to change the batch interval?
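A slightly cheaper variant (a hedged sketch; RDD.isEmpty has been available since Spark 1.3 and short-circuits at the first element instead of counting everything; the output path is a placeholder):

dstream.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    // one directory per non-empty batch, named by the batch time
    rdd.saveAsTextFile(s"/output/batch-${time.milliseconds}")
  }
}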
Meaning of local[2]
What does this mean: .setMaster("local[2]")? Is this applicable only for standalone mode? Can I do this in a cluster setup, e.g. .setMaster("hostname:port[2]")? Is it the number of threads per worker node?
Re: spark streaming 1.3 doubts (force it to not consume anything)
How to create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but KafkaUtils.createDirectStream allows a Function. I have the below as an overridden DirectKafkaInputDStream:

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Object> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1,
      ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3,
      ClassTag<DefaultDecoder> evidence$4,
      ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler,
        evidence$1, evidence$2, evidence$3, evidence$4, evidence$5);
  }

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
      Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

To create this stream I am using:

scala.collection.immutable.Map<String, String> scalakafkaParams =
    JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
        .toMap(Predef.<Tuple2<String, String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
    JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
        .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
    new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { .. };

JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
    jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
    byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    byte[][].class);

How to pass a ClassTag to the constructor in CustomDirectKafkaInputDstream? And how to use Function instead of Function1?

On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org wrote:

I'm not aware of an existing api per se, but you could create your own subclass of the DStream that returns None for compute() under certain conditions.

On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora shushantaror...@gmail.com wrote:

Hi Cody, can you help here: does streaming 1.3 have any api for not consuming any message in the next few runs? Thanks

---------- Forwarded message ----------
From: Shushant Arora shushantaror...@gmail.com
Date: Wed, Aug 12, 2015 at 11:23 PM
Subject: spark streaming 1.3 doubts (force it to not consume anything)
To: user user@spark.apache.org

I can't make my streaming application's batch interval change at run time. It is always fixed, and it always creates jobs at the specified batch interval and enqueues them if the earlier batch is not finished. My requirement is to process the events and post them to some external server, and if the external server is down I want to increase the batch time. That is not possible, but can I make it not consume any messages in, say, the next 5 successive runs?
Re: S3n, parallelism, partitions
s3n underneath uses the Hadoop API, so I guess it would partition according to your Hadoop configuration (128 MB per partition by default). Thanks Best Regards

On Mon, Aug 17, 2015 at 2:29 PM, matd matd...@gmail.com wrote:

Hello, I would like to understand how the work is parallelized across a Spark cluster (and what is left to the driver) when I read several files from a single folder in S3: s3n://bucket_xyz/some_folder_having_many_files_in_it/ How are files (or file parts) mapped to partitions? Thanks Mathieu
Re: How to run spark in standalone mode on cassandra with high availability?
Have a look at Mesos. Thanks Best Regards

On Sat, Aug 15, 2015 at 1:03 PM, Vikram Kone vikramk...@gmail.com wrote:

Hi, we are planning to install Spark in standalone mode on a Cassandra cluster. The problem is that, since Cassandra has a no-SPOF architecture (i.e., any node can become the master for the cluster), it creates a problem for the Spark master, since Spark is not a peer-to-peer architecture where any node can become the master. What are our options here? Are there any frameworks or tools out there that would allow any application to run on a cluster of machines with high availability?
Re: Meaning of local[2]
Hi Praveen,

On Mon, Aug 17, 2015 at 12:34 PM, praveen S mylogi...@gmail.com wrote:

What does this mean in .setMaster("local[2]")?

Local mode (executor in the same JVM) with 2 executor threads.

Is this applicable only for standalone mode?

It is not applicable for standalone mode, only for local.

Can I do this in a cluster setup, e.g. .setMaster("hostname:port[2]")?

No. It's faster to try than to ask a mailing list, actually. Also it's documented at http://spark.apache.org/docs/latest/submitting-applications.html#master-urls .

Is it the number of threads per worker node?

You can control the number of total threads with spark-submit's --total-executor-cores parameter, if that's what you're looking for.
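For reference, a minimal sketch of the common master URL forms from the page linked above (host and port are placeholders):

val localConf  = new SparkConf().setMaster("local[2]")           // one JVM, 2 worker threads
val standalone = new SparkConf().setMaster("spark://host:7077")  // standalone cluster; no [n] suffix here
val yarn       = new SparkConf().setMaster("yarn-client")        // cluster managed by YARN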
Re: Help with persist: Data is requested again
Are you triggering an action within the while loop? How are you loading the data from JDBC? You need to make sure the job has enough partitions to run in parallel to increase the performance. Thanks Best Regards

On Sat, Aug 15, 2015 at 2:41 AM, saif.a.ell...@wellsfargo.com wrote:

Hello all, I am writing a program which calls from a database. I run a couple of computations, but in the end I have a while loop in which I make a modification to the persisted data, e.g.:

val data = PairRDD... persist()
var i = 0
while (i < 10) {
    val data_mod = data.map(x => (x._1 + 1, x._2))
    val data_joined = data.join(data_mod)
    ... do stuff with data_joined ...
}

Sadly, the result is that the shuffle inside the while loop causes a JDBC call, and that is very slow. It is not finding the data locally. How can I help myself? Saif
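A hedged sketch of the materialization point implied by the first question above: persist is lazy, so without an action before the loop the JDBC read can still happen inside it (loadFromJdbc is a hypothetical stand-in for however the pair RDD is built from the database):

import org.apache.spark.storage.StorageLevel

val data = loadFromJdbc(sc).persist(StorageLevel.MEMORY_AND_DISK)
data.count()   // action here forces one JDBC read up front; the loop then hits the cache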
Re: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
Have you tried adding the path to the hbase-protocol jar to spark.driver.extraClassPath and spark.executor.extraClassPath? Cheers

On Mon, Aug 17, 2015 at 7:51 PM, stark_summer stark_sum...@qq.com wrote:

spark version: 1.4.1
java version: 1.7
hadoop version: Hadoop 2.3.0-cdh5.1.0

I submit a spark job to a YARN cluster that reads HBase data; after the job runs, it comes up with the error below:

15/08/17 19:28:33 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:210)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:121)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
    at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:264)
    at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:169)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:164)
    at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:107)
    at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:736)
    at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:178)
    at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:82)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.isTableAvailable(HConnectionManager.java:962)
    at org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1081)
    at org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1089)
    at com.umeng.dp.yuliang.play.HBaseToES$.main(HBaseToES.scala:28)
    at com.umeng.dp.yuliang.play.HBaseToES.main(HBaseToES.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
Caused by: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:930)
    at org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:434)
    at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:297)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:157)
    at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:57)
    at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
    ... 18 more

PS: running a Hadoop MR job on YARN that reads HBase data also has this error (see https://issues.apache.org/jira/browse/HBASE-10304; that is an HBase issue). When submitting Hadoop MR, adding export HADOOP_CLASSPATH=./hbase/hbase-protocol/target/hbase-protocol-0.99.0-SNAPSHOT.jar to the shell command, or adding export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar to the Linux /etc/bashrc file, makes it work well; but when submitting a spark job, it does not work.
how do I execute a job on a single worker node in standalone mode
I have a 4 node cluster and have been playing around with the num-executors, executor-memory, and executor-cores parameters. I set the following:

--executor-memory=10G --num-executors=1 --executor-cores=8

But when I run the job, I see that each worker is running one executor which has 2 cores and 2.5G memory. What I'd like to do instead is have Spark allocate the job to a single worker node. Is that possible in standalone mode, or do I need a job/resource scheduler like YARN to do that? Thanks in advance, -Axel
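One setting worth trying (a hedged suggestion for the standalone master, not a verified fix for this exact layout): by default the standalone master spreads an application's cores across all workers; turning that off packs executors onto as few workers as possible. For example, on the master:

# spark-env.sh on the master node (sketch)
SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=false"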
Spark 1.4.1 - Mac OSX Yosemite
Has anyone experienced issues running Spark 1.4.1 on Mac OS X Yosemite? I've been running a standalone 1.3.1 fine, but it failed when trying to run 1.4.1 (I also tried 1.4.0). I've tried both the pre-built packages as well as compiling from source, both with the same results. (I can successfully compile with both mvn and sbt, after fixing the sbt.jar, which was corrupt.) After downloading/building Spark and running ./bin/pyspark or ./bin/spark-shell, it silently exits with code 1. Creating a context in Python I get:

Exception: Java gateway process exited before sending the driver its port number

I couldn't find any specific resolutions on the web. I did add 'pyspark-shell' to PYSPARK_SUBMIT_ARGS, but to no effect. Anyone have any further ideas I can explore? Cheers -Alun.
Re: graphx issue spark 1.3
The code below is taken from the Spark website and generates the error detailed. Hi, using Spark 1.3 and trying some sample code:

val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

When I run graph.numEdges all works well, but with graph.numVertices it falls over and I get a whole heap of errors:

Failed to open file: /tmp/spark..shuffle_0_21_0.index
    at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
    at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
    at org.apache.spark.network.server.Transpor... SLF4J: Class path contains multiple SLF4J bindings.

Is anyone else experiencing this? I've tried different graphs and always end up with the same results. Thanks

On Tue, 18 Aug 2015 at 12:15 am, Sonal Goyal sonalgoy...@gmail.com wrote:

I have been using GraphX in production on 1.3 and 1.4 with no issues. What's the exception you see and what are you trying to do?

On Aug 17, 2015 10:49 AM, dizzy5112 dave.zee...@gmail.com wrote:

Hi, using spark 1.3 and trying some sample code: when I run it, all works well, but with the other call it falls over and I get a whole heap of errors. Is anyone else experiencing this? I've tried different graphs and always end up with the same results. Thanks
Re: Spark 1.4.1 - Mac OSX Yosemite
I had success earlier today on OS X Yosemite 10.10.4 building Spark 1.4.1 using these instructions http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html (using `$ sbt/sbt clean assembly`), with the additional step of downloading the proper sbt-launch.jar (0.13.7) from here http://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/ and replacing the one that is in build/, as you noted. Have you set the SCALA_HOME and JAVA_HOME environment variables?

On Mon, Aug 17, 2015 at 8:36 PM, Alun Champion a...@achampion.net wrote:

Has anyone experienced issues running Spark 1.4.1 on Mac OS X Yosemite? I've been running a standalone 1.3.1 fine, but it failed when trying to run 1.4.1 (I also tried 1.4.0). I've tried both the pre-built packages as well as compiling from source, both with the same results. (I can successfully compile with both mvn and sbt, after fixing the sbt.jar, which was corrupt.) After downloading/building Spark and running ./bin/pyspark or ./bin/spark-shell, it silently exits with code 1. Creating a context in Python I get:

Exception: Java gateway process exited before sending the driver its port number

I couldn't find any specific resolutions on the web. I did add 'pyspark-shell' to PYSPARK_SUBMIT_ARGS, but to no effect. Anyone have any further ideas I can explore? Cheers -Alun.
Re: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString
Approach 1: when submitting the spark job, add the below:

--conf spark.driver.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
--conf spark.executor.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar

such as:

/home/dp/spark/spark-1.4/spark-1.4.1/bin/spark-submit --class com.umeng.dp.yuliang.play.HBaseToES --master yarn-cluster --conf spark.driver.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar --conf spark.executor.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar --jars /home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar ScalaMR-0.0.1-jar-with-dependencies.jar

Approach 2: add the below config to $SPARK_HOME/conf/spark-defaults.conf:

spark.driver.extraClassPath /home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
spark.executor.extraClassPath /home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
Re: Serializing MLlib MatrixFactorizationModel
I'd recommend using the built-in save and load, which will be better for cross-version compatibility. You should be able to call myModel.save(sc, path) and load it back with MatrixFactorizationModel.load(sc, path).

On Mon, Aug 17, 2015 at 6:31 AM, Madawa Soysa madawa...@cse.mrt.ac.lk wrote:

Hi All, I have an issue when I try to serialize a MatrixFactorizationModel object as a Java object in a Java application. When I deserialize the object, I get the following exception:

Caused by: java.lang.ClassNotFoundException: org.apache.spark.OneToOneDependency cannot be found by org.scala-lang.scala-library_2.10.4.v20140209-180020-VFINAL-b66a39653b

Any solution for this? Madawa Soysa, Undergraduate, Department of Computer Science and Engineering, University of Moratuwa.
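A minimal sketch of the built-in persistence (MLlib's Saveable API, available since Spark 1.3; the path is a placeholder):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

model.save(sc, "hdfs:///models/als")   // writes model metadata plus the factor RDDs
val restored = MatrixFactorizationModel.load(sc, "hdfs:///models/als")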
Re: Spark 1.4.1 - Mac OSX Yosemite
Yes, they are both set. I just recompiled and still no success, a silent failure. Which versions of Java and Scala are you using?

On 17 August 2015 at 19:59, Charlie Hack charles.t.h...@gmail.com wrote:

I had success earlier today on OS X Yosemite 10.10.4 building Spark 1.4.1 using these instructions http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html (using `$ sbt/sbt clean assembly`), with the additional step of downloading the proper sbt-launch.jar (0.13.7) from here http://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/ and replacing the one that is in build/, as you noted. Have you set the SCALA_HOME and JAVA_HOME environment variables?

On Mon, Aug 17, 2015 at 8:36 PM, Alun Champion a...@achampion.net wrote:

Has anyone experienced issues running Spark 1.4.1 on Mac OS X Yosemite? I've been running a standalone 1.3.1 fine, but it failed when trying to run 1.4.1 (I also tried 1.4.0). I've tried both the pre-built packages as well as compiling from source, both with the same results. (I can successfully compile with both mvn and sbt, after fixing the sbt.jar, which was corrupt.) After downloading/building Spark and running ./bin/pyspark or ./bin/spark-shell, it silently exits with code 1. Creating a context in Python I get:

Exception: Java gateway process exited before sending the driver its port number

I couldn't find any specific resolutions on the web. I did add 'pyspark-shell' to PYSPARK_SUBMIT_ARGS, but to no effect. Anyone have any further ideas I can explore? Cheers -Alun.
Re: spark streaming 1.3 doubts (force it to not consume anything)
Look at the definitions of the Java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext).

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com wrote:

How to create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but KafkaUtils.createDirectStream allows a Function. How to pass a ClassTag to the constructor in CustomDirectKafkaInputDstream? And how to use Function instead of Function1?
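For reference, in Java a ClassTag can typically be obtained via scala.reflect.ClassTag$.MODULE$.apply(byte[].class), which is what the Java-friendly KafkaUtils methods do internally. Below is a hedged Scala sketch of the subclass suggested earlier in the thread; in Scala the ClassTag and Function1 friction disappears because the implicits are supplied automatically. DirectKafkaInputDStream is package-private, so this file would have to be compiled into the same package, and shouldSkip is a hypothetical back-off predicate (e.g. comparing the two counters from the Java version):

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.{StreamingContext, Time}

class PausableDirectStream(
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[Array[Byte], Array[Byte]] => Array[Array[Byte]],
    shouldSkip: () => Boolean)
  extends DirectKafkaInputDStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder,
    Array[Array[Byte]]](ssc, kafkaParams, fromOffsets, messageHandler) {

  override def compute(validTime: Time): Option[KafkaRDD[Array[Byte], Array[Byte],
      DefaultDecoder, DefaultDecoder, Array[Array[Byte]]]] = {
    // returning None makes this batch consume nothing, as Cody described
    if (shouldSkip()) None else super.compute(validTime)
  }
}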
rdd count is throwing null pointer exception
Hi All, thank you very much for the detailed explanation. I have a scenario like this: I have an RDD of ticket records and another RDD of booking records. For each ticket record, I need to check whether any link exists in the booking table.

val ticketCachedRdd = ticketRdd.cache

ticketRdd.foreach { ticket =>
  // this function queries the booking table and retrieves the booking rows
  val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
  println(ticketCachedRdd.count)   // this is throwing a NullPointerException
}

Is there something wrong with the count? I am trying to use the count of the cached RDD while looping through the actual RDD. What's wrong in this? Thanks, Padma Ch
Re: Spark Interview Questions
This statement is from Spark's website itself. Regards, Sandeep Giri, www.KnowBigData.com

On Wed, Aug 12, 2015 at 10:42 PM, Peyman Mohajerian mohaj...@gmail.com wrote:

I think this statement is inaccurate: "Q7: What are Actions? A: An action brings back the data from the RDD to the local machine." Also, I wouldn't say Spark is 100x faster than Hadoop and that it is memory based. This is the kind of statement that will not get you the job. When it comes to shuffle it has to write to disk; it is faster in many cases, but 100x is just a marketing statement for a very narrow set of use cases.

On Thu, Jul 30, 2015 at 4:55 AM, Sandeep Giri sand...@knowbigdata.com wrote:

I have prepared some interview questions: http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-1 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2 Please provide your feedback.

On Wed, Jul 29, 2015, 23:43 Pedro Rodriguez ski.rodrig...@gmail.com wrote:

You might look at the edX course on Apache Spark or ML with Spark. There are probably some homework problems or quiz questions that might be relevant. I haven't looked at the course myself, but that's where I would go first. https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x -- Pedro Rodriguez, PhD Student in Distributed Machine Learning, CU Boulder. UC Berkeley AMPLab Alumni. ski.rodrig...@gmail.com | pedrorodriguez.io | Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
Re: rdd count is throwing null pointer exception
The error could be because of the missing parentheses after the word cache, i.e. ticketRdd.cache()

On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com wrote:

Hi All, thank you very much for the detailed explanation. I have a scenario like this: I have an RDD of ticket records and another RDD of booking records. For each ticket record, I need to check whether any link exists in the booking table.

val ticketCachedRdd = ticketRdd.cache

ticketRdd.foreach { ticket =>
  // this function queries the booking table and retrieves the booking rows
  val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
  println(ticketCachedRdd.count)   // this is throwing a NullPointerException
}

Is there something wrong with the count? I am trying to use the count of the cached RDD while looping through the actual RDD. What's wrong in this? Thanks, Padma Ch
Paper on Spark SQL
Hi, I can't access http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf. Could someone try to see if it is available and reply with it? Thanks!
Re: Paper on Spark SQL
I got a 404 when trying to access the link.

On Aug 17, 2015, at 5:31 AM, Todd bit1...@163.com wrote:

Hi, I can't access http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf. Could someone try to see if it is available and reply with it? Thanks!
Re: Paper on Spark SQL
An extra "," is at the end. -- Nan Zhu http://codingcat.me

On Monday, August 17, 2015 at 9:28 AM, Ted Yu wrote:

I got a 404 when trying to access the link.

On Aug 17, 2015, at 5:31 AM, Todd bit1...@163.com wrote:

Hi, I can't access http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf, (the hyperlink target was http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf, with the trailing comma). Could someone try to see if it is available and reply with it? Thanks!
Exception when S3 path contains colons
Hi, I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0). I'm seeing the exception below when encountering file names that contain colons. Any idea on how to get around this?

scala> val files = sc.textFile("s3a://redactedbucketname/*")
2015-08-18 04:38:34,567 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with curMem=669367, maxMem=285203496
2015-08-18 04:38:34,568 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory (estimated size 236.5 KB, free 271.1 MB)
2015-08-18 04:38:34,663 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with curMem=911591, maxMem=285203496
2015-08-18 04:38:34,664 INFO [main] storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in memory (estimated size 21.0 KB, free 271.1 MB)
2015-08-18 04:38:34,665 INFO [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on 10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB)
2015-08-18 04:38:34,667 INFO [main] spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at <console>:21
files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at <console>:21

scala> files.count
2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:listStatus(533)) - List status for path: s3a://redactedbucketname/
2015-08-18 04:38:37,262 INFO [main] s3a.S3AFileSystem (S3AFileSystem.java:getFileStatus(684)) - Getting path status for s3a://redactedbucketname/ ()
java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: [922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv
    at org.apache.hadoop.fs.Path.initialize(Path.java:206)
    at org.apache.hadoop.fs.Path.<init>(Path.java:172)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:240)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1099)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC.<init>(<console>:39)
    at $iwC.<init>(<console>:41)
    at <init>(<console>:43)
    at .<init>(<console>:47)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at
Re: Paper on Spark SQL
Thanks Nan. That is why I always put an extra space between a URL and punctuation in my comments / emails.

On Mon, Aug 17, 2015 at 6:31 AM, Nan Zhu zhunanmcg...@gmail.com wrote:

An extra "," is at the end. -- Nan Zhu http://codingcat.me

On Monday, August 17, 2015 at 9:28 AM, Ted Yu wrote:

I got a 404 when trying to access the link.

On Aug 17, 2015, at 5:31 AM, Todd bit1...@163.com wrote:

Hi, I can't access http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf. Could someone try to see if it is available and reply with it? Thanks!
Re: Left outer joining big data set with small lookups
Try doing a count on both lookups to force the caching to occur before the join.

On 8/17/15, 12:39 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:

Thanks for your help. I tried to cache the lookup tables and left outer join with the big table (DF). The join does not seem to be using a broadcast join; it still goes with a hash-partitioned join and shuffles the big table. Here is the scenario:

table1 as big_df left outer join table2 as lkup on big_df.lkupid = lkup.lkupid

table1 above is well distributed across all 40 partitions because of sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using just 2 partitions. After the join stage, the Spark UI showed me that all activity ended up in just 2 executors. When I tried to dump the data to HDFS after the join stage, all data ended up in 2 partition files and the remaining 38 files were 0-sized.

Since the above did not work, I tried to broadcast the DF and register it as a table before the join:

val table2_df = sqlContext.sql("select * from table2")
val broadcast_table2 = sc.broadcast(table2_df)
broadcast_table2.value.registerTempTable("table2")

The broadcast also has the same issue as explained above: all data is processed by just 2 executors due to the lookup skew. Any more ideas to tackle this issue with Spark DataFrames? Thanks, Vijay

On Aug 14, 2015, at 10:27 AM, Silvio Fiorito silvio.fior...@granturing.com wrote:

You could cache the lookup DataFrames; it'll then do a broadcast join.

On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:

Hi, I am facing a huge performance problem when I try to left outer join a very big data set (~140GB) with a bunch of small lookups [star schema type]. I am using DataFrames in Spark SQL. It looks like the data is shuffled and skewed when that join happens. Is there any way to improve the performance of such a join in Spark? How can I hint the optimizer to go with a replicated join, etc., to avoid the shuffle? Would it help to create broadcast variables on the small lookups? If I create broadcast variables, how can I convert them into DataFrames and use them in a Spark SQL type of join? Thanks, Vijay
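A hedged sketch of the approach being suggested (Spark 1.4-era DataFrame API; the threshold value is illustrative): materialize the small side first, then let the optimizer pick a broadcast join when the table fits under spark.sql.autoBroadcastJoinThreshold.

val lkup = sqlContext.sql("select * from table2").cache()
lkup.count()                          // action forces the cache before the join is planned
lkup.registerTempTable("table2")

// threshold in bytes; the cached small table must fit under it to be broadcast
sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")

val joined = sqlContext.sql(
  "select * from table1 big_df left outer join table2 lkup on big_df.lkupid = lkup.lkupid")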
Re: Setting up Spark/flume/? to Ingest 10TB from FTP
With the right FTP client JAR on your classpath (I forget which), you can use ftp:// as a source for a Hadoop FS operation. You may even be able to use it as an input for some Spark (non-streaming) jobs directly.

On 14 Aug 2015, at 14:11, Varadhan, Jawahar varad...@yahoo.com.INVALID wrote:

Thanks Marcelo. But our problem is a little more complicated. We have 10+ FTP sites that we will be transferring data from. The FTP server info, filename, and credentials all come via Kafka messages. So I want to read those Kafka messages, dynamically connect to the FTP site, download those fat files, and store them in HDFS. Hence, I was planning to use Spark Streaming with Kafka, or Flume with Kafka. But Flume runs in a JVM and may not be the best option, as the huge file will create memory issues. Please suggest some way to run it inside the cluster.

From: Marcelo Vanzin van...@cloudera.com
To: Varadhan, Jawahar varad...@yahoo.com
Cc: d...@spark.apache.org
Sent: Friday, August 14, 2015 3:23 PM
Subject: Re: Setting up Spark/flume/? to Ingest 10TB from FTP

Why do you need to use Spark or Flume for this? You can just use curl and hdfs:

curl ftp://blah | hdfs dfs -put - /blah

On Fri, Aug 14, 2015 at 1:15 PM, Varadhan, Jawahar varad...@yahoo.com.invalid wrote:

What is the best way to bring such a huge file from an FTP server into Hadoop to persist in HDFS? Since a single JVM process might run out of memory, I was wondering if I can use Spark or Flume to do this. Any help on this matter is appreciated. I prefer an application/process running inside Hadoop which does this transfer. Thanks.

-- Marcelo
Re: rdd count is throwing null pointer exception
Looks like it's because of SPARK-5063: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

On Mon, Aug 17, 2015 at 8:13 PM, Preetam preetam...@gmail.com wrote:

The error could be because of the missing brackets after the word cache - ticketRdd.cache()

On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com wrote:

Hi All,

Thank you very much for the detailed explanation. I have a scenario like this: I have an RDD of ticket records and another RDD of booking records. For each ticket record, I need to check whether any link exists in the booking table.

    val ticketCachedRdd = ticketRdd.cache
    ticketRdd.foreach { ticket =>
      // this function queries the booking table and retrieves the booking rows
      val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
      println(ticketCachedRdd.count) // this is throwing a NullPointerException
    }

Is there something wrong in the count? I am trying to use the count of the cached RDD while looping through the actual RDD. What's wrong with this?

Thanks,
Padma Ch
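A minimal sketch of one way around SPARK-5063 here, assuming the count is all that's needed inside the loop (names reused from the snippet above): run the action on the driver first and close over the plain result, not the RDD.

    // compute the count on the driver, outside any transformation
    val ticketCachedRdd = ticketRdd.cache()
    val ticketCount = ticketCachedRdd.count() // a Long, safe to use in closures

    ticketRdd.foreach { ticket =>
      val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
      println(ticketCount) // no RDD method is invoked inside the closure
    }

If per-ticket lookups against the booking RDD are actually needed, a join between the two RDDs is the idiomatic replacement for calling RDD methods inside foreach.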
Re: S3n, parallelism, partitions
This will also depend on the file format you are using. A word of advice: you would be much better off with the s3a file system. As I found out recently the hard way, s3n has some issues with reading through entire files even when only looking for headers.

On Mon, Aug 17, 2015 at 2:10 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

s3n underneath uses the Hadoop API, so I guess it would partition according to your Hadoop configuration (128MB per partition by default).

Thanks
Best Regards

On Mon, Aug 17, 2015 at 2:29 PM, matd matd...@gmail.com wrote:

Hello,

I would like to understand how the work is parallelized across a Spark cluster (and what is left to the driver) when I read several files from a single folder in S3: s3n://bucket_xyz/some_folder_having_many_files_in_it/

How are files (or file parts) mapped to partitions?

Thanks
Mathieu
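A quick way to see the mapping for yourself, sketched under the assumption of a Hadoop 2.x client with the s3a connector (hadoop-aws) on the classpath and credentials already configured; the minPartitions argument is the documented hint for requesting more splits than the block-size default:

    // hedged sketch: inspect how a folder of S3 objects maps to partitions
    val rdd = sc.textFile("s3a://bucket_xyz/some_folder_having_many_files_in_it/")
    println(rdd.partitions.length) // roughly one partition per block-sized chunk per file

    // ask for at least 200 splits if the default is too coarse
    val finer = sc.textFile("s3a://bucket_xyz/some_folder_having_many_files_in_it/", 200)
    println(finer.partitions.length)

Only the split computation happens on the driver; reading the actual bytes is done by the executors, one task per partition.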
Re: Apache Spark - Parallel Processing of messages from Kafka - Java
    val numStreams = 4
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }

In Java, in a for loop you would create four streams using KafkaUtils.createStream(), so that each receiver runs in a different thread. For more information, please visit http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

Hope it helps!
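The same pattern written out as a hedged sketch: it assumes a spark-streaming-kafka dependency, an existing StreamingContext named ssc, and placeholder ZooKeeper/group/topic values (none of which come from the thread):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // create several receivers so consumption is parallelized, then union them
    val numStreams = 4
    val kafkaStreams = (1 to numStreams).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("events" -> 1))
    }
    val unified = ssc.union(kafkaStreams) // downstream operators see one DStream
    unified.print()

The union step matters: without it each of the four streams would need its own downstream pipeline.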
What's the logic in RangePartitioner.rangeBounds method of Apache Spark
*Firstly, sorry for my poor English.*

I was reading the source code of Apache Spark 1.4.1 and I really got stuck on the logic of the RangePartitioner.rangeBounds method. The relevant lines are quoted below. Can anyone please explain:

1. What is the 3.0 for in the line val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt? Why choose 3.0 rather than some other value?

2. Why does fraction * n > sampleSizePerPartition mean that a partition contains much more than the average number of items? Can you give an example where we would need to re-sample a partition?

Thanks a lot!
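As a purely illustrative worked example of the second condition (the numbers are invented, not from the post): suppose the RDD has n = 1,000,000 items across 10 partitions and sampleSize = 200. Then sampleSizePerPartition = ceil(3.0 * 200 / 10) = 60 and fraction = 200 / 1,000,000 = 0.0002. A balanced partition holds 100,000 items, so fraction * 100,000 = 20 <= 60 and its sample is accepted. A skewed partition holding 400,000 items gives fraction * 400,000 = 80 > 60, i.e. it holds far more items than its per-partition sample budget assumed, so it gets re-sampled at a higher rate to keep the computed range bounds statistically fair.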
Programmatically create SparkContext on YARN
Hi all,

When running the Spark cluster in standalone mode, I am able to create the Spark context from Java via the following code snippet:

    SparkConf conf = new SparkConf()
        .setAppName("MySparkApp")
        .setMaster("spark://SPARK_MASTER:7077")
        .setJars(jars);
    JavaSparkContext sc = new JavaSparkContext(conf);

As soon as I'm done with my processing, I can just close it via sc.stop().

Now my question: Is the same also possible when running Spark on YARN? I currently don't see how this should be possible without submitting your application as a packaged jar file. Is there a way to get this kind of interactivity from within your Scala/Java code?

Regards,
Andrea
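For what it's worth, a hedged sketch of the yarn-client variant (shown in Scala, mirroring the Java snippet above). It assumes Spark 1.x with HADOOP_CONF_DIR/YARN_CONF_DIR exported so the client can find the cluster configuration, and the Spark assembly reachable by the executors (e.g., via spark.yarn.jar); yarn-cluster mode genuinely does require going through spark-submit:

    import org.apache.spark.{SparkConf, SparkContext}

    // works for yarn-client only; the driver stays in this JVM, executors run on YARN
    val conf = new SparkConf()
      .setAppName("MySparkApp")
      .setMaster("yarn-client")
      .setJars(jars) // application classes still have to reach the executors
    val sc = new SparkContext(conf)
    // ... interactive work ...
    sc.stop()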
Re: issue Running Spark Job on Yarn Cluster
Did you resolve this issue?
Embarassingly parallel computation in SparkR?
Hi,

I'm wondering how to achieve, say, a Monte Carlo simulation in SparkR without using the low-level RDD functions that were made private in 1.4, such as parallelize and map. Something like:

    parallelize(sc, 1:1000).map(
      ### R code that does my computation
    )

where the code is the same on every node, only with different seeds. (I'm going to use this code with SparkR:::parallelize, but I'm wondering if there is a better way, or whether this might be a use case that would justify not making those functions private?)

Many thanks!
kristina
registering an empty RDD as a temp table in a PySpark SQL context
I have an RDD queried from a scan of a data source. Sometimes the RDD has rows and at other times it has none. I would like to register this RDD as a temporary table in a SQL context. I suspect this will work in Scala, but in PySpark some code assumes that the RDD has rows in it, which are used to verify the schema:

https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

Before I attempt to extend the Scala code to handle an empty RDD, or provide an empty DataFrame that can be registered, I was wondering what people recommend in this case. Perhaps there's a simple way of registering an empty RDD as a temporary table in a PySpark SQL context that I'm overlooking.

An alternative is to add special-case logic in the client code to deal with an RDD backed by an empty table scan. But since the SQL will already handle that, I was hoping to avoid special-case logic.

Eric
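One hedged workaround sketch on the Scala side that the post alludes to, assuming the schema is known up front (the column names and table name below are invented): with an explicit StructType there are no rows to inspect, so an empty RDD is harmless.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // build an empty DataFrame from an explicit schema and register it
    val schema = StructType(Seq(
      StructField("id", LongType, nullable = true),
      StructField("name", StringType, nullable = true)))
    val emptyDf = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
    emptyDf.registerTempTable("scan_results") // queries simply return zero rows

Since createDataFrame with an explicit schema skips inference entirely, the same shape of fix is what the PySpark code path would need.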
Re: Spark on scala 2.11 build fails due to incorrect jline dependency in REPL
You were building against 1.4.x, right? In the master branch, switch-to-scala-2.11.sh is gone. There is a scala-2.11 profile.

FYI

On Sun, Aug 16, 2015 at 11:12 AM, Stephen Boesch java...@gmail.com wrote:

I am building Spark with the following options - most notably the **scala-2.11**:

    . dev/switch-to-scala-2.11.sh
    mvn -Phive -Pyarn -Phadoop-2.6 -Dhadoop2.6.2 -Pscala-2.11 -DskipTests -Dmaven.javadoc.skip=true clean package

The build goes pretty far but fails in one of the minor modules, *repl*:

    [ERROR] Failed to execute goal on project spark-repl_2.11: Could not resolve
    dependencies for project org.apache.spark:spark-repl_2.11:jar:1.5.0-SNAPSHOT:
    Could not find artifact org.scala-lang:jline:jar:2.11.7 in central
    (https://repo1.maven.org/maven2) -> [Help 1]

Upon investigation: from 2.11.5 onward, the Scala version of jline is no longer required - they use the default jline distribution. And in fact the repl pom only shows a dependency on jline for the 2.10.4 Scala version:

    <profile>
      <id>scala-2.10</id>
      <activation>
        <property><name>!scala-2.11</name></property>
      </activation>
      <properties>
        <scala.version>2.10.4</scala.version>
        <scala.binary.version>2.10</scala.binary.version>
        <jline.version>${scala.version}</jline.version>
        <jline.groupid>org.scala-lang</jline.groupid>
      </properties>
      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>${jline.groupid}</groupId>
            <artifactId>jline</artifactId>
            <version>${jline.version}</version>
          </dependency>
        </dependencies>
      </dependencyManagement>
    </profile>

So then it is not clear why this error is occurring. Pointers appreciated.
Re: Spark on scala 2.11 build fails due to incorrect jline dependency in REPL
In 1.4 it is change-scala-version.sh 2.11. But the problem was that it is -Dscala-2.11, not a -P. I misread the docs.

2015-08-17 14:17 GMT-07:00 Ted Yu yuzhih...@gmail.com:

You were building against 1.4.x, right? In the master branch, switch-to-scala-2.11.sh is gone. There is a scala-2.11 profile.

FYI
Spark Job Hangs on our production cluster
I am comparing the Spark logs line by line between the hanging case (big dataset) and the non-hanging case (small dataset). In the hanging case, Spark's log looks identical to the non-hanging case while reading the first block of data from HDFS. But after that, starting from line 438 in spark-hang.log, I only see log lines generated by the Worker, like the following, for the next 10 minutes:

    15/08/14 14:24:19 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    15/08/14 14:24:19 DEBUG Worker: [actor] handled message (0.121965 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    ...
    15/08/14 14:33:04 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
    15/08/14 14:33:04 DEBUG Worker: [actor] handled message (0.136146 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]

until, almost 10 minutes later, I have to kill the job; I know it will hang forever. But in the good log (spark-finished.log), starting from line 361, Spark started to read the 2nd split of data, and I can see all the debug messages from BlockReaderLocal and BlockManager.

Comparing the logs of the 2 cases: in the good case, from line 478 I can see this message:

    15/08/14 14:37:09 DEBUG BlockReaderLocal: putting FileInputStream for ..

But in the hanging case, for reading the 2nd split, I don't see this message any more (it existed for the 1st split). I believe this log message should show up in this case too, as the 2nd split block also exists on this Spark node; just before it, I can see the following debug messages:

    15/08/14 14:24:11 DEBUG BlockReaderLocal: Created BlockReaderLocal for file /services/contact2/data/contacts/20150814004805-part-r-2.avro block BP-834217708-10.20.95.130-1438701195738:blk_1074484553_1099531839081 in datanode 10.20.95.146:50010
    15/08/14 14:24:11 DEBUG Project: Creating MutableProj: WrappedArray(), inputSchema: ArrayBuffer(account_id#0L, contact_id#1, sequence_id#2, state#3, name#4, kind#5, prefix_name#6, first_name#7, middle_name#8, company_name#9, job_title#10, source_name#11, source_details#12, provider_name#13, provider_details#14, created_at#15L, create_source#16, updated_at#17L, update_source#18, accessed_at#19L, deleted_at#20L, delta#21, birthday_day#22, birthday_month#23, anniversary#24L, contact_fields#25, related_contacts#26, contact_channels#27, contact_notes#28, contact_service_addresses#29, contact_street_addresses#30), codegen: false

This log is generated on node 10.20.95.146, and Spark created a BlockReaderLocal to read the data from the local node.

Now my question is: can someone give me any idea why "DEBUG BlockReaderLocal: putting FileInputStream for" doesn't show up any more in this case? I attached the log files again in this email, and really hope I can get some help from this list.

Thanks
Yong

From: java8...@hotmail.com
To: user@spark.apache.org
Subject: RE: Spark Job Hangs on our production cluster
Date: Fri, 14 Aug 2015 15:14:10 -0400

I still want to check if anyone can provide any help related to Spark 1.2.2 hanging on our production cluster when reading big HDFS data (7800 Avro blocks), while it looks fine for small data (769 Avro blocks). I enabled the debug level in the Spark log4j, and attached the log file in case it helps with troubleshooting.
Summary of our cluster:

    IBM BigInsight V3.0.0.2 (running Hadoop 2.2.0 + Hive 0.12)
    42 data nodes, each running an HDFS data node process + task tracker + Spark worker
    One master, running the HDFS Name node + Spark master
    Another master node, running the 2nd Name node + JobTracker

The test cases I ran are 2, using a very simple spark-shell session to read 2 folders: one is big data with 1T of Avro files; the other is small data with 160G of Avro files. The Avro schemas of the 2 folders are different, but I don't think that makes any difference here. The test script is like the following:

    import org.apache.spark.sql.SQLContext
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import com.databricks.spark.avro._
    val testdata = sqlContext.avroFile("hdfs://namenode:9000/bigdata_folder")
    // vs sqlContext.avroFile("hdfs://namenode:9000/smalldata_folder")
    testdata.registerTempTable("testdata")
    testdata.count()

Both cases are kicked off the same way:

    /opt/spark/bin/spark-shell --jars /opt/ibm/cclib/spark-avro.jar --conf spark.ui.port=4042 --executor-memory 24G --total-executor-cores 42 --conf spark.storage.memoryFraction=0.1 --conf spark.sql.shuffle.partitions=2000 --conf spark.default.parallelism=2000

When the script points to the small data folder, Spark finishes very fast: each task scanning an HDFS block finishes within 30 seconds or less. When the script points to the big data folder, most of the nodes finish scanning the first HDFS block within 2 minutes (longer than in case 1), then the scanning will hang.
[survey] [spark-ec2] What do you like/dislike about spark-ec2?
Howdy folks!

I’m interested in hearing what people think of spark-ec2 (http://spark.apache.org/docs/latest/ec2-scripts.html) outside of the formal JIRA process. Your answers will all be anonymous and public.

If the embedded form below doesn’t work for you, you can use this link to get the same survey: http://goo.gl/forms/erct2s6KRR

Cheers!
Nick
Calling hiveContext.sql(insert into table xyz...) in multiple threads?
Hi,

I have around 2000 Hive source partitions to process, inserting data into the same table but different partitions. For example, I have the following query:

    hiveContext.sql("insert into table myTable partition(mypartition=somepartition) bla bla")

If I call the above query in the Spark driver program, it runs fine and creates the corresponding partition in HDFS. This works, but it is very slow: it takes 4-5 hours to process all 2000 partitions. So I thought of using an ExecutorService and calling the above query, along with a couple of similar insert-into queries, in Callable threads. Using threads is definitely faster, but now I don't see any partition created in HDFS. Is it a concurrency issue, since every thread is trying to insert into the same table but a different partition? I see tasks running very fast and getting finished, but I don't see any partition in HDFS. Please guide me, I am new to Spark and Hive.
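To pin down the shape of the threaded approach being described, a minimal sketch (the partition values, pool size, and staging-table names are all invented; whether concurrent inserts through one HiveContext behave correctly is exactly the open question in this post):

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration

    implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

    val partitionValues = Seq("p1", "p2", "p3") // stand-ins for the ~2000 real values
    val inserts = partitionValues.map { p =>
      Future {
        hiveContext.sql(
          s"insert into table myTable partition(mypartition='$p') select * from staging_$p")
      }
    }
    Await.result(Future.sequence(inserts), Duration.Inf) // block until every insert finishes

One thing worth checking with this pattern: each statement is still planned and run through the single shared HiveContext, so missing partitions may point at the threads racing on that shared state rather than at the HDFS writes themselves.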
Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?
Hi Nick,

I forgot to mention in the survey that Ganglia is never installed properly, for some reason. I get this exception every time I launch the cluster:

    Starting httpd: httpd: Syntax error on line 154 of /etc/httpd/conf/httpd.conf: Cannot load /etc/httpd/modules/mod_authz_core.so into server: /etc/httpd/modules/mod_authz_core.so: cannot open shared object file: No such file or directory [FAILED]

Best Regards,
Jerry