Re: Removing empty partitions before we write to HDFS
Currently, I use rdd.isEmpty()

Thanks,
Patanachai

On 08/06/2015 12:02 PM, gpatcham wrote:
Is there a way to filter out empty partitions before I write to HDFS, other than using repartition and coalesce?
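A minimal sketch of the isEmpty() approach (assuming an RDD named rdd and a placeholder output path): skip the write entirely when there are no records, so no directory of empty part files gets created in the first place.

if (!rdd.isEmpty()) {
  // only write when there is data; the path below is a placeholder
  rdd.saveAsTextFile("hdfs:///tmp/output")
}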
Re: log4j.xml bundled in jar vs log4j.properties in spark/conf
I'm having the same problem here.
Re: shutdown local hivecontext?
Well, I managed to solve that issue by running my tests on a Linux system instead of Windows (which I was originally using). However, I now get an error when I try to reset the Hive context using hc.reset(): it tries to create a file under /user/my_user_name instead of the usual Linux path /home/my_user_name, which fails.

On Thu, Aug 6, 2015 at 3:12 PM, Cesar Flores ces...@gmail.com wrote:
> Well, I tried this approach and still have issues. Apparently TestHive cannot delete the Hive metastore directory.
> [...]

--
Cesar Flores
Spark Job Failed (Executor Lost then FS closed)
Code:

import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel

def extract(array: Array[String], index: Integer) = {
  if (index < array.length) {
    array(index).replaceAll("\"", "")
  } else {
    ""
  }
}

case class GuidSess(
  guid: String,
  sessionKey: String,
  sessionStartDate: String,
  siteId: String,
  eventCount: String,
  browser: String,
  browserVersion: String,
  operatingSystem: String,
  experimentChannel: String,
  deviceName: String)

val rowStructText = sc.textFile("/user/zeppelin/guidsess/2015/08/05/part-m-1.gz")
val guidSessRDD = rowStructText.filter(s => s.length != 1).map(s => s.split(",")).map( { s =>
  GuidSess(extract(s, 0), extract(s, 1), extract(s, 2), extract(s, 3), extract(s, 4),
    extract(s, 5), extract(s, 6), extract(s, 7), extract(s, 8), extract(s, 9)) })

val guidSessDF = guidSessRDD.toDF()
guidSessDF.registerTempTable("guidsess")

Once the temp table is created, I ran this query:

select siteid, count(distinct guid) total_visitor, count(sessionKey) as total_visits
from guidsess
group by siteid

Metrics:
Data size: 170 MB
Spark version: 1.3.1
YARN: 2.7.x

Timeline: there is 1 job and 2 stages with 1 task each.

1st stage: mapPartitions. Task 1 of the first stage started to fail with "Executor LOST", and a second attempt was started for it. When I go to the YARN resource manager and check that particular host, I see that it is running fine. The executor was lost again on the second attempt and on the later attempts as well, and the 2nd stage (runJob) was skipped.

Any suggestions?

--
Deepak
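One thing not stated in the thread but worth checking (my assumption): a .gz file is not splittable, so the whole 170 MB input lands in a single task, and losing that one executor fails the stage. Repartitioning right after the read spreads the parsing and the shuffle input across executors.

// Sketch: redistribute the single-partition gzip input before the heavy work.
// 8 is an arbitrary example value; tune to your cluster.
val spread = rowStructText.repartition(8)

The subsequent filter/map/toDF steps would then be applied to spread instead of rowStructText.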
Re: shutdown local hivecontext?
Well, I tried this approach and still have issues. Apparently TestHive cannot delete the Hive metastore directory. The complete error I get is:

15/08/06 15:01:29 ERROR Driver: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.NullPointerException
15/08/06 15:01:29 ERROR TestHive:
== HIVE FAILURE OUTPUT ==
SET spark.sql.test=
SET javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945;create=true
SET hive.metastore.warehouse.dir=C:\cygwin64\tmp\sparkHiveWarehouse5264564710014125096
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.NullPointerException
== END HIVE FAILURE OUTPUT ==
[error] Uncaught exception when running com.dotomi.pipeline.utilitytransformers.SorterTransformerSuite: java.lang.ExceptionInInitializerError
[trace] Stack trace suppressed: run last pipeline/test:testOnly for the full output.
15/08/06 15:01:29 ERROR Utils: Exception while deleting Spark temp dir: C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
java.io.IOException: Failed to delete: C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
	at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:932)
	at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:181)
	at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:179)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:179)
	at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
	at org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
	at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)

Any new ideas about how to avoid this error? I think the problem may be running the tests under sbt, as the created directories stay locked until I exit the sbt shell from which I run the tests. Please let me know if you have any other suggestions. Thanks

On Mon, Aug 3, 2015 at 5:56 PM, Michael Armbrust mich...@databricks.com wrote:

TestHive takes care of creating a temporary directory for each invocation so that multiple test runs won't conflict.

On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores ces...@gmail.com wrote:

We are using a local Hive context in order to run unit tests. Our unit tests run perfectly fine if we run them one by one with sbt, for example:

sbt "test-only com.company.pipeline.scalers.ScalerSuite.scala"
sbt "test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala"

However, if we try to run them as:

sbt "test-only com.company.pipeline.*"

we start to run into issues. It appears that the Hive context is not properly shut down after the first test finishes. Does anyone know how to attack this problem?

The test part of my build.sbt file looks like:

libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test"
parallelExecution in Test := false
fork := true
javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M", "-XX:+CMSClassUnloadingEnabled")

We are working with Spark 1.3.0. Thanks

--
Cesar Flores
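One pattern that is sometimes used for this situation (a sketch under my own assumptions, not something confirmed in the thread): instead of creating and shutting down a HiveContext per suite, reuse the singleton TestHive context across all suites and call reset() between them, so nothing tries to delete the shared metastore directory while sbt still holds it.

import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.{BeforeAndAfterAll, Suite}

trait SharedHiveContext extends BeforeAndAfterAll { self: Suite =>
  // The singleton test context shipped with the spark-hive test artifact.
  lazy val hiveContext = TestHive

  override def afterAll(): Unit = {
    TestHive.reset()   // clear temp tables and cached state, keep the JVM-wide context alive
    super.afterAll()
  }
}

Each suite would then mix in SharedHiveContext instead of instantiating its own HiveContext.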
Re: Removing empty partitions before we write to HDFS
Not that I'm aware of. We ran into a similar issue where we didn't want to keep accumulating all these empty part files in storage on S3 or HDFS. There didn't seem to be any way to do it with an RDD without a performance cost, so we just run a non-Spark post-batch operation to delete empty files from the write path.

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin patanac...@ipsy.com wrote:

Currently, I use rdd.isEmpty()

Thanks,
Patanachai

On 08/06/2015 12:02 PM, gpatcham wrote:
Is there a way to filter out empty partitions before I write to HDFS, other than using repartition and coalesce?

--
Richard Marscher
Software Engineer, Localytics
How can I know currently supported functions in Spark SQL
Hi All, I am using Spark 1.4.1, and I want to know how I can find the complete list of functions supported in Spark SQL; currently I only know 'sum', 'count', 'min', and 'max'. Thanks a lot.
how to stop twitter-spark streaming
Hi All, I am working with Spark Streaming and Twitter's user API. I used this code to stop streaming:

ssc.addStreamingListener(new StreamingListener {
  var count = 1
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    count += 1
    if (count >= 5) {
      ssc.stop(true, true)
    }
  }
})

and I also override the onStop method in my custom receiver to stop streaming, but it gives the following exception:

java.lang.NullPointerException: Inflater has been closed
	at java.util.zip.Inflater.ensureOpen(Inflater.java:389)
	at java.util.zip.Inflater.inflate(Inflater.java:257)
	at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
	at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:154)
	at java.io.BufferedReader.readLine(BufferedReader.java:317)
	at java.io.BufferedReader.readLine(BufferedReader.java:382)
	at twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:85)
	at twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:57)
	at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:478)
15/08/06 15:50:43 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
15/08/06 15:50:43 WARN ReceiverSupervisorImpl: Stopped executor without error
15/08/06 15:50:50 WARN WriteAheadLogManager: Failed to write to write ahead log

Does anyone know the cause of this exception? Thanks :)
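For what it's worth, one commonly suggested variant (an assumption on my part, not a confirmed fix for the Inflater exception): stop the context from a separate thread rather than from inside the listener callback, so the batch-completion thread is not blocked while the context tears itself down.

ssc.addStreamingListener(new StreamingListener {
  @volatile private var count = 0
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    count += 1
    if (count >= 5) {
      // stop asynchronously so the listener thread returns immediately
      new Thread("stop-streaming-context") {
        override def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = true)
      }.start()
    }
  }
})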
Re: Is there any way to support multiple users executing SQL on thrift server?
What is the JIRA number, if a JIRA has been logged for this? Thanks

On Jan 20, 2015, at 11:30 AM, Cheng Lian lian.cs@gmail.com wrote:

Hey Yi, I'm quite unfamiliar with Hadoop/HDFS auth mechanisms for now, but I would like to investigate this issue later. Would you please open a JIRA for it? Thanks! Cheng

On 1/19/15 1:00 AM, Yi Tian wrote:

Is there any way to support multiple users executing SQL on one thrift server? I think there are some problems in Spark 1.2.0. For example: start the thrift server as user A, connect to the thrift server via beeline as user B, and execute "insert into table dest select ... from table src". Then we find these items on HDFS:

drwxr-xr-x - B supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1
drwxr-xr-x - B supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary
drwxr-xr-x - B supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0
drwxr-xr-x - A supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/_temporary
drwxr-xr-x - A supergroup 0 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00
-rw-r--r-- 3 A supergroup 2671 2015-01-16 16:42 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00/part-0

You can see that all the temporary paths created on the driver side (thrift server side) are owned by user B, which is what we expected. But all the output data created on the executor side is owned by user A, which is NOT what we expected. The wrong owner on the output data causes an org.apache.hadoop.security.AccessControlException while the driver side moves the output data into the destination table. Does anyone know how to resolve this problem?
Re: spark job not accepting resources from worker
Any inputs? In the case of the following message, is there a way to check which resource is insufficient through some logs?

[Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

Regards.

On 8/6/2015 11:40 AM, Kushal Chokhani wrote:
> Hi, I have a spark/cassandra setup where I am using the spark-cassandra java connector to query a table. [...]
> This job is stuck with the insufficient-resources warning.
> [...]
Re:Re: Real-time data visualization with Zeppelin
Hi Andy, is there any method to convert an IPython notebook file (.ipynb) to a Spark Notebook file (.snb), or vice versa?

BR
Jun

At 2015-07-13 02:45:57, andy petrella andy.petre...@gmail.com wrote:

Heya, you might be looking for something like this I guess: https://www.youtube.com/watch?v=kB4kRQRFAVc. The Spark-Notebook (https://github.com/andypetrella/spark-notebook/) can bring that to you; it uses fully reactive bilateral communication streams to update data and viz, and it hides almost everything for you ^^. The video was using the notebook notebooks/streaming/Twitter stream.snb, so you can play with it yourself if you like. You might want to build master (before 0.6.0 is released → soon) here: http://spark-notebook.io/. HTH, andy

On Sun, Jul 12, 2015 at 8:29 PM Ruslan Dautkhanov dautkha...@gmail.com wrote:

Don't think it is a Zeppelin problem. RDDs are immutable. Unless you integrate something like IndexedRDD (http://spark-packages.org/package/amplab/spark-indexedrdd) into Zeppelin, I think it's not possible.

-- Ruslan Dautkhanov

On Wed, Jul 8, 2015 at 3:24 PM, Brandon White bwwintheho...@gmail.com wrote:

Can you use a cron job to update it every X minutes?

On Wed, Jul 8, 2015 at 2:23 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

Hi all – I'm just wondering if anyone has had success integrating Spark Streaming with Zeppelin and actually dynamically updating the data in near real-time. From my investigation, it seems that Zeppelin will only allow you to display a snapshot of data, not a continuously updating table. Has anyone figured out if there's a way to loop a display command or how to provide a mechanism to continuously update visualizations?

Thank you,
Ilya Ganelin
Re: spark job not accepting resources from worker
Figured out the root cause. The master was randomly assigning a port to the worker for communication. Because of the firewall on the master, the worker couldn't send messages out to the master (probably things like resource details). Weirdly, the worker didn't even bother to throw any error.

On 8/6/2015 3:24 PM, Kushal Chokhani wrote:
> Any inputs? In the case of the following message, is there a way to check which resource is insufficient through some logs?
> [Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
> [...]
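For reference, a sketch of one way to avoid the random-port problem behind a firewall (my assumption based on the root cause described above; the port numbers are just examples): pin the ports Spark would otherwise pick at random, so they can be opened explicitly between master and worker.

import org.apache.spark.SparkConf

// Fix the normally random ports so the firewall can whitelist them.
val conf = new SparkConf()
  .set("spark.driver.port", "7001")
  .set("spark.fileserver.port", "7002")
  .set("spark.broadcast.port", "7003")
  .set("spark.blockManager.port", "7004")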
Re: Re: Real-time data visualization with Zeppelin
Yep, most things will work just by renaming it :-D You can even use nbconvert afterwards.

On Thu, Aug 6, 2015 at 12:09 PM jun kit...@126.com wrote:
> Hi Andy, is there any method to convert an IPython notebook file (.ipynb) to a Spark Notebook file (.snb), or vice versa?
> [...]

-- andy
Re: How to read gzip data in Spark - Simple question
I got it running by myself.

On Wed, Aug 5, 2015 at 10:27 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

Have you tried reading the Spark documentation? http://spark.apache.org/docs/latest/programming-guide.html

Thank you, Ilya Ganelin

-----Original Message-----
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Thursday, August 06, 2015 12:41 AM Eastern Standard Time
To: Philip Weaver
Cc: user
Subject: Re: How to read gzip data in Spark - Simple question

How do I persist the RDD to HDFS?

On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com wrote:

This message means that java.util.Date is not supported by Spark DataFrame. You'll need to use java.sql.Date, I believe.

On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

That seems to be working; however I see a new exception.

Code:

def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String, f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String)
val summary = rowStructText.map(s => s.split(",")).map( s =>
  Summary(formatStringAsDate(s(0)),
    s(1).replaceAll("\"", "").toLong,
    s(3).replaceAll("\"", "").toLong,
    s(4).replaceAll("\"", "").toInt,
    s(5).replaceAll("\"", ""),
    s(6).replaceAll("\"", "").toInt,
    formatStringAsDate(s(7)),
    formatStringAsDate(s(8)),
    s(9).replaceAll("\"", "").toInt,
    s(10).replaceAll("\"", "").toInt,
    s(11).replaceAll("\"", "").toFloat,
    s(12).replaceAll("\"", "").toInt,
    s(13).replaceAll("\"", "").toInt,
    s(14).replaceAll("\"", "")
  )
).toDF()
bank.registerTempTable("summary")

//Output
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date
formatStringAsDate: (dateStr: String)java.util.Date
rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at console:60
defined class Summary
x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at console:61
java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
	at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
	at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
	at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)

Any suggestions?

On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote:

The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection of 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want.

On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

I have CSV data that is stored gzip-compressed on HDFS.

With Pig:

a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage();
b = limit a 10
(2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
(2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)

However with Spark:

val rowStructText = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")
val x = rowStructText.map(s => { println(s); s })
x.count

Questions:
1) x.count always shows 67 irrespective of the path I change in sc.parallelize
2) It shows x as RDD[Char] instead of String
3) println() never emits the rows.

Any suggestions?

-Deepak
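To gather the two answers from this thread in one place, a minimal sketch (paths are placeholders of my own): read the gzipped CSV with textFile rather than parallelize, since Spark decompresses .gz input transparently, and write an RDD back to HDFS with saveAsTextFile.

// Sketch: reading gzipped text and persisting an RDD to HDFS.
val rows = sc.textFile("hdfs:///user/zeppelin/aggregatedsummary/2015/08/03/regular/")  // .gz files are decompressed automatically
rows.saveAsTextFile("hdfs:///user/zeppelin/output/summary")                            // output path is a placeholder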
Re: Unable to persist RDD to HDFS
This isn't really a Spark question. You're trying to parse a string into an integer, but the string contains an invalid character. The exception message explains this.

On Wed, Aug 5, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
> [...]
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 147.0 failed 4 times, most recent failure: Lost task 0.3 in stage 147.0 (TID 3396, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.NumberFormatException: For input string: "3g"
> [...]
> Any suggestions?
> -- Deepak
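A defensive variant of the parsing (my own sketch, not from the thread): wrap the conversion so a malformed field such as "3g" is replaced by a sentinel value instead of killing the whole stage.

import scala.util.Try

// Strips quotes and falls back to a default when the field is not a number.
def toIntSafe(s: String, default: Int = -1): Int =
  Try(s.replaceAll("\"", "").trim.toInt).getOrElse(default)

The same approach works for toLong and toFloat fields in the Summary case class.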
Re: Re: How can I know currently supported functions in Spark SQL
Worth noting that Spark 1.5 extends the list of Spark SQL functions quite a bit. Not sure where they are in the docs yet, but the JIRA is here: https://issues.apache.org/jira/browse/SPARK-8159

On Thu, Aug 6, 2015 at 7:27 PM, Netwaver wanglong_...@163.com wrote:

Thanks for your kind help.

At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote:

They are covered here in the docs: http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$

On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

Hi All, I am using Spark 1.4.1, and I want to know how I can find the complete list of functions supported in Spark SQL; currently I only know 'sum', 'count', 'min', and 'max'. Thanks a lot.

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
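A quick way to browse what is available from the shell (a sketch; assumes Spark 1.4.x with a SQLContext named sqlContext): the built-ins live in org.apache.spark.sql.functions, so importing it and tab-completing in the spark-shell lists them, and they can be used directly on DataFrames.

import org.apache.spark.sql.functions._

val df = sqlContext.range(0, 10)                        // single column named "id"
df.agg(sum("id"), avg("id"), min("id"), max("id")).show()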
Re:Re: How can I know currently supported functions in Spark SQL
Thanks for your kind help.

At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote:

They are covered here in the docs: http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$

On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

Hi All, I am using Spark 1.4.1, and I want to know how I can find the complete list of functions supported in Spark SQL; currently I only know 'sum', 'count', 'min', and 'max'. Thanks a lot.
Spark-submit fails when jar is in HDFS
Hi All,

We're trying to run Spark with Mesos and Docker in client mode (since Mesos doesn't support cluster mode) and load the application jar from HDFS. The following is the command we're running:

We're getting the following warning before an exception from that command:

Before I debug further, is this even supported? I started reading the code and it wasn't clear that it's possible to load a remote jar in client mode at all. I did see a related issue in [2], but it didn't quite clarify everything I was looking for.

Thanks,
- Alan

[1] https://spark.apache.org/docs/latest/submitting-applications.html
[2] http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html
Re: Spark MLib v/s SparkR
I am starting off with classification models (logistic regression, random forests); basically I want to learn machine learning. Since I have a Java background I started off with MLlib, but later heard that R works as well (with scaling issues only). So I was wondering whether SparkR would resolve the scaling issue, hence my question: why not go with R and SparkR alone (keeping aside my inclination towards Java)?

On Thu, Aug 6, 2015 at 12:28 AM, Charles Earl charles.ce...@gmail.com wrote:

What machine learning algorithms are you interested in exploring or using? Start from there, or better yet from the problem you are trying to solve, and then the selection may be evident.

On Wednesday, August 5, 2015, praveen S mylogi...@gmail.com wrote:

I was wondering when one should go for MLlib or SparkR. What criteria should be considered before choosing either of the solutions for data analysis? What are the advantages of Spark MLlib over SparkR, or of SparkR over MLlib?

--
- Charles
Out of memory with twitter spark streaming
Hi, I am running an application using Activator where I retrieve tweets and store them in a MySQL database using the code below. I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory, the OOM is only delayed. Can anybody give me a clue? Here is the code:

var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" + user.getName +
    "|$" + user.getScreenName + "|$" + user.getDescription + "|$" + tweet.getText.trim +
    "|$" + user.getFollowersCount + "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation +
    "|$" + user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" + replyStatusId +
    "|$" + pTweetId + "|$" + pcreatedAt
})

tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    // tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
    DBQuery.addTweetRDD(counter)
  }
})

// Checkpoint directory to recover from failures
println("tweets for the last stream are saved and can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()

regards
Pankaj
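Two things in that snippet commonly lead to driver OOMs (my reading, not something confirmed in the thread): tweetsRDD.distinct() is computed but its result is discarded, and tweetsRDD.collect() pulls every batch into the driver before saving. A sketch of writing from the executors instead, keeping DBQuery as the poster's own persistence helper:

tweets.foreachRDD { tweetsRDD =>
  val deduped = tweetsRDD.distinct()            // keep the result; distinct() does not mutate in place
  deduped.foreachPartition { partition =>
    // runs on the executors, so nothing is collected to the driver
    partition.foreach(tweet => DBQuery.saveTweets(tweet))
  }
}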
stopping spark stream app
Hi, I am using Spark Streaming 1.3 and a custom checkpoint to save Kafka offsets.

1. Is doing the following to handle stop safe?

Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    jssc.stop(true, true);
    System.out.println("Inside Add Shutdown Hook");
  }
});

2. And do I need to handle saving the checkpoint in the shutdown hook as well, or will the driver handle it automatically, since it gracefully stops the stream and lets the foreachRDD function on the stream run to completion?

directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() { }

Thanks
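On question 2, a sketch of the usual pattern (written in Scala while the original code is Java; HasOffsetRanges is part of the spark-streaming-kafka 1.3 direct API, and saveOffsets is a hypothetical helper standing in for the custom checkpoint): persist the offsets at the end of each foreachRDD body, so a graceful ssc.stop(true, true) lets the in-flight batch finish and record its offsets before shutdown.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch ...
  saveOffsets(offsetRanges)   // hypothetical helper writing to your offset store
}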
spark job not accepting resources from worker
Hi,

I have a spark/cassandra setup where I am using the spark-cassandra java connector to query a table. So far, I have 1 spark master node (2 cores) and 1 worker node (4 cores). Both of them have the following spark-env.sh under conf/:

#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=192.168.4.134
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_MEMORY=2G

I am using spark 1.4.1 along with cassandra 2.2.0. I have started my cassandra/spark setup, created a keyspace and table in cassandra, and added some rows to the table. Now I try to run the following spark job using the spark-cassandra java connector:

SparkConf conf = new SparkConf();
conf.setAppName("Testing");
conf.setMaster("spark://192.168.4.134:7077");
conf.set("spark.cassandra.connection.host", "192.168.4.129");
conf.set("spark.logConf", "true");
conf.set("spark.driver.maxResultSize", "50m");
conf.set("spark.executor.memory", "200m");
conf.set("spark.eventLog.enabled", "true");
conf.set("spark.eventLog.dir", "/tmp/");
conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
conf.set("spark.cores.max", "1");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
    .map(new Function<CassandraRow, String>() {
        private static final long serialVersionUID = -6263533266898869895L;
        @Override
        public String call(CassandraRow cassandraRow) throws Exception {
            return cassandraRow.toString();
        }
    });
System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
sc.close();

This job is stuck with the insufficient-resources warning. Here are the logs:

1107 [main] INFO org.apache.spark.SparkContext - Spark configuration: spark.app.name=Testing spark.cassandra.connection.host=192.168.4.129 spark.cores.max=1 spark.driver.maxResultSize=50m spark.eventLog.dir=/tmp/ spark.eventLog.enabled=true spark.executor.extraClassPath=/home/enlighted/ebd.jar spark.executor.memory=200m spark.logConf=true spark.master=spark://192.168.4.134:7077 1121 [main] INFO org.apache.spark.SecurityManager - Changing view acls to: enlighted 1122 [main] INFO org.apache.spark.SecurityManager - Changing modify acls to: enlighted 1123 [main] INFO org.apache.spark.SecurityManager - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(enlighted); users with modify permissions: Set(enlighted) 1767 [sparkDriver-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 1805 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting - Starting remoting 1957 [main] INFO org.apache.spark.util.Utils - Successfully started service 'sparkDriver' on port 54611.
1958 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting - Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.4.134:54611] 1977 [main] INFO org.apache.spark.SparkEnv - Registering MapOutputTracker 1989 [main] INFO org.apache.spark.SparkEnv - Registering BlockManagerMaster 2007 [main] INFO org.apache.spark.storage.DiskBlockManager - Created local directory at /tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/blockmgr-3e3d54e7-16df-4e97-be48-b0c0fa0389e7 2012 [main] INFO org.apache.spark.storage.MemoryStore - MemoryStore started with capacity 456.0 MB 2044 [main] INFO org.apache.spark.HttpFileServer - HTTP File server directory is /tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/httpd-64b4d92e-cde9-45fb-8b38-edc3cca3933c 2046 [main] INFO org.apache.spark.HttpServer - Starting HTTP Server 2086 [main] INFO org.spark-project.jetty.server.Server - jetty-8.y.z-SNAPSHOT 2098 [main] INFO org.spark-project.jetty.server.AbstractConnector - Started SocketConnector@0.0.0.0:44884 2099 [main] INFO org.apache.spark.util.Utils - Successfully started service 'HTTP file server' on port 44884. 2108 [main] INFO org.apache.spark.SparkEnv - Registering OutputCommitCoordinator 2297 [main] INFO org.spark-project.jetty.server.Server - jetty-8.y.z-SNAPSHOT 2317 [main] INFO org.spark-project.jetty.server.AbstractConnector - Started SelectChannelConnector@0.0.0.0:4040 2318 [main] INFO org.apache.spark.util.Utils - Successfully started service 'SparkUI' on port 4040. 2320 [main] INFO org.apache.spark.ui.SparkUI - Started SparkUI at http://192.168.4.134:4040 2387 [sparkDriver-akka.actor.default-dispatcher-3] INFO org.apache.spark.deploy.client.AppClient$ClientActor - Connecting to master akka.tcp://sparkMaster@192.168.4.134:7077/user/Master... 2662 [sparkDriver-akka.actor.default-dispatcher-14] INFO org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend - Connected to Spark cluster with app ID
Re: Multiple UpdateStateByKey Functions in the same job?
I think you can. Give it a try and see.

Thanks
Best Regards

On Tue, Aug 4, 2015 at 7:02 AM, swetha swethakasire...@gmail.com wrote:

Hi, can I use multiple updateStateByKey functions in the same Streaming job? Suppose I need to maintain the state of a user session in the form of JSON as well as counts of various other metrics that have different keys. Can I use multiple updateStateByKey functions to maintain state for different keys with different return values?

Thanks,
Swetha
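A sketch of what that could look like (the stream names and types here are hypothetical): two independently keyed DStreams, each with its own updateStateByKey call and its own state type.

import org.apache.spark.streaming.dstream.DStream

// Session JSON per user: keep the most recent value.
val sessionState: DStream[(String, String)] =
  sessionsByUser.updateStateByKey[String] { (newJsons: Seq[String], current: Option[String]) =>
    newJsons.lastOption.orElse(current)
  }

// Running counter per metric key.
val counterState: DStream[(String, Long)] =
  eventsByMetric.updateStateByKey[Long] { (newCounts: Seq[Long], current: Option[Long]) =>
    Some(current.getOrElse(0L) + newCounts.sum)
  }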
Re: Extremely poor predictive performance with RF in mllib
I can reproduce this issue, so it looks like a bug in Random Forest; I will try to find some clue.

2015-08-05 1:34 GMT+08:00 Patrick Lam pkph...@gmail.com:

Yes, I rechecked and the label is correct. As you can see in the code posted, I used the exact same features for all the classifiers, so unless RF somehow switches the labels, it should be correct. I have posted a sample dataset and sample code to reproduce what I'm getting here: https://github.com/pkphlam/spark_rfpredict

On Tue, Aug 4, 2015 at 6:42 AM, Yanbo Liang yblia...@gmail.com wrote:

It looks like the predicted result is just the opposite of what was expected, so could you check whether the label is right? Or could you share some data that would help to reproduce this output?

2015-08-03 19:36 GMT+08:00 Barak Gitsis bar...@similarweb.com:

Hi, I've run into some poor RF behavior, although not as pronounced as yours. Would be great to get more insight into this one. Thanks!

On Mon, Aug 3, 2015 at 8:21 AM pkphlam pkph...@gmail.com wrote:

Hi,

This might be a long shot, but has anybody run into very poor predictive performance using RandomForest with MLlib? Here is what I'm doing:

- Spark 1.4.1 with PySpark
- Python 3.4.2
- ~30,000 tweets of text
- 12289 1s and 15956 0s
- Whitespace tokenization and then hashing trick for feature selection using 10,000 features
- Run RF with 100 trees and maxDepth of 4 and then predict using the features from all the 1s observations.

So in theory, I should get predictions of close to 12289 1s (especially if the model overfits). But I'm getting exactly 0 1s, which sounds ludicrous to me and makes me suspect something is wrong with my code or I'm missing something. I notice similar behavior (although not as extreme) if I play around with the settings. But I'm getting normal behavior with other classifiers, so I don't think it's my setup that's the problem. For example:

lrm = LogisticRegressionWithSGD.train(lp, iterations=10)
logit_predict = lrm.predict(predict_feat)
logit_predict.sum()
# 9077

nb = NaiveBayes.train(lp)
nb_predict = nb.predict(predict_feat)
nb_predict.sum()
# 10287.0

rf = RandomForest.trainClassifier(lp, numClasses=2, categoricalFeaturesInfo={}, numTrees=100, seed=422)
rf_predict = rf.predict(predict_feat)
rf_predict.sum()
# 0.0

This code was all run back to back so I didn't change anything in between. Does anybody have a possible explanation for this?

Thanks!

--
Patrick Lam
Institute for Quantitative Social Science, Harvard University
http://www.patricklam.org
Unable to persist RDD to HDFS
Code:

import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel

def formatStringAsDate(dateStr: String) = new java.sql.Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String, f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String)

val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz")
val summary = rowStructText.filter(s => s.length != 1).map(s => s.split("\t")).map { s =>
  Summary(formatStringAsDate(s(0)),
    s(1).replaceAll("\"", "").toLong,
    s(3).replaceAll("\"", "").toLong,
    s(4).replaceAll("\"", "").toInt,
    s(5).replaceAll("\"", ""),
    s(6).replaceAll("\"", "").toInt,
    formatStringAsDate(s(7)),
    formatStringAsDate(s(8)),
    s(9).replaceAll("\"", "").toInt,
    s(10).replaceAll("\"", "").toInt,
    s(11).replaceAll("\"", "").toFloat,
    s(12).replaceAll("\"", "").toInt,
    s(13).replaceAll("\"", "").toInt,
    s(14).replaceAll("\"", "")
  )
}
summary.saveAsTextFile("sparkO")

Output:

import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel
formatStringAsDate: (dateStr: String)java.sql.Date
defined class Summary
rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz MapPartitionsRDD[639] at textFile at <console>:305
summary: org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[642] at map at <console>:310

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 147.0 failed 4 times, most recent failure: Lost task 0.3 in stage 147.0 (TID 3396, datanode-6-3486.phx01.dev.ebayc3.com): java.lang.NumberFormatException: For input string: "3g"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Integer.parseInt(Integer.java:580)
  at java.lang.Integer.parseInt(Integer.java:615)
  at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
  at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:318)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:312)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1072)
  at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
  at org.apache.spark.scheduler.Task.run(Task.scala:64)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)

OR summary.count throws the same exception. Any suggestions? -- Deepak
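For reference, a defensive variant of the parsing step (a sketch only, not from this thread; it reuses the Summary class and formatStringAsDate above, and the output path is hypothetical) drops rows whose fields cannot be parsed, such as whatever record carries the "3g" value, instead of failing the whole save:

import scala.util.Try

def parseRow(fields: Array[String]): Option[Summary] = Try {
  def clean(s: String) = s.replaceAll("\"", "")
  Summary(
    formatStringAsDate(fields(0)),
    clean(fields(1)).toLong, clean(fields(3)).toLong, clean(fields(4)).toInt,
    clean(fields(5)), clean(fields(6)).toInt,
    formatStringAsDate(fields(7)), formatStringAsDate(fields(8)),
    clean(fields(9)).toInt, clean(fields(10)).toInt, clean(fields(11)).toFloat,
    clean(fields(12)).toInt, clean(fields(13)).toInt, clean(fields(14)))
}.toOption   // any NumberFormatException or ParseException turns the row into None

val parsedSummary = rowStructText
  .filter(_.length != 1)
  .map(_.split("\t"))
  .flatMap(parseRow)          // malformed rows are silently dropped here

parsedSummary.saveAsTextFile("sparkOutput")   // hypothetical output path

If the number of dropped rows matters, count the failures separately before discarding them rather than using flatMap directly.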
Re: NoSuchMethodError : org.apache.spark.streaming.scheduler.StreamingListenerBus.start()V
For some reason you are having two different versions of Spark jars in your classpath. Thanks Best Regards On Tue, Aug 4, 2015 at 12:37 PM, Deepesh Maheshwari deepesh.maheshwar...@gmail.com wrote: Hi, I am trying to read data from Kafka and process it using Spark. I have attached my source code and error log. For integrating Kafka, I have added these dependencies in pom.xml:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

I have attached the full error log. Please check why it is giving the error; this class exists in my classpath. I am running Spark and Kafka locally using a Java class:

SparkConf conf = new SparkConf().setAppName("Spark Demo").setMaster("local[2]").set("spark.executor.memory", "1g");

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
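As a quick way to confirm the duplicate-versions theory (generic Maven advice, not something verified against the attached logs), running mvn dependency:tree -Dincludes=org.apache.spark shows every Spark artifact and version that ends up on the classpath. When the job is submitted to an existing Spark installation, the Spark dependencies are usually marked as provided so the application jar does not bundle a second copy, e.g.:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
  <version>1.3.0</version>
  <scope>provided</scope>
</dependency>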
Talk on Deep dive in Spark Dataframe API
Hi, Recently I gave a talk on a deep dive into data frame api and sql catalyst . Video of the same is available on Youtube with slides and code. Please have a look if you are interested. *http://blog.madhukaraphatak.com/anatomy-of-spark-dataframe-api/ http://blog.madhukaraphatak.com/anatomy-of-spark-dataframe-api/* -- Regards, Madhukara Phatak http://datamantra.io/
Re: Twitter Connector-Spark Streaming
Hi Sadaf Which version of spark are you using? And whats in the spark-env.sh file? I think you are using both SPARK_CLASSPATH (which is deprecated) and spark.executor.extraClasspath (may be set in spark-defaults.sh file). Thanks Best Regards On Wed, Aug 5, 2015 at 6:22 PM, Sadaf Khan sa...@platalytics.com wrote: Hi Akhil, I know you are a big knowledge base of spark streaming. I wan your little help more. I've done the twitter streaming using twitter's streaming user api and spark streaming. this runs successfully on my local machine. but when i run this program on cluster in local mode. it just run successfully for the very first time. later on it gives the following exception. Exception in thread main org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former. and spark class path is unset already!! I have to make a new checkpoint directory each time to make it run successfully. otherwise it shows above exception. Can you please again help me to resolve this issue? Thanks :) On Tue, Aug 4, 2015 at 4:33 PM, Sadaf Khan sa...@platalytics.com wrote: thanks for being a helping hand before. :) one more point i want to discuss is that i now used Twitter Rest Api and successfully fetch the home timeline. but now it is showing 20 recent tweets, Is there any way to get 3200 (max rate limit) last tweets? Thanks in anticipation :) On Tue, Aug 4, 2015 at 2:05 PM, Sadaf Khan sa...@platalytics.com wrote: thanks alot On Tue, Aug 4, 2015 at 2:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You will have to write your own consumer for pulling your custom feeds, and then you can do a union (customfeedDstream.union(twitterStream)) with the twitter stream api. Thanks Best Regards On Tue, Aug 4, 2015 at 2:28 PM, Sadaf Khan sa...@platalytics.com wrote: Thanks alot :) One more thing that i want to ask is that i have used twitters streaming api.and it seems that the above solution uses rest api. how can i used both simultaneously ? Any response will be much appreciated :) Regards On Tue, Aug 4, 2015 at 1:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Yes you can, when you start the application in the first batch you just need to pull all the tweets from your account. You need to look into the API for that. Have a look at this https://dev.twitter.com/rest/reference/get/statuses/user_timeline Thanks Best Regards On Tue, Aug 4, 2015 at 1:47 PM, Sadaf Khan sa...@platalytics.com wrote: Hi. You were really helpful for me last time :) and i have done with the last problem. I wanna ask you one more question. Now my connector is showing the tweets that occurs after running the program. Is there any way to fetch all old tweets since when the account was created? I will be thankful to you for you kind response. On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan sa...@platalytics.com wrote: thanks alot for this help :) On Thu, Jul 30, 2015 at 6:41 PM, Akhil Das ak...@sigmoidanalytics.com wrote: You can create a custom receiver and then inside it you can write yourown piece of code to receive data, filter them etc before giving it to spark. Thanks Best Regards On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan sa...@platalytics.com wrote: okay :) then is there anyway to fetch the tweets specific to my account? Thanks in anticipation :) On Thu, Jul 30, 2015 at 6:17 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Owh, this one fetches the public tweets, not the one specific to your account. 
Thanks Best Regards On Thu, Jul 30, 2015 at 6:11 PM, Sadaf Khan sa...@platalytics.com wrote: yes. but can you please tell me how to mention a specific user account in filter? I want to fetch my tweets, tweets of my followers and the tweets of those whom i followed. So in short i want to fatch the tweets of my account only. Recently i have used val tweets =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2) Any response will be very much appreciated. :) Thanks. On Thu, Jul 30, 2015 at 5:20 PM, Akhil Das ak...@sigmoidanalytics.com wrote: TwitterUtils.createStream takes a 3rd argument which is a filter, once you provide these, it will only fetch tweets of such. Thanks Best Regards On Thu, Jul 30, 2015 at 4:19 PM, Sadaf sa...@platalytics.com wrote: Hi. I am writing twitter connector using spark streaming. but it fetched the random tweets. Is there any way to receive the tweets of a particular account? I made an app on twitter and used the credentials as given below. def managingCredentials(): Option[twitter4j.auth.Authorization]= { object auth{ val config = new twitter4j.conf.ConfigurationBuilder() .setOAuthConsumerKey() .setOAuthConsumerSecret() .setOAuthAccessToken() .setOAuthAccessTokenSecret() .build }
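For reference, the filter argument mentioned above is just a sequence of track keywords passed as the third parameter. A sketch (ssc and atwitter are assumed to be the StreamingContext and the Option[Authorization] from the code in this thread; the terms are hypothetical):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.twitter.TwitterUtils

val filters = Seq("spark", "@my_handle")   // hypothetical track terms
val tweets = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_2)

Note that track terms match tweet text (including mentions); they do not by themselves restrict the stream to one account's timeline, which is why the REST user_timeline endpoint was suggested for that part.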
Re: Memory allocation error with Spark 1.5
Works like a charm. Thanks Reynold for the quick and efficient response! Alexis 2015-08-05 19:19 GMT+02:00 Reynold Xin r...@databricks.com: In Spark 1.5, we have a new way to manage memory (part of Project Tungsten). The default unit of memory allocation is 64MB, which is way too high when you have 1G of memory allocated in total and have more than 4 threads. We will reduce the default page size before releasing 1.5. For now, you can just reduce spark.buffer.pageSize variable to a lower value (e.g. 16m). https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125 On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin aseigneu...@ippon.fr wrote: Hi, I'm receiving a memory allocation error with a recent build of Spark 1.5: java.io.IOException: Unable to acquire 67108864 bytes of memory at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174) at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146) at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126) The issue appears when joining 2 datasets. One with 6084 records, the other one with 200 records. I'm expecting to receive 200 records in the result. I'm using a homemade build prepared from branch-1.5 with commit ID eedb996. I have run mvn -DskipTests clean install to generate that build. Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3. I've prepared a test case that can be built and executed very easily (data files are included in the repo): https://github.com/aseigneurin/spark-testcase One thing to note is that the issue arises when the master is set to local[*] but not when set to local. Both options work without problem with Spark 1.4, though. Any help will be greatly appreciated! Many thanks, Alexis
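For anyone hitting the same thing before 1.5 final, the workaround is just a configuration change. A sketch of the two usual ways to set it (the 16m value is the one suggested above; the app name is hypothetical):

// in code, before creating the SparkContext / SQLContext
val conf = new org.apache.spark.SparkConf()
  .setAppName("MyApp")
  .set("spark.buffer.pageSize", "16m")

// or on the command line:
// spark-submit --conf spark.buffer.pageSize=16m ...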
Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase
Yes, you should use ORC; it is much faster and more compact. Additionally you can apply compression (snappy) to increase performance. Your data processing pipeline seems to be not very optimized. You should use the newest Hive version, enabling storage indexes and bloom filters on appropriate columns. Ideally you should insert the data sorted appropriately. Partitioning and setting the execution engine to Tez is also beneficial. HBase with Phoenix should currently only be used if you do few joins, not very complex queries and not many full table scans. On Thu, Aug 6, 2015 at 2:54 PM, venkatesh b venkateshmailingl...@gmail.com wrote: Hi, here I got two things to know. FIRST: In our project we use hive. We daily get new data. We need to process this new data only once. And send this processed data to RDBMS. Here in processing we majorly use many complex queries with joins with where condition and grouping functions. There are many intermediate tables generated around 50 while processing. Till now we use text format as storage. We came across ORC file format. I would like to know that since it is one Time querying the table is it worth of storing as ORC format. SECOND: I came to know about HBase, which is faster. Can I replace hive with HBase for processing of data daily faster. Currently it is taking 15hrs daily with hive. Please inform me if any other information is needed. Thanks regards Venkatesh
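A rough HiveQL illustration of that advice (table, columns and values are hypothetical, not from this thread): an ORC table with snappy compression and bloom filters on a join key, with Tez as the execution engine:

SET hive.execution.engine=tez;

CREATE TABLE daily_facts (
  id BIGINT,
  customer_id INT,
  amount DECIMAL(10,2)
)
PARTITIONED BY (load_date STRING)
STORED AS ORC
TBLPROPERTIES (
  "orc.compress" = "SNAPPY",
  "orc.bloom.filter.columns" = "customer_id",
  "orc.create.index" = "true"
);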
Spark-submit not finding main class and the error reflects different path to jar file than specified
Given the following command line to spark-submit: bin/spark-submit --verbose --master local[2]--class org.yardstick.spark.SparkCoreRDDBenchmark /shared/ysgood/target/yardstick-spark-uber-0.0.1.jar Here is the output: NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of assembly. Using properties file: /shared/spark-1.4.1/conf/spark-defaults.conf Adding default property: spark.akka.askTimeout=180 Adding default property: spark.master=spark://mellyrn.local:7077 Error: Cannot load main class from JAR file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark Run with --help for usage help or --verbose for debug output The path file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark does not seem to make sense. It does not reflect the path to the file that was specified on the spark-submit command line. Note: when attempting to run that jar file via java -classpath shared/ysgood/target/yardstick-spark-uber-0.0.1.jar org.yardstick.spark.SparkCoreRDDBenchmark Then the result is as expected: the main class starts to load and then there is a NoClassDefFoundException on SparkConf.class (which is not inside the jar). This shows the app jar is healthy.
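For what it's worth (this is a guess from the pasted command, not something verified here), the garbled jar path in the error looks consistent with the missing space between local[2] and --class: spark-submit would then read local[2]--class as the master value, treat the class name as the application jar, and resolve it against the current directory /shared/spark-1.4.1. If that is the cause, the fix is simply to separate the arguments (same paths as above):

bin/spark-submit --verbose --master local[2] \
  --class org.yardstick.spark.SparkCoreRDDBenchmark \
  /shared/ysgood/target/yardstick-spark-uber-0.0.1.jar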
Re: Starting Spark SQL thrift server from within a streaming app
Well, the creation of a thrift server would be to allow external access to the data from JDBC / ODBC type connections. The sparkstreaming-sql project leverages a standard Spark SQL context and then provides a means of converting an incoming DStream into a row; look at the MessageToRow trait in the KafkaSource class. The example, org.apache.spark.sql.streaming.examples.KafkaDDL, should make it clear, I think. -Todd On Thu, Aug 6, 2015 at 7:58 AM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Thank you Todd, How is the sparkstreaming-sql project different from starting a thrift server on a streaming app? Thanks again. Daniel On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist tsind...@gmail.com wrote: Hi Daniel, It is possible to create an instance of the Spark SQL Thrift server, however it seems like this project is what you may be looking for: https://github.com/Intel-bigdata/spark-streamingsql Not 100% sure what your use case is, but you can always convert the data into a DataFrame and then issue a query against it. If you want other systems to be able to query it then there are numerous connectors to store data into Hive, Cassandra, HBase, ElasticSearch, etc. To create an instance of a thrift server with its own SQL context you would do something like the following:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveMetastoreTypes._
import org.apache.spark.sql.types._
import org.apache.spark.sql.hive.thriftserver._

object MyThriftServer {
  val sparkConf = new SparkConf()
    // master is passed to spark-submit, but could also be specified explicitly
    // .setMaster(sparkMaster)
    .setAppName("My ThriftServer")
    .set("spark.cores.max", "2")
  val sc = new SparkContext(sparkConf)
  val sparkContext = sc
  import sparkContext._
  val sqlContext = new HiveContext(sparkContext)
  import sqlContext._
  import sqlContext.implicits._

  makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t")

  HiveThriftServer2.startWithContext(sqlContext)
}

Again, I'm not really clear what your use case is, but it does sound like the first link above is what you may want. -Todd On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, Is it possible to start the Spark SQL thrift server from within a streaming app so the streamed data could be queried as it goes in? Thank you. Daniel
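Once HiveThriftServer2.startWithContext(sqlContext) is running, the registered table is reachable over JDBC like any other HiveServer2 instance. A minimal smoke test, assuming the default thrift port 10000 on the driver host:

beeline -u jdbc:hive2://localhost:10000 -e "SELECT * FROM t"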
Is it worth storing in ORC for one time read. And can be replace hive with HBase
Hi, here I got two things to know. FIRST: In our project we use hive. We daily get new data. We need to process this new data only once. And send this processed data to RDBMS. Here in processing we majorly use many complex queries with joins with where condition and grouping functions. There are many intermediate tables generated around 50 while processing. Till now we use text format as storage. We came across ORC file format. I would like to know that since it is one Time querying the table is it worth of storing as ORC format. SECOND: I came to know about HBase, which is faster. Can I replace hive with HBase for processing of data daily faster. Currently it is taking 15hrs daily with hive. Please inform me if any other information is needed. Thanks regards Venkatesh
Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase
Additionally it is of key importance to use the right data types for the columns. Use int for ids, int or decimal or float or double etc for numeric values etc. - A bad data model using varchars and string where not appropriate is a significant bottle neck. Furthermore include partition columns in join statements (not where) otherwise you do a full table scan ignoring partitions Le jeu. 6 août 2015 à 15:07, Jörn Franke jornfra...@gmail.com a écrit : Yes you should use orc it is much faster and more compact. Additionally you can apply compression (snappy) to increase performance. Your data processing pipeline seems to be not.very optimized. You should use the newest hive version enabling storage indexes and bloom filters on appropriate columns. Ideally you should insert the data sorted appropriately. Partitioning and setting the execution engine to tez is also beneficial. Hbase with phoenix should currently only be used if you do few joins, not very complex queries and not many full table scans. Le jeu. 6 août 2015 à 14:54, venkatesh b venkateshmailingl...@gmail.com a écrit : Hi, here I got two things to know. FIRST: In our project we use hive. We daily get new data. We need to process this new data only once. And send this processed data to RDBMS. Here in processing we majorly use many complex queries with joins with where condition and grouping functions. There are many intermediate tables generated around 50 while processing. Till now we use text format as storage. We came across ORC file format. I would like to know that since it is one Time querying the table is it worth of storing as ORC format. SECOND: I came to know about HBase, which is faster. Can I replace hive with HBase for processing of data daily faster. Currently it is taking 15hrs daily with hive. Please inform me if any other information is needed. Thanks regards Venkatesh
Re: How can I know currently supported functions in Spark SQL
They are covered here in the docs: http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$ On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote: Hi All, I am using Spark 1.4.1, and I want to know how can I find the complete function list supported in Spark SQL, currently I only know 'sum','count','min','max'. Thanks a lot.
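As a small illustration of what lives in that functions object beyond the aggregates already mentioned (the DataFrame df and its column names below are hypothetical):

import org.apache.spark.sql.functions._

val result = df.groupBy(col("category"))
  .agg(sum("amount"), avg("amount"), min("amount"), max("amount"), countDistinct("user_id"))
result.show()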
Re: Starting Spark SQL thrift server from within a streaming app
Thank you Todd, How is the sparkstreaming-sql project different from starting a thrift server on a streaming app ? Thanks again. Daniel On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist tsind...@gmail.com wrote: Hi Danniel, It is possible to create an instance of the SparkSQL Thrift server, however seems like this project is what you may be looking for: https://github.com/Intel-bigdata/spark-streamingsql Not 100% sure of your use case is, but you can always convert the data into DF then issue a query against it. If you want other systems to be able to query it then there are numerous connectors to store data into Hive, Cassandra, HBase, ElasticSearch, To create a instance of a thrift server with its own SQL Context you would do something like the following: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.HiveMetastoreTypes._ import org.apache.spark.sql.types._ import org.apache.spark.sql.hive.thriftserver._ object MyThriftServer { val sparkConf = new SparkConf() // master is passed to spark-submit, but could also be specified explicitely // .setMaster(sparkMaster) .setAppName(My ThriftServer) .set(spark.cores.max, 2) val sc = new SparkContext(sparkConf) val sparkContext = sc import sparkContext._ val sqlContext = new HiveContext(sparkContext) import sqlContext._ import sqlContext.implicits._ makeRDD((1,hello) :: (2,world) ::Nil).toDF.cache().registerTempTable(t) HiveThriftServer2.startWithContext(sqlContext) } Again, I'm not really clear what your use case is, but it does sound like the first link above is what you may want. -Todd On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv daniel.ha...@veracity-group.com wrote: Hi, Is it possible to start the Spark SQL thrift server from with a streaming app so the streamed data could be queried as it's goes in ? Thank you. Daniel
Re: How can I know currently supported functions in Spark SQL
Have you looked at this? http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$ On Aug 6, 2015, at 2:52 AM, Netwaver wanglong_...@163.com wrote: Hi All, I am using Spark 1.4.1, and I want to know how can I find the complete function list supported in Spark SQL, currently I only know 'sum','count','min','max'. Thanks a lot.
Re: Combining Spark Files with saveAsTextFile
Hi, Try using coalesce(1) before calling saveAsTextFile(). Thanks Regards, Meethu M On Wednesday, 5 August 2015 7:53 AM, Brandon White bwwintheho...@gmail.com wrote: What is the best way to make saveAsTextFile save as only a single file?
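A one-line sketch of that suggestion (the RDD and output path are hypothetical); note that coalesce(1) funnels all the data through a single task, so it is only sensible when the final output is small:

rdd.coalesce(1).saveAsTextFile("hdfs:///tmp/single-file-output")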
Re: Spark Kinesis Checkpointing/Processing Delay
Hi, I actually run into the same problem although our endpoint is not ElasticSearch. When the spark job is dead, we lose some data because Kinesis checkpoint is already beyond the last point that spark is processed. Currently, our workaround is to use spark's checkpoint mechanism with write ahead log (WAL) https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications Using checkpointing comes with some disadvantage like application code is not upgradable, etc. I believe there is some work to fix this problem like Kafka direct API. Not sure if this is it : https://issues.apache.org/jira/browse/SPARK-9215 Thanks, Patanachai On 08/06/2015 12:08 PM, phibit wrote: Hi! I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely. There is a subtle issue I'm running into about how failures are handled. For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously. Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, to mean go as far back as possible and start processing the stream data as quickly as possible, until you catch up to the tip of the stream. This means that for some period of time, Spark will suck in data from Kinesis as quickly as it can, let's say at 5000 records per second. In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second. Let's say it only handles 2000 wps. This means that the processing time will exceed the batch time, scheduling delay in the stream will rise consistently and batches of data will get backlogged for some period of time. In normal circumstances, this is fine. When the Spark consumers catch up to real-time, the data input rate slows to 400rps and the backlogged batches eventually get flushed to ES. The system stabilizes. However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If for some reason there is processing delay and the Spark process crashes, the checkpoint will have advanced too far. If I resume the Spark Streaming process, there is essentially a gap in my ElasticSearch data. In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to only submit Kinesis checkpoints only after my data is successfully written to ES. Thanks, Phil -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kinesis-Checkpointing-Processing-Delay-tp24157.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Patanachai - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
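A sketch of the workaround described above (app name, batch interval and paths are hypothetical, not from this thread): enable the receiver write ahead log and recreate the streaming context from a checkpoint on restart, so records already received but not yet processed are replayed instead of lost:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-kinesis-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("KinesisToES")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... build the Kinesis DStream and the ElasticSearch output here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()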
Re: SparkR -Graphx
+Xiangrui I am not sure exposing the entire GraphX API would make sense as it contains a lot of low level functions. However we could expose some high level functions like PageRank etc. Xiangrui, who has been working on similar techniques to expose MLLib functions like GLM might have more to add. Thanks Shivaram On Thu, Aug 6, 2015 at 6:21 AM, smagadi sudhindramag...@fico.com wrote: Wanted to use the GRaphX from SparkR , is there a way to do it ?.I think as of now it is not possible.I was thinking if one can write a wrapper in R that can call Scala Graphx libraries . Any thought on this please. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-tp24152.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Specifying the role when launching an AWS spark cluster using spark_ec2
There's no support for IAM roles in the s3n:// client code in Apache Hadoop ( HADOOP-9384 ); Amazon's modified EMR distro may have it.. The s3a filesystem adds it, —this is ready for production use in Hadoop 2.7.1+ (implicitly HDP 2.3; CDH 5.4 has cherrypicked the relevant patches.) I don't know about the spark_ec2 scripts or what they start On 6 Aug 2015, at 10:27, SK skrishna...@gmail.com wrote: Hi, I need to access data on S3 from another account and I have been given the IAM role information to access that S3 bucket. From what I understand, AWS allows us to attach a role to a resource at the time it is created. However, I don't see an option for specifying the role using the spark_ec2.py script. So I created a spark cluster using the default role, but I was not able to change its IAM role after creation through AWS console. I see a ticket for this issue: https://github.com/apache/spark/pull/6962 and the status is closed. If anyone knows how I can specify the role using spark_ec2.py, please let me know. I am using spark 1.4.1. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-the-role-when-launching-an-AWS-spark-cluster-using-spark-ec2-tp24154.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. The driver is essentially this code: info(A) val t = System.currentTimeMillis val df = sqlContext.read.parquet(dir).select(...).cache val elapsed = System.currentTimeMillis - t info(sInit time: ${elapsed} ms) We've also observed that it is very slow to read the contents of the parquet files. My colleague wrote a PySpark application that gets the list of files, parallelizes it, maps across it and reads each file manually using a C parquet library, and aggregates manually in the loop. Ignoring the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame query in Scala, his is an order of magnitude faster. Since he is parallelizing the work through Spark, and that isn't causing any performance issues, it seems to be a problem with the parquet reader. I may try to do what he did to construct a DataFrame manually, and see if I can query it with Spark SQL with reasonable performance. - Philip On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com wrote: Would you mind to provide the driver log? On 8/6/15 3:58 PM, Philip Weaver wrote: I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote: Absolutely, thanks! On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com lian.cs@gmail.com wrote: We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396 Could you give it a shot to see whether it helps in your case? We've observed ~50x performance boost with schema merging turned on. Cheng On 8/6/15 8:26 AM, Philip Weaver wrote: I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy(a, b).parquet(asdf) There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions on the DataFrame, just initializing it like this takes *2 minutes*: val df = sqlContext.read.parquet(asdf) Is this normal? Is this because it is doing some bookeeping to discover all the partitions? Is it perhaps having to merge the schema from each partition? Would you expect it to get better or worse if I subpartition by another key? - Philip 10:51:42 INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT 10:51:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 10:51:42 INFO spark.SecurityManager: Changing view acls to: pweaver 10:51:42 INFO spark.SecurityManager: Changing modify acls to: pweaver 10:51:42 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver) 10:51:43 INFO slf4j.Slf4jLogger: Slf4jLogger started 10:51:43 INFO Remoting: Starting remoting 10:51:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400] 10:51:43 INFO util.Utils: Successfully started service 'sparkDriver' on port 51400. 
10:51:43 INFO spark.SparkEnv: Registering MapOutputTracker 10:51:43 INFO spark.SparkEnv: Registering BlockManagerMaster 10:51:43 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a 10:51:43 INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB 10:51:43 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc 10:51:43 INFO spark.HttpServer: Starting HTTP Server 10:51:43 INFO server.Server: jetty-8.y.z-SNAPSHOT 10:51:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227 10:51:43 INFO util.Utils: Successfully started service 'HTTP file server' on port 55227. 10:51:43 INFO spark.SparkEnv: Registering OutputCommitCoordinator 10:51:43 INFO server.Server: jetty-8.y.z-SNAPSHOT 10:51:43 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 10:51:43 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 10:51:43 INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040 10:51:43 INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937 10:51:43 INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940 10:51:44 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 10:51:44 INFO mesos.CoarseMesosSchedulerBackend:
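One knob that may be worth checking on that snapshot build (a sketch, assuming every partition shares the same schema; the path is hypothetical): Parquet schema merging can be disabled explicitly at read time, so planning does not have to reconcile footers across the 40,000+ partitions:

val df = sqlContext.read
  .option("mergeSchema", "false")   // or set spark.sql.parquet.mergeSchema=false globally
  .parquet("/path/to/partitioned/table")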
Spark Kinesis Checkpointing/Processing Delay
Hi! I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely. There is a subtle issue I'm running into about how failures are handled. For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously. Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, to mean go as far back as possible and start processing the stream data as quickly as possible, until you catch up to the tip of the stream. This means that for some period of time, Spark will suck in data from Kinesis as quickly as it can, let's say at 5000 records per second. In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second. Let's say it only handles 2000 wps. This means that the processing time will exceed the batch time, scheduling delay in the stream will rise consistently and batches of data will get backlogged for some period of time. In normal circumstances, this is fine. When the Spark consumers catch up to real-time, the data input rate slows to 400rps and the backlogged batches eventually get flushed to ES. The system stabilizes. However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If for some reason there is processing delay and the Spark process crashes, the checkpoint will have advanced too far. If I resume the Spark Streaming process, there is essentially a gap in my ElasticSearch data. In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to only submit Kinesis checkpoints only after my data is successfully written to ES. Thanks, Phil -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kinesis-Checkpointing-Processing-Delay-tp24157.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SparkR -Graphx
Wanted to use the GRaphX from SparkR , is there a way to do it ?.I think as of now it is not possible.I was thinking if one can write a wrapper in R that can call Scala Graphx libraries . Any thought on this please. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-tp24152.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase
I'm really sorry, by mistake I posted in spark mailing list. Jorn Frankie Thanks for your reply. I have many joins, many complex queries and all are table scans. So I think HBase do not work for me. On Thursday, August 6, 2015, Jörn Franke jornfra...@gmail.com wrote: Additionally it is of key importance to use the right data types for the columns. Use int for ids, int or decimal or float or double etc for numeric values etc. - A bad data model using varchars and string where not appropriate is a significant bottle neck. Furthermore include partition columns in join statements (not where) otherwise you do a full table scan ignoring partitions Le jeu. 6 août 2015 à 15:07, Jörn Franke jornfra...@gmail.com javascript:_e(%7B%7D,'cvml','jornfra...@gmail.com'); a écrit : Yes you should use orc it is much faster and more compact. Additionally you can apply compression (snappy) to increase performance. Your data processing pipeline seems to be not.very optimized. You should use the newest hive version enabling storage indexes and bloom filters on appropriate columns. Ideally you should insert the data sorted appropriately. Partitioning and setting the execution engine to tez is also beneficial. Hbase with phoenix should currently only be used if you do few joins, not very complex queries and not many full table scans. Le jeu. 6 août 2015 à 14:54, venkatesh b venkateshmailingl...@gmail.com javascript:_e(%7B%7D,'cvml','venkateshmailingl...@gmail.com'); a écrit : Hi, here I got two things to know. FIRST: In our project we use hive. We daily get new data. We need to process this new data only once. And send this processed data to RDBMS. Here in processing we majorly use many complex queries with joins with where condition and grouping functions. There are many intermediate tables generated around 50 while processing. Till now we use text format as storage. We came across ORC file format. I would like to know that since it is one Time querying the table is it worth of storing as ORC format. SECOND: I came to know about HBase, which is faster. Can I replace hive with HBase for processing of data daily faster. Currently it is taking 15hrs daily with hive. Please inform me if any other information is needed. Thanks regards Venkatesh
Re: subscribe
Welcome aboard! Thanks Best Regards On Thu, Aug 6, 2015 at 11:21 AM, Franc Carter franc.car...@rozettatech.com wrote: subscribe
Re: subscribe
See http://spark.apache.org/community.html Cheers On Aug 5, 2015, at 10:51 PM, Franc Carter franc.car...@rozettatech.com wrote: subscribe
Re: Enum values in custom objects mess up RDD operations
Thanks a lot Igor, the following hashCode function is stable: @Override public int hashCode() { int hash = 5; hash = 41 * hash + this.myEnum.ordinal(); return hash; } For anyone having the same problem: http://tech.technoflirt.com/2014/08/21/issues-with-java-enum-hashcode/ Cheers, Sebastian Igor Berman igor.ber...@gmail.com schrieb am Do., 6. Aug. 2015 um 10:59 Uhr: enums hashcode is jvm instance specific(ie. different jvms will give you different values), so you can use ordinal in hashCode computation or use hashCode on enums ordinal as part of hashCode computation On 6 August 2015 at 11:41, Warfish sebastian.ka...@gmail.com wrote: Hi everyone, I was working with Spark for a little while now and have encountered a very strange behaviour that caused me a lot of headaches: I have written my own POJOs to encapsulate my data and this data is held in some JavaRDDs. Part of these POJOs is a member variable of a custom enum type. Whenever I do some operations on these RDDs such as subtract, groupByKey, reduce or similar things, the results are inconsistent and non-sensical. However, this happens only when the application runs in standalone cluster mode (10 nodes). When running locally on my developer machine, the code executes just fine. If you want to reproduce this behaviour, here http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip is the complete Maven project that you can run out of the box. I am running Spark 1.4.0 and submitting the application using /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar Consider the following code for my custom object: package de.spark.test; import java.io.Serializable; import java.util.Objects; public class MyObject implements Serializable { private MyEnum myEnum; public MyObject(MyEnum myEnum) { this.myEnum = myEnum; } public MyEnum getMyEnum() { return myEnum; } public void setMyEnum(MyEnum myEnum) { this.myEnum = myEnum; } @Override public int hashCode() { int hash = 5; hash = 41 * hash + Objects.hashCode(this.myEnum); return hash; } @Override public boolean equals(Object obj) { if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } final MyObject other = (MyObject) obj; if (this.myEnum != other.myEnum) { return false; } return true; } @Override public String toString() { return MyObject{ + myEnum= + myEnum + '}'; } } As you can see, I have overriden equals() and hashCode() (both are auto-generated). 
The enum is given as follows: package de.spark.test; import java.io.Serializable; public enum MyEnum implements Serializable { VALUE1, VALUE2 } The main() method is defined by: package de.spark.test; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; public class Main { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName(Spark Test) .setMaster(myMaster); JavaSparkContext jsc = new JavaSparkContext(conf); System.out.println(/// Object generation); ListMyObject l1 = new ArrayList(); for(int i = 0; i 1000; i++) { l1.add(new MyObject(MyEnum.VALUE1)); } JavaRDDMyObject myObjectRDD1 = jsc.parallelize(l1); JavaRDDMyObject myObjectRDD2 = jsc.parallelize(l1); System.out.println(myObjectRDD1 count = + myObjectRDD1.count()); System.out.println(myObjectRDD2 count = + myObjectRDD2.count()); System.out.println(/// Distinct); JavaRDDMyObject myObjectRDD1Distinct = myObjectRDD1.distinct(); JavaRDDMyObject myObjectRDD2Distinct = myObjectRDD2.distinct(); System.out.println(myObjectRDD1Distinct count = + myObjectRDD1Distinct.count()); System.out.println(myObjectRDD2Distinct count = + myObjectRDD2Distinct.count()); System.out.println(/// Subtract); JavaRDDMyObject myObjectRDD1Minus1 = myObjectRDD1.subtract(myObjectRDD1); JavaRDDMyObject myObjectRDD1Minus2 = myObjectRDD1.subtract(myObjectRDD2); JavaRDDMyObject myObjectRDD2Minus1 = myObjectRDD2.subtract(myObjectRDD1); System.out.println(myObjectRDD1Minus1 count= + myObjectRDD1Minus1.count()); System.out.println(myObjectRDD1Minus2 count= + myObjectRDD1Minus2.count()); System.out.println(myObjectRDD2Minus1 count= + myObjectRDD2Minus1.count());
Re: Pause Spark Streaming reading or sampling streaming data
Hi - yes - it's great that you wrote it yourself - it means you have more control. I have the feeling that the most efficient point to discard as much data as possible is your Spark input source itself - or, even better, modify your subscription protocol so that it does not receive the other 50 seconds of data at all. After you deliver data to the DStream - you might filter it as much as you want - but you will still be subject to garbage collection and/or potential shuffles and HDD checkpoints. On Thu, Aug 6, 2015 at 1:31 AM, Heath Guo heath...@fb.com wrote: Hi Dimitris, Thanks for your reply. Just wondering – are you asking about my streaming input source? I implemented a custom receiver and have been using that. Thanks. From: Dimitris Kouzis - Loukas look...@gmail.com Date: Wednesday, August 5, 2015 at 5:27 PM To: Heath Guo heath...@fb.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Pause Spark Streaming reading or sampling streaming data What driver do you use? Sounds like something you should do before the driver... On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo heath...@fb.com wrote: Hi, I have a question about sampling Spark Streaming data, or getting part of the data. For every minute, I only want the data read in during the first 10 seconds, and discard all data in the next 50 seconds. Is there any way to pause reading and discard data in that period? I'm doing this to sample from a stream of huge amount of data, which saves processing time in the real-time program. Thanks!
Re: Pause Spark Streaming reading or sampling streaming data
Re-reading your description - I guess you could potentially make your input source to connect for 10 seconds, pause for 50 and then reconnect. On Thu, Aug 6, 2015 at 10:32 AM, Dimitris Kouzis - Loukas look...@gmail.com wrote: Hi, - yes - it's great that you wrote it yourself - it means you have more control. I have the feeling that the most efficient point to discard as much data as possible - or even modify your subscription protocol to - your spark input source - not even receive the other 50 seconds of data is the most efficient point. After you deliver data to DStream - you might filter them as much as you want - but you will still be subject to garbage collection and/or potential shuffles/and HDD checkpoints. On Thu, Aug 6, 2015 at 1:31 AM, Heath Guo heath...@fb.com wrote: Hi Dimitris, Thanks for your reply. Just wondering – are you asking about my streaming input source? I implemented a custom receiver and have been using that. Thanks. From: Dimitris Kouzis - Loukas look...@gmail.com Date: Wednesday, August 5, 2015 at 5:27 PM To: Heath Guo heath...@fb.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Pause Spark Streaming reading or sampling streaming data What driver do you use? Sounds like something you should do before the driver... On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo heath...@fb.com wrote: Hi, I have a question about sampling Spark Streaming data, or getting part of the data. For every minute, I only want the data read in during the first 10 seconds, and discard all data in the next 50 seconds. Is there any way to pause reading and discard data in that period? I'm doing this to sample from a stream of huge amount of data, which saves processing time in the real-time program. Thanks!
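Sketching that idea (this is an illustration, not code from the thread): a custom receiver can run its own loop that reads from the source for roughly 10 seconds and then stays idle for 50, so the unwanted data never reaches the DStream. fetchOnce() below is a placeholder for whatever the real source read looks like.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SampledReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  @volatile private var active = false

  def onStart(): Unit = {
    active = true
    new Thread("sampled-receiver") {
      override def run(): Unit = {
        while (active && !isStopped()) {
          val readUntil = System.currentTimeMillis() + 10000L
          while (active && System.currentTimeMillis() < readUntil) {
            fetchOnce().foreach(s => store(s))   // read for roughly 10 seconds
          }
          Thread.sleep(50000L)                   // then stay disconnected for roughly 50 seconds
        }
      }
    }.start()
  }

  def onStop(): Unit = { active = false }

  // placeholder for the real source read, e.g. one poll of the upstream feed
  private def fetchOnce(): Seq[String] = Seq.empty
}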
Aggregate by timestamp from json message
Hi team, I am very new to SPARK, actually today is my first day. I have a nested json string which contains timestamp and lot of other details. I have json messages from which I need to write multiple aggregation but for now I need to write one aggregation. If code structure is already there then kindly post if or give some pointers to start. Quick inputs will help me lot. Sample Requirement example: *Requirement: How many Stock dispatched in last 1 hour* [ { name:Stock dispatched, *timestamp:2015-04-14T10:03:10.000Z,* component:Work Order, sessionID:4324--52-3-52-46-3-46-3-75, properties:{ Priority:3, Appliance Manufacturer:XXX game, Appliance Model:HJ 10, Appliance Model Year:2012 } }, { name:Stock dispatched, * timestamp:2015-04-14T10:04:10.000Z,* component:Work Order, sessionID:4324--52-3-52-46-3-46-3-75, properties:{ Priority:3, Appliance Manufacturer:XXX game, Appliance Model:DJ 15, Appliance Model Year:2012 } } ] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-by-timestamp-from-json-message-tp24147.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: No Twitter Input from Kafka to Spark Streaming
You just pasted your twitter credentials, consider changing it. :/ Thanks Best Regards On Wed, Aug 5, 2015 at 10:07 PM, narendra narencs...@gmail.com wrote: Thanks Akash for the answer. I added endpoint to the listener and now it is working. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote: Absolutely, thanks! On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote: We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396 Could you give it a shot to see whether it helps in your case? We've observed ~50x performance boost with schema merging turned on. Cheng On 8/6/15 8:26 AM, Philip Weaver wrote: I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy(a, b).parquet(asdf) There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions on the DataFrame, just initializing it like this takes *2 minutes*: val df = sqlContext.read.parquet(asdf) Is this normal? Is this because it is doing some bookeeping to discover all the partitions? Is it perhaps having to merge the schema from each partition? Would you expect it to get better or worse if I subpartition by another key? - Philip
Multiple Thrift servers on one Spark cluster
Hi, Is there a way to instantiate multiple Thrift servers on one Spark Cluster? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Thrift-servers-on-one-Spark-cluster-tp24148.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Enum values in custom objects mess up RDD operations
Hi everyone, I have been working with Spark for a little while now and have encountered a very strange behaviour that caused me a lot of headaches: I have written my own POJOs to encapsulate my data and this data is held in some JavaRDDs. Part of these POJOs is a member variable of a custom enum type. Whenever I do some operations on these RDDs such as subtract, groupByKey, reduce or similar things, the results are inconsistent and nonsensical. However, this happens only when the application runs in standalone cluster mode (10 nodes). When running locally on my developer machine, the code executes just fine. If you want to reproduce this behaviour, here http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip is the complete Maven project that you can run out of the box. I am running Spark 1.4.0 and submitting the application using /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar Consider the following code for my custom object:

package de.spark.test;

import java.io.Serializable;
import java.util.Objects;

public class MyObject implements Serializable {

  private MyEnum myEnum;

  public MyObject(MyEnum myEnum) { this.myEnum = myEnum; }

  public MyEnum getMyEnum() { return myEnum; }

  public void setMyEnum(MyEnum myEnum) { this.myEnum = myEnum; }

  @Override
  public int hashCode() {
    int hash = 5;
    hash = 41 * hash + Objects.hashCode(this.myEnum);
    return hash;
  }

  @Override
  public boolean equals(Object obj) {
    if (obj == null) { return false; }
    if (getClass() != obj.getClass()) { return false; }
    final MyObject other = (MyObject) obj;
    if (this.myEnum != other.myEnum) { return false; }
    return true;
  }

  @Override
  public String toString() {
    return "MyObject{" + "myEnum=" + myEnum + '}';
  }
}

As you can see, I have overridden equals() and hashCode() (both are auto-generated).
The enum is given as follows:

package de.spark.test;

import java.io.Serializable;

public enum MyEnum implements Serializable {
  VALUE1, VALUE2
}

The main() method is defined by:

package de.spark.test;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Test")
                                    .setMaster("myMaster");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    System.out.println("/// Object generation");
    List<MyObject> l1 = new ArrayList<>();
    for(int i = 0; i < 1000; i++) {
      l1.add(new MyObject(MyEnum.VALUE1));
    }
    JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
    JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);
    System.out.println("myObjectRDD1 count = " + myObjectRDD1.count());
    System.out.println("myObjectRDD2 count = " + myObjectRDD2.count());

    System.out.println("/// Distinct");
    JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
    JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();
    System.out.println("myObjectRDD1Distinct count = " + myObjectRDD1Distinct.count());
    System.out.println("myObjectRDD2Distinct count = " + myObjectRDD2Distinct.count());

    System.out.println("/// Subtract");
    JavaRDD<MyObject> myObjectRDD1Minus1 = myObjectRDD1.subtract(myObjectRDD1);
    JavaRDD<MyObject> myObjectRDD1Minus2 = myObjectRDD1.subtract(myObjectRDD2);
    JavaRDD<MyObject> myObjectRDD2Minus1 = myObjectRDD2.subtract(myObjectRDD1);
    System.out.println("myObjectRDD1Minus1 count= " + myObjectRDD1Minus1.count());
    System.out.println("myObjectRDD1Minus2 count= " + myObjectRDD1Minus2.count());
    System.out.println("myObjectRDD2Minus1 count= " + myObjectRDD2Minus1.count());

    System.out.println("/// End");
  }
}

Both RDDs contain 1000 exactly equal objects; one would expect each call of distinct() to result in 1 and subtract(JavaRDD<MyObject>) to result in empty RDDs. However here is some sample output:

/// Object generation
myObjectRDD1 count = 1000
myObjectRDD2 count = 1000
/// Distinct
myObjectRDD1Distinct count = 1
myObjectRDD2Distinct count = 2
/// Subtract
myObjectRDD1Minus1 count= 500
myObjectRDD1Minus2 count= 0
myObjectRDD2Minus1 count= 0
/// End
Re: Enum values in custom objects mess up RDD operations
enums hashcode is jvm instance specific(ie. different jvms will give you different values), so you can use ordinal in hashCode computation or use hashCode on enums ordinal as part of hashCode computation On 6 August 2015 at 11:41, Warfish sebastian.ka...@gmail.com wrote: Hi everyone, I was working with Spark for a little while now and have encountered a very strange behaviour that caused me a lot of headaches: I have written my own POJOs to encapsulate my data and this data is held in some JavaRDDs. Part of these POJOs is a member variable of a custom enum type. Whenever I do some operations on these RDDs such as subtract, groupByKey, reduce or similar things, the results are inconsistent and non-sensical. However, this happens only when the application runs in standalone cluster mode (10 nodes). When running locally on my developer machine, the code executes just fine. If you want to reproduce this behaviour, here http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip is the complete Maven project that you can run out of the box. I am running Spark 1.4.0 and submitting the application using /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar Consider the following code for my custom object: package de.spark.test; import java.io.Serializable; import java.util.Objects; public class MyObject implements Serializable { private MyEnum myEnum; public MyObject(MyEnum myEnum) { this.myEnum = myEnum; } public MyEnum getMyEnum() { return myEnum; } public void setMyEnum(MyEnum myEnum) { this.myEnum = myEnum; } @Override public int hashCode() { int hash = 5; hash = 41 * hash + Objects.hashCode(this.myEnum); return hash; } @Override public boolean equals(Object obj) { if (obj == null) { return false; } if (getClass() != obj.getClass()) { return false; } final MyObject other = (MyObject) obj; if (this.myEnum != other.myEnum) { return false; } return true; } @Override public String toString() { return MyObject{ + myEnum= + myEnum + '}'; } } As you can see, I have overriden equals() and hashCode() (both are auto-generated). 
The enum is given as follows: package de.spark.test; import java.io.Serializable; public enum MyEnum implements Serializable { VALUE1, VALUE2 } The main() method is defined by: package de.spark.test; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; public class Main { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName(Spark Test) .setMaster(myMaster); JavaSparkContext jsc = new JavaSparkContext(conf); System.out.println(/// Object generation); ListMyObject l1 = new ArrayList(); for(int i = 0; i 1000; i++) { l1.add(new MyObject(MyEnum.VALUE1)); } JavaRDDMyObject myObjectRDD1 = jsc.parallelize(l1); JavaRDDMyObject myObjectRDD2 = jsc.parallelize(l1); System.out.println(myObjectRDD1 count = + myObjectRDD1.count()); System.out.println(myObjectRDD2 count = + myObjectRDD2.count()); System.out.println(/// Distinct); JavaRDDMyObject myObjectRDD1Distinct = myObjectRDD1.distinct(); JavaRDDMyObject myObjectRDD2Distinct = myObjectRDD2.distinct(); System.out.println(myObjectRDD1Distinct count = + myObjectRDD1Distinct.count()); System.out.println(myObjectRDD2Distinct count = + myObjectRDD2Distinct.count()); System.out.println(/// Subtract); JavaRDDMyObject myObjectRDD1Minus1 = myObjectRDD1.subtract(myObjectRDD1); JavaRDDMyObject myObjectRDD1Minus2 = myObjectRDD1.subtract(myObjectRDD2); JavaRDDMyObject myObjectRDD2Minus1 = myObjectRDD2.subtract(myObjectRDD1); System.out.println(myObjectRDD1Minus1 count= + myObjectRDD1Minus1.count()); System.out.println(myObjectRDD1Minus2 count= + myObjectRDD1Minus2.count()); System.out.println(myObjectRDD2Minus1 count= + myObjectRDD2Minus1.count()); System.out.println(/// End); } } Both RDDs contain 1000 exactly equal objects, one would expect each call of distinct() to result in 1 and subtract(JavaRDDMyObject) to result in empty RDDs. However here is some sample output: /// Object generation myObjectRDD1 count = 1000 myObjectRDD2
Re: spark hangs at broadcasting during a filter
Thanks. Repartitioning to a smaller number of partitions seems to fix my issue, but I'll keep broadcasting in mind (droprows is an integer array with about 4 million entries). On Wed, Aug 5, 2015 at 12:34 PM, Philip Weaver philip.wea...@gmail.com wrote: How big is droprows? Try explicitly broadcasting it like this:

val broadcastDropRows = sc.broadcast(dropRows)

val valsrows = ...
  .filter(x => !broadcastDropRows.value.contains(x._1))

- Philip On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote: I'm trying to load a 1 TB file whose lines i,j,v represent the values of a matrix given as A_{ij} = v so I can convert it to a Parquet file. Only some of the rows of A are relevant, so the following code first loads the triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int, Double]]] for each row (if I'm judging datatypes correctly).

val valsrows = sc.textFile(valsinpath).map(_.split(",")).
  map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
  filter(x => !droprows.contains(x._1)).
  groupByKey.
  map(x => (x._1, x._2.toSeq.sortBy(_._1)))

Spark hangs during a broadcast that occurs during the filter step (according to the Spark UI). The last two lines in the log before it pauses are:

15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

I've left Spark running for up to 17 minutes one time, and it never continues past this point. I'm using a cluster of 30 r3.8xlarge EC2 instances (244 GB, 32 cores) with Spark in standalone mode with 220G executor and driver memory, and using the Kryo serializer. Any ideas on what could be causing this hang? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Temp file missing when training logistic regression
Hello, I am using the Python API to perform a grid search and train models using LogisticRegressionWithSGD. I am using r3.xl machines in EC2, running on top of YARN in cluster mode. The training RDD is persisted in memory and on disk. Some of the models train successfully, but then at some point during the grid search I get an error. It looks like the Python broadcast is looking for a part of the RDD which is no longer there. I scanned the logs for further errors but could not find anything. Any ideas of what could be causing this, and what should I be looking for? Many thanks. Cat

    model = LogisticRegressionWithSGD.train(the_training, iterations=i, regParam=c, miniBatchFraction=0.8)
  File "/home/hadoop/spark/python/pyspark/mllib/classification.py", line 164, in train
    return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
  File "/home/hadoop/spark/python/pyspark/mllib/regression.py", line 140, in _regression_train_wrapper
    weights, intercept = train_func(data, _convert_to_vector(initial_weights))
  File "/home/hadoop/spark/python/pyspark/mllib/classification.py", line 162, in train
    bool(intercept))
  File "/home/hadoop/spark/python/pyspark/mllib/common.py", line 120, in callMLlibFunc
    return callJavaFunc(sc, api, *args)
  File "/home/hadoop/spark/python/pyspark/mllib/common.py", line 113, in callJavaFunc
    return _java2py(sc, func(*args))
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling o271.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.FileNotFoundException: /mnt/spark/spark-b07b34f8-66c3-43ae-a3ed-0c291724409b/pyspark-4196e8e5-8024-4ec5-a7bb-a60b216e6e74/tmpbCjiSR (No such file or directory)
java.io.FileInputStream.open(Native Method)
java.io.FileInputStream.<init>(FileInputStream.java:146)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply$mcJ$sp(PythonRDD.scala:848)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:847)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1176)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:79)
org.apache.spark.storage.DiskStore.putArray(DiskStore.scala:64)
org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1028)
org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:419)
org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:408)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:408)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:263)
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
How to binarize data in spark
I have a set of data based on which I want to create a classification model. Each row has the following form:

user1,class1,product1
user1,class1,product2
user1,class1,product5
user2,class1,product2
user2,class1,product5
user3,class2,product1

etc.

There are about 1M users, 2 classes, and 1M products. What I would like to do next is create the sparse vectors (something already supported by MLlib), BUT in order to apply that function I have to create the dense vectors (with the 0s) first. In other words, I have to binarize my data. What's the easiest (or most elegant) way of doing that?

// Adamantios
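One possible approach, sketched below rather than prescribed: the dense vectors never need to be materialized, because MLlib's Vectors.sparse can be built directly from a per-user list of product indices. The file name, the label encoding for the two classes, and the assumption that the ~1M-entry product dictionary fits on the driver are all illustrative, not from the original post.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Parse "user,class,product" triples (path and format assumed from the example rows).
val raw = sc.textFile("user_class_product.csv")
  .map(_.split(","))
  .map { case Array(user, cls, product) => (user, cls, product) }

// Give every distinct product a column index; this dictionary is assumed to fit
// on the driver and is broadcast to the executors.
val productIndex = raw.map(_._3).distinct().zipWithIndex()
  .mapValues(_.toInt).collectAsMap()
val numProducts = productIndex.size
val bIndex = sc.broadcast(productIndex)

// One sparse 0/1 vector per (user, class) pair; mapping class1 -> 1.0 is an assumption.
val points = raw
  .map { case (user, cls, product) => ((user, cls), product) }
  .groupByKey()
  .map { case ((user, cls), products) =>
    val indices = products.map(bIndex.value(_)).toArray.distinct.sorted
    val label = if (cls == "class1") 1.0 else 0.0
    LabeledPoint(label, Vectors.sparse(numProducts, indices, Array.fill(indices.length)(1.0)))
  }

The resulting RDD of LabeledPoint can be fed to an MLlib classifier without ever holding a million-column dense vector in memory.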
Terminate streaming app on cluster restart
Hello, everyone! I have a case when running a standalone cluster: stop-all.sh/start-all.sh are invoked on the master, the streaming app loses all its executors, but it does not terminate. Since it is a streaming app that is expected to deliver its results ASAP, any downtime is undesirable. Is there any workaround for that problem? Thanks a lot.
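One possible workaround, offered only as a sketch: have the driver track live executors through a SparkListener and exit once they all disappear, so an external supervisor (spark-submit --supervise in standalone cluster mode, systemd, etc.) can restart the application. The SparkListenerExecutorAdded/Removed events are assumed to be available in the Spark version in use, and ssc stands for the application's StreamingContext.

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Counts live executors; if they all drop away (e.g. after stop-all.sh on the master),
// the driver exits instead of idling with no resources.
class ExecutorWatchdog extends SparkListener {
  private val live = new AtomicInteger(0)

  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit =
    live.incrementAndGet()

  override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
    if (live.decrementAndGet() <= 0) {
      // No executors left: fail fast so a supervisor can restart the application.
      sys.exit(1)
    }
}

ssc.sparkContext.addSparkListener(new ExecutorWatchdog)

In practice you would probably add a grace period before exiting, since executors can also be lost and re-registered briefly during normal operation.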
Re: Very high latency to initialize a DataFrame from partitioned parquet database.
Would you mind providing the driver log?

On 8/6/15 3:58 PM, Philip Weaver wrote:

I built Spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote:

Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

We've fixed this issue in 1.5: https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case? We've observed a ~50x performance boost with schema merging turned on.

Cheng

On 8/6/15 8:26 AM, Philip Weaver wrote:

I have a Parquet directory that was produced by partitioning by two keys, e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")

There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions on the DataFrame, just initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")

Is this normal? Is this because it is doing some bookkeeping to discover all the partitions? Is it perhaps having to merge the schema from each partition? Would you expect it to get better or worse if I subpartition by another key?

- Philip
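Separately from the fix linked above, a hedged sketch for anyone hitting the same slowdown: when all partitions are known to share one schema, schema merging can be skipped at read time, which avoids reading a footer from every one of the ~40,000 partitions up front. The mergeSchema data source option and the spark.sql.parquet.mergeSchema setting are assumed to be available in the 1.5 line discussed here.

// Skip per-partition schema merging when every partition under "asdf" has the same schema.
val df = sqlContext.read
  .option("mergeSchema", "false")
  .parquet("asdf")

// Equivalent global setting:
// sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")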