Topic Modelling- LDA

2015-09-22 Thread Subshiri S
Hi,
I am experimenting with Spark LDA.
How do I create Topic Model for Prediction in Spark ?
How do I evaluate the topics modelled in Spark ?

Could you point me to some examples?

Regards,
Subshiri


Re: Yarn Shutting Down Spark Processing

2015-09-22 Thread Tathagata Das
Do your simple Spark batch jobs work in the same YARN setup? Maybe YARN
is not able to provide the resources that you are asking for.

On Tue, Sep 22, 2015 at 5:49 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I have a Spark streaming job running on a cluster managed by Yarn.  The
> spark streaming job starts and receives data from Kafka.  It is processing
> well and then after several seconds I see the following error:
>
> 15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
> initialize after waiting for 10 ms. Please check earlier log output for
> errors. Failing the application.
> 15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 13, (reason: Timed out waiting for SparkContext.)
>
> The spark process is then (obviously) shut down by Yarn.
>
> What do I need to change to allow Yarn to initialize Spark streaming (vs.
> batch) jobs?
>
> Thank you,
>
> Bryan Jeffrey
>


Re: Spark as standalone or with Hadoop stack.

2015-09-22 Thread Sean Owen
Might be better for another list, but I suspect it's more that HBase
is simply much more integrated with YARN, and because it's run with
other services that are as well.

On Wed, Sep 23, 2015 at 12:02 AM, Jacek Laskowski  wrote:
> That sentence caught my attention. Could you explain the reasons for
> not running HBase on Mesos, i.e. what makes Mesos inappropriate for
> HBase?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get RDD from PairRDD in Java

2015-09-22 Thread Andy Huang
use .values() which will return an RDD of just values

On Wed, Sep 23, 2015 at 4:24 PM, Zhang, Jingyu 
wrote:

> Hi All,
>
> I want to extract the "value" RDD from PairRDD in Java
>
> Please let me know how can  I get it easily.
>
> Thanks
>
> Jingyu
>
>
> This message and its attachments may contain legally privileged or
> confidential information. It is intended solely for the named addressee. If
> you are not the addressee indicated in this message or responsible for
> delivery of the message to the addressee, you may not copy or deliver this
> message or its attachments to anyone. Rather, you should permanently delete
> this message and its attachments and kindly notify the sender by reply
> e-mail. Any content of this message and its attachments which does not
> relate to the official business of the sending company must be taken not to
> have been sent or endorsed by that company or any of its related entities.
> No warranty is made that the e-mail or attachments are free from computer
> virus or other defect.




-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


How to get RDD from PairRDD in Java

2015-09-22 Thread Zhang, Jingyu
Hi All,

I want to extract the "value" RDD from PairRDD in Java

Please let me know how can  I get it easily.

Thanks

Jingyu



Re: Why RDDs are being dropped by Executors?

2015-09-22 Thread Tathagata Das
If the RDD is not constantly in use, then the LRU scheme in each executor
can kick out some of the partitions from memory.
If you want to avoid recomputing in such cases, you could persist with
StorageLevel.MEMORY_AND_DISK, where the partitions will be dropped to disk
when kicked from memory. That will avoid recomputing, as the partitions
will be read from disk -- better than recomputing in most cases.
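
To make the difference between the two storage levels concrete, here is a toy model in plain Python. It is only a sketch under stated assumptions: ToyBlockStore, capacity and spill_to_disk are made-up names for illustration and are not Spark's block manager or any Spark API. It shows that an LRU cache without spilling loses evicted partitions (forcing recomputation), while one that spills can still serve them.

```python
from collections import OrderedDict


class ToyBlockStore:
    """Toy LRU cache of partitions; NOT Spark code, all names are hypothetical."""

    def __init__(self, capacity, spill_to_disk):
        self.capacity = capacity            # max partitions held in "memory"
        self.spill_to_disk = spill_to_disk  # True mimics MEMORY_AND_DISK
        self.memory = OrderedDict()         # least recently used entry first
        self.disk = {}
        self.recomputed = 0

    def put(self, pid, data):
        self.memory[pid] = data
        self.memory.move_to_end(pid)
        while len(self.memory) > self.capacity:
            old_pid, old_data = self.memory.popitem(last=False)  # evict LRU
            if self.spill_to_disk:
                self.disk[old_pid] = old_data

    def get(self, pid, compute):
        if pid in self.memory:
            self.memory.move_to_end(pid)    # mark as recently used
            return self.memory[pid]
        if pid in self.disk:
            return self.disk[pid]           # read back instead of recomputing
        self.recomputed += 1                # evicted and gone: recompute
        data = compute(pid)
        self.put(pid, data)
        return data


def run(spill_to_disk):
    store = ToyBlockStore(capacity=9, spill_to_disk=spill_to_disk)
    for pid in range(10):                   # cache 10 partitions, room for 9
        store.put(pid, "partition-%d" % pid)
    store.get(0, lambda p: "partition-%d" % p)  # partition 0 was evicted
    return store.recomputed


memory_only_recomputes = run(spill_to_disk=False)
memory_and_disk_recomputes = run(spill_to_disk=True)
```

In this toy run the memory-only store has to recompute the evicted partition, while the spilling store reads it back from its "disk" dictionary.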

On Tue, Sep 22, 2015 at 4:20 AM, Uthayan Suthakar <
uthayan.sutha...@gmail.com> wrote:

>
> Hello All,
>
> We have a Spark Streaming job that reads data from a DB (three tables) and
> caches it in memory ONLY at the start; it then happily carries out the
> incremental calculation with the new data. What we've noticed occasionally
> is that one of the RDDs caches only 90% of the data. Therefore, at each
> execution the remaining 10% has to be recomputed. This operation is
> very expensive, as it needs to establish a connection to the DB and
> transform the data. We are allocating more than enough memory for each
> executor (4 executors with 2GB of memory each). Have you come across this
> issue? Do you know what may cause it?
>
> Another observation:
> I started the job yesterday and it cached 100% of the RDD, but looking at it
> today it shows 90%. What happened to that 10% of the data?
>


Re: WAL on S3

2015-09-22 Thread Tathagata Das
Responses inline.


On Tue, Sep 22, 2015 at 8:35 PM, Michal Čizmazia  wrote:

> Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?
>
Yes. Checkpoints are single files by themselves and do not require
flush semantics to work, so S3 is fine.



> Trying to answer this question, I looked into
> Checkpoint.getCheckpointFiles [1]. It is doing findFirstIn which would
> probably be calling the S3 LIST operation. S3 LIST is prone to eventual
> consistency [2]. What would happen when getCheckpointFiles retrieves an
> incomplete list of files to Checkpoint.read [1]?
>
There is a non-zero chance of that happening. But in that case it will
just use an older checkpoint file to recover the DAG of DStreams. That just
means that it will recover at an earlier point in time, and undergo more
computation.



> The pluggable WAL interface allows me to work around the eventual
> consistency of S3 by storing an index of filenames in DynamoDB. However it
> seems that something similar is required for checkpoints as well.
>

How are you getting around the fact that flush does not work in S3? Until
the current WAL file is closed, the file is not readable, even if you
know the index.
This should not be needed for checkpoints, for the reason I mentioned above
in this mail.


>
> I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
> there something I can borrow from DirectKafkaInputDStream? After a DStream
> computes an RDD, is there a way for the DStream to tell when processing of
> that RDD has been finished and only after that delete the SQS messages.
>

You could borrow from Direct Kafka! For that to work, you should be able to
do the following.
1. Each message in SQS should have a unique identifier, using which you can
specify ranges of messages to read.

2. You should be able to query from SQS the identifier of the latest
message, so that you can decide the range to read -- last read message to
latest message.

3. There must be a way to find out the identifier of the Nth record from the
current record. This is necessary for rate limiting -- if you want to read
at most 1000 messages in each interval, and you have read till ID X, then
you should be able to find out (without reading the data) the ID of the (X +
1000)th record. This is possible in Kafka, as offsets are continuous, but
not possible in Kinesis, as sequence numbers are not continuous numbers.

I am not sure SQS satisfies these properties. If it satisfies 1 and 2, but
not 3, then you can consider looking at the Kinesis Receiver in Spark 1.5,
which still uses receivers, but keeps track of Kinesis sequence numbers in
the metadata WAL.
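
The three properties above boil down to being able to compute the next range of message IDs without reading any data. A minimal sketch in plain Python, assuming contiguous integer IDs as with Kafka offsets; next_range and its parameters are hypothetical names, not part of Spark or of the SQS API:

```python
def next_range(last_read, latest, max_per_batch):
    """Return the inclusive (start, end) ID range for the next batch, or None.

    Assumes IDs are contiguous integers (property 3 above), so the end of a
    rate-limited range can be computed by simple addition.
    """
    if latest <= last_read:
        return None                                   # nothing new to read
    start = last_read + 1
    end = min(latest, last_read + max_per_batch)      # apply the rate limit
    return (start, end)


# Read till ID 100, latest ID is 5000, at most 1000 messages per interval.
limited = next_range(100, 5000, 1000)
# Fewer than 1000 new messages are available.
short = next_range(100, 150, 1000)
# No new messages at all.
empty = next_range(100, 100, 1000)
```

If the IDs are not contiguous (as with Kinesis sequence numbers), the addition in the rate-limit line has no meaning, which is why property 3 matters.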


>
> I was also considering Amazon EFS, but it is only available in a single
> region for a preview. EBS could be an option, but it cannot be used across
> multiple Availability Zones.
>
> [1]:
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
> [2]:
> http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
>
>
>
> On 22 September 2015 at 21:09, Tathagata Das  wrote:
>
>> You can keep the checkpoints in the Hadoop-compatible file system and the
>> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
>> the stuff gets complicated as it is not as easy as deleting off the
>> checkpoint directory - you will have to clean up checkpoint directory as
>> well as the whatever other storage that your custom WAL uses. However, if I
>> remember correctly, the WAL information is used only when the Dstreams are
>> recovered correctly from checkpoints.
>>
>> Note that, there are further details here that require deeper
>> understanding. There are actually two uses of WALs in the system -
>>
>> 1. Data WAL for received data  - This is what is usually referred to as
>> the WAL everywhere. Each receiver writes to a different WAL. This deals
>> with bulk data.
>> 2. Metadata WAL - This is used by the driver to save metadata information
>> like  block to data WAL segment mapping, etc. I usually skip mentioning
>> this. This WAL is automatically used when data WAL is enabled. And this
>> deals with small data.
>>
>> If you have to get around S3's limitations, you will have to plug in both
>> WALs (see this
>> 
>> for SparkConfs, but note that we haven't made these confs public). While the
>> system supports plugging them in, we haven't made this information public
>> yet because of such complexities in working with it.  And we have invested
>> time in making common sources like Kafka not require WALs (e.g. Direct
>> Kafka  approach). In future, we do hope to have a better solution for
>> general receivers + WALs + S3 (personally, I really wish S3's semantics
>> improve and fix this issue).
>>
>> Another alternative direction may be Amazon EFS. Since it based on EBS,
>> it may give the necessary se

Re: Invalid checkpoint url

2015-09-22 Thread Tathagata Das
Bingo! That is the problem. The solution is now obvious I presume :)

On Tue, Sep 22, 2015 at 9:41 PM, srungarapu vamsi 
wrote:

> @Das,
> No, I am getting this error in cluster mode.
> I think I understand why I am getting this error; please correct me if I
> am wrong.
> The reason is:
> Checkpointing writes RDDs to disk, and this happens on all workers.
> Whenever Spark has to read an RDD back, the checkpoint directory must be
> reachable from all the workers, so it has to be a common place that
> workers can write to and read from. This calls for a commonly accessible
> file system such as NFS, HDFS or S3.
> So, if I call ssc.checkpoint("some local directory"), workers are not able
> to read the RDDs from another worker's checkpoint directory, and I get the
> above-mentioned error.
> With this understanding, I am creating a t2 medium, hdfs 2.7.1 node and
> pointing the checkpoint directory to "hdfs://ip:port/path/to/directory"
>
> Please correct me if i am wrong.
>
> On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das 
> wrote:
>
>> Are you getting this error in local mode?
>>
>>
>> On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi <
>> srungarapu1...@gmail.com> wrote:
>>
>>> Yes, I tried ssc.checkpoint("checkpoint"), it works for me as long as i
>>> don't use reduceByKeyAndWindow.
>>>
>>> When i start using "reduceByKeyAndWindow" it complains me with the error
>>> "Exception in thread "main" org.apache.spark.SparkException: Invalid
>>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
>>> 2-97be-e3862eb5c944/rdd-8"
>>>
>>> The stack trace is as below:
>>>
>>> Exception in thread "main" org.apache.spark.SparkException: Invalid
>>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
>>> 2-97be-e3862eb5c944/rdd-8
>>> at
>>> org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>>> at
>>> org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
>>> at
>>> com.datastax.spark.connector.streaming.

Re: Py4j issue with Python Kafka Module

2015-09-22 Thread Tathagata Das
SPARK_CLASSPATH is I believe deprecated right now. So I am not surprised
that there is some difference in the code paths.

On Tue, Sep 22, 2015 at 9:45 PM, Saisai Shao  wrote:

> I think it is something related to class loader, the behavior is different
> for classpath and --jars. If you want to know the details I think you'd
> better dig out some source code.
>
> Thanks
> Jerry
>
> On Tue, Sep 22, 2015 at 9:10 PM, ayan guha  wrote:
>
>> I must have gone mad :) Thanks for pointing it out. I downloaded the
>> 1.5.0 assembly jar and added it to SPARK_CLASSPATH.
>>
>> However, I am getting a new error now
>>
>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
>>
>>
>> 
>> 
>>
>>   Spark Streaming's Kafka libraries not found in class path. Try one of the following.
>>
>>   1. Include the Kafka library and its dependencies with in the
>>      spark-submit command as
>>
>>      $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...
>>
>>   2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
>>      Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.5.0.
>>      Then, include the jar in the spark-submit command as
>>
>>      $ bin/spark-submit --jars  ...
>>
>>
>> 
>> 
>>
>>
>>
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 130, in createDirectStream
>>     raise e
>> py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
>> : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
>> at java.net.URLClassLoader.findClass(Unknown Source)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at java.lang.ClassLoader.loadClass(Unknown Source)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>> at java.lang.reflect.Method.invoke(Unknown Source)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>> at
>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
>> at py4j.Gateway.invoke(Gateway.java:259)
>> at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> at java.lang.Thread.run(Unknown Source)
>>
>> >>> os.environ['SPARK_CLASSPATH']
>> 'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>> >>>
>>
>>
>> So I launched pyspark with --jars with the assembly jar. Now it is
>> working.
>>
>> THANK YOU for help.
>>
>> Curiosity: Why did adding it to SPARK_CLASSPATH not work?
>>
>> Best
>> Ayan
>>
>> On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao 
>> wrote:
>>
>>> I think you're using the wrong version of the Kafka assembly jar. I think
>>> the Python API for the direct Kafka stream is not supported in Spark 1.3.0,
>>> so you'd better change to version 1.5.0. It looks like you're using Spark
>>> 1.5.0, so why did you choose Kafka assembly 1.3.0?
>>>
>>>
>>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>>
>>>
>>>
>>> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:
>>>
 Hi

 I have added spark assembly jar to SPARK CLASSPATH

 >>> print os.environ['SPARK_CLASSPATH']
 D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar


 Now  I am facing below issue with a test topic

 >>> ssc = StreamingContext(sc, 2)
 >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
 Traceback (most recent call last):
   File "<stdin>", line 1, in <module>
   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
     jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
     return f(*a, **kw)
   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
 py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
  Trace:
 p

Re: Streaming Receiver Imbalance Problem

2015-09-22 Thread Tathagata Das
Also, you could switch to the Direct Kafka API, which was first released as
experimental in 1.3. In 1.5 we graduated it from experimental, but it's
quite usable in Spark 1.3.1.

TD

On Tue, Sep 22, 2015 at 7:45 PM, SLiZn Liu  wrote:

> Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks
> for the tips, Tathagata!
>
> On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das 
> wrote:
>
>> A lot of these imbalances were solved in spark 1.5. Could you give that a
>> spin?
>>
>> https://issues.apache.org/jira/browse/SPARK-8882
>>
>> On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu 
>> wrote:
>>
>>> Hi spark users,
>>>
>>> In our Spark Streaming app, which uses the Kafka integration on Mesos, we
>>> started 3 receivers to consume 3 Kafka partitions, but we have observed an
>>> imbalance in the receiving rates. With spark.streaming.receiver.maxRate
>>> set to 120, sometimes one receiver runs very close to the limit while the
>>> other two only receive roughly fifty records per second.
>>>
>>> This may be caused by a previous receiver failure, where the receiving
>>> rate of one of the receivers dropped to 0. We restarted the Spark
>>> Streaming app, and the imbalance began. We suspect that the partition
>>> consumed by the failing receiver got jammed, and the other two receivers
>>> cannot take up its data.
>>>
>>> The 3-node cluster tends to run slowly, and nearly all the tasks are
>>> registered at the node with the previous receiver failure (I used union to
>>> combine the 3 receivers' DStreams, so I expected the combined DStream to be
>>> well distributed across all nodes). It cannot reliably finish one batch
>>> within a single batch interval, stages pile up, and the digested log shows
>>> the following:
>>>
>>> ...
>>> 5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 
>>> 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, 
>>> real=0.02 secs]
>>>
>>> ...
>>> 5/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state 
>>> FINISHED for TID 77219 because its task set is gone (this is likely the 
>>> result of
>>> receiving duplicate task finished status updates)
>>>
>>> ...
>>>
>>> These two types of log lines were printed during the execution of some (not all) stages.
>>>
>>> My configurations:
>>> # of cores on each node: 64
>>> # of nodes: 3
>>> batch time is set to 10 seconds
>>>
>>> spark.streaming.receiver.maxRate    120
>>> spark.streaming.blockInterval       160  // set to the value that
>>> divides 10 seconds approx. by total cores, which is 64, to max out all the
>>> nodes: 10s * 1000 / 64
>>> spark.storage.memoryFraction        0.1  // this one doesn't seem to
>>> work, since the young gen / old gen ratio is nearly 0.3 instead of 0.1
>>>
>>> Anyone got an idea? I appreciate your patience.
>>>
>>> BR,
>>> Todd Leo
>>>
>>>
>>
>>
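
The blockInterval arithmetic from the quoted configuration can be checked with a few lines of Python. This is only a sketch: it assumes that each receiver produces one block per block interval and that each block becomes one task in the batch, which is how the Spark Streaming tuning guide describes it; the variable names are made up for illustration.

```python
batch_interval_ms = 10 * 1000   # batch time of 10 seconds
total_cores = 64                # core count quoted in the configuration
num_receivers = 3

# Block interval that would yield one block per core for a single receiver
interval_for_one_block_per_core = batch_interval_ms / total_cores

# With the configured 160 ms interval: blocks (and hence tasks) per batch
block_interval_ms = 160
blocks_per_receiver = batch_interval_ms / block_interval_ms
blocks_per_batch = blocks_per_receiver * num_receivers
```

Under these assumptions the quoted 160 ms setting is a rounding of 10s * 1000 / 64, and three receivers together produce roughly three times as many blocks per batch as there are cores on one node.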


How to make Group By/reduceByKey more efficient?

2015-09-22 Thread swetha
Hi,

How can I make Group By more efficient? Is it recommended to use a custom
partitioner and then do a Group By? And can we use a custom partitioner and
then use reduceByKey for optimization?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Partitions on RDDs

2015-09-22 Thread Yashwanth Kumar
Hi,
In the first RDD transformation (e.g. reading from a file with
sc.textFile("path", partition)), the partition count you specify will be
applied to all further transformations and actions on this RDD.

In a few places, repartitioning your RDD will give an added advantage.
Repartitioning is usually done during the actions stage.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-on-RDDs-tp24775p24779.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SparkR vs R

2015-09-22 Thread Yashwanth Kumar
Hi,

1. The main difference between SparkR and R is that SparkR can handle
big data.

Yes, you can use other core libraries inside SparkR (but not algorithms like
lm(), glm(), kmeans()).

2. Yes, core R libraries will not be distributed. You can use functions from
these libraries that are applicable in a mapper-like fashion, i.e. functions
that can be applied to each line individually.


3. SparkR is a wrapper around underlying Scala code, whereas R is not.
R gives you complete flexibility to do any machine learning you want, while
SparkR is still in the development stage.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-vs-R-tp24772p24778.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Deenar Toraskar
Daniel

Can you elaborate on why you are using a broadcast variable to concatenate
many Avro files into a single ORC file? Look at wholeTextFiles on
SparkContext.

SparkContext.wholeTextFiles lets you read a directory containing multiple
small text files, and returns each of them as (filename, content) pairs.
This is in contrast with textFile, which would return one record per line
in each file.

You can then process this RDD in parallel over the cluster, convert it to a
DataFrame and save it as an ORC file.
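
To illustrate the difference in record shape, here is a small plain-Python imitation. It is a sketch only and does not use Spark: whole_text_files and text_file are made-up helper names that mimic what the two SparkContext methods return, on a temporary directory created just for the example.

```python
import os
import tempfile


def whole_text_files(directory):
    """Mimic wholeTextFiles: one (path, content) record per file."""
    records = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        with open(path) as f:
            records.append((path, f.read()))
    return records


def text_file(directory):
    """Mimic textFile on a directory: one record per line across all files."""
    lines = []
    for _, content in whole_text_files(directory):
        lines.extend(content.splitlines())
    return lines


with tempfile.TemporaryDirectory() as tmp:
    # Two small files: a.txt has two lines, b.txt has one line.
    for name, body in [("a.txt", "1\n2\n"), ("b.txt", "3\n")]:
        with open(os.path.join(tmp, name), "w") as f:
            f.write(body)
    per_file = whole_text_files(tmp)
    per_line = text_file(tmp)
```

With many small files, the per-file shape keeps each file's content together with its name, which is convenient when every file has to be parsed as a unit.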

Regards
Deenar


JdbcRDD Constructor

2015-09-22 Thread satish chandra j
HI All,

The JdbcRDD constructor has the following parameters:

JdbcRDD(SparkContext sc,
        scala.Function0<java.sql.Connection> getConnection,
        String sql,
        *long lowerBound, long upperBound, int numPartitions*,
        scala.Function1<java.sql.ResultSet,T> mapRow,
        scala.reflect.ClassTag<T> evidence$1)

(see https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/rdd/JdbcRDD.html)

where *lowerBound* refers to the lower boundary of the entire data,
*upperBound* refers to the upper boundary of the entire data, and
*numPartitions* refers to the number of partitions.

The source table in the Oracle DB from which JdbcRDD fetches data has more
than 500 records, but the results were confusing when I tried several
executions while changing the "numPartitions" parameter:

lowerBound, upperBound, numPartitions : Output Count

0,          100,        1             : 100
0,          100,        2             : 151
0,          100,        3             : 201


Please help me understand why the output count is 151 if
numPartitions is 2 and 201 if numPartitions is 3.

Regards,

Satish Chandra
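
As background for the question above, here is a minimal sketch, in plain Python, of how JdbcRDD divides the inclusive range [lowerBound, upperBound] among partitions. It mirrors the arithmetic in JdbcRDD.getPartitions as best I can tell from the Spark source, so treat it as an assumption rather than a specification; the function name jdbc_partition_bounds is made up for illustration, and the sketch assumes non-negative bounds. Each (start, end) pair is bound to the two `?` placeholders that the JdbcRDD documentation requires in the SQL string.

```python
def jdbc_partition_bounds(lower_bound, upper_bound, num_partitions):
    """Split the inclusive range [lower_bound, upper_bound] into sub-ranges.

    Hypothetical helper mirroring the assumed JdbcRDD arithmetic; each
    returned (start, end) pair is inclusive on both ends.
    """
    length = 1 + upper_bound - lower_bound   # bounds are inclusive
    bounds = []
    for i in range(num_partitions):
        start = lower_bound + (i * length) // num_partitions
        end = lower_bound + ((i + 1) * length) // num_partitions - 1
        bounds.append((start, end))
    return bounds


one = jdbc_partition_bounds(0, 100, 1)
two = jdbc_partition_bounds(0, 100, 2)
three = jdbc_partition_bounds(0, 100, 3)
```

Since the sub-ranges do not overlap, a query that filters on both placeholders (for example WHERE id >= ? AND id <= ?) should return each row once regardless of numPartitions. A total that grows with numPartitions may mean the query does not restrict rows on both bounds, but that is only a guess without seeing the SQL.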


Re: Creating BlockMatrix with java API

2015-09-22 Thread Pulasthi Supun Wickramasinghe
Hi Sabarish

Thanks, that would indeed solve my problem

Best Regards,
Pulasthi

On Wed, Sep 23, 2015 at 12:55 AM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> Hi Pulasthi
>
> You can always use JavaRDD.rdd() to get the scala rdd. So in your case,
>
> new BlockMatrix(rdd.rdd(), 2, 2)
>
> should work.
>
> Regards
> Sab
>
> On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe <
> pulasthi...@gmail.com> wrote:
>
>> Hi Yanbo,
>>
>> Thanks for the reply. I thought i might be missing something. Anyway i
>> moved to using scala since it is the complete API.
>>
>> Best Regards,
>> Pulasthi
>>
>> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:
>>
>>> This is because distributed matrices like
>>> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
>>> not provide Java-friendly constructors. I have filed SPARK-10757
>>> to track this issue.
>>>
>>> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe <
>>> pulasthi...@gmail.com>:
>>>
 Hi All,

 I am new to Spark and i am trying to do some BlockMatrix operations
 with the Mllib API's. But i can't seem to create a BlockMatrix with the
 java API. I tried the following

 Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
 List,Matrix>> list = new 
 ArrayList, Matrix>>();
 Tuple2 intTuple = new Tuple2(0,0);
 Tuple2,Matrix> tuple2MatrixTuple2 = new 
 Tuple2, Matrix>(intTuple,matrixa );
 list.add(tuple2MatrixTuple2);
 JavaRDD, Matrix>> rdd = 
 sc.parallelize(list);

 BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);


 but since BlockMatrix only
 takes 
 "RDD,Matrix>>"
 this code does not work. sc.parallelize() returns a JavaRDD so the two
 are not compatible. I also couldn't find any code samples for this. Any
 help on this would be highly appreciated.

 Best Regards,
 Pulasthi
 --
 Pulasthi S. Wickramasinghe
 Graduate Student  | Research Assistant
 School of Informatics and Computing | Digital Science Center
 Indiana University, Bloomington
 cell: 224-386-9035

>>>
>>>
>>
>>
>>
>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>



-- 
Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: Creating BlockMatrix with java API

2015-09-22 Thread Sabarish Sasidharan
Hi Pulasthi

You can always use JavaRDD.rdd() to get the scala rdd. So in your case,

new BlockMatrix(rdd.rdd(), 2, 2)

should work.

Regards
Sab

On Tue, Sep 22, 2015 at 10:50 PM, Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi Yanbo,
>
> Thanks for the reply. I thought i might be missing something. Anyway i
> moved to using scala since it is the complete API.
>
> Best Regards,
> Pulasthi
>
> On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:
>
>> This is because distributed matrices like
>> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
>> not provide Java-friendly constructors. I have filed SPARK-10757
>> to track this issue.
>>
>> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe <
>> pulasthi...@gmail.com>:
>>
>>> Hi All,
>>>
>>> I am new to Spark and i am trying to do some BlockMatrix operations with
>>> the Mllib API's. But i can't seem to create a BlockMatrix with the java
>>> API. I tried the following
>>>
>>> Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>>> List,Matrix>> list = new 
>>> ArrayList, Matrix>>();
>>> Tuple2 intTuple = new Tuple2(0,0);
>>> Tuple2,Matrix> tuple2MatrixTuple2 = new 
>>> Tuple2, Matrix>(intTuple,matrixa );
>>> list.add(tuple2MatrixTuple2);
>>> JavaRDD, Matrix>> rdd = 
>>> sc.parallelize(list);
>>>
>>> BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);
>>>
>>>
>>> but since BlockMatrix only
>>> takes 
>>> "RDD,Matrix>>"
>>> this code does not work. sc.parallelize() returns a JavaRDD so the two
>>> are not compatible. I also couldn't find any code samples for this. Any
>>> help on this would be highly appreciated.
>>>
>>> Best Regards,
>>> Pulasthi
>>>
>>
>>
>
>
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: Apache Spark job in local[*] is slower than regular 1-thread Python program

2015-09-22 Thread Jonathan Coveney
It's highly conceivable to be able to beat spark in performance on tiny
data sets like this. That's not really what it has been optimized for.

On Tuesday, September 22, 2015, juljoin 
wrote:

> Hello,
>
> I am trying to figure Spark out and I still have some problems with its
> speed that I can't figure out. In short, I wrote two programs that loop
> through a 3.8Gb file and filter each line depending on whether a certain
> word is present.
>
> I wrote a one-thread python program doing the job and I obtain:
> - for the 3.8Gb file:
> / lines found: 82100
>  in: *10.54 seconds*/
>  - no filter, just looping through the file:
> / in: 01.65 seconds/
>
> The Spark app doing the same and executed on 8 threads gives:
>  - for the 3.8Gb file:
> / lines found: 82100
>  in: *18.27 seconds*/
>  - for a 38Mb file:
> /lines found: 821
> in: 2.53 seconds/
>
> I must do something wrong to obtain a result twice as slow on the 8 threads
> than on 1 thread.
>
> 1. First, I thought it might be because of the setting-up cost of Spark.
> But
> for smaller files it only takes 2 seconds which makes this option
> improbable.
> 2. Looping through the file takes up 1.65 seconds (thank you SSD ^_^ ),
> processing takes up the other 9seconds (for the python app).
> -> This is why I thought splitting it up on the different processes will
> definitely speed it up.
>
> Note: Increasing the number of threads in Spark improves the speed (from 57
> seconds with 1 thread to 18 seconds with 8 threads). But still, there is a
> big difference in performance between simple python and Spark, it must be
> my
> doing!
>
> Can someone point me out on what I am doing wrong? That would be greatly
> appreciated :) I am new with all this big data stuff.
>
>
>
> *Here is the code for the Spark app:*
>
>
>
>
>
> *And the python code:*
>
>
>
> Thank you for reading up to this point :)
>
> Have a nice day!
>
>
> - Julien
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-job-in-local-is-slower-than-regular-1-thread-Python-program-tp24771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> For additional commands, e-mail: user-h...@spark.apache.org 
>
>


Re: Py4j issue with Python Kafka Module

2015-09-22 Thread Saisai Shao
I think it is related to the class loader: the behavior is different
for the classpath and for --jars. If you want to know the details, I think
you'd better dig into the source code.
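
Separately from the class loader question, the SPARK_CLASSPATH value printed in the quoted session below has no ".jar" suffix, so it may be worth checking that each entry really points at an existing jar. A minimal sketch, offered as a sanity check and not a diagnosis; the function name is hypothetical:

```python
import os


def find_bad_classpath_entries(classpath, sep=os.pathsep):
    """Return (entry, reason) pairs for classpath entries that look suspicious."""
    problems = []
    for entry in classpath.split(sep):
        if not entry or entry.endswith("*"):
            continue  # skip empty entries and JVM wildcard entries
        if not os.path.exists(entry):
            problems.append((entry, "path does not exist"))
        elif os.path.isfile(entry) and not entry.lower().endswith(".jar"):
            problems.append((entry, "file is not a .jar"))
    return problems
```

On Windows the separator is ";", which is what os.pathsep gives there; pass sep explicitly when checking a Windows-style value on another platform.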

Thanks
Jerry

On Tue, Sep 22, 2015 at 9:10 PM, ayan guha  wrote:

> I must have been gone mad :) Thanks for pointing it out. I downloaded
> 1.5.0 assembly jar and added it in SPARK_CLASSPATH.
>
> However, I am getting a new error now
>
> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
>
>
> 
> 
>
>   Spark Streaming's Kafka libraries not found in class path. Try one of the following.
>
>   1. Include the Kafka library and its dependencies with in the
>      spark-submit command as
>
>      $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...
>
>   2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
>      Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.5.0.
>      Then, include the jar in the spark-submit command as
>
>      $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
>
>
> 
> 
>
>
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 130, in createDirectStream
>     raise e
> py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
> : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
> at java.net.URLClassLoader.findClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at java.lang.ClassLoader.loadClass(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.lang.reflect.Method.invoke(Unknown Source)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Unknown Source)
>
> >>> os.environ['SPARK_CLASSPATH']
> 'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
> >>>
>
>
> So I launched pyspark with --jars with the assembly jar. Now it is
> working.
>
> THANK YOU for help.
>
> Curiosity:  Why adding it to SPARK CLASSPATH did not work?
>
> Best
> Ayan
>
> On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao 
> wrote:
>
>> I think you're using the wrong version of kafka assembly jar, I think
>> Python API from direct Kafka stream is not supported for Spark 1.3.0, you'd
>> better change to version 1.5.0, looks like you're using Spark 1.5.0, why
>> you choose Kafka assembly 1.3.0?
>>
>>
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>
>>
>>
>> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:
>>
>>> Hi
>>>
>>> I have added spark assembly jar to SPARK CLASSPATH
>>>
>>> >>> print os.environ['SPARK_CLASSPATH']
>>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>>
>>>
>>> Now  I am facing below issue with a test topic
>>>
>>> >>> ssc = StreamingContext(sc, 2)
>>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
>>> Traceback (most recent call last):
>>>   File "", line 1, in 
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>>> \streaming\kafka.py", line 126, in createDirectStream
>>> jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
>>> set(topics), jfr
>>> omOffsets)
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>>> j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
>>> \sql\utils.py", line 36, in deco
>>> return f(*a, **kw)
>>>   File
>>> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
>>> j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
>>> py4j.protocol.Py4JError: An error occurred while calling
>>> o22.createDirectStream.
>>>  Trace:
>>> py4j.Py4JException: Method createDirectStream([class
>>> org.apache.spark.streaming.
>>> api.java.JavaStreamingContext, class java.util.HashMap, class
>>> java.util.HashSet,
>>>  class java.util.HashMap]) does not exist
>>> at
>>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>>
>>> at
>

Re: Invalid checkpoint url

2015-09-22 Thread srungarapu vamsi
@Das,
No, I am getting this error in cluster mode.
I think I understand why I am getting it; please correct me if I am
wrong.
The reason is:
Checkpointing writes RDDs to disk, and this happens on all workers.
Whenever Spark has to read an RDD back, the checkpoint directory must be
reachable from all the workers: a common place that every worker can write
to and read from. This calls for a commonly accessible file system such as
NFS, HDFS or S3.
So if I give ssc.checkpoint("some local directory"), the workers are not
able to read the RDDs from each other's checkpoint directories, and I get
the error mentioned above.
With this understanding, I am creating a t2 medium, hdfs 2.7.1 node and
pointing the checkpoint directory to "hdfs://ip:port/path/to/directory".

Please correct me if I am wrong.
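
To illustrate the reasoning above, here is a minimal sketch of a check one could run before calling ssc.checkpoint(). The set of schemes treated as shared is an assumption for illustration and the function name is hypothetical. Note that an NFS mount looks like a local path, so this check cannot recognise it.

```python
from urllib.parse import urlparse

# Assumption: URI schemes treated as reachable from every worker.
SHARED_SCHEMES = {"hdfs", "s3", "s3a", "s3n"}


def is_shared_checkpoint_dir(uri):
    """Return True if `uri` uses a scheme that all workers can normally reach."""
    scheme = urlparse(uri).scheme.lower()
    return scheme in SHARED_SCHEMES


# A bare relative path or a file: URI resolves on each machine separately.
print(is_shared_checkpoint_dir("checkpoint"))
print(is_shared_checkpoint_dir("hdfs://ip:port/path/to/directory"))
```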

On Wed, Sep 23, 2015 at 4:53 AM, Tathagata Das  wrote:

> Are you getting this error in local mode?
>
>
> On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi <
> srungarapu1...@gmail.com> wrote:
>
>> Yes, I tried ssc.checkpoint("checkpoint"), it works for me as long as i
>> don't use reduceByKeyAndWindow.
>>
>> When i start using "reduceByKeyAndWindow" it complains me with the error
>> "Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
>> 2-97be-e3862eb5c944/rdd-8"
>>
>> The stack trace is as below:
>>
>> Exception in thread "main" org.apache.spark.SparkException: Invalid
>> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
>> 2-97be-e3862eb5c944/rdd-8
>> at
>> org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
>> at
>> org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
>> at
>> com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
>> at
>> com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
>> at
>> org.apache.spark.streami

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-22 Thread Andy Huang
Alternatively, I would suggest looking at building the schema
programmatically.

Refer to
http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema

Cheers
Andy

On Wed, Sep 23, 2015 at 2:07 PM, Ted Yu  wrote:

> Can you switch to 2.11 ?
>
> The following has been fixed in 2.11:
> https://issues.scala-lang.org/browse/SI-7296
>
> Otherwise consider packaging related values into a case class of their own.
>
> On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j <
> jsatishchan...@gmail.com> wrote:
>
>> HI All,
>> Do we have any alternative solutions in Scala to avoid limitation in
>> defining a Case Class having more than 22 arguments
>>
>> We are using Scala version 2.10.2, currently I need to define a case
>> class with 37 arguments but getting an error as "*error: Implementation
>> restriction: case classes cannot have more than 22 parameters.*"
>>
>> It would be a great help if any inputs on the same
>>
>> Regards,
>> Satish Chandra
>>
>>
>>
>


-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


Re: Py4j issue with Python Kafka Module

2015-09-22 Thread ayan guha
I must have gone mad :) Thanks for pointing it out. I downloaded the 1.5.0
assembly jar and added it to SPARK_CLASSPATH.

However, I am getting a new error now

>>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})




  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:1.5.0 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = 1.5.0.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...






Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 130, in createDirectStream
    raise e
py4j.protocol.Py4JJavaError: An error occurred while calling o30.loadClass.
: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Unknown Source)

>>> os.environ['SPARK_CLASSPATH']
'D:\\sw\\spark-streaming-kafka-assembly_2.10-1.5.0'
>>>


So I launched pyspark with --jars pointing to the assembly jar. Now it is working.

THANK YOU for the help.

Curiosity: why did adding it to SPARK_CLASSPATH not work?

Best
Ayan

On Wed, Sep 23, 2015 at 2:25 AM, Saisai Shao  wrote:

> I think you're using the wrong version of kafka assembly jar, I think
> Python API from direct Kafka stream is not supported for Spark 1.3.0, you'd
> better change to version 1.5.0, looks like you're using Spark 1.5.0, why
> you choose Kafka assembly 1.3.0?
>
>
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>
>
>
> On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:
>
>> Hi
>>
>> I have added spark assembly jar to SPARK CLASSPATH
>>
>> >>> print os.environ['SPARK_CLASSPATH']
>> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>>
>>
>> Now  I am facing below issue with a test topic
>>
>> >>> ssc = StreamingContext(sc, 2)
>> >>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
>>     jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
>>     return f(*a, **kw)
>>   File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
>> py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
>>  Trace:
>> py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
>> at
>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>>
>> at
>> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>>
>> at py4j.Gateway.invoke(Gateway.java:252)
>> at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>> at java.lang.Thread.run(Unknown Source)
>>
>>
>> >>>
>>
>> Am I doing something wrong?
>>
>

Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-09-22 Thread Ted Yu
Can you switch to 2.11 ?

The following has been fixed in 2.11:
https://issues.scala-lang.org/browse/SI-7296

Otherwise consider packaging related values into a case class of their own.

On Tue, Sep 22, 2015 at 8:48 PM, satish chandra j 
wrote:

> HI All,
> Do we have any alternative solutions in Scala to avoid limitation in
> defining a Case Class having more than 22 arguments
>
> We are using Scala version 2.10.2, currently I need to define a case class
> with 37 arguments but getting an error as "*error: Implementation
> restriction: case classes cannot have more than 22 parameters.*"
>
> It would be a great help if any inputs on the same
>
> Regards,
> Satish Chandra
>
>
>


Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Hi Zhan,
I really appreciate your help; I will do that next. And on the local machine,
no hadoop/spark needs to be installed, only a copy of /etc/hadoop/conf... Does
any information about the local machine (for example IP, hostname etc) need to
be set in the conf files?

Moreover, do you have any experience submitting a hadoop/spark job from a Java
program deployed on the gateway node, rather than through the hadoop/spark
command?
Thank you very much~
Best Regards,
Zhiliang


 


 On Wednesday, September 23, 2015 11:30 AM, Zhan Zhang 
 wrote:
   

 Hi Zhiliang,
I cannot find a specific doc. But as far as I remember, you can log in one of 
your cluster machine, and find the hadoop configuration location, for example 
/etc/hadoop/conf, copy that directory to your local machine. Typically it has 
hdfs-site.xml, yarn-site.xml etc. In spark, the former is used to access hdfs, 
and the latter is used to launch application on top of yarn.
Then in the spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf. 
Thanks.
Zhan Zhang

On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Yes, I get it now. 
I have not ever deployed hadoop configuration locally, and do not find the 
specific doc, would you help provide the doc to do that...
Thank you,
Zhiliang

On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang  
wrote:


There is no difference between running the client inside or outside the cluster
(assuming there is no firewall or network connectivity issue), as long as you
have the hadoop configuration locally.  Here is the doc for running on yarn.
http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu  wrote:

Hi Zhan,
Thanks very much for your helpful comment.
I also thought it would be similar to hadoop job submission; however, I was not
sure whether it is like that when it comes to spark.
Have you ever tried that for spark...
Would you give me the deployment doc for the hadoop and spark gateway? Since
this is the first time for me to do that, I cannot find the specific doc for it.

Best Regards,
Zhiliang




On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang  
wrote:


It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

The Spark job is running on the cluster through yarn. The job can be submitted
from a machine inside the cluster; however, I would like to submit the
job from another machine which does not belong to the cluster.
I know that for hadoop, this can be done from another machine on which a
hadoop gateway is installed, which is used to connect to the cluster.
Then what would go for spark, is it the same as hadoop... And where is the
instruction doc for installing this gateway...
Thank you very much~~
Zhiliang













  

Scala Limitation - Case Class definition with more than 22 arguments

2015-09-22 Thread satish chandra j
HI All,
Do we have any alternative solutions in Scala to avoid limitation in
defining a Case Class having more than 22 arguments

We are using Scala version 2.10.2, currently I need to define a case class
with 37 arguments but getting an error as "*error: Implementation
restriction: case classes cannot have more than 22 parameters.*"

It would be a great help if any inputs on the same

Regards,
Satish Chandra


RE: Why is 1 executor overworked and other sit idle?

2015-09-22 Thread Chirag Dewan
Thanks Ted and Rich.

So if I repartition my RDD programmatically and then call coalesce on the RDD
to 1 partition, would that generate 1 output file?

Ahh.. Is my coalesce operation producing 1 partition, hence 1 output file and 1
executor working on all the data?

To summarize, this is what I do:

1)  Create a Cassandra RDD
2)  Cache this RDD
3)  Map it to CSV
4)  Coalesce (because I need a single output file)
5)  Write to file on local file system

This makes sense.

Thanks,

Chirag


From: Richard Eggert [mailto:richard.egg...@gmail.com]
Sent: Wednesday, September 23, 2015 5:39 AM
To: Ted Yu
Cc: User; Chirag Dewan
Subject: Re: Why is 1 executor overworked and other sit idle?


If there's only one partition, by definition it will only be handled by one 
executor. Repartition to divide the work up. Note that this will also result in 
multiple output files,  however. If you absolutely need them to be combined 
into a single file,  I suggest using the Unix/Linux 'cat' command to 
concatenate the files afterwards.
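
As an alternative to 'cat', here is a minimal sketch that merges the part files in Python. It assumes the job wrote to a directory on the local file system using the usual part-NNNNN naming of saveAsTextFile; the function name and paths are hypothetical.

```python
import glob
import os
import shutil


def concat_part_files(output_dir, merged_path):
    """Concatenate part-* files from `output_dir` into `merged_path`, in name order."""
    # Zero-padded names (part-00000, part-00001, ...) sort correctly as strings.
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*")))
    with open(merged_path, "wb") as merged:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, merged)
    return len(parts)
```

This keeps the Spark job parallel and moves the single-file requirement to a cheap post-processing step. Write the merged file outside the output directory so it is not picked up by the glob.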

Rich
On Sep 22, 2015 9:20 AM, "Ted Yu" <yuzhih...@gmail.com> wrote:
Have you tried using repartition to spread the load ?

Cheers

On Sep 22, 2015, at 4:22 AM, Chirag Dewan <chirag.de...@ericsson.com> wrote:
Hi,

I am using Spark to access around 300m rows in Cassandra.

My job is pretty simple as I am just mapping my row into a CSV format and 
saving it as a text file.


// Map one Cassandra row to a single CSV line.
public String call(CassandraRow row) throws Exception {
    StringBuilder sb = new StringBuilder();
    sb.append(row.getString(10));
    sb.append(",");
    sb.append(row.getString(11));
    sb.append(",");
    sb.append(row.getString(8));
    sb.append(",");
    sb.append(row.getString(7));
    return sb.toString();
}

My map method looks like this.

I have a 3 node cluster. I observe that the driver starts on Node A, and
executors are spawned on all 3 nodes. But the executor on Node B or C is doing
all the tasks. It starts a saveAsTextFile job with 1 output partition,
stores the RDDs in memory and also commits the file to the local file system.

This executor is using a lot of system memory and CPU while others are sitting 
idle.

Am I doing something wrong? Is my RDD correctly partitioned?

Thanks in advance.


Chirag


Re: SparkR for accumulo

2015-09-22 Thread madhvi.gupta

Hi Rui,

Can't we use, in SparkR, the Accumulo data RDD created from Java in Spark?

Thanks and Regards
Madhvi Gupta

On Tuesday 22 September 2015 04:42 PM, Sun, Rui wrote:

I am afraid that there is no support for accumulo in SparkR now, because:

1. It seems that there is no data source support for accumulo, so we can't 
create SparkR dataframe on accumulo
2. It is possible to create RDD from accumulo via AccumuloInputFormat in Scala. 
But unfortunately, SparkR does not support creating RDD from Hadoop files other 
than text file.

-Original Message-
From: madhvi.gupta [mailto:madhvi.gu...@orkash.com]
Sent: Tuesday, September 22, 2015 6:25 PM
To: user
Subject: SparkR for accumulo

Hi,

I want to process accumulo data in R through SparkR. Can anyone help me and let
me know how to get accumulo data into spark to be used in R?

--
Thanks and Regards
Madhvi Gupta


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
Can checkpoints be stored to S3 (via S3/S3A Hadoop URL)?

Trying to answer this question, I looked into Checkpoint.getCheckpointFiles
[1]. It is doing findFirstIn which would probably be calling the S3 LIST
operation. S3 LIST is prone to eventual consistency [2]. What would happen
when getCheckpointFiles retrieves an incomplete list of files to
Checkpoint.read [1]?

The pluggable WAL interface allows me to work around the eventual
consistency of S3 by storing an index of filenames in DynamoDB. However it
seems that something similar is required for checkpoints as well.

I am implementing a Reliable Receiver for Amazon SQS. Alternatively, is
there something I can borrow from DirectKafkaInputDStream? After a DStream
computes an RDD, is there a way for the DStream to tell when processing of
that RDD has been finished and only after that delete the SQS messages.

I was also considering Amazon EFS, but it is only available in a single
region for a preview. EBS could be an option, but it cannot be used across
multiple Availability Zones.

[1]:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
[2]:
http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel



On 22 September 2015 at 21:09, Tathagata Das  wrote:

> You can keep the checkpoints in the Hadoop-compatible file system and the
> WAL somewhere else using your custom WAL implementation. Yes, cleaning up
> the stuff gets complicated as it is not as easy as deleting off the
> checkpoint directory - you will have to clean up checkpoint directory as
> well as the whatever other storage that your custom WAL uses. However, if I
> remember correctly, the WAL information is used only when the Dstreams are
> recovered correctly from checkpoints.
>
> Note that, there are further details here that require deeper
> understanding. There are actually two uses of WALs in the system -
>
> 1. Data WAL for received data  - This is what is usually referred to as
> the WAL everywhere. Each receiver writes to a different WAL. This deals
> with bulk data.
> 2. Metadata WAL - This is used by the driver to save metadata information
> like  block to data WAL segment mapping, etc. I usually skip mentioning
> this. This WAL is automatically used when data WAL is enabled. And this
> deals with small data.
>
> If you have to get around S3's limitations, you will have to plugin both
> WALs (see this
> 
> for SparkConfs, but note that we haven't made these confs public). While the
> system supports plugging them in, we havent made this information public
> yet because of such complexities in working with it.  And we have invested
> time in making common sources like Kafka not require WALs (e.g. Direct
> Kafka  approach). In future, we do hope to have a better solution for
> general receivers + WALs + S3 (personally, I really wish S3's semantics
> improve and fixes this issue).
>
> Another alternative direction may be Amazon EFS. Since it based on EBS, it
> may give the necessary semantics. But I havent given that a spin, so its
> uncharted territory :)
>
> TD
>
>
> On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia 
> wrote:
>
>> My understanding of pluggable WAL was that it eliminates the need for
>> having a Hadoop-compatible file system [1].
>>
>> What is the use of pluggable WAL when it can be only used together with
>> checkpointing which still requires a Hadoop-compatible file system?
>>
>> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>>
>>
>>
>> On 22 September 2015 at 19:57, Tathagata Das wrote:
>>
>>> 1. Currently, the WAL can be used only with checkpointing turned on,
>>> because it does not make sense to recover from WAL if there is not
>>> checkpoint information to recover from.
>>>
>>> 2. Since the current implementation saves the WAL in the checkpoint
>>> directory, they share the fate -- if checkpoint directory is deleted, then
>>> both checkpoint info and WAL info is deleted.
>>>
>>> 3. Checkpointing is currently not pluggable. Why do do you want that?
>>>
>>>
>>>
>>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
>>> wrote:
>>>
>>>> I am trying to use pluggable WAL, but it can be used only with
>>>> checkpointing turned on. Thus I still need have a Hadoop-compatible file
>>>> system.
>>>>
>>>> Is there something like pluggable checkpointing?
>>>>
>>>> Or can WAL be used without checkpointing? What happens when WAL is
>>>> available but the checkpoint directory is lost?
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On 18 September 2015 at 05:47, Tathagata Das
>>>> wrote:
>>>>
>>>>> I dont think it would work with multipart upload either. The file is
>>>>> not visible until the multipart download is explicitly closed. So even if
>>>>> each write a part upload, all the parts are not visible until the multiple
>>>>> download is closed.
>>>>>
>>>>> TD
>>>>>
>>>>> O

Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
Hi Zhiliang,

I cannot find a specific doc. But as far as I remember, you can log in to one of
your cluster machines, find the hadoop configuration location, for example
/etc/hadoop/conf, and copy that directory to your local machine.
Typically it has hdfs-site.xml, yarn-site.xml etc. In spark, the former is used
to access hdfs, and the latter is used to launch applications on top of yarn.

Then in spark-env.sh, you add export HADOOP_CONF_DIR=/etc/hadoop/conf.
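
The same idea can be sketched from a launcher script instead of spark-env.sh: set HADOOP_CONF_DIR in the environment of the spark-submit process. This is only a sketch under assumptions. The jar name and main class are hypothetical, and it assumes spark-submit is on the PATH of the client machine.

```python
import os


def build_yarn_submit(conf_dir, app_jar, main_class):
    """Return (command, environment) for submitting to YARN from a client machine."""
    # HADOOP_CONF_DIR tells Spark where hdfs-site.xml and yarn-site.xml live.
    env = dict(os.environ, HADOOP_CONF_DIR=conf_dir)
    command = [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--class", main_class,
        app_jar,
    ]
    return command, env


# Hypothetical jar and class names, for illustration only.
command, env = build_yarn_submit("/etc/hadoop/conf", "my-app.jar", "com.example.MyApp")
# To actually launch it: subprocess.run(command, env=env, check=True)
```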

Thanks.

Zhan Zhang


On Sep 22, 2015, at 8:14 PM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

Hi Zhan,

Yes, I get it now.
I have not ever deployed hadoop configuration locally, and do not find the 
specific doc, would you help provide the doc to do that...

Thank you,
Zhiliang

On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang <zzh...@hortonworks.com> wrote:


There is no difference between running the client inside or outside the cluster
(assuming there is no firewall or network connectivity issue), as long as you
have the hadoop configuration locally.  Here is the doc for running on yarn.

http://spark.apache.org/docs/latest/running-on-yarn.html

Thanks.

Zhan Zhang

On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

Hi Zhan,

Thanks very much for your help comment.
I also view it would be similar to hadoop job submit, however, I was not 
deciding whether it is like that when
it comes to spark.

Have you ever tried that for spark...
Would you give me the deployment doc for hadoop and spark gateway, since this 
is the first time for me
to do that, I do not find the specific doc for it.

Best Regards,
Zhiliang





On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang <zzh...@hortonworks.com> wrote:


It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.

Thanks

Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu <zchl.j...@yahoo.com.INVALID> wrote:

Dear Experts,

The Spark job is running on the cluster through yarn. The job can be submitted
from a machine inside the cluster;
however, I would like to submit the job from another machine which does not
belong to the cluster.
I know that for hadoop, this can be done from another machine on which a
hadoop gateway is installed, which is used
to connect to the cluster.

Then what would go for spark, is it same as hadoop... And where is the 
instruction doc for installing this gateway...

Thank you very much~~
Zhiliang










Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Hi Zhan,

Yes, I get it now.
I have never deployed a Hadoop configuration locally and cannot find a
specific doc for it. Could you point me to one?

Thank you,
Zhiliang

 On Wednesday, September 23, 2015 11:08 AM, Zhan Zhang 
 wrote:
   

There is no difference between running the client inside or outside the
cluster (assuming there is no firewall or network connectivity issue), as
long as you have the Hadoop configuration locally. Here is the doc for
running on YARN.
http://spark.apache.org/docs/latest/running-on-yarn.html
Thanks.
Zhan Zhang
On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu  wrote:

Hi Zhan,

Thanks very much for your helpful comment.
I also thought it would be similar to submitting a Hadoop job, but I was not
sure whether the same holds for Spark.

Have you ever tried that with Spark?
Could you point me to the deployment doc for the Hadoop and Spark gateway?
This is my first time doing this, and I cannot find a specific doc for it.

Best Regards,
Zhiliang




On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang  
wrote:


It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

My Spark jobs run on a cluster managed by YARN. A job can be submitted from a
machine inside the cluster; however, I would like to submit it from another
machine that does not belong to the cluster.
I know that for Hadoop jobs this can be done from a separate machine with a
Hadoop gateway installed, which is used to connect to the cluster.

What is the equivalent for Spark? Is it the same as for Hadoop? And where is
the documentation for installing such a gateway?

Thank you very much,
Zhiliang









  

Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
There is no difference between running the client inside or outside the
cluster (assuming there is no firewall or network connectivity issue), as
long as you have the Hadoop configuration locally. Here is the doc for
running on YARN.

http://spark.apache.org/docs/latest/running-on-yarn.html

Thanks.

Zhan Zhang

On Sep 22, 2015, at 7:49 PM, Zhiliang Zhu <zchl.j...@yahoo.com> wrote:

Hi Zhan,

Thanks very much for your helpful comment.
I also thought it would be similar to submitting a Hadoop job, but I was not
sure whether the same holds for Spark.

Have you ever tried that with Spark?
Could you point me to the deployment doc for the Hadoop and Spark gateway?
This is my first time doing this, and I cannot find a specific doc for it.

Best Regards,
Zhiliang





On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang <zzh...@hortonworks.com> wrote:


It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.

Thanks

Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu <zchl.j...@yahoo.com.INVALID> wrote:

Dear Experts,

My Spark jobs run on a cluster managed by YARN. A job can be submitted from a
machine inside the cluster; however, I would like to submit it from another
machine that does not belong to the cluster.
I know that for Hadoop jobs this can be done from a separate machine with a
Hadoop gateway installed, which is used to connect to the cluster.

What is the equivalent for Spark? Is it the same as for Hadoop? And where is
the documentation for installing such a gateway?

Thank you very much~~
Zhiliang







Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Hi Zhan,

Thanks very much for your helpful comment.
I also thought it would be similar to submitting a Hadoop job, but I was not
sure whether the same holds for Spark.

Have you ever tried that with Spark?
Could you point me to the deployment doc for the Hadoop and Spark gateway?
This is my first time doing this, and I cannot find a specific doc for it.

Best Regards,
Zhiliang

 


 On Wednesday, September 23, 2015 10:20 AM, Zhan Zhang 
 wrote:
   

 It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.
Thanks
Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu  wrote:

Dear Experts,

My Spark jobs run on a cluster managed by YARN. A job can be submitted from a
machine inside the cluster; however, I would like to submit it from another
machine that does not belong to the cluster.
I know that for Hadoop jobs this can be done from a separate machine with a
Hadoop gateway installed, which is used to connect to the cluster.

What is the equivalent for Spark? Is it the same as for Hadoop? And where is
the documentation for installing such a gateway?

Thank you very much,
Zhiliang





  

Re: Streaming Receiver Imbalance Problem

2015-09-22 Thread SLiZn Liu
Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks
for the tips, Tathagata!

On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das  wrote:

> A lot of these imbalances were solved in spark 1.5. Could you give that a
> spin?
>
> https://issues.apache.org/jira/browse/SPARK-8882
>
> On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu 
> wrote:
>
>> Hi spark users,
>>
>> In our Spark Streaming app (Kafka integration, running on Mesos), we started
>> 3 receivers to read 3 Kafka partitions, and we observe an imbalance in the
>> receiving rates: with spark.streaming.receiver.maxRate set to 120, sometimes
>> one receiver runs very close to the limit while the other two receive only
>> roughly fifty records per second.
>>
>> This may be caused by a previous receiver failure, where the receiving rate
>> of one receiver dropped to 0. We restarted the Spark Streaming app, and the
>> imbalance began. We suspect that the partition read by the failing receiver
>> got jammed, and the other two receivers cannot take up its data.
>>
>> The 3-node cluster tends to run slowly. Nearly all the tasks are registered
>> at the node with the previous receiver failure (I used union to combine the
>> 3 receivers' DStreams, so I expected the combined DStream to be well
>> distributed across all nodes). It cannot be guaranteed to finish one batch
>> within a single batch time, stages pile up, and the digested log shows the
>> following:
>>
>> ...
>> 5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 
>> 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, 
>> real=0.02 secs]
>>
>> ...
>> 5/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state 
>> FINISHED for TID 77219 because its task set is gone (this is likely the 
>> result of
>> receiving duplicate task finished status updates)
>>
>> ...
>>
>> These two types of log lines were printed during some (not all) stages.
>>
>> My configurations:
>> # of cores on each node: 64
>> # of nodes: 3
>> batch time is set to 10 seconds
>>
>> spark.streaming.receiver.maxRate   120
>> spark.streaming.blockInterval      160  // roughly 10 seconds divided by the
>>                                         // total cores (64) to max out all
>>                                         // the nodes: 10s * 1000 / 64
>> spark.storage.memoryFraction       0.1  // doesn't seem to work: the young
>>                                         // gen / old gen ratio is nearly 0.3
>>                                         // instead of 0.1
>>
>> Does anyone have an idea? I appreciate your patience.
>>
>> BR,
>> Todd Leo
>>
>>
>
>


Re: Streaming Receiver Imbalance Problem

2015-09-22 Thread Tathagata Das
A lot of these imbalances were solved in spark 1.5. Could you give that a
spin?

https://issues.apache.org/jira/browse/SPARK-8882

On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu  wrote:

> Hi spark users,
>
> In our Spark Streaming app (Kafka integration, running on Mesos), we started
> 3 receivers to read 3 Kafka partitions, and we observe an imbalance in the
> receiving rates: with spark.streaming.receiver.maxRate set to 120, sometimes
> one receiver runs very close to the limit while the other two receive only
> roughly fifty records per second.
>
> This may be caused by a previous receiver failure, where the receiving rate
> of one receiver dropped to 0. We restarted the Spark Streaming app, and the
> imbalance began. We suspect that the partition read by the failing receiver
> got jammed, and the other two receivers cannot take up its data.
>
> The 3-node cluster tends to run slowly. Nearly all the tasks are registered
> at the node with the previous receiver failure (I used union to combine the
> 3 receivers' DStreams, so I expected the combined DStream to be well
> distributed across all nodes). It cannot be guaranteed to finish one batch
> within a single batch time, stages pile up, and the digested log shows the
> following:
>
> ...
> 5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 
> 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, 
> real=0.02 secs]
>
> ...
> 5/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED 
> for TID 77219 because its task set is gone (this is likely the result of
> receiving duplicate task finished status updates)
>
> ...
>
> These two types of log lines were printed during some (not all) stages.
>
> My configurations:
> # of cores on each node: 64
> # of nodes: 3
> batch time is set to 10 seconds
>
> spark.streaming.receiver.maxRate   120
> spark.streaming.blockInterval      160  // roughly 10 seconds divided by the
>                                         // total cores (64) to max out all
>                                         // the nodes: 10s * 1000 / 64
> spark.storage.memoryFraction       0.1  // doesn't seem to work: the young
>                                         // gen / old gen ratio is nearly 0.3
>                                         // instead of 0.1
>
> Does anyone have an idea? I appreciate your patience.
>
> BR,
> Todd Leo
>
>
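
The comment on spark.streaming.blockInterval in the quoted configuration compresses a small calculation. As a rough sketch (class and method names are illustrative only), and assuming the usual rule from the Spark Streaming tuning guide that each receiver produces about batch interval / block interval blocks per batch, with one task per block, the numbers work out as follows:

```java
// Hypothetical helper for the arithmetic in the quoted configuration.
public class BlockIntervalMath {
    /** Block interval (ms) that gives roughly targetTasks blocks per receiver per batch. */
    public static double blockIntervalMs(long batchMs, int targetTasks) {
        return (double) batchMs / targetTasks;
    }

    /** Whole blocks (and therefore tasks) one receiver produces per batch. */
    public static long blocksPerBatch(long batchMs, long blockIntervalMs) {
        return batchMs / blockIntervalMs;
    }

    public static void main(String[] args) {
        long batchMs = 10_000; // 10 second batch time from the configuration
        System.out.println(blockIntervalMs(batchMs, 64));      // 156.25, rounded up to 160 in the config
        System.out.println(blocksPerBatch(batchMs, 160));      // 62 blocks per receiver per batch
        System.out.println(3 * blocksPerBatch(batchMs, 160));  // 186 across the 3 receivers
    }
}
```

Note that the quoted message gives 64 cores per node on 3 nodes, so under this assumption the three receivers together produce about as many tasks per batch as there are cores in the whole cluster.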


Parallel collection in driver programs

2015-09-22 Thread Andy Huang
Hi All,

I would like to know if anyone has experience with using parallel collections
in the driver program, and whether there is an actual advantage or
disadvantage in doing so.

E.g. with a collection of JDBC connections and tables.

We have adapted our non-Spark code, which uses parallel collections, to
Spark, and it seems to work fine.

import java.util.Properties

// (output name, "query::partitionColumn::lowerBound::upperBound::numPartitions")
val conf = List(
  ("tbl1", "dbo.tbl1::tb1_id::0::127::128"),
  ("tbl2", "dbo.tbl2::tb2_id::0::31::32"),
  ("tbl3", "dbo.tbl3::tb3_id::0::63::64")
)

val _JDBC_DEFAULT = "jdbc:sqlserver://192.168.52.1;database=TestSource"
val _STORE_DEFAULT = "hdfs://192.168.52.132:9000/"

val prop = new Properties()
prop.setProperty("user", "sa")
prop.setProperty("password", "password")

// .par runs the closure for several tables at once on driver-side threads;
// foreach is used because only the side effect (the write) matters
conf.par.foreach { case (name, spec) =>

  // split the spec once instead of once per field
  val Array(qry, pCol, lo, hi, part) = spec.split("::")

  // create dataframe from jdbc table
  val jdbcDF = sqlContext.read.jdbc(
    _JDBC_DEFAULT,
    "(" + qry + ") a",
    pCol,
    lo.toLong,  // lower bound
    hi.toLong,  // upper bound
    part.toInt, // number of partitions
    prop        // java.util.Properties - key value pair
  )

  // save to parquet
  jdbcDF.write.mode("overwrite").parquet(_STORE_DEFAULT + name + ".parquet")

}


Thanks.
-- 
Andy Huang | Managing Consultant | Servian Pty Ltd | t: 02 9376 0700 |
f: 02 9376 0730| m: 0433221979


Re: Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Hi Sun,

The issue with Ceph as the underlying file system for Spark is that you
lose data locality. Ceph is not designed to have Spark run directly on top
of the OSDs. I know that CephFS provides data location information via a
Hadoop-compatible API. The last time I looked into this, the integration
was experimental (just google it and you will find a lot of discussions,
e.g.
http://lists.ceph.com/pipermail/ceph-users-ceph.com/2015-July/002837.html).

However, this might not be the biggest issue as long as you have GREAT
network bandwidth like InfiniBand or 10+ Gigabit Ethernet. My guess is that
the architecture and the performance will be similar to S3+Spark at best
(with 10GE instances) if you take the networking seriously.

HTH,

Jerry

On Tue, Sep 22, 2015 at 9:59 PM, fightf...@163.com 
wrote:

> Hi Jerry
>
> Yeah, we have already managed to run and use Ceph in a few of our
> production environments, especially with OpenStack.
>
> The reason we want to use Ceph is that we are looking for a way to get a
> unified storage layer, and the design concepts of Ceph are quite appealing.
> I am interested in work such as the Hadoop CephFS plugin, and we are going
> to run some benchmark tests between HDFS and CephFS soon.
>
> So it would be beneficial if any ongoing work involving Apache Spark and
> Ceph could offer some thoughtful insights.
>
> BTW, regarding the Ceph Object Gateway S3 REST API, I agree about the
> inconvenience and the incompatibilities. However, we have not yet researched
> or tested radosgw much, although we do have a few small requirements for the
> gateway in some use cases.
>
> I hope for more thoughts and discussion.
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Jerry Lam 
> *Date:* 2015-09-23 09:37
> *To:* fightf...@163.com
> *CC:* user 
> *Subject:* Re: Spark standalone/Mesos on top of Ceph
> Do you have specific reasons to use Ceph? I used Ceph before, and I'm not
> too in love with it, especially when I was using the Ceph Object Gateway S3
> API. There are some incompatibilities with the AWS S3 API. You really,
> really need to try it before making the commitment. Did you manage to
> install it?
>
> On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com 
> wrote:
>
>> Hi guys,
>>
>> Here is the info for Ceph : http://ceph.com/
>>
>> We are investigating and using Ceph for distributed storage and
>> monitoring, and are specifically interested in using Ceph as the
>> underlying file system storage for Spark. However, we have no experience
>> in achieving that. Has anybody seen progress on this?
>>
>> Best,
>> Sun.
>>
>> --
>> fightf...@163.com
>>
>
>
>
>
>


Re: how to submit the spark job outside the cluster

2015-09-22 Thread Zhan Zhang
It should be similar to other hadoop jobs. You need hadoop configuration in 
your client machine, and point the HADOOP_CONF_DIR in spark to the 
configuration.

Thanks

Zhan Zhang
On Sep 22, 2015, at 6:37 PM, Zhiliang Zhu <zchl.j...@yahoo.com.INVALID> wrote:

Dear Experts,

My Spark jobs run on a cluster managed by YARN. A job can be submitted from a
machine inside the cluster; however, I would like to submit it from another
machine that does not belong to the cluster.
I know that for Hadoop jobs this can be done from a separate machine with a
Hadoop gateway installed, which is used to connect to the cluster.

What is the equivalent for Spark? Is it the same as for Hadoop? And where is
the documentation for installing such a gateway?

Thank you very much~~
Zhiliang




Re: Long GC pauses with Spark SQL 1.3.0 and billion row tables

2015-09-22 Thread tridib
By skewed, did you mean that the data is not distributed uniformly across
partitions? All of my columns are strings of almost the same size, e.g.

id1,field11,fields12
id2,field21,field22




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Long-GC-pauses-with-Spark-SQL-1-3-0-and-billion-row-tables-tp22750p24776.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread fightf...@163.com
Hi Jerry


Yeah, we have already managed to run and use Ceph in a few of our production
environments, especially with OpenStack.

The reason we want to use Ceph is that we are looking for a way to get a
unified storage layer, and the design concepts of Ceph are quite appealing.
I am interested in work such as the Hadoop CephFS plugin, and we are going
to run some benchmark tests between HDFS and CephFS soon.

So it would be beneficial if any ongoing work involving Apache Spark and
Ceph could offer some thoughtful insights.

BTW, regarding the Ceph Object Gateway S3 REST API, I agree about the
inconvenience and the incompatibilities. However, we have not yet researched
or tested radosgw much, although we do have a few small requirements for the
gateway in some use cases.

I hope for more thoughts and discussion.


Best,
Sun.


fightf...@163.com
 
From: Jerry Lam
Date: 2015-09-23 09:37
To: fightf...@163.com
CC: user
Subject: Re: Spark standalone/Mesos on top of Ceph
Do you have specific reasons to use Ceph? I used Ceph before, and I'm not too
in love with it, especially when I was using the Ceph Object Gateway S3 API.
There are some incompatibilities with the AWS S3 API. You really, really need
to try it before making the commitment. Did you manage to install it?


On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com wrote:

Hi guys,


Here is the info for Ceph : http://ceph.com/ 


We are investigating and using Ceph for distributed storage and monitoring,
and are specifically interested in using Ceph as the underlying file system
storage for Spark. However, we have no experience in achieving that. Has
anybody seen progress on this?


Best,
Sun.


fightf...@163.com



how to submit the spark job outside the cluster

2015-09-22 Thread Zhiliang Zhu
Dear Experts,

My Spark jobs run on a cluster managed by YARN. A job can be submitted from a
machine inside the cluster; however, I would like to submit it from another
machine that does not belong to the cluster.
I know that for Hadoop jobs this can be done from a separate machine with a
Hadoop gateway installed, which is used to connect to the cluster.

What is the equivalent for Spark? Is it the same as for Hadoop? And where is
the documentation for installing such a gateway?

Thank you very much,
Zhiliang

 

Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Do you have specific reasons to use Ceph? I used Ceph before, and I'm not too
in love with it, especially when I was using the Ceph Object Gateway S3 API.
There are some incompatibilities with the AWS S3 API. You really, really need
to try it before making the commitment. Did you manage to install it?

On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com 
wrote:

> Hi guys,
>
> Here is the info for Ceph : http://ceph.com/
>
> We are investigating and using Ceph for distributed storage and
> monitoring, and are specifically interested in using Ceph as the
> underlying file system storage for Spark. However, we have no experience
> in achieving that. Has anybody seen progress on this?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>


Spark standalone/Mesos on top of Ceph

2015-09-22 Thread fightf...@163.com
Hi guys,

Here is the info for Ceph : http://ceph.com/ 

We are investigating and using Ceph for distributed storage and monitoring,
and are specifically interested in using Ceph as the underlying file system
storage for Spark. However, we have no experience in achieving that. Has
anybody seen progress on this?

Best,
Sun.



fightf...@163.com


Re: WAL on S3

2015-09-22 Thread Tathagata Das
You can keep the checkpoints in the Hadoop-compatible file system and the
WAL somewhere else using your custom WAL implementation. Yes, cleaning up
gets complicated, as it is not as easy as deleting the checkpoint
directory: you will have to clean up the checkpoint directory as well as
whatever other storage your custom WAL uses. However, if I remember
correctly, the WAL information is used only when the DStreams are
recovered correctly from checkpoints.

Note that there are further details here that require deeper
understanding. There are actually two uses of WALs in the system:

1. Data WAL for received data - This is what is usually referred to as the
WAL everywhere. Each receiver writes to a different WAL. This deals with
bulk data.
2. Metadata WAL - This is used by the driver to save metadata information
like block to data WAL segment mapping, etc. I usually skip mentioning
this. This WAL is automatically used when data WAL is enabled. And this
deals with small data.

If you have to get around S3's limitations, you will have to plug in both
WALs (see this

for SparkConfs, but note that we haven't made these confs public). While the
system supports plugging them in, we haven't made this information public
yet because of such complexities in working with it. We have also invested
time in making common sources like Kafka not require WALs (e.g. the Direct
Kafka approach). In the future, we do hope to have a better solution for
general receivers + WALs + S3 (personally, I really wish S3's semantics
would improve and fix this issue).

Another alternative direction may be Amazon EFS. Since it is based on EBS, it
may give the necessary semantics. But I haven't given that a spin, so it's
uncharted territory :)

TD


On Tue, Sep 22, 2015 at 5:15 PM, Michal Čizmazia  wrote:

> My understanding of pluggable WAL was that it eliminates the need for
> having a Hadoop-compatible file system [1].
>
> What is the use of pluggable WAL when it can be only used together with
> checkpointing which still requires a Hadoop-compatible file system?
>
> [1]: https://issues.apache.org/jira/browse/SPARK-7056
>
>
>
> On 22 September 2015 at 19:57, Tathagata Das 
> wrote:
>
>> 1. Currently, the WAL can be used only with checkpointing turned on,
>> because it does not make sense to recover from the WAL if there is no
>> checkpoint information to recover from.
>>
>> 2. Since the current implementation saves the WAL in the checkpoint
>> directory, they share the same fate: if the checkpoint directory is
>> deleted, then both the checkpoint info and the WAL info are deleted.
>>
>> 3. Checkpointing is currently not pluggable. Why do you want that?
>>
>>
>>
>> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
>> wrote:
>>
>>> I am trying to use pluggable WAL, but it can be used only with
>>> checkpointing turned on. Thus I still need to have a Hadoop-compatible
>>> file system.
>>>
>>> Is there something like pluggable checkpointing?
>>>
>>> Or can WAL be used without checkpointing? What happens when WAL is
>>> available but the checkpoint directory is lost?
>>>
>>> Thanks!
>>>
>>>
>>> On 18 September 2015 at 05:47, Tathagata Das 
>>> wrote:
>>>
>>>> I don't think it would work with multipart upload either. The file is
>>>> not visible until the multipart upload is explicitly closed. So even if
>>>> each write is a part upload, none of the parts are visible until the
>>>> multipart upload is closed.
>>>>
>>>> TD
>>>>
>>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran wrote:
>>>>
>>>>>
>>>>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>>>>> >
>>>>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>>>>> work with S3 because S3 does not support flushing. Basically, the current
>>>>> implementation assumes that after write + flush, the data is immediately
>>>>> durable, and readable if the system crashes without closing the WAL file.
>>>>> This does not work with S3 as data is durable if and only if the S3 file
>>>>> output stream is cleanly closed.
>>>>> >
>>>>>
>>>>>
>>>>> more precisely, unless you turn multipartition uploads on, the S3n/s3a
>>>>> clients Spark uses *doesn't even upload anything to s3*.
>>>>>
>>>>> It's not a filesystem, and you have to bear that in mind.
>>>>>
>>>>> Amazon's own s3 client used in EMR behaves differently; it may be
>>>>> usable as a destination (I haven't tested)
>>>>>
>>>>>
>>>>
>>>
>>
>
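
The "write + flush" assumption described in the quoted 17 Sep message can be illustrated on a local file system, where it does hold. This is only a sketch with made-up names, not Spark's WAL code: it writes a record, flushes and syncs it, and reads it back while the writer is still open. Per the quoted messages, the same sequence against an S3 output stream would not expose the data until the stream is cleanly closed.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical demo class; it only uses the local file system.
public class FlushVisibility {
    /** Writes a record, flushes, and reads the file back before the writer is closed. */
    public static String readBackBeforeClose(Path file, String record) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
            out.flush();          // hand the bytes to the OS
            out.getFD().sync();   // ask the OS to push them to the device
            // Still inside the try block: the stream has not been closed yet.
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path wal = Files.createTempFile("wal-segment", ".log");
        System.out.println(readBackBeforeClose(wal, "block-1"));
        Files.delete(wal);
    }
}
```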


Yarn Shutting Down Spark Processing

2015-09-22 Thread Bryan Jeffrey
Hello.

I have a Spark streaming job running on a cluster managed by Yarn.  The
spark streaming job starts and receives data from Kafka.  It is processing
well and then after several seconds I see the following error:

15/09/22 14:53:49 ERROR yarn.ApplicationMaster: SparkContext did not
initialize after waiting for 10 ms. Please check earlier log output for
errors. Failing the application.
15/09/22 14:53:49 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 13, (reason: Timed out waiting for SparkContext.)

The spark process is then (obviously) shut down by Yarn.

What do I need to change to allow Yarn to initialize Spark streaming (vs.
batch) jobs?

Thank you,

Bryan Jeffrey


Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
My understanding of pluggable WAL was that it eliminates the need for
having a Hadoop-compatible file system [1].

What is the use of pluggable WAL when it can be only used together with
checkpointing which still requires a Hadoop-compatible file system?

[1]: https://issues.apache.org/jira/browse/SPARK-7056



On 22 September 2015 at 19:57, Tathagata Das 
wrote:

> 1. Currently, the WAL can be used only with checkpointing turned on,
> because it does not make sense to recover from the WAL if there is no
> checkpoint information to recover from.
>
> 2. Since the current implementation saves the WAL in the checkpoint
> directory, they share the same fate: if the checkpoint directory is
> deleted, then both the checkpoint info and the WAL info are deleted.
>
> 3. Checkpointing is currently not pluggable. Why do you want that?
>
>
>
> On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia 
> wrote:
>
>> I am trying to use pluggable WAL, but it can be used only with
>> checkpointing turned on. Thus I still need to have a Hadoop-compatible
>> file system.
>>
>> Is there something like pluggable checkpointing?
>>
>> Or can WAL be used without checkpointing? What happens when WAL is
>> available but the checkpoint directory is lost?
>>
>> Thanks!
>>
>>
>> On 18 September 2015 at 05:47, Tathagata Das  wrote:
>>
>>> I don't think it would work with multipart upload either. The file is not
>>> visible until the multipart upload is explicitly closed. So even if each
>>> write is a part upload, none of the parts are visible until the multipart
>>> upload is closed.
>>>
>>> TD
>>>
>>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
>>> wrote:
>>>

>>>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>>>> >
>>>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>>>> work with S3 because S3 does not support flushing. Basically, the current
>>>> implementation assumes that after write + flush, the data is immediately
>>>> durable, and readable if the system crashes without closing the WAL file.
>>>> This does not work with S3 as data is durable only and only if the S3 file
>>>> output stream is cleanly closed.
>>>> >
>>>>
>>>>
>>>> more precisely, unless you turn multipartition uploads on, the S3n/s3a
>>>> clients Spark uses *doesn't even upload anything to s3*.
>>>>
>>>> It's not a filesystem, and you have to bear that in mind.
>>>>
>>>> Amazon's own s3 client used in EMR behaves differently; it may be
>>>> usable as a destination (I haven't tested)


>>>
>>
>


Re: Why is 1 executor overworked and other sit idle?

2015-09-22 Thread Richard Eggert
If there's only one partition, by definition it will only be handled by one
executor. Repartition to divide the work up. Note that this will also
result in multiple output files, however. If you absolutely need them to
be combined into a single file, I suggest using the Unix/Linux 'cat'
command to concatenate the files afterwards.
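
If the concatenation has to happen from the JVM rather than from a shell, the same idea can be sketched in plain Java. This is an illustration only: it assumes the output directory is on a local file system and that the part files follow the usual part-NNNNN naming, and the class and method names are made up.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that mimics `cat part-* > target`.
public class ConcatParts {
    /** Appends every part-* file in outputDir, in name order, to target. */
    public static void concat(Path outputDir, Path target) throws IOException {
        List<Path> parts = new ArrayList<>();
        // The glob skips _SUCCESS and hidden .crc files, which do not start with "part-".
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path p : ds) {
                parts.add(p);
            }
        }
        Collections.sort(parts); // part-00000, part-00001, ...
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path p : parts) {
                Files.copy(p, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("spark-output");
        Files.write(dir.resolve("part-00001"), "c,d\n".getBytes());
        Files.write(dir.resolve("part-00000"), "a,b\n".getBytes());
        Path merged = Files.createTempFile("merged", ".csv");
        concat(dir, merged);
        System.out.print(new String(Files.readAllBytes(merged)));
    }
}
```

The target file should live outside the output directory (or at least not be named part-*), otherwise it would be picked up as an input on a second run.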

Rich
On Sep 22, 2015 9:20 AM, "Ted Yu"  wrote:

> Have you tried using repartition to spread the load ?
>
> Cheers
>
> On Sep 22, 2015, at 4:22 AM, Chirag Dewan 
> wrote:
>
> Hi,
>
>
>
> I am using Spark to access around 300m rows in Cassandra.
>
>
>
> My job is pretty simple as I am just mapping my row into a CSV format and
> saving it as a text file.
>
>
>
>
>
> public String call(CassandraRow row) throws Exception {
>     StringBuilder sb = new StringBuilder();
>     sb.append(row.getString(10));
>     sb.append(",");
>     sb.append(row.getString(11));
>     sb.append(",");
>     sb.append(row.getString(8));
>     sb.append(",");
>     sb.append(row.getString(7));
>     return sb.toString();
> }
>
>
>
> My map method looks like this.
>
>
>
> I have a 3-node cluster. I observe that the driver starts on Node A, and
> executors are spawned on all 3 nodes. But the executor on Node B or C is
> doing all the tasks. It starts a saveAsTextFile job with 1 output partition,
> stores the RDDs in memory, and also commits the file on the local file
> system.
>
>
>
> This executor is using a lot of system memory and CPU while others are
> sitting idle.
>
>
>
> Am I doing something wrong? Is my RDD correctly partitioned?
>
>
>
> Thanks in advance.
>
>
>
>
>
> Chirag
>
>


Re: Apache Spark job in local[*] is slower than regular 1-thread Python program

2015-09-22 Thread Richard Eggert
Maybe it's just my phone,  but I don't see any code.
On Sep 22, 2015 11:46 AM, "juljoin"  wrote:

> Hello,
>
> I am trying to figure Spark out and I still have some problems with its
> speed, I can't figure them out. In short, I wrote two programs that loop
> through a 3.8Gb file and filter each line depending of if a certain word is
> present.
>
> I wrote a one-thread python program doing the job and I obtain:
>  - for the 3.8Gb file:
>      lines found: 82100
>      in: 10.54 seconds
>  - no filter, just looping through the file:
>      in: 01.65 seconds
>
> The Spark app doing the same and executed on 8 threads gives:
>  - for the 3.8Gb file:
>      lines found: 82100
>      in: 18.27 seconds
>  - for a 38Mb file:
>      lines found: 821
>      in: 2.53 seconds
>
> I must do something wrong to obtain a result twice as slow on the 8 threads
> than on 1 thread.
>
> 1. First, I thought it might be because of the setting-up cost of Spark. But
> for smaller files it only takes 2 seconds which makes this option improbable.
> 2. Looping through the file takes up 1.65 seconds (thank you SSD ^_^ ),
> processing takes up the other 9 seconds (for the python app).
> -> This is why I thought splitting it up on the different processes will
> definitely speed it up.
>
> Note: Increasing the number of threads in Spark improves the speed (from 57
> seconds with 1 thread to 18 seconds with 8 threads). But still, there is a
> big difference in performance between simple python and Spark, it must be my
> doing!
>
> Can someone point me out on what I am doing wrong? That would be greatly
> appreciated :) I am new with all this big data stuff.
>
>
>
> *Here is the code for the Spark app:*
>
>
>
>
>
> *And the python code:*
>
>
>
> Thank you for reading up to this point :)
>
> Have a nice day!
>
>
> - Julien
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-job-in-local-is-slower-than-regular-1-thread-Python-program-tp24771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: WAL on S3

2015-09-22 Thread Tathagata Das
1. Currently, the WAL can be used only with checkpointing turned on,
because it does not make sense to recover from the WAL if there is no
checkpoint information to recover from.

2. Since the current implementation saves the WAL in the checkpoint
directory, they share the same fate: if the checkpoint directory is
deleted, then both the checkpoint info and the WAL info are deleted.

3. Checkpointing is currently not pluggable. Why do you want that?



On Tue, Sep 22, 2015 at 4:53 PM, Michal Čizmazia  wrote:

> I am trying to use pluggable WAL, but it can be used only with
> checkpointing turned on. Thus I still need to have a Hadoop-compatible
> file system.
>
> Is there something like pluggable checkpointing?
>
> Or can WAL be used without checkpointing? What happens when WAL is
> available but the checkpoint directory is lost?
>
> Thanks!
>
>
> On 18 September 2015 at 05:47, Tathagata Das  wrote:
>
>> I don't think it would work with multipart upload either. The file is not
>> visible until the multipart upload is explicitly closed. So even if each
>> write is a part upload, none of the parts are visible until the multipart
>> upload is closed.
>>
>> TD
>>
>> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
>> wrote:
>>
>>>
>>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>>> >
>>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>>> work with S3 because S3 does not support flushing. Basically, the current
>>> implementation assumes that after write + flush, the data is immediately
>>> durable, and readable if the system crashes without closing the WAL file.
>>> This does not work with S3 as data is durable if and only if the S3 file
>>> output stream is cleanly closed.
>>> >
>>>
>>>
>>> more precisely, unless you turn multipart uploads on, the S3n/s3a
>>> clients Spark uses *don't even upload anything to s3*.
>>>
>>> It's not a filesystem, and you have to bear that in mind.
>>>
>>> Amazon's own s3 client used in EMR behaves differently; it may be usable
>>> as a destination (I haven't tested)
>>>
>>>
>>
>


Re: WAL on S3

2015-09-22 Thread Michal Čizmazia
I am trying to use pluggable WAL, but it can be used only with
checkpointing turned on. Thus I still need to have a Hadoop-compatible file
system.

Is there something like pluggable checkpointing?

Or can WAL be used without checkpointing? What happens when WAL is
available but the checkpoint directory is lost?

Thanks!


On 18 September 2015 at 05:47, Tathagata Das  wrote:

> I don't think it would work with multipart upload either. The file is not
> visible until the multipart upload is explicitly closed. So even if each
> write is a part upload, none of the parts are visible until the multipart
> upload is closed.
>
> TD
>
> On Fri, Sep 18, 2015 at 1:55 AM, Steve Loughran 
> wrote:
>
>>
>> > On 17 Sep 2015, at 21:40, Tathagata Das  wrote:
>> >
>> > Actually, the current WAL implementation (as of Spark 1.5) does not
>> work with S3 because S3 does not support flushing. Basically, the current
>> implementation assumes that after write + flush, the data is immediately
>> durable, and readable if the system crashes without closing the WAL file.
>> This does not work with S3 as data is durable if and only if the S3 file
>> output stream is cleanly closed.
>> >
>>
>>
>> more precisely, unless you turn multipart uploads on, the S3n/s3a
>> clients Spark uses *don't even upload anything to s3*.
>>
>> It's not a filesystem, and you have to bear that in mind.
>>
>> Amazon's own s3 client used in EMR behaves differently; it may be usable
>> as a destination (I haven't tested)
>>
>>
>


Re: HDP 2.3 support for Spark 1.5.x

2015-09-22 Thread Zhan Zhang
Hi Krishna,

For the time being, you can download from upstream, and it should run OK
on HDP 2.3. For HDP-specific problems, you can ask in the Hortonworks forum.

Thanks.

Zhan Zhang

On Sep 22, 2015, at 3:42 PM, Krishna Sankar <ksanka...@gmail.com> wrote:

Guys,

  *   We have HDP 2.3 installed just now. It comes with Spark 1.3.x. The 
current wisdom is that it will support the 1.4.x train (which is good, need 
DataFrame et al).
  *   What is the plan to support Spark 1.5.x ? Can we install 1.5.0 on HDP 2.3 
? Or will Spark 1.5.x support be in HDP 2.3.x and if so ~when ?

Cheers & Thanks




Re: Invalid checkpoint url

2015-09-22 Thread Tathagata Das
Are you getting this error in local mode?


On Tue, Sep 22, 2015 at 7:34 AM, srungarapu vamsi 
wrote:

> Yes, I tried ssc.checkpoint("checkpoint"), and it works for me as long as I
> don't use reduceByKeyAndWindow.
>
> When I start using "reduceByKeyAndWindow", it fails with the error
> "Exception in thread "main" org.apache.spark.SparkException: Invalid
> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
> 2-97be-e3862eb5c944/rdd-8"
>
> The stack trace is as below:
>
> Exception in thread "main" org.apache.spark.SparkException: Invalid
> checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
> 2-97be-e3862eb5c944/rdd-8
> at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
> at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
> at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
> at scala.util.Try$.apply(Try.scala:161)
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>

Re: Partitions on RDDs

2015-09-22 Thread Richard Eggert
In general, RDDs get partitioned automatically without programmer
intervention. You generally don't need to worry about them unless you need
to adjust the size/number of partitions or the partitioning scheme
according to the needs of your application. Partitions get redistributed
among nodes whenever a shuffle occurs. Repartitioning may cause a shuffle
to occur in some situations, but it is not guaranteed to occur in all
cases.

In general, smaller/more numerous partitions allow work to be distributed
among more workers, but larger/fewer partitions allow work to be done in
larger chunks, which may result in the work getting done more quickly as
long as all workers are kept busy, due to reduced overhead. Also, the
number of partitions determines how many files get generated by actions
that save RDDs to files.

The maximum size of any one partition is ultimately limited by the
available memory of any single executor.
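
To make the trade-off above concrete, here is a minimal sketch in plain
Python (no Spark involved) of how keyed records could be spread over a
chosen number of partitions. The helper names and the crc32-based hash are
assumptions made for illustration only; they are not Spark's implementation.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_partitions):
    """Pick a partition index for a key (illustrative hash, not Spark's)."""
    # crc32 is used only because it is stable across runs; Spark's own
    # HashPartitioner works on the JVM hashCode of the key instead.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_records(records, num_partitions):
    """Group (key, value) records into num_partitions buckets."""
    buckets = defaultdict(list)
    for key, value in records:
        buckets[partition_for(key, num_partitions)].append((key, value))
    return [buckets[i] for i in range(num_partitions)]


# Hypothetical sample data: 20 records spread over 5 distinct keys.
records = [("user%d" % (i % 5), i) for i in range(20)]
few = partition_records(records, 2)    # fewer, larger partitions
many = partition_records(records, 8)   # more, smaller partitions
```

With only five distinct keys, at most five of the eight buckets in `many`
can be non-empty, which hints at why more partitions do not automatically
mean more parallel work.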

Rich
On Sep 22, 2015 6:42 PM, "XIANDI"  wrote:

> I'm always confused by the partitions. We may have many RDDs in the code.
> Do
> we need to partition on all of them? Do the rdds get rearranged among all
> the nodes whenever we do a partition? What is a wise way of doing
> partitions?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-on-RDDs-tp24775.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark as standalone or with Hadoop stack.

2015-09-22 Thread Jacek Laskowski
On Tue, Sep 22, 2015 at 10:03 PM, Ted Yu  wrote:

> To my knowledge, no one runs HBase on top of Mesos.

Hi,

That sentence caught my attention. Could you explain the reasons for
not running HBase on Mesos, i.e. what makes Mesos inappropriate for
HBase?

Jacek




SPARK_WORKER_INSTANCES was detected (set to '2')…This is deprecated in Spark 1.0+

2015-09-22 Thread Jacek Laskowski
Hi,

This is for Spark 1.6.0-SNAPSHOT (SHA1
a96ba40f7ee1352288ea676d8844e1c8174202eb).

I've been toying with Spark Standalone cluster and have the following
file in conf/spark-env.sh:

➜  spark git:(master) ✗ cat conf/spark-env.sh
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2g

# multiple Spark worker processes on a machine
SPARK_WORKER_INSTANCES=2

It's fine and the cluster works fine. It's also fine according to
https://spark.apache.org/docs/latest/spark-standalone.html.

So far so good.

Just today I saw the following when I executed `spark-submit`:

=
15/09/23 00:48:26 WARN SparkConf:
SPARK_WORKER_INSTANCES was detected (set to '2').
This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --num-executors to specify the number of executors
 - Or set SPARK_EXECUTOR_INSTANCES
 - spark.executor.instances to configure the number of instances in
the spark config.
=

Why the deprecation? Is it not supported (or not recommended, given the
message) to have a Spark Standalone cluster and execute spark-submit
on the same machine?

Regards,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski




HDP 2.3 support for Spark 1.5.x

2015-09-22 Thread Krishna Sankar
Guys,

   - We have HDP 2.3 installed just now. It comes with Spark 1.3.x. The
   current wisdom is that it will support the 1.4.x train (which is good, need
   DataFrame et al).
   - What is the plan to support Spark 1.5.x ? Can we install 1.5.0 on HDP
   2.3 ? Or will Spark 1.5.x support be in HDP 2.3.x and if so ~when ?

Cheers & Thanks



Partitions on RDDs

2015-09-22 Thread XIANDI
I'm always confused by the partitions. We may have many RDDs in the code. Do
we need to partition on all of them? Do the rdds get rearranged among all
the nodes whenever we do a partition? What is a wise way of doing
partitions?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-on-RDDs-tp24775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Deenar Toraskar
Clement

In local mode all worker threads run in the driver VM. Your dictionary
should not be copied 32 times; in fact it won't be broadcast at all. Have
you tried increasing spark.driver.memory to ensure that the driver uses all
the memory on the machine?
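
The general idea that threads inside one process share a single object can
be sketched in plain Python as below. This is only an illustration of
in-process sharing under assumed names (`dico` is a hypothetical stand-in
for the large dictionary); it does not model how PySpark launches its
Python workers, so it says nothing about memory use in that setup.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the large dictionary from the question.
dico = {i: i * i for i in range(100000)}


def lookup(key):
    # Every thread reads the same in-process object; nothing is copied.
    return id(dico), dico[key]


with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lookup, range(32)))

# All lookups should report the same object identity.
object_ids = {obj_id for obj_id, _ in results}
values = [value for _, value in results]
```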

Deenar

On 22 September 2015 at 19:42, Clément Frison 
wrote:

> Hello,
>
> My team and I have a 32-core machine and we would like to use a huge
> object - for example a large dictionary - in a map transformation and use
> all our cores in parallel by sharing this object among some tasks.
>
> We broadcast our large dictionary.
>
> dico_br = sc.broadcast(dico)
>
> We use it in a map:
>
> rdd.map(lambda x: (x[0], function(x[1], dico_br)))
>
> where function does a lookup : dico_br.value[x]
>
> Our issue is that our dictionary is loaded 32 times in memory, and it
> doesn't fit. So what we are doing is limiting the number of executors. It
> works fine but we only have 8 cpus working in parallel instead of 32.
>
> We would like to take advantage of multicore processing and shared memory,
> as the 32 cores are in the same machine. For example we would like to load
> the dictionary in memory 8 times only and make 4 cores share it. How could
> we achieve that with Spark ?
>
>
> What we have tried - without success :
>
> 1) One driver/worker with 32 cores : local[32]
>
> 2) Standalone with one master and 8 workers - each of them having 4 cores
>
> Thanks a lot for your help, Clement
>


Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Deenar Toraskar
Michael

Thank you for your prompt answer. I will repost after I try this again on
1.5.1 or branch-1.5. In addition a blog post on SparkSQL data types would
be very helpful. I am familiar with the Hive data types, but there is very
little documentation on Spark SQL data types.

Regards
Deenar

On 22 September 2015 at 19:28, Michael Armbrust 
wrote:

> I think that you are hitting a bug (which should be fixed in Spark
> 1.5.1).  I'm hoping we can cut an RC for that this week.  Until then you
> could try building branch-1.5.
>
> On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar <
> deenar.toras...@gmail.com> wrote:
>
>> Hi
>>
>> I am trying to write a UDAF ArraySum, that does an element-wise sum of
>> arrays of Doubles, returning an array of Double, following the sample in
>>
>> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
>> I am getting the following error. Any guidance on handling complex types in
>> Spark SQL would be appreciated.
>>
>> Regards
>> Deenar
>>
>> import org.apache.spark.sql.expressions.MutableAggregationBuffer
>> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.types._
>> import org.apache.spark.sql.functions._
>>
>> class ArraySum extends UserDefinedAggregateFunction {
>>def inputSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>
>>   def bufferSchema: StructType =
>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>
>>   def dataType: DataType = ArrayType(DoubleType, false)
>>
>>   def deterministic: Boolean = true
>>
>>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>> buffer(0) = Nil
>>   }
>>
>>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>> val currentSum : Seq[Double] = buffer.getSeq(0)
>> val currentRow : Seq[Double] = input.getSeq(0)
>> buffer(0) = (currentSum, currentRow) match {
>>   case (Nil, Nil) => Nil
>>   case (Nil, row) => row
>>   case (sum, Nil) => sum
>>   case (sum, row) => (sum, row).zipped.map{ case (a, b) => a + b }
>>   // TODO handle different sizes arrays here
>> }
>>   }
>>
>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>> val currentSum : Seq[Double] = buffer1.getSeq(0)
>> val currentRow : Seq[Double] = buffer2.getSeq(0)
>> buffer1(0) = (currentSum, currentRow) match {
>>   case (Nil, Nil) => Nil
>>   case (Nil, row) => row
>>   case (sum, Nil) => sum
>>   case (sum, row) => (sum, row).zipped.map{ case (a, b) => a + b }
>>   // TODO handle different sizes arrays here
>> }
>>   }
>>
>>   def evaluate(buffer: Row): Any = {
>> buffer.getSeq(0)
>>   }
>> }
>>
>> val arraySum = new ArraySum
>> sqlContext.udf.register("ArraySum", arraySum)
>>
>> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
>> '2015-05-22' limit 10*
>>
>> gives me the following error
>>
>>
>> Error in SQL statement: SparkException: Job aborted due to stage failure:
>> Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
>> stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException:
>> scala.collection.mutable.WrappedArray$ofRef cannot be cast to
>> org.apache.spark.sql.types.ArrayData at
>> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
>> at
>> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
>> at
>> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
>> Source) at
>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
>> at
>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
>> at
>> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:141)
>> at
>> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
>> scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at
>> scala.collection.Iterator$class.foreach(Iterator.scala:727) at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>> at scala.collection.AbstractIterator.to(Iterator.scala:1157) at
>> scala.collection.TraversableOnc
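
The "TODO handle different sizes arrays" left open in the quoted ArraySum
code could be approached as in this minimal plain-Python sketch. Padding the
shorter array with 0.0 is an assumption made here, not something the
original code specifies, and `array_sum` is a hypothetical helper name.

```python
from itertools import zip_longest


def array_sum(current_sum, row):
    """Element-wise sum; the shorter sequence is padded with 0.0."""
    return [a + b for a, b in zip_longest(current_sum, row, fillvalue=0.0)]


# Mirrors update(): start from an empty buffer and fold rows in one by one.
buffer = []
for row in ([1.0, 2.0, 3.0], [10.0, 20.0], [0.5, 0.5, 0.5, 0.5]):
    buffer = array_sum(buffer, row)
```

Starting from an empty list also covers the `Nil` cases of the pattern
match, since summing with an empty sequence returns the other one unchanged.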

KafkaProducer using Cassandra as source

2015-09-22 Thread kali.tumm...@gmail.com
Hi All, 

I am a newbie in Spark.

I managed to write a Kafka producer in Spark where the data comes from a
Cassandra table, but I have a few questions.

The Spark data output from Cassandra looks like this:

[2,Joe,Smith]
[1,Barack,Obama]

I would like something like this in my Kafka consumer, so I need to remove the
[] at the beginning and end:

2~Joe~Smith
1~Barack~Obama

Also, when I collect on the RDD and add mkString(","), the two lines of data
get combined into one (e.g. [2,Joe,Smith][1,Barack,Obama]), so I used
mkString("\n") and now the data looks like:

[2,Joe,Smith]
[1,Barack,Obama]

But I need something like this in my Kafka consumer when I receive the data.
Any ideas?

2~Joe~Smith
1~Barack~Obama


Spark Code:-

package com.examples

/**
 * Created by kalit_000 on 22/09/2015.
 */

import kafka.producer.KeyedMessage
import kafka.producer.Producer
import kafka.producer.ProducerConfig
import java.util.Properties
import _root_.kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.streaming.{Seconds,StreamingContext}
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkProducerDBCassandra {

  case class TestTable (TRADE_ID:String,TRADE_PRICE: String)

  def main(args: Array[String]): Unit =
  {
    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("testkali2")
      .set("spark.cassandra.connection.host", "127.0.0.1")
    val sc = new SparkContext("local", "test", conf)

    print("Test kali Spark Cassandra")

    val cc = new org.apache.spark.sql.cassandra.CassandraSQLContext(sc)

    val p = cc.sql("select * from people.person")

    p.collect().foreach(println)

    val props: Properties = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    val x = p.collect().mkString("\n")

    producer.send(new KeyedMessage[String, String]("trade", x))

  }
}
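
The formatting step asked about above (fields joined by "~", one row per
line, no surrounding brackets) can be sketched in plain Python as follows.
The function names are hypothetical and the rows are the sample values from
this message. In the Scala code above, the equivalent would presumably be
something like p.collect().map(_.mkString("~")).mkString("\n"), relying on
Row.mkString(sep); that variant is untested here.

```python
def format_row(row, sep="~"):
    """Join the fields of one row with sep, without surrounding brackets."""
    return sep.join(str(field) for field in row)


def format_rows(rows, sep="~"):
    """One formatted line per row, newline-separated."""
    return "\n".join(format_row(row, sep) for row in rows)


# Sample rows copied from the message above.
rows = [(2, "Joe", "Smith"), (1, "Barack", "Obama")]
payload = format_rows(rows)
print(payload)
```

Joining the fields per row first, and only then joining rows, is what keeps
the brackets of the row's default string form out of the payload.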

Thanks
Sri 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaProducer-using-Cassandra-as-source-tp24774.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark as standalone or with Hadoop stack.

2015-09-22 Thread Ted Yu
bq. it's relatively harder to use it with HBase

I agree with Sean.
I work on HBase. To my knowledge, no one runs HBase on top of Mesos.

On Tue, Sep 22, 2015 at 12:31 PM, Sean Owen  wrote:

> Who told you Mesos would make Spark 100x faster? does it make sense
> that just the resource manager could make that kind of difference?
> This sounds entirely wrong, or, maybe a mishearing.
>
> I don't know if Mesos is somehow easier to use with Cassandra, but
> it's relatively harder to use it with HBase, HDFS, etc. You probably
> want to use the Hadoop resource manager, YARN, if using Hadoop-ish
> stack components.
>
> As for Spark, the YARN integration actually has some advantages at the
> moment, like dynamic allocation. I think the security story is more
> complete too (? not sure).
>
> On Tue, Sep 22, 2015 at 8:25 PM, Shiv Kandavelu
>  wrote:
> >
> >
> > Hi All,
> >
> >
> >
> > We currently have a Hadoop cluster having Yarn as the resource manager.
> >
> > We are planning to use HBase as the data store due to the C-P aspects of
> the
> > CAP Theorem.
> >
> > We now want to do extensive data processing both stored data in HBase as
> > well as stream processing from online website / API
> >
> > We now want to use both Spark/Mapreduce on an existing Hadoop cluster.
> >
> >
> >
> > One of the recommendation we got was to use Spark Cluster as a standalone
> > with Mesos as a resource manager on top of it to Monitor and scale. The
> > reason for this recommendation is that Standalone Spark with Mesos is
> 100x
> > faster than the Spark/Yarn/Hadoop combination. It was also mentioned that
> > building on Spark/Mesos can help automatically add spark nodes on the fly
> > for processing to scale. Also, it is easy to switch the bottom data stack
> > HBASE to Cassandra or something else if we use Spark.
> >
> >
> >
> > We are in the process of evaluating which stack will work best and with
> the
> > knowledge we have, it is getting tough to pick one versus the other b/c
> of
> > our inexperience in these platforms.
> >
> >
> >
> > Can you help us understand the pros and cons of having Spark as a
> Standalone
> > cluster Vs running on top of Hadoop stack?
> >
> >
> >
> > Thanks!
> >
> >
>
>
>


Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Michael Armbrust
Check out:
http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types

On Tue, Sep 22, 2015 at 12:49 PM, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> Michael
>
> Thank you for your prompt answer. I will repost after I try this again on
> 1.5.1 or branch-1.5. In addition a blog post on SparkSQL data types would
> be very helpful. I am familiar with the Hive data types, but there is very
> little documentation on Spark SQL data types.
>
> Regards
> Deenar
>
>
> *Think Reactive Ltd*
> deenar.toras...@thinkreactive.co.uk
> 07714140812
>
>
>
> On 22 September 2015 at 19:28, Michael Armbrust 
> wrote:
>
>> I think that you are hitting a bug (which should be fixed in Spark
>> 1.5.1).  I'm hoping we can cut an RC for that this week.  Until then you
>> could try building branch-1.5.
>>
>> On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am trying to write a UDAF ArraySum, that does an element-wise sum of
>>> arrays of Doubles, returning an array of Double, following the sample in
>>>
>>> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
>>> I am getting the following error. Any guidance on handling complex types in
>>> Spark SQL would be appreciated.
>>>
>>> Regards
>>> Deenar
>>>
>>> import org.apache.spark.sql.expressions.MutableAggregationBuffer
>>> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.functions._
>>>
>>> class ArraySum extends UserDefinedAggregateFunction {
>>>def inputSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>>
>>>   def bufferSchema: StructType =
>>> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>>>
>>>   def dataType: DataType = ArrayType(DoubleType, false)
>>>
>>>   def deterministic: Boolean = true
>>>
>>>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>>> buffer(0) = Nil
>>>   }
>>>
>>>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>>> val currentSum : Seq[Double] = buffer.getSeq(0)
>>> val currentRow : Seq[Double] = input.getSeq(0)
>>> buffer(0) = (currentSum, currentRow) match {
>>>   case (Nil, Nil) => Nil
>>>   case (Nil, row) => row
>>>   case (sum, Nil) => sum
>>>   case (sum, row) => (sum, row).zipped.map{ case (a, b) => a + b }
>>>   // TODO handle different sizes arrays here
>>> }
>>>   }
>>>
>>>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>>> val currentSum : Seq[Double] = buffer1.getSeq(0)
>>> val currentRow : Seq[Double] = buffer2.getSeq(0)
>>> buffer1(0) = (currentSum, currentRow) match {
>>>   case (Nil, Nil) => Nil
>>>   case (Nil, row) => row
>>>   case (sum, Nil) => sum
>>>   case (sum, row) => (sum, row).zipped.map{ case (a, b) => a + b }
>>>   // TODO handle different sizes arrays here
>>> }
>>>   }
>>>
>>>   def evaluate(buffer: Row): Any = {
>>> buffer.getSeq(0)
>>>   }
>>> }
>>>
>>> val arraySum = new ArraySum
>>> sqlContext.udf.register("ArraySum", arraySum)
>>>
>>> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
>>> '2015-05-22' limit 10*
>>>
>>> gives me the following error
>>>
>>>
>>> Error in SQL statement: SparkException: Job aborted due to stage
>>> failure: Task 0 in stage 219.0 failed 4 times, most recent failure: Lost
>>> task 0.3 in stage 219.0 (TID 11242, 10.172.255.236):
>>> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
>>> cannot be cast to org.apache.spark.sql.types.ArrayData at
>>> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
>>> Source) at
>>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
>>> at
>>> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
>>> at
>>> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:141)
>>> at
>>> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
>>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
>>> scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at
>>> scala.collection.Iterator$class.foreach(Iterator.scala:727) at
>>> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
>>> scala.collectio

unsubscribe

2015-09-22 Thread Stuart Layton
-- 
Stuart Layton


Re: Deploying spark-streaming application on production

2015-09-22 Thread Adrian Tanase
btw I re-read the docs and I want to clarify that reliable receiver + WAL gives 
you at least once, not exactly once semantics.
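
The difference can be sketched in plain Python: with at-least-once delivery,
a message whose ack is lost gets delivered again, so a naive sink sees it
twice, while a sink keyed by message id absorbs the replay. Message ids, the
sinks and the failure set below are all hypothetical; none of this is Spark
or MQTT API.

```python
def deliver_at_least_once(messages, fail_before_ack):
    """Yield messages, redelivering any whose first ack is lost."""
    for msg_id, payload in messages:
        yield msg_id, payload
        if msg_id in fail_before_ack:
            # The ack never arrived, so the source sends it again.
            yield msg_id, payload


messages = [(1, "a"), (2, "b"), (3, "c")]

# Naive sink: appends everything it sees, so duplicates show up.
naive_sink = []
# Idempotent sink: keyed by message id, so a replay overwrites itself.
idempotent_sink = {}

for msg_id, payload in deliver_at_least_once(messages, fail_before_ack={2}):
    naive_sink.append(payload)
    idempotent_sink[msg_id] = payload
```

Under these assumptions, getting an exactly-once result depends on the
output side being idempotent or transactional, not on the receiver alone.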

Sent from my iPhone

On 21 Sep 2015, at 21:50, Adrian Tanase <atan...@adobe.com> wrote:

I'm wondering, isn't this the canonical use case for WAL + reliable receiver?

As far as I know you can tune the MQTT server to wait for an ack on messages
(QoS level 2?).
With some support from the client library you could achieve exactly-once
semantics on the read side, if you ack a message only after writing it to the
WAL, correct?

-adrian

Sent from my iPhone

On 21 Sep 2015, at 12:35, Petr Novak <oss.mli...@gmail.com> wrote:

In short there is no direct support for it in Spark AFAIK. You will either
manage it in MQTT or have to add another layer of indirection - either
in-memory based (observable streams, in-mem db) or disk based (Kafka, HDFS
files, db) which will keep your unprocessed events.

Now realizing, there is support for backpressure in v1.5.0 but I don't know if
it could be exploited, i.e. I don't know if it is possible to decouple event
reading into memory and the actual processing code in Spark, which could be
swapped on the fly. Probably not without some custom-built facility for it.

Petr

On Mon, Sep 21, 2015 at 11:26 AM, Petr Novak <oss.mli...@gmail.com> wrote:
I should read my posts at least once to avoid so many typos. Hopefully you are 
brave enough to read through.

Petr

On Mon, Sep 21, 2015 at 11:23 AM, Petr Novak <oss.mli...@gmail.com> wrote:
I think you would have to persist events somehow if you don't want to miss
them. I don't see any other option there. Either in MQTT, if it is supported
there, or by routing them through Kafka.

There is a WriteAheadLog in Spark, but you would have to decouple MQTT stream
reading and processing into 2 separate jobs so that you could upgrade the
processing one, assuming the reading one would be stable (without changes)
across versions. But it is problematic because there is no easy way to share
DStreams between jobs - you would have to develop your own facility for it.

Alternatively the reading job could save MQTT events in their most raw form
into files - to limit the need to change code - and then the processing job
would work on top of it using Spark Streaming based on files. I think this is
inefficient and can get quite complex if you would like to make it reliable.

Basically either MQTT supports persistence (which I don't know) or there is
Kafka for this use case.

Another option would be, I think, to place observable streams in between MQTT
and Spark Streaming with backpressure, as long as you can perform the upgrade
before the buffers fill up.

I'm sorry that it is not thought out well from my side, it is just a brainstorm 
but it might lead you somewhere.

Regards,
Petr

On Mon, Sep 21, 2015 at 10:09 AM, Jeetendra Gangele <gangele...@gmail.com> wrote:
Hi All,

I have a Spark Streaming application with a batch interval (10 ms) which 
reads from an MQTT channel and dumps the data from MQTT to HDFS.

Suppose I have to deploy a new application jar (with changes in the Spark 
Streaming application). What is the best way to deploy it? Currently I am 
doing the following:

1. Killing the running streaming app using yarn application -kill ID
2. Starting the application again

The problem with this approach is that, since we are not persisting the events 
in MQTT, we will miss the events for the duration of the deploy.

How should I handle this case?

regards
jeeetndra





Re: Spark as standalone or with Hadoop stack.

2015-09-22 Thread Sean Owen
Who told you Mesos would make Spark 100x faster? does it make sense
that just the resource manager could make that kind of difference?
This sounds entirely wrong, or, maybe a mishearing.

I don't know if Mesos is somehow easier to use with Cassandra, but
it's relatively harder to use it with HBase, HDFS, etc. You probably
want to use the Hadoop resource manager, YARN, if using Hadoop-ish
stack components.

As for Spark, the YARN integration actually has some advantages at the
moment, like dynamic allocation. I think the security story is more
complete too (? not sure).

On Tue, Sep 22, 2015 at 8:25 PM, Shiv Kandavelu
 wrote:
>
>
> Hi All,
>
>
>
> We currently have a Hadoop cluster having Yarn as the resource manager.
>
> We are planning to use HBase as the data store due to the C-P aspects of the
> CAP Theorem.
>
> We now want to do extensive data processing, both on data stored in HBase
> and stream processing from online website / API.
>
> We now want to use both Spark/Mapreduce on an existing Hadoop cluster.
>
>
>
> One of the recommendations we got was to use Spark Cluster as a standalone
> with Mesos as a resource manager on top of it to Monitor and scale. The
> reason for this recommendation is that Standalone Spark with Mesos is 100x
> faster than the Spark/Yarn/Hadoop combination. It was also mentioned that
> building on Spark/Mesos can help automatically add spark nodes on the fly
> for processing to scale. Also, it is easy to switch the bottom data stack
> HBASE to Cassandra or something else if we use Spark.
>
>
>
> We are in the process of evaluating which stack will work best and with the
> knowledge we have, it is getting tough to pick one versus the other b/c of
> our inexperience in these platforms.
>
>
>
> Can you help us understand the pros and cons of having Spark as a Standalone
> cluster Vs running on top of Hadoop stack?
>
>
>
> Thanks!
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark as standalone or with Hadoop stack.

2015-09-22 Thread Shiv Kandavelu

Hi All,

We currently have a Hadoop cluster having Yarn as the resource manager.
We are planning to use HBase as the data store due to the C-P aspects of the 
CAP Theorem.
We now want to do extensive data processing, both on data stored in HBase and 
stream processing from online website / API.
We now want to use both Spark/Mapreduce on an existing Hadoop cluster.

One of the recommendations we got was to use Spark Cluster as a standalone with 
Mesos as a resource manager on top of it to Monitor and scale. The reason for 
this recommendation is that Standalone Spark with Mesos is 100x faster than the 
Spark/Yarn/Hadoop combination. It was also mentioned that building on 
Spark/Mesos can help automatically add spark nodes on the fly for processing to 
scale. Also, it is easy to switch the bottom data stack HBASE to Cassandra or 
something else if we use Spark.

We are in the process of evaluating which stack will work best and with the 
knowledge we have, it is getting tough to pick one versus the other b/c of our 
inexperience in these platforms.

Can you help us understand the pros and cons of having Spark as a Standalone 
cluster Vs running on top of Hadoop stack?

Thanks!



pyspark question: create RDD from csr_matrix

2015-09-22 Thread jeff saremi
I've tried desperately to create an RDD from a matrix I have. Every combination 
failed.

I have a sparse matrix returned from a call to 
dv = DictVectorizer()
sv_tf = dv.fit_transform(tf)

which is supposed to be a matrix of document terms and their frequencies.
I need to convert this to an RDD so I can feed it to pyspark functions such as 
IDF().fit()

I tried applying Vectors.sparse(??, sv_tf) but I didn't know what the 
dimension should be.
I tried sc.parallelize(sv_tf), which didn't work either.
I tried both of the above methods with sv_tf.toarray(). Again no luck.

thanks
Jeff  
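
A possible direction for the question above, offered as a rough sketch rather than a confirmed answer: the dimension that Vectors.sparse expects is the number of columns of the matrix (the vocabulary size, sv_tf.shape[1] for a SciPy matrix), and each row of the matrix becomes one sparse vector. A SciPy csr_matrix exposes its contents as the indptr, indices and data arrays; the plain-Python helper below (csr_rows_to_sparse_triples is a hypothetical name, and the small arrays are made-up sample data) walks those arrays and yields one (size, indices, values) triple per row. It assumes the column indices within each row are already sorted.

```python
def csr_rows_to_sparse_triples(indptr, indices, data, num_cols):
    """Yield one (size, indices, values) triple per row of a CSR matrix.

    indptr, indices and data follow the usual CSR layout: the entries of
    row r live in positions indptr[r] .. indptr[r + 1] - 1.
    """
    for row in range(len(indptr) - 1):
        start, end = indptr[row], indptr[row + 1]
        yield (num_cols, list(indices[start:end]), list(data[start:end]))


# Hypothetical 2 x 4 term-frequency matrix:
# [[1.0, 0.0, 2.0, 0.0],
#  [0.0, 0.0, 0.0, 3.0]]
indptr = [0, 2, 3]
indices = [0, 2, 3]
data = [1.0, 2.0, 3.0]

triples = list(csr_rows_to_sparse_triples(indptr, indices, data, num_cols=4))
```

Each triple could then be turned into a vector with Vectors.sparse(size, indices, values) from pyspark.mllib.linalg, and the resulting list of vectors handed to sc.parallelize before calling IDF().fit(); that last step is not exercised here because it needs a running SparkContext.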

Re: How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Utkarsh Sengar
If the broadcast variable doesn't fit in memory, I think it is not the right
fit for you.
You can think about fitting it into an RDD as a tuple with the other data you
are working on.

Say you are working on RDD (rdd in your case), run a map/reduce
to convert it to RDD> so now you have
relevant data from the dict as part of your RDD available locally in the
task.
It's much more efficient than finding workarounds for loading it partially.

Thanks,
-Utkarsh

On Tue, Sep 22, 2015 at 11:42 AM, Clément Frison 
wrote:

> Hello,
>
> My team and I have a 32-core machine and we would like to use a huge
> object - for example a large dictionary - in a map transformation and use
> all our cores in parallel by sharing this object among some tasks.
>
> We broadcast our large dictionary.
>
> dico_br = sc.broadcast(dico)
>
> We use it in a map:
>
> rdd.map(lambda x: (x[0], function(x[1], dico_br)))
>
> where function does a lookup : dico_br.value[x]
>
> Our issue is that our dictionary is loaded 32 times in memory, and it
> doesn't fit. So what we are doing is limiting the number of executors. It
> works fine but we only have 8 cpus working in parallel instead of 32.
>
> We would like to take advantage of multicore processing and shared memory,
> as the 32 cores are in the same machine. For example we would like to load
> the dictionary in memory 8 times only and make 4 cores share it. How could
> we achieve that with Spark ?
>
>
> What we have tried - without success :
>
> 1) One driver/worker with 32 cores : local[32]
>
> 2) Standalone with one master and 8 workers - each of them having 4 cores
>
> Thanks a lot for your help, Clement
>



-- 
Thanks,
-Utkarsh


How to share memory in a broadcast between tasks in the same executor?

2015-09-22 Thread Clément Frison
Hello,

My team and I have a 32-core machine and we would like to use a huge object
- for example a large dictionary - in a map transformation and use all our
cores in parallel by sharing this object among some tasks.

We broadcast our large dictionary.

dico_br = sc.broadcast(dico)

We use it in a map:

rdd.map(lambda x: (x[0], function(x[1], dico_br)))

where function does a lookup : dico_br.value[x]

Our issue is that our dictionary is loaded 32 times in memory, and it
doesn't fit. So what we are doing is limiting the number of executors. It
works fine but we only have 8 cpus working in parallel instead of 32.

We would like to take advantage of multicore processing and shared memory,
as the 32 cores are in the same machine. For example we would like to load
the dictionary in memory 8 times only and make 4 cores share it. How could
we achieve that with Spark ?


What we have tried - without success :

1) One driver/worker with 32 cores : local[32]

2) Standalone with one master and 8 workers - each of them having 4 cores

Thanks a lot for your help, Clement


Re: Spark 1.5 UDAF ArrayType

2015-09-22 Thread Michael Armbrust
I think that you are hitting a bug (which should be fixed in Spark 1.5.1).
I'm hoping we can cut an RC for that this week.  Until then you could try
building branch-1.5.

On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar  wrote:

> Hi
>
> I am trying to write an UDAF ArraySum, that does element wise sum of
> arrays of Doubles returning an array of Double following the sample in
>
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
> I am getting the following error. Any guidance on handle complex type in
> Spark SQL would be appreciated.
>
> Regards
> Deenar
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions._
>
> class ArraySum extends UserDefinedAggregateFunction {
>def inputSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>
>   def bufferSchema: StructType =
> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
>
>   def dataType: DataType = ArrayType(DoubleType, false)
>
>   def deterministic: Boolean = true
>
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer(0) = Nil
>   }
>
>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
> val currentSum : Seq[Double] = buffer.getSeq(0)
> val currentRow : Seq[Double] = input.getSeq(0)
> buffer(0) = (currentSum, currentRow) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => (sum, row).zipped.map{ case (a, b) => a +
> b }
>   // TODO handle different sizes arrays here
> }
>   }
>
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> val currentSum : Seq[Double] = buffer1.getSeq(0)
> val currentRow : Seq[Double] = buffer2.getSeq(0)
> buffer1(0) = (currentSum, currentRow) match {
>   case (Nil, Nil) => Nil
>   case (Nil, row) => row
>   case (sum, Nil) => sum
>   case (sum, row) => (sum, row).zipped.map{ case (a, b) => a +
> b }
>   // TODO handle different sizes arrays here
> }
>   }
>
>   def evaluate(buffer: Row): Any = {
> buffer.getSeq(0)
>   }
> }
>
> val arraySum = new ArraySum
> sqlContext.udf.register("ArraySum", arraySum)
>
> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
> '2015-05-22' limit 10*
>
> gives me the following error
>
>
> Error in SQL statement: SparkException: Job aborted due to stage failure:
> Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
> stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException:
> scala.collection.mutable.WrappedArray$ofRef cannot be cast to
> org.apache.spark.sql.types.ArrayData at
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
> at
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
> at
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
> Source) at
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
> at
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:141)
> at
> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
> scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at
> scala.collection.Iterator$class.foreach(Iterator.scala:727) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157) at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(Spark

Spark 1.5 UDAF ArrayType

2015-09-22 Thread Deenar Toraskar
Hi

I am trying to write a UDAF, ArraySum, that does an element-wise sum of arrays
of Doubles and returns an array of Double, following the sample in
https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html.
I am getting the following error. Any guidance on handling complex types in
Spark SQL would be appreciated.

Regards
Deenar

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

class ArraySum extends UserDefinedAggregateFunction {
  // Input: a single column holding an array of doubles
  def inputSchema: StructType =
    StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)

  // Aggregation buffer: the running element-wise sum
  def bufferSchema: StructType =
    StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)

  def dataType: DataType = ArrayType(DoubleType, false)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Nil
  }

  // Fold one input row into the running sum
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val currentSum: Seq[Double] = buffer.getSeq(0)
    val currentRow: Seq[Double] = input.getSeq(0)
    buffer(0) = (currentSum, currentRow) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
      // TODO handle different sizes arrays here
    }
  }

  // Combine two partial sums
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val currentSum: Seq[Double] = buffer1.getSeq(0)
    val currentRow: Seq[Double] = buffer2.getSeq(0)
    buffer1(0) = (currentSum, currentRow) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (sum, row).zipped.map { case (a, b) => a + b }
      // TODO handle different sizes arrays here
    }
  }

  def evaluate(buffer: Row): Any = {
    buffer.getSeq(0)
  }
}

val arraySum = new ArraySum
sqlContext.udf.register("ArraySum", arraySum)

*%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
'2015-05-22' limit 10*

gives me the following error


Error in SQL statement: SparkException: Job aborted due to stage failure:
Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task 0.3 in
stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException:
scala.collection.mutable.WrappedArray$ofRef cannot be cast to
org.apache.spark.sql.types.ArrayData at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
at
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
at
org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
Source) at
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
at
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:362)
at
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:141)
at
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:30)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at
scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at
scala.collection.Iterator$class.foreach(Iterator.scala:727) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at
scala.collection.AbstractIterator.to(Iterator.scala:1157) at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1839)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at
org.apache.spark.scheduler.Task.run(Task.scala:88) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(
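
The "TODO handle different sizes arrays here" comment in the UDAF above leaves open what should happen when the arrays differ in length. One possible resolution, sketched in plain Python rather than Scala so it can be run without a cluster, is to treat the missing positions of the shorter array as 0.0. That padding rule is an assumption made for illustration; the poster did not say which behaviour is wanted. The names elementwise_sum and aggregate are hypothetical, and an empty list plays the role of the Nil initial buffer.

```python
from itertools import zip_longest


def elementwise_sum(current_sum, row):
    """Add two numeric sequences position by position.

    The shorter sequence is padded with 0.0, so an empty buffer plus a row
    simply yields the row (mirroring the Nil cases in the UDAF).
    """
    return [a + b for a, b in zip_longest(current_sum, row, fillvalue=0.0)]


def aggregate(rows):
    """Fold all rows into one running sum, starting from an empty buffer."""
    total = []
    for row in rows:
        total = elementwise_sum(total, row)
    return total


result = aggregate([[1.0, 2.0, 3.0], [10.0, 20.0], []])
```

Because the same function serves for both folding in a row and combining two partial sums, the update and merge steps stay consistent with each other, which is what an aggregate of this kind needs.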

Re: How to speed up MLlib LDA?

2015-09-22 Thread Marko Asplund
How optimized are the Commons math3 methods that showed up in profiling?
Are there any higher performance alternatives to these?

marko


Re: Has anyone used the Twitter API for location filtering?

2015-09-22 Thread Jo Sunad
Thanks Akhil, but I can't seem to get any tweets that include location
data. For example, when I do stream.filter(status =>
status.getPlace().getName) and run the code for 20 minutes I only get null
values. It seems like Twitter might purposely be removing the Place for free
users?
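
For reference, the advice quoted below (drop the statuses that carry no location before reading the place name) amounts to a filter-then-map. The following is a language-neutral sketch in plain Python, with statuses modelled as dictionaries; the field names "place", "name" and "text" and the sample data are assumptions for illustration and are not the twitter4j API.

```python
def place_names(statuses):
    """Return (place name, text) pairs, skipping statuses without a place."""
    return [
        (status["place"]["name"], status["text"])
        for status in statuses
        if status.get("place") is not None  # guard against missing location
    ]


# Made-up sample data: most statuses have no place attached.
statuses = [
    {"text": "no location here", "place": None},
    {"text": "hello from Boulder", "place": {"name": "Boulder"}},
    {"text": "place key missing"},
]

pairs = place_names(statuses)
```

In the Scala code from the thread, the corresponding guard would presumably be a null check on status.getPlace() applied before the map. It only avoids the null dereference; it does not make more tweets carry a place.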



On Tue, Sep 22, 2015 at 2:20 AM, Akhil Das 
wrote:

> That's because sometimes getPlace returns null, and calling getLang on
> null throws either a NullPointerException or a NoSuchMethodError. You need
> to filter out the statuses which don't include location data.
>
> Thanks
> Best Regards
>
> On Fri, Sep 18, 2015 at 12:46 AM, Jo Sunad  wrote:
>
>> I've been trying to filter for GeoLocation, Place or even Time Zone and I
>> keep getting null values. I think I got one Place in 20 minutes of the app
>> running (without any filters on tweets).
>>
>> Is this normal? Do I have to try querying rather than filtering?
>>
>> my code is following TD's example...
>>
>> val stream = TwitterUtils
>>
>> val hashtags = stream.map (status => status.getPlace().getName(),
>> status.getText())
>>
>> getText, getFollowers, etc all work fine, I just don't get anything
>> location based (and getLang() for some reason throws a noMethodError).
>>
>> Thanks for the help!
>>
>
>


Re: How to speed up MLlib LDA?

2015-09-22 Thread Charles Earl
It seems that the Vowpal Wabbit version is most similar to what is in

https://github.com/intel-analytics/TopicModeling/blob/master/src/main/scala/org/apache/spark/mllib/topicModeling/OnlineHDP.scala
although the Intel code seems to implement the Hierarchical Dirichlet Process
(topics and subtopics), as opposed to the implementation in VW, which is
based on
   https://www.cs.princeton.edu/~blei/papers/HoffmanBleiBach2010b.pdf
As opposed to Monte Carlo methods, HDP/VW use iterative
optimization of model parameters with respect to predicted tokens (my best
shot at a one-sentence summary).
The VW code is *highly* optimized.

https://github.com/JohnLangford/vowpal_wabbit/blob/master/vowpalwabbit/lda_core.cc
A fast inferencer for Spark LDA would be of great value.
C

On Tue, Sep 22, 2015 at 1:30 PM, Pedro Rodriguez 
wrote:

> I helped some with the LDA and worked quite a bit on a Gibbs version. I
> don't know if the Gibbs version might help, but since it is not (yet) in
> MLlib, Intel Analytics kindly created a spark package with their adapted
> version plus a couple other LDA algorithms:
> http://spark-packages.org/package/intel-analytics/TopicModeling
> https://github.com/intel-analytics/TopicModeling
>
> It might be worth trying out. Do you know what LDA algorithm VW uses?
>
> Pedro
>
>
> On Tue, Sep 22, 2015 at 1:54 AM, Marko Asplund 
> wrote:
>
>> Hi,
>>
>> I did some profiling for my LDA prototype code that requests topic
>> distributions from a model.
>> According to Java Mission Control more than 80 % of execution time during
>> sample interval is spent in the following methods:
>>
>> org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07%
>> org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91%
>> org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50;
>> 6.98%
>> java.lang.Double.valueOf(double); count: 31; 4.33%
>>
>> Is there any way of using the API more optimally?
>> Are there any opportunities for optimising the "topicDistributions" code
>> path in MLlib?
>>
>> My code looks like this:
>>
>> // executed once
>> val model = LocalLDAModel.load(ctx, ModelFileName)
>>
>> // executed four times
>> val samples = Transformers.toSparseVectors(vocabularySize,
>> ctx.parallelize(Seq(input))) // fast
>> model.topicDistributions(samples.zipWithIndex.map(_.swap)) // <== this
>> seems to take about 4 seconds to execute
>>
>>
>> marko
>>
>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


-- 
- Charles


Re: spark + parquet + schema name and metadata

2015-09-22 Thread Cheng Lian
Michael reminded me that although we don't support direct manipulation 
over Parquet metadata, you can still save/query metadata to/from Parquet 
via DataFrame per-column metadata. For example:


import sqlContext.implicits._
import org.apache.spark.sql.types.MetadataBuilder

val path = "file:///tmp/parquet/meta"

// Saving metadata
val meta = new MetadataBuilder().putString("appVersion", "1.0.2").build()
sqlContext.range(10).select($"id".as("id", 
meta)).coalesce(1).write.mode("overwrite").parquet(path)


// Querying metadata
sqlContext.read.parquet(path).schema("id").metadata.getString("appVersion")

The metadata is saved together with Spark SQL schema as a JSON string. 
For example, the above code generates the following Parquet metadata 
(inspected with parquet-meta):


file: 
file:/private/tmp/parquet/meta/part-r-0-77cb2237-e6a8-4cb6-a452-ae205ba7b660.gz.parquet

creator: parquet-mr version 1.6.0
extra:   org.apache.spark.sql.parquet.row.metadata = 
{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,*"metadata":{"appVersion":"1.0.2"}*}]}


Cheng

On 9/22/15 9:37 AM, Cheng Lian wrote:

I see, this makes sense. We should probably add this in Spark SQL.

However, there's one corner case to note about user-defined Parquet 
metadata. When committing a write job, ParquetOutputCommitter writes 
Parquet summary files (_metadata and _common_metadata), and 
user-defined key-value metadata written in all Parquet part-files get 
merged here. The problem is that, if a single key is associated with 
multiple values, Parquet doesn't know how to reconcile this situation, 
and simply gives up writing summary files. This can be particularly 
annoying for appending. In general, users should avoid storing 
"unstable" values like timestamps as Parquet metadata.


Cheng
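
To make the corner case above concrete, here is a toy model in plain Python of merging key-value metadata from several part-files. It is only an illustration of the behaviour described in this message, not the actual parquet-mr code; the function name merge_key_value_metadata and the sample keys are hypothetical.

```python
def merge_key_value_metadata(part_file_metadata):
    """Merge per-file key-value metadata into one dictionary.

    Returns None when some key carries different values in different files,
    standing in for "gives up writing summary files".
    """
    merged = {}
    for metadata in part_file_metadata:
        for key, value in metadata.items():
            if key in merged and merged[key] != value:
                return None  # conflicting values cannot be reconciled
            merged[key] = value
    return merged


# A stable value merges cleanly across part-files.
stable = [{"appVersion": "1.0.2"}, {"appVersion": "1.0.2"}]

# A timestamp differs between files (or between appends), so merging fails.
unstable = [
    {"writtenAt": "2015-09-22T09:37:00"},
    {"writtenAt": "2015-09-22T09:37:05"},
]

stable_result = merge_key_value_metadata(stable)
unstable_result = merge_key_value_metadata(unstable)
```

The second case is why values that change from write to write, such as timestamps, are a poor fit for this kind of metadata.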

On 9/22/15 1:58 AM, Borisa Zivkovic wrote:

thanks for answer.

I need this in order to be able to track schema metadata.

basically when I create parquet files from Spark I want to be able to 
"tag" them in some way (giving the schema appropriate name or 
attaching some key/values) and then it is fairly easy to get basic 
metadata about parquet files when processing and discovering those 
later on.


On Mon, 21 Sep 2015 at 18:17 Cheng Lian  wrote:

Currently Spark SQL doesn't support customizing schema name and
metadata. May I know why these two matter in your use case? Some
Parquet data models, like parquet-avro, do support it, while some
others don't (e.g. parquet-hive).

Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> Hi,
>
> I am trying to figure out how to write parquet metadata when
> persisting DataFrames to parquet using Spark (1.4.1)
>
> I could not find a way to change schema name (which seems to be
> hardcoded to root) and also how to add data to key/value metadata in
> parquet footer.
>
> org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>
> org.apache.parquet.schema.Type#getName
>
> thanks
>
>







Re: Count for select not matching count for group by

2015-09-22 Thread Michael Armbrust
This looks like something is wrong with predicate pushdown.  Can you
include the output of calling explain, and tell us what format the data is
stored in?

On Mon, Sep 21, 2015 at 8:06 AM, Michael Kelly 
wrote:

> Hi,
>
> I'm seeing some strange behaviour with spark 1.5, I have a dataframe
> that I have built from loading and joining some hive tables stored in
> s3.
>
> The dataframe is cached in memory, using df.cache.
>
> What I'm seeing is that the counts I get when I do a group by on a
> column are different from what I get when I filter/select and count.
>
> df.select("outcome").groupBy("outcome").count.show
> outcome | count
> --
> 'A'   |  100
> 'B'   |  200
>
> df.filter("outcome = 'A'").count
> # 50
>
> df.filter(df("outcome") === "A").count
> # 50
>
> I expect the count of columns that match 'A' in the groupBy to match
> the count when filtering. Any ideas what might be happening?
>
> Thanks,
>
> Michael
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to speed up MLlib LDA?

2015-09-22 Thread Pedro Rodriguez
I helped some with the LDA and worked quite a bit on a Gibbs version. I
don't know if the Gibbs version might help, but since it is not (yet) in
MLlib, Intel Analytics kindly created a spark package with their adapted
version plus a couple other LDA algorithms:
http://spark-packages.org/package/intel-analytics/TopicModeling
https://github.com/intel-analytics/TopicModeling

It might be worth trying out. Do you know what LDA algorithm VW uses?

Pedro


On Tue, Sep 22, 2015 at 1:54 AM, Marko Asplund 
wrote:

> Hi,
>
> I did some profiling for my LDA prototype code that requests topic
> distributions from a model.
> According to Java Mission Control more than 80 % of execution time during
> sample interval is spent in the following methods:
>
> org.apache.commons.math3.util.FastMath.log(double); count: 337; 47.07%
> org.apache.commons.math3.special.Gamma.digamma(double); count: 164; 22.91%
> org.apache.commons.math3.util.FastMath.log(double, double[]); count: 50;
> 6.98%
> java.lang.Double.valueOf(double); count: 31; 4.33%
>
> Is there any way of using the API more optimally?
> Are there any opportunities for optimising the "topicDistributions" code
> path in MLlib?
>
> My code looks like this:
>
> // executed once
> val model = LocalLDAModel.load(ctx, ModelFileName)
>
> // executed four times
> val samples = Transformers.toSparseVectors(vocabularySize,
> ctx.parallelize(Seq(input))) // fast
> model.topicDistributions(samples.zipWithIndex.map(_.swap)) // <== this
> seems to take about 4 seconds to execute
>
>
> marko
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 208-340-1703
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Creating BlockMatrix with java API

2015-09-22 Thread Pulasthi Supun Wickramasinghe
Hi Yanbo,

Thanks for the reply. I thought I might be missing something. Anyway, I
moved to using Scala since it is the complete API.

Best Regards,
Pulasthi

On Tue, Sep 22, 2015 at 7:03 AM, Yanbo Liang  wrote:

> This is because distributed matrices like
> BlockMatrix/RowMatrix/IndexedRowMatrix/CoordinateMatrix do
> not provide Java-friendly constructors. I have filed SPARK-10757
> to track this issue.
>
> 2015-09-18 3:36 GMT+08:00 Pulasthi Supun Wickramasinghe <
> pulasthi...@gmail.com>:
>
>> Hi All,
>>
>> I am new to Spark and i am trying to do some BlockMatrix operations with
>> the Mllib API's. But i can't seem to create a BlockMatrix with the java
>> API. I tried the following
>>
>> Matrix matrixa = Matrices.rand(4, 4, new Random(1000));
>> List,Matrix>> list = new 
>> ArrayList, Matrix>>();
>> Tuple2 intTuple = new Tuple2(0,0);
>> Tuple2,Matrix> tuple2MatrixTuple2 = new 
>> Tuple2, Matrix>(intTuple,matrixa );
>> list.add(tuple2MatrixTuple2);
>> JavaRDD, Matrix>> rdd = sc.parallelize(list);
>>
>> BlockMatrix blockMatrix = new BlockMatrix(rdd,2,2);
>>
>>
>> but since BlockMatrix only
>> takes 
>> "RDD,Matrix>>"
>> this code does not work. sc.parallelize() returns a JavaRDD so the two
>> are not compatible. I also couldn't find any code samples for this. Any
>> help on this would be highly appreciated.
>>
>> Best Regards,
>> Pulasthi
>> --
>> Pulasthi S. Wickramasinghe
>> Graduate Student  | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> cell: 224-386-9035
>>
>
>


-- 
Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: SparkR - calling as.vector() with rdd dataframe causes error

2015-09-22 Thread Luciano Resende
localDF is a pure R data frame, so as.vector will work on it with no problems. As
for calling it on SparkR objects, try calling collect before you call
as.vector (or, in your case, the algorithms); that should solve your problem.

On Mon, Sep 21, 2015 at 8:48 AM, Ellen Kraffmiller <
ellen.kraffmil...@gmail.com> wrote:

> Thank you for the link! I was using
> http://apache-spark-user-list.1001560.n3.nabble.com/, and I didn't see
> replies there.
>
> Regarding your code example, I'm doing the same thing and successfully
> creating the rdd, but the problem is that when I call a clustering
> algorithm like amap::hcluster(), I get an error from as.vector() that the
> rdd cannot be coerced into a vector.
>
> On Fri, Sep 18, 2015 at 12:33 PM, Luciano Resende 
> wrote:
>
>> I see the thread with all the responses on the bottom at mail-archive :
>>
>> https://www.mail-archive.com/user%40spark.apache.org/msg36882.html
>>
>> On Fri, Sep 18, 2015 at 7:58 AM, Ellen Kraffmiller <
>> ellen.kraffmil...@gmail.com> wrote:
>>
>>> Thanks for your response.  Is there a reason why this thread isn't
>>> appearing on the mailing list?  So far, I only see my post, with no
>>> answers, although I have received 2 answers via email.  It would be nice if
>>> other people could see these answers as well.
>>>
>>> On Thu, Sep 17, 2015 at 2:22 AM, Sun, Rui  wrote:
>>>
 The existing algorithms operating on R data.frame can't simply operate
 on SparkR DataFrame. They have to be re-implemented to be based on SparkR
 DataFrame API.

 -Original Message-
 From: ekraffmiller [mailto:ellen.kraffmil...@gmail.com]
 Sent: Thursday, September 17, 2015 3:30 AM
 To: user@spark.apache.org
 Subject: SparkR - calling as.vector() with rdd dataframe causes error

 Hi,
 I have a library of clustering algorithms that I'm trying to run in the
 SparkR interactive shell. (I am working on a proof of concept for a
 document classification tool.) Each algorithm takes a term document matrix
 in the form of a dataframe.  When I pass the method a local dataframe, the
 clustering algorithm works correctly, but when I pass it a spark rdd, it
 gives an error trying to coerce the data into a vector.  Here is the code,
 that I'm calling within SparkR:

 # get matrix from a file
 file <- "/Applications/spark-1.5.0-bin-hadoop2.6/examples/src/main/resources/matrix.csv"

 #read it into variable
  raw_data <- read.csv(file,sep=',',header=FALSE)

 #convert to a local dataframe
 localDF = data.frame(raw_data)

 # create the rdd
 rdd  <- createDataFrame(sqlContext,localDF)

 #call the algorithm with the localDF - this works
 result <- galileo(localDF, model='hclust',dist='euclidean',link='ward',K=5)

 #call with the rdd - this produces error
 result <- galileo(rdd, model='hclust',dist='euclidean',link='ward',K=5)

 Error in as.vector(data) :
   no method for coercing this S4 class to a vector


 I get the same error if I try to directly call as.vector(rdd) as well.

 Is there a reason why this works for localDF and not rdd?  Should I be
 doing something else to coerce the object into a vector?

 Thanks,
 Ellen



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-calling-as-vector-with-rdd-dataframe-causes-error-tp24717.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For
 additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>>
>> --
>> Luciano Resende
>> http://people.apache.org/~lresende
>> http://twitter.com/lresende1975
>> http://lresende.blogspot.com/
>>
>
>


-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: spark + parquet + schema name and metadata

2015-09-22 Thread Cheng Lian

I see, this makes sense. We should probably add this in Spark SQL.

However, there's one corner case to note about user-defined Parquet 
metadata. When committing a write job, ParquetOutputCommitter writes 
Parquet summary files (_metadata and _common_metadata), and user-defined 
key-value metadata written in all Parquet part-files get merged here. 
The problem is that, if a single key is associated with multiple values, 
Parquet doesn't know how to reconcile this situation, and simply gives 
up writing summary files. This can be particularly annoying for appending. 
In general, users should avoid storing "unstable" values like timestamps 
as Parquet metadata.
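
To make the corner case above concrete, here is a minimal Python sketch of
the merge behaviour as described. It is only an illustration, not Parquet's
actual code: merge_key_value_metadata and the sample footers are
hypothetical names made up for this example.

```python
def merge_key_value_metadata(part_file_footers):
    """Merge user-defined key/value metadata from several part-files.

    Returns the merged dict, or None when some key carries more than one
    distinct value (the situation in which, per the description above,
    writing the summary files is abandoned).
    """
    merged = {}
    for footer in part_file_footers:
        for key, value in footer.items():
            if key in merged and merged[key] != value:
                return None  # conflicting values: nothing sensible to pick
            merged[key] = value
    return merged


# A stable tag is identical in every part-file and merges cleanly.
stable = [{"owner": "team-a"}, {"owner": "team-a"}]

# A per-write timestamp differs between the first write and a later append.
unstable = [{"written_at": "2015-09-21T18:17"},
            {"written_at": "2015-09-22T01:58"}]

print(merge_key_value_metadata(stable))
print(merge_key_value_metadata(unstable))
```

The second call is the appending case: the old and the new part-files
disagree on the same key, so no merged summary can be produced.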


Cheng

On 9/22/15 1:58 AM, Borisa Zivkovic wrote:

thanks for answer.

I need this in order to be able to track schema metadata.

basically when I create parquet files from Spark I want to be able to 
"tag" them in some way (giving the schema appropriate name or 
attaching some key/values) and then it is fairly easy to get basic 
metadata about parquet files when processing and discovering those 
later on.


On Mon, 21 Sep 2015 at 18:17 Cheng Lian wrote:


Currently Spark SQL doesn't support customizing schema name and
metadata. May I know why these two matter in your use case? Some
Parquet data models, like parquet-avro, do support it, while some
others don't (e.g. parquet-hive).

Cheng

On 9/21/15 7:13 AM, Borisa Zivkovic wrote:
> Hi,
>
> I am trying to figure out how to write parquet metadata when
> persisting DataFrames to parquet using Spark (1.4.1)
>
> I could not find a way to change schema name (which seems to be
> hardcoded to root) and also how to add data to key/value metadata in
> parquet footer.
>
> org.apache.parquet.hadoop.metadata.FileMetaData#getKeyValueMetaData
>
> org.apache.parquet.schema.Type#getName
>
> thanks
>
>





Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
The indices are definitely necessary. My first solution was just
reduceByKey { case (v, _) => v } and that didn't work. I needed to look at
both values and see which had the lower index.
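
A plain-Python sketch of that reduce step, for anyone following along. It
stands in for reduceByKey and is not Spark code; keep_first,
dedupe_keep_first and the sample rows are hypothetical names used only for
illustration. The index is the (partitionIndex, localIndex) pair, which
Python compares lexicographically as a tuple.

```python
def keep_first(a, b):
    """Combine two (index, row) pairs, keeping the one with the lower index."""
    return a if a[0] <= b[0] else b


def dedupe_keep_first(indexed_rows, key_of):
    """Return {key: row}, holding the row with the smallest index per key.

    indexed_rows yields (index, row) pairs in arbitrary arrival order.
    """
    best = {}
    for index, row in indexed_rows:
        key = key_of(row)
        candidate = (index, row)
        best[key] = keep_first(best[key], candidate) if key in best else candidate
    return {key: row for key, (_, row) in best.items()}


# Pairs arrive out of file order, as they may after a shuffle.
arrivals = [
    ((1, 0), ("a", "third line")),
    ((0, 0), ("a", "first line")),
    ((0, 1), ("b", "second line")),
]

result = dedupe_keep_first(arrivals, key_of=lambda row: row[0])
print(result)
```

Taking whichever value arrives first would have kept "third line" for key
"a", which is why the reduce function has to compare the indices.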

On Tue, Sep 22, 2015 at 8:54 AM, Sean Owen  wrote:

> The point is that this only works if you already knew the file was
> presented in order within and across partitions, which was the
> original problem anyway. I don't think it is in general, but in
> practice, I do imagine it's already in the expected order from
> textFile. Maybe under the hood this ends up being ensured by
> TextInputFormat.
>
> So, adding the index and sorting on it doesn't add anything.
>
> On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase  wrote:
> > just give zipWithIndex a shot, use it early in the pipeline. I think it
> > provides exactly the info you need, as the index is the original line
> number
> > in the file, not the index in the partition.
> >
> > Sent from my iPhone
> >
> > On 22 Sep 2015, at 17:50, Philip Weaver  wrote:
> >
> > Thanks. If textFile can be used in a way that preserves order, then both
> the
> > partition index and the index within each partition should be consistent,
> > right?
> >
> > I overcomplicated the question by asking about removing duplicates.
> > Fundamentally I think my question is, how does one sort lines in a file
> by
> > line number.
> >
> > On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase 
> wrote:
> >>
> >> By looking through the docs and source code, I think you can get away
> with
> >> rdd.zipWithIndex to get the index of each line in the file, as long as
> you
> >> define the parallelism upfront:
> >> sc.textFile("README.md", 4)
> >>
> >> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
> >> skimming through some tuples, hopefully this is clear enough.
> >>
> >> -adrian
> >>
> >> From: Philip Weaver
> >> Date: Tuesday, September 22, 2015 at 3:26 AM
> >> To: user
> >> Subject: Remove duplicate keys by always choosing first in file.
> >>
> >> I am processing a single file and want to remove duplicate rows by some
> >> key by always choosing the first row in the file for that key.
> >>
> >> The best solution I could come up with is to zip each row with the
> >> partition index and local index, like this:
> >>
> >> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
> >>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
> >> ((partitionIndex, localIndex), row)) }
> >> }
> >>
> >>
> >> And then using reduceByKey with a min ordering on the (partitionIndex,
> >> localIndex) pair.
> >>
> >> First, can i count on SparkContext.textFile to read the lines in such
> that
> >> the partition indexes are always increasing so that the above works?
> >>
> >> And, is there a better way to accomplish the same effect?
> >>
> >> Thanks!
> >>
> >> - Philip
> >>
> >
>


Re: Py4j issue with Python Kafka Module

2015-09-22 Thread Saisai Shao
I think you're using the wrong version of the Kafka assembly jar. I don't
think the Python API for the direct Kafka stream is supported in Spark 1.3.0;
you'd better change to version 1.5.0. It looks like you're using Spark 1.5.0,
so why did you choose Kafka assembly 1.3.0?


D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar



On Tue, Sep 22, 2015 at 6:41 AM, ayan guha  wrote:

> Hi
>
> I have added spark assembly jar to SPARK CLASSPATH
>
> >>> print os.environ['SPARK_CLASSPATH']
> D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar
>
>
> Now  I am facing below issue with a test topic
>
> >>> ssc = StreamingContext(sc, 2)
> >>> kvs =
> KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'l
> ocalhost:9092'})
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
> \streaming\kafka.py", line 126, in createDirectStream
> jstream = helper.createDirectStream(ssc._jssc, kafkaParams,
> set(topics), jfr
> omOffsets)
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
> j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark
> \sql\utils.py", line 36, in deco
> return f(*a, **kw)
>   File
> "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4
> j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
> py4j.protocol.Py4JError: An error occurred while calling
> o22.createDirectStream.
>  Trace:
> py4j.Py4JException: Method createDirectStream([class
> org.apache.spark.streaming.
> api.java.JavaStreamingContext, class java.util.HashMap, class
> java.util.HashSet,
>  class java.util.HashMap]) does not exist
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
>
> at
> py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
>
> at py4j.Gateway.invoke(Gateway.java:252)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Unknown Source)
>
>
> >>>
>
> Am I doing something wrong?
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Using Spark for portfolio manager app

2015-09-22 Thread Thúy Hằng Lê
That's a great answer, Adrian.
I found a lot of information here. I have a direction for the application
now; I will try your suggestion :)

On Tuesday, September 22, 2015, Adrian Tanase wrote:

>
> 1. reading from kafka has exactly once guarantees - we are using it in
>    production today (with the direct receiver)
>    1. you will probably have 2 topics, loading both into spark and
>       joining / unioning as needed is not an issue
>    2. tons of optimizations you can do there, assuming everything else
>       works
> 2. for ad-hoc query I would say you absolutely need to look at
>    external storage
>    1. querying the DStream or spark's RDDs directly should be done
>       mostly for aggregates/metrics, not by users
>    2. if you look at HBase or Cassandra for storage then 50k
>       writes/sec are not a problem at all, especially combined with a
>       smart client that does batch puts (like async hbase)
>    3. you could also consider writing the updates to another kafka
>       topic and have a different component that updates the DB, if you
>       think of other optimisations there
> 3. by stats I assume you mean metrics (operational or business)
>    1. there are multiple ways to do this, however I would not encourage
>       you to query spark directly, especially if you need an
>       archive/history of your datapoints
>    2. we are using OpenTSDB (we already have a HBase cluster) +
>       Grafana for dashboarding
>    3. collecting the metrics is a bit hairy in a streaming app - we
>       have experimented with both accumulators and RDDs specific for
>       metrics - chose the RDDs that write to OpenTSDB using foreachRDD
>
> -adrian
>
> --
> *From:* Thúy Hằng Lê
> *Sent:* Sunday, September 20, 2015 7:26 AM
> *To:* Jörn Franke
> *Cc:* user@spark.apache.org
> *Subject:* Re: Using Spark for portfolio manager app
>
> Thanks Adrian and Jorn for the answers.
>
> Yes, you're right there are lot of things I need to consider if I want to
> use Spark for my app.
>
> I still have few concerns/questions from your information:
>
> 1/ I need to combine trading stream with tick stream, I am planning to use
> Kafka for that
> If I am using approach #2 (Direct Approach) in this tutorial
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> Will I receive exactly-once semantics? Or do I have to add some logic in my
> code to achieve that?
> As per your suggestion of using delta updates, exactly-once semantics is
> required for this application.
>
> 2/ For ad-hoc queries, I must output from Spark to external storage and
> query on that, right?
> Is there any way to do ad-hoc queries on Spark? My application could have
> 50k updates per second at peak time.
> Persisting to external storage leads to high latency in my app.
>
> 3/ How do I get real-time statistics from Spark?
> In most of the Spark streaming examples, the statistics are echoed to
> stdout.
> However, I want to display those statistics on a GUI; is there any way to
> retrieve data from Spark directly without using external storage?
>
>
> 2015-09-19 16:23 GMT+07:00 Jörn Franke:
>
>> If you want to be able to let your users query their portfolio then you
>> may want to think about storing the current state of the portfolios in
>> hbase/phoenix or alternatively a cluster of relational databases can make
>> sense. For the rest you may use Spark.
>>
>> On Sat, Sep 19, 2015 at 4:43, Thúy Hằng Lê wrote:
>>
>>> Hi all,
>>>
>>> I am going to build a financial application for Portfolio Manager, where
>>> each portfolio contains a list of stocks, the number of shares purchased,
>>> and the purchase price.
>>> Another source of information is stock prices from market data. The
>>> application needs to calculate the real-time gain or loss of each stock
>>> in each portfolio (compared to the purchase price).
>>>
>>> I am new to Spark. I know that using Spark Streaming I can aggregate
>>> portfolio positions in real time, for example:
>>> user A contains:
>>>   - 100 IBM stock with transactionValue=$15000
>>>   - 500 AAPL stock with transactionValue=$11400
>>>
>>> Now, given that the stock prices change in real time too, e.g. if the IBM
>>> price is 151, I want to update its gain or loss: gainOrLoss(IBM) =
>>> 151*100 - 15000 = $100
>>>
>>> My questions are:
>>>
>>>  * What is the best method to combine 2 real-time streams(
>>> transaction made by user and market pricing data) in Spark.
>>> 

Re: Spark Web UI + NGINX

2015-09-22 Thread Ruslan Dautkhanov
It should be really simple to setup..

Check this Hue + NGINX setup page
http://gethue.com/using-nginx-to-speed-up-hue-3-8-0/

In that config file change
1)
> server_name NGINX_HOSTNAME;
to "Machine A, with a public IP"
2)
> server HUE_HOST1: max_fails=3;
> server HUE_HOST2: max_fails=3;
to "Machine B, where Spark is installed"
3)
You may want to adjust "location /static/" that fits your Spark Web UI..
4)
With a few more config lines you can setup SSL offloading too.



-- 
Ruslan Dautkhanov

On Thu, Sep 17, 2015 at 3:06 AM, Renato Perini 
wrote:

> Hello!
> I'm trying to set up a reverse proxy (using nginx) for the Spark Web UI.
> I have 2 machines:
>1) Machine A, with a public IP. This machine will be used to access
> Spark Web UI on the Machine B through its private IP address.
>2) Machine B, where Spark is installed (standalone master cluster, 1
> worker node and the history server) not accessible from the outside.
>
> Basically I want to access the Spark Web UI through my Machine A using the
> URL:
> http://machine_A_ip_address/spark
>
> Any advised setup for Spark Web UI + nginx?
>
> Thank you.
>
>
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Sean Owen
The point is that this only works if you already knew the file was
presented in order within and across partitions, which was the
original problem anyway. I don't think it is in general, but in
practice, I do imagine it's already in the expected order from
textFile. Maybe under the hood this ends up being ensured by
TextInputFormat.

So, adding the index and sorting on it doesn't add anything.

On Tue, Sep 22, 2015 at 4:38 PM, Adrian Tanase  wrote:
> just give zipWithIndex a shot, use it early in the pipeline. I think it
> provides exactly the info you need, as the index is the original line number
> in the file, not the index in the partition.
>
> Sent from my iPhone
>
> On 22 Sep 2015, at 17:50, Philip Weaver  wrote:
>
> Thanks. If textFile can be used in a way that preserves order, then both the
> partition index and the index within each partition should be consistent,
> right?
>
> I overcomplicated the question by asking about removing duplicates.
> Fundamentally I think my question is, how does one sort lines in a file by
> line number.
>
> On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase  wrote:
>>
>> By looking through the docs and source code, I think you can get away with
>> rdd.zipWithIndex to get the index of each line in the file, as long as you
>> define the parallelism upfront:
>> sc.textFile("README.md", 4)
>>
>> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
>> skimming through some tuples, hopefully this is clear enough.
>>
>> -adrian
>>
>> From: Philip Weaver
>> Date: Tuesday, September 22, 2015 at 3:26 AM
>> To: user
>> Subject: Remove duplicate keys by always choosing first in file.
>>
>> I am processing a single file and want to remove duplicate rows by some
>> key by always choosing the first row in the file for that key.
>>
>> The best solution I could come up with is to zip each row with the
>> partition index and local index, like this:
>>
>> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> ((partitionIndex, localIndex), row)) }
>> }
>>
>>
>> And then using reduceByKey with a min ordering on the (partitionIndex,
>> localIndex) pair.
>>
>> First, can i count on SparkContext.textFile to read the lines in such that
>> the partition indexes are always increasing so that the above works?
>>
>> And, is there a better way to accomplish the same effect?
>>
>> Thanks!
>>
>> - Philip
>>
>




Apache Spark job in local[*] is slower than regular 1-thread Python program

2015-09-22 Thread juljoin
Hello,

I am trying to figure Spark out and I still have some problems with its
speed that I can't explain. In short, I wrote two programs that loop
through a 3.8 GB file and filter each line depending on whether a certain
word is present.

I wrote a one-thread Python program doing the job and I obtain:
 - for the 3.8 GB file:
   lines found: 82100
   in: 10.54 seconds
 - no filter, just looping through the file:
   in: 1.65 seconds

The Spark app doing the same and executed on 8 threads gives:
 - for the 3.8 GB file:
   lines found: 82100
   in: 18.27 seconds
 - for a 38 MB file:
   lines found: 821
   in: 2.53 seconds

I must be doing something wrong to obtain a result twice as slow on 8
threads as on 1 thread.

1. First, I thought it might be because of the set-up cost of Spark. But
for smaller files it only takes 2 seconds, which makes this explanation
improbable.
2. Looping through the file takes 1.65 seconds (thank you SSD ^_^ );
processing takes the other 9 seconds (for the Python app).
-> This is why I thought splitting it up across different processes would
definitely speed it up.

Note: Increasing the number of threads in Spark improves the speed (from 57
seconds with 1 thread to 18 seconds with 8 threads). But still, there is a
big difference in performance between simple Python and Spark; it must be my
doing!

Can someone point out what I am doing wrong? That would be greatly
appreciated :) I am new to all this big data stuff.



*Here is the code for the Spark app:*





*And the python code:*



Thank you for reading up to this point :)

Have a nice day!


- Julien





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-job-in-local-is-slower-than-regular-1-thread-Python-program-tp24771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
I have used the mapPartitionsWithIndex/zipWithIndex solution and so far it
has done the correct thing.

On Tue, Sep 22, 2015 at 8:38 AM, Adrian Tanase  wrote:

> just give zipWithIndex a shot, use it early in the pipeline. I think it
> provides exactly the info you need, as the index is the original line
> number in the file, not the index in the partition.
>
> Sent from my iPhone
>
> On 22 Sep 2015, at 17:50, Philip Weaver  wrote:
>
> Thanks. If textFile can be used in a way that preserves order, then both
> the partition index and the index within each partition should be
> consistent, right?
>
> I overcomplicated the question by asking about removing duplicates.
> Fundamentally I think my question is, how does one sort lines in a file by
> line number.
>
> On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase  wrote:
>
>> By looking through the docs and source code, I think you can get away
>> with rdd.zipWithIndex to get the index of each line in the file, as long
>> as you define the parallelism upfront:
>> sc.textFile("README.md", 4)
>>
>> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
>> skimming through some tuples, hopefully this is clear enough.
>>
>> -adrian
>>
>> From: Philip Weaver
>> Date: Tuesday, September 22, 2015 at 3:26 AM
>> To: user
>> Subject: Remove duplicate keys by always choosing first in file.
>>
>> I am processing a single file and want to remove duplicate rows by some
>> key by always choosing the first row in the file for that key.
>>
>> The best solution I could come up with is to zip each row with the
>> partition index and local index, like this:
>>
>> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>>   rows.zipWithIndex.map { case (row, localIndex) => (row.key,
>> ((partitionIndex, localIndex), row)) }
>> }
>>
>>
>> And then using reduceByKey with a min ordering on the (partitionIndex,
>> localIndex) pair.
>>
>> First, can i count on SparkContext.textFile to read the lines in such
>> that the partition indexes are always increasing so that the above works?
>>
>> And, is there a better way to accomplish the same effect?
>>
>> Thanks!
>>
>> - Philip
>>
>>
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Adrian Tanase
just give zipWithIndex a shot, use it early in the pipeline. I think it 
provides exactly the info you need, as the index is the original line number in 
the file, not the index in the partition.
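
To illustrate what "not the index in the partition" means, here is a small
plain-Python model of how a global index can be derived from per-partition
positions: each partition starts at the total size of the partitions before
it. This only mimics the ordering the zipWithIndex documentation describes
(partition index first, then position within the partition); it is not
Spark's implementation, and zip_with_global_index is a hypothetical name.

```python
from itertools import accumulate


def zip_with_global_index(partitions):
    """Pair every item with a global index.

    partitions is a list of lists, given in partition order. The result
    keeps the same nesting, with (item, global_index) tuples.
    """
    sizes = [len(part) for part in partitions]
    # Offset of a partition = number of items in all earlier partitions.
    offsets = [0] + list(accumulate(sizes))[:-1]
    return [
        [(item, offset + local) for local, item in enumerate(part)]
        for part, offset in zip(partitions, offsets)
    ]


# Three partitions of a five-line file, assumed to be in file order.
parts = [["line0", "line1"], ["line2"], ["line3", "line4"]]
indexed = zip_with_global_index(parts)
print(indexed)
```

The global index equals the original line number only when the partitions
themselves are in file order, which is the assumption under discussion in
this thread.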

Sent from my iPhone

On 22 Sep 2015, at 17:50, Philip Weaver <philip.wea...@gmail.com> wrote:

Thanks. If textFile can be used in a way that preserves order, then both the 
partition index and the index within each partition should be consistent, right?

I overcomplicated the question by asking about removing duplicates. 
Fundamentally I think my question is, how does one sort lines in a file by line 
number.

On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase <atan...@adobe.com> wrote:
By looking through the docs and source code, I think you can get away with 
rdd.zipWithIndex to get the index of each line in the file, as long as you 
define the parallelism upfront:
sc.textFile("README.md", 4)

You can then just do .groupBy(...).mapValues(_.sortBy(...).head) - I'm skimming 
through some tuples, hopefully this is clear enough.

-adrian

From: Philip Weaver
Date: Tuesday, September 22, 2015 at 3:26 AM
To: user
Subject: Remove duplicate keys by always choosing first in file.

I am processing a single file and want to remove duplicate rows by some key by 
always choosing the first row in the file for that key.

The best solution I could come up with is to zip each row with the partition 
index and local index, like this:

rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
  rows.zipWithIndex.map { case (row, localIndex) => (row.key, ((partitionIndex, 
localIndex), row)) }
}

And then using reduceByKey with a min ordering on the (partitionIndex, 
localIndex) pair.

First, can i count on SparkContext.textFile to read the lines in such that the 
partition indexes are always increasing so that the above works?

And, is there a better way to accomplish the same effect?

Thanks!

- Philip




Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Sean Owen
I don't know of a way to do this, out of the box, without maybe
digging into custom InputFormats. The RDD from textFile doesn't have
an ordering. I can't imagine a world in which partitions weren't
iterated in line order, of course, but there's also no real guarantee
about ordering among partitions.

On Tue, Sep 22, 2015 at 3:50 PM, Philip Weaver  wrote:
> I overcomplicated the question by asking about removing duplicates.
> Fundamentally I think my question is, how does one sort lines in a file by
> line number.




Re: Help getting started with Kafka

2015-09-22 Thread Yana Kadiyska
Thanks a lot Cody! I was punting on the decoders by calling count (or
trying to, since my types require a custom decoder) but your sample code is
exactly what I was trying to achieve. The error message threw me off, will
work on the decoders now

On Tue, Sep 22, 2015 at 10:50 AM, Cody Koeninger  wrote:

> You need type parameters for the call to createRDD indicating the type of
> the key / value and the decoder to use for each.
>
> See
>
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala
>
> Also, you need to check to see if offsets 0 through 100 are still actually
> present in the kafka logs.
>
> On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska 
> wrote:
>
>> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
>> queue into HDFS. Being very new to Kafka, not sure if I'm messing something
>> up on that side...My hope is to read the messages presently in the queue
>> (or at least the first 100 for now)
>>
>> Here is what I have:
>> Kafka side:
>>
>>  ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress 
>> --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
>> ingress:0:34386
>> ingress:1:34148
>> ingress:2:34300
>>
>>
>>
>> On Spark side I'm trying this(1.4.1):
>>
>> bin/spark-shell --jars
>> kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.ja
>>
>>
>>
>> val brokers="IP1:9092,IP2:9092,IP3:9092" //same as IPs above
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>>
>> val offsetRange= (0 to 2).map(part=>OffsetRange.create("ingress",part,0,100))
>> val messages= KafkaUtils.createRDD(sc,kafkaParams,offsetRange.toArray)
>> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD 
>> at KafkaRDD.scala:45
>>
>>
>>
>> when I try messages.count I get:
>>
>> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
>> java.lang.NullPointerException
>>  at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>>  at 
>> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>  at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>  at 
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>>  at 
>> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>>  at 
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>>  at 
>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>>  at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>  at 
>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:72)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>


RE: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread java8964
Your performance problem sounds like it is in the driver, which is trying to 
broadcast 10k files by itself, and that becomes the bottleneck.
What you want is just to convert the data from AVRO format, file by file, to 
another format. In MR, most likely each mapper processes one file, so you 
utilize the whole cluster instead of just the driver.
I'm not sure exactly how to help you, but to do that in Spark:
1) Disable the broadcast from the driver and let each Spark task process one 
file. Maybe use something like Hadoop's NLineInputFormat over a file that lists 
all the filenames of your data, so each Spark task receives the HDFS location 
of a file and then starts the transform logic. In this case, you transform all 
your small files concurrently, using all the available cores of your executors.
2) If the above sounds too complex, you need to find a way to disable 
broadcasting small files from the Spark driver. This sounds like a good, normal 
way to handle small files, but I cannot find a configuration to force Spark to 
disable it.
Yong
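
A rough plain-Python sketch of the idea in 1): hand each task a list of
file names instead of letting the driver deal with every file itself. It
only shows the distribution step and makes no Spark or Hadoop calls;
assign_files_to_tasks and the sample paths are hypothetical.

```python
def assign_files_to_tasks(paths, num_tasks):
    """Round-robin the file paths into num_tasks groups of near-equal size."""
    groups = [[] for _ in range(num_tasks)]
    for i, path in enumerate(paths):
        groups[i % num_tasks].append(path)
    return groups


# Ten made-up file names spread over four tasks.
paths = ["/data/rec-%04d.avro" % n for n in range(10)]
groups = assign_files_to_tasks(paths, num_tasks=4)

for task_id, group in enumerate(groups):
    print(task_id, len(group), group[:1])
```

Each group would then be read and converted by one task, so the work scales
with the number of available cores rather than with the driver.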

From: daniel.ha...@veracity-group.com
Subject: Re: spark-avro takes a lot time to load thousands of files
Date: Tue, 22 Sep 2015 16:54:26 +0300
CC: user@spark.apache.org
To: jcove...@gmail.com

I agree, but it's a constraint I have to deal with. The idea is to load these 
files and merge them into ORC. When using Hive on Tez it takes less than a 
minute.
Daniel
On 22 Sep 2015, at 16:00, Jonathan Coveney wrote:

having a file per record is pretty inefficient on almost any file system

On Tuesday, September 22, 2015, Daniel Haviv wrote:
Hi,
We are trying to load around 10k avro files (each file holds only one 
record) using spark-avro but it takes over 15 minutes to load. It seems that 
most of the work is being done at the driver, where it creates a broadcast 
variable for each file.
Any idea why it is behaving that way?
Thank you.
Daniel

  

Re: Help getting started with Kafka

2015-09-22 Thread Cody Koeninger
You need type parameters for the call to createRDD indicating the type of
the key / value and the decoder to use for each.

See

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/BasicRDD.scala

Also, you need to check to see if offsets 0 through 100 are still actually
present in the kafka logs.
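
For the offset check, a minimal plain-Python sketch of the comparison
involved: intersect the requested range with what the broker still holds.
clamp_offset_range is a hypothetical helper, not part of any Kafka or Spark
API, and the earliest offsets below are made-up values (the thread only
shows the latest offsets from GetOffsetShell with --time -1; I believe
--time -2 reports the earliest ones).

```python
def clamp_offset_range(requested_from, requested_until, earliest, latest):
    """Intersect [requested_from, requested_until) with [earliest, latest).

    Returns (start, end) for the part that is still available, or None
    when nothing in the requested range remains in the log.
    """
    start = max(requested_from, earliest)
    end = min(requested_until, latest)
    return (start, end) if start < end else None


# Partition 0 of "ingress": latest offset 34386, as reported in the thread.
still_there = clamp_offset_range(0, 100, earliest=0, latest=34386)

# Assumed case: retention has already removed the first 5000 messages.
expired = clamp_offset_range(0, 100, earliest=5000, latest=34386)

print(still_there)
print(expired)
```

If the second case applies, the requested range no longer exists in the
log and has to be moved up to the earliest available offset.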

On Tue, Sep 22, 2015 at 9:38 AM, Yana Kadiyska 
wrote:

> Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
> queue into HDFS. Being very new to Kafka, not sure if I'm messing something
> up on that side...My hope is to read the messages presently in the queue
> (or at least the first 100 for now)
>
> Here is what I have:
> Kafka side:
>
>  ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress 
> --broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
> ingress:0:34386
> ingress:1:34148
> ingress:2:34300
>
>
>
> On Spark side I'm trying this(1.4.1):
>
> bin/spark-shell --jars
> kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.ja
>
>
>
> val brokers="IP1:9092,IP2:9092,IP3:9092" //same as IPs above
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>
> val offsetRange= (0 to 2).map(part=>OffsetRange.create("ingress",part,0,100))
> val messages= KafkaUtils.createRDD(sc,kafkaParams,offsetRange.toArray)
> messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD 
> at KafkaRDD.scala:45
>
>
>
> when I try messages.count I get:
>
> 15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
>   at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>   at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
>   at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
>   at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
>   at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>   at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
>   at org.apache.spark.scheduler.Task.run(Task.scala:72)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>
>


Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
Thanks. If textFile can be used in a way that preserves order, then both
the partition index and the index within each partition should be
consistent, right?

I overcomplicated the question by asking about removing duplicates.
Fundamentally, I think my question is: how does one sort lines in a file by
line number?
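
For what it's worth, the (partitionIndex, localIndex) idea from the original question can be modelled in plain Python to check the logic. This is only a sketch: partitions are simulated as lists that are assumed to be in file order (which is exactly the property being asked about), and first_per_key is a hypothetical name, not a Spark call.

```python
def first_per_key(partitions, key):
    """Keep, for each key, the row with the smallest (partition, local) index."""
    best = {}
    for part_idx, rows in enumerate(partitions):
        for local_idx, row in enumerate(rows):
            pos = (part_idx, local_idx)
            k = key(row)
            # Same effect as reduceByKey with a min over the position tuple.
            if k not in best or pos < best[k][0]:
                best[k] = (pos, row)
    # Sorting by position restores the original line order.
    return [row for _, row in sorted(best.values(), key=lambda pair: pair[0])]


partitions = [["a,1", "b,2"], ["a,3", "c,4"], ["b,5"]]
result = first_per_key(partitions, key=lambda line: line.split(",")[0])
```

The position tuple doubles as a global line number, so the same pairs could be used to sort all lines, provided the partition order really does follow the file.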

On Tue, Sep 22, 2015 at 6:15 AM, Adrian Tanase  wrote:

> By looking through the docs and source code, I think you can get away with
> rdd.zipWithIndex to get the index of each line in the file, as long as
> you define the parallelism upfront:
> sc.textFile("README.md", 4)
>
> You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m
> skimming through some tuples, hopefully this is clear enough.
>
> -adrian
>
> From: Philip Weaver
> Date: Tuesday, September 22, 2015 at 3:26 AM
> To: user
> Subject: Remove duplicate keys by always choosing first in file.
>
> I am processing a single file and want to remove duplicate rows by some
> key by always choosing the first row in the file for that key.
>
> The best solution I could come up with is to zip each row with the
> partition index and local index, like this:
>
> rdd.mapPartitionsWithIndex { case (partitionIndex, rows) =>
>   rows.zipWithIndex.map { case (row, localIndex) =>
>     (row.key, ((partitionIndex, localIndex), row))
>   }
> }
>
>
> And then using reduceByKey with a min ordering on the (partitionIndex,
> localIndex) pair.
>
> First, can I count on SparkContext.textFile to read the lines in such that
> the partition indexes are always increasing, so that the above works?
>
> And, is there a better way to accomplish the same effect?
>
> Thanks!
>
> - Philip
>
>


Error while saving parquet

2015-09-22 Thread gtinside
Please refer to the code snippet below. I get the following error:

/tmp/temp/trade.parquet/part-r-00036.parquet is not a Parquet file.
expected magic number at tail [80, 65, 82, 49] but found [20, -28, -93, 93]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:494)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$readMetaData$3.apply(ParquetTypes.scala:494)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.readMetaData(ParquetTypes.scala:494)
    at org.apache.spark.sql.parquet.ParquetTypesConverter$.readSchemaFromFile(ParquetTypes.scala:515)
    at org.apache.spark.sql.parquet.ParquetRelation.<init>(ParquetRelation.scala:67)
    at org.apache.spark.sql.SQLContext.parquetFile(SQLContext.scala:542)
    at com.bfm.spark.data.handlers.input.InputFormatRegistry$.registerTable(InputFormatRegistry.scala:42)
    at com.bfm.spark.data.handlers.input.CassandraInputHandler.handleConfiguration(CassandraInputHandler.scala:43)
    at com.bfm.spark.data.handlers.input.CassandraInputHandler.handleConfiguration(CassandraInputHandler.scala:21)
    at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:18)
    at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:15)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at com.bfm.spark.data.services.CompositeConfigurationHandler.handleConfiguration(CompositeConfigurationHandler.scala:15)
    at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:18)
    at com.bfm.spark.data.services.CompositeConfigurationHandler$$anonfun$handleConfiguration$1.apply(CompositeConfigurationHandler.scala:15)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at com.bfm.spark.data.services.CompositeConfigurationHandler.handleConfiguration(CompositeConfigurationHandler.scala:15)
    at com.bfm.spark.data.services.ConfigurationHandlerService.execute(ConfigurationHandlerService.scala:43)
    at com.bfm.spark.data.scheduler.DataLoadingScheduler$$anonfun$scheduleJobsByHour$2.apply(DataLoadingScheduler.scala:45)
    at com.bfm.spark.data.scheduler.DataLoadingScheduler$$anonfun$scheduleJobsByHour$2.apply(DataLoadingScheduler.scala:45)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at com.bfm.spark.data.scheduler.DataLoadingScheduler.scheduleJobsByHour(DataLoadingScheduler.scala:45)
    at com.bfm.spark.data.scheduler.DataLoadingScheduler.everyThreeHours(DataLoadingScheduler.scala:20)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)

 import com.datastax.spark.connector._
 import com.google.gson.GsonBuilder
 import scala.collection.mutable._
 import scala.util._

 case class Trade(org_ : String, fund: Int, invnum: Int, touch_count: Int, blob: String)

 val rdd = sc.cassandraTable[Trade]("TEST", "trade")
 val filteredRDD = rdd.filter(data => data.org_.equals("DEV"))
 val cassandraRDD = rdd.map(data => data.blob)
 sqlContext.jsonRDD(cassandraRDD, 0.01).registerTempTable("trade")
 sqlContext.sql("select * from trade").repartition(1).saveAsParquetFile("/tmp/temp/trade1.parquet")

If I don't do the repartition, this works fine. Am I missing something here?
I am using Spark 1.3.1.
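
One quick check, sketched here in plain Python outside Spark: the bytes [80, 65, 82, 49] in the error are the ASCII codes for PAR1, the marker a Parquet file ends with, so a part file can be tested by reading its last four bytes. The helper name has_parquet_magic is made up for illustration, and the snippet assumes the part file sits on a local filesystem.

```python
import os

PARQUET_MAGIC = bytes([80, 65, 82, 49])  # b"PAR1"


def has_parquet_magic(path):
    """Return True if the file at `path` ends with the Parquet magic bytes."""
    if os.path.getsize(path) < len(PARQUET_MAGIC):
        return False
    with open(path, "rb") as f:
        # Jump to the last four bytes of the file and compare them.
        f.seek(-len(PARQUET_MAGIC), os.SEEK_END)
        return f.read() == PARQUET_MAGIC
```

The error names /tmp/temp/trade.parquet while the snippet writes /tmp/temp/trade1.parquet, so it may also be worth confirming which directory is actually being read.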

Regards,
Gaurav





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-saving-parquet-tp24770.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Help getting started with Kafka

2015-09-22 Thread Yana Kadiyska
Hi folks, I'm trying to write a simple Spark job that dumps out a Kafka
queue into HDFS. Being very new to Kafka, not sure if I'm messing something
up on that side...My hope is to read the messages presently in the queue
(or at least the first 100 for now)

Here is what I have:
Kafka side:

 ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic ingress
--broker-list IP1:9092,IP2:9092,IP3:9092 --time -1
ingress:0:34386
ingress:1:34148
ingress:2:34300

On the Spark side I'm trying this (1.4.1):

bin/spark-shell --jars
kafka-clients-0.8.2.0.jar,spark-streaming-kafka_2.10-1.4.1.jar,kafka_2.10-0.8.2.0.jar,metrics-core-2.2.0.ja



val brokers="IP1:9092,IP2:9092,IP3:9092" //same as IPs above
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

val offsetRange= (0 to 2).map(part=>OffsetRange.create("ingress",part,0,100))
val messages= KafkaUtils.createRDD(sc,kafkaParams,offsetRange.toArray)
messages: org.apache.spark.rdd.RDD[(Nothing, Nothing)] = KafkaRDD[1] at RDD at KafkaRDD.scala:45

When I try messages.count I get:

15/09/22 14:01:17 ERROR TaskContextImpl: Error in TaskCompletionListener
java.lang.NullPointerException
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:157)
    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:100)
    at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:56)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:75)
    at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:73)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:73)
    at org.apache.spark.scheduler.Task.run(Task.scala:72)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


Re: Invalid checkpoint url

2015-09-22 Thread srungarapu vamsi
Yes, I tried ssc.checkpoint("checkpoint"); it works for me as long as I
don't use reduceByKeyAndWindow.

When I start using reduceByKeyAndWindow, it fails with the error
"Exception in thread "main" org.apache.spark.SparkException: Invalid
checkpoint directory: file:/home/ubuntu/checkpoint/342e3171-01f3-48$
2-97be-e3862eb5c944/rdd-8"

The stack trace is as below:

Exception in thread "main" org.apache.spark.SparkException: Invalid
checkpoint directory: file:/home/ubuntu/checkpoint/342e3171[22/9706$
2-97be-e3862eb5c944/rdd-8
    at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:97)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1415)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1417)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1417)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1468)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1483)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1504)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
    at com.datastax.spark.connector.streaming.DStreamFunctions$$anonfun$saveToCassandra$1.apply(DStreamFunctions.scala:33)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.Thread

RE: Performance Spark SQL vs Dataframe API faster

2015-09-22 Thread Cheng, Hao
Yes, it should be the same: they are just different frontends that share the
same optimization and execution.

-Original Message-
From: sanderg [mailto:s.gee...@wimionline.be] 
Sent: Tuesday, September 22, 2015 10:06 PM
To: user@spark.apache.org
Subject: Performance Spark SQL vs Dataframe API faster

Is there a difference in performance between writing a spark job using only SQL 
statements and writing it using the dataframe api or does it translate to the 
same thing under the hood?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Spark-SQL-vs-Dataframe-API-faster-tp24768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




spark on mesos gets killed by cgroups for too much memory

2015-09-22 Thread oggie
I'm using spark 1.2.2 on mesos 0.21

I have a java job that is submitted to mesos from marathon. 

I also have cgroups configured for mesos on each node. Even though the job,
when running, uses 512MB, it tries to take over 3GB at startup and is killed
by cgroups.

When I start mesos-slave, it's started like this (we use supervisord):
command=/usr/sbin/mesos-slave --disk_watch_interval=10secs
--gc_delay=480mins --isolation=cgroups/cpu,cgroups/mem
--cgroups_hierarchy=/cgroup
--resources="mem(*):3000;cpus(*):2;ports(*):[25000-3];disk(*):5000"
--cgroups_root=mesos
--master=zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/mesos
--work_dir=/tmp/mesos --log_dir=/var/log/mesos

In cgconfig.conf:
memory.limit_in_bytes="3221225472";

spark-submit from marathon:
bin/spark-submit --executor-memory 128m --master
mesos://zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/mesos
--class com.company.alert.AlertConsumer AlertConsumer.jar --zk
prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181 --mesos
mesos://zk://prodMesosMaster01:2181,prodMesosMaster02:2181,prodMesosMaster03:2181/mesos
--spark_executor_uri
http://prodmesosfileserver01/spark-dist/1.2.2/spark-dist-1.2.2.tgz

We increased the cgroup limit to 6GB and the memory resources from 3000 to
6000 for the startup of mesos and now cgroups doesn't kill the job anymore.

But the question is, how do I limit the start of the job so it isn't trying
to take 3GB, even if when running it's only using 512MB? 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-on-mesos-gets-killed-by-cgroups-for-too-much-memory-tp24769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Performance Spark SQL vs Dataframe API faster

2015-09-22 Thread sanderg
Is there a difference in performance between writing a spark job using only
SQL statements and writing it using the dataframe api or does it translate
to the same thing under the hood?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-Spark-SQL-vs-Dataframe-API-faster-tp24768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: spark-avro takes a lot time to load thousands of files

2015-09-22 Thread Daniel Haviv
I agree, but it's a constraint I have to deal with.
The idea is to load these files and merge them into ORC.
When using Hive on Tez it takes less than a minute.

Daniel

> On 22 Sep 2015, at 16:00, Jonathan Coveney  wrote:
> 
> having a file per record is pretty inefficient on almost any file system
> 
> On Tuesday, September 22, 2015, Daniel Haviv wrote:
>> Hi,
>> We are trying to load around 10k avro files (each file holds only one
>> record) using spark-avro but it takes over 15 minutes to load.
>> It seems that most of the work is being done at the driver, where it creates
>> a broadcast variable for each file.
>> 
>> Any idea why it is behaving that way?
>> Thank you.
>> Daniel


Py4j issue with Python Kafka Module

2015-09-22 Thread ayan guha
Hi

I have added the Spark Kafka assembly jar to SPARK_CLASSPATH:

>>> print os.environ['SPARK_CLASSPATH']
D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar


Now I am facing the issue below with a test topic:

>>> ssc = StreamingContext(sc, 2)
>>> kvs = KafkaUtils.createDirectStream(ssc,['spark'],{"metadata.broker.list":'localhost:9092'})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\streaming\kafka.py", line 126, in createDirectStream
    jstream = helper.createDirectStream(ssc._jssc, kafkaParams, set(topics), jfromOffsets)
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\pyspark\sql\utils.py", line 36, in deco
    return f(*a, **kw)
  File "D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o22.createDirectStream.
 Trace:
py4j.Py4JException: Method createDirectStream([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Unknown Source)

>>>

Am I doing something wrong?
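
One thing that stands out in the output above is that the assembly jar on SPARK_CLASSPATH is tagged 1.3.0 while the Spark install path says 1.5.0. Whether that is the cause here is an assumption, but a mismatch like this is easy to check. A small plain-Python sketch (the regular expressions and the helper name versions_match are illustrative only):

```python
import re


def versions_match(jar_path, spark_home):
    """Compare the version suffix of the jar with the one in the Spark home path."""
    jar = re.search(r"-(\d+\.\d+\.\d+)\.jar$", jar_path)
    home = re.search(r"spark-(\d+\.\d+\.\d+)-bin", spark_home)
    if not jar or not home:
        return None  # could not parse one of the paths
    return jar.group(1) == home.group(1)


# Paths copied from the session output in this message.
jar_path = r"D:\sw\spark-streaming-kafka-assembly_2.10-1.3.0.jar"
spark_home = r"D:\sw\spark-1.5.0-bin-hadoop2.6\spark-1.5.0-bin-hadoop2.6"
mismatch = versions_match(jar_path, spark_home) is False
```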


-- 
Best Regards,
Ayan Guha

