Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Patanachai Tangchaisin

Currently, I use rdd.isEmpty()
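
For illustration, a minimal sketch of that pattern (the RDD and path below are
hypothetical placeholders, not the poster's actual job):

import org.apache.spark.rdd.RDD

// Skip the write entirely when the RDD has no elements, so no empty
// part files are created for this batch (hypothetical path).
def saveIfNonEmpty(output: RDD[String], path: String): Unit = {
  if (!output.isEmpty()) {
    output.saveAsTextFile(path)
  }
}

// e.g. saveIfNonEmpty(output, "hdfs:///tmp/example-output")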

Thanks,
Patanachai


On 08/06/2015 12:02 PM, gpatcham wrote:

Is there a way to filter out empty partitions before I write to HDFS other
than using repartition and coalesce?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Patanachai


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: log4j.xml bundled in jar vs log4.properties in spark/conf

2015-08-06 Thread mlemay
I'm having the same problem here. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/log4j-xml-bundled-in-jar-vs-log4-properties-in-spark-conf-tp23923p24158.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: shutdown local hivecontext?

2015-08-06 Thread Cesar Flores
Well, I managed to solve that issue by running my tests on a Linux
system instead of Windows (which I was originally using). However, now I
have an error when I try to reset the Hive context using hc.reset(). It
tries to create a file inside the directory /user/my_user_name instead of the
usual Linux path /home/my_user_name, which fails.



On Thu, Aug 6, 2015 at 3:12 PM, Cesar Flores ces...@gmail.com wrote:

 Well, I tried this approach, and I still have issues. Apparently TestHive
 cannot delete the hive metastore directory. The complete error that I have is:

 15/08/06 15:01:29 ERROR Driver: FAILED: Execution Error, return code 1
 from org.apache.hadoop.hive.ql.exec.DDLTask.
 org.apache.hadoop.hive.ql.metadata.HiveException:
 java.lang.NullPointerException
 15/08/06 15:01:29 ERROR TestHive:
 ==
 HIVE FAILURE OUTPUT
 ==
 SET spark.sql.test=
 SET
 javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945;create=true
 SET
 hive.metastore.warehouse.dir=C:\cygwin64\tmp\sparkHiveWarehouse5264564710014125096
 FAILED: Execution Error, return code 1 from
 org.apache.hadoop.hive.ql.exec.DDLTask.
 org.apache.hadoop.hive.ql.metadata.HiveException:
 java.lang.NullPointerException

 ==
 END HIVE FAILURE OUTPUT
 ==

 [error] Uncaught exception when running
 com.dotomi.pipeline.utilitytransformers.SorterTransformerSuite:
 java.lang.ExceptionInInitializerError
 [trace] Stack trace suppressed: run last pipeline/test:testOnly for the
 full output.
 15/08/06 15:01:29 ERROR Utils: Exception while deleting Spark temp dir:
 C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
 java.io.IOException: Failed to delete:
 C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
 at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:932)
 at
 org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:181)
 at
 org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:179)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
 at
 org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:179)
 at
 org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
 at
 org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
 at
 org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
 at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)

 Any new ideas about how to avoid this error? I think the problem may be
 running the tests in sbt, as the created directories are locked until I
 exit the sbt command shell from which I run the tests. Please let me know
 if you have any other suggestions.


 Thanks

 On Mon, Aug 3, 2015 at 5:56 PM, Michael Armbrust mich...@databricks.com
 wrote:

 TestHive takes care of creating a temporary directory for each invocation
 so that multiple test runs won't conflict.

 On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores ces...@gmail.com wrote:


 We are using a local hive context in order to run unit tests. Our unit
 tests run perfectly fine if we run them one by one using sbt, as in the next
 example:

 sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
 sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala

 However, if we try to run them as:

 sbt test-only com.company.pipeline.*

 we start to run into issues. It appears that the hive context is not
 properly shut down after finishing the first test. Does anyone know how to
 attack this problem? The test part in my build.sbt file looks like:

 libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
 parallelExecution in Test := false,
 fork := true,
 javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M",
 "-XX:+CMSClassUnloadingEnabled")

 We are working under Spark 1.3.0


 Thanks
 --
 Cesar Flores





 --
 Cesar Flores




-- 
Cesar Flores


Spark Job Failed (Executor Lost then FS closed)

2015-08-06 Thread ๏̯͡๏
Code:
import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel

def extract(array: Array[String], index: Integer) = {
  if (index < array.length) {
    array(index).replaceAll("\"", "")
  } else {
    ""
  }
}


case class GuidSess(
  guid: String,
  sessionKey: String,
  sessionStartDate: String,
  siteId: String,
  eventCount: String,
  browser: String,
  browserVersion: String,
  operatingSystem: String,
  experimentChannel: String,
  deviceName: String)

val rowStructText =
  sc.textFile("/user/zeppelin/guidsess/2015/08/05/part-m-1.gz")
val guidSessRDD = rowStructText.filter(s => s.length != 1).map(s =>
  s.split(",")).map(
  {
    s =>
      GuidSess(extract(s, 0),
        extract(s, 1),
        extract(s, 2),
        extract(s, 3),
        extract(s, 4),
        extract(s, 5),
        extract(s, 6),
        extract(s, 7),
        extract(s, 8),
        extract(s, 9))
  })

val guidSessDF = guidSessRDD.toDF()
guidSessDF.registerTempTable("guidsess")

Once the temp table is created, i wrote this query

select siteid, count(distinct guid) total_visitor,
count(sessionKey) as total_visits
from guidsess
group by siteid

*Metrics:*

Data Size: 170 MB
Spark Version: 1.3.1
YARN: 2.7.x



Timeline:
There is 1 Job, 2 stages with 1 task each.

*1st Stage : mapPartitions*
[image: Inline image 1]

1st Stage: Task 1 started to fail, and a second attempt started for the 1st task of
the first stage. The first attempt failed with Executor LOST.
When I go to the YARN resource manager and look at that particular host, I see
that it is running fine.

*Attempt #1*
[image: Inline image 2]

*Attempt #2* Executor LOST AGAIN
[image: Inline image 3]
*Attempt 34*

*[image: Inline image 4]*



*2nd Stage runJob : SKIPPED*

*[image: Inline image 5]*

Any suggestions ?


-- 
Deepak


Re: shutdown local hivecontext?

2015-08-06 Thread Cesar Flores
Well, I tried this approach, and I still have issues. Apparently TestHive
cannot delete the hive metastore directory. The complete error that I have is:

15/08/06 15:01:29 ERROR Driver: FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.DDLTask.
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.NullPointerException
15/08/06 15:01:29 ERROR TestHive:
==
HIVE FAILURE OUTPUT
==
SET spark.sql.test=
SET
javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945;create=true
SET
hive.metastore.warehouse.dir=C:\cygwin64\tmp\sparkHiveWarehouse5264564710014125096
FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.DDLTask.
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.NullPointerException

==
END HIVE FAILURE OUTPUT
==

[error] Uncaught exception when running
com.dotomi.pipeline.utilitytransformers.SorterTransformerSuite:
java.lang.ExceptionInInitializerError
[trace] Stack trace suppressed: run last pipeline/test:testOnly for the
full output.
15/08/06 15:01:29 ERROR Utils: Exception while deleting Spark temp dir:
C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
java.io.IOException: Failed to delete:
C:\cygwin64\tmp\sparkHiveMetastore1376676777991703945
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:932)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:181)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1$$anonfun$apply$mcV$sp$2.apply(Utils.scala:179)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply$mcV$sp(Utils.scala:179)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
at
org.apache.spark.util.Utils$$anon$4$$anonfun$run$1.apply(Utils.scala:177)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
at org.apache.spark.util.Utils$$anon$4.run(Utils.scala:177)

Any new ideas about how to avoid this error? I think the problem may be
running the tests in sbt, as the created directories are locked until I
exit the sbt command shell from which I run the tests. Please let me know
if you have any other suggestions.


Thanks

On Mon, Aug 3, 2015 at 5:56 PM, Michael Armbrust mich...@databricks.com
wrote:

 TestHive takes care of creating a temporary directory for each invocation
 so that multiple test runs won't conflict.

 On Mon, Aug 3, 2015 at 3:09 PM, Cesar Flores ces...@gmail.com wrote:


 We are using a local hive context in order to run unit tests. Our unit
 tests run perfectly fine if we run them one by one using sbt, as in the next
 example:

 sbt test-only com.company.pipeline.scalers.ScalerSuite.scala
 sbt test-only com.company.pipeline.labels.ActiveUsersLabelsSuite.scala

 However, if we try to run them as:

 sbt test-only com.company.pipeline.*

 we start to run into issues. It appears that the hive context is not
 properly shut down after finishing the first test. Does anyone know how to
 attack this problem? The test part in my build.sbt file looks like:

 libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
 parallelExecution in Test := false,
 fork := true,
 javaOptions ++= Seq("-Xms512M", "-Xmx2048M", "-XX:MaxPermSize=2048M",
 "-XX:+CMSClassUnloadingEnabled")

 We are working under Spark 1.3.0


 Thanks
 --
 Cesar Flores





-- 
Cesar Flores


Re: Removing empty partitions before we write to HDFS

2015-08-06 Thread Richard Marscher
Not that I'm aware of. We ran into a similar issue where we didn't want
to keep accumulating all these empty part files in storage on S3 or HDFS.
There didn't seem to be any performance-free way to do it with an RDD, so
we just run a non-Spark post-batch operation to delete empty files from the
write path.
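
For what it's worth, a rough sketch of that kind of post-batch cleanup using the
Hadoop FileSystem API (the output directory and the part-file filter are
assumptions, not our exact job):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Delete zero-length part files left behind by an RDD write
// (hypothetical output directory; S3 would need the appropriate FileSystem).
def deleteEmptyPartFiles(outputDir: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  fs.listStatus(new Path(outputDir))
    .filter(st => st.isFile && st.getLen == 0 && st.getPath.getName.startsWith("part-"))
    .foreach(st => fs.delete(st.getPath, false))
}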

On Thu, Aug 6, 2015 at 3:33 PM, Patanachai Tangchaisin patanac...@ipsy.com
wrote:

 Currently, I use rdd.isEmpty()

 Thanks,
 Patanachai



 On 08/06/2015 12:02 PM, gpatcham wrote:

 Is there a way to filter out empty partitions before I write to HDFS other
 than using repartition and coalesce?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Removing-empty-partitions-before-we-write-to-HDFS-tp24156.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


 --
 Patanachai



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


How can I know currently supported functions in Spark SQL

2015-08-06 Thread Netwaver
Hi All,
 I am using Spark 1.4.1, and I want to know how I can find the complete
list of functions supported in Spark SQL; currently I only know
'sum', 'count', 'min', and 'max'. Thanks a lot.


how to stop twitter-spark streaming

2015-08-06 Thread Sadaf
Hi All,
I am working with Spark Streaming and Twitter's user API.
I used this code to stop the streaming:

   ssc.addStreamingListener(new StreamingListener {
     var count = 1
     override def onBatchCompleted(batchCompleted:
         StreamingListenerBatchCompleted) {
       count += 1
       if (count >= 5) {
         ssc.stop(true, true)
       }
     }
   })

I also override the onStop method in a custom receiver to stop the streaming, but it
gives the following exception:

java.lang.NullPointerException: Inflater has been closed
at java.util.zip.Inflater.ensureOpen(Inflater.java:389)
at java.util.zip.Inflater.inflate(Inflater.java:257)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:152)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:116)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:154)
at java.io.BufferedReader.readLine(BufferedReader.java:317)
at java.io.BufferedReader.readLine(BufferedReader.java:382)
at 
twitter4j.StatusStreamBase.handleNextElement(StatusStreamBase.java:85)
at twitter4j.StatusStreamImpl.next(StatusStreamImpl.java:57)
at
twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:478)
15/08/06 15:50:43 ERROR ReceiverTracker: Deregistered receiver for stream 0:
Stopped by driver
15/08/06 15:50:43 WARN ReceiverSupervisorImpl: Stopped executor without
error
15/08/06 15:50:50 WARN WriteAheadLogManager : Failed to write to write ahead
log

Does anyone know the cause of this exception?

Thanks :)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-stop-twitter-spark-streaming-tp24150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is there any way to support multiple users executing SQL on thrift server?

2015-08-06 Thread Ted Yu
What is the JIRA number if a JIRA has been logged for this ?

Thanks



 On Jan 20, 2015, at 11:30 AM, Cheng Lian lian.cs@gmail.com wrote:
 
 Hey Yi,
 
 I'm quite unfamiliar with Hadoop/HDFS auth mechanisms for now, but would like 
 to investigate this issue later. Would you please open a JIRA for it? Thanks!
 
 Cheng
 
 On 1/19/15 1:00 AM, Yi Tian wrote:
 Is there any way to support multiple users executing SQL on one thrift 
 server?
 
 I think there are some problems for spark 1.2.0, for example:
 
 Start thrift server with user A
 Connect to thrift server via beeline with user B
 Execute “insert into table dest select … from table src”
 then we found these items on hdfs:
 
 drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 
 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1
 drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 
 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary
 drwxr-xr-x   - B supergroup  0 2015-01-16 16:42 
 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0
 drwxr-xr-x   - A supergroup  0 2015-01-16 16:42 
 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/_temporary
 drwxr-xr-x   - A supergroup  0 2015-01-16 16:42 
 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00
 -rw-r--r--   3 A supergroup   2671 2015-01-16 16:42 
 /tmp/hadoop/hive_2015-01-16_16-42-48_923_1860943684064616152-3/-ext-1/_temporary/0/task_201501161642_0022_m_00/part-0
 You can see that all the temporary paths created on the driver side (thrift
 server side) are owned by user B (which is what we expected).
 
 But all the output data created on the executor side is owned by user A (which
 is NOT what we expected).
 The wrong owner of the output data causes an
 org.apache.hadoop.security.AccessControlException while the driver side
 moves the output data into the dest table.
 
 Does anyone know how to resolve this problem?
 
 


Re: spark job not accepting resources from worker

2015-08-06 Thread Kushal Chokhani

Any inputs?

In the case of the following message, is there a way to check which resource is
not sufficient through some logs?


   [Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl  -
   Initial job has not accepted any resources; check your cluster UI to
   ensure that workers are registered and have sufficient resources

Regards.

On 8/6/2015 11:40 AM, Kushal Chokhani wrote:

Hi

I have a spark/cassandra setup where I am using a spark cassandra java 
connector to query on a table. So far, I have 1 spark master node (2 
cores) and 1 worker node (4 cores). Both of them have following 
spark-env.sh under conf/:


#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=192.168.4.134
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_MEMORY=2G

I am using spark 1.4.1 along with cassandra 2.2.0. I have started my 
cassandra/spark setup. Created keyspace and table under cassandra and 
added some rows on table. Now I try to run following spark job using 
spark cassandra java connector:


 SparkConf conf = new SparkConf();
 conf.setAppName("Testing");
 conf.setMaster("spark://192.168.4.134:7077");
 conf.set("spark.cassandra.connection.host", "192.168.4.129");
 conf.set("spark.logConf", "true");
 conf.set("spark.driver.maxResultSize", "50m");
 conf.set("spark.executor.memory", "200m");
 conf.set("spark.eventLog.enabled", "true");
 conf.set("spark.eventLog.dir", "/tmp/");
 conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
 conf.set("spark.cores.max", "1");
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDD<String> cassandraRowsRDD =
     CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
         .map(new Function<CassandraRow, String>() {
             private static final long serialVersionUID = -6263533266898869895L;
             @Override
             public String call(CassandraRow cassandraRow) throws Exception {
                 return cassandraRow.toString();
             }
         });
 System.out.println("Data as CassandraRows: \n"
     + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
 sc.close();


This job is stuck with insufficient resources warning. Here are logs:

1107 [main] INFO org.apache.spark.SparkContext  - Spark configuration:
spark.app.name=Testing
spark.cassandra.connection.host=192.168.4.129
spark.cores.max=1
spark.driver.maxResultSize=50m
spark.eventLog.dir=/tmp/
spark.eventLog.enabled=true
spark.executor.extraClassPath=/home/enlighted/ebd.jar
spark.executor.memory=200m
spark.logConf=true
spark.master=spark://192.168.4.134:7077
1121 [main] INFO org.apache.spark.SecurityManager  - Changing view
acls to: enlighted
1122 [main] INFO org.apache.spark.SecurityManager  - Changing
modify acls to: enlighted
1123 [main] INFO org.apache.spark.SecurityManager  -
SecurityManager: authentication disabled; ui acls disabled; users
with view permissions: Set(enlighted); users with modify
permissions: Set(enlighted)
1767 [sparkDriver-akka.actor.default-dispatcher-4] INFO
akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
1805 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Starting remoting

1957 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'sparkDriver' on port 54611.
1958 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Remoting started; listening on addresses

:[akka.tcp://sparkDriver@192.168.4.134:54611]
1977 [main] INFO org.apache.spark.SparkEnv  - Registering
MapOutputTracker
1989 [main] INFO org.apache.spark.SparkEnv  - Registering
BlockManagerMaster
2007 [main] INFO org.apache.spark.storage.DiskBlockManager  -
Created local directory at

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/blockmgr-3e3d54e7-16df-4e97-be48-b0c0fa0389e7
2012 [main] INFO org.apache.spark.storage.MemoryStore  -
MemoryStore started with capacity 456.0 MB
2044 [main] INFO org.apache.spark.HttpFileServer  - HTTP File
server directory is

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/httpd-64b4d92e-cde9-45fb-8b38-edc3cca3933c
2046 [main] INFO org.apache.spark.HttpServer  - Starting HTTP Server
2086 [main] INFO org.spark-project.jetty.server.Server  -
jetty-8.y.z-SNAPSHOT
2098 [main] INFO org.spark-project.jetty.server.AbstractConnector 
- Started SocketConnector@0.0.0.0:44884

2099 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'HTTP file server' on port 44884.
2108 [main] INFO org.apache.spark.SparkEnv  - Registering
OutputCommitCoordinator
2297 [main] INFO org.spark-project.jetty.server.Server  -
jetty-8.y.z-SNAPSHOT
2317 [main] INFO org.spark-project.jetty.server.AbstractConnector 
- Started SelectChannelConnector@0.0.0.0:4040

2318 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'SparkUI' on port 4040.
2320 [main] 

Re:Re: Real-time data visualization with Zeppelin

2015-08-06 Thread jun
Hi andy,


Is there any method to convert ipython notebook file(.ipynb) to spark notebook 
file(.snb) or vice versa?


BR
Jun 

At 2015-07-13 02:45:57, andy petrella andy.petre...@gmail.com wrote:

Heya,


You might be looking for something like this I guess: 
https://www.youtube.com/watch?v=kB4kRQRFAVc.


The Spark-Notebook (https://github.com/andypetrella/spark-notebook/) can bring 
that to you actually, it uses fully reactive bilateral communication streams to 
update data and viz, plus it hides almost everything for you ^^. The video was 
using the notebook notebooks/streaming/Twitter stream.snb so you can play it 
yourself if you like.


You might want to build the master (before 0.6.0 is released → soon) here 
http://spark-notebook.io/.


HTH
andy






On Sun, Jul 12, 2015 at 8:29 PM Ruslan Dautkhanov dautkha...@gmail.com wrote:

Don't think it is a Zeppelin problem.. RDDs are immutable.
Unless you integrate something like IndexedRDD 
http://spark-packages.org/package/amplab/spark-indexedrdd
into Zeppelin I think it's not possible.



--
Ruslan Dautkhanov



On Wed, Jul 8, 2015 at 3:24 PM, Brandon White bwwintheho...@gmail.com wrote:

Can you use a cron job to update it every X minutes?


On Wed, Jul 8, 2015 at 2:23 PM, Ganelin, Ilya ilya.gane...@capitalone.com 
wrote:

Hi all – I’m just wondering if anyone has had success integrating Spark 
Streaming with Zeppelin and actually dynamically updating the data in near 
real-time. From my investigation, it seems that Zeppelin will only allow you to 
display a snapshot of data, not a continuously updating table. Has anyone 
figured out if there’s a way to loop a display command or how to provide a 
mechanism to continuously update visualizations? 


Thank you, 
Ilya Ganelin





The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.






Re: spark job not accepting resources from worker

2015-08-06 Thread Kushal Chokhani
Figured out the root cause. The master was randomly assigning a port to the
worker for communication. Because of the firewall on the master, the worker
couldn't send messages to the master (such as resource details). Strangely,
the worker didn't even throw any error.
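
As a hedged aside (these are not the exact settings used here), Spark 1.x also
lets the otherwise random ports be pinned in the configuration so a firewall
can whitelist them, for example:

import org.apache.spark.SparkConf

// Pin ports that Spark would otherwise pick at random, so the firewall can
// allow them explicitly; the port numbers below are arbitrary examples.
val conf = new SparkConf()
  .set("spark.driver.port", "51000")
  .set("spark.fileserver.port", "51100")
  .set("spark.broadcast.port", "51200")
  .set("spark.blockManager.port", "51300")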


On 8/6/2015 3:24 PM, Kushal Chokhani wrote:

Any inputs?

In the case of the following message, is there a way to check which resource
is not sufficient through some logs?


[Timer-0] WARN org.apache.spark.scheduler.TaskSchedulerImpl  -
Initial job has not accepted any resources; check your cluster UI
to ensure that workers are registered and have sufficient resources

Regards.

On 8/6/2015 11:40 AM, Kushal Chokhani wrote:

Hi

I have a spark/cassandra setup where I am using a spark cassandra 
java connector to query on a table. So far, I have 1 spark master 
node (2 cores) and 1 worker node (4 cores). Both of them have 
following spark-env.sh under conf/:


#!/usr/bin/env bash
export SPARK_LOCAL_IP=127.0.0.1
export SPARK_MASTER_IP=192.168.4.134
export SPARK_WORKER_MEMORY=1G
export SPARK_EXECUTOR_MEMORY=2G

I am using spark 1.4.1 along with cassandra 2.2.0. I have started my 
cassandra/spark setup. Created keyspace and table under cassandra and 
added some rows on table. Now I try to run following spark job using 
spark cassandra java connector:


 SparkConf conf = new SparkConf();
 conf.setAppName("Testing");
 conf.setMaster("spark://192.168.4.134:7077");
 conf.set("spark.cassandra.connection.host", "192.168.4.129");
 conf.set("spark.logConf", "true");
 conf.set("spark.driver.maxResultSize", "50m");
 conf.set("spark.executor.memory", "200m");
 conf.set("spark.eventLog.enabled", "true");
 conf.set("spark.eventLog.dir", "/tmp/");
 conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
 conf.set("spark.cores.max", "1");
 JavaSparkContext sc = new JavaSparkContext(conf);

 JavaRDD<String> cassandraRowsRDD =
     CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
         .map(new Function<CassandraRow, String>() {
             private static final long serialVersionUID = -6263533266898869895L;
             @Override
             public String call(CassandraRow cassandraRow) throws Exception {
                 return cassandraRow.toString();
             }
         });
 System.out.println("Data as CassandraRows: \n"
     + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
 sc.close();


This job is stuck with insufficient resources warning. Here are logs:

1107 [main] INFO org.apache.spark.SparkContext  - Spark
configuration:
spark.app.name=Testing
spark.cassandra.connection.host=192.168.4.129
spark.cores.max=1
spark.driver.maxResultSize=50m
spark.eventLog.dir=/tmp/
spark.eventLog.enabled=true
spark.executor.extraClassPath=/home/enlighted/ebd.jar
spark.executor.memory=200m
spark.logConf=true
spark.master=spark://192.168.4.134:7077
1121 [main] INFO org.apache.spark.SecurityManager  - Changing
view acls to: enlighted
1122 [main] INFO org.apache.spark.SecurityManager  - Changing
modify acls to: enlighted
1123 [main] INFO org.apache.spark.SecurityManager  -
SecurityManager: authentication disabled; ui acls disabled; users
with view permissions: Set(enlighted); users with modify
permissions: Set(enlighted)
1767 [sparkDriver-akka.actor.default-dispatcher-4] INFO
akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
1805 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Starting remoting

1957 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'sparkDriver' on port 54611.
1958 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting 
- Remoting started; listening on addresses

:[akka.tcp://sparkDriver@192.168.4.134:54611]
1977 [main] INFO org.apache.spark.SparkEnv  - Registering
MapOutputTracker
1989 [main] INFO org.apache.spark.SparkEnv  - Registering
BlockManagerMaster
2007 [main] INFO org.apache.spark.storage.DiskBlockManager  -
Created local directory at

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/blockmgr-3e3d54e7-16df-4e97-be48-b0c0fa0389e7
2012 [main] INFO org.apache.spark.storage.MemoryStore  -
MemoryStore started with capacity 456.0 MB
2044 [main] INFO org.apache.spark.HttpFileServer  - HTTP File
server directory is

/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/httpd-64b4d92e-cde9-45fb-8b38-edc3cca3933c
2046 [main] INFO org.apache.spark.HttpServer  - Starting HTTP Server
2086 [main] INFO org.spark-project.jetty.server.Server  -
jetty-8.y.z-SNAPSHOT
2098 [main] INFO
org.spark-project.jetty.server.AbstractConnector  - Started
SocketConnector@0.0.0.0:44884
2099 [main] INFO org.apache.spark.util.Utils  - Successfully
started service 'HTTP file server' on port 44884.
2108 [main] INFO org.apache.spark.SparkEnv  - Registering
OutputCommitCoordinator
2297 [main] 

Re: Re: Real-time data visualization with Zeppelin

2015-08-06 Thread andy petrella
Yep, most of the things will work just by renaming it :-D
You can even use nbconvert afterwards


On Thu, Aug 6, 2015 at 12:09 PM jun kit...@126.com wrote:

 Hi andy,

 Is there any method to convert ipython notebook file(.ipynb) to spark
 notebook file(.snb) or vice versa?

 BR
 Jun

 At 2015-07-13 02:45:57, andy petrella andy.petre...@gmail.com wrote:

 Heya,

 You might be looking for something like this I guess:
 https://www.youtube.com/watch?v=kB4kRQRFAVc.

 The Spark-Notebook (https://github.com/andypetrella/spark-notebook/) can
 bring that to you actually, it uses fully reactive bilateral communication
 streams to update data and viz, plus it hides almost everything for you ^^.
 The video was using the notebook notebooks/streaming/Twitter stream.snb
 https://github.com/andypetrella/spark-notebook/blob/master/notebooks/streaming/Twitter%20stream.snb
  so
 you can play it yourself if you like.

 You might want to build the master (before 0.6.0 is released → soon)
 here http://spark-notebook.io/.

 HTH
 andy



 On Sun, Jul 12, 2015 at 8:29 PM Ruslan Dautkhanov dautkha...@gmail.com
 wrote:

 Don't think it is a Zeppelin problem.. RDDs are immutable.
 Unless you integrate something like IndexedRDD
 http://spark-packages.org/package/amplab/spark-indexedrdd
 into Zeppelin I think it's not possible.


 --
 Ruslan Dautkhanov

 On Wed, Jul 8, 2015 at 3:24 PM, Brandon White bwwintheho...@gmail.com
 wrote:

 Can you use a cron job to update it every X minutes?

 On Wed, Jul 8, 2015 at 2:23 PM, Ganelin, Ilya 
 ilya.gane...@capitalone.com wrote:

 Hi all – I’m just wondering if anyone has had success integrating Spark
 Streaming with Zeppelin and actually dynamically updating the data in near
 real-time. From my investigation, it seems that Zeppelin will only allow
 you to display a snapshot of data, not a continuously updating table. Has
 anyone figured out if there’s a way to loop a display command or how to
 provide a mechanism to continuously update visualizations?

 Thank you,
 Ilya Ganelin

 [image: 2DD951D6-FF99-4415-80AA-E30EFE7CF452[4].png]

 --

 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed. If the reader of this message is not the intended
 recipient, you are hereby notified that any review, retransmission,
 dissemination, distribution, copying or other use of, or taking of any
 action in reliance upon this information is strictly prohibited. If you
 have received this communication in error, please contact the sender and
 delete the material from your computer.



 --
andy


Re: How to read gzip data in Spark - Simple question

2015-08-06 Thread ๏̯͡๏
I got it running by myself

On Wed, Aug 5, 2015 at 10:27 PM, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

 Have you tried reading the spark documentation?

 http://spark.apache.org/docs/latest/programming-guide.html



 Thank you,
 Ilya Ganelin




 -Original Message-
 *From: *ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
 *Sent: *Thursday, August 06, 2015 12:41 AM Eastern Standard Time
 *To: *Philip Weaver
 *Cc: *user
 *Subject: *Re: How to read gzip data in Spark - Simple question

 how do i persist the RDD to HDFS ?

 On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 This message means that java.util.Date is not supported by Spark
 DataFrame. You'll need to use java.sql.Date, I believe.

 On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 That seem to be working. however i see a new exception

 Code:
 def formatStringAsDate(dateStr: String) = new
   SimpleDateFormat("yyyy-MM-dd").parse(dateStr)


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
 val rowStructText =
   sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String,
   f6: Integer, f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
   f12: Integer, f13: Integer, f14: String)

 val summary = rowStructText.map(s => s.split(",")).map(
   s => Summary(formatStringAsDate(s(0)),
     s(1).replaceAll("\"", "").toLong,
     s(3).replaceAll("\"", "").toLong,
     s(4).replaceAll("\"", "").toInt,
     s(5).replaceAll("\"", ""),
     s(6).replaceAll("\"", "").toInt,
     formatStringAsDate(s(7)),
     formatStringAsDate(s(8)),
     s(9).replaceAll("\"", "").toInt,
     s(10).replaceAll("\"", "").toInt,
     s(11).replaceAll("\"", "").toFloat,
     s(12).replaceAll("\"", "").toInt,
     s(13).replaceAll("\"", "").toInt,
     s(14).replaceAll("\"", "")
   )
 ).toDF()
 bank.registerTempTable("summary")


 //Output
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.util.Date formatStringAsDate: (dateStr: String)java.util.Date
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz
 MapPartitionsRDD[105] at textFile at console:60 defined class Summary x:
 org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at
 console:61 java.lang.UnsupportedOperationException: Schema for type
 java.util.Date is not supported at
 org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
 at
 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)

 Any suggestions

 On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 The parallelize method does not read the contents of a file. It simply
 takes a collection and distributes it to the cluster. In this case, the
 String is a collection of 67 characters.

 Use sc.textFile instead of sc.parallelize, and it should work as you
 want.
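
 A small sketch of the difference, reusing the path from this thread (sc is the
 existing SparkContext):

 // parallelize treats its argument as a collection; a String is a collection
 // of characters, which is why the result is an RDD[Char] and the count is
 // just the length of the path string.
 val chars = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

 // textFile treats the argument as a path and reads the gzipped file's lines;
 // gzip is decompressed transparently (though a .gz file is not splittable).
 val lines = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")
 println(lines.count())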

 On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com
 wrote:

 I have csv data that is embedded in gzip format on HDFS.

 *With Pig*

 a = load
 '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' 
 using
 PigStorage();

 b = limit a 10


 (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)


 (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


 However with Spark

  val rowStructText =
    sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

  val x = rowStructText.map(s => {
    println(s)
    s
  })

  x.count

 Questions

 1) x.count always shows 67 irrespective of the path i change in
 sc.parallelize

 2) It shows x as RDD[Char] instead of String

 3) println() never emits the rows.

 Any suggestions

 -Deepak



 --
 Deepak





 --
 Deepak





 --
 Deepak


 --

 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates and may only be used
 solely in performance of work or services for Capital One. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed. If the reader of this message is not the intended
 recipient, you are hereby notified that any review, retransmission,
 dissemination, distribution, copying or other use of, or taking of any
 action in reliance upon this information is strictly prohibited. If you
 have received this communication in error, please contact the sender and
 delete the material from your 

Re: Unable to persist RDD to HDFS

2015-08-06 Thread Philip Weaver
This isn't really a Spark question. You're trying to parse a string to an
integer, but it contains an invalid character. The exception message
explains this.
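
For illustration only, one way to guard that kind of parsing (a hypothetical
helper, not part of the posted code):

// Convert a field to Int only when it is purely numeric; otherwise fall back
// to a sentinel so a single malformed value such as "3g" does not fail the job.
def toIntSafe(raw: String, default: Int = -1): Int = {
  val cleaned = raw.replaceAll("\"", "").trim
  if (cleaned.matches("-?\\d+")) cleaned.toInt else default
}

toIntSafe("\"42\"")  // 42
toIntSafe("3g")      // -1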

On Wed, Aug 5, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Code:
 import java.text.SimpleDateFormat
 import java.util.Calendar
 import java.sql.Date
 import org.apache.spark.storage.StorageLevel

 def formatStringAsDate(dateStr: String) = new java.sql.Date(new
   SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())


 //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)

 case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
 f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
 f12: Integer, f13: Integer, f14: String)


 val rowStructText =
   sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz")

 val summary = rowStructText.filter(s => s.length != 1).map(s =>
   s.split("\t")).map(
   {
     s =>
       Summary(formatStringAsDate(s(0)),
         s(1).replaceAll("\"", "").toLong,
         s(3).replaceAll("\"", "").toLong,
         s(4).replaceAll("\"", "").toInt,
         s(5).replaceAll("\"", ""),
         s(6).replaceAll("\"", "").toInt,
         formatStringAsDate(s(7)),
         formatStringAsDate(s(8)),
         s(9).replaceAll("\"", "").toInt,
         s(10).replaceAll("\"", "").toInt,
         s(11).replaceAll("\"", "").toFloat,
         s(12).replaceAll("\"", "").toInt,
         s(13).replaceAll("\"", "").toInt,
         s(14).replaceAll("\"", "")
       )
   }
 )

 summary.saveAsTextFile("sparkO")


 Output:
 import java.text.SimpleDateFormat import java.util.Calendar import
 java.sql.Date import org.apache.spark.storage.StorageLevel
 formatStringAsDate: (dateStr: String)java.sql.Date defined class Summary
 rowStructText: org.apache.spark.rdd.RDD[String] =
 /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz
 MapPartitionsRDD[639] at textFile at console:305 summary:
 org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[642] at map at
 console:310 org.apache.spark.SparkException: Job aborted due to stage
 failure: Task 0 in stage 147.0 failed 4 times, most recent failure: Lost
 task 0.3 in stage 147.0 (TID 3396, datanode-6-3486.phx01.dev.ebayc3.com):
 java.lang.NumberFormatException: For input string: 3g at
 java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
 at java.lang.Integer.parseInt(Integer.java:580) at
 java.lang.Integer.parseInt(Integer.java:615) at
 scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at
 scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(console:318)
 at
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(console:312)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1072)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at
 org.apache.spark.scheduler.Task.run(Task.scala:64) at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)



 OR
 summary.count throws same exception

 Any suggestions ?

 --
 Deepak




Re: Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Pedro Rodriguez
Worth noting that Spark 1.5 is extending that list of Spark SQL functions
quite a bit. Not sure where in the docs they would be yet, but the JIRA is
here: https://issues.apache.org/jira/browse/SPARK-8159
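
For reference, a minimal sketch of using the 1.4.1 functions object on a
DataFrame (the DataFrame df and the column names are hypothetical placeholders):

import org.apache.spark.sql.functions._

// A few of the built-in aggregates beyond sum/count/min/max, e.g. avg and
// countDistinct; df, "siteId", "guid" and "amount" are placeholders.
val stats = df.groupBy(col("siteId")).agg(
  count(col("guid")),
  countDistinct(col("guid")),
  min(col("amount")),
  max(col("amount")),
  avg(col("amount")))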

On Thu, Aug 6, 2015 at 7:27 PM, Netwaver wanglong_...@163.com wrote:

 Thanks for your kindly help






 At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote:

 They are covered here in the docs:


 http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$


 On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

 Hi All,
  I am using Spark 1.4.1, and I want to know how I can find the
 complete list of functions supported in Spark SQL; currently I only know
 'sum', 'count', 'min', and 'max'. Thanks a lot.








-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re:Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Netwaver
Thanks for your kindly help






At 2015-08-06 19:28:10, Todd Nist tsind...@gmail.com wrote:

They are covered here in the docs:

http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$





On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

Hi All,
 I am using Spark 1.4.1, and I want to know how I can find the complete
list of functions supported in Spark SQL; currently I only know
'sum', 'count', 'min', and 'max'. Thanks a lot.







Spark-submit fails when jar is in HDFS

2015-08-06 Thread abraithwaite
Hi All,

We're trying to run spark with mesos and docker in client mode (since mesos
doesn't support cluster mode) and load the application Jar from HDFS.  The
following is the command we're running:

We're getting the following warning before an exception from that command:


Before I debug further, is this even supported?  I started reading the code
and it wasn't clear that it's possible to load a remote jar in client mode
at all.  I did see a related issue in [2] but it didn't quite clarify
everything I was looking for.

Thanks,
- Alan

[1] https://spark.apache.org/docs/latest/submitting-applications.html

[2]
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-fails-when-jar-is-in-HDFS-tp24163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark MLib v/s SparkR

2015-08-06 Thread praveen S
I am starting off with classification models: logistic regression and random
forest. Basically, I wanted to learn machine learning.
Since I have a Java background I started off with MLlib, but later heard that R
works as well (only with scaling issues).

So, with SparkR I was wondering whether the scaling issue would be resolved,
hence my question: why not go with R and SparkR alone (keeping aside my
inclination towards Java)?

On Thu, Aug 6, 2015 at 12:28 AM, Charles Earl charles.ce...@gmail.com
wrote:

 What machine learning algorithms are you interested in exploring or using?
 Start from there or better yet the problem you are trying to solve, and
 then the selection may be evident.


 On Wednesday, August 5, 2015, praveen S mylogi...@gmail.com wrote:

 I was wondering when one should go for MLlib or SparkR. What are the
 criteria, or what should be considered, before choosing either of the
 solutions for data analysis?
 And what are the advantages of Spark MLlib over SparkR, or of SparkR
 over MLlib?



 --
 - Charles



Out of memory with twitter spark streaming

2015-08-06 Thread Pankaj Narang
Hi 

I am running an application using Activator where I am retrieving tweets
and storing them in a MySQL database using the code below.

I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory,
the OOM is only delayed.

Can anybody give me a clue? Here is the code:

var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
var tweets = tweetStream.map(tweet => {
  var user = tweet.getUser
  var replyStatusId = tweet.getInReplyToStatusId
  var reTweetStatus = tweet.getRetweetedStatus
  var pTweetId = -1L
  var pcreatedAt = 0L
  if (reTweetStatus != null) {
    pTweetId = reTweetStatus.getId
    pcreatedAt = reTweetStatus.getCreatedAt.getTime
  }
  tweet.getCreatedAt.getTime + "|$" + tweet.getId +
    "|$" + user.getId + "|$" + user.getName + "|$" + user.getScreenName + "|$" +
    user.getDescription +
    "|$" + tweet.getText.trim + "|$" + user.getFollowersCount +
    "|$" + user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
    user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" +
    replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
})

tweets.foreachRDD(tweetsRDD => {
  tweetsRDD.distinct()
  val count = tweetsRDD.count
  println("*" + "%s tweets found on this RDD".format(count))
  if (count > 0) {
    var timeMs = System.currentTimeMillis
    var counter = DBQuery.getProcessedCount()
    var location = "tweets/" + counter + "/"
    tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
    // tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
    DBQuery.addTweetRDD(counter)
  }
})

// Checkpoint directory to recover from failures
println("tweets for the last stream are saved which can be processed later")
val checkpointDir = "f:/svn1/checkpoint/"
ssc.checkpoint(checkpointDir)
ssc.start()
ssc.awaitTermination()


regards
Pankaj



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Out-of-memory-with-twitter-spark-streaming-tp24162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



stopping spark stream app

2015-08-06 Thread Shushant Arora
Hi

I am using Spark Streaming 1.3 and a custom checkpoint to save Kafka
offsets.

1. Is doing

Runtime.getRuntime().addShutdownHook(new Thread() {
  @Override
  public void run() {
    jssc.stop(true, true);
    System.out.println("Inside Add Shutdown Hook");
  }
});

to handle the stop safe?

2. Do I also need to handle saving the checkpoint in the shutdown hook, or
will the driver handle it automatically, since it gracefully stops the stream
and handles completion of the foreachRDD function on the stream?

directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
}

Thanks


spark job not accepting resources from worker

2015-08-06 Thread Kushal Chokhani

Hi

I have a spark/cassandra setup where I am using a spark cassandra java 
connector to query on a table. So far, I have 1 spark master node (2 
cores) and 1 worker node (4 cores). Both of them have following 
spark-env.sh under conf/:


   #!/usr/bin/env bash
   export SPARK_LOCAL_IP=127.0.0.1
   export SPARK_MASTER_IP=192.168.4.134
   export SPARK_WORKER_MEMORY=1G
   export SPARK_EXECUTOR_MEMORY=2G

I am using spark 1.4.1 along with cassandra 2.2.0. I have started my 
cassandra/spark setup. Created keyspace and table under cassandra and 
added some rows on table. Now I try to run following spark job using 
spark cassandra java connector:


SparkConf conf = new SparkConf();
conf.setAppName("Testing");
conf.setMaster("spark://192.168.4.134:7077");
conf.set("spark.cassandra.connection.host", "192.168.4.129");
conf.set("spark.logConf", "true");
conf.set("spark.driver.maxResultSize", "50m");
conf.set("spark.executor.memory", "200m");
conf.set("spark.eventLog.enabled", "true");
conf.set("spark.eventLog.dir", "/tmp/");
conf.set("spark.executor.extraClassPath", "/home/enlighted/ebd.jar");
conf.set("spark.cores.max", "1");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> cassandraRowsRDD =
    CassandraJavaUtil.javaFunctions(sc).cassandraTable("testing", "ec")
        .map(new Function<CassandraRow, String>() {
            private static final long serialVersionUID = -6263533266898869895L;
            @Override
            public String call(CassandraRow cassandraRow) throws Exception {
                return cassandraRow.toString();
            }
        });
System.out.println("Data as CassandraRows: \n"
    + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
sc.close();



This job is stuck with insufficient resources warning. Here are logs:

   1107 [main] INFO org.apache.spark.SparkContext  - Spark configuration:
   spark.app.name=Testing
   spark.cassandra.connection.host=192.168.4.129
   spark.cores.max=1
   spark.driver.maxResultSize=50m
   spark.eventLog.dir=/tmp/
   spark.eventLog.enabled=true
   spark.executor.extraClassPath=/home/enlighted/ebd.jar
   spark.executor.memory=200m
   spark.logConf=true
   spark.master=spark://192.168.4.134:7077
   1121 [main] INFO org.apache.spark.SecurityManager  - Changing view
   acls to: enlighted
   1122 [main] INFO org.apache.spark.SecurityManager  - Changing modify
   acls to: enlighted
   1123 [main] INFO org.apache.spark.SecurityManager  -
   SecurityManager: authentication disabled; ui acls disabled; users
   with view permissions: Set(enlighted); users with modify
   permissions: Set(enlighted)
   1767 [sparkDriver-akka.actor.default-dispatcher-4] INFO
   akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
   1805 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting -
   Starting remoting
   1957 [main] INFO org.apache.spark.util.Utils  - Successfully started
   service 'sparkDriver' on port 54611.
   1958 [sparkDriver-akka.actor.default-dispatcher-4] INFO Remoting -
   Remoting started; listening on addresses
   :[akka.tcp://sparkDriver@192.168.4.134:54611]
   1977 [main] INFO org.apache.spark.SparkEnv  - Registering
   MapOutputTracker
   1989 [main] INFO org.apache.spark.SparkEnv  - Registering
   BlockManagerMaster
   2007 [main] INFO org.apache.spark.storage.DiskBlockManager  -
   Created local directory at
   
/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/blockmgr-3e3d54e7-16df-4e97-be48-b0c0fa0389e7
   2012 [main] INFO org.apache.spark.storage.MemoryStore  - MemoryStore
   started with capacity 456.0 MB
   2044 [main] INFO org.apache.spark.HttpFileServer  - HTTP File server
   directory is
   
/tmp/spark-f21125fd-ae9d-460e-884d-563fa8720f09/httpd-64b4d92e-cde9-45fb-8b38-edc3cca3933c
   2046 [main] INFO org.apache.spark.HttpServer  - Starting HTTP Server
   2086 [main] INFO org.spark-project.jetty.server.Server  -
   jetty-8.y.z-SNAPSHOT
   2098 [main] INFO org.spark-project.jetty.server.AbstractConnector -
   Started SocketConnector@0.0.0.0:44884
   2099 [main] INFO org.apache.spark.util.Utils  - Successfully started
   service 'HTTP file server' on port 44884.
   2108 [main] INFO org.apache.spark.SparkEnv  - Registering
   OutputCommitCoordinator
   2297 [main] INFO org.spark-project.jetty.server.Server  -
   jetty-8.y.z-SNAPSHOT
   2317 [main] INFO org.spark-project.jetty.server.AbstractConnector -
   Started SelectChannelConnector@0.0.0.0:4040
   2318 [main] INFO org.apache.spark.util.Utils  - Successfully started
   service 'SparkUI' on port 4040.
   2320 [main] INFO org.apache.spark.ui.SparkUI  - Started SparkUI at
   http://192.168.4.134:4040
   2387 [sparkDriver-akka.actor.default-dispatcher-3] INFO
   org.apache.spark.deploy.client.AppClient$ClientActor  - Connecting
   to master akka.tcp://sparkMaster@192.168.4.134:7077/user/Master...
   2662 [sparkDriver-akka.actor.default-dispatcher-14] INFO
   org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend  -
   Connected to Spark cluster with app ID 

Re: Multiple UpdateStateByKey Functions in the same job?

2015-08-06 Thread Akhil Das
I think you can. Give it a try and see.
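
A minimal sketch of what that could look like (the two keyed DStreams and their
state types are hypothetical, and updateStateByKey requires ssc.checkpoint(...)
to be configured):

import org.apache.spark.streaming.dstream.DStream

// Two independent keyed streams, each with its own state update function:
// one accumulates a JSON-ish session string, the other a running count.
def trackBoth(sessions: DStream[(String, String)],
              metrics: DStream[(String, Long)]) = {

  val mergeSession: (Seq[String], Option[String]) => Option[String] =
    (events, state) => Some((state.getOrElse("{}") +: events).mkString(","))

  val addCounts: (Seq[Long], Option[Long]) => Option[Long] =
    (counts, total) => Some(total.getOrElse(0L) + counts.sum)

  (sessions.updateStateByKey(mergeSession), metrics.updateStateByKey(addCounts))
}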

Thanks
Best Regards

On Tue, Aug 4, 2015 at 7:02 AM, swetha swethakasire...@gmail.com wrote:

 Hi,

 Can I use multiple UpdateStateByKey Functions in the Streaming job? Suppose
 I need to maintain the state of the user session in the form of a Json and
 counts of various other metrics which has different keys ? Can I use
 multiple updateStateByKey functions to maintain the state for different
 keys
 with different return values?

 Thanks,
 Swetha



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-UpdateStateByKey-Functions-in-the-same-job-tp24119.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Extremely poor predictive performance with RF in mllib

2015-08-06 Thread Yanbo Liang
I can reproduce this issue, so looks like a bug of Random Forest, I will
try to find some clue.

2015-08-05 1:34 GMT+08:00 Patrick Lam pkph...@gmail.com:

 Yes, I rechecked and the label is correct. As you can see in the code
 posted, I used the exact same features for all the classifiers so unless rf
 somehow switches the labels, it should be correct.

 I have posted a sample dataset and sample code to reproduce what I'm
 getting here:

 https://github.com/pkphlam/spark_rfpredict

 On Tue, Aug 4, 2015 at 6:42 AM, Yanbo Liang yblia...@gmail.com wrote:

 It looks like the predicted result just opposite with expectation, so
 could you check whether the label is right?
 Or could you share several data which can help to reproduce this output?

 2015-08-03 19:36 GMT+08:00 Barak Gitsis bar...@similarweb.com:

 hi,
 I've run into some poor RF behavior, although not as pronounced as you..
 would be great to get more insight into this one

 Thanks!

 On Mon, Aug 3, 2015 at 8:21 AM pkphlam pkph...@gmail.com wrote:

 Hi,

 This might be a long shot, but has anybody run into very poor predictive
 performance using RandomForest with Mllib? Here is what I'm doing:

 - Spark 1.4.1 with PySpark
 - Python 3.4.2
 - ~30,000 Tweets of text
 - 12289 1s and 15956 0s
 - Whitespace tokenization and then hashing trick for feature selection
 using
 10,000 features
 - Run RF with 100 trees and maxDepth of 4 and then predict using the
 features from all the 1s observations.

 So in theory, I should get predictions of close to 12289 1s (especially
 if
 the model overfits). But I'm getting exactly 0 1s, which sounds
 ludicrous to
 me and makes me suspect something is wrong with my code or I'm missing
 something. I notice similar behavior (although not as extreme) if I play
 around with the settings. But I'm getting normal behavior with other
 classifiers, so I don't think it's my setup that's the problem.

 For example:

  >>> lrm = LogisticRegressionWithSGD.train(lp, iterations=10)
  >>> logit_predict = lrm.predict(predict_feat)
  >>> logit_predict.sum()
 9077

  >>> nb = NaiveBayes.train(lp)
  >>> nb_predict = nb.predict(predict_feat)
  >>> nb_predict.sum()
 10287.0

  >>> rf = RandomForest.trainClassifier(lp, numClasses=2,
  ...     categoricalFeaturesInfo={}, numTrees=100, seed=422)
  >>> rf_predict = rf.predict(predict_feat)
  >>> rf_predict.sum()
 0.0

 This code was all run back to back so I didn't change anything in
 between.
 Does anybody have a possible explanation for this?

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Extremely-poor-predictive-performance-with-RF-in-mllib-tp24112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

 --
 *-Barak*





 --
 Patrick Lam
 Institute for Quantitative Social Science, Harvard University
 http://www.patricklam.org



Unable to persist RDD to HDFS

2015-08-06 Thread ๏̯͡๏
Code:
import java.text.SimpleDateFormat
import java.util.Calendar
import java.sql.Date
import org.apache.spark.storage.StorageLevel

def formatStringAsDate(dateStr: String) = new java.sql.Date(new
  SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime())

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)

case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5 : String,
f6: Integer, f7 : Date, f8: Date, f9: Integer, f10: Integer, f11: Float,
f12: Integer, f13: Integer, f14: String)


val rowStructText =
  sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz")

val summary = rowStructText.filter(s => s.length != 1).map(s =>
  s.split("\t")).map(
  {
    s =>
      Summary(formatStringAsDate(s(0)),
        s(1).replaceAll("\"", "").toLong,
        s(3).replaceAll("\"", "").toLong,
        s(4).replaceAll("\"", "").toInt,
        s(5).replaceAll("\"", ""),
        s(6).replaceAll("\"", "").toInt,
        formatStringAsDate(s(7)),
        formatStringAsDate(s(8)),
        s(9).replaceAll("\"", "").toInt,
        s(10).replaceAll("\"", "").toInt,
        s(11).replaceAll("\"", "").toFloat,
        s(12).replaceAll("\"", "").toInt,
        s(13).replaceAll("\"", "").toInt,
        s(14).replaceAll("\"", "")
      )
  }
)

summary.saveAsTextFile("sparkO")


Output:
import java.text.SimpleDateFormat import java.util.Calendar import
java.sql.Date import org.apache.spark.storage.StorageLevel
formatStringAsDate: (dateStr: String)java.sql.Date defined class Summary
rowStructText: org.apache.spark.rdd.RDD[String] =
/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-1.gz
MapPartitionsRDD[639] at textFile at console:305 summary:
org.apache.spark.rdd.RDD[Summary] = MapPartitionsRDD[642] at map at
console:310 org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 147.0 failed 4 times, most recent failure: Lost
task 0.3 in stage 147.0 (TID 3396, datanode-6-3486.phx01.dev.ebayc3.com):
java.lang.NumberFormatException: For input string: 3g at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580) at
java.lang.Integer.parseInt(Integer.java:615) at
scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at
scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(console:318)
at
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$b7c842676ccaed446a4cace94f9edC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(console:312)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1072)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at
org.apache.spark.scheduler.Task.run(Task.scala:64) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193)



OR
summary.count throws same exception

Any suggestions ?
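
A sketch of one possible workaround (not from the original post): the failure is
a NumberFormatException on a value like 3g, so the parsing can be wrapped in
scala.util.Try and rows that do not parse can be dropped instead of failing the
whole job:

import scala.util.Try

val cleaned = rowStructText
  .filter(_.length != 1)
  .map(_.split("\t"))
  .flatMap { s =>
    // Rows with non-numeric values (e.g. "3g") become None and are skipped.
    Try(Summary(
      formatStringAsDate(s(0)),
      s(1).replaceAll("\"", "").toLong,
      s(3).replaceAll("\"", "").toLong,
      s(4).replaceAll("\"", "").toInt,
      s(5).replaceAll("\"", ""),
      s(6).replaceAll("\"", "").toInt,
      formatStringAsDate(s(7)),
      formatStringAsDate(s(8)),
      s(9).replaceAll("\"", "").toInt,
      s(10).replaceAll("\"", "").toInt,
      s(11).replaceAll("\"", "").toFloat,
      s(12).replaceAll("\"", "").toInt,
      s(13).replaceAll("\"", "").toInt,
      s(14).replaceAll("\"", "")
    )).toOption
  }

Counting the dropped rows (for example with an accumulator) would show whether
"3g" comes from a stray header/footer line or from a real data problem.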

-- 
Deepak


Re: NoSuchMethodError : org.apache.spark.streaming.scheduler.StreamingListenerBus.start()V

2015-08-06 Thread Akhil Das
For some reason you are having two different versions of spark jars in your
classpath.

Thanks
Best Regards

On Tue, Aug 4, 2015 at 12:37 PM, Deepesh Maheshwari 
deepesh.maheshwar...@gmail.com wrote:

 Hi,

 I am trying to read data from kafka and process it using spark.
 i have attached my source code , error log.

 For integrating kafka,
 i have added dependency in pom.xml

 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.3.0</version>
 </dependency>

 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.3.0</version>
 </dependency>

 i have attached  full error log.please check it why it is giving the error
 . this class exits in my class path.
 I am running spark and kafka locally using java class.

 SparkConf conf = new SparkConf().setAppName("Spark Demo").setMaster(
 "local[2]").set("spark.executor.memory", "1g");

 I

 [image: Inline image 2]



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Talk on Deep dive in Spark Dataframe API

2015-08-06 Thread madhu phatak
Hi,
Recently I gave a talk on a deep dive into data frame api and sql catalyst
. Video of the same is available on Youtube with slides and code. Please
have a look if you are interested.

*http://blog.madhukaraphatak.com/anatomy-of-spark-dataframe-api/
http://blog.madhukaraphatak.com/anatomy-of-spark-dataframe-api/*

-- 
Regards,
Madhukara Phatak
http://datamantra.io/


Re: Twitter Connector-Spark Streaming

2015-08-06 Thread Akhil Das
 Hi Sadaf

 Which version of spark are you using? And whats in the spark-env.sh file?
 I think you are using both SPARK_CLASSPATH (which is deprecated) and
 spark.executor.extraClasspath (may be set in spark-defaults.sh file).

 Thanks
 Best Regards

 On Wed, Aug 5, 2015 at 6:22 PM, Sadaf Khan sa...@platalytics.com wrote:

 Hi Akhil,

 I know you are a big knowledge base of spark streaming.
 I wan your little help more.
 I've done the twitter streaming using twitter's streaming user api and
 spark streaming. this runs successfully on my local machine. but when i run
 this program on cluster in local mode. it just run successfully for the
 very first time. later on it gives the following exception.

 Exception in thread main org.apache.spark.SparkException: Found both
 spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.

 and spark class path is unset already!!
 I have to make a new checkpoint directory each time to make it run
 successfully. otherwise it shows above exception.

 Can you please again help me to resolve this issue?
 Thanks :)

 On Tue, Aug 4, 2015 at 4:33 PM, Sadaf Khan sa...@platalytics.com wrote:

 thanks for being a helping hand before. :)

 one more point i want to discuss is that i now used Twitter Rest Api and
 successfully fetch the home timeline. but now it is showing 20 recent
 tweets,
 Is there any way to get 3200 (max rate limit) last tweets?

 Thanks in anticipation :)


 On Tue, Aug 4, 2015 at 2:05 PM, Sadaf Khan sa...@platalytics.com
 wrote:

 thanks alot

 On Tue, Aug 4, 2015 at 2:00 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 You will have to write your own consumer for pulling your custom
 feeds, and then you can do a union 
 (customfeedDstream.union(twitterStream))
 with the twitter stream api.

 Thanks
 Best Regards

 On Tue, Aug 4, 2015 at 2:28 PM, Sadaf Khan sa...@platalytics.com
 wrote:

 Thanks alot :)

 One more thing that i want to ask is that i have used twitters
 streaming api.and it seems that the above solution uses rest api. how 
 can i
 used both simultaneously ?

 Any response will be much appreciated :)
 Regards

 On Tue, Aug 4, 2015 at 1:51 PM, Akhil Das ak...@sigmoidanalytics.com
  wrote:

 Yes you can, when you start the application in the first batch you
 just need to pull all the tweets from your account. You need to look 
 into
 the API for that. Have a look at this
 https://dev.twitter.com/rest/reference/get/statuses/user_timeline

 Thanks
 Best Regards

 On Tue, Aug 4, 2015 at 1:47 PM, Sadaf Khan sa...@platalytics.com
 wrote:

 Hi.
 You were really helpful for me last time :) and i have done with
 the last problem.
 I wanna ask you one more question. Now my connector is showing the
 tweets that occurs after running the program. Is there any way to 
 fetch all
 old tweets since when the account was created?

 I will be thankful to you for you kind response.

 On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan sa...@platalytics.com
 wrote:

 thanks alot for this help :)

 On Thu, Jul 30, 2015 at 6:41 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 You can create a custom receiver and then inside it you can write
 yourown piece of code to receive data, filter them etc before giving 
 it to
 spark.

 Thanks
 Best Regards

 On Thu, Jul 30, 2015 at 6:49 PM, Sadaf Khan 
 sa...@platalytics.com wrote:

 okay :)

 then is there anyway to fetch the tweets specific to my account?

 Thanks in anticipation :)

 On Thu, Jul 30, 2015 at 6:17 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 Owh, this one fetches the public tweets, not the one specific
 to your account.

 Thanks
 Best Regards

 On Thu, Jul 30, 2015 at 6:11 PM, Sadaf Khan 
 sa...@platalytics.com wrote:

 yes. but can you please tell me how to mention a specific user
 account in filter?
 I want to fetch my tweets, tweets of my followers and the
 tweets of those  whom i followed.
 So in short i want to fatch the tweets of my account only.

 Recently i have used
 val tweets = TwitterUtils.createStream(ssc, atwitter, Nil, StorageLevel.MEMORY_AND_DISK_2)


 Any response will be very much appreciated. :)

 Thanks.


 On Thu, Jul 30, 2015 at 5:20 PM, Akhil Das 
 ak...@sigmoidanalytics.com wrote:

 TwitterUtils.createStream takes a 3rd argument which is a
 filter, once you provide these, it will only fetch tweets of 
 such.

 Thanks
 Best Regards

 On Thu, Jul 30, 2015 at 4:19 PM, Sadaf sa...@platalytics.com
  wrote:

 Hi.
 I am writing twitter connector using spark streaming. but it
 fetched the
 random tweets.
 Is there any way to receive the tweets of a particular
 account?

 I made an app on twitter and used the credentials as given
 below.

  def managingCredentials():
 Option[twitter4j.auth.Authorization]=
   {
   object auth{
   val config = new twitter4j.conf.ConfigurationBuilder()
 .setOAuthConsumerKey()
 .setOAuthConsumerSecret()
 .setOAuthAccessToken()
 .setOAuthAccessTokenSecret()
 .build
 }
  

Re: Memory allocation error with Spark 1.5

2015-08-06 Thread Alexis Seigneurin
Works like a charm. Thanks Reynold for the quick and efficient response!

Alexis

2015-08-05 19:19 GMT+02:00 Reynold Xin r...@databricks.com:

 In Spark 1.5, we have a new way to manage memory (part of Project
 Tungsten). The default unit of memory allocation is 64MB, which is way too
 high when you have 1G of memory allocated in total and have more than 4
 threads.

 We will reduce the default page size before releasing 1.5.  For now, you
 can just reduce spark.buffer.pageSize variable to a lower value (e.g. 16m).


 https://github.com/apache/spark/blob/702aa9d7fb16c98a50e046edfd76b8a7861d0391/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala#L125
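
 For reference, a sketch of setting that property when the context is built
 (the property name comes from the note above; the value and the app name are
 placeholders):

 import org.apache.spark.{SparkConf, SparkContext}

 val conf = new SparkConf()
   .setAppName("my-app")                    // placeholder
   .set("spark.buffer.pageSize", "16m")     // lower Tungsten page size as suggested above
 val sc = new SparkContext(conf)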

 On Wed, Aug 5, 2015 at 9:25 AM, Alexis Seigneurin aseigneu...@ippon.fr
 wrote:

 Hi,

 I'm receiving a memory allocation error with a recent build of Spark 1.5:

 java.io.IOException: Unable to acquire 67108864 bytes of memory
 at
 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:348)
 at
 org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:398)
 at
 org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:92)
 at
 org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:174)
 at
 org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:146)
 at
 org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:126)


 The issue appears when joining 2 datasets. One with 6084 records, the
 other one with 200 records. I'm expecting to receive 200 records in the
 result.

 I'm using a homemade build prepared from branch-1.5 with commit ID
 eedb996. I have run mvn -DskipTests clean install to generate that
 build.

 Apart from that, I'm using Java 1.7.0_51 and Maven 3.3.3.

 I've prepared a test case that can be built and executed very easily
 (data files are included in the repo):
 https://github.com/aseigneurin/spark-testcase

 One thing to note is that the issue arises when the master is set to
 local[*] but not when set to local. Both options work without problem
 with Spark 1.4, though.

 Any help will be greatly appreciated!

 Many thanks,
 Alexis





Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread Jörn Franke
Yes you should use orc it is much faster and more compact. Additionally you
can apply compression (snappy) to increase performance. Your data
processing pipeline seems to be not.very optimized. You should use the
newest hive version enabling storage indexes and bloom filters on
appropriate columns. Ideally you should insert the data sorted
appropriately. Partitioning and setting the execution engine to tez is also
beneficial.
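
 On the Spark side, a hedged sketch of writing a DataFrame out as ORC through a
 HiveContext (the table and path names are placeholders; compression, sorting and
 partitioning would still be configured in Hive as described above):

 import org.apache.spark.sql.hive.HiveContext

 val hiveContext = new HiveContext(sc)
 val daily = hiveContext.table("daily_raw")              // placeholder source table
 daily.write.format("orc").save("/warehouse/daily_orc")  // placeholder output path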

Hbase with phoenix should currently only be used if you do few joins, not
very complex queries and not many full table scans.

On Thu, Aug 6, 2015 at 14:54, venkatesh b venkateshmailingl...@gmail.com wrote:

 Hi, here I got two things to know.
 FIRST:
 In our project we use hive.
 We daily get new data. We need to process this new data only once. And
 send this processed data to RDBMS. Here in processing we majorly use many
 complex queries with joins with where condition and grouping functions.
 There are many intermediate tables generated around 50 while
 processing. Till now we use text format as storage. We came across ORC file
 format. I would like to know that since it is one Time querying the table
 is it worth of storing as ORC format.

 SECOND:
 I came to know about HBase, which is faster.
 Can I replace hive with HBase for processing of data daily faster.
 Currently it is taking 15hrs daily with hive.


 Please inform me if any other information is needed.

 Thanks  regards
 Venkatesh



Spark-submit not finding main class and the error reflects different path to jar file than specified

2015-08-06 Thread Stephen Boesch
Given the following command line to spark-submit:

bin/spark-submit --verbose --master local[2]--class
org.yardstick.spark.SparkCoreRDDBenchmark
/shared/ysgood/target/yardstick-spark-uber-0.0.1.jar

Here is the output:

NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes
ahead of assembly.
Using properties file: /shared/spark-1.4.1/conf/spark-defaults.conf
Adding default property: spark.akka.askTimeout=180
Adding default property: spark.master=spark://mellyrn.local:7077
Error: Cannot load main class from JAR
file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark
Run with --help for usage help or --verbose for debug output


The path
file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark  does
not seem to make sense. It  does not reflect the path to the file that was
specified on the submit-spark command line.

Note: when attempting to run that jar file via

java -classpath shared/ysgood/target/yardstick-spark-uber-0.0.1.jar
org.yardstick.spark.SparkCoreRDDBenchmark

Then the result is as expected: the main class starts to load and then
there is a NoClassDefFoundException on the SparkConf.classs (which is not
inside the jar). This shows the app jar is healthy.


Re: Starting Spark SQL thrift server from within a streaming app

2015-08-06 Thread Todd Nist
Well the creation of a thrift server would be to allow external access to
the data from JDBC / ODBC type connections.  The sparkstreaming-sql
leverages a standard spark sql context and then provides a means of
converting an incoming dstream into a row, look at the MessageToRow trait
in KafkaSource class.

The example, org.apache.spark.sql.streaming.examples.KafkaDDL should make
it clear; I think.

-Todd

On Thu, Aug 6, 2015 at 7:58 AM, Daniel Haviv 
daniel.ha...@veracity-group.com wrote:

 Thank you Todd,
 How is the sparkstreaming-sql project different from starting a thrift
 server on a streaming app ?

 Thanks again.
 Daniel


 On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist tsind...@gmail.com wrote:

 Hi Danniel,

 It is possible to create an instance of the SparkSQL Thrift server,
 however seems like this project is what you may be looking for:

 https://github.com/Intel-bigdata/spark-streamingsql

 Not 100% sure of your use case is, but you can always convert the data
 into DF then issue a query against it.  If you want other systems to be
 able to query it then there are numerous connectors to  store data into
 Hive, Cassandra, HBase, ElasticSearch, 

 To create a instance of a thrift server with its own SQL Context you
 would do something like the following:

 import org.apache.spark.{SparkConf, SparkContext}

 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.HiveMetastoreTypes._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.thriftserver._


 object MyThriftServer {

   val sparkConf = new SparkConf()
     // master is passed to spark-submit, but could also be specified explicitly
     // .setMaster(sparkMaster)
     .setAppName("My ThriftServer")
     .set("spark.cores.max", "2")
   val sc = new SparkContext(sparkConf)
   val sparkContext = sc
   import sparkContext._
   val sqlContext = new HiveContext(sparkContext)
   import sqlContext._
   import sqlContext.implicits._

   makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t")

   HiveThriftServer2.startWithContext(sqlContext)
 }
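
 Once that is running, an external client can connect over JDBC; a small hedged
 example (the HiveServer2 default port 10000 and the empty credentials are
 assumptions, adjust to your deployment):

 import java.sql.DriverManager

 Class.forName("org.apache.hive.jdbc.HiveDriver")
 val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "", "")
 val rs = conn.createStatement().executeQuery("SELECT * FROM t")
 while (rs.next()) println(rs.getInt(1) + " -> " + rs.getString(2))
 conn.close()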

 Again, I'm not really clear what your use case is, but it does sound like
 the first link above is what you may want.

 -Todd

 On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:

 Hi,
 Is it possible to start the Spark SQL thrift server from with a
 streaming app so the streamed data could be queried as it's goes in ?

 Thank you.
 Daniel






Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread venkatesh b
Hi, here I got two things to know.
FIRST:
In our project we use hive.
We daily get new data. We need to process this new data only once. And send
this processed data to RDBMS. Here in processing we majorly use many
complex queries with joins with where condition and grouping functions.
There are many intermediate tables generated around 50 while
processing. Till now we use text format as storage. We came across ORC file
format. I would like to know that since it is one Time querying the table
is it worth of storing as ORC format.

SECOND:
I came to know about HBase, which is faster.
Can I replace hive with HBase for processing of data daily faster.
Currently it is taking 15hrs daily with hive.


Please inform me if any other information is needed.

Thanks  regards
Venkatesh


Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread Jörn Franke
Additionally it is of key importance to use the right data types for the
columns. Use int for ids,  int or decimal or float or double etc for
numeric values etc. - A bad data model using varchars and string where not
appropriate is a significant bottle neck.
Furthermore include partition columns in join statements (not where)
otherwise you do a full table scan ignoring partitions

 On Thu, Aug 6, 2015 at 15:07, Jörn Franke jornfra...@gmail.com wrote:

 Yes you should use orc it is much faster and more compact. Additionally
 you can apply compression (snappy) to increase performance. Your data
 processing pipeline seems to be not.very optimized. You should use the
 newest hive version enabling storage indexes and bloom filters on
 appropriate columns. Ideally you should insert the data sorted
 appropriately. Partitioning and setting the execution engine to tez is also
 beneficial.

 Hbase with phoenix should currently only be used if you do few joins, not
 very complex queries and not many full table scans.

  On Thu, Aug 6, 2015 at 14:54, venkatesh b venkateshmailingl...@gmail.com wrote:

 Hi, here I got two things to know.
 FIRST:
 In our project we use hive.
 We daily get new data. We need to process this new data only once. And
 send this processed data to RDBMS. Here in processing we majorly use many
 complex queries with joins with where condition and grouping functions.
 There are many intermediate tables generated around 50 while
 processing. Till now we use text format as storage. We came across ORC file
 format. I would like to know that since it is one Time querying the table
 is it worth of storing as ORC format.

 SECOND:
 I came to know about HBase, which is faster.
 Can I replace hive with HBase for processing of data daily faster.
 Currently it is taking 15hrs daily with hive.


 Please inform me if any other information is needed.

 Thanks  regards
 Venkatesh




Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Todd Nist
They are covered here in the docs:

http://spark.apache.org/docs/1.4.1/api/scala/index.html#org.apache.spark.sql.functions$
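
As a small illustration (df and the column names below are placeholders), most
of those functions become available by importing the functions object:

import org.apache.spark.sql.functions._

df.groupBy(col("category"))
  .agg(avg("price"), max("price"), countDistinct("user_id"))
  .show()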


On Thu, Aug 6, 2015 at 5:52 AM, Netwaver wanglong_...@163.com wrote:

 Hi All,
  I am using Spark 1.4.1, and I want to know how can I find the
 complete function list supported in Spark SQL, currently I only know
 'sum','count','min','max'. Thanks a lot.





Re: Starting Spark SQL thrift server from within a streaming app

2015-08-06 Thread Daniel Haviv
Thank you Todd,
How is the sparkstreaming-sql project different from starting a thrift
server on a streaming app ?

Thanks again.
Daniel


On Thu, Aug 6, 2015 at 1:53 AM, Todd Nist tsind...@gmail.com wrote:

 Hi Danniel,

 It is possible to create an instance of the SparkSQL Thrift server,
 however seems like this project is what you may be looking for:

 https://github.com/Intel-bigdata/spark-streamingsql

 Not 100% sure of your use case is, but you can always convert the data
 into DF then issue a query against it.  If you want other systems to be
 able to query it then there are numerous connectors to  store data into
 Hive, Cassandra, HBase, ElasticSearch, 

 To create a instance of a thrift server with its own SQL Context you would
 do something like the following:

 import org.apache.spark.{SparkConf, SparkContext}

 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.HiveMetastoreTypes._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.hive.thriftserver._


 object MyThriftServer {

   val sparkConf = new SparkConf()
     // master is passed to spark-submit, but could also be specified explicitly
     // .setMaster(sparkMaster)
     .setAppName("My ThriftServer")
     .set("spark.cores.max", "2")
   val sc = new SparkContext(sparkConf)
   val sparkContext = sc
   import sparkContext._
   val sqlContext = new HiveContext(sparkContext)
   import sqlContext._
   import sqlContext.implicits._

   makeRDD((1, "hello") :: (2, "world") :: Nil).toDF.cache().registerTempTable("t")

   HiveThriftServer2.startWithContext(sqlContext)
 }

 Again, I'm not really clear what your use case is, but it does sound like
 the first link above is what you may want.

 -Todd

 On Wed, Aug 5, 2015 at 1:57 PM, Daniel Haviv 
 daniel.ha...@veracity-group.com wrote:

 Hi,
 Is it possible to start the Spark SQL thrift server from with a streaming
 app so the streamed data could be queried as it's goes in ?

 Thank you.
 Daniel





Re: How can I know currently supported functions in Spark SQL

2015-08-06 Thread Ted Yu
Have you looked at this?

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.functions$



 On Aug 6, 2015, at 2:52 AM, Netwaver wanglong_...@163.com wrote:
 
 Hi All,
  I am using Spark 1.4.1, and I want to know how can I find the 
 complete function list supported in Spark SQL, currently I only know 
 'sum','count','min','max'. Thanks a lot.
 
 


Re: Combining Spark Files with saveAsTextFile

2015-08-06 Thread MEETHU MATHEW
Hi,
Try using coalesce(1) before calling saveAsTextFile().

Thanks & Regards,
Meethu M
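
A minimal sketch of that suggestion (the path is a placeholder). Note that
coalesce(1) funnels all data through a single task, so it only makes sense when
the output is small enough for one writer:

rdd.coalesce(1).saveAsTextFile("/output/single-file")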


 On Wednesday, 5 August 2015 7:53 AM, Brandon White 
bwwintheho...@gmail.com wrote:
   

 What is the best way to make saveAsTextFile save as only a single file?

  

Re: Spark Kinesis Checkpointing/Processing Delay

2015-08-06 Thread Patanachai Tangchaisin

Hi,

I actually run into the same problem although our endpoint is not 
ElasticSearch. When the spark job is dead, we lose some data because 
Kinesis checkpoint is already beyond the last point that spark is processed.


Currently, our workaround is to use spark's checkpoint mechanism with 
write ahead log (WAL)


https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications
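
A hedged sketch of that setup (the checkpoint directory, batch interval and app
name are placeholders; the Kinesis DStream and the ElasticSearch sink would be
built inside createContext):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"   // assumed location

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("kinesis-app")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // enable the WAL
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... build the Kinesis DStream and the ElasticSearch output here ...
  ssc
}

// Recover from the checkpoint if one exists, otherwise build a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()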

Using checkpointing comes with some disadvantage like application code 
is not upgradable, etc.


I believe there is some work to fix this problem like Kafka direct API.
Not sure if this is it : https://issues.apache.org/jira/browse/SPARK-9215

Thanks,
Patanachai


On 08/06/2015 12:08 PM, phibit wrote:

Hi! I'm using Spark + Kinesis ASL to process and persist stream data to
ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces
400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream
consumer to use TRIM_HORIZON, to mean go as far back as possible and
start processing the stream data as quickly as possible, until you catch up
to the tip of the stream.

This means that for some period of time, Spark will suck in data from
Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per
second, but is NOT happy to process 5000 writes per second. Let's say it
only handles 2000 wps. This means that the processing time will exceed the
batch time, scheduling delay in the stream will rise consistently and
batches of data will get backlogged for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to
real-time, the data input rate slows to 400rps and the backlogged batches
eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits
checkpoints, even though the records may not have been processed yet (since
they are backlogged). If for some reason there is processing delay and the
Spark process crashes, the checkpoint will have advanced too far. If I
resume the Spark Streaming process, there is essentially a gap in my
ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust
this behavior? Or is there another way to handle this specific problem?
Ideally I would be able to configure the process to only submit Kinesis
checkpoints only after my data is successfully written to ES.

Thanks,
Phil





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kinesis-Checkpointing-Processing-Delay-tp24157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Patanachai


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkR -Graphx

2015-08-06 Thread Shivaram Venkataraman
+Xiangrui

I am not sure exposing the entire GraphX API would make sense as it
contains a lot of low level functions. However we could expose some
high level functions like PageRank etc. Xiangrui, who has been working
on similar techniques to expose MLLib functions like GLM might have
more to add.

Thanks
Shivaram

On Thu, Aug 6, 2015 at 6:21 AM, smagadi sudhindramag...@fico.com wrote:
 Wanted to use the GRaphX from SparkR , is there a way to do it ?.I think as
 of now it is not possible.I was thinking if one can write a wrapper in R
 that can call Scala Graphx libraries .
 Any thought on this please.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-tp24152.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Specifying the role when launching an AWS spark cluster using spark_ec2

2015-08-06 Thread Steve Loughran

There's no support for IAM roles in the s3n:// client code in Apache Hadoop (HADOOP-9384); Amazon's modified EMR distro may have it.

The s3a filesystem adds it; this is ready for production use in Hadoop 2.7.1+ (implicitly HDP 2.3; CDH 5.4 has cherry-picked the relevant patches). I don't know about the spark_ec2 scripts or what they start.

 On 6 Aug 2015, at 10:27, SK skrishna...@gmail.com wrote:
 
 Hi,
 
 I need to access data on S3 from another account and I have been given the
 IAM role information to access that S3 bucket. From what I understand, AWS
 allows us to attach a role to a resource at the time it is created. However,
 I don't see an option for specifying the role using the spark_ec2.py script. 
 So I created a spark cluster using the default role, but I was not able to
 change its IAM role after creation through AWS console.
 
 I see a ticket for this issue:
 https://github.com/apache/spark/pull/6962 and the status is closed. 
 
 If anyone knows how I can specify the role using spark_ec2.py, please let me
 know. I am using spark 1.4.1.
 
 thanks
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Specifying-the-role-when-launching-an-AWS-spark-cluster-using-spark-ec2-tp24154.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
With DEBUG, the log output was over 10MB, so I opted for just INFO output.
The (sanitized) log is attached.

The driver is essentially this code:

info("A")

val t = System.currentTimeMillis
val df = sqlContext.read.parquet(dir).select(...).cache

val elapsed = System.currentTimeMillis - t
info(s"Init time: ${elapsed} ms")
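
For what it's worth, a hedged variant of that read which disables Parquet schema
merging explicitly via the mergeSchema data source option (whether it changes
anything for this particular layout is untested):

val dfNoMerge = sqlContext.read
  .option("mergeSchema", "false")
  .parquet(dir)
// the same select(...).cache as above would follow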

We've also observed that it is very slow to read the contents of the
parquet files. My colleague wrote a PySpark application that gets the list
of files, parallelizes it, maps across it and reads each file manually
using a C parquet library, and aggregates manually in the loop. Ignoring
the 1-2 minute initialization cost, compared to a Spark SQL or DataFrame
query in Scala, his is an order of magnitude faster. Since he is
parallelizing the work through Spark, and that isn't causing any
performance issues, it seems to be a problem with the parquet reader. I may
try to do what he did to construct a DataFrame manually, and see if I can
query it with Spark SQL with reasonable performance.

- Philip


On Thu, Aug 6, 2015 at 8:37 AM, Cheng Lian lian.cs@gmail.com wrote:

 Would you mind to provide the driver log?


 On 8/6/15 3:58 PM, Philip Weaver wrote:

 I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
 again.

 The initialization time is about 1 minute now, which is still pretty
 terrible.

 On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com
 wrote:

 Absolutely, thanks!

 On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian  lian.cs@gmail.com
 lian.cs@gmail.com wrote:

 We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

 Could you give it a shot to see whether it helps in your case? We've
 observed ~50x performance boost with schema merging turned on.

 Cheng


 On 8/6/15 8:26 AM, Philip Weaver wrote:

 I have a parquet directory that was produced by partitioning by two
 keys, e.g. like this:

 df.write.partitionBy(a, b).parquet(asdf)


 There are 35 values of a, and about 1100-1200 values of b for each
 value of a, for a total of over 40,000 partitions.

 Before running any transformations or actions on the DataFrame, just
 initializing it like this takes *2 minutes*:

 val df = sqlContext.read.parquet(asdf)


 Is this normal? Is this because it is doing some bookeeping to discover
 all the partitions? Is it perhaps having to merge the schema from each
 partition? Would you expect it to get better or worse if I subpartition by
 another key?

 - Philip







10:51:42  INFO spark.SparkContext: Running Spark version 1.5.0-SNAPSHOT
10:51:42  WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:51:42  INFO spark.SecurityManager: Changing view acls to: pweaver
10:51:42  INFO spark.SecurityManager: Changing modify acls to: pweaver
10:51:42  INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(pweaver); users with modify permissions: Set(pweaver)
10:51:43  INFO slf4j.Slf4jLogger: Slf4jLogger started
10:51:43  INFO Remoting: Starting remoting
10:51:43  INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.26.21.70:51400]
10:51:43  INFO util.Utils: Successfully started service 'sparkDriver' on port 51400.
10:51:43  INFO spark.SparkEnv: Registering MapOutputTracker
10:51:43  INFO spark.SparkEnv: Registering BlockManagerMaster
10:51:43  INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-04438917-93ee-45f3-bc10-c5f5eb3d6a4a
10:51:43  INFO storage.MemoryStore: MemoryStore started with capacity 530.0 MB
10:51:43  INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-faec22af-bb2d-4fae-8a02-b8ca67867858/httpd-50939810-7da7-42d9-9342-48d9dc2705dc
10:51:43  INFO spark.HttpServer: Starting HTTP Server
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55227
10:51:43  INFO util.Utils: Successfully started service 'HTTP file server' on port 55227.
10:51:43  INFO spark.SparkEnv: Registering OutputCommitCoordinator
10:51:43  INFO server.Server: jetty-8.y.z-SNAPSHOT
10:51:43  INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
10:51:43  INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
10:51:43  INFO ui.SparkUI: Started SparkUI at http://172.26.21.70:4040
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark-assembly-1.0-deps.jar at http://172.26.21.70:55227/jars/linear_spark-assembly-1.0-deps.jar with timestamp 1438883503937
10:51:43  INFO spark.SparkContext: Added JAR file:/home/pweaver/work/linear_spark_2.11-1.0.jar at http://172.26.21.70:55227/jars/linear_spark_2.11-1.0.jar with timestamp 1438883503940
10:51:44  WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
10:51:44  INFO mesos.CoarseMesosSchedulerBackend: 

Spark Kinesis Checkpointing/Processing Delay

2015-08-06 Thread phibit
Hi! I'm using Spark + Kinesis ASL to process and persist stream data to
ElasticSearch. For the most part it works nicely.

There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces
400 records per second, continuously.

Kinesis provides a 24hr buffer of data, and I'm setting my Kinesis DStream
consumer to use TRIM_HORIZON, to mean go as far back as possible and
start processing the stream data as quickly as possible, until you catch up
to the tip of the stream.

This means that for some period of time, Spark will suck in data from
Kinesis as quickly as it can, let's say at 5000 records per second.

In my specific case, ElasticSearch can gracefully handle 400 writes per
second, but is NOT happy to process 5000 writes per second. Let's say it
only handles 2000 wps. This means that the processing time will exceed the
batch time, scheduling delay in the stream will rise consistently and
batches of data will get backlogged for some period of time.

In normal circumstances, this is fine. When the Spark consumers catch up to
real-time, the data input rate slows to 400rps and the backlogged batches
eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits
checkpoints, even though the records may not have been processed yet (since
they are backlogged). If for some reason there is processing delay and the
Spark process crashes, the checkpoint will have advanced too far. If I
resume the Spark Streaming process, there is essentially a gap in my
ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust
this behavior? Or is there another way to handle this specific problem?
Ideally I would be able to configure the process to only submit Kinesis
checkpoints only after my data is successfully written to ES.

Thanks,
Phil





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kinesis-Checkpointing-Processing-Delay-tp24157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkR -Graphx

2015-08-06 Thread smagadi
Wanted to use the GRaphX from SparkR , is there a way to do it ?.I think as
of now it is not possible.I was thinking if one can write a wrapper in R
that can call Scala Graphx libraries .
Any thought on this please.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Graphx-tp24152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is it worth storing in ORC for one time read. And can be replace hive with HBase

2015-08-06 Thread venkatesh b
I'm really sorry, I posted to the Spark mailing list by mistake.

Jörn Franke, thanks for your reply.
I have many joins, many complex queries, and all are table scans. So I think
HBase will not work for me.

On Thursday, August 6, 2015, Jörn Franke jornfra...@gmail.com wrote:

 Additionally it is of key importance to use the right data types for the
 columns. Use int for ids,  int or decimal or float or double etc for
 numeric values etc. - A bad data model using varchars and string where not
 appropriate is a significant bottle neck.
 Furthermore include partition columns in join statements (not where)
 otherwise you do a full table scan ignoring partitions

 On Thu, Aug 6, 2015 at 15:07, Jörn Franke jornfra...@gmail.com wrote:

 Yes you should use orc it is much faster and more compact. Additionally
 you can apply compression (snappy) to increase performance. Your data
 processing pipeline seems to be not.very optimized. You should use the
 newest hive version enabling storage indexes and bloom filters on
 appropriate columns. Ideally you should insert the data sorted
 appropriately. Partitioning and setting the execution engine to tez is also
 beneficial.

 Hbase with phoenix should currently only be used if you do few joins, not
 very complex queries and not many full table scans.

  On Thu, Aug 6, 2015 at 14:54, venkatesh b venkateshmailingl...@gmail.com wrote:

 Hi, here I got two things to know.
 FIRST:
 In our project we use hive.
 We daily get new data. We need to process this new data only once. And
 send this processed data to RDBMS. Here in processing we majorly use many
 complex queries with joins with where condition and grouping functions.
 There are many intermediate tables generated around 50 while
 processing. Till now we use text format as storage. We came across ORC file
 format. I would like to know that since it is one Time querying the table
 is it worth of storing as ORC format.

 SECOND:
 I came to know about HBase, which is faster.
 Can I replace hive with HBase for processing of data daily faster.
 Currently it is taking 15hrs daily with hive.


 Please inform me if any other information is needed.

 Thanks  regards
 Venkatesh




Re: subscribe

2015-08-06 Thread Akhil Das
Welcome aboard!

Thanks
Best Regards

On Thu, Aug 6, 2015 at 11:21 AM, Franc Carter franc.car...@rozettatech.com
wrote:

 subscribe



Re: subscribe

2015-08-06 Thread Ted Yu
See http://spark.apache.org/community.html

Cheers



 On Aug 5, 2015, at 10:51 PM, Franc Carter franc.car...@rozettatech.com 
 wrote:
 
 subscribe


Re: Enum values in custom objects mess up RDD operations

2015-08-06 Thread Sebastian Kalix
Thanks a lot Igor,

the following hashCode function is stable:

@Override
public int hashCode() {
int hash = 5;
hash = 41 * hash + this.myEnum.ordinal();
return hash;
}

For anyone having the same problem:
http://tech.technoflirt.com/2014/08/21/issues-with-java-enum-hashcode/


Cheers,

Sebastian

Igor Berman igor.ber...@gmail.com schrieb am Do., 6. Aug. 2015 um
10:59 Uhr:

 enums hashcode is jvm instance specific(ie. different jvms will give you
 different values), so  you can use ordinal in hashCode computation or use
 hashCode on enums ordinal as part of hashCode computation

 On 6 August 2015 at 11:41, Warfish sebastian.ka...@gmail.com wrote:

 Hi everyone,

 I was working with Spark for a little while now and have encountered a
 very
 strange behaviour that caused me a lot of headaches:

 I have written my own POJOs to encapsulate my data and this data is held
 in
 some JavaRDDs. Part of these POJOs is a member variable of a custom enum
 type. Whenever I do some operations on these RDDs such as subtract,
 groupByKey, reduce or similar things, the results are inconsistent and
 non-sensical. However, this happens only when the application runs in
 standalone cluster mode (10 nodes). When running locally on my developer
 machine, the code executes just fine. If you want to reproduce this
 behaviour,  here
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip
 
 is the complete Maven project that you can run out of the box. I am
 running
 Spark 1.4.0 and submitting the application using
 /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
 de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



 Consider the following code for my custom object:


 package de.spark.test;

 import java.io.Serializable;
 import java.util.Objects;

 public class MyObject implements Serializable {

 private MyEnum myEnum;

 public MyObject(MyEnum myEnum) {
 this.myEnum = myEnum;
 }

 public MyEnum getMyEnum() {
 return myEnum;
 }

 public void setMyEnum(MyEnum myEnum) {
 this.myEnum = myEnum;
 }

 @Override
 public int hashCode() {
 int hash = 5;
 hash = 41 * hash + Objects.hashCode(this.myEnum);
 return hash;
 }

 @Override
 public boolean equals(Object obj) {
 if (obj == null) {
 return false;
 }
 if (getClass() != obj.getClass()) {
 return false;
 }
 final MyObject other = (MyObject) obj;
 if (this.myEnum != other.myEnum) {
 return false;
 }
 return true;
 }

 @Override
 public String toString() {
 return MyObject{ + myEnum= + myEnum + '}';
 }

 }


 As you can see, I have overriden equals() and hashCode() (both are
 auto-generated). The enum is given as follows:


 package de.spark.test;

 import java.io.Serializable;

 public enum MyEnum implements Serializable {
   VALUE1, VALUE2
 }


 The main() method is defined by:


 package de.spark.test;

 import java.util.ArrayList;
 import java.util.List;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;

 public class Main {

   public static void main(String[] args) {
 SparkConf conf = new SparkConf().setAppName(Spark Test)
 .setMaster(myMaster);

 JavaSparkContext jsc = new JavaSparkContext(conf);


 System.out.println(///
 Object generation);

 ListMyObject l1 = new ArrayList();

 for(int i = 0; i  1000; i++) {
 l1.add(new MyObject(MyEnum.VALUE1));
 }

 JavaRDDMyObject myObjectRDD1 = jsc.parallelize(l1);
 JavaRDDMyObject myObjectRDD2 = jsc.parallelize(l1);

 System.out.println(myObjectRDD1 count  =  +
 myObjectRDD1.count());
 System.out.println(myObjectRDD2 count  =  +
 myObjectRDD2.count());


 System.out.println(///
 Distinct);

 JavaRDDMyObject myObjectRDD1Distinct = myObjectRDD1.distinct();
 JavaRDDMyObject myObjectRDD2Distinct = myObjectRDD2.distinct();

 System.out.println(myObjectRDD1Distinct count  =  +
 myObjectRDD1Distinct.count());
 System.out.println(myObjectRDD2Distinct count  =  +
 myObjectRDD2Distinct.count());


 System.out.println(///
 Subtract);

 JavaRDDMyObject myObjectRDD1Minus1 =
 myObjectRDD1.subtract(myObjectRDD1);
 JavaRDDMyObject myObjectRDD1Minus2 =
 myObjectRDD1.subtract(myObjectRDD2);
 JavaRDDMyObject myObjectRDD2Minus1 =
 myObjectRDD2.subtract(myObjectRDD1);

 System.out.println(myObjectRDD1Minus1 count=  +
 myObjectRDD1Minus1.count());
 System.out.println(myObjectRDD1Minus2 count=  +
 myObjectRDD1Minus2.count());
 System.out.println(myObjectRDD2Minus1 count=  +
 myObjectRDD2Minus1.count());


 

Re: Pause Spark Streaming reading or sampling streaming data

2015-08-06 Thread Dimitris Kouzis - Loukas
Hi, - yes - it's great that you wrote it yourself - it means you have more
control. I have the feeling that the most efficient point to discard as
much data as possible - or even modify your subscription protocol to - your
spark input source - not even receive the other 50 seconds of data is the
most efficient point. After you deliver data to DStream - you might filter
them as much as you want - but you will still be subject to garbage
collection and/or potential shuffles/and HDD checkpoints.

On Thu, Aug 6, 2015 at 1:31 AM, Heath Guo heath...@fb.com wrote:

 Hi Dimitris,

 Thanks for your reply. Just wondering – are you asking about my streaming
 input source? I implemented a custom receiver and have been using that.
 Thanks.

 From: Dimitris Kouzis - Loukas look...@gmail.com
 Date: Wednesday, August 5, 2015 at 5:27 PM
 To: Heath Guo heath...@fb.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: Pause Spark Streaming reading or sampling streaming data

 What driver do you use? Sounds like something you should do before the
 driver...

 On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo heath...@fb.com wrote:

 Hi, I have a question about sampling Spark Streaming data, or getting
 part of the data. For every minute, I only want the data read in during the
 first 10 seconds, and discard all data in the next 50 seconds. Is there any
 way to pause reading and discard data in that period? I'm doing this to
 sample from a stream of huge amount of data, which saves processing time in
 the real-time program. Thanks!





Re: Pause Spark Streaming reading or sampling streaming data

2015-08-06 Thread Dimitris Kouzis - Loukas
Re-reading your description - I guess you could potentially make your input
source to connect for 10 seconds, pause for 50 and then reconnect.
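
A rough sketch of that idea as a custom receiver (connection handling is
deliberately left abstract; the 10s/50s windows and the storage level are taken
from the question, everything else is an assumption):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SampledReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("sampled-receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          val connection = connect()                 // assumed helper, see below
          val until = System.currentTimeMillis() + 10000L
          while (System.currentTimeMillis() < until && !isStopped()) {
            val line = connection.readLine()         // assumed blocking read
            if (line != null) store(line)
          }
          connection.close()
          Thread.sleep(50000L)                       // discard the other 50 seconds
        }
      }
    }.start()
  }

  def onStop(): Unit = { /* the connection is closed inside the loop above */ }

  private def connect(): java.io.BufferedReader = ???  // placeholder for the real source
}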

On Thu, Aug 6, 2015 at 10:32 AM, Dimitris Kouzis - Loukas look...@gmail.com
 wrote:

 Hi, - yes - it's great that you wrote it yourself - it means you have more
 control. I have the feeling that the most efficient point to discard as
 much data as possible - or even modify your subscription protocol to - your
 spark input source - not even receive the other 50 seconds of data is the
 most efficient point. After you deliver data to DStream - you might filter
 them as much as you want - but you will still be subject to garbage
 collection and/or potential shuffles/and HDD checkpoints.

 On Thu, Aug 6, 2015 at 1:31 AM, Heath Guo heath...@fb.com wrote:

 Hi Dimitris,

 Thanks for your reply. Just wondering – are you asking about my streaming
 input source? I implemented a custom receiver and have been using that.
 Thanks.

 From: Dimitris Kouzis - Loukas look...@gmail.com
 Date: Wednesday, August 5, 2015 at 5:27 PM
 To: Heath Guo heath...@fb.com
 Cc: user@spark.apache.org user@spark.apache.org
 Subject: Re: Pause Spark Streaming reading or sampling streaming data

 What driver do you use? Sounds like something you should do before the
 driver...

 On Thu, Aug 6, 2015 at 12:50 AM, Heath Guo heath...@fb.com wrote:

 Hi, I have a question about sampling Spark Streaming data, or getting
 part of the data. For every minute, I only want the data read in during the
 first 10 seconds, and discard all data in the next 50 seconds. Is there any
 way to pause reading and discard data in that period? I'm doing this to
 sample from a stream of huge amount of data, which saves processing time in
 the real-time program. Thanks!






Aggregate by timestamp from json message

2015-08-06 Thread vchandra
Hi team,
I am very new to Spark; actually, today is my first day.

I have a nested JSON string which contains a timestamp and a lot of
other details. I need to write multiple aggregations over these JSON
messages, but for now I only need to write one. If a code structure for
this already exists then kindly post it, or give me some pointers to get
started. Quick inputs will help me a lot.
 
Sample Requirement example:


*Requirement: How many Stock dispatched in last 1 hour*

[
   {
      "name": "Stock dispatched",
      "timestamp": "2015-04-14T10:03:10.000Z",
      "component": "Work Order",
      "sessionID": "4324--52-3-52-46-3-46-3-75",
      "properties": {
         "Priority": 3,
         "Appliance Manufacturer": "XXX game",
         "Appliance Model": "HJ 10",
         "Appliance Model Year": 2012
      }
   },
   {
      "name": "Stock dispatched",
      "timestamp": "2015-04-14T10:04:10.000Z",
      "component": "Work Order",
      "sessionID": "4324--52-3-52-46-3-46-3-75",
      "properties": {
         "Priority": 3,
         "Appliance Manufacturer": "XXX game",
         "Appliance Model": "DJ 15",
         "Appliance Model Year": 2012
      }
   }
]
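
A hedged sketch of that one aggregation (the input path is a placeholder; since
the ISO-8601 timestamps share the same format and zone, a lexicographic string
comparison is enough for the one-hour cutoff):

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import org.apache.spark.sql.functions._

val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
val cutoff = fmt.format(new Date(System.currentTimeMillis() - 3600L * 1000L))

val events = sqlContext.read.json("/data/events.json")   // assumed location

val count = events
  .filter(col("name") === "Stock dispatched")
  .filter(col("timestamp") >= cutoff)
  .count()

println(s"Stock dispatched in last hour: $count")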



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Aggregate-by-timestamp-from-json-message-tp24147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No Twitter Input from Kafka to Spark Streaming

2015-08-06 Thread Akhil Das
You just pasted your twitter credentials, consider changing it. :/

Thanks
Best Regards

On Wed, Aug 5, 2015 at 10:07 PM, narendra narencs...@gmail.com wrote:

 Thanks Akash for the answer. I added endpoint to the listener and now it is
 working.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/No-Twitter-Input-from-Kafka-to-Spark-Streaming-tp24131p24142.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried
again.

The initialization time is about 1 minute now, which is still pretty
terrible.

On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 Absolutely, thanks!

 On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

 We've fixed this issue in 1.5 https://github.com/apache/spark/pull/7396

 Could you give it a shot to see whether it helps in your case? We've
 observed ~50x performance boost with schema merging turned on.

 Cheng


 On 8/6/15 8:26 AM, Philip Weaver wrote:

 I have a parquet directory that was produced by partitioning by two keys,
 e.g. like this:

 df.write.partitionBy(a, b).parquet(asdf)


 There are 35 values of a, and about 1100-1200 values of b for each
 value of a, for a total of over 40,000 partitions.

 Before running any transformations or actions on the DataFrame, just
 initializing it like this takes *2 minutes*:

 val df = sqlContext.read.parquet(asdf)


 Is this normal? Is this because it is doing some bookeeping to discover
 all the partitions? Is it perhaps having to merge the schema from each
 partition? Would you expect it to get better or worse if I subpartition by
 another key?

 - Philip







Multiple Thrift servers on one Spark cluster

2015-08-06 Thread Bojan Kostic
Hi,

Is there a way to instantiate multiple Thrift servers on one Spark Cluster?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Thrift-servers-on-one-Spark-cluster-tp24148.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Enum values in custom objects mess up RDD operations

2015-08-06 Thread Warfish
Hi everyone,

I was working with Spark for a little while now and have encountered a very
strange behaviour that caused me a lot of headaches:

I have written my own POJOs to encapsulate my data and this data is held in
some JavaRDDs. Part of these POJOs is a member variable of a custom enum
type. Whenever I do some operations on these RDDs such as subtract,
groupByKey, reduce or similar things, the results are inconsistent and
non-sensical. However, this happens only when the application runs in
standalone cluster mode (10 nodes). When running locally on my developer
machine, the code executes just fine. If you want to reproduce this
behaviour,  here
http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip 
 
is the complete Maven project that you can run out of the box. I am running
Spark 1.4.0 and submitting the application using 
/usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



Consider the following code for my custom object:


package de.spark.test;

import java.io.Serializable;
import java.util.Objects;

public class MyObject implements Serializable {

private MyEnum myEnum;

public MyObject(MyEnum myEnum) {
this.myEnum = myEnum;
}

public MyEnum getMyEnum() {
return myEnum;
}

public void setMyEnum(MyEnum myEnum) {
this.myEnum = myEnum;
}

@Override
public int hashCode() {
int hash = 5;
hash = 41 * hash + Objects.hashCode(this.myEnum);
return hash;
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final MyObject other = (MyObject) obj;
if (this.myEnum != other.myEnum) {
return false;
}
return true;
}

@Override
public String toString() {
return "MyObject{" + "myEnum=" + myEnum + '}';
}

}


As you can see, I have overriden equals() and hashCode() (both are
auto-generated). The enum is given as follows:


package de.spark.test;

import java.io.Serializable;

public enum MyEnum implements Serializable {
  VALUE1, VALUE2
}


The main() method is defined by:


package de.spark.test;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Test")
                                    .setMaster("myMaster");

    JavaSparkContext jsc = new JavaSparkContext(conf);

    System.out.println("/// Object generation");

    List<MyObject> l1 = new ArrayList<>();

    for(int i = 0; i < 1000; i++) {
        l1.add(new MyObject(MyEnum.VALUE1));
    }

    JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
    JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);

    System.out.println("myObjectRDD1 count  = " + myObjectRDD1.count());
    System.out.println("myObjectRDD2 count  = " + myObjectRDD2.count());

    System.out.println("/// Distinct");

    JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
    JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();

    System.out.println("myObjectRDD1Distinct count  = " + myObjectRDD1Distinct.count());
    System.out.println("myObjectRDD2Distinct count  = " + myObjectRDD2Distinct.count());

    System.out.println("/// Subtract");

    JavaRDD<MyObject> myObjectRDD1Minus1 = myObjectRDD1.subtract(myObjectRDD1);
    JavaRDD<MyObject> myObjectRDD1Minus2 = myObjectRDD1.subtract(myObjectRDD2);
    JavaRDD<MyObject> myObjectRDD2Minus1 = myObjectRDD2.subtract(myObjectRDD1);

    System.out.println("myObjectRDD1Minus1 count= " + myObjectRDD1Minus1.count());
    System.out.println("myObjectRDD1Minus2 count= " + myObjectRDD1Minus2.count());
    System.out.println("myObjectRDD2Minus1 count= " + myObjectRDD2Minus1.count());

    System.out.println("/// End");
  }
  
}


Both RDDs contain 1000 exactly equal objects, so one would expect each call of
distinct() to result in 1 and each subtract(JavaRDD<MyObject>) to result in an empty
RDD. However, here is some sample output:


/// Object generation
myObjectRDD1 count  = 1000
myObjectRDD2 count  = 1000
/// Distinct
myObjectRDD1Distinct count  = 1
myObjectRDD2Distinct count  = 2
/// Subtract
myObjectRDD1Minus1 count= 500
myObjectRDD1Minus2 count= 0
myObjectRDD2Minus1 count= 0
/// End


Re: Enum values in custom objects mess up RDD operations

2015-08-06 Thread Igor Berman
enums hashcode is jvm instance specific(ie. different jvms will give you
different values), so  you can use ordinal in hashCode computation or use
hashCode on enums ordinal as part of hashCode computation

On 6 August 2015 at 11:41, Warfish sebastian.ka...@gmail.com wrote:

 Hi everyone,

 I was working with Spark for a little while now and have encountered a very
 strange behaviour that caused me a lot of headaches:

 I have written my own POJOs to encapsulate my data and this data is held in
 some JavaRDDs. Part of these POJOs is a member variable of a custom enum
 type. Whenever I do some operations on these RDDs such as subtract,
 groupByKey, reduce or similar things, the results are inconsistent and
 non-sensical. However, this happens only when the application runs in
 standalone cluster mode (10 nodes). When running locally on my developer
 machine, the code executes just fine. If you want to reproduce this
 behaviour,  here
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip
 
 is the complete Maven project that you can run out of the box. I am running
 Spark 1.4.0 and submitting the application using
 /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
 de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



 Consider the following code for my custom object:


 package de.spark.test;

 import java.io.Serializable;
 import java.util.Objects;

 public class MyObject implements Serializable {

 private MyEnum myEnum;

 public MyObject(MyEnum myEnum) {
 this.myEnum = myEnum;
 }

 public MyEnum getMyEnum() {
 return myEnum;
 }

 public void setMyEnum(MyEnum myEnum) {
 this.myEnum = myEnum;
 }

 @Override
 public int hashCode() {
 int hash = 5;
 hash = 41 * hash + Objects.hashCode(this.myEnum);
 return hash;
 }

 @Override
 public boolean equals(Object obj) {
 if (obj == null) {
 return false;
 }
 if (getClass() != obj.getClass()) {
 return false;
 }
 final MyObject other = (MyObject) obj;
 if (this.myEnum != other.myEnum) {
 return false;
 }
 return true;
 }

 @Override
 public String toString() {
 return MyObject{ + myEnum= + myEnum + '}';
 }

 }


 As you can see, I have overriden equals() and hashCode() (both are
 auto-generated). The enum is given as follows:


 package de.spark.test;

 import java.io.Serializable;

 public enum MyEnum implements Serializable {
   VALUE1, VALUE2
 }


 The main() method is defined by:


 package de.spark.test;

 import java.util.ArrayList;
 import java.util.List;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;

 public class Main {

   public static void main(String[] args) {
 SparkConf conf = new SparkConf().setAppName(Spark Test)
 .setMaster(myMaster);

 JavaSparkContext jsc = new JavaSparkContext(conf);

 System.out.println(///
 Object generation);

 ListMyObject l1 = new ArrayList();

 for(int i = 0; i  1000; i++) {
 l1.add(new MyObject(MyEnum.VALUE1));
 }

 JavaRDDMyObject myObjectRDD1 = jsc.parallelize(l1);
 JavaRDDMyObject myObjectRDD2 = jsc.parallelize(l1);

 System.out.println(myObjectRDD1 count  =  +
 myObjectRDD1.count());
 System.out.println(myObjectRDD2 count  =  +
 myObjectRDD2.count());

 System.out.println(///
 Distinct);

 JavaRDDMyObject myObjectRDD1Distinct = myObjectRDD1.distinct();
 JavaRDDMyObject myObjectRDD2Distinct = myObjectRDD2.distinct();

 System.out.println(myObjectRDD1Distinct count  =  +
 myObjectRDD1Distinct.count());
 System.out.println(myObjectRDD2Distinct count  =  +
 myObjectRDD2Distinct.count());

 System.out.println(///
 Subtract);

 JavaRDDMyObject myObjectRDD1Minus1 =
 myObjectRDD1.subtract(myObjectRDD1);
 JavaRDDMyObject myObjectRDD1Minus2 =
 myObjectRDD1.subtract(myObjectRDD2);
 JavaRDDMyObject myObjectRDD2Minus1 =
 myObjectRDD2.subtract(myObjectRDD1);

 System.out.println(myObjectRDD1Minus1 count=  +
 myObjectRDD1Minus1.count());
 System.out.println(myObjectRDD1Minus2 count=  +
 myObjectRDD1Minus2.count());
 System.out.println(myObjectRDD2Minus1 count=  +
 myObjectRDD2Minus1.count());

 System.out.println(///
 End);
   }

 }


 Both RDDs contain 1000 exactly equal objects, one would expect each call of
 distinct() to result in 1 and subtract(JavaRDDMyObject) to result in
 empty
 RDDs. However here is some sample output:


 /// Object generation
 myObjectRDD1 count  = 1000
 myObjectRDD2 

Re: spark hangs at broadcasting during a filter

2015-08-06 Thread Alex Gittens
Thanks. Repartitioning to a smaller number of partitions seems to fix my
issue, but I'll keep broadcasting in mind (droprows is an integer array
with about 4 million entries).
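
A minimal sketch of that broadcast approach for this pipeline, assuming droprows is
the ~4-million-entry integer array mentioned above; converting it to a Set before
broadcasting makes each membership test a hash lookup instead of a linear scan per
element:

// Sketch only: sc, valsinpath and droprows (an Array[Int] of row ids to skip) are
// assumed from the original post. Broadcasting droprows.toSet ships it to each
// executor once and keeps contains() O(1).
val dropRowsBc = sc.broadcast(droprows.toSet)

val valsrows = sc.textFile(valsinpath)
  .map(_.split(","))
  .map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble)))
  .filter(x => !dropRowsBc.value.contains(x._1))
  .groupByKey()
  .map(x => (x._1, x._2.toSeq.sortBy(_._1)))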

On Wed, Aug 5, 2015 at 12:34 PM, Philip Weaver philip.wea...@gmail.com
wrote:

 How big is droprows?

 Try explicitly broadcasting it like this:

 val broadcastDropRows = sc.broadcast(dropRows)

 val valsrows = ...
 .filter(x => !broadcastDropRows.value.contains(x._1))

 - Philip


 On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote:

 I'm trying to load a 1 TB file whose lines i,j,v represent the values of a
 matrix given as A_{ij} = v so I can convert it to a Parquet file. Only some
 of the rows of A are relevant, so the following code first loads the
 triplets as text, splits them into Tuple3[Int, Int, Double], drops triplets
 whose rows should be skipped, then forms a Tuple2[Int, List[Tuple2[Int,
 Double]]] for each row (if I'm judging datatypes correctly).

 val valsrows = sc.textFile(valsinpath).map(_.split(",")).
   map(x => (x(1).toInt, (x(0).toInt, x(2).toDouble))).
   filter(x => !droprows.contains(x._1)).
   groupByKey.
   map(x => (x._1, x._2.toSeq.sortBy(_._1)))

 Spark hangs during a broadcast that occurs during the filter step
 (according
 to the Spark UI). The last two lines in the log before it pauses are:

 5/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
 in
 memory on 172.31.49.149:37643 (size: 4.6 KB, free: 113.8 GB)
 15/08/05 18:50:10 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
 in
 memory on 172.31.49.159:41846 (size: 4.6 KB, free: 113.8 GB)

 I've left Spark running for up to 17 minutes on one occasion, and it never
 continues past this point. I'm using a cluster of 30 r3.8xlarge EC2
 instances (244 GB, 32 cores) with Spark in standalone mode, 220 GB of
 executor and driver memory, and the Kryo serializer.

 Any ideas on what could be causing this hang?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-hangs-at-broadcasting-during-a-filter-tp24143.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Temp file missing when training logistic regression

2015-08-06 Thread Cat
Hello, 

I am using the Python API to perform a grid search and train models using
LogisticRegressionWithSGD. 
I am using r3.xl machines in EC2, running on top of YARN in cluster mode. 

The training RDD is persisted in memory and on disk. Some of the models
train successfully, but at some point during the grid search I get an
error. It looks like the Python broadcast is looking for a temporary file
backing the broadcast which is no longer there. I scanned the logs for
further errors but could not find anything.

Any ideas of what could be causing this, and what should I be looking for? 

Many thanks. 
Cat

  model = LogisticRegressionWithSGD.train(the_training, iterations=i, regParam=c, miniBatchFraction=0.8)
  File "/home/hadoop/spark/python/pyspark/mllib/classification.py", line 164, in train
    return _regression_train_wrapper(train, LogisticRegressionModel, data, initialWeights)
  File "/home/hadoop/spark/python/pyspark/mllib/regression.py", line 140, in _regression_train_wrapper
    weights, intercept = train_func(data, _convert_to_vector(initial_weights))
  File "/home/hadoop/spark/python/pyspark/mllib/classification.py", line 162, in train
    bool(intercept))
  File "/home/hadoop/spark/python/pyspark/mllib/common.py", line 120, in callMLlibFunc
    return callJavaFunc(sc, api, *args)
  File "/home/hadoop/spark/python/pyspark/mllib/common.py", line 113, in callJavaFunc
    return _java2py(sc, func(*args))
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    self.target_id, self.name)
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
    format(target_id, '.', name), value)
Py4JJavaError: An error occurred while calling
o271.trainLogisticRegressionModelWithSGD.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: java.io.FileNotFoundException:
/mnt/spark/spark-b07b34f8-66c3-43ae-a3ed-0c291724409b/pyspark-4196e8e5-8024-4ec5-a7bb-a60b216e6e74/tmpbCjiSR
(No such file or directory)
java.io.FileInputStream.open(Native Method)
java.io.FileInputStream.<init>(FileInputStream.java:146)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply$mcJ$sp(PythonRDD.scala:848)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:847)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1176)
org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:79)
org.apache.spark.storage.DiskStore.putArray(DiskStore.scala:64)
org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1028)
org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:419)
org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:408)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:408)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:263)
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991)
org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)

How to binarize data in spark

2015-08-06 Thread Adamantios Corais
I have a set of data based on which I want to create a classification
model. Each row has the following form:

user1,class1,product1
 user1,class1,product2
 user1,class1,product5
 user2,class1,product2
 user2,class1,product5
 user3,class2,product1
 etc


There are about 1M users, 2 classes, and 1M products. What I would like to
do next is create the sparse vectors (something already supported by MLlib),
but in order to apply that function I would first have to create the dense
vectors (with the 0s). In other words, I have to binarize my data. What's
the easiest (or most elegant) way of doing that?


*// Adamantios*
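
A minimal sketch, assuming the input is an RDD of "user,class,product" lines as
above (the file name events.csv and the class-to-label mapping are made up for
illustration): each distinct product is mapped to a column index, and a sparse
vector with 1.0 at the indices of the user's products is built directly with
Vectors.sparse, so the mostly-zero dense vectors never need to be materialised.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// (user, class, product) triples
val rows = sc.textFile("events.csv").map(_.split(","))

// product -> column index, broadcast to the executors
val productIndex = rows.map(_(2)).distinct().zipWithIndex().mapValues(_.toInt).collectAsMap()
val numProducts = productIndex.size
val indexBc = sc.broadcast(productIndex)

// one sparse LabeledPoint per (user, class)
val points = rows
  .map(r => ((r(0), r(1)), indexBc.value(r(2))))
  .groupByKey()
  .map { case ((user, clazz), cols) =>
    val indices = cols.toArray.distinct.sorted
    val label = if (clazz == "class1") 1.0 else 0.0   // assumed label encoding
    LabeledPoint(label, Vectors.sparse(numProducts, indices, Array.fill(indices.length)(1.0)))
  }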


Terminate streaming app on cluster restart

2015-08-06 Thread Alexander Krasheninnikov

Hello, everyone!
I have a case when running a standalone cluster: when stop-all.sh/start-all.sh 
are invoked on the master, the streaming app loses all its executors but does 
not terminate.
Since it is a streaming app that is expected to deliver its results ASAP, 
downtime is undesirable.

Is there any workaround to solve that problem?

Thanks a lot.
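
One possible workaround, sketched below rather than a built-in feature: a small
watchdog thread on the driver that stops the StreamingContext once the application
has been without executors for too long, so an external supervisor (spark-submit
--supervise, a cron job, etc.) can resubmit it. getExecutorMemoryStatus includes
the driver itself, hence the "- 1".

import org.apache.spark.streaming.StreamingContext

// Sketch only: stop the app after maxStrikes consecutive checks with no executors.
def watchExecutors(ssc: StreamingContext,
                   checkEveryMs: Long = 30000L,
                   maxStrikes: Int = 4): Unit = {
  val t = new Thread(new Runnable {
    def run(): Unit = {
      var strikes = 0
      while (strikes < maxStrikes) {
        Thread.sleep(checkEveryMs)
        val executors = ssc.sparkContext.getExecutorMemoryStatus.size - 1
        strikes = if (executors <= 0) strikes + 1 else 0
      }
      // let the supervisor restart the application from scratch
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
  })
  t.setDaemon(true)
  t.start()
}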

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Cheng Lian

Would you mind providing the driver log?

On 8/6/15 3:58 PM, Philip Weaver wrote:
I built Spark from the v1.5.0-snapshot-20150803 tag in the repo and 
tried again.


The initialization time is about 1 minute now, which is still pretty 
terrible.


On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote:


Absolutely, thanks!

On Wed, Aug 5, 2015 at 9:07 PM, Cheng Lian lian.cs@gmail.com wrote:

We've fixed this issue in 1.5
https://github.com/apache/spark/pull/7396

Could you give it a shot to see whether it helps in your case?
We've observed ~50x performance boost with schema merging
turned on.

Cheng


On 8/6/15 8:26 AM, Philip Weaver wrote:

I have a parquet directory that was produced by partitioning
by two keys, e.g. like this:

df.write.partitionBy("a", "b").parquet("asdf")


There are 35 values of a, and about 1100-1200 values of b
for each value of a, for a total of over 40,000 partitions.

Before running any transformations or actions on the
DataFrame, just initializing it like this takes *2 minutes*:

val df = sqlContext.read.parquet("asdf")


Is this normal? Is this because it is doing some bookkeeping
to discover all the partitions? Is it perhaps having to merge
the schema from each partition? Would you expect it to get
better or worse if I subpartition by another key?

- Philip
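
Separately from that patch, a minimal sketch of skipping schema merging at read
time (assuming a 1.5 snapshot build like the ones discussed above, where Parquet
exposes a mergeSchema data source option); since all ~40,000 partitions here share
one schema, there is no need to reconcile a footer per partition while the
DataFrame is being initialized:

// Sketch only: option name and config key as assumed for the 1.5 snapshots.
val df = sqlContext.read
  .option("mergeSchema", "false")
  .parquet("asdf")

// Equivalent session-wide setting:
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")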