Re: stopping spark stream app
Hi How do I ensure in Spark Streaming 1.3 with Kafka that when an application is killed, the last running batch is fully processed and the offsets are written to the checkpointing dir? On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi I am using Spark Streaming 1.3 and a custom checkpoint to save Kafka offsets. 1. Is using Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { jssc.stop(true, true); System.out.println("Inside Add Shutdown Hook"); } }); to handle the stop safe? 2. Do I also need to handle saving the checkpoint in the shutdown hook, or will the driver handle it automatically since it gracefully stops the stream and waits for completion of the foreachRDD function on the stream? directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() { } Thanks
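For reference, a minimal Scala sketch of the shutdown-hook pattern under discussion, assuming ssc is the StreamingContext driving the direct Kafka stream. With the second argument set to true, stop() requests a graceful shutdown, so batches already submitted are allowed to finish (and any offset bookkeeping done at the end of foreachRDD should run) before the context shuts down; this is the behaviour being relied on here, not something verified in this thread:

    sys.addShutdownHook {
      // stopSparkContext = true, stopGracefully = true: wait for running/queued batches to complete
      println("Shutdown hook: stopping StreamingContext gracefully")
      ssc.stop(true, true)
    }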
deleting application files in standalone cluster
Hi, Using Spark 1.4.0 in standalone mode, with the following configuration: SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true -Dspark.worker.cleanup.appDataTtl=86400" The cleanup interval is set to the default. Application files are not deleted. I am using JavaSparkContext, and when the application ends it stops the context. Maybe I should also call context.close()? From what I understand, stop should be enough ( http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-stop-td17826.html#a17847 ) Thanks, Lior
ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
Hi When I try to stop Spark Streaming using ssc.stop(false, true) it gives the following error: ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver 15/08/07 13:41:11 WARN ReceiverSupervisorImpl: Stopped executor without error 15/08/07 13:41:20 WARN WriteAheadLogManager : Failed to write to write ahead log I've implemented a StreamingListener and a custom Receiver. Does anyone have an idea about this? Thanks :) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-ReceiverTracker-Deregistered-receiver-for-stream-0-Stopped-by-driver-tp24183.html
Re: Spark-submit not finding main class and the error reflects different path to jar file than specified
Are you setting SPARK_PREPEND_CLASSES? Try disabling it. With it set, your uber jar, which does not contain SparkConf, is placed first on the classpath, which is messing things up. Thanks Best Regards On Thu, Aug 6, 2015 at 5:48 PM, Stephen Boesch java...@gmail.com wrote: Given the following command line to spark-submit: bin/spark-submit --verbose --master local[2] --class org.yardstick.spark.SparkCoreRDDBenchmark /shared/ysgood/target/yardstick-spark-uber-0.0.1.jar Here is the output: NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly. Using properties file: /shared/spark-1.4.1/conf/spark-defaults.conf Adding default property: spark.akka.askTimeout=180 Adding default property: spark.master=spark://mellyrn.local:7077 Error: Cannot load main class from JAR file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark Run with --help for usage help or --verbose for debug output The path file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark does not seem to make sense. It does not reflect the path to the file that was specified on the spark-submit command line. Note: when attempting to run that jar file via java -classpath shared/ysgood/target/yardstick-spark-uber-0.0.1.jar org.yardstick.spark.SparkCoreRDDBenchmark the result is as expected: the main class starts to load and then there is a NoClassDefFoundError for SparkConf.class (which is not inside the jar). This shows the app jar is healthy.
Re: Multiple Thrift servers on one Spark cluster
Did you try this way? export HIVE_SERVER2_THRIFT_PORT=6066 ./sbin/start-thriftserver.sh --master <master-uri> export HIVE_SERVER2_THRIFT_PORT=6067 ./sbin/start-thriftserver.sh --master <master-uri> You just have to change HIVE_SERVER2_THRIFT_PORT to instantiate multiple servers, I think. Thanks Best Regards On Thu, Aug 6, 2015 at 2:05 PM, Bojan Kostic blood9ra...@gmail.com wrote: Hi, Is there a way to instantiate multiple Thrift servers on one Spark Cluster? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Thrift-servers-on-one-Spark-cluster-tp24148.html
Re: SparkException: Yarn application has already ended
Just make sure your Hadoop instances are functioning properly (check the ResourceManager and NodeManager). How are you submitting the job? If it is getting submitted, then you can look further in the YARN logs to see what's really going on. Thanks Best Regards On Thu, Aug 6, 2015 at 6:59 PM, Clint McNeil cl...@impactradius.com wrote: Hi I am trying to launch a Spark application on a CM cluster and I get the following error. Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113) at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59) at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141) at org.apache.spark.SparkContext.init(SparkContext.scala:379) What is the remedy for this type of problem? -- *Clint McNeil* BI Data Science Engineer | Impact Radius 202 Suntyger, 313 Durban Road, Bellville, 7530 o: +2721 914-1764 | m: +2782 4796 309 | cl...@impactradius.com *Learn more – Watch our 2 minute overview http://www.impactradius.com/?src=slsap* www.impactradius.com | Twitter http://twitter.com/impactradius | Facebook https://www.facebook.com/pages/Impact-Radius/153376411365183 | LinkedIn http://www.linkedin.com/company/impact-radius-inc. | YouTube https://www.youtube.com/user/ImpactRadius Maximizing Return on Ad Spend
Re: Temp file missing when training logistic regression
Which version of Spark are you using? It looks like you are hitting the file handle limit. In that case you might want to increase the ulimit. You can actually validate this by looking in the worker logs (which would probably show a "Too many open files" exception). Thanks Best Regards On Thu, Aug 6, 2015 at 8:35 PM, Cat caterina.gro...@dsp.io wrote: Hello, I am using the Python API to perform a grid search and train models using LogisticRegressionWithSGD. I am using r3.xl machines in EC2, running on top of YARN in cluster mode. The training RDD is persisted in memory and on disk. Some of the models train successfully, but then at some point during the grid search I get an error. It looks like the Python broadcast is looking for a part of the RDD which is no longer there. I scanned the logs for further errors but could not find anything. Any ideas of what could be causing this, and what should I be looking for? Many thanks. Cat model = LogisticRegressionWithSGD.train(the_training, iterations=i, regParam=c, miniBatchFraction=0.8) File /home/hadoop/spark/python/pyspark/mllib/classification.py, line 164, in train return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights) File /home/hadoop/spark/python/pyspark/mllib/regression.py, line 140, in _regression_train_wrapper weights, intercept = train_func(data, _convert_to_vector(initial_weights)) File /home/hadoop/spark/python/pyspark/mllib/classification.py, line 162, in train bool(intercept)) File /home/hadoop/spark/python/pyspark/mllib/common.py, line 120, in callMLlibFunc return callJavaFunc(sc, api, *args) File /home/hadoop/spark/python/pyspark/mllib/common.py, line 113, in callJavaFunc return _java2py(sc, func(*args)) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ self.target_id, self.name) File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value format(target_id, '.', name), value) Py4JJavaError: An error occurred while calling o271.trainLogisticRegressionModelWithSGD. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.FileNotFoundException: /mnt/spark/spark-b07b34f8-66c3-43ae-a3ed-0c291724409b/pyspark-4196e8e5-8024-4ec5-a7bb-a60b216e6e74/tmpbCjiSR (No such file or directory) java.io.FileInputStream.open(Native Method) java.io.FileInputStream.init(FileInputStream.java:146) org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply$mcJ$sp(PythonRDD.scala:848) org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847) org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847) org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153) org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:847) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:606) java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495) java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110) org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1176) org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:79) org.apache.spark.storage.DiskStore.putArray(DiskStore.scala:64) org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1028) org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:419) org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:408) scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:408) org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:263) org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136) org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114) org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786) org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637) org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991)
Re: Spark Job Failed (Executor Lost then FS closed)
Can you look more closely in the worker logs and see what's going on? It looks like a memory issue (GC overhead or the like; the worker logs should tell). Thanks Best Regards On Fri, Aug 7, 2015 at 3:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Re attaching the images. On Thu, Aug 6, 2015 at 2:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Code: import java.text.SimpleDateFormat import java.util.Calendar import java.sql.Date import org.apache.spark.storage.StorageLevel def extract(array: Array[String], index: Integer) = { if (index < array.length) { array(index).replaceAll("\"", "") } else { "" } } case class GuidSess( guid: String, sessionKey: String, sessionStartDate: String, siteId: String, eventCount: String, browser: String, browserVersion: String, operatingSystem: String, experimentChannel: String, deviceName: String) val rowStructText = sc.textFile("/user/zeppelin/guidsess/2015/08/05/part-m-1.gz") val guidSessRDD = rowStructText.filter(s => s.length != 1).map(s => s.split(",")).map( { s => GuidSess(extract(s, 0), extract(s, 1), extract(s, 2), extract(s, 3), extract(s, 4), extract(s, 5), extract(s, 6), extract(s, 7), extract(s, 8), extract(s, 9)) }) val guidSessDF = guidSessRDD.toDF() guidSessDF.registerTempTable("guidsess") Once the temp table is created, I wrote this query: select siteid, count(distinct guid) total_visitor, count(sessionKey) as total_visits from guidsess group by siteid *Metrics:* Data Size: 170 MB Spark Version: 1.3.1 YARN: 2.7.x Timeline: There is 1 job, 2 stages with 1 task each. *1st Stage : mapPartitions* [image: Inline image 1] 1st Stage: Task 1 started to fail. A second attempt started for the 1st task of the first stage. The first attempt failed with Executor LOST; when I go to the YARN ResourceManager and to that particular host, I see that it is running fine. *Attempt #1* [image: Inline image 2] *Attempt #2* Executor LOST AGAIN [image: Inline image 3] *Attempt 34* *[image: Inline image 4]* *2nd Stage runJob : SKIPPED* *[image: Inline image 5]* Any suggestions? -- Deepak
Merge metadata error when appending to parquet table
Hi there, I have a problem with a Spark Streaming job running on Spark 1.4.1 that appends to a Parquet table. My job receives JSON strings and creates a JSON RDD out of them. The JSONs may come in different shapes, as most of the fields are optional, but they never have conflicting schemas. Next, for each (non-empty) RDD I save it to Parquet files, using append to the existing table: jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath) Unfortunately I'm now hitting a merge conflict on every append: Aug 9, 2015 7:58:03 AM WARNING: parquet.hadoop.ParquetOutputCommitter: could not write summary file for hdfs://example.com:8020/tmp/parquet java.lang.RuntimeException: could not merge metadata: key org.apache.spark.sql.parquet.row.metadata has conflicting values: [{...schema1...}, {...schema2...} ] The schemas are very similar; some attributes may be missing compared to the others, but they are definitely not conflicting. They are pretty lengthy, but I compared them with diff and ensured that there are no conflicts. Even with this WARNING the write actually succeeds, and I'm able to read this data. But on every batch there is yet another schema in the displayed conflicting values array. I would like the job to run forever, so I can't simply ignore this warning because it will probably end with an OOM. Do you know what might be the reason for this error/warning? How can I overcome it? Maybe it is a Spark bug/regression? I saw tickets like SPARK-6010 https://issues.apache.org/jira/browse/SPARK-6010, but they seem to be fixed in 1.3.0 (I'm using 1.4.1). Thanks for any help! Krzysiek
Starting a service with Spark Executors
Hi, I'd like to start a service with each Spark executor upon initialization and have the distributed code reference that service locally. What I'm trying to do is to invoke torch7 computations without reloading the model for each row, by starting a Lua HTTP handler that will receive HTTP requests for each row in my data. Can this be achieved with Spark? Thank you. Daniel
Re: Spark-submit fails when jar is in HDFS
Did you try this way? /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi --jars hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100 Thanks Best Regards On Fri, Aug 7, 2015 at 5:51 AM, Alan Braithwaite a...@cloudflare.com wrote: Hi All, We're trying to run spark with mesos and docker in client mode (since mesos doesn't support cluster mode) and load the application Jar from HDFS. The following is the command we're running: /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100 We're getting the following warning before an exception from that command: Warning: Skip remote jar hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar. java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi Before I debug further, is this even supported? I started reading the code and it wasn't clear that it's possible to load a remote jar in client mode at all. I did see a related issue in [2] but it didn't quite clarify everything I was looking for. Thanks, - Alan [1] https://spark.apache.org/docs/latest/submitting-applications.html [2] http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html
Re: Out of memory with twitter spark streaming
I'm not sure what you are up to, but if you can explain what you are trying to achieve then maybe we can restructure your code. At a quick glance I could see: tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet)) which will bring the whole dataset onto your driver machine, where it can easily run out of memory; you can avoid that. Thanks Best Regards On Fri, Aug 7, 2015 at 11:23 AM, Pankaj Narang pankajnaran...@gmail.com wrote: Hi I am running one application using activator where I am retrieving tweets and storing them to a MySQL database using the code below. I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory the OOM is only delayed. Can anybody give me a clue? Here is the code var tweetStream = TwitterUtils.createStream(ssc, None, keywords) var tweets = tweetStream.map(tweet => { var user = tweet.getUser var replyStatusId = tweet.getInReplyToStatusId var reTweetStatus = tweet.getRetweetedStatus var pTweetId = -1L var pcreatedAt = 0L if (reTweetStatus != null) { pTweetId = reTweetStatus.getId pcreatedAt = reTweetStatus.getCreatedAt.getTime } tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" + user.getName + "|$" + user.getScreenName + "|$" + user.getDescription + "|$" + tweet.getText.trim + "|$" + user.getFollowersCount + "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" + user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" + replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt }) tweets.foreachRDD(tweetsRDD => { tweetsRDD.distinct() val count = tweetsRDD.count println("* %s tweets found on this RDD".format(count)) if (count > 0) { var timeMs = System.currentTimeMillis var counter = DBQuery.getProcessedCount() var location = "tweets/" + counter + "/" tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet)) //tweetsRDD.saveAsTextFile(location + timeMs) + ".txt" DBQuery.addTweetRDD(counter) } }) // Checkpoint directory to recover from failures println("tweets for the last stream are saved which can be processed later") val checkpointDir = "f:/svn1/checkpoint/" ssc.checkpoint(checkpointDir) ssc.start() ssc.awaitTermination() regards Pankaj -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
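A rough Scala sketch of the restructuring Akhil is suggesting: push the save work out to the executors with foreachPartition instead of collecting every tweet back to the driver. DBQuery.saveTweets is the poster's own helper; note also that the original code discards the result of distinct(), which returns a new RDD rather than mutating the existing one.

    tweets.foreachRDD { tweetsRDD =>
      val distinctTweets = tweetsRDD.distinct()              // distinct() returns a new RDD; keep the result
      if (distinctTweets.count() > 0) {
        distinctTweets.foreachPartition { iter =>
          iter.foreach(tweet => DBQuery.saveTweets(tweet))   // runs on the executors, not the driver
        }
      }
    }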
Re: Accessing S3 files with s3n://
It depends on which operation you are doing. If you are doing a .count() on a Parquet file it might not download the entire file, I think, but if you do a .count() on a normal text file it might pull the entire file. Thanks Best Regards On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows down things when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
Re: using Spark or pig group by efficient in my use case?
Why not give it a shot? Spark always outruns old MapReduce jobs. Thanks Best Regards On Sat, Aug 8, 2015 at 8:25 AM, linlma lin...@gmail.com wrote: I have tens of millions of records, each a customer ID and city ID pair. There are tens of millions of unique customer IDs, and only a few hundred unique city IDs. I want to do a merge to get all city IDs aggregated for a specific customer ID, and pull back all records. I want to do this using a group by customer ID using Pig on Hadoop, and I am wondering if it is the most efficient way. I am also wondering if there is overhead for sorting in Hadoop (I do not care if customer1 comes before customer2 or not, as long as all cities are aggregated correctly for customer1 and customer2)? Do you think Spark is better? Here is an example of inputs, CustomerID1 City1 CustomerID2 City2 CustomerID3 City1 CustomerID1 City3 CustomerID2 City4 I want output like this, CustomerID1 City1 City3 CustomerID2 City2 City4 CustomerID3 City1 thanks in advance, Lin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/using-Spark-or-pig-group-by-efficient-in-my-use-case-tp24178.html
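As a rough illustration of the aggregation in Spark (Scala), assuming a whitespace-separated input file with one customer ID and city ID per line; the paths and separator are placeholders:

    val pairs = sc.textFile("hdfs:///path/to/customer_city")       // placeholder path
      .map(_.split("\\s+"))
      .map(fields => (fields(0), fields(1)))                       // (customerID, cityID)

    // groupByKey mirrors the Pig GROUP BY; no total ordering of customers is imposed
    val citiesPerCustomer = pairs.groupByKey()
      .mapValues(cities => cities.toSet.mkString(" "))             // de-duplicate and join the city IDs

    citiesPerCustomer.saveAsTextFile("hdfs:///path/to/output")     // placeholder path

Since there are only a few hundred distinct cities, aggregating into a Set per key (for example with aggregateByKey) would keep the shuffled values small if the data turns out to be skewed.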
Re: How to run start-thrift-server in debug mode?
It seems, it is not able to pick up the debug parameters. You can actually set export _JAVA_OPTIONS=-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=y and then submit the job to enable debugging. Thanks Best Regards On Fri, Aug 7, 2015 at 10:20 PM, Benjamin Ross br...@lattice-engines.com wrote: Hi, I’m trying to run the hive thrift server in debug mode. I’ve tried to simply pass -Xdebug -Xrunjdwp:transport=dt_socket,address=127.0.0.1:,server=y,suspend=n to start-thriftserver.sh as a driver option, but it doesn’t seem to host a server. I’ve then tried to edit the various shell scripts to run hive thrift server but couldn’t get things to work. It seems that there must be an easier way to do this. I’ve also tried to run it directly in eclipse, but ran into issues related to Scala that I haven’t quite yet figured out. start-thriftserver.sh --driver-java-options -agentlib:jdwp=transport=dt_socket,address=localhost:8000,server=y,suspend=n -XX:MaxPermSize=512 --master yarn://localhost:9000 --num-executors 2 jdb -attach localhost:8000 java.net.ConnectException: Connection refused at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at com.sun.tools.jdi.SocketTransportService.attach(SocketTransportService.java:222) at com.sun.tools.jdi.GenericAttachingConnector.attach(GenericAttachingConnector.java:116) at com.sun.tools.jdi.SocketAttachingConnector.attach(SocketAttachingConnector.java:90) at com.sun.tools.example.debug.tty.VMConnection.attachTarget(VMConnection.java:519) at com.sun.tools.example.debug.tty.VMConnection.open(VMConnection.java:328) at com.sun.tools.example.debug.tty.Env.init(Env.java:63) at com.sun.tools.example.debug.tty.TTY.main(TTY.java:1066) Let me know if I’m missing something here… Thanks in advance, Ben
Questions about SparkSQL join on not equality conditions
Hi, I might have a stupid question about Spark SQL's implementation of joins on non-equality conditions, for instance condition1 or condition2. In fact, Hive doesn't support such joins, as it is very difficult to express such conditions as a map/reduce job. However, Spark SQL supports this operation, so I would like to know how Spark implements it. Since I observe that such joins run very slowly, I guess Spark implements them by doing a filter on top of a Cartesian product. Is that true? Thanks in advance for your help. Cheers Gen
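One way to check the guess is to look at the physical plan of a small non-equality join; a sketch in Scala, where df1, df2 and the column names are made up for illustration:

    // df1 and df2 are any two DataFrames; columns a, b, c, d are illustrative only
    val joined = df1.join(df2, df1("a") < df2("b") || df1("c") === df2("d"))
    joined.explain()
    // On 1.3/1.4 the printed plan typically shows a CartesianProduct followed by a Filter,
    // which matches the observation that such joins run very slowly.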
Re: How to create DataFrame from a binary file?
You can create your own data schema (StructType in Spark), and use the following method to create a data frame with your own data schema: sqlContext.createDataFrame(yourRDD, structType); I wrote a post on how to do it. You can also get the sample code there: Light-Weight Self-Service Data Query through Spark SQL: https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang Take a look and feel free to let me know for any question. Best, Bo On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote: Hi how do we create a DataFrame from a binary file stored in HDFS? I was thinking to use JavaPairRDD<String, PortableDataStream> pairRdd = javaSparkContext.binaryFiles("/hdfs/path/to/binfile"); JavaRDD<PortableDataStream> javardd = pairRdd.values(); I can see that PortableDataStream has a method called toArray which can convert it into a byte array. I was thinking, if I have a JavaRDD<byte[]>, can I call the following and get a DataFrame? DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd, Byte.class); Please guide me, I am new to Spark. I have my own custom format which is a binary format, and I was thinking that if I can convert my custom format into a DataFrame using binary operations then I don't need to create my own custom Hadoop format. Am I on the right track? Will reading binary data into a DataFrame scale? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html
Re: Spark-submit fails when jar is in HDFS
Also, Spark on Mesos supports cluster mode: http://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Sun, Aug 9, 2015 at 4:30 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you try this way? /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi *--jars hdfs://hdfs1/tmp/spark-* *examples-1.4.1-hadoop2.6.0-**cdh5.4.4.jar* 100 Thanks Best Regards On Fri, Aug 7, 2015 at 5:51 AM, Alan Braithwaite a...@cloudflare.com wrote: Hi All, We're trying to run spark with mesos and docker in client mode (since mesos doesn't support cluster mode) and load the application Jar from HDFS. The following is the command we're running: /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100 We're getting the following warning before an exception from that command: Warning: Skip remote jar hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar. java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi Before I debug further, is this even supported? I started reading the code and it wasn't clear that it's possible to load a remote jar in client mode at all. I did see a related issue in [2] but it didn't quite clarify everything I was looking for. Thanks, - Alan [1] https://spark.apache.org/docs/latest/submitting-applications.html [2] http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html
stream application map transformation constructor called
In a streaming application, how many times is the map transformation object created? Say I have directKafkaStream.repartition(numPartitions).mapPartitions(new FlatMapFunction_derivedclass(configs)); class FlatMapFunction_derivedclass { FlatMapFunction_derivedclass(Config config) { } @Override public Iterable<String> call(Iterator<byte[][]> t) throws Exception {..} } How many times will FlatMapFunction_derivedclass be instantiated? 1. Will the constructor of this class be called only once, with the same object serialised and transferred to the executors at each batch interval and deserialized at each batch interval on the executors? So if my streaming application has run 400 batches, has the same object been deserialised 400 times? 2. So should I avoid creating an external connection object, say an HTTP connection, in the constructor of FlatMapFunction_derivedclass, and instead create it only in the call function, so it is created at each batch interval?
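On point 2, the usual pattern (sketched below in Scala, with hypothetical helper names) is to create non-serializable resources such as HTTP connections inside the per-partition function body rather than in the constructor: the function object is typically constructed once on the driver and then serialized out with the tasks for each batch, so anything built in its constructor would have to survive serialization, while anything built inside the call is created fresh on the executor for each partition of each batch.

    directKafkaStream.repartition(numPartitions).mapPartitions { records =>
      // one connection per partition per batch, created on the executor
      val httpClient = createHttpClient()                // hypothetical helper, not from the original code
      records.map(bytes => process(httpClient, bytes))   // process() is likewise a hypothetical stand-in
    }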
Re: intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException
Can you check if there is a protobuf version other than 2.5.0 on the classpath? Please show the complete stack trace. Cheers On Sun, Aug 9, 2015 at 9:41 AM, longda...@163.com longda...@163.com wrote: Hi all, I compiled spark-1.3.1 on Linux using IntelliJ 14 and got the error assertion failed: com.google.protobuf.InvalidProtocalBufferException. How can I solve the problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/intellij14-compiling-spark-1-3-1-got-error-assertion-failed-com-google-protobuf-InvalidProtocalBuffen-tp24186.html
Re: intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException
the stack trace is below Error:scalac: while compiling: /home/xiaoju/data/spark-1.3.1/core/src/main/scala/org/apache/spark/SparkContext.scala during phase: typer library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: -nobootcp -javabootclasspath : -P:genjavadoc:out=/home/xiaoju/data/spark-1.3.1/core/target/java -deprecation -feature -classpath
Re: Accessing S3 files with s3n://
Hi Akshat, Is there a particular reason you don't use s3a? From my experience,s3a performs much better than the rest. I believe the inefficiency is from the implementation of the s3 interface. Best Regards, Jerry Sent from my iPhone On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote: Depends on which operation you are doing, If you are doing a .count() on a parquet, it might not download the entire file i think, but if you do a .count() on a normal text file it might pull the entire file. Thanks Best Regards On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows down things when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
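A minimal sketch (Scala) of switching to s3a, assuming the hadoop-aws jar and its AWS SDK dependency are on the classpath; the property names are the standard Hadoop s3a keys, and the bucket, path, and credentials below are placeholders:

    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")    // placeholder credentials
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")    // placeholder credentials

    val df = sqlContext.read.parquet("s3a://your-bucket/path/to/parquet") // placeholder bucket/path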
Re: Merge metadata error when appending to parquet table
Besides finding a fix for this problem, I think I can work around at least the WARNING message by overriding the Parquet variable parquet.enable.summary-metadata, which according to the PARQUET-107 https://issues.apache.org/jira/browse/PARQUET-107 ticket can be used to disable writing the summary file that is the issue here. How can I set this variable? I tried sql.setConf("parquet.enable.summary-metadata", "false") sql.sql("SET parquet.enable.summary-metadata=false") As well as: spark-submit --conf parquet.enable.summary-metadata=false But neither helped. Can anyone help? Of course the original problem stays open. Thanks! Krzysiek 2015-08-09 14:19 GMT+02:00 Krzysztof Zarzycki k.zarzy...@gmail.com: Hi there, I have a problem with a spark streaming job running on Spark 1.4.1, that appends to parquet table. My job receives json strings and creates JsonRdd out of it. The jsons might come in different shape as most of the fields are optional. But they never have conflicting schemas. Next, for each (non-empty) Rdd I'm saving it to parquet files, using append to existing table: jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath) Unfortunately I'm hitting now an issue on every append of conflict: Aug 9, 2015 7:58:03 AM WARNING: parquet.hadoop.ParquetOutputCommitter: could not write summary file for hdfs://example.com:8020/tmp/parquet java.lang.RuntimeException: could not merge metadata: key org.apache.spark.sql.parquet.row.metadata has conflicting values: [{...schema1...}, {...schema2...} ] The schemas are very similar, some attributes may be missing comparing to other, but for sure they are not conflicting. They are pretty lengthy, but I compared them with diff and ensured, that there are no conflicts. Even with this WARNING, the write actually succeeds, I'm able to read this data. But on every batch, there is yet another schema in the displayed conflicting values array. I would like the job to run forever, so I can't even ignore this warning because it will probably end with OOM. Do you know what might be the reason of this error/ warning? How to overcome this? Maybe it is a Spark bug/regression? I saw tickets like SPARK-6010 https://issues.apache.org/jira/browse/SPARK-6010, but they seem to be fixed in 1.3.0 (I'm using 1.4.1). Thanks for any help! Krzysiek
intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException
Hi all, I compiled spark-1.3.1 on Linux using IntelliJ 14 and got the error assertion failed: com.google.protobuf.InvalidProtocalBufferException. How can I solve the problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/intellij14-compiling-spark-1-3-1-got-error-assertion-failed-com-google-protobuf-InvalidProtocalBuffen-tp24186.html
Re: How to create DataFrame from a binary file?
Hi Bo, I know how to create a DataFrame; my question is how to create a DataFrame from binary files, and your blog covers raw text JSON files. Please read my question properly, thanks. On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote: You can create your own data schema (StructType in spark), and use following method to create data frame with your own data schema: sqlContext.createDataFrame(yourRDD, structType); I wrote a post on how to do it. You can also get the sample code there: Light-Weight Self-Service Data Query through Spark SQL: https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang Take a look and feel free to let me know for any question. Best, Bo On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote: Hi how do we create DataFrame from a binary file stored in HDFS? I was thinking to use JavaPairRDDString,PortableDataStream pairRdd = javaSparkContext.binaryFiles(/hdfs/path/to/binfile); JavaRDDPortableDataStream javardd = pairRdd.values(); I can see that PortableDataStream has method called toArray which can convert into byte array I was thinking if I have JavaRDDbyte[] can I call the following and get DataFrame DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd,Byte.class); Please guide I am new to Spark. I have my own custom format which is binary format and I was thinking if I can convert my custom format into DataFrame using binary operations then I dont need to create my own custom Hadoop format am I on right track? Will reading binary data into DataFrame scale? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html
Re:Re: intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException
Thank you for the reply. I use sbt to compile Spark, but there are both protobuf 2.4.1 and 2.5.0 in the Maven repository, and protobuf 2.5.0 in the .ivy repository. The stack trace is below: Error:scalac: while compiling: /home/xiaoju/data/spark-1.3.1/core/src/main/scala/org/apache/spark/SparkContext.scala during phase: typer library version: version 2.10.4 compiler version: version 2.10.4 reconstructed args: -nobootcp -javabootclasspath : -P:genjavadoc:out=/home/xiaoju/data/spark-1.3.1/core/target/java -deprecation -feature -classpath
Re: Spark-submit fails when jar is in HDFS
Did you try this way? /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class org.apache.spark.examples.SparkPi --jars hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100 I did, and got the same error (I verified again right now too). Also, Spark on Mesos supports cluster mode: http://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode Oh cool! Looks like this page needs to be updated then: http://spark.apache.org/docs/latest/submitting-applications.html Thanks! - Alan
Re: How to create DataFrame from a binary file?
Well, my post uses raw text json file to show how to create data frame with a custom data schema. The key idea is to show the flexibility to deal with any format of data by using your own schema. Sorry if I did not make you fully understand. Anyway, let us know once you figure out your problem. On Sun, Aug 9, 2015 at 11:10 AM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi Bo I know how to create a DataFrame my question is how to create a DataFrame for binary files and in your blog it is raw text json files please read my question properly thanks. On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote: You can create your own data schema (StructType in spark), and use following method to create data frame with your own data schema: sqlContext.createDataFrame(yourRDD, structType); I wrote a post on how to do it. You can also get the sample code there: Light-Weight Self-Service Data Query through Spark SQL: https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang Take a look and feel free to let me know for any question. Best, Bo On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote: Hi how do we create DataFrame from a binary file stored in HDFS? I was thinking to use JavaPairRDDString,PortableDataStream pairRdd = javaSparkContext.binaryFiles(/hdfs/path/to/binfile); JavaRDDPortableDataStream javardd = pairRdd.values(); I can see that PortableDataStream has method called toArray which can convert into byte array I was thinking if I have JavaRDDbyte[] can I call the following and get DataFrame DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd,Byte.class); Please guide I am new to Spark. I have my own custom format which is binary format and I was thinking if I can convert my custom format into DataFrame using binary operations then I dont need to create my own custom Hadoop format am I on right track? Will reading binary data into DataFrame scale? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
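For the binary case specifically, a hedged Scala sketch of the approach both posters are circling: read the files with binaryFiles, decode each record with your own parser, and attach an explicit schema. parseRecords below is a hypothetical stand-in for the custom binary-format decoding; whether this scales depends on that decoder, since each whole file is materialized as a byte array.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("payload", BinaryType, nullable = true)))

    val rows = sc.binaryFiles("hdfs:///path/to/binfiles")                       // placeholder path
      .flatMap { case (path, stream) => parseRecords(path, stream.toArray()) }  // hypothetical decoder returning Rows

    val binDataFrame = sqlContext.createDataFrame(rows, schema)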
Re: Merge metadata error when appending to parquet table
The conflicting metadata values warning is a known issue https://issues.apache.org/jira/browse/PARQUET-194 The option parquet.enable.summary-metadata is a Hadoop option rather than a Spark option, so you need to either add it to your Hadoop configuration file(s) or add it via `sparkContext.hadoopConfiguration` before starting your job. Cheng On 8/9/15 8:57 PM, Krzysztof Zarzycki wrote: Besides finding to this problem, I think I can workaround at least the WARNING message by overwriting parquet variable: parquet.enable.summary-metadata That according to this PARQUET-107 https://issues.apache.org/jira/browse/PARQUET-107 ticket can be used to disable writing summary file which is an issue here. How can I set this variable? I tried sql.setConf(parquet.enable.summary-metadata, false) sql.sql(SET parquet.enable.summary-metadata=false) As well as: spark-submit --conf parquet.enable.summary-metadata=false But neither helped. Anyone can help? Of course the original problem stays open. Thanks! Krzysiek 2015-08-09 14:19 GMT+02:00 Krzysztof Zarzycki k.zarzy...@gmail.com mailto:k.zarzy...@gmail.com: Hi there, I have a problem with a spark streaming job running on Spark 1.4.1, that appends to parquet table. My job receives json strings and creates JsonRdd out of it. The jsons might come in different shape as most of the fields are optional. But they never have conflicting schemas. Next, for each (non-empty) Rdd I'm saving it to parquet files, using append to existing table: jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath) Unfortunately I'm hitting now an issue on every append of conflict: Aug 9, 2015 7:58:03 AM WARNING: parquet.hadoop.ParquetOutputCommitter: could not write summary file for hdfs://example.com:8020/tmp/parquet http://example.com:8020/tmp/parquet java.lang.RuntimeException: could not merge metadata: key org.apache.spark.sql.parquet.row.metadata has conflicting values: [{...schema1...}, {...schema2...} ] The schemas are very similar, some attributes may be missing comparing to other, but for sure they are not conflicting. They are pretty lengthy, but I compared them with diff and ensured, that there are no conflicts. Even with this WARNING, the write actually succeeds, I'm able to read this data. But on every batch, there is yet another schema in the displayed conflicting values array. I would like the job to run forever, so I can't even ignore this warning because it will probably end with OOM. Do you know what might be the reason of this error/ warning? How to overcome this? Maybe it is a Spark bug/regression? I saw tickets like SPARK-6010 https://issues.apache.org/jira/browse/SPARK-6010, but they seem to be fixed in 1.3.0 (I'm using 1.4.1). Thanks for any help! Krzysiek
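A minimal Scala sketch of Cheng's suggestion, setting the Hadoop property programmatically before the streaming job starts appending:

    sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

    // subsequent appends should then skip writing the summary files that trigger the metadata merge
    jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath)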
Re: Spark job workflow engine recommendations
I used to maintain Luigi at Spotify, and got some insight in workflow manager characteristics and production behaviour in the process. I am evaluating options for my current employer, and the short list is basically: Luigi, Azkaban, Pinball, Airflow, and rolling our own. The latter is not necessarily more work than adapting an existing tool, since existing managers are typically more or less tied to the technology used by the company that created them. Are your users primarily developers building pipelines that drive data-intensive products, or are they analysts, producing business intelligence? These groups tend to have preferences for different types of tools and interfaces. I have a love/hate relationship with Luigi, but given your requirements, it is probably the best fit: * It has support for Spark, and it seems to be used and maintained. * It has no builtin support for Cassandra, but Cassandra is heavily used at Spotify. IIRC, the code required to support Cassandra targets is more or less trivial. There is no obvious single definition of a dataset in C*, so you'll have to come up with a convention and encode it as a Target subclass. I guess that is why it never made it outside Spotify. * The open source community is active and it is well tested in production at multiple sites. * It is easy to write dependencies, but in a Python DSL. If your users are developers, this is preferable over XML or a web interface. There are always quirks and odd constraints somewhere that require the expressive power of a programming language. It also allows you to create extensions without changing Luigi itself. * It does not have recurring scheduling bulitin. Luigi needs a motor to get going, typically cron, installed on a few machines for redundancy. In a typical pipeline scenario, you give output datasets a time parameter, which arranges for a dataset to be produced each hour/day/week/month. * It supports failure notifications. Pinball and Airflow have similar architecture to Luigi, with a single central scheduler and workers that submit and execute jobs. They seem to be more solidly engineered at a glance, but less battle tested outside Pinterest/Airbnb, and they have fewer integrations to the data ecosystem. Azkaban has a different architecture and user interface, and seems more geared towards data scientists than developers; it has a good UI for controlling jobs, but writing extensions and controlling it programmatically seems more difficult than for Luigi. All of the tools above are centralised, and the central component can become a bottleneck and a single point of problem. I am not aware of any decentralised open source workflow managers, but you can run multiple instances and shard manually. Regarding recurring jobs, it is typically undesirable to blindly run jobs at a certain time. If you run jobs, e.g. with cron, and process whatever data is available in your input sources, your jobs become indeterministic and unreliable. If incoming data is late or missing, your jobs will fail or create artificial skews in output data, leading to confusing results. Moreover, if jobs fail or have bugs, it will be difficult to rerun them and get predictable results. This is why I don't think Chronos is a meaningful alternative for scheduling data processing. 
There are different strategies on this topic, but IMHO, it is easiest create predictable and reliable pipelines by bucketing incoming data into datasets that you seal off, and mark ready for processing, and then use the workflow manager's DAG logic to process data when input datasets are available, rather than at a certain time. If you use Kafka for data collection, Secor can handle this logic for you. In addition to your requirements, there are IMHO a few more topics one needs to consider: * How are pipelines tested? I.e. if I change job B below, how can I be sure that the new output does not break A? You need to involve the workflow DAG in testing such scenarios. * How do you debug jobs and DAG problems? In case of trouble, can you figure out where the job logs are, or why a particular job does not start? * Do you need high availability for job scheduling? That will require additional components. This became a bit of a brain dump on the topic. I hope that it is useful. Don't hesitate to get back if I can help. Regards, Lars Albertsson On Fri, Aug 7, 2015 at 5:43 PM, Vikram Kone vikramk...@gmail.com wrote: Hi, I'm looking for open source workflow tools/engines that allow us to schedule spark jobs on a datastax cassandra cluster. Since there are tonnes of alternatives out there like Ozzie, Azkaban, Luigi , Chronos etc, I wanted to check with people here to see what they are using today. Some of the requirements of the workflow engine that I'm looking for are 1. First class support for submitting Spark jobs on Cassandra. Not some wrapper Java code to submit tasks. 2. Active open source community support and well tested at production
multiple dependency jars using pyspark
I'm trying to write a simple job for PySpark 1.4 migrating data from MySQL to Cassandra. I can work with either the MySQL JDBC jar or the Cassandra jar separately without issue, but when I try to reference both of them it throws an exception: Py4JJavaError: An error occurred while calling o32.save. : java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; I'm not sure if I'm including the jars correctly, as --jars says it's comma-separated and --driver-class-path seems to take a colon-delimited classpath. If I separate the list in --driver-class-path with a comma, I get a class-not-found exception, so I'm thinking colon is right. The job, params for submission, and exception are here. Help getting this going would be deeply appreciated. https://gist.github.com/rustyrazorblade/9a38a9499a7531eefd1e
Error when running pyspark/shell.py to set up iPython notebook
I'm trying to set up iPython notebook on an edge node with port forwarding so I can run pyspark off my laptop's browser. I've mostly been following the Cloudera guide here: http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ I got this working on one cluster running Spark 1.0. But now on Spark 1.3 (with Python 2.7 and Java 7), I'm getting the error below when I run /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/shell.py at the line: sc = SparkContext(appName=PySparkShell, pyFiles=add_files) Before showing the error, I'll note that running pyspark --master yarn-client DOES work, so I can run pyspark fine atop YARN, but it looks like ipython notebook is calling Spark via a different method and producing an error. Any ideas? Traceback (most recent call last): File stdin, line 1, in module File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py, line 111, in __init__ conf, jsc, profiler_cls) File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py, line 159, in _do_init self._jsc = jsc or self._initialize_context(self._conf._jconf) File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py, line 212, in _initialize_context return self._jvm.JavaSparkContext(jconf) File /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 701, in __call__ File /opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext. : java.io.FileNotFoundException: /user/spark/applicationHistory/application_1438611042507_0055.inprogress (No such file or directory) at java.io.FileOutputStream.open(Native Method) at java.io.FileOutputStream.init(FileOutputStream.java:221) at java.io.FileOutputStream.init(FileOutputStream.java:110) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:117) at org.apache.spark.SparkContext.init(SparkContext.scala:399) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:214) at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79) at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-running-pyspark-shell-py-to-set-up-iPython-notebook-tp24188.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
mllib kmeans produce 1 large and many extremely small clusters
I tried running MLlib k-means with the 20newsgroups data set from sklearn. On a 5000-document data set I get one cluster with most of the documents, and the other clusters have just a handful of documents each. #code newsgroups_train = fetch_20newsgroups(subset='train',random_state=1,remove=('headers', 'footers', 'quotes')) small_list = random.sample(newsgroups_train.data,5000) def get_word_vec(text,vocabulary): word_lst = tokenize_line(text) word_counter = Counter(word_lst) lst = [] for v in vocabulary: if v in word_counter: lst.append(word_counter[v]) else: lst.append(0) return lst docsrdd = sc.parallelize(small_list) tf = docsrdd.map(lambda x : get_word_vec(x,vocabulary)) idf = IDF().fit(tf) tfidf = idf.transform(tf) clusters = KMeans.train(tfidf, 20) #documents in each cluster, using clusters.predict(x) Counter({0: 4978, 11: 3, 9: 2, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8: 1, 10: 1, 12: 1, 13: 1, 14: 1, 15: 1, 16: 1, 17: 1, 18: 1, 19: 1}) Please help! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mllib-kmeans-produce-1-large-and-many-extremely-small-clusters-tp24189.html
Re: Starting a service with Spark Executors
Starting is easy, just use a lazy val. Stopping is harder; I do not think executors currently have a cleanup hook... On Sun, Aug 9, 2015 at 5:29 AM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, I'd like to start a service with each Spark executor upon initialization and have the distributed code reference that service locally. What I'm trying to do is to invoke torch7 computations without reloading the model for each row, by starting a Lua HTTP handler that will receive HTTP requests for each row in my data. Can this be achieved with Spark? Thank you. Daniel
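A minimal Scala sketch of the lazy val idea: a lazily initialized field on an object runs its initializer at most once per executor JVM, the first time a task on that executor touches it, so the torch7 model / Lua HTTP handler would be started once per executor rather than once per row. The handler below is a trivial stand-in so the sketch is self-contained, and rdd stands for whatever RDD of rows is being scored.

    object LocalService {
      // initializer runs at most once per executor JVM, on first use
      lazy val handler: String => String = {
        // expensive one-time setup would go here (load the model, start the HTTP handler)
        (row: String) => row                  // trivial stand-in
      }
    }

    rdd.mapPartitions(rows => rows.map(row => LocalService.handler(row)))  // reuses the per-JVM handler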
Re: Accessing S3 files with s3n://
Hi Akshat, I found an open source library which implements an S3 InputFormat for Hadoop. I then use Spark's newAPIHadoopRDD to load data via that S3 InputFormat. The open source library is https://github.com/ATLANTBH/emr-s3-io. It is a little old; I looked inside it and made some changes, and then it worked. I have been using it for more than half a year with Spark, and it still works great with the latest Spark 1.4.0. You may need to modify it to avoid reading the whole file. Please feel free to let me know if you hit any questions. Best, Bo On Sun, Aug 9, 2015 at 6:01 AM, Jerry Lam chiling...@gmail.com wrote: Hi Akshat, Is there a particular reason you don't use s3a? From my experience, s3a performs much better than the rest. I believe the inefficiency is from the implementation of the s3 interface. Best Regards, Jerry Sent from my iPhone On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote: Depends on which operation you are doing, If you are doing a .count() on a parquet, it might not download the entire file i think, but if you do a .count() on a normal text file it might pull the entire file. Thanks Best Regards On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote: Hi, I've been trying to track down some problems with Spark reads being very slow with s3n:// URIs (NativeS3FileSystem). After some digging around, I realized that this file system implementation fetches the entire file, which isn't really a Spark problem, but it really slows down things when trying to just read headers from a Parquet file or just creating partitions in the RDD. Is this something that others have observed before, or am I doing something wrong? Thanks, Akshat
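For reference, a hedged Scala sketch of the newAPIHadoopRDD pattern Bo describes. TextInputFormat is used only so the example is self-contained; in practice you would substitute the S3 InputFormat class from the emr-s3-io library, whose name is not shown in this thread, and the bucket/path is a placeholder.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("mapreduce.input.fileinputformat.inputdir", "s3n://your-bucket/path")  // placeholder

    val records = sc.newAPIHadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
    records.map(_._2.toString).take(5).foreach(println)    // pull a few lines to verify the read path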