Re: stopping spark stream app

2015-08-09 Thread Shushant Arora
Hi

How can I ensure, in Spark Streaming 1.3 with Kafka, that when an application is
killed, the last running batch is fully processed and the offsets are written to
the checkpointing dir?

On Fri, Aug 7, 2015 at 8:56 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 Hi

 I am using Spark Streaming 1.3 and a custom checkpoint to save Kafka
 offsets.

 1. Is doing

 Runtime.getRuntime().addShutdownHook(new Thread() {
   @Override
   public void run() {
     jssc.stop(true, true);
     System.out.println("Inside Add Shutdown Hook");
   }
 });

 a safe way to handle the stop?

 2. Do I also need to handle saving the checkpoint in the shutdown hook, or will
 the driver handle it automatically, since it gracefully stops the stream and
 handles completion of the foreachRDD function on the stream?

 directKafkaStream.foreachRDD(new Function<JavaRDD<byte[][]>, Void>() {
 }

 Thanks
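A minimal Scala sketch of the pattern under discussion, for reference. It is not
the poster's code: the broker, topic, batch interval, and the saveOffsets helper
are made-up placeholders, and the real per-batch work is elided. The idea is
that offsets are recorded inside foreachRDD after the batch's work succeeds, and
a graceful stop waits for the running batch (including its foreachRDD) to finish
before the JVM exits.

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

  object GracefulStopSketch {
    // hypothetical helper: persist offsets to your own checkpoint store (DB, ZK, HDFS, ...)
    def saveOffsets(ranges: Array[OffsetRange]): Unit =
      ranges.foreach(r => println(s"${r.topic}/${r.partition} -> ${r.untilOffset}"))

    def main(args: Array[String]): Unit = {
      val ssc = new StreamingContext(new SparkConf().setAppName("graceful-stop"), Seconds(10))
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, Map("metadata.broker.list" -> "broker:9092"), Set("mytopic"))

      stream.foreachRDD { rdd =>
        val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition(_ => ())   // placeholder for the real per-batch work
        saveOffsets(ranges)             // record offsets only after the batch's work is done
      }

      // stopGracefully = true waits for the running batch, and its foreachRDD, to complete
      sys.addShutdownHook {
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }

      ssc.start()
      ssc.awaitTermination()
    }
  }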




deleting application files in standalone cluster

2015-08-09 Thread Lior Chaga
Hi,
Using Spark 1.4.0 in standalone mode, with the following configuration:

SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true
-Dspark.worker.cleanup.appDataTtl=86400"

The cleanup interval is set to the default.

Application files are not deleted.

I'm using JavaSparkContext, and when the application ends it stops the context.

Maybe I should also call context.close()?

From what I understand, stop() should be enough (
http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-stop-td17826.html#a17847
)

Thanks,

Lior


ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver

2015-08-09 Thread Sadaf
Hi
When I tried to stop Spark Streaming using ssc.stop(false, true), it gives the
following error:

ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
15/08/07 13:41:11 WARN ReceiverSupervisorImpl: Stopped executor without
error
15/08/07 13:41:20 WARN WriteAheadLogManager : Failed to write to write ahead
log

I've implemented a StreamingListener and a custom receiver. Does anyone have an
idea about this?

Thanks :)
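For context, a minimal custom receiver sketch (a hypothetical socket source, not
the poster's code). When the driver calls ssc.stop(false, true) it asks each
receiver to stop via onStop(), and the "Deregistered receiver for stream 0:
Stopped by driver" line appears to be part of that intentional shutdown path
rather than a failure, so the main thing the receiver has to do is release its
resources cleanly in onStop().

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class LineReceiver(host: String, port: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

    @volatile private var running = true

    def onStart(): Unit = {
      new Thread("line-receiver") {
        override def run(): Unit = {
          val socket = new java.net.Socket(host, port)
          val reader = new java.io.BufferedReader(
            new java.io.InputStreamReader(socket.getInputStream))
          var line = reader.readLine()
          while (running && line != null) {
            store(line)                 // hand each record to Spark
            line = reader.readLine()
          }
          socket.close()
        }
      }.start()
    }

    def onStop(): Unit = {
      running = false                   // let the receiving thread exit on its own
    }
  }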







Re: Spark-submit not finding main class and the error reflects different path to jar file than specified

2015-08-09 Thread Akhil Das
Are you setting SPARK_PREPEND_CLASSES? Try disabling it. Here your uber
jar, which does not contain SparkConf, is put at the front of the
classpath, which is messing things up.

Thanks
Best Regards

On Thu, Aug 6, 2015 at 5:48 PM, Stephen Boesch java...@gmail.com wrote:

 Given the following command line to spark-submit:

 bin/spark-submit --verbose --master local[2] --class
 org.yardstick.spark.SparkCoreRDDBenchmark
 /shared/ysgood/target/yardstick-spark-uber-0.0.1.jar

 Here is the output:

 NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes
 ahead of assembly.
 Using properties file: /shared/spark-1.4.1/conf/spark-defaults.conf
 Adding default property: spark.akka.askTimeout=180
 Adding default property: spark.master=spark://mellyrn.local:7077
 Error: Cannot load main class from JAR
 file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark
 Run with --help for usage help or --verbose for debug output


 The path
 file:/shared/spark-1.4.1/org.yardstick.spark.SparkCoreRDDBenchmark does
 not seem to make sense. It does not reflect the path to the file that was
 specified on the spark-submit command line.

 Note: when attempting to run that jar file via

 java -classpath shared/ysgood/target/yardstick-spark-uber-0.0.1.jar
 org.yardstick.spark.SparkCoreRDDBenchmark

 Then the result is as expected: the main class starts to load and then
 there is a NoClassDefFoundException on the SparkConf class (which is not
 inside the jar). This shows the app jar is healthy.







Re: Multiple Thrift servers on one Spark cluster

2015-08-09 Thread Akhil Das
Did you try this way?

export HIVE_SERVER2_THRIFT_PORT=6066
./sbin/start-thriftserver.sh --master <master-uri>

export HIVE_SERVER2_THRIFT_PORT=6067
./sbin/start-thriftserver.sh --master <master-uri>

I think you just have to change HIVE_SERVER2_THRIFT_PORT to instantiate
multiple servers.

Thanks
Best Regards

On Thu, Aug 6, 2015 at 2:05 PM, Bojan Kostic blood9ra...@gmail.com wrote:

 Hi,

 Is there a way to instantiate multiple Thrift servers on one Spark Cluster?







Re: SparkException: Yarn application has already ended

2015-08-09 Thread Akhil Das
Just make sure your Hadoop services are functioning properly (check the
ResourceManager and NodeManager). How are you submitting the job? If it is
getting submitted, then you can look further into the YARN logs to see what's
really going on.

Thanks
Best Regards

On Thu, Aug 6, 2015 at 6:59 PM, Clint McNeil cl...@impactradius.com wrote:

 Hi

 I am trying to launch a Spark application on a CM cluster and I get the
 following error.

 Exception in thread "main" org.apache.spark.SparkException: Yarn
 application has already ended! It might have been killed or unable to
 launch application master.

 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:113)

 at
 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)

 at
 org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)

 at org.apache.spark.SparkContext.init(SparkContext.scala:379)

 What is the remedy for this type of problem?

 --

 *Clint McNeil*

 BI & Data Science Engineer | Impact Radius

 202 Suntyger, 313 Durban Road, Bellville, 7530

 o: +2721 914-1764 | m: +2782 4796 309 |
 cl...@impactradius.com

 *Learn more  – Watch our 2 minute overview
 http://www.impactradius.com/?src=slsap*

 www.impactradius.com | Twitter http://twitter.com/impactradius |
 Facebook https://www.facebook.com/pages/Impact-Radius/153376411365183 |
 LinkedIn http://www.linkedin.com/company/impact-radius-inc. | YouTube
 https://www.youtube.com/user/ImpactRadius

 Maximizing Return on Ad Spend




Re: Temp file missing when training logistic regression

2015-08-09 Thread Akhil Das
Which version of Spark are you using? It looks like you are hitting the limit
on open file handles; in that case you might want to increase the ulimit. You
can validate this by looking in the worker logs (which would probably show a
"Too many open files" exception).

Thanks
Best Regards

On Thu, Aug 6, 2015 at 8:35 PM, Cat caterina.gro...@dsp.io wrote:

 Hello,

 I am using the Python API to perform a grid search and train models using
 LogisticRegressionWithSGD.
 I am using r3.xl machines in EC2, running on top of YARN in cluster mode.

 The training RDD is persisted in memory and on disk. Some of the models
 train successfully, but then at some point during the grid search I get an
 error. It looks like the Python broadcast is looking for a part of the RDD
 which is no longer there. I scanned the logs for further errors but could
 not find anything.

 Any ideas of what could be causing this, and what should I be looking for?

 Many thanks.
 Cat

   model = LogisticRegressionWithSGD.train(the_training, iterations=i,
 regParam=c, miniBatchFraction=0.8)
   File /home/hadoop/spark/python/pyspark/mllib/classification.py, line
 164, in train
 return _regression_train_wrapper(train, LogisticRegressionModel, data,
 initialWeights)
   File /home/hadoop/spark/python/pyspark/mllib/regression.py, line 140,
 in
 _regression_train_wrapper
 weights, intercept = train_func(data,
 _convert_to_vector(initial_weights))
   File /home/hadoop/spark/python/pyspark/mllib/classification.py, line
 162, in train
 bool(intercept))
   File /home/hadoop/spark/python/pyspark/mllib/common.py, line 120, in
 callMLlibFunc
 return callJavaFunc(sc, api, *args)
   File /home/hadoop/spark/python/pyspark/mllib/common.py, line 113, in
 callJavaFunc
 return _java2py(sc, func(*args))
   File
 /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
 line 538, in __call__
 self.target_id, self.name)
   File
 /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line
 300, in get_return_value
 format(target_id, '.', name), value)
 Py4JJavaError: An error occurred while calling
 o271.trainLogisticRegressionModelWithSGD.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task
 serialization failed: java.io.FileNotFoundException:

 /mnt/spark/spark-b07b34f8-66c3-43ae-a3ed-0c291724409b/pyspark-4196e8e5-8024-4ec5-a7bb-a60b216e6e74/tmpbCjiSR
 (No such file or directory)
 java.io.FileInputStream.open(Native Method)
 java.io.FileInputStream.init(FileInputStream.java:146)

 org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply$mcJ$sp(PythonRDD.scala:848)

 org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)

 org.apache.spark.api.python.PythonBroadcast$$anonfun$writeObject$1.apply(PythonRDD.scala:847)
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)

 org.apache.spark.api.python.PythonBroadcast.writeObject(PythonRDD.scala:847)
 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
 java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)

 java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
 java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)

 org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)

 org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:110)

 org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1176)
 org.apache.spark.storage.DiskStore.putIterator(DiskStore.scala:79)
 org.apache.spark.storage.DiskStore.putArray(DiskStore.scala:64)

 org.apache.spark.storage.BlockManager.dropFromMemory(BlockManager.scala:1028)

 org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:419)

 org.apache.spark.storage.MemoryStore$$anonfun$ensureFreeSpace$4.apply(MemoryStore.scala:408)

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 org.apache.spark.storage.MemoryStore.ensureFreeSpace(MemoryStore.scala:408)
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:263)
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
 org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:114)
 org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:786)
 org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
 org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991)

 

Re: Spark Job Failed (Executor Lost then FS closed)

2015-08-09 Thread Akhil Das
Can you look more into the worker logs and see what's going on? It looks like
a memory issue (GC overhead and the like); the worker logs should tell you
more.

Thanks
Best Regards

On Fri, Aug 7, 2015 at 3:21 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Re attaching the images.

 On Thu, Aug 6, 2015 at 2:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

 Code:
 import java.text.SimpleDateFormat
 import java.util.Calendar
 import java.sql.Date
 import org.apache.spark.storage.StorageLevel

 def extract(array: Array[String], index: Integer) = {
   if (index < array.length) {
     array(index).replaceAll("\"", "")
   } else {
     ""
   }
 }


 case class GuidSess(
   guid: String,
   sessionKey: String,
   sessionStartDate: String,
   siteId: String,
   eventCount: String,
   browser: String,
   browserVersion: String,
   operatingSystem: String,
   experimentChannel: String,
   deviceName: String)

 val rowStructText =
   sc.textFile("/user/zeppelin/guidsess/2015/08/05/part-m-1.gz")
 val guidSessRDD = rowStructText.filter(s => s.length != 1).map(s =>
   s.split(",")).map(
   {
     s =>
       GuidSess(extract(s, 0),
         extract(s, 1),
         extract(s, 2),
         extract(s, 3),
         extract(s, 4),
         extract(s, 5),
         extract(s, 6),
         extract(s, 7),
         extract(s, 8),
         extract(s, 9))
   })

 val guidSessDF = guidSessRDD.toDF()
 guidSessDF.registerTempTable("guidsess")

 Once the temp table is created, I wrote this query:

 select siteid, count(distinct guid) total_visitor,
 count(sessionKey) as total_visits
 from guidsess
 group by siteid

 *Metrics:*

 Data Size: 170 MB
 Spark Version: 1.3.1
 YARN: 2.7.x



 Timeline:
 There is 1 Job, 2 stages with 1 task each.

 *1st Stage : mapPartitions*
 [image: Inline image 1]

 1st Stage: Task 1 started to fail. A second attempt started for the 1st task
 of the first stage. The first attempt failed with "Executor LOST".
 When I go to the YARN ResourceManager and to that particular host, I see
 that it is running fine.

 *Attempt #1*
 [image: Inline image 2]

 *Attempt #2* Executor LOST AGAIN
 [image: Inline image 3]
 *Attempt 34*

 *[image: Inline image 4]*



 *2nd Stage runJob : SKIPPED*

 *[image: Inline image 5]*

 Any suggestions ?


 --
 Deepak




 --
 Deepak






Merge metadata error when appending to parquet table

2015-08-09 Thread Krzysztof Zarzycki
Hi there,
I have a problem with a Spark Streaming job running on Spark 1.4.1 that
appends to a Parquet table.

My job receives JSON strings and creates a JSON RDD out of them. The JSONs
might come in different shapes, as most of the fields are optional, but they
never have conflicting schemas.
Next, for each (non-empty) RDD I'm saving it to Parquet files, using append
to the existing table:

jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath)

Unfortunately I'm hitting now an issue on every append of conflict:

Aug 9, 2015 7:58:03 AM WARNING: parquet.hadoop.ParquetOutputCommitter:
could not write summary file for hdfs://example.com:8020/tmp/parquet
java.lang.RuntimeException: could not merge metadata: key
org.apache.spark.sql.parquet.row.metadata has conflicting values:
[{...schema1...}, {...schema2...} ]

The schemas are very similar: some attributes may be missing compared to the
others, but they are definitely not conflicting. They are pretty lengthy, but
I compared them with diff and ensured that there are no conflicts.

Even with this WARNING, the write actually succeeds and I'm able to read the
data back. But on every batch there is yet another schema in the displayed
conflicting-values array. I would like the job to run forever, so I can't
simply ignore this warning, because it will probably end with an OOM.

Do you know what might be the reason for this error/warning? How can I
overcome this? Maybe it is a Spark bug/regression? I saw tickets like
SPARK-6010 (https://issues.apache.org/jira/browse/SPARK-6010), but they
seem to be fixed in 1.3.0 (I'm using 1.4.1).


Thanks for any help!
Krzysiek


Starting a service with Spark Executors

2015-08-09 Thread Daniel Haviv
Hi,
I'd like to start a service with each Spark executor upon initialization and
have the distributed code reference that service locally.
What I'm trying to do is to invoke torch7 computations without reloading
the model for each row, by starting a Lua HTTP handler that will receive HTTP
requests for each row in my data.

Can this be achieved with Spark ?

Thank you.
Daniel


Re: Spark-submit fails when jar is in HDFS

2015-08-09 Thread Akhil Das
Did you try this way?

/usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf
spark.mesos.executor.docker.image=docker.repo/spark:latest --class
org.apache.spark.examples.SparkPi --jars
hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

Thanks
Best Regards

On Fri, Aug 7, 2015 at 5:51 AM, Alan Braithwaite a...@cloudflare.com
wrote:

 Hi All,

 We're trying to run spark with mesos and docker in client mode (since
 mesos doesn't support cluster mode) and load the application Jar from
 HDFS.  The following is the command we're running:

 /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050
 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class
 org.apache.spark.examples.SparkPi
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

 We're getting the following warning before an exception from that command:

 Warning: Skip remote jar
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar.
 java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi

 Before I debug further, is this even supported?  I started reading the
 code and it wasn't clear that it's possible to load a remote jar in client
 mode at all.  I did see a related issue in [2] but it didn't quite clarify
 everything I was looking for.

 Thanks,
 - Alan

 [1] https://spark.apache.org/docs/latest/submitting-applications.html

 [2]
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html



Re: Out of memory with twitter spark streaming

2015-08-09 Thread Akhil Das
I'm not sure what you are up to, but if you can explain what you are trying
to achieve then maybe we can restructure your code. On a quick glance I
could see:

 tweetsRDD.collect().map(tweet =>
 DBQuery.saveTweets(tweet))

which will bring the whole dataset onto your driver machine, where it could
run out of memory. You can avoid that.
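A hedged sketch of that alternative: persist from the executors with
foreachPartition instead of collect()-ing every RDD to the driver. It is not a
drop-in for the poster's code; save stands in for DBQuery.saveTweets, and any
real DB connection should be opened once per partition inside the closure.

  import org.apache.spark.streaming.dstream.DStream

  def saveStream(tweets: DStream[String], save: String => Unit): Unit = {
    tweets.foreachRDD { rdd =>
      val distinct = rdd.distinct()
      val count = distinct.count()
      println("* " + count + " tweets found on this RDD")
      if (count > 0) {
        distinct.foreachPartition { iter =>
          // open a DB connection here, once per partition, and close it afterwards
          iter.foreach(save)
        }
      }
    }
  }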

Thanks
Best Regards

On Fri, Aug 7, 2015 at 11:23 AM, Pankaj Narang pankajnaran...@gmail.com
wrote:

 Hi

 I am running one application using Activator where I am retrieving tweets
 and storing them in a MySQL database using the code below.

 I get an OOM error after 5-6 hours with -Xmx1048M. If I increase the memory,
 the OOM is only delayed.

 Can anybody give me a clue? Here is the code:

  var tweetStream = TwitterUtils.createStream(ssc, None, keywords)
  var tweets = tweetStream.map(tweet => {
    var user = tweet.getUser
    var replyStatusId = tweet.getInReplyToStatusId
    var reTweetStatus = tweet.getRetweetedStatus
    var pTweetId = -1L
    var pcreatedAt = 0L
    if (reTweetStatus != null) {
      pTweetId = reTweetStatus.getId
      pcreatedAt = reTweetStatus.getCreatedAt.getTime
    }
    tweet.getCreatedAt.getTime + "|$" + tweet.getId + "|$" + user.getId + "|$" +
      user.getName + "|$" + user.getScreenName + "|$" + user.getDescription + "|$" +
      tweet.getText.trim + "|$" + user.getFollowersCount + "|$" +
      user.getFriendsCount + "|$" + tweet.getGeoLocation + "|$" +
      user.getLocation + "|$" + user.getBiggerProfileImageURL + "|$" +
      replyStatusId + "|$" + pTweetId + "|$" + pcreatedAt
  })

  tweets.foreachRDD(tweetsRDD => {
    tweetsRDD.distinct()
    val count = tweetsRDD.count
    println("* " + "%s tweets found on this RDD".format(count))
    if (count > 0) {
      var timeMs = System.currentTimeMillis
      var counter = DBQuery.getProcessedCount()
      var location = "tweets/" + counter + "/"
      tweetsRDD.collect().map(tweet => DBQuery.saveTweets(tweet))
      // tweetsRDD.saveAsTextFile(location + timeMs + ".txt")
      DBQuery.addTweetRDD(counter)
    }
  })

  // Checkpoint directory to recover from failures
  println("tweets for the last stream are saved which can be processed later")
  val checkpointDir = "f:/svn1/checkpoint/"
  ssc.checkpoint(checkpointDir)
  ssc.start()
  ssc.awaitTermination()


 regards
 Pankaj







Re: Accessing S3 files with s3n://

2015-08-09 Thread Akhil Das
It depends on which operation you are doing. If you are doing a .count() on a
Parquet file, it might not download the entire file, I think, but if you do a
.count() on a normal text file it might pull the entire file.

Thanks
Best Regards

On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I've been trying to track down some problems with Spark reads being very
 slow with s3n:// URIs (NativeS3FileSystem).  After some digging around, I
 realized that this file system implementation fetches the entire file,
 which isn't really a Spark problem, but it really slows down things when
 trying to just read headers from a Parquet file or just creating partitions
 in the RDD.  Is this something that others have observed before, or am I
 doing something wrong?

 Thanks,
 Akshat



Re: using Spark or pig group by efficient in my use case?

2015-08-09 Thread Akhil Das
Why not give it a shot? Spark always outruns old mapreduce jobs.

Thanks
Best Regards

On Sat, Aug 8, 2015 at 8:25 AM, linlma lin...@gmail.com wrote:

 I have a tens of million records, which is customer ID and city ID pair.
 There are tens of millions of unique customer ID, and only a few hundreds
 unique city ID. I want to do a merge to get all city ID aggregated for a
 specific customer ID, and pull back all records. I want to do this using
 group by customer ID using Pig on Hadoop, and wondering if it is the most
 efficient way.

 Also wondering if there are overhead for sorting in Hadoop (I do not care
 if
 customer1 before customer2 or not, as long as all city are aggregated
 correctly for customer1 and customer 2)? Do you think Spark is better?

 Here is an example of inputs,

 CustomerID1 City1
 CustomerID2 City2
 CustomerID3 City1
 CustomerID1 City3
 CustomerID2 City4
 I want output like this,

 CustomerID1 City1 City3
 CustomerID2 City2 City4
 CustomerID3 City1

 thanks in advance,
 Lin
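A rough Scala sketch of that aggregation, for comparison (the input path,
whitespace separator, and output path are assumptions, and this is not a
benchmark against Pig). aggregateByKey builds one set of cities per customer
without relying on any sort order.

  import org.apache.spark.{SparkConf, SparkContext}

  object CitiesPerCustomer {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("cities-per-customer"))
      val pairs = sc.textFile("hdfs:///data/customer_city.txt")
        .map(_.split("\\s+"))
        .collect { case Array(customer, city) => (customer, city) }

      val citiesByCustomer = pairs.aggregateByKey(Set.empty[String])(
        (set, city) => set + city,      // add a city within a partition
        (a, b) => a ++ b)               // merge partial sets across partitions

      citiesByCustomer
        .map { case (customer, cities) => (customer +: cities.toSeq.sorted).mkString(" ") }
        .saveAsTextFile("hdfs:///data/customer_cities_out")

      sc.stop()
    }
  }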







Re: How to run start-thrift-server in debug mode?

2015-08-09 Thread Akhil Das
It seems it is not able to pick up the debug parameters. You can set

export _JAVA_OPTIONS=-agentlib:jdwp=transport=dt_socket,address=8000,server=y,suspend=y

and then submit the job to enable debugging.

Thanks
Best Regards

On Fri, Aug 7, 2015 at 10:20 PM, Benjamin Ross br...@lattice-engines.com
wrote:

 Hi,

 I’m trying to run the hive thrift server in debug mode.  I’ve tried to
 simply pass -Xdebug 
 -Xrunjdwp:transport=dt_socket,address=127.0.0.1:,server=y,suspend=n
 to start-thriftserver.sh as a driver option, but it doesn’t seem to host a
 server.  I’ve then tried to edit the various shell scripts to run hive
 thrift server but couldn’t get things to work.  It seems that there must be
 an easier way to do this.  I’ve also tried to run it directly in eclipse,
 but ran into issues related to Scala that I haven’t quite yet figured out.



 start-thriftserver.sh --driver-java-options
 -agentlib:jdwp=transport=dt_socket,address=localhost:8000,server=y,suspend=n
 -XX:MaxPermSize=512  --master yarn://localhost:9000 --num-executors 2





 jdb -attach localhost:8000

 java.net.ConnectException: Connection refused

 at java.net.PlainSocketImpl.socketConnect(Native Method)

 at
 java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)

 at
 java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)

 at
 java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)

 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

 at java.net.Socket.connect(Socket.java:579)

 at
 com.sun.tools.jdi.SocketTransportService.attach(SocketTransportService.java:222)

 at
 com.sun.tools.jdi.GenericAttachingConnector.attach(GenericAttachingConnector.java:116)

 at
 com.sun.tools.jdi.SocketAttachingConnector.attach(SocketAttachingConnector.java:90)

 at
 com.sun.tools.example.debug.tty.VMConnection.attachTarget(VMConnection.java:519)

 at
 com.sun.tools.example.debug.tty.VMConnection.open(VMConnection.java:328)

 at com.sun.tools.example.debug.tty.Env.init(Env.java:63)

 at com.sun.tools.example.debug.tty.TTY.main(TTY.java:1066)



 Let me know if I’m missing something here…
 Thanks in advance,

 Ben



Questions about SparkSQL join on not equality conditions

2015-08-09 Thread gen tang
Hi,

I might have a stupid question about Spark SQL's implementation of joins on
non-equality conditions, for instance "condition1 OR condition2".

In fact, Hive doesn't support such joins, as it is very difficult to express
such conditions as a map/reduce job. However, Spark SQL supports this
operation, so I would like to know how Spark implements it.

As I observe that such a join runs very slowly, I guess that Spark implements
it by doing a filter on top of a cartesian product. Is that true?
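One way to check is to look at the physical plan Spark SQL produces for a small
non-equality join. A minimal sketch (the toy tables and condition are made up;
explain() prints whether a cartesian/nested-loop style join plus a filter is
being used):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SQLContext

  object NonEquiJoinPlan {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("non-equi-join"))
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val left  = Seq((1, "a"), (2, "b")).toDF("id", "tag")
      val right = Seq((1, 10), (3, 30)).toDF("key", "score")

      val joined = left.join(right, $"id" === $"key" || $"score" > 20)
      joined.explain()   // inspect the physical plan chosen for the OR condition
      joined.show()

      sc.stop()
    }
  }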

Thanks in advance for your help.

Cheers
Gen


Re: How to create DataFrame from a binary file?

2015-08-09 Thread bo yang
You can create your own data schema (a StructType in Spark), and use the
following method to create a data frame with your own schema:

sqlContext.createDataFrame(yourRDD, structType);

I wrote a post on how to do it. You can also get the sample code there:

Light-Weight Self-Service Data Query through Spark SQL:
https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang

Take a look and feel free to let me know if you have any questions.

Best,
Bo



On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, how do we create a DataFrame from a binary file stored in HDFS? I was
 thinking of using

 JavaPairRDD<String, PortableDataStream> pairRdd =
     javaSparkContext.binaryFiles("/hdfs/path/to/binfile");
 JavaRDD<PortableDataStream> javardd = pairRdd.values();

 I can see that PortableDataStream has a method called toArray which can
 convert it into a byte array. I was thinking, if I have JavaRDD<byte[]>, can I
 call the following and get a DataFrame?

 DataFrame binDataFrame = sqlContext.createDataFrame(javaBinRdd, Byte.class);

 Please guide me, I am new to Spark. I have my own custom format, which is a
 binary format, and I was thinking that if I can convert my custom format into
 a DataFrame using binary operations then I don't need to create my own custom
 Hadoop format. Am I on the right track? Will reading binary data into a
 DataFrame scale?







Re: Spark-submit fails when jar is in HDFS

2015-08-09 Thread Dean Wampler
Also, Spark on Mesos supports cluster mode:
http://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Sun, Aug 9, 2015 at 4:30 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Did you try this way?

 /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050
 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class
 org.apache.spark.examples.SparkPi --jars
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

 Thanks
 Best Regards

 On Fri, Aug 7, 2015 at 5:51 AM, Alan Braithwaite a...@cloudflare.com
 wrote:

 Hi All,

 We're trying to run spark with mesos and docker in client mode (since
 mesos doesn't support cluster mode) and load the application Jar from
 HDFS.  The following is the command we're running:

 /usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050
 --conf spark.mesos.executor.docker.image=docker.repo/spark:latest --class
 org.apache.spark.examples.SparkPi
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100

 We're getting the following warning before an exception from that command:

 Warning: Skip remote jar
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar.
 java.lang.ClassNotFoundException: org.apache.spark.examples.SparkPi

 Before I debug further, is this even supported?  I started reading the
 code and it wasn't clear that it's possible to load a remote jar in client
 mode at all.  I did see a related issue in [2] but it didn't quite clarify
 everything I was looking for.

 Thanks,
 - Alan

 [1] https://spark.apache.org/docs/latest/submitting-applications.html

 [2]
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-not-working-when-application-jar-is-in-hdfs-td21840.html





stream application map transformation constructor called

2015-08-09 Thread Shushant Arora
In a streaming application, how many times is the map transformation object
created?

Say I have

directKafkaStream.repartition(numPartitions).mapPartitions(
    new FlatMapFunction_derivedclass(configs));

class FlatMapFunction_derivedclass {
  FlatMapFunction_derivedclass(Config config) {

  }
  @Override
  public Iterable<String> call(Iterator<byte[][]> t) throws Exception {..}
}

How many times FlatMapFunction_derivedclass will be instantiated ?

1. Will the constructor of this class be called only once, with the same object
serialized, transferred to the executors, and deserialized at each batch
interval? So if my streaming application has run 400 batches, has the same
object been deserialized 400 times?

2. So should I avoid creating an external connection object, say an HTTP
connection, in the constructor of FlatMapFunction_derivedclass? Should it
instead always be created in the call function, and therefore be created at
each batch interval?
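A minimal Scala sketch of the connection-per-partition pattern the second
question alludes to (hedged: HttpClientLike and the newClient factory are
hypothetical stand-ins, not Spark APIs). Anything non-serializable, such as an
HTTP connection, is usually created inside mapPartitions, once per
partition/task, rather than in the function object's constructor.

  import org.apache.spark.streaming.dstream.DStream

  trait HttpClientLike { def post(body: Array[Byte]): String; def close(): Unit }

  def enrich(stream: DStream[Array[Byte]],
             newClient: () => HttpClientLike): DStream[String] =
    stream.mapPartitions { records =>
      val client = newClient()                            // one connection per partition/task
      val out = records.map(r => client.post(r)).toList   // materialize before closing the client
      client.close()
      out.iterator
    }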


Re: intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException

2015-08-09 Thread Ted Yu
Can you check if there is a protobuf version other than 2.5.0 on the
classpath?

Please show the complete stack trace.

Cheers

On Sun, Aug 9, 2015 at 9:41 AM, longda...@163.com longda...@163.com wrote:

 hi all,
 I compile Spark 1.3.1 on Linux using IntelliJ 14 and get the error "assertion
 failed: com.google.protobuf.InvalidProtocalBufferException". How could I
 solve the problem?







Re: intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException

2015-08-09 Thread longda...@163.com
the stack trace is below

Error:scalac: 
 while compiling:
/home/xiaoju/data/spark-1.3.1/core/src/main/scala/org/apache/spark/SparkContext.scala
during phase: typer
 library version: version 2.10.4
compiler version: version 2.10.4
  reconstructed args: -nobootcp -javabootclasspath :
-P:genjavadoc:out=/home/xiaoju/data/spark-1.3.1/core/target/java
-deprecation -feature -classpath

Re: Accessing S3 files with s3n://

2015-08-09 Thread Jerry Lam
Hi Akshat,

Is there a particular reason you don't use s3a? From my experience, s3a
performs much better than the rest. I believe the inefficiency is in the
implementation of the s3 interface.
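A hedged sketch of switching to s3a as suggested (it assumes the hadoop-aws
module providing S3AFileSystem is on the classpath; the bucket, path, and
credential values are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}

  object S3aRead {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("s3a-read"))
      val hadoopConf = sc.hadoopConfiguration
      hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      hadoopConf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
      hadoopConf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

      // s3a does ranged reads, so it should avoid pulling whole objects on seek
      val lines = sc.textFile("s3a://your-bucket/path/to/data")
      println(lines.count())

      sc.stop()
    }
  }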

Best Regards,

Jerry

Sent from my iPhone

 On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Depends on which operation you are doing, If you are doing a .count() on a 
 parquet, it might not download the entire file i think, but if you do a 
 .count() on a normal text file it might pull the entire file.
 
 Thanks
 Best Regards
 
 On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote:
 Hi,
 
 I've been trying to track down some problems with Spark reads being very 
 slow with s3n:// URIs (NativeS3FileSystem).  After some digging around, I 
 realized that this file system implementation fetches the entire file, which 
 isn't really a Spark problem, but it really slows down things when trying to 
 just read headers from a Parquet file or just creating partitions in the 
 RDD.  Is this something that others have observed before, or am I doing 
 something wrong?
 
 Thanks,
 Akshat
 


Re: Merge metadata error when appending to parquet table

2015-08-09 Thread Krzysztof Zarzycki
Besides finding the root cause of this problem, I think I can work around at
least the WARNING message by overriding the Parquet variable
parquet.enable.summary-metadata, which according to the PARQUET-107 ticket
(https://issues.apache.org/jira/browse/PARQUET-107) can be used to disable
writing the summary file that is the issue here.
How can I set this variable? I tried
sql.setConf("parquet.enable.summary-metadata", "false")

sql.sql("SET parquet.enable.summary-metadata=false")

as well as: spark-submit --conf parquet.enable.summary-metadata=false

But neither helped. Can anyone help? Of course the original problem stays
open.
Thanks!
Krzysiek

2015-08-09 14:19 GMT+02:00 Krzysztof Zarzycki k.zarzy...@gmail.com:

 Hi there,
 I have a problem with a spark streaming job  running on Spark 1.4.1, that
 appends to parquet table.

 My job receives json strings and creates JsonRdd out of it. The jsons
 might come in different shape as most of the fields are optional. But they
 never have conflicting schemas.
 Next, for each (non-empty) Rdd I'm saving it to parquet files, using
 append to existing table:

 jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath)

 Unfortunately I'm hitting now an issue on every append of conflict:

 Aug 9, 2015 7:58:03 AM WARNING: parquet.hadoop.ParquetOutputCommitter:
 could not write summary file for hdfs://example.com:8020/tmp/parquet
 java.lang.RuntimeException: could not merge metadata: key
 org.apache.spark.sql.parquet.row.metadata has conflicting values:
 [{...schema1...}, {...schema2...} ]

 The schemas are very similar, some attributes may be missing comparing to
 other, but for sure they are not conflicting. They are pretty lengthy, but
 I compared them with diff and ensured, that there are no conflicts.

 Even with this WARNING, the write actually succeeds, I'm able to read this
 data.  But on every batch, there is yet another schema in the displayed
 conflicting values array. I would like the job to run forever, so I can't
 even ignore this warning because it will probably end with OOM.

 Do you know what might be the reason of this error/ warning? How to
 overcome this? Maybe it is a Spark bug/regression? I saw tickets like
 SPARK-6010 https://issues.apache.org/jira/browse/SPARK-6010, but they
 seem to be fixed in 1.3.0 (I'm using 1.4.1).


 Thanks for any help!
 Krzysiek





intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException

2015-08-09 Thread longda...@163.com
hi all,
I compile Spark 1.3.1 on Linux using IntelliJ 14 and get the error "assertion
failed: com.google.protobuf.InvalidProtocalBufferException". How could I
solve the problem?






Re: How to create DataFrame from a binary file?

2015-08-09 Thread Umesh Kacha
Hi Bo, I know how to create a DataFrame; my question is how to create a
DataFrame from binary files, and in your blog it is raw text JSON files.
Please read my question properly, thanks.

On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote:

 You can create your own data schema (StructType in spark), and use
 following method to create data frame with your own data schema:

 sqlContext.createDataFrame(yourRDD, structType);

 I wrote a post on how to do it. You can also get the sample code there:

 Light-Weight Self-Service Data Query through Spark SQL:

 https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang

 Take a look and feel free to  let me know for any question.

 Best,
 Bo



 On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi how do we create DataFrame from a binary file stored in HDFS? I was
 thinking to use

 JavaPairRDDString,PortableDataStream pairRdd =
 javaSparkContext.binaryFiles(/hdfs/path/to/binfile);
 JavaRDDPortableDataStream javardd = pairRdd.values();

 I can see that PortableDataStream has method called toArray which can
 convert into byte array I was thinking if I have JavaRDDbyte[] can I
 call
 the following and get DataFrame

 DataFrame binDataFrame =
 sqlContext.createDataFrame(javaBinRdd,Byte.class);

 Please guide I am new to Spark. I have my own custom format which is
 binary
 format and I was thinking if I can convert my custom format into DataFrame
 using binary operations then I dont need to create my own custom Hadoop
 format am I on right track? Will reading binary data into DataFrame scale?








Re:Re: intellij14 compiling spark-1.3.1 got error: assertion failed: com.google.protobuf.InvalidProtocalBufferException

2015-08-09 Thread 龙淡
Thank you for the reply.
I use sbt to compile Spark, but there are both protobuf 2.4.1 and 2.5.0 in the
Maven repository, and protobuf 2.5.0 in the .ivy repository.
The stack trace is below:
   Error:scalac:
 while compiling: 
/home/xiaoju/data/spark-1.3.1/core/src/main/scala/org/apache/spark/SparkContext.scala
during phase: typer
 library version: version 2.10.4
compiler version: version 2.10.4
  reconstructed args: -nobootcp -javabootclasspath : 
-P:genjavadoc:out=/home/xiaoju/data/spark-1.3.1/core/target/java -deprecation 
-feature -classpath 

Re: Spark-submit fails when jar is in HDFS

2015-08-09 Thread Alan Braithwaite

 Did you try this way?



/usr/local/spark/bin/spark-submit --master mesos://mesos.master:5050 --conf
 spark.mesos.executor.docker.image=docker.repo/spark:latest --class
 org.apache.spark.examples.SparkPi --jars
 hdfs://hdfs1/tmp/spark-examples-1.4.1-hadoop2.6.0-cdh5.4.4.jar 100


I did, and got the same error (I verified again right now too).

Also, Spark on Mesos supports cluster mode:
 http://spark.apache.org/docs/latest/running-on-mesos.html#cluster-mode


Oh cool!  Looks like this page needs to be updated then:

http://spark.apache.org/docs/latest/submitting-applications.html

Thanks!
- Alan


Re: How to create DataFrame from a binary file?

2015-08-09 Thread bo yang
Well, my post uses a raw text JSON file to show how to create a data frame
with a custom data schema. The key idea is to show the flexibility to deal
with any format of data by using your own schema. Sorry if I did not make
that fully clear.

Anyway, let us know once you figure out your problem.
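For the binary case specifically, a hedged Scala sketch of the same idea (the
path is a placeholder, and whether the raw bytes are useful as a single column
depends on the custom format): read the files with binaryFiles, turn each
(path, stream) pair into a Row, and attach an explicit schema with a BinaryType
column.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.{Row, SQLContext}
  import org.apache.spark.sql.types.{BinaryType, StringType, StructField, StructType}

  object BinaryFilesToDataFrame {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("binary-to-df"))
      val sqlContext = new SQLContext(sc)

      val schema = StructType(Seq(
        StructField("path", StringType, nullable = false),
        StructField("content", BinaryType, nullable = false)))

      val rows = sc.binaryFiles("hdfs:///path/to/binfiles")
        .map { case (path, stream) => Row(path, stream.toArray()) }

      val df = sqlContext.createDataFrame(rows, schema)
      df.printSchema()

      sc.stop()
    }
  }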




On Sun, Aug 9, 2015 at 11:10 AM, Umesh Kacha umesh.ka...@gmail.com wrote:

 Hi Bo I know how to create a DataFrame my question is how to create a
 DataFrame for binary files and in your blog it is raw text json files
 please read my question properly thanks.

 On Sun, Aug 9, 2015 at 11:21 PM, bo yang bobyan...@gmail.com wrote:

 You can create your own data schema (StructType in spark), and use
 following method to create data frame with your own data schema:

 sqlContext.createDataFrame(yourRDD, structType);

 I wrote a post on how to do it. You can also get the sample code there:

 Light-Weight Self-Service Data Query through Spark SQL:

 https://www.linkedin.com/pulse/light-weight-self-service-data-query-through-spark-sql-bo-yang

 Take a look and feel free to  let me know for any question.

 Best,
 Bo



 On Sat, Aug 8, 2015 at 1:42 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi how do we create DataFrame from a binary file stored in HDFS? I was
 thinking to use

 JavaPairRDDString,PortableDataStream pairRdd =
 javaSparkContext.binaryFiles(/hdfs/path/to/binfile);
 JavaRDDPortableDataStream javardd = pairRdd.values();

 I can see that PortableDataStream has method called toArray which can
 convert into byte array I was thinking if I have JavaRDDbyte[] can I
 call
 the following and get DataFrame

 DataFrame binDataFrame =
 sqlContext.createDataFrame(javaBinRdd,Byte.class);

 Please guide I am new to Spark. I have my own custom format which is
 binary
 format and I was thinking if I can convert my custom format into
 DataFrame
 using binary operations then I dont need to create my own custom Hadoop
 format am I on right track? Will reading binary data into DataFrame
 scale?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-create-DataFrame-from-a-binary-file-tp24179.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Merge metadata error when appending to parquet table

2015-08-09 Thread Cheng Lian
The conflicting metadata values warning is a known issue 
https://issues.apache.org/jira/browse/PARQUET-194


The option parquet.enable.summary-metadata is a Hadoop option rather 
than a Spark option, so you need to either add it to your Hadoop 
configuration file(s) or add it via `sparkContext.hadoopConfiguration` 
before starting your job.
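A minimal sketch of that, assuming an existing SparkContext sc (e.g. in
spark-shell); the option goes on hadoopConfiguration, not on SQLConf or --conf:

  // set before any Parquet write is issued
  sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

  // subsequent appends should then skip writing/merging the summary (_metadata) files:
  // jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath)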


Cheng

On 8/9/15 8:57 PM, Krzysztof Zarzycki wrote:
Besides finding to this problem, I think I can workaround at least the 
WARNING message by overwriting parquet variable: 
parquet.enable.summary-metadata
That according to this PARQUET-107 
https://issues.apache.org/jira/browse/PARQUET-107 ticket  can be 
used to disable writing summary file which is an issue here.

How can I set this variable? I tried
sql.setConf(parquet.enable.summary-metadata, false)
sql.sql(SET parquet.enable.summary-metadata=false)
As well as: spark-submit --conf parquet.enable.summary-metadata=false

But neither helped. Anyone can help? Of course the original problem 
stays open.

Thanks!
Krzysiek

2015-08-09 14:19 GMT+02:00 Krzysztof Zarzycki k.zarzy...@gmail.com 
mailto:k.zarzy...@gmail.com:


Hi there,
I have a problem with a spark streaming job  running on Spark
1.4.1, that appends to parquet table.

My job receives json strings and creates JsonRdd out of it. The
jsons might come in different shape as most of the fields are
optional. But they never have conflicting schemas.
Next, for each (non-empty) Rdd I'm saving it to parquet files,
using append to existing table:

jsonRdd.write.mode(SaveMode.Append).parquet(dataDirPath)

Unfortunately I'm hitting now an issue on every append of conflict:

Aug 9, 2015 7:58:03 AM WARNING:
parquet.hadoop.ParquetOutputCommitter: could not write summary
file for hdfs://example.com:8020/tmp/parquet
http://example.com:8020/tmp/parquet
java.lang.RuntimeException: could not merge metadata: key
org.apache.spark.sql.parquet.row.metadata has conflicting values:
[{...schema1...}, {...schema2...} ]

The schemas are very similar, some attributes may be missing
comparing to other, but for sure they are not conflicting. They
are pretty lengthy, but I compared them with diff and ensured,
that there are no conflicts.

Even with this WARNING, the write actually succeeds, I'm able to
read this data.  But on every batch, there is yet another schema
in the displayed conflicting values array. I would like the job
to run forever, so I can't even ignore this warning because it
will probably end with OOM.

Do you know what might be the reason of this error/ warning? How
to overcome this? Maybe it is a Spark bug/regression? I saw
tickets like SPARK-6010
https://issues.apache.org/jira/browse/SPARK-6010, but they seem
to be fixed in 1.3.0 (I'm using 1.4.1).


Thanks for any help!
Krzysiek







Re: Spark job workflow engine recommendations

2015-08-09 Thread Lars Albertsson
I used to maintain Luigi at Spotify, and got some insight in workflow
manager characteristics and production behaviour in the process.

I am evaluating options for my current employer, and the short list is
basically: Luigi, Azkaban, Pinball, Airflow, and rolling our own. The
latter is not necessarily more work than adapting an existing tool,
since existing managers are typically more or less tied to the
technology used by the company that created them.

Are your users primarily developers building pipelines that drive
data-intensive products, or are they analysts, producing business
intelligence? These groups tend to have preferences for different
types of tools and interfaces.

I have a love/hate relationship with Luigi, but given your
requirements, it is probably the best fit:

* It has support for Spark, and it seems to be used and maintained.

* It has no builtin support for Cassandra, but Cassandra is heavily
used at Spotify. IIRC, the code required to support Cassandra targets
is more or less trivial. There is no obvious single definition of a
dataset in C*, so you'll have to come up with a convention and encode
it as a Target subclass. I guess that is why it never made it outside
Spotify.

* The open source community is active and it is well tested in
production at multiple sites.

* It is easy to write dependencies, but in a Python DSL. If your users
are developers, this is preferable over XML or a web interface. There
are always quirks and odd constraints somewhere that require the
expressive power of a programming language. It also allows you to
create extensions without changing Luigi itself.

* It does not have recurring scheduling built in. Luigi needs a motor
to get going, typically cron, installed on a few machines for
redundancy. In a typical pipeline scenario, you give output datasets a
time parameter, which arranges for a dataset to be produced each
hour/day/week/month.

* It supports failure notifications.


Pinball and Airflow have similar architecture to Luigi, with a single
central scheduler and workers that submit and execute jobs. They seem
to be more solidly engineered at a glance, but less battle tested
outside Pinterest/Airbnb, and they have fewer integrations to the data
ecosystem.

Azkaban has a different architecture and user interface, and seems
more geared towards data scientists than developers; it has a good UI
for controlling jobs, but writing extensions and controlling it
programmatically seems more difficult than for Luigi.

All of the tools above are centralised, and the central component can
become a bottleneck and a single point of failure. I am not aware of
any decentralised open source workflow managers, but you can run
multiple instances and shard manually.

Regarding recurring jobs, it is typically undesirable to blindly run
jobs at a certain time. If you run jobs, e.g. with cron, and process
whatever data is available in your input sources, your jobs become
indeterministic and unreliable. If incoming data is late or missing,
your jobs will fail or create artificial skews in output data, leading
to confusing results. Moreover, if jobs fail or have bugs, it will be
difficult to rerun them and get predictable results. This is why I
don't think Chronos is a meaningful alternative for scheduling data
processing.

There are different strategies on this topic, but IMHO, it is easiest
create predictable and reliable pipelines by bucketing incoming data
into datasets that you seal off, and mark ready for processing, and
then use the workflow manager's DAG logic to process data when input
datasets are available, rather than at a certain time. If you use
Kafka for data collection, Secor can handle this logic for you.


In addition to your requirements, there are IMHO a few more topics one
needs to consider:
* How are pipelines tested? I.e. if I change job B below, how can I be
sure that the new output does not break A? You need to involve the
workflow DAG in testing such scenarios.
* How do you debug jobs and DAG problems? In case of trouble, can you
figure out where the job logs are, or why a particular job does not
start?
* Do you need high availability for job scheduling? That will require
additional components.


This became a bit of a brain dump on the topic. I hope that it is
useful. Don't hesitate to get back if I can help.

Regards,

Lars Albertsson



On Fri, Aug 7, 2015 at 5:43 PM, Vikram Kone vikramk...@gmail.com wrote:
 Hi,
 I'm looking for open source workflow tools/engines that allow us to schedule
 Spark jobs on a DataStax Cassandra cluster. Since there are tonnes of
 alternatives out there like Oozie, Azkaban, Luigi, Chronos etc., I wanted to
 check with people here to see what they are using today.

 Some of the requirements of the workflow engine that I'm looking for are

 1. First class support for submitting Spark jobs on Cassandra. Not some
 wrapper Java code to submit tasks.
 2. Active open source community support and well tested at production 

multiple dependency jars using pyspark

2015-08-09 Thread Jonathan Haddad
I'm trying to write a simple job for Pyspark 1.4 migrating data from MySQL
to Cassandra.  I can work with either the MySQL JDBC jar or the cassandra
jar separately without issue, but when I try to reference both of them it
throws an exception:

Py4JJavaError: An error occurred while calling o32.save.
: java.lang.NoSuchMethodError:
scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

I'm not sure if I'm including the jars correctly, as --jars says it's
comma-separated and --driver-class-path seems to take a colon-delimited
classpath. If I separate the list in --driver-class-path with a comma, I
get a class-not-found exception, so I'm thinking colon is right.

The job, params for submission, and exception are here.  Help getting this
going would be deeply appreciated.

https://gist.github.com/rustyrazorblade/9a38a9499a7531eefd1e


Error when running pyspark/shell.py to set up iPython notebook

2015-08-09 Thread YaoPau
I'm trying to set up iPython notebook on an edge node with port forwarding so
I can run pyspark off my laptop's browser.  I've mostly been following the
Cloudera guide here:
http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/

I got this working on one cluster running Spark 1.0.  But now on Spark 1.3
(with Python 2.7 and Java 7), I'm getting the error below when I run
/opt/cloudera/parcels/CDH/lib/spark/python/pyspark/shell.py at the line: sc
= SparkContext(appName="PySparkShell", pyFiles=add_files)

Before showing the error, I'll note that running pyspark --master
yarn-client DOES work, so I can run pyspark fine atop YARN, but it looks
like ipython notebook is calling Spark via a different method and producing
an error.  Any ideas?

Traceback (most recent call last):
  File stdin, line 1, in module
  File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py, line
111, in __init__
conf, jsc, profiler_cls)
  File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py, line
159, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File /opt/cloudera/parcels/CDH/lib/spark/python/pyspark/context.py, line
212, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
  File
/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 701, in __call__
  File
/opt/cloudera/parcels/CDH/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.io.FileNotFoundException:
/user/spark/applicationHistory/application_1438611042507_0055.inprogress (No
such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.init(FileOutputStream.java:221)
at java.io.FileOutputStream.init(FileOutputStream.java:110)
at
org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:117)
at org.apache.spark.SparkContext.init(SparkContext.scala:399)
at
org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)






mllib kmeans produce 1 large and many extremely small clusters

2015-08-09 Thread farhan
I tried running MLlib k-means with the 20newsgroups data set from sklearn. On
a 5000-document data set I get one cluster with most of the documents, while
the other clusters have just a handful of documents each.

#code
newsgroups_train =
fetch_20newsgroups(subset='train',random_state=1,remove=('headers',
'footers', 'quotes'))
small_list = random.sample(newsgroups_train.data,5000)

def get_word_vec(text,vocabulary):
word_lst = tokenize_line(text)
word_counter = Counter(word_lst)
lst = []
for v in vocabulary:
if v in word_counter:
lst.append(word_counter[v])
else:
lst.append(0)  
return lst

docsrdd = sc.parallelize(small_list)
tf = docsrdd.map(lambda x : get_word_vec(x,vocabulary))
idf = IDF().fit(tf)
tfidf = idf.transform(tf) 
clusters = KMeans.train(tfidf, 20)

#documents in each cluster, using clusters.predict(x)
Counter({0: 4978, 11: 3, 9: 2, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, 6: 1, 7: 1, 8:
1, 10: 1, 12: 1, 13: 1, 14: 1, 15: 1, 16: 1, 17: 1, 18: 1, 19: 1})


Please Help !






Re: Starting a service with Spark Executors

2015-08-09 Thread Koert Kuipers
Starting is easy: just use a lazy val. Stopping is harder; I do not think
executors have a cleanup hook currently...
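A hedged sketch of the lazy-val idea: an object is instantiated once per
executor JVM, and its lazy val runs the expensive start-up (here a stand-in for
launching the local torch7/Lua HTTP handler and loading the model) the first
time any task on that executor touches it. The endpoint and scoring function
are hypothetical placeholders, and, as noted above, there is no clean shutdown
hook for stopping the service.

  import org.apache.spark.rdd.RDD

  object LocalService {
    // initialized at most once per executor JVM, on first use
    lazy val score: String => String = {
      // hypothetical: start the local handler / load the model here
      val endpoint = "http://localhost:8080"
      (row: String) => s"scored($row) via $endpoint"
    }
  }

  def scoreAll(rows: RDD[String]): RDD[String] =
    rows.mapPartitions(iter => iter.map(LocalService.score))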

On Sun, Aug 9, 2015 at 5:29 AM, Daniel Haviv 
daniel.ha...@veracity-group.com wrote:

 Hi,
 I'd like to start a service with each Spark executor upon initialization
 and have the distributed code reference that service locally.
 What I'm trying to do is to invoke torch7 computations without reloading
 the model for each row, by starting a Lua HTTP handler that will receive HTTP
 requests for each row in my data.

 Can this be achieved with Spark ?

 Thank you.
 Daniel



Re: Accessing S3 files with s3n://

2015-08-09 Thread bo yang
Hi Akshat,

I found an open source library which implements an S3 InputFormat for Hadoop.
Then I use Spark's newAPIHadoopRDD to load data via that S3 InputFormat.

The open source library is https://github.com/ATLANTBH/emr-s3-io. It is a
little old; I looked inside it and made some changes. Then it worked, and I
have been using it for more than half a year with Spark. It still works great
so far with the latest Spark 1.4.0.

You may need to modify it to avoid reading the whole file. Please feel free
to let me know if you hit any questions.

Best,
Bo




On Sun, Aug 9, 2015 at 6:01 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Akshat,

 Is there a particular reason you don't use s3a? From my experience,s3a
 performs much better than the rest. I believe the inefficiency is from the
 implementation of the s3 interface.

 Best Regards,

 Jerry

 Sent from my iPhone

 On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote:

 Depends on which operation you are doing, If you are doing a .count() on a
 parquet, it might not download the entire file i think, but if you do a
 .count() on a normal text file it might pull the entire file.

 Thanks
 Best Regards

 On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote:

 Hi,

 I've been trying to track down some problems with Spark reads being very
 slow with s3n:// URIs (NativeS3FileSystem).  After some digging around, I
 realized that this file system implementation fetches the entire file,
 which isn't really a Spark problem, but it really slows down things when
 trying to just read headers from a Parquet file or just creating partitions
 in the RDD.  Is this something that others have observed before, or am I
 doing something wrong?

 Thanks,
 Akshat