Python's ReduceByKeyAndWindow DStream Keeps Growing

2015-08-17 Thread Asim Jalis
When I use reduceByKeyAndWindow with func and invFunc (in PySpark), the size
of the window keeps growing. I am appending the code that reproduces this
issue. It prints out the count() of the DStream, which goes up by 10 elements
every batch.

Is this a bug in the Python version of Spark, or is this expected behavior?

Here is the code that reproduces this issue.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pprint import pprint

print 'Initializing ssc'
ssc = StreamingContext(SparkContext(), batchDuration=1)
ssc.checkpoint('ckpt')

ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda count1, count2: count1 + count2,
        invFunc=lambda count1, count2: count1 - count2,
        windowDuration=10,
        slideDuration=2)

ds.pprint()
ds.count().pprint()

print 'Starting ssc'
ssc.start()

import itertools
import time
import random

from distutils import dir_util

def batch_write(batch_data, batch_file_path):
    with open(batch_file_path, 'w') as batch_file:
        for element in batch_data:
            line = str(element) + '\n'
            batch_file.write(line)

def xrange_write(
        batch_size=5,
        batch_dir='input',
        batch_duration=1):
    '''Every batch_duration write a file with batch_size numbers,
    forever. Start at 0 and keep incrementing. Intended for testing
    Spark Streaming code.'''
    dir_util.mkpath('./input')
    for i in itertools.count():
        min = batch_size * i
        max = batch_size * (i + 1)
        batch_data = xrange(min, max)
        file_path = batch_dir + '/' + str(i)
        batch_write(batch_data, file_path)
        time.sleep(batch_duration)

print 'Feeding data to app'
xrange_write()

ssc.awaitTermination()
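
One detail worth checking: when an inverse function is supplied, reduceByKeyAndWindow keeps keys in the window state even after their value drops to zero unless a filterFunc is passed to prune them, which would make count() keep climbing. A minimal sketch of the same pipeline with pruning (assuming the filter receives each (key, count) pair):

ds = ssc.textFileStream('input') \
    .map(lambda event: (event, 1)) \
    .reduceByKeyAndWindow(
        func=lambda c1, c2: c1 + c2,
        invFunc=lambda c1, c2: c1 - c2,
        windowDuration=10,
        slideDuration=2,
        filterFunc=lambda kv: kv[1] > 0)  # drop keys whose windowed count has fallen to 0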


Re: Spark executor lost because of time out even after setting quite long time out value 1000 seconds

2015-08-17 Thread Akhil Das
It could be stuck on a GC pause. Can you check a bit more in the executor
logs and see what's going on? From the driver UI you would also get to know
at which stage it is getting stuck.
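
If it is GC, it may also help to surface GC activity in the executor logs and to loosen the network/heartbeat timeouts while debugging. A rough sketch of the relevant settings (the values are illustrative only, not recommendations):

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.executor.extraJavaOptions",
             "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")  # GC details in executor stderr
        .set("spark.network.timeout", "600s")              # default for several timeouts, including the heartbeat-based executor timeout
        .set("spark.executor.heartbeatInterval", "30s"))   # how often executors report to the driver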

Thanks
Best Regards

On Sun, Aug 16, 2015 at 11:45 PM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have written a Spark job which seems to be working fine for almost an
 hour, and after that executors start getting lost because of timeouts. I see
 the following log statement:

 15/08/16 12:26:46 WARN spark.HeartbeatReceiver: Removing executor 10 with no
 recent heartbeats: 1051638 ms exceeds timeout 100 ms

 I don't see any errors, but I see the above warning, and because of it the
 executor gets removed by YARN and I see Rpc client disassociated errors,
 IOException connection refused and FetchFailedException.

 After an executor gets removed I see it is again getting added and starts
 working, and some other executors fail again. My question is: is it normal
 for executors to get lost? What happens to the tasks the lost executors were
 working on? My Spark job keeps on running since it is long, around 4-5 hours.
 I have a very good cluster with 1.2 TB memory and a good number of CPU cores.
 To solve the above timeout issue I tried to increase spark.akka.timeout to
 1000 seconds, but no luck. I am using the following command to run my Spark
 job. Please guide, I am new to Spark. I am using Spark 1.4.1. Thanks in
 advance.

 /spark-submit --class com.xyz.abc.MySparkJob  --conf
 spark.executor.extraJavaOptions=-XX:MaxPermSize=512M
 --driver-java-options
 -XX:MaxPermSize=512m --driver-memory 4g --master yarn-client
 --executor-memory 25G --executor-cores 8 --num-executors 5 --jars
 /path/to/spark-job.jar



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-lost-because-of-time-out-even-after-setting-quite-long-time-out-value-1000-seconds-tp24289.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark hangs on collect (stuck on scheduler delay)

2015-08-17 Thread Akhil Das
You need to debug further and figure out the bottleneck. Why are you doing
a collect? If the dataset is too huge, that will most likely hang the driver
machine. It would be good if you could paste some sample code; without that
it's really hard to understand the flow of your program.
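
If the driver only needs a peek at the data, a bounded action avoids materializing the whole RDD locally. A minimal sketch (assuming an RDD named rdd):

preview = rdd.take(100)        # bounded sample brought to the driver
total = rdd.count()            # full-size action that stays distributed
# rdd.toLocalIterator() streams one partition at a time if everything is eventually needed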

Thanks
Best Regards

On Sun, Aug 16, 2015 at 1:14 PM, Sagi r stsa...@gmail.com wrote:

 Hi,
 I'm building a Spark application in which I load some data from an
 Elasticsearch cluster (using the latest elasticsearch-hadoop connector) and
 continue to perform some calculations on the Spark cluster.

 In one case, I use collect on the RDD as soon as it is created (loaded from
 ES).
 However, it sometimes hangs on one (and sometimes more than one) node and
 doesn't continue.
 In the web UI, I can see that one node is stuck on scheduler delay and
 prevents the job from continuing
 (while the others have finished).

 Do you have any idea what is going on here?

 The data that is being loaded is fairly small, and only gets mapped once to
 domain objects before being collected.

 Thank you



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hangs-on-collect-stuck-on-scheduler-delay-tp24283.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-17 Thread Petr Novak
Or, more generally, can I create a new RDD from a transformation and enrich its
partitions with some metadata, so that I could copy the OffsetRanges into my new
RDD in the DStream?

On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com wrote:

 Hi all,
 I need to transform KafkaRDD into a new stream of deserialized case
 classes. I want to use the new stream to save it to file and to perform
 additional transformations on it.

 To save it I want to use offsets in filenames, hence I need OffsetRanges
 in transformed RDD. But KafkaRDD is private, hence I don't know how to do
 it.

 Alternatively, I could deserialize directly in messageHandler before
 KafkaRDD, but it seems that is a 1:1 transformation, while I need to drop bad
 messages (KafkaRDD => RDD, i.e. it would be a flatMap).

 Is there a way to do it using messageHandler, or is there another
 approach?

 Many thanks for any help.
 Petr



Re: SparkPi is geting java.lang.NoClassDefFoundError: scala/collection/Seq

2015-08-17 Thread xiaohe lan
Yeah, lots of libraries need to be changed to compile scope in order to run the
examples in IntelliJ.

Thanks,
Xiaohe

On Mon, Aug 17, 2015 at 10:01 AM, Jeff Zhang zjf...@gmail.com wrote:

 Check the examples module's dependencies (right-click examples and click Open
 Module Settings); by default scala-library is provided, and you need to change
 it to compile to run SparkPi in IntelliJ. As I remember, you also need to
 change the guava and jetty related libraries to compile too.

 On Mon, Aug 17, 2015 at 2:14 AM, xiaohe lan zombiexco...@gmail.com
 wrote:

 Hi,

 I am trying to run SparkPi in IntelliJ and am getting a NoClassDefFoundError.
 Has anyone else seen this issue before?

 Exception in thread "main" java.lang.NoClassDefFoundError:
 scala/collection/Seq
 at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
 Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 6 more

 Process finished with exit code 1

 Thanks,
 Xiaohe




 --
 Best Regards

 Jeff Zhang



Re: Cannot cast to Tuple when running in cluster mode

2015-08-17 Thread Akhil Das
That looks like a Scala version mismatch.

Thanks
Best Regards

On Fri, Aug 14, 2015 at 9:04 PM, saif.a.ell...@wellsfargo.com wrote:

 Hi All,

 I have a working program in which I create two big Tuple2s out of the
 data. This seems to work in local mode, but when I switch over to standalone
 cluster mode, I get this error at the very beginning:

 15/08/14 10:22:25 WARN TaskSetManager: Lost task 4.0 in stage 1.0 (TID 10,
 162.101.194.44): java.lang.ClassCastException:
 scala.collection.Iterator$$anon$13 cannot be cast to scala.Tuple2
 at
 org.apache.spark.sql.DataFrame$$anonfun$33.apply(DataFrame.scala:1189)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 at
 org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

 The data comes from JDBC, but I also tried persisting it into memory to
 turn it into a collection, in case JDBC was the problem.

 Any advice?
 Saif




Re: Too many files/dirs in hdfs

2015-08-17 Thread UMESH CHAUDHARY
In Spark Streaming you can simply check whether your RDD contains any
records or not, and if records are there you can save them, for example using
FileOutputStream:

dstream.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    // SAVE YOUR STUFF
  }
}

This will not create unnecessary files of 0 bytes.
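
A rough PySpark equivalent of the same check (the stream name and output layout are placeholders):

def save_if_nonempty(time, rdd):
    if not rdd.isEmpty():                       # cheaper than a full count on large RDDs
        rdd.saveAsTextFile('output/%s' % time)  # hypothetical per-batch output path

stream.foreachRDD(save_if_nonempty)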

On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 Currently, Spark Streaming creates a new directory for every batch
 and stores the data in it (whether it has anything or not). There is no
 direct append call as of now, but you can achieve this either with
 FileUtil.copyMerge
 http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167
 or have a separate program which will do the clean up for you.

 Thanks
 Best Regards

 On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 Spark Streaming seems to be creating 0-byte files even when there is no
 data. Also, I have 2 concerns here:

 1) Extra unnecessary files are being created from the output
 2) Hadoop doesn't work really well with too many files, and I see that it
 is creating a directory with a timestamp every 1 second. Is there a better
 way of writing the files, maybe using some kind of append mechanism where one
 doesn't have to change the batch interval?





Meaning of local[2]

2015-08-17 Thread praveen S
What does this mean in .setMaster("local[2]")?

Is this applicable only for standalone Mode?

Can I do this in a cluster setup, e.g.:
.setMaster("hostname:port[2]")?

Is it number of threads per worker node?


Re: Too many files/dirs in hdfs

2015-08-17 Thread Akhil Das
Currently, Spark Streaming creates a new directory for every batch and
stores the data in it (whether it has anything or not). There is no direct
append call as of now, but you can achieve this either with
FileUtil.copyMerge
http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167
or have a separate program which will do the clean up for you.

Thanks
Best Regards

On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 Spark Streaming seems to be creating 0-byte files even when there is no
 data. Also, I have 2 concerns here:

 1) Extra unnecessary files are being created from the output
 2) Hadoop doesn't work really well with too many files, and I see that it
 is creating a directory with a timestamp every 1 second. Is there a better
 way of writing the files, maybe using some kind of append mechanism where one
 doesn't have to change the batch interval?



Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-17 Thread Shushant Arora
How do I create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream
takes a Function1, not a Function, but KafkaUtils.createDirectStream allows a
function.

I have the below as an overridden DirectKafkaInputDStream.


public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Object> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3,
      ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
        evidence$3, evidence$4, evidence$5);
  }

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
      Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100 % failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream ");
      return super.compute(validTime);
    }
  }
}



To create this stream I am using:

scala.collection.immutable.Map<String, String> scalakafkaParams =
    JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
        .toMap(Predef.<Tuple2<String, String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
    JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
        .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
    new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
        ..});

JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
    jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
    byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
    kafka.serializer.DefaultDecoder.class, byte[][].class);



How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream? And
how do I use Function instead of Function1?



On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org wrote:

 I'm not aware of an existing api per se, but you could create your own
 subclass of the DStream that returns None for compute() under certain
 conditions.



 On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora shushantaror...@gmail.com
  wrote:

 Hi Cody

 Can you help here: does streaming 1.3 have any API for not consuming any
 messages in the next few runs?

 Thanks

 -- Forwarded message --
 From: Shushant Arora shushantaror...@gmail.com
 Date: Wed, Aug 12, 2015 at 11:23 PM
 Subject: spark streaming 1.3 doubts(force it to not consume anything)
 To: user user@spark.apache.org


 I can't make my stream application's batch interval change at run time.
 It's always fixed, and it always creates jobs at the specified batch interval
 and enqueues them if the earlier batch is not finished.

 My requirement is to process the events and post them to some external
 server, and if the external server is down I want to increase the batch time -
 that is not possible, but can I make it not consume any messages in, say, the
 next 5 successive runs?







Re: S3n, parallelism, partitions

2015-08-17 Thread Akhil Das
s3n underneath uses the Hadoop API, so I guess it would partition according
to your Hadoop configuration (128MB per partition by default).
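
A quick way to see (and, if needed, override) how many partitions you end up with is the minPartitions argument plus getNumPartitions; a minimal sketch with a placeholder bucket:

rdd = sc.textFile("s3n://bucket_xyz/some_folder_having_many_files_in_it/", minPartitions=200)
print(rdd.getNumPartitions())    # what the Hadoop input splits actually produced
# rdd = rdd.repartition(400)     # optional reshuffle if the splits are too coarse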

Thanks
Best Regards

On Mon, Aug 17, 2015 at 2:29 PM, matd matd...@gmail.com wrote:

 Hello,

 I would like to understand how the work is parallelized across a Spark
 cluster (and what is left to the driver) when I read several files from a
 single folder in S3: s3n://bucket_xyz/some_folder_having_many_files_in_it/

 How are files (or file parts) mapped to partitions?

 Thanks
 Mathieu



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/S3n-parallelism-partitions-tp24293.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to run spark in standalone mode on cassandra with high availability?

2015-08-17 Thread Akhil Das
Have a look at Mesos.

Thanks
Best Regards

On Sat, Aug 15, 2015 at 1:03 PM, Vikram Kone vikramk...@gmail.com wrote:

 Hi,
 We are planning to install Spark in standalone mode on a Cassandra cluster.
 The problem is, since Cassandra has a no-SPOF architecture, i.e. any node can
 become the master for the cluster, it creates a problem for the Spark master,
 since Spark is not a peer-to-peer architecture where any node can become the
 master.

 What are our options here? Are there any frameworks or tools out there that
 would allow any application to run on a cluster of machines with high
 availability?



Re: Meaning of local[2]

2015-08-17 Thread Daniel Darabos
Hi Praveen,

On Mon, Aug 17, 2015 at 12:34 PM, praveen S mylogi...@gmail.com wrote:

 What does this mean in .setMaster("local[2]")?

Local mode (executor in the same JVM) with 2 executor threads.
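
A minimal sketch of the local-mode form (the app name is a placeholder):

from pyspark import SparkConf, SparkContext

# "local[2]": run everything locally in one process, with 2 worker threads
conf = SparkConf().setAppName("example").setMaster("local[2]")
sc = SparkContext(conf=conf)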

 Is this applicable only for standalone Mode?

It is not applicable for standalone mode, only for local.

 Can I do this in a cluster setup, e.g.:
 .setMaster("hostname:port[2]")?

No. It's faster to try than to ask a mailing list, actually. Also it's
documented at
http://spark.apache.org/docs/latest/submitting-applications.html#master-urls
.

 Is it number of threads per worker node?

You can control the number of total threads with
spark-submit's --total-executor-cores parameter, if that's what you're
looking for.


Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation

2015-08-17 Thread Petr Novak
Hi all,
I need to transform KafkaRDD into a new stream of deserialized case
classes. I want to use the new stream to save it to file and to perform
additional transformations on it.

To save it I want to use offsets in filenames, hence I need OffsetRanges in
transformed RDD. But KafkaRDD is private, hence I don't know how to do it.

Alternatively, I could deserialize directly in messageHandler before
KafkaRDD, but it seems that is a 1:1 transformation, while I need to drop bad
messages (KafkaRDD => RDD, i.e. it would be a flatMap).

Is there a way to do it using messageHandler, or is there another approach?

Many thanks for any help.
Petr


S3n, parallelism, partitions

2015-08-17 Thread matd
Hello,

I would like to understand how the work is parallelized across a Spark
cluster (and what is left to the driver) when I read several files from a
single folder in S3: s3n://bucket_xyz/some_folder_having_many_files_in_it/

How are files (or file parts) mapped to partitions?

Thanks 
Mathieu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/S3n-parallelism-partitions-tp24293.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help with persist: Data is requested again

2015-08-17 Thread Akhil Das
Are you triggering an action within the while loop? How are you loading the
data from JDBC? You need to make sure the job has enough partitions to run in
parallel to increase the performance.
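
A minimal PySpark sketch of the pattern in question (the loader name is hypothetical): persist the source RDD and force it to materialize with a single action before the loop, so later iterations hit the cached blocks instead of going back over JDBC.

data = load_pairs_from_jdbc()   # hypothetical loader returning a pair RDD
data = data.persist()
data.count()                    # one action up front populates the cache

for i in range(10):
    data_mod = data.map(lambda kv: (kv[0] + 1, kv[1]))
    data_joined = data.join(data_mod)
    # ... do stuff with data_joined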

Thanks
Best Regards

On Sat, Aug 15, 2015 at 2:41 AM, saif.a.ell...@wellsfargo.com wrote:

 Hello all,

 I am writing a program which reads from a database. I run a couple of
 computations, but at the end I have a while loop in which I make a
 modification to the persisted data, e.g.:

 val data = PairRDD... persist()
 var i = 0
 while (i < 10) {
   val data_mod = data.map { case (k, v) => (k + 1, v) }
   val data_joined = data.join(data_mod)
   // ... do stuff with data_joined
   i += 1
 }

 Sadly, the shuffle inside the while loop is causing a JDBC call and that is
 very slow. It is not finding the data locally.

 How can I help myself?
 Saif




Re: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString

2015-08-17 Thread Ted Yu
Have you tried adding the path to the hbase-protocol jar to
spark.driver.extraClassPath and spark.executor.extraClassPath?

Cheers

On Mon, Aug 17, 2015 at 7:51 PM, stark_summer stark_sum...@qq.com wrote:

 spark version: 1.4.1
 java version: 1.7
 hadoop version: Hadoop 2.3.0-cdh5.1.0

 I submit a Spark job to a YARN cluster that reads HBase data; after the job
 has been running, the error below appears:

 15/08/17 19:28:33 ERROR yarn.ApplicationMaster: User class threw exception:
 org.apache.hadoop.hbase.DoNotRetryIOException:
 java.lang.IllegalAccessError:
 class com.google.protobuf.HBaseZeroCopyByteString ca
 nnot access its superclass com.google.protobuf.LiteralByteString
 org.apache.hadoop.hbase.DoNotRetryIOException:
 java.lang.IllegalAccessError:
 class com.google.protobuf.HBaseZeroCopyByteString cannot access its
 superclass com.google.protobuf.LiteralByteString
 at

 org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:210)
 at

 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:121)
 at

 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
 at

 org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:264)
 at

 org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:169)
 at
 org.apache.hadoop.hbase.client.ClientScanner.init(ClientScanner.java:164)
 at
 org.apache.hadoop.hbase.client.ClientScanner.init(ClientScanner.java:107)
 at
 org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:736)
 at
 org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:178)
 at
 org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:82)
 at

 org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.isTableAvailable(HConnectionManager.java:962)
 at

 org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1081)
 at

 org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1089)
 at com.umeng.dp.yuliang.play.HBaseToES$.main(HBaseToES.scala:28)
 at com.umeng.dp.yuliang.play.HBaseToES.main(HBaseToES.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at

 org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
 Caused by: java.lang.IllegalAccessError: class
 com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass
 com.google.protobuf.LiteralByteString
 at java.lang.ClassLoader.defineClass1(Native Method)
 at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
 at
 java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
 at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
 at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at

 org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:930)
 at

 org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:434)
 at

 org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:297)
 at

 org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:157)
 at

 org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:57)
 at

 org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
 ... 18 more

 PS:
 Running a Hadoop MR job on YARN that reads HBase data also hits this error;
 see https://issues.apache.org/jira/browse/HBASE-10304, which is an HBase issue.

 When submitting the Hadoop MR job, adding

 export HADOOP_CLASSPATH=./hbase/hbase-protocol/target/hbase-protocol-0.99.0-SNAPSHOT.jar

 to the shell command, or adding

 export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar

 to the Linux /etc/bashrc file, makes it work well, but when submitting the
 Spark job it does not work.










 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalAccessError-class-com-google-protobuf-HBaseZeroCopyByteString-cannot-access-its-supg-tp24303.html
 Sent from the Apache Spark 

how do I execute a job on a single worker node in standalone mode

2015-08-17 Thread Axel Dahl
I have a 4 node cluster and have been playing around with the num-executors,
executor-memory and executor-cores parameters.

I set the following:
--executor-memory=10G
--num-executors=1
--executor-cores=8

But when I run the job, I see that each worker is running one executor
which has 2 cores and 2.5G memory.

What I'd like to do instead is have Spark just allocate the job to a single
worker node.

Is that possible in standalone mode or do I need a job/resource scheduler
like Yarn to do that?

Thanks in advance,

-Axel


Spark 1.4.1 - Mac OSX Yosemite

2015-08-17 Thread Alun Champion
Has anyone experienced issues running Spark 1.4.1 on a Mac OSX Yosemite?

I've been running a standalone 1.3.1 fine, but it failed when trying to run
1.4.1 (I also tried 1.4.0).

I've tried both the pre-built packages as well as compiling from source,
both with the same results (I can successfully compile with both mvn and
sbt, after fixing the sbt.jar, which was corrupt).
After downloading/building Spark and running ./bin/pyspark or
./bin/spark-shell, it silently exits with code 1.
Creating a context in Python I get: Exception: Java gateway process exited
before sending the driver its port number

I couldn't find any specific resolutions on the web.
I did add 'pyspark-shell' to the PYSPARK_SUBMIT_ARGS but to no effect.

Anyone have any further ideas I can explore?
Cheers
   -Alun.


Re: grpah x issue spark 1.3

2015-08-17 Thread David Zeelen
The code below is taken from the Spark website and generates the error
detailed below.

Hi, using Spark 1.3 and trying some sample code:
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)


When I run:
graph.numEdges
all works well, but with
graph.numVertices
it falls over and I get a whole heap of errors:
Failed to open file: /tmp/spark..shuffle_0_21_0.index
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
at
org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
at org.apache.spark.network.server.TransporSLF4J: Class path contains
multiple SLF4J bindings.

Is anyone else experiencing this? I've tried different graphs and always end
up with the same results.

thanks

On Tue, 18 Aug 2015 at 12:15 am, Sonal Goyal sonalgoy...@gmail.com wrote:

 I have been using graphx in production on 1.3 and 1.4 with no issues.
 What's the  exception you see and what are you trying to do?
 On Aug 17, 2015 10:49 AM, dizzy5112 dave.zee...@gmail.com wrote:

 Hi using spark 1.3 and trying some sample code:


 when i run:

 all works well but with

 it falls over and i get a whole heap of errors:

 Is anyone else experiencing this? Ive tried different graphs and always
 end
 up with the same results.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/grpah-x-issue-spark-1-3-tp24292.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark 1.4.1 - Mac OSX Yosemite

2015-08-17 Thread Charlie Hack
I had success earlier today on OSX Yosemite 10.10.4 building Spark 1.4.1
using these instructions
http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html
(using `$ sbt/sbt clean assembly`), with the additional step of downloading the
proper sbt-launch.jar (0.13.7) from here
http://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/
and replacing the one that is in build/, as you noted. Have you set the
SCALA_HOME and JAVA_HOME environment variables?

On Mon, Aug 17, 2015 at 8:36 PM, Alun Champion a...@achampion.net wrote:

 Has anyone experienced issues running Spark 1.4.1 on a Mac OSX Yosemite?

 I'm been running a standalone 1.3.1 fine but it failed when trying to run
 1.4.1. (I also trie 1.4.0).

 I've tried both the pre-built packages as well as compiling from source,
 both with the same results (I can successfully compile with both mvn and
 sbt (after fixing the sbt.jar - which was corrupt)
 After downloading/building spark and running ./bin/pyspark or
 ./bin/spark-shell it silently exits with a code 1.
 Creating a context in python I get: Exception: Java gateway process exited
 before sending the driver its port number

 I couldn't find any specific resolutions on the web.
 I did add 'pyspark-shell' to the PYSPARK_SUBMIT_ARGS but to no effect.

 Anyone have any further ideas I can explore?
 Cheers
-Alun.




-- 
# +17344761472


Re: java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString

2015-08-17 Thread stark_summer
Approach 1:
When submitting the Spark job, add the below:
 --conf
spark.driver.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
--conf
spark.executor.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
such as:

/home/dp/spark/spark-1.4/spark-1.4.1/bin/spark-submit --class
com.umeng.dp.yuliang.play.HBaseToES --master yarn-cluster --conf
spark.driver.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
--conf
spark.executor.extraClassPath=/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
  
--jars /home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
ScalaMR-0.0.1-jar-with-dependencies.jar 


Approach 2:
Add the below config to $SPARK_HOME/conf/spark-defaults.conf:
spark.driver.extraClassPath
/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar
spark.executor.extraClassPath
/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalAccessError-class-com-google-protobuf-HBaseZeroCopyByteString-cannot-access-its-supg-tp24303p24306.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.lang.IllegalAccessError: class com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass com.google.protobuf.LiteralByteString

2015-08-17 Thread stark_summer
spark version: 1.4.1
java version: 1.7
hadoop version: Hadoop 2.3.0-cdh5.1.0

I submit a Spark job to a YARN cluster that reads HBase data; after the job
has been running, the error below appears:

15/08/17 19:28:33 ERROR yarn.ApplicationMaster: User class threw exception:
org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError:
class com.google.protobuf.HBaseZeroCopyByteString ca
nnot access its superclass com.google.protobuf.LiteralByteString
org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError:
class com.google.protobuf.HBaseZeroCopyByteString cannot access its
superclass com.google.protobuf.LiteralByteString
at
org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:210)
at
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:121)
at
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
at
org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:264)
at
org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:169)
at
org.apache.hadoop.hbase.client.ClientScanner.init(ClientScanner.java:164)
at
org.apache.hadoop.hbase.client.ClientScanner.init(ClientScanner.java:107)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:736)
at
org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:178)
at
org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:82)
at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.isTableAvailable(HConnectionManager.java:962)
at
org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1081)
at
org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1089)
at com.umeng.dp.yuliang.play.HBaseToES$.main(HBaseToES.scala:28)
at com.umeng.dp.yuliang.play.HBaseToES.main(HBaseToES.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:483)
Caused by: java.lang.IllegalAccessError: class
com.google.protobuf.HBaseZeroCopyByteString cannot access its superclass
com.google.protobuf.LiteralByteString
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at
org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:930)
at
org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:434)
at
org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:297)
at
org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:157)
at
org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:57)
at
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
... 18 more

PS:
Running a Hadoop MR job on YARN that reads HBase data also hits this error;
see https://issues.apache.org/jira/browse/HBASE-10304, which is an HBase issue.

When submitting the Hadoop MR job, adding

export HADOOP_CLASSPATH=./hbase/hbase-protocol/target/hbase-protocol-0.99.0-SNAPSHOT.jar

to the shell command, or adding

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/home/cluster/apps/hbase/lib/hbase-protocol-0.98.1-cdh5.1.0.jar

to the Linux /etc/bashrc file, makes it work well, but when submitting the
Spark job it does not work.










--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-IllegalAccessError-class-com-google-protobuf-HBaseZeroCopyByteString-cannot-access-its-supg-tp24303.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Serializing MLlib MatrixFactorizationModel

2015-08-17 Thread Joseph Bradley
I'd recommend using the built-in save and load, which will be better for
cross-version compatibility.  You should be able to call
myModel.save(path), and load it back with
MatrixFactorizationModel.load(path).
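
In recent releases the MLlib save/load entry points take the SparkContext as their first argument; a rough PySpark sketch (the ratings RDD and path are placeholders):

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel

model = ALS.train(ratings, rank=10, iterations=10)     # ratings: RDD of Rating objects
model.save(sc, "path/to/model")                        # writes metadata plus factor RDDs
same_model = MatrixFactorizationModel.load(sc, "path/to/model")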

On Mon, Aug 17, 2015 at 6:31 AM, Madawa Soysa madawa...@cse.mrt.ac.lk
wrote:

 Hi All,

 I have an issue when i try to serialize a MatrixFactorizationModel object
 as a java object in a Java application. When I deserialize the object, I
 get the following exception.

 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.OneToOneDependency cannot be found by
 org.scala-lang.scala-library_2.10.4.v20140209-180020-VFINAL-b66a39653b

 Any solution for this?

 --

 *_**Madawa Soysa*

 Undergraduate,

 Department of Computer Science and Engineering,

 University of Moratuwa.


 Mobile: +94 71 461 6050 %2B94%2075%20812%200726 | Email:
 madawa...@cse.mrt.ac.lk
 LinkedIn http://lk.linkedin.com/in/madawasoysa | Twitter
 https://twitter.com/madawa_rc | Tumblr http://madawas.tumblr.com/



Re: Spark 1.4.1 - Mac OSX Yosemite

2015-08-17 Thread Alun Champion
Yes, they are both set. I just recompiled and still no success, a silent
failure.
Which versions of Java and Scala are you using?


On 17 August 2015 at 19:59, Charlie Hack charles.t.h...@gmail.com wrote:

 I had success earlier today on OSX Yosemite 10.10.4 building Spark 1.4.1
 using these instructions
 http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html
  (using
 `$ sbt/sbt clean assembly`, with the additional step of downloading the
 proper sbt-launch.jar (0.13.7) from here
 http://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/
 and replacing the one that is in build/ as you noted. You've set SCALA_HOME
 and JAVA_HOME environment variables?

 On Mon, Aug 17, 2015 at 8:36 PM, Alun Champion a...@achampion.net wrote:

 Has anyone experienced issues running Spark 1.4.1 on a Mac OSX Yosemite?

 I'm been running a standalone 1.3.1 fine but it failed when trying to run
 1.4.1. (I also trie 1.4.0).

 I've tried both the pre-built packages as well as compiling from source,
 both with the same results (I can successfully compile with both mvn and
 sbt (after fixing the sbt.jar - which was corrupt)
 After downloading/building spark and running ./bin/pyspark or
 ./bin/spark-shell it silently exits with a code 1.
 Creating a context in python I get: Exception: Java gateway process
 exited before sending the driver its port number

 I couldn't find any specific resolutions on the web.
 I did add 'pyspark-shell' to the PYSPARK_SUBMIT_ARGS but to no effect.

 Anyone have any further ideas I can explore?
 Cheers
-Alun.




 --
 # +17344761472



Re: spark streaming 1.3 doubts(force it to not consume anything)

2015-08-17 Thread Cody Koeninger
Look at the definitions of the java-specific KafkaUtils.createDirectStream
methods (the ones that take a JavaStreamingContext)

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com
wrote:

 How do I create a ClassTag in Java? Also, the constructor
 of DirectKafkaInputDStream takes a Function1, not a Function, but
 KafkaUtils.createDirectStream allows a function.

 I have the below as an overridden DirectKafkaInputDStream.


 public class CustomDirectKafkaInputDstream extends
     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
         kafka.serializer.DefaultDecoder, byte[][]> {

   public CustomDirectKafkaInputDstream(
       StreamingContext ssc_,
       Map<String, String> kafkaParams,
       Map<TopicAndPartition, Object> fromOffsets,
       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
       ClassTag<DefaultDecoder> evidence$3,
       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
         evidence$3, evidence$4, evidence$5);
   }

   @Override
   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
       Time validTime) {
     int processed = processedCounter.value();
     int failed = failedProcessingsCounter.value();
     if (processed == failed) {
       System.out.println("backing off since its 100 % failure");
       return Option.empty();
     } else {
       System.out.println("starting the stream ");
       return super.compute(validTime);
     }
   }
 }



 To create this stream I am using:

 scala.collection.immutable.Map<String, String> scalakafkaParams =
     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
         .toMap(Predef.<Tuple2<String, String>>conforms());
 scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());

 scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
         ..});

 JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
     kafka.serializer.DefaultDecoder.class, byte[][].class);



 How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream? And
 how do I use Function instead of Function1?



 On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org
 wrote:

 I'm not aware of an existing api per se, but you could create your own
 subclass of the DStream that returns None for compute() under certain
 conditions.



 On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Hi Cody

 Can you help here: does streaming 1.3 have any API for not consuming any
 messages in the next few runs?

 Thanks

 -- Forwarded message --
 From: Shushant Arora shushantaror...@gmail.com
 Date: Wed, Aug 12, 2015 at 11:23 PM
 Subject: spark streaming 1.3 doubts(force it to not consume anything)
 To: user user@spark.apache.org


 I can't make my stream application's batch interval change at run time.
 It's always fixed, and it always creates jobs at the specified batch interval
 and enqueues them if the earlier batch is not finished.

 My requirement is to process the events and post them to some external
 server, and if the external server is down I want to increase the batch time -
 that is not possible, but can I make it not consume any messages in, say, the
 next 5 successive runs?








rdd count is throwing null pointer exception

2015-08-17 Thread Priya Ch
Hi All,

 Thank you very much for the detailed explanation.

I have a scenario like this:
I have an RDD of ticket records and another RDD of booking records. For each
ticket record, I need to check whether any link exists in the booking table.

val ticketCachedRdd = ticketRdd.cache

ticketRdd.foreach { ticket =>
  val bookingRecords = queryOnBookingTable(date, flightNumber, flightCarrier)
  // this function queries the booking table and retrieves the booking rows
  println(ticketCachedRdd.count) // this is throwing a NullPointerException
}

Is there something wrong with the count? I am trying to use the count of the
cached RDD while looping through the actual RDD. What's wrong with this?
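
One thing to be aware of: RDDs generally cannot be referenced inside functions passed to another RDD's actions (the nested RDD's SparkContext is not available on the executors), which can produce a NullPointerException regardless of caching. A minimal PySpark sketch of computing the count once on the driver and capturing only the resulting number (names are hypothetical):

ticket_count = ticket_rdd.count()          # action runs once, driven from the driver

def check(ticket):
    booking_records = query_on_booking_table(ticket)   # hypothetical lookup helper
    print(ticket_count)                    # plain value, safe to capture in the closure

ticket_rdd.foreach(check)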

Thanks,
Padma Ch


Re: grpah x issue spark 1.3

2015-08-17 Thread Sonal Goyal
I have been using graphx in production on 1.3 and 1.4 with no issues.
What's the  exception you see and what are you trying to do?
On Aug 17, 2015 10:49 AM, dizzy5112 dave.zee...@gmail.com wrote:

 Hi using spark 1.3 and trying some sample code:


 when i run:

 all works well but with

 it falls over and i get a whole heap of errors:

 Is anyone else experiencing this? Ive tried different graphs and always end
 up with the same results.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/grpah-x-issue-spark-1-3-tp24292.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Interview Questions

2015-08-17 Thread Sandeep Giri
This statement is from the Spark website itself.


Regards,
Sandeep Giri,
+1 347 781 4573 (US)
+91-953-899-8962 (IN)

www.KnowBigData.com. http://KnowBigData.com.
Phone: +1-253-397-1945 (Office)

LinkedIn: https://linkedin.com/company/knowbigdata | Web: http://knowbigdata.com
Facebook: https://facebook.com/knowbigdata | Twitter: https://twitter.com/IKnowBigData


On Wed, Aug 12, 2015 at 10:42 PM, Peyman Mohajerian mohaj...@gmail.com
wrote:

 I think this statement is inaccurate:
 Q7: What are Actions? A: An action brings back the data from the RDD to
 the local machine -

 Also, I wouldn't say Spark is 100x faster than Hadoop and that it is memory
 based. This is the kind of statement that will not get you the job. When it
 comes to a shuffle it has to write to disk; it is faster in many cases, but
 100x is just a marketing statement used in a very narrow set of use cases.






 On Thu, Jul 30, 2015 at 4:55 AM, Sandeep Giri sand...@knowbigdata.com
 wrote:

 i have prepared some interview questions:
 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-1
 http://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

 please provide your feedback.

 On Wed, Jul 29, 2015, 23:43 Pedro Rodriguez ski.rodrig...@gmail.com
 wrote:

 You might look at the edx course on Apache Spark or ML with Spark. There
 are probably some homework problems or quiz questions that might be
 relevant. I haven't looked at the course myself, but thats where I would go
 first.


 https://www.edx.org/course/introduction-big-data-apache-spark-uc-berkeleyx-cs100-1x

 https://www.edx.org/course/scalable-machine-learning-uc-berkeleyx-cs190-1x

 --
 Pedro Rodriguez
 PhD Student in Distributed Machine Learning | CU Boulder
 UC Berkeley AMPLab Alumni

 ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
 Github: github.com/EntilZha | LinkedIn:
 https://www.linkedin.com/in/pedrorodriguezscience





Serializing MLlib MatrixFactorizationModel

2015-08-17 Thread Madawa Soysa
Hi All,

I have an issue when I try to serialize a MatrixFactorizationModel object
as a Java object in a Java application. When I deserialize the object, I
get the following exception.

Caused by: java.lang.ClassNotFoundException:
org.apache.spark.OneToOneDependency cannot be found by
org.scala-lang.scala-library_2.10.4.v20140209-180020-VFINAL-b66a39653b

Any solution for this?

-- 

*_**Madawa Soysa*

Undergraduate,

Department of Computer Science and Engineering,

University of Moratuwa.


Mobile: +94 71 461 6050 %2B94%2075%20812%200726 | Email:
madawa...@cse.mrt.ac.lk
LinkedIn http://lk.linkedin.com/in/madawasoysa | Twitter
https://twitter.com/madawa_rc | Tumblr http://madawas.tumblr.com/


Re: rdd count is throwing null pointer exception

2015-08-17 Thread Preetam
The error could be because of the missing brackets after the word cache:
ticketRdd.cache()

 On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com wrote:
 
 Hi All,
 
  Thank you very much for the detailed explanation.
 
 I have scenario like this- 
 I have rdd of ticket records and another rdd of booking records. for each 
 ticket record, i need to check whether any link exists in booking table.
 
 val ticketCachedRdd = ticketRdd.cache
 
 ticketRdd.foreach{
 ticket =
 val bookingRecords =  queryOnBookingTable (date, flightNumber, flightCarrier) 
  // this function queries the booking table and retrieves the booking rows
 println(ticketCachedRdd.count) // this is throwing Null pointer exception
 
 }
 
 Is there somthing wrong in the count, i am trying to use the count of cached 
 rdd when looping through the actual rdd. whats wrong in this ?
 
 Thanks,
 Padma Ch

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Paper on Spark SQL

2015-08-17 Thread Todd
Hi,
I can't access 
http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf.
Could someone help try to see if it is available and reply with it? Thanks!


Re: Paper on Spark SQL

2015-08-17 Thread Ted Yu
I got 404 when trying to access the link. 



 On Aug 17, 2015, at 5:31 AM, Todd bit1...@163.com wrote:
 
 Hi,
 I can't access 
 http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf.
 Could someone help try to see if  it is available and reply with it?Thanks!


Re: Paper on Spark SQL

2015-08-17 Thread Nan Zhu
an extra “,” is at the end

--  
Nan Zhu
http://codingcat.me


On Monday, August 17, 2015 at 9:28 AM, Ted Yu wrote:

 I got 404 when trying to access the link.  
  
  
  
 On Aug 17, 2015, at 5:31 AM, Todd bit1...@163.com (mailto:bit1...@163.com) 
 wrote:
  
  Hi,
  I can't access 
  http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf. 
  (http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf,)
  Could someone help try to see if  it is available and reply with it?Thanks!



Exception when S3 path contains colons

2015-08-17 Thread Brian Stempin
Hi,
I'm running Spark on Amazon EMR (Spark 1.4.1, Hadoop 2.6.0).  I'm seeing
the exception below when encountering file names that contain colons.  Any
idea on how to get around this?

scala> val files = sc.textFile("s3a://redactedbucketname/*")

2015-08-18 04:38:34,567 INFO  [main] storage.MemoryStore
(Logging.scala:logInfo(59)) - ensureFreeSpace(242224) called with
curMem=669367, maxMem=285203496

2015-08-18 04:38:34,568 INFO  [main] storage.MemoryStore
(Logging.scala:logInfo(59)) - Block broadcast_3 stored as values in memory
(estimated size 236.5 KB, free 271.1 MB)

2015-08-18 04:38:34,663 INFO  [main] storage.MemoryStore
(Logging.scala:logInfo(59)) - ensureFreeSpace(21533) called with
curMem=911591, maxMem=285203496

2015-08-18 04:38:34,664 INFO  [main] storage.MemoryStore
(Logging.scala:logInfo(59)) - Block broadcast_3_piece0 stored as bytes in
memory (estimated size 21.0 KB, free 271.1 MB)

2015-08-18 04:38:34,665 INFO
 [sparkDriver-akka.actor.default-dispatcher-19] storage.BlockManagerInfo
(Logging.scala:logInfo(59)) - Added broadcast_3_piece0 in memory on
10.182.184.26:60338 (size: 21.0 KB, free: 271.9 MB)

2015-08-18 04:38:34,667 INFO  [main] spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 3 from textFile at
console:21

files: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile
at console:21


scala> files.count

2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
(S3AFileSystem.java:listStatus(533)) - List status for path:
s3a://redactedbucketname/

2015-08-18 04:38:37,262 INFO  [main] s3a.S3AFileSystem
(S3AFileSystem.java:getFileStatus(684)) - Getting path status for
s3a://redactedbucketname/ ()

java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative
path in absolute URI:
[922-212-4438]-[119]-[1]-[2015-08-13T15:43:12.346193%5D-%5B2015-01-01T00:00:00%5D-redacted.csv

at org.apache.hadoop.fs.Path.initialize(Path.java:206)

at org.apache.hadoop.fs.Path.init(Path.java:172)

at org.apache.hadoop.fs.Path.init(Path.java:94)

at org.apache.hadoop.fs.Globber.glob(Globber.java:240)

at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1700)

at
org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:229)

at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:200)

at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:279)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)

at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)

at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:219)

at org.apache.spark.rdd.RDD$anonfun$partitions$2.apply(RDD.scala:217)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781)

at org.apache.spark.rdd.RDD.count(RDD.scala:1099)

at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:24)

at $iwC$iwC$iwC$iwC$iwC$iwC$iwC.init(console:29)

at $iwC$iwC$iwC$iwC$iwC$iwC.init(console:31)

at $iwC$iwC$iwC$iwC$iwC.init(console:33)

at $iwC$iwC$iwC$iwC.init(console:35)

at $iwC$iwC$iwC.init(console:37)

at $iwC$iwC.init(console:39)

at $iwC.init(console:41)

at init(console:43)

at .init(console:47)

at .clinit(console)

at .init(console:7)

at .clinit(console)

at $print(console)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)

at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)

at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)

at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)

at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)

at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)

at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$loop(SparkILoop.scala:670)

at
org.apache.spark.repl.SparkILoop$anonfun$org$apache$spark$repl$SparkILoop$process$1.apply$mcZ$sp(SparkILoop.scala:997)

at

Re: Paper on Spark SQL

2015-08-17 Thread Ted Yu
Thanks Nan.

That is why I always put an extra space between URL and punctuation in my
comments / emails.

On Mon, Aug 17, 2015 at 6:31 AM, Nan Zhu zhunanmcg...@gmail.com wrote:

 an extra “,” is at the end

 --
 Nan Zhu
 http://codingcat.me

 On Monday, August 17, 2015 at 9:28 AM, Ted Yu wrote:

 I got 404 when trying to access the link.



 On Aug 17, 2015, at 5:31 AM, Todd bit1...@163.com wrote:

 Hi,
 I can't access
 http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf.
 http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf,
 Could someone help try to see if  it is available and reply with it?Thanks!





Re: Left outer joining big data set with small lookups

2015-08-17 Thread Silvio Fiorito
Try doing a count on both lookups to force the caching to occur before the join.
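
A rough PySpark sketch of that (table and column names are taken from the thread below; exact broadcast behavior depends on the Spark version):

big = sqlContext.table("table1")
lkup = sqlContext.table("table2").cache()
lkup.count()                                   # materializes the cache before the join
joined = big.join(lkup, big["lkupid"] == lkup["lkupid"], "left_outer")
# whether this becomes a broadcast join is governed by spark.sql.autoBroadcastJoinThreshold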




On 8/17/15, 12:39 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:

Thanks for your help

I tried to cache the lookup tables and left outer join them with the big table (DF).
The join does not seem to be using a broadcast join; it still goes with a
hash-partitioned join and shuffles the big table. Here is the scenario:


…
table1 as big_df
left outer join
table2 as lkup
on big_df.lkupid = lkup.lkupid

table1 above is well distributed across all 40 partitions because of
sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using
just 2 partitions. After the join stage, the Spark UI showed me that all
activity ended up in just 2 executors. When I tried to dump the data to
HDFS after the join stage, all data ended up in 2 partition files and the
remaining 38 files are 0-sized.

Since the above did not work, I tried to broadcast the DF and register it as a table
before the join.

val table2_df = sqlContext.sql("select * from table2")
val broadcast_table2 = sc.broadcast(table2_df)
broadcast_table2.value.registerTempTable("table2")

Broadcast is also having the same issue as explained above. All data is processed by
just 2 executors due to the lookup skew.

Any more idea to tackle this issue in Spark Dataframe?

Thanks
Vijay


 On Aug 14, 2015, at 10:27 AM, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 You could cache the lookup DataFrames, it’ll then do a broadcast join.
 
 
 
 
 On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:
 
 Hi
 
 I am facing a huge performance problem when I am trying to left outer join a
 very big data set (~140GB) with a bunch of small lookups [star schema type].
 I am using data frames in Spark SQL. It looks like data is shuffled and
 skewed when that join happens. Is there any way to improve the performance of
 such a join in spark?
 
 How can I hint the optimizer to go with a replicated join etc., to avoid the shuffle?
 Would it help to create broadcast variables on the small lookups? If I create
 broadcast variables, how can I convert them into data frames and use them in a
 Spark SQL type of join?
 
 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Setting up Spark/flume/? to Ingest 10TB from FTP

2015-08-17 Thread Steve Loughran

With the right FTP client JAR on your classpath (I forget which), you can use
ftp:// as a source for a Hadoop FS operation. You may even be able to use it as
an input for a Spark (non-streaming) job directly.
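As a rough sketch of that idea (assuming the Hadoop FTP filesystem and its commons-net dependency are on the classpath; host, credentials and paths are placeholders):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
// ftp:// source and hdfs:// destination, both reached through the Hadoop FileSystem API
val srcFs = FileSystem.get(new URI("ftp://user:password@ftp.example.com/"), conf)
val dstFs = FileSystem.get(new URI("hdfs://namenode:9000/"), conf)

// stream the remote file into HDFS rather than buffering it in one JVM heap
FileUtil.copy(srcFs, new Path("/incoming/bigfile.dat"),
              dstFs, new Path("/data/bigfile.dat"),
              false /* deleteSource */, conf)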


On 14 Aug 2015, at 14:11, Varadhan, Jawahar
varad...@yahoo.com.INVALID wrote:

Thanks Marcelo. But our problem is little complicated.

We have 10+ ftp sites that we will be transferring data from. The ftp server 
info, filename, credentials are all coming via Kafka message. So, I want to 
read those kafka message and dynamically connect to the ftp site and download 
those fat files and store it in HDFS.

And hence, I was planning to use Spark Streaming with Kafka or Flume with 
Kafka. But flume runs on a JVM and may not be the best option as the huge file 
will create memory issues. Please suggest someway to run it inside the cluster.





From: Marcelo Vanzin van...@cloudera.com
To: Varadhan, Jawahar varad...@yahoo.com
Cc: d...@spark.apache.org
Sent: Friday, August 14, 2015 3:23 PM
Subject: Re: Setting up Spark/flume/? to Ingest 10TB from FTP

Why do you need to use Spark or Flume for this?

You can just use curl and hdfs:

  curl ftp://blah/ | hdfs dfs -put - /blah




On Fri, Aug 14, 2015 at 1:15 PM, Varadhan, Jawahar
varad...@yahoo.com.invalid wrote:
What is the best way to bring such a huge file from a FTP server into Hadoop to 
persist in HDFS? Since a single jvm process might run out of memory, I was 
wondering if I can use Spark or Flume to do this. Any help on this matter is 
appreciated.

I prefer a application/process running inside Hadoop which is doing this 
transfer

Thanks.



--
Marcelo







Re: rdd count is throwing null pointer exception

2015-08-17 Thread Priya Ch
Looks like it is because of SPARK-5063:
RDD transformations and actions can only be invoked by the driver, not
inside of other transformations; for example, rdd1.map(x =>
rdd2.values.count() * x) is invalid because the values transformation and
count action cannot be performed inside of the rdd1.map transformation. For
more information, see SPARK-5063.
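A minimal sketch of the usual fix: run the action on the driver first and only reference the resulting plain value (or a broadcast variable) inside the closure. The ticketRdd below is just a placeholder standing in for the RDD in the quoted code.

// placeholder RDD standing in for the ticket records from the thread
val ticketRdd = sc.parallelize(Seq("ticket1", "ticket2", "ticket3"))

val ticketCachedRdd = ticketRdd.cache()
val cachedCount = ticketCachedRdd.count()   // action executed on the driver

ticketRdd.foreach { ticket =>
  // only driver-computed values or broadcast variables may be used here;
  // calling ticketCachedRdd.count() inside this closure is what triggers SPARK-5063
  println(cachedCount)
}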

On Mon, Aug 17, 2015 at 8:13 PM, Preetam preetam...@gmail.com wrote:

 The error could be because of the missing brackets after the word cache -
 .ticketRdd.cache()

  On Aug 17, 2015, at 7:26 AM, Priya Ch learnings.chitt...@gmail.com
 wrote:
 
  Hi All,
 
   Thank you very much for the detailed explanation.
 
  I have scenario like this-
  I have rdd of ticket records and another rdd of booking records. for
 each ticket record, i need to check whether any link exists in booking
 table.
 
  val ticketCachedRdd = ticketRdd.cache
 
  ticketRdd.foreach{
  ticket =>
  val bookingRecords =  queryOnBookingTable (date, flightNumber,
 flightCarrier)  // this function queries the booking table and retrieves
 the booking rows
  println(ticketCachedRdd.count) // this is throwing Null pointer exception
 
  }
 
  Is there somthing wrong in the count, i am trying to use the count of
 cached rdd when looping through the actual rdd. whats wrong in this ?
 
  Thanks,
  Padma Ch



Re: S3n, parallelism, partitions

2015-08-17 Thread Akshat Aranya
This will also depend on the file format you are using.

A word of advice: you would be much better off with the s3a file system.
As I found out recently the hard way, s3n has some issues with reading
through entire files even when looking for headers.
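For example (bucket and folder names are placeholders, and this assumes the s3a/hadoop-aws jars are available on the cluster):

// read the folder through s3a and ask for an explicit minimum number of partitions
val rdd = sc.textFile("s3a://bucket_xyz/some_folder_having_many_files_in_it/", 200)
println(rdd.partitions.size)   // how the files were actually split into partitions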

On Mon, Aug 17, 2015 at 2:10 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 s3n underneath uses the Hadoop API, so I guess it would partition
 according to your Hadoop configuration (128MB per partition by default)

 Thanks
 Best Regards

 On Mon, Aug 17, 2015 at 2:29 PM, matd matd...@gmail.com wrote:

 Hello,

 I would like to understand how the work is parallelized accross a Spark
 cluster (and what is left to the driver) when I read several files from a
 single folder in s3
 s3n://bucket_xyz/some_folder_having_many_files_in_it/

 How files (or file parts) are mapped to partitions ?

 Thanks
 Mathieu



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/S3n-parallelism-partitions-tp24293.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Apache Spark - Parallel Processing of messages from Kafka - Java

2015-08-17 Thread unk1102
val numStreams = 4
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }

In Java, in a for loop you would create four streams using
KafkaUtils.createStream() so that each receiver runs in a different thread.

for more information please visit
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
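A minimal Scala sketch of that pattern from the guide (it assumes an existing StreamingContext ssc; the Zookeeper quorum, group id and topic map are placeholders). The same loop can be written in Java against the JavaStreamingContext API.

import org.apache.spark.streaming.kafka.KafkaUtils

val numStreams = 4
val kafkaStreams = (1 to numStreams).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
}

// union the receiver streams into a single DStream before further processing
val unifiedStream = ssc.union(kafkaStreams)
unifiedStream.print()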

Hope it helps!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Parallel-Processing-of-messages-from-Kafka-Java-tp24284p24297.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What's the logic in RangePartitioner.rangeBounds method of Apache Spark

2015-08-17 Thread ihainan
*Firstly so sorry for my poor English.*

I was reading the source code of Apache Spark 1.4.1 and I really got stuck on
the logic of the RangePartitioner.rangeBounds method (in Partitioner.scala).

So can anyone please explain to me:

1. What is the 3.0 for in the code line val sampleSizePerPartition =
math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt? Why choose 3.0
rather than another value?

2. Why does fraction * n > sampleSizePerPartition mean that a partition
contains much more than the average number of items? Can you give an example
where we need to re-sample the partition?

Thanks a lot!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-s-the-logic-in-RangePartitioner-rangeBounds-method-of-Apache-Spark-tp24296.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Programmatically create SparkContext on YARN

2015-08-17 Thread Andreas Fritzler
Hi all,

when runnig the Spark cluster in standalone mode I am able to create the
Spark context from Java via the following code snippet:

SparkConf conf = new SparkConf()
    .setAppName("MySparkApp")
    .setMaster("spark://SPARK_MASTER:7077")
    .setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);


As soon as I'm done with my processing, I can just close it via

 sc.stop();

Now my question: Is the same also possible when running Spark on YARN? I
currently don't see how this should be possible without submitting your
application as a packaged jar file. Is there a way to get this kind of
interactivity from within your Scala/Java code?

Regards,
Andrea


Re: issue Running Spark Job on Yarn Cluster

2015-08-17 Thread poolis
Did you resolve this issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/issue-Running-Spark-Job-on-Yarn-Cluster-tp21779p24300.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Embarrassingly parallel computation in SparkR?

2015-08-17 Thread Kristina Rogale Plazonic
Hi,

I'm wondering how to achieve, say, a Monte Carlo simulation in SparkR
without the use of low-level RDD functions that were made private in 1.4, such
as parallelize and map. Something like

parallelize(sc, 1:1000).map (
   ### R code that does my computation
)

where the code is the same on every node, only with different seeds.

(I'm going to use this code with SparkR:::parallelize, but I'm wondering if
there is a better way, or whether this might be a use case that would
justify not making those functions private?)

Many thanks!

kristina


registering an empty RDD as a temp table in a PySpark SQL context

2015-08-17 Thread Eric Walker
I have an RDD queried from a scan of a data source.  Sometimes the RDD has
rows and at other times it has none.  I would like to register this RDD as
a temporary table in a SQL context.  I suspect this will work in Scala, but
in PySpark some code assumes that the RDD has rows in it, which are used to
verify the schema:

https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

Before I attempt to extend the Scala code to handle an empty RDD or provide
an empty DataFrame that can be registered, I was wondering what people
recommend in this case.  Perhaps there's a simple way of registering an
empty RDD as a temporary table in a PySpark SQL context that I'm
overlooking.

An alternative is to add special case logic in the client code to deal with
an RDD backed by an empty table scan.  But since the SQL will already
handle that, I was hoping to avoid special case logic.

Eric


Re: Spark on scala 2.11 build fails due to incorrect jline dependency in REPL

2015-08-17 Thread Ted Yu
You were building against 1.4.x, right ?

In master branch, switch-to-scala-2.11.sh is gone. There is scala-2.11
profile.

FYI

On Sun, Aug 16, 2015 at 11:12 AM, Stephen Boesch java...@gmail.com wrote:


 I am building spark with the following options - most notably the
 **scala-2.11**:

  . dev/switch-to-scala-2.11.sh
 mvn -Phive -Pyarn -Phadoop-2.6 -Dhadoop2.6.2 -Pscala-2.11 -DskipTests
 -Dmaven.javadoc.skip=true clean package


 The build goes pretty far but fails in one of the minor modules *repl*:

 [INFO]
 
 [ERROR] Failed to execute goal on project spark-repl_2.11: Could not
 resolve dependencies
 for project org.apache.spark:spark-repl_2.11:jar:1.5.0-SNAPSHOT:
  Could not   find artifact org.scala-lang:jline:jar:2.11.7 in central
  (https://repo1.maven.org/maven2) - [Help 1]

 Upon investigation - from 2.11.5 and later the scala version of jline is
 no longer required: they use the default jline distribution.

 And in fact the repl only shows dependency on jline for the 2.10.4 scala
 version:

 <profile>
   <id>scala-2.10</id>
   <activation>
     <property><name>!scala-2.11</name></property>
   </activation>
   <properties>
     <scala.version>2.10.4</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
     <jline.groupid>org.scala-lang</jline.groupid>
   </properties>
   <dependencyManagement>
     <dependencies>
       <dependency>
         <groupId>${jline.groupid}</groupId>
         <artifactId>jline</artifactId>
         <version>${jline.version}</version>
       </dependency>
     </dependencies>
   </dependencyManagement>
 </profile>

 So then it is not clear why this error is occurring. Pointers appreciated.





Re: Spark on scala 2.11 build fails due to incorrect jline dependency in REPL

2015-08-17 Thread Stephen Boesch
In 1.4 it is change-scala-version.sh 2.11

But the problem was it is a -Dscala-2.11, not a -P. I misread the docs.

2015-08-17 14:17 GMT-07:00 Ted Yu yuzhih...@gmail.com:

 You were building against 1.4.x, right ?

 In master branch, switch-to-scala-2.11.sh is gone. There is scala-2.11
 profile.

 FYI

 On Sun, Aug 16, 2015 at 11:12 AM, Stephen Boesch java...@gmail.com
 wrote:


 I am building spark with the following options - most notably the
 **scala-2.11**:

  . dev/switch-to-scala-2.11.sh
 mvn -Phive -Pyarn -Phadoop-2.6 -Dhadoop2.6.2 -Pscala-2.11 -DskipTests
 -Dmaven.javadoc.skip=true clean package


 The build goes pretty far but fails in one of the minor modules *repl*:

 [INFO]
 
 [ERROR] Failed to execute goal on project spark-repl_2.11: Could not
 resolve dependencies
 for project org.apache.spark:spark-repl_2.11:jar:1.5.0-SNAPSHOT:
  Could not   find artifact org.scala-lang:jline:jar:2.11.7 in central
  (https://repo1.maven.org/maven2) - [Help 1]

 Upon investigation - from 2.11.5 and later the scala version of jline is
 no longer required: they use the default jline distribution.

 And in fact the repl only shows dependency on jline for the 2.10.4 scala
 version:

 <profile>
   <id>scala-2.10</id>
   <activation>
     <property><name>!scala-2.11</name></property>
   </activation>
   <properties>
     <scala.version>2.10.4</scala.version>
     <scala.binary.version>2.10</scala.binary.version>
     <jline.version>${scala.version}</jline.version>
     <jline.groupid>org.scala-lang</jline.groupid>
   </properties>
   <dependencyManagement>
     <dependencies>
       <dependency>
         <groupId>${jline.groupid}</groupId>
         <artifactId>jline</artifactId>
         <version>${jline.version}</version>
       </dependency>
     </dependencies>
   </dependencyManagement>
 </profile>

 So then it is not clear why this error is occurring. Pointers appreciated.






Spark Job Hangs on our production cluster

2015-08-17 Thread java8964
I am comparing the Spark logs line by line between the hanging case (big
dataset) and the non-hanging case (small dataset).
In the hanging case, the Spark log looks identical to the non-hanging case for
reading the first block of data from HDFS.
But after that, starting from line 438 in spark-hang.log, I only see the
log generated from the Worker, like the following, for the next 10 minutes:
15/08/14 14:24:19 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
15/08/14 14:24:19 DEBUG Worker: [actor] handled message (0.121965 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
...
15/08/14 14:33:04 DEBUG Worker: [actor] received message SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
15/08/14 14:33:04 DEBUG Worker: [actor] handled message (0.136146 ms) SendHeartbeat from Actor[akka://sparkWorker/user/Worker#90699948]
After almost 10 minutes I have to kill the job; I know it will hang forever.
But in the good log (spark-finished.log), starting from line 361, Spark
started to read the 2nd split of data, and I can see all the debug messages from
BlockReaderLocal and BlockManager.
If I compare the logs of these 2 cases:
in the good log case, from line 478, I see this message:
15/08/14 14:37:09 DEBUG BlockReaderLocal: putting FileInputStream for ..
But in the hanging log case, for reading the 2nd split of data, I don't see this
message any more (it existed for the 1st split). I believe this log message should
show up in this case, as the 2nd split's block also exists on this Spark node;
just before it, I can see the following debug message:
15/08/14 14:24:11 DEBUG BlockReaderLocal: Created BlockReaderLocal for file /services/contact2/data/contacts/20150814004805-part-r-2.avro block BP-834217708-10.20.95.130-1438701195738:blk_1074484553_1099531839081 in datanode 10.20.95.146:50010
15/08/14 14:24:11 DEBUG Project: Creating MutableProj: WrappedArray(), inputSchema: ArrayBuffer(account_id#0L, contact_id#1, sequence_id#2, state#3, name#4, kind#5, prefix_name#6, first_name#7, middle_name#8, company_name#9, job_title#10, source_name#11, source_details#12, provider_name#13, provider_details#14, created_at#15L, create_source#16, updated_at#17L, update_source#18, accessed_at#19L, deleted_at#20L, delta#21, birthday_day#22, birthday_month#23, anniversary#24L, contact_fields#25, related_contacts#26, contact_channels#27, contact_notes#28, contact_service_addresses#29, contact_street_addresses#30), codegen:false
This log is generated on node (10.20.95.146), and Spark created 
BlockReaderLocal to read the data from the local node.
Now my question is: can someone give me any idea why "DEBUG BlockReaderLocal:
putting FileInputStream for .." doesn't show up any more in this case?
I attached the log files again in this email, and really hope I can get some 
help from this list.
Thanks
Yong
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: RE: Spark Job Hangs on our production cluster
Date: Fri, 14 Aug 2015 15:14:10 -0400




I still want to check if anyone can provide any help related to why Spark 1.2.2
hangs on our production cluster when reading big HDFS data (7800 avro
blocks), while it looks fine for small data (769 avro blocks).
I enabled the debug level in the Spark log4j config, and attached the log file in
case it helps with troubleshooting.
Summary of our cluster:
IBM BigInsight V3.0.0.2 (running with Hadoop 2.2.0 + Hive 0.12)
42 data nodes, each one running the HDFS data node process + task tracker + Spark worker
One master, running the HDFS Name Node + Spark master
Another master node, running the 2nd Name Node + JobTracker
I did 2 test cases, using a very simple spark-shell script to read 2 folders:
one is big data with 1T of avro files; the other is small data with 160G of avro
files.
The avro schemas of the 2 folders are different, but I don't think that will
make any difference here.
The test script is like the following:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val testdata = sqlContext.avroFile("hdfs://namenode:9000/bigdata_folder")   // vs sqlContext.avroFile("hdfs://namenode:9000/smalldata_folder")
testdata.registerTempTable("testdata")
testdata.count()
Both cases are kicked off the same way:

/opt/spark/bin/spark-shell --jars /opt/ibm/cclib/spark-avro.jar --conf spark.ui.port=4042 \
  --executor-memory 24G --total-executor-cores 42 --conf spark.storage.memoryFraction=0.1 \
  --conf spark.sql.shuffle.partitions=2000 --conf spark.default.parallelism=2000
When the script points to the small data folder, Spark finishes very fast;
each task scanning an HDFS block finishes within 30 seconds or less.
When the script points to the big data folder, most of the nodes finish scanning
the first HDFS block within 2 mins (longer than case 1), then the scanning will

[survey] [spark-ec2] What do you like/dislike about spark-ec2?

2015-08-17 Thread Nicholas Chammas
Howdy folks!

I’m interested in hearing about what people think of spark-ec2
http://spark.apache.org/docs/latest/ec2-scripts.html outside of the
formal JIRA process. Your answers will all be anonymous and public.

If the embedded form below doesn’t work for you, you can use this link to
get the same survey:

http://goo.gl/forms/erct2s6KRR

Cheers!
Nick
​


Calling hiveContext.sql(insert into table xyz...) in multiple threads?

2015-08-17 Thread unk1102
Hi, I have around 2000 Hive source partitions to process, inserting data into the
same table but different partitions. For e.g. I have the following query:

hiveContext.sql("insert into table myTable
partition(mypartition=someparition) bla bla")

If I call the above query in the Spark driver program it runs fine and creates the
corresponding partition in HDFS. This works, but it is very slow: it takes
4-5 hours to process all 2000 partitions. So I thought of using an
ExecutorService and calling the above query, along with a couple of similar
insert-into queries, in Callable threads. Using threads is definitely faster, but
I don't see any partition created in HDFS. Is it a concurrency issue, since every
thread is trying to insert into the same table but a different partition? I see the
tasks running very fast and finishing, but I don't see any partition
in HDFS. Please guide, I am new to Spark and Hive.
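A minimal sketch of the threaded-insert approach described above, using Scala Futures in place of a raw ExecutorService (the partition values, table names and select clause are placeholders, and hiveContext is assumed to exist):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// ~2000 partition values in the real job; a handful here as placeholders
val partitionValues = Seq("p1", "p2", "p3")

val inserts = partitionValues.map { p =>
  Future {
    // each thread inserts into the same table but a different partition
    hiveContext.sql(
      s"insert into table myTable partition(mypartition='$p') " +
      s"select * from sourceTable where srcpartition='$p'")
  }
}
Await.result(Future.sequence(inserts), Duration.Inf)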



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Calling-hiveContext-sql-insert-into-table-xyz-in-multiple-threads-tp24298.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Left outer joining big data set with small lookups

2015-08-17 Thread VIJAYAKUMAR JAWAHARLAL
Thanks for your help

I tried to cache the lookup tables and left outer join with the big table (DF).
The join does not seem to be using a broadcast join; it still goes with a hash
partition join and shuffles the big table. Here is the scenario


…
table1 as big_df
left outer join
table2 as lkup
on big_df.lkupid = lkup.lkupid

table1 above is well distributed across all 40 partitions because of
sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using
just 2 partitions. After the join stage, sparkUI showed me that all
activities ended up in just 2 executors. When I tried to dump the data in hdfs
after the join stage, all data ended up in 2 partition files and the rest 38 files are
0-sized files.

Since the above did not work, I tried to broadcast the DF and register it as a table
before the join.

val table2_df = sqlContext.sql("select * from table2")
val broadcast_table2 = sc.broadcast(table2_df)
broadcast_table2.value.registerTempTable("table2")

Broadcast is also having the same issue as explained above. All data is processed by
just 2 executors due to the lookup skew.

Any more idea to tackle this issue in Spark Dataframe?

Thanks
Vijay


 On Aug 14, 2015, at 10:27 AM, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 You could cache the lookup DataFrames, it’ll then do a broadcast join.
 
 
 
 
 On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:
 
 Hi
 
 I am facing a huge performance problem when I am trying to left outer join a
 very big data set (~140GB) with a bunch of small lookups [star schema type].
 I am using data frames in Spark SQL. It looks like data is shuffled and
 skewed when that join happens. Is there any way to improve the performance of
 such a join in spark?
 
 How can I hint the optimizer to go with a replicated join etc., to avoid the shuffle?
 Would it help to create broadcast variables on the small lookups? If I create
 broadcast variables, how can I convert them into data frames and use them in a
 Spark SQL type of join?
 
 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Subscribe

2015-08-17 Thread Rishitesh Mishra



Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?

2015-08-17 Thread Jerry Lam
Hi Nick,

I forgot to mention in the survey that Ganglia is never installed properly
for some reason.

I have this exception every time I launched the cluster:

Starting httpd: httpd: Syntax error on line 154 of
/etc/httpd/conf/httpd.conf: Cannot load
/etc/httpd/modules/mod_authz_core.so into server:
/etc/httpd/modules/mod_authz_core.so: cannot open shared object file: No
such file or directory

[FAILED]

Best Regards,

Jerry

On Mon, Aug 17, 2015 at 11:09 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Howdy folks!

 I’m interested in hearing about what people think of spark-ec2
 http://spark.apache.org/docs/latest/ec2-scripts.html outside of the
 formal JIRA process. Your answers will all be anonymous and public.

 If the embedded form below doesn’t work for you, you can use this link to
 get the same survey:

 http://goo.gl/forms/erct2s6KRR

 Cheers!
 Nick
 ​