Re: Memory-efficient successive calls to repartition()

2015-08-23 Thread Alexis Gillain
Hi Aurelien,

The first snippet creates a new RDD in memory at each iteration (check
the web UI).
The second snippet does unpersist the RDD, but that's not the main problem.

I think your trouble comes from the long lineage: .cache() keeps track of
the lineage for recovery.
You should have a look at checkpointing:
https://github.com/JerryLead/SparkInternals/blob/master/markdown/english/6-CacheAndCheckpoint.md
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/impl/PeriodicRDDCheckpointer.scala

You can also have a look at the code of other iterative algorithms in
MLlib for best practices.
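
For illustration, here is a minimal Scala sketch of that pattern (the question's
code is PySpark, but the same idea applies). The checkpoint directory, the
10-iteration interval and parseLine (standing in for the question's parser) are
assumptions, not tested code:

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // assumed path

var data = sc.textFile("ImageNet_gist_train.txt", 50).map(parseLine).cache()
data.count()

for (i <- 1 to 1000) {
  val next = data.repartition(50).persist()
  if (i % 10 == 0) next.checkpoint()  // truncate the lineage every 10 iterations
  next.count()                        // materialize (and write the checkpoint)
  data.unpersist()                    // drop the previous cached copy
  data = next
  // ... the rest of the iteration's operations on data ...
}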


2015-08-20 17:26 GMT+08:00 abellet :

> Hello,
>
> For the need of my application, I need to periodically "shuffle" the data
> across nodes/partitions of a reasonably-large dataset. This is an expensive
> operation but I only need to do it every now and then. However it seems
> that
> I am doing something wrong because as the iterations go the memory usage
> increases, causing the job to spill onto HDFS, which eventually gets full.
> I
> am also getting some "Lost executor" errors that I don't get if I don't
> repartition.
>
> Here's a basic piece of code which reproduces the problem:
>
> data = sc.textFile("ImageNet_gist_train.txt",50).map(parseLine).cache()
> data.count()
> for i in range(1000):
>     data=data.repartition(50).persist()
>     # below several operations are done on data
>
>
> What am I doing wrong? I tried the following but it doesn't solve the
> issue:
>
> for i in range(1000):
>     data2=data.repartition(50).persist()
>     data2.count() # materialize rdd
>     data.unpersist() # unpersist previous version
>     data=data2
>
>
> Help and suggestions on this would be greatly appreciated! Thanks a lot!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Memory-efficient-successive-calls-to-repartition-tp24358.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Alexis GILLAIN


How to remove worker node but let it finish first?

2015-08-23 Thread Romi Kuntsman
Hi,
I have a spark standalone cluster with 100s of applications per day, and it
changes size (more or less workers) at various hours. The driver runs on a
separate machine outside the spark cluster.

When a job is running and its worker is killed (because at that hour the
number of workers is reduced), the job sometimes fails instead of
redistributing the work to other workers.

How is it possible to decommission a worker, so that it doesn't receive any
new work but does finish all existing work before shutting down?

Thanks!


Re: Spark GraphaX

2015-08-23 Thread Robineast
GraphX is a graph analytics engine rather than a graph database. Its typical
use case is running large-scale graph algorithms like PageRank, connected
components, label propagation and so on. It can be an element of complex
processing pipelines that involve other Spark components such as DataFrames,
machine learning and Spark Streaming.

If you need to store, update and query graph structures you might be better
served looking at Neo4j or Titan. If you still need the analytics capability
you can integrate Spark with the database.
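
For illustration, a minimal Scala sketch of the analytics side (the edge-list
path is a placeholder):

import org.apache.spark.graphx.GraphLoader

// Load a graph from an edge list ("srcId dstId" per line) and run PageRank.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt")
val ranks = graph.pageRank(0.0001).vertices                  // (vertexId, rank) pairs
ranks.sortBy(_._2, ascending = false).take(10).foreach(println)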



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphaX-tp24408p24411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



DataFrame rollup with alias?

2015-08-23 Thread Isabelle Phan
Hello,

I am new to Spark and just running some tests to get familiar with the APIs.

When calling the rollup function on my DataFrame, I get different results
when I alias the columns I am grouping on (see below for an example data set).
I was expecting the alias function to only affect the column name. Why is it
also affecting the rollup results?
(I know I can rename my columns after the rollup call using the
withColumnRenamed function, as sketched below; my question is just to get a
better understanding of the alias function.)
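
For reference, a minimal sketch of that rename-after-rollup approach, using the
column names from the example below:

// Roll up on the original columns, then rename in the result.
val rolled = df.rollup(df("Name"), df("Game")).sum()
val renamed = rolled.withColumnRenamed("Name", "Player").withColumnRenamed("Game", "Round")
renamed.orderBy("Player", "Round").show()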

scala> df.show
+----+------+-----+
|Name|  Game|Score|
+----+------+-----+
| Bob|Game 1|   20|
| Bob|Game 2|   30|
| Lea|Game 1|   25|
| Lea|Game 2|   30|
| Ben|Game 1|    5|
| Ben|Game 3|   35|
| Bob|Game 3|   15|
+----+------+-----+

//rollup results as expected
scala> df.rollup(df("Name"), df("Game")).sum().orderBy("Name", "Game").show
+----+------+----------+
|Name|  Game|SUM(Score)|
+----+------+----------+
|null|  null|       160|
| Ben|  null|        40|
| Ben|Game 1|         5|
| Ben|Game 3|        35|
| Bob|  null|        65|
| Bob|Game 1|        20|
| Bob|Game 2|        30|
| Bob|Game 3|        15|
| Lea|  null|        55|
| Lea|Game 1|        25|
| Lea|Game 2|        30|
+----+------+----------+

//rollup with aliases returns strange results
scala> df.rollup(df("Name") as "Player", df("Game") as "Round").sum().orderBy("Player", "Round").show
+------+------+----------+
|Player| Round|SUM(Score)|
+------+------+----------+
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 1|         5|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Ben|Game 3|        35|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 1|        20|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 2|        30|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Bob|Game 3|        15|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 1|        25|
|   Lea|Game 2|        30|
|   Lea|Game 2|        30|
+------+------+----------+


Thanks in advance for your help,

Isabelle


Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Philip Weaver
1 minute to discover 1000s of partitions -- yes, that is what I have
observed. And I would assert that is very slow.

On Sun, Aug 23, 2015 at 7:16 PM, Michael Armbrust 
wrote:

> We should not be actually scanning all of the data of all of the
> partitions, but we do need to at least list all of the available
> directories so that we can apply your predicates to the actual values that
> are present when we are deciding which files need to be read in a given
> spark job.  While this is a somewhat expensive operation, we do it in
> parallel and we cache this information when you access the same relation
> more than once.
>
> Can you provide a little more detail about how exactly you are accessing
> the parquet data (are you using sqlContext.read or creating persistent
> tables in the metastore?), and how long it is taking?  It would also be
> good to know how many partitions we are talking about and how much data is
> in each.  Finally, I'd like to see the stacktrace where it is hanging to
> make sure my above assertions are correct.
>
> We have several tables internally that have 1000s of partitions and while
> it takes ~1 minute initially to discover the metadata, after that we are
> able to query the data interactively.
>
>
>
> On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang 
> wrote:
>
>> anybody has any suggestions?
>>
>> On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang 
>> wrote:
>>
>>> Is there a workaround without updating Hadoop? Would really appreciate
>>> if someone can explain what spark is trying to do here and what is an easy
>>> way to turn this off. Thanks all!
>>>
>>> On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey <
>>> raghavendra.pan...@gmail.com> wrote:
>>>
 Did you try with hadoop version 2.7.1 .. It is known that s3a works
 really well with parquet which is available in 2.7. They fixed lot of
 issues related to metadata reading there...
 On Aug 21, 2015 11:24 PM, "Jerrick Hoang" 
 wrote:

> @Cheng, Hao : Physical plans show that it got stuck on scanning S3!
>
> (table is partitioned by date_prefix and hour)
> explain select count(*) from test_table where date_prefix='20150819'
> and hour='00';
>
> TungstenAggregate(key=[],
> value=[(count(1),mode=Final,isDistinct=false)]
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[],
> value=[(count(1),mode=Partial,isDistinct=false)]
>Scan ParquetRelation[ ..  ]
>
> Why does spark have to scan all partitions when the query only
> concerns with 1 partitions? Doesn't it defeat the purpose of partitioning?
>
> Thanks!
>
> On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver <
> philip.wea...@gmail.com> wrote:
>
>> I hadn't heard of spark.sql.sources.partitionDiscovery.enabled
>> before, and I couldn't find much information about it online. What does 
>> it
>> mean exactly to disable it? Are there any negative consequences to
>> disabling it?
>>
>> On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao 
>> wrote:
>>
>>> Can you make some more profiling? I am wondering if the driver is
>>> busy with scanning the HDFS / S3.
>>>
>>> Like jstack 
>>>
>>>
>>>
>>> And also, it’s will be great if you can paste the physical plan for
>>> the simple query.
>>>
>>>
>>>
>>> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
>>> *Sent:* Thursday, August 20, 2015 1:46 PM
>>> *To:* Cheng, Hao
>>> *Cc:* Philip Weaver; user
>>> *Subject:* Re: Spark Sql behaves strangely with tables with a lot
>>> of partitions
>>>
>>>
>>>
>>> I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
>>> of CLs trying to speed up spark sql with tables with a huge number of
>>> partitions, I've made sure that those CLs are included but it's still 
>>> very
>>> slow
>>>
>>>
>>>
>>> On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao 
>>> wrote:
>>>
>>> Yes, you can try set the
>>> spark.sql.sources.partitionDiscovery.enabled to false.
>>>
>>>
>>>
>>> BTW, which version are you using?
>>>
>>>
>>>
>>> Hao
>>>
>>>
>>>
>>> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
>>> *Sent:* Thursday, August 20, 2015 12:16 PM
>>> *To:* Philip Weaver
>>> *Cc:* user
>>> *Subject:* Re: Spark Sql behaves strangely with tables with a lot
>>> of partitions
>>>
>>>
>>>
>>> I guess the question is why does spark have to do partition
>>> discovery with all partitions when the query only needs to look at one
>>> partition? Is there a conf flag to turn this off?
>>>
>>>
>>>
>>> On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver <
>>> philip.wea...@gmail.com> wrote:
>>>
>>> I've had the same problem. It turns out that Spark (specifically
>>> parquet) is very slow at partition discovery. It got better in 

Re: SparkSQL concerning materials

2015-08-23 Thread Michael Armbrust
Here's a longer version of that talk that I gave, which goes into more
detail on the internals:
http://www.slideshare.net/databricks/spark-sql-deep-dive-melbroune

On Fri, Aug 21, 2015 at 8:28 AM, Sameer Farooqui 
wrote:

> Have you seen the Spark SQL paper?:
> https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf
>
> On Thu, Aug 20, 2015 at 11:35 PM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Hi,
>>
>> thanks for answers. I have read answers you provided, but I rather look
>> for some materials on the internals. E.g how the optimizer works, how the
>> query is translated into rdd operations etc. The API I am quite familiar
>> with.
>> A good starting point for me was: Spark DataFrames: Simple and Fast
>> Analysis of Structured Data
>> 
>>
>> 2015-08-20 18:29 GMT+02:00 Dhaval Patel :
>>
>>> Or if you're a python lover then this is a good place -
>>> https://spark.apache.org/docs/1.4.1/api/python/pyspark.sql.html#
>>>
>>>
>>>
>>> On Thu, Aug 20, 2015 at 10:58 AM, Ted Yu  wrote:
>>>
 See also
 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.package

 Cheers

 On Thu, Aug 20, 2015 at 7:50 AM, Muhammad Atif <
 muhammadatif...@gmail.com> wrote:

> Hi Dawid
>
> The best place to get started is the Spark SQL Guide from Apache
> http://spark.apache.org/docs/latest/sql-programming-guide.html
>
> Regards
> Muhammad
>
> On Thu, Aug 20, 2015 at 5:46 AM, Dawid Wysakowicz <
> wysakowicz.da...@gmail.com> wrote:
>
>> Hi,
>>
>> I would like to dip into SparkSQL. Get to know better the
>> architecture, good practices, some internals. Could you advise me some
>> materials on this matter?
>>
>> Regards
>> Dawid
>>
>
>

>>>
>>
>


Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Michael Armbrust
We should not be actually scanning all of the data of all of the
partitions, but we do need to at least list all of the available
directories so that we can apply your predicates to the actual values that
are present when we are deciding which files need to be read in a given
spark job.  While this is a somewhat expensive operation, we do it in
parallel and we cache this information when you access the same relation
more than once.

Can you provide a little more detail about how exactly you are accessing
the parquet data (are you using sqlContext.read or creating persistent
tables in the metastore?), and how long it is taking?  It would also be
good to know how many partitions we are talking about and how much data is
in each.  Finally, I'd like to see the stacktrace where it is hanging to
make sure my above assertions are correct.

We have several tables internally that have 1000s of partitions and while
it takes ~1 minute initially to discover the metadata, after that we are
able to query the data interactively.
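
For reference, a minimal sketch of the two access paths I'm asking about (paths
and table names are placeholders):

import sqlContext.implicits._

// Path-based access: partition discovery has to list the date_prefix=/hour= directories.
val df = sqlContext.read.parquet("s3://bucket/test_table")
df.filter($"date_prefix" === "20150819" && $"hour" === "00").count()

// Metastore-backed access: query a table registered in the Hive metastore.
sqlContext.sql("select count(*) from test_table where date_prefix = '20150819' and hour = '00'").show()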



On Sun, Aug 23, 2015 at 2:00 AM, Jerrick Hoang 
wrote:

> anybody has any suggestions?
>
> On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang 
> wrote:
>
>> Is there a workaround without updating Hadoop? Would really appreciate if
>> someone can explain what spark is trying to do here and what is an easy way
>> to turn this off. Thanks all!
>>
>> On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey <
>> raghavendra.pan...@gmail.com> wrote:
>>
>>> Did you try with hadoop version 2.7.1 .. It is known that s3a works
>>> really well with parquet which is available in 2.7. They fixed lot of
>>> issues related to metadata reading there...
>>> On Aug 21, 2015 11:24 PM, "Jerrick Hoang" 
>>> wrote:
>>>
 @Cheng, Hao : Physical plans show that it got stuck on scanning S3!

 (table is partitioned by date_prefix and hour)
 explain select count(*) from test_table where date_prefix='20150819'
 and hour='00';

 TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
  TungstenExchange SinglePartition
   TungstenAggregate(key=[],
 value=[(count(1),mode=Partial,isDistinct=false)]
Scan ParquetRelation[ ..  ]

 Why does spark have to scan all partitions when the query only concerns
 with 1 partitions? Doesn't it defeat the purpose of partitioning?

 Thanks!

 On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver >>> > wrote:

> I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
> and I couldn't find much information about it online. What does it mean
> exactly to disable it? Are there any negative consequences to disabling 
> it?
>
> On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao 
> wrote:
>
>> Can you make some more profiling? I am wondering if the driver is
>> busy with scanning the HDFS / S3.
>>
>> Like jstack 
>>
>>
>>
>> And also, it’s will be great if you can paste the physical plan for
>> the simple query.
>>
>>
>>
>> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
>> *Sent:* Thursday, August 20, 2015 1:46 PM
>> *To:* Cheng, Hao
>> *Cc:* Philip Weaver; user
>> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
>> partitions
>>
>>
>>
>> I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
>> of CLs trying to speed up spark sql with tables with a huge number of
>> partitions, I've made sure that those CLs are included but it's still 
>> very
>> slow
>>
>>
>>
>> On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao 
>> wrote:
>>
>> Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled
>> to false.
>>
>>
>>
>> BTW, which version are you using?
>>
>>
>>
>> Hao
>>
>>
>>
>> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
>> *Sent:* Thursday, August 20, 2015 12:16 PM
>> *To:* Philip Weaver
>> *Cc:* user
>> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
>> partitions
>>
>>
>>
>> I guess the question is why does spark have to do partition discovery
>> with all partitions when the query only needs to look at one partition? 
>> Is
>> there a conf flag to turn this off?
>>
>>
>>
>> On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver <
>> philip.wea...@gmail.com> wrote:
>>
>> I've had the same problem. It turns out that Spark (specifically
>> parquet) is very slow at partition discovery. It got better in 1.5 (not 
>> yet
>> released), but was still unacceptably slow. Sadly, we ended up reading
>> parquet files manually in Python (via C++) and had to abandon Spark SQL
>> because of this problem.
>>
>>
>>
>> On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang <
>> jerrickho...@gmail.com> wrote:
>>
>> Hi all,
>>
>>

Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Ted Yu
SPARK-8458 is in the 1.4.1 release.

You can upgrade to 1.4.1 or wait for the upcoming 1.5.0 release.

On Sun, Aug 23, 2015 at 2:05 PM, lostrain A 
wrote:

> Hi Zhan,
>   Thanks for the point. Yes I'm using a cluster with spark-1.4.0 and it
> looks like this is most likely the reason. I'll verify this again once the
> we make the upgrade.
>
> Best,
> los
>
> On Sun, Aug 23, 2015 at 1:25 PM, Zhan Zhang 
> wrote:
>
>> If you are using spark-1.4.0, probably it is caused by SPARK-8458
>> 
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Aug 23, 2015, at 12:49 PM, lostrain A 
>> wrote:
>>
>> Ted,
>>   Thanks for the suggestions. Actually I tried both s3n and s3 and the
>> result remains the same.
>>
>>
>> On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu  wrote:
>>
>>> In your case, I would specify "fs.s3.awsAccessKeyId" /
>>> "fs.s3.awsSecretAccessKey" since you use s3 protocol.
>>>
>>> On Sun, Aug 23, 2015 at 11:03 AM, lostrain A <
>>> donotlikeworkingh...@gmail.com> wrote:
>>>
 Hi Ted,
   Thanks for the reply. I tried setting both of the keyid and accesskey
 via

 sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")


 However, the error still occurs for ORC format.

 If I change the format to JSON, although the error does not go, the
 JSON files can be saved successfully.




 On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu  wrote:

> You may have seen this:
> http://search-hadoop.com/m/q3RTtdSyM52urAyI
>
>
>
> On Aug 23, 2015, at 1:01 AM, lostrain A <
> donotlikeworkingh...@gmail.com> wrote:
>
> Hi,
>   I'm trying to save a simple dataframe to S3 in ORC format. The code
> is as follows:
>
>
>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>   import sqlContext.implicits._
>>   val df=sc.parallelize(1 to 1000).toDF()
>>   df.write.format("orc").save("s3://logs/dummy)
>
>
> I ran the above code in spark-shell and only the _SUCCESS file was
> saved under the directory.
> The last part of the spark-shell log said:
>
> 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished
>> task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal
>> (100/100)
>>
>
>
>> 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
>> ResultStage 2 (save at :29) finished in 0.834 s
>>
>
>
>> 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
>> TaskSet 2.0, whose tasks have all completed, from pool
>>
>
>
>> 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
>> :29, took 0.895912 s
>>
>
>
>> 15/08/23 07:38:24 main INFO
>> LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
>> /media/ephemeral0/s3/output-
>>
>
>
>> 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
>> dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
>>  4, -23, -103, 9, -104, -20, -8, 66, 126]
>>
>
>
>> 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
>> committed.
>
>
> Anyone has experienced this before?
> Thanks!
>
>
>

>>>
>>
>>
>


Re: Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Hi Zhan,
  Thanks for the point. Yes, I'm using a cluster with spark-1.4.0 and it
looks like this is most likely the reason. I'll verify this again once
we make the upgrade.

Best,
los

On Sun, Aug 23, 2015 at 1:25 PM, Zhan Zhang  wrote:

> If you are using spark-1.4.0, probably it is caused by SPARK-8458
> 
>
> Thanks.
>
> Zhan Zhang
>
> On Aug 23, 2015, at 12:49 PM, lostrain A 
> wrote:
>
> Ted,
>   Thanks for the suggestions. Actually I tried both s3n and s3 and the
> result remains the same.
>
>
> On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu  wrote:
>
>> In your case, I would specify "fs.s3.awsAccessKeyId" /
>> "fs.s3.awsSecretAccessKey" since you use s3 protocol.
>>
>> On Sun, Aug 23, 2015 at 11:03 AM, lostrain A <
>> donotlikeworkingh...@gmail.com> wrote:
>>
>>> Hi Ted,
>>>   Thanks for the reply. I tried setting both of the keyid and accesskey
>>> via
>>>
>>> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
 sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")
>>>
>>>
>>> However, the error still occurs for ORC format.
>>>
>>> If I change the format to JSON, although the error does not go, the JSON
>>> files can be saved successfully.
>>>
>>>
>>>
>>>
>>> On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu  wrote:
>>>
 You may have seen this:
 http://search-hadoop.com/m/q3RTtdSyM52urAyI



 On Aug 23, 2015, at 1:01 AM, lostrain A 
 wrote:

 Hi,
   I'm trying to save a simple dataframe to S3 in ORC format. The code
 is as follows:


  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   import sqlContext.implicits._
>   val df=sc.parallelize(1 to 1000).toDF()
>   df.write.format("orc").save("s3://logs/dummy)


 I ran the above code in spark-shell and only the _SUCCESS file was
 saved under the directory.
 The last part of the spark-shell log said:

 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished
> task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal
> (100/100)
>


> 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
> ResultStage 2 (save at :29) finished in 0.834 s
>


> 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
> TaskSet 2.0, whose tasks have all completed, from pool
>


> 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
> :29, took 0.895912 s
>


> 15/08/23 07:38:24 main INFO
> LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
> /media/ephemeral0/s3/output-
>


> 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
> dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
>  4, -23, -103, 9, -104, -20, -8, 66, 126]
>


> 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
> committed.


 Anyone has experienced this before?
 Thanks!



>>>
>>
>
>


Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Zhan Zhang
If you are using spark-1.4.0, probably it is caused by 
SPARK-8458

Thanks.

Zhan Zhang

On Aug 23, 2015, at 12:49 PM, lostrain A wrote:

Ted,
  Thanks for the suggestions. Actually I tried both s3n and s3 and the result 
remains the same.


On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu wrote:
In your case, I would specify "fs.s3.awsAccessKeyId" / 
"fs.s3.awsSecretAccessKey" since you use s3 protocol.

On Sun, Aug 23, 2015 at 11:03 AM, lostrain A wrote:
Hi Ted,
  Thanks for the reply. I tried setting both of the keyid and accesskey via

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")

However, the error still occurs for ORC format.

If I change the format to JSON, although the error does not go, the JSON files 
can be saved successfully.




On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu wrote:
You may have seen this:
http://search-hadoop.com/m/q3RTtdSyM52urAyI



On Aug 23, 2015, at 1:01 AM, lostrain A wrote:

Hi,
  I'm trying to save a simple dataframe to S3 in ORC format. The code is as 
follows:


 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
  import sqlContext.implicits._
  val df=sc.parallelize(1 to 1000).toDF()
  df.write.format("orc").save("s3://logs/dummy)

I ran the above code in spark-shell and only the _SUCCESS file was saved under 
the directory.
The last part of the spark-shell log said:

15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 95.0 
in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)

15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 
(save at :29) finished in 0.834 s

15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 2.0, 
whose tasks have all completed, from pool

15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at :29, 
took 0.895912 s

15/08/23 07:38:24 main INFO LocalDirAllocator$AllocatorPerContext$DirSelector: 
Returning directory: /media/ephemeral0/s3/output-

15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS is 
[-44, 29, -128, -39, -113, 0, -78,
 4, -23, -103, 9, -104, -20, -8, 66, 126]

15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ committed.

Anyone has experienced this before?
Thanks!







Re: Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Ted,
  Thanks for the suggestions. Actually I tried both s3n and s3 and the
result remains the same.


On Sun, Aug 23, 2015 at 12:27 PM, Ted Yu  wrote:

> In your case, I would specify "fs.s3.awsAccessKeyId" /
> "fs.s3.awsSecretAccessKey" since you use s3 protocol.
>
> On Sun, Aug 23, 2015 at 11:03 AM, lostrain A <
> donotlikeworkingh...@gmail.com> wrote:
>
>> Hi Ted,
>>   Thanks for the reply. I tried setting both of the keyid and accesskey
>> via
>>
>> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
>>> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")
>>
>>
>> However, the error still occurs for ORC format.
>>
>> If I change the format to JSON, although the error does not go, the JSON
>> files can be saved successfully.
>>
>>
>>
>>
>> On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu  wrote:
>>
>>> You may have seen this:
>>> http://search-hadoop.com/m/q3RTtdSyM52urAyI
>>>
>>>
>>>
>>> On Aug 23, 2015, at 1:01 AM, lostrain A 
>>> wrote:
>>>
>>> Hi,
>>>   I'm trying to save a simple dataframe to S3 in ORC format. The code is
>>> as follows:
>>>
>>>
>>>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
   import sqlContext.implicits._
   val df=sc.parallelize(1 to 1000).toDF()
   df.write.format("orc").save("s3://logs/dummy)
>>>
>>>
>>> I ran the above code in spark-shell and only the _SUCCESS file was saved
>>> under the directory.
>>> The last part of the spark-shell log said:
>>>
>>> 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished
 task 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal
 (100/100)

>>>
>>>
 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
 ResultStage 2 (save at :29) finished in 0.834 s

>>>
>>>
 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
 TaskSet 2.0, whose tasks have all completed, from pool

>>>
>>>
 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
 :29, took 0.895912 s

>>>
>>>
 15/08/23 07:38:24 main INFO
 LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
 /media/ephemeral0/s3/output-

>>>
>>>
 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
 dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
  4, -23, -103, 9, -104, -20, -8, 66, 126]

>>>
>>>
 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
 committed.
>>>
>>>
>>> Anyone has experienced this before?
>>> Thanks!
>>>
>>>
>>>
>>
>


B2i Healthcare "Powered by Spark" addition

2015-08-23 Thread Brandon Ulrich
Another addition to the Powered by Spark page:

B2i Healthcare (http://b2i.sg) uses Spark in healthcare analytics with
medical ontologies like SNOMED CT. Our Snow Owl MQ (
http://b2i.sg/snow-owl-mq) product relies on the Spark ecosystem to analyze
~1 billion health records with over 70 healthcare terminologies. An online
demo is available at https://mq.b2i.sg. Use cases include:

   - Creating cohorts to group patients with similar demographic traits,
   drug exposures, clinical findings, procedures, and observations (Spark
   Core, Spark SQL, GraphX)
   - Inspecting patient records to identify trends and correlations (SparkR)
   - Statistical analysis of patient cohorts to test and verify clinical
   hypotheses (MLlib)
   - Identification of potential adverse drug events and interactions with
   pharmacovigilance signal detection (Streaming)

Thanks,
Brandon


Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Ted Yu
In your case, I would specify "fs.s3.awsAccessKeyId" /
"fs.s3.awsSecretAccessKey" since you use s3 protocol.

On Sun, Aug 23, 2015 at 11:03 AM, lostrain A  wrote:

> Hi Ted,
>   Thanks for the reply. I tried setting both of the keyid and accesskey via
>
> sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
>> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")
>
>
> However, the error still occurs for ORC format.
>
> If I change the format to JSON, although the error does not go, the JSON
> files can be saved successfully.
>
>
>
>
> On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu  wrote:
>
>> You may have seen this:
>> http://search-hadoop.com/m/q3RTtdSyM52urAyI
>>
>>
>>
>> On Aug 23, 2015, at 1:01 AM, lostrain A 
>> wrote:
>>
>> Hi,
>>   I'm trying to save a simple dataframe to S3 in ORC format. The code is
>> as follows:
>>
>>
>>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>>   import sqlContext.implicits._
>>>   val df=sc.parallelize(1 to 1000).toDF()
>>>   df.write.format("orc").save("s3://logs/dummy)
>>
>>
>> I ran the above code in spark-shell and only the _SUCCESS file was saved
>> under the directory.
>> The last part of the spark-shell log said:
>>
>> 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task
>>> 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)
>>>
>>
>>
>>> 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler:
>>> ResultStage 2 (save at :29) finished in 0.834 s
>>>
>>
>>
>>> 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
>>> TaskSet 2.0, whose tasks have all completed, from pool
>>>
>>
>>
>>> 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
>>> :29, took 0.895912 s
>>>
>>
>>
>>> 15/08/23 07:38:24 main INFO
>>> LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
>>> /media/ephemeral0/s3/output-
>>>
>>
>>
>>> 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
>>> dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
>>>  4, -23, -103, 9, -104, -20, -8, 66, 126]
>>>
>>
>>
>>> 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
>>> committed.
>>
>>
>> Anyone has experienced this before?
>> Thanks!
>>
>>
>>
>


is there a 'knack' to docker and mesos?

2015-08-23 Thread Dick Davies
Really excited to try out the new Docker executor support on 1.4.1. I'm
making progress but feel like I'm missing something.

(versions:

spark-1.4.1-hadoop2.6 - not using hadoop yet
mac os x yosemite java 8 spark-shell
mesos 0.22.1  : 2 slaves, 1 master + zk , all on centos 6.x
docker 1.8.x
)

I wanted to use a generic docker image with e.g. Java 8 in it, and then
deploy the 1.4.1 distro into it. The docs seem to indicate that's supported.


Mesos and Docker pull the requested docker images ( 'docker pull java'
essentially)
and extract the spark.executor.uri distro correctly. It seems to fall
over at cd'ing of
all places:

"cd: can't cd to spark-1*"

this causes the task to fail and eventually have spark blacklist the slave.

Is this just because 'spark-1*' matches both the tarball and the directory?

full mesos stderr looks like:



I0823 19:13:25.608206  3069 fetcher.cpp:214] Fetching URI
'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz'
I0823 19:13:25.608582  3069 fetcher.cpp:125] Fetching URI
'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz'
with os::net
I0823 19:13:25.608620  3069 fetcher.cpp:135] Downloading
'http://d3kbcqa49mib13.cloudfront.net/spark-1.4.1-bin-hadoop2.6.tgz'
to 
'/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429/spark-1.4.1-bin-hadoop2.6.tgz'
I0823 19:14:37.765060  3069 fetcher.cpp:78] Extracted resource
'/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429/spark-1.4.1-bin-hadoop2.6.tgz'
into 
'/var/mesos/slaves/20150823-110659-1862270986-5050-3230-S1/frameworks/20150823-191138-1862270986-5050-3768-/executors/0/runs/a5a13cd4-013a-4ebc-8ef7-eb9c33358429'
/bin/sh: 1: cd: can't cd to spark-1*
/bin/sh: 1: ./bin/spark-class: not found
I0823 19:14:38.365190  3138 exec.cpp:132] Version: 0.22.1
I0823 19:14:38.369495  3156 exec.cpp:206] Executor registered on slave
20150823-110659-1862270986-5050-3230-S1

-

and the spark-shell says:

15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Mesos task 3 is
now TASK_RUNNING
15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Mesos task 3 is
now TASK_FAILED
15/08/23 20:15:51 INFO CoarseMesosSchedulerBackend: Blacklisting Mesos
slave value: "20150823-110659-1862270986-5050-3230-S0"
 due to too many failures; is Spark installed on it?

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark YARN executors are not launching when using +UseG1GC

2015-08-23 Thread unk1102
Hi, I am hitting an issue of long GC pauses in my Spark job, and because of it
YARN is killing executors one by one and the Spark job becomes slower and
slower. I came across this article, which mentions using G1GC. I
tried to use the same command but something seems wrong:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html

./spark-submit --class com.xyz.MySpark --conf
"spark.executor.extraJavaOptions=-XX:MaxPermSize=512M -XX:+UseG1GC
-XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy
-XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -Xms25g -Xmx25g
-XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThread=20"
--driver-java-options -XX:MaxPermSize=512m --driver-memory 3g --master
yarn-client --executor-memory 25G --executor-cores 8 --num-executors 12 
/home/myuser/myspark-1.0.jar

First it said you can't use Xms/Xmx for the executor, so I removed them, but
the executors never get launched if I use the above command. Please guide. Thanks in
advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-YARN-executors-are-not-launching-when-using-UseG1GC-tp24407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to set environment of worker applications

2015-08-23 Thread Sathish Kumaran Vairavelu
spark-env.sh works for me in Spark 1.4 but not
spark.executor.extraJavaOptions.

On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey <
raghavendra.pan...@gmail.com> wrote:

> I think the only way to pass on environment variables to worker node is to
> write it in spark-env.sh file on each worker node.
>
> On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat 
> wrote:
>
>> Check for spark.driver.extraJavaOptions and
>> spark.executor.extraJavaOptions in the following article. I think you can
>> use -D to pass system vars:
>>
>> spark.apache.org/docs/latest/configuration.html#runtime-environment
>> Hi,
>>
>> I am starting a spark streaming job in standalone mode with spark-submit.
>>
>> Is there a way to make the UNIX environment variables with which
>> spark-submit is started available to the processes started on the worker
>> nodes?
>>
>> Jan
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Hi Ted,
  Thanks for the reply. I tried setting both of the keyid and accesskey via

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "***")
> sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "**")


However, the error still occurs for ORC format.

If I change the format to JSON, although the error does not go, the JSON
files can be saved successfully.




On Sun, Aug 23, 2015 at 5:51 AM, Ted Yu  wrote:

> You may have seen this:
> http://search-hadoop.com/m/q3RTtdSyM52urAyI
>
>
>
> On Aug 23, 2015, at 1:01 AM, lostrain A 
> wrote:
>
> Hi,
>   I'm trying to save a simple dataframe to S3 in ORC format. The code is
> as follows:
>
>
>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>   import sqlContext.implicits._
>>   val df=sc.parallelize(1 to 1000).toDF()
>>   df.write.format("orc").save("s3://logs/dummy)
>
>
> I ran the above code in spark-shell and only the _SUCCESS file was saved
> under the directory.
> The last part of the spark-shell log said:
>
> 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task
>> 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)
>>
>
>
>> 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage
>> 2 (save at :29) finished in 0.834 s
>>
>
>
>> 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed
>> TaskSet 2.0, whose tasks have all completed, from pool
>>
>
>
>> 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
>> :29, took 0.895912 s
>>
>
>
>> 15/08/23 07:38:24 main INFO
>> LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
>> /media/ephemeral0/s3/output-
>>
>
>
>> 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
>> dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
>>  4, -23, -103, 9, -104, -20, -8, 66, 126]
>>
>
>
>> 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
>> committed.
>
>
> Anyone has experienced this before?
> Thanks!
>
>
>


Re: Spark streaming multi-tasking during I/O

2015-08-23 Thread Akhil Das
If you set the concurrentJobs flag to 2, it lets you run two jobs in
parallel. It will be a bit hard for you to predict the application behavior
with this flag, so debugging could be a headache.
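
A minimal sketch of setting it (app name and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-app")
  .set("spark.streaming.concurrentJobs", "2")   // experimental: allow 2 jobs to run at once
val ssc = new StreamingContext(conf, Seconds(5))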

Thanks
Best Regards

On Sun, Aug 23, 2015 at 10:36 AM, Sateesh Kavuri 
wrote:

> Hi Akhil,
>
> Think of the scenario as running a piece of code in normal Java with
> multiple threads. Lets say there are 4 threads spawned by a Java process to
> handle reading from database, some processing and storing to database. In
> this process, while a thread is performing a database I/O, the CPU could
> allow another thread to perform the processing, thus efficiently using the
> resources.
>
> Incase of Spark, while a node executor is running the same "read from DB
> => process data => store to DB", during the "read from DB" and "store to
> DB" phase, the CPU is not given to other requests in queue, since the
> executor will allocate the resources completely to the current ongoing
> request.
>
> Does not flag spark.streaming.concurrentJobs enable this kind of scenario
> or is there any other way to achieve what I am looking for
>
> Thanks,
> Sateesh
>
> On Sat, Aug 22, 2015 at 7:26 PM, Akhil Das 
> wrote:
>
>> Hmm, for a single-core VM you will have to run it in local mode (specifying
>> master=local[4]). The flag is available in all the versions of Spark, I
>> guess.
>> On Aug 22, 2015 5:04 AM, "Sateesh Kavuri" 
>> wrote:
>>
>>> Thanks Akhil. Does this mean that the executor running in the VM can
>>> spawn two concurrent jobs on the same core? If this is the case, this is
>>> what we are looking for. Also, which version of Spark is this flag in?
>>>
>>> Thanks,
>>> Sateesh
>>>
>>> On Sat, Aug 22, 2015 at 1:44 AM, Akhil Das 
>>> wrote:
>>>
 You can look at the spark.streaming.concurrentJobs by default it runs a
 single job. If set it to 2 then it can run 2 jobs parallely. Its an
 experimental flag, but go ahead and give it a try.
 On Aug 21, 2015 3:36 AM, "Sateesh Kavuri" 
 wrote:

> Hi,
>
> My scenario goes like this:
> I have an algorithm running in Spark streaming mode on a 4 core
> virtual machine. Majority of the time, the algorithm does disk I/O and
> database I/O. Question is, during the I/O, where the CPU is not
> considerably loaded, is it possible to run any other task/thread so as to
> efficiently utilize the CPU?
>
> Note that one DStream of the algorithm runs completely on a single CPU
>
> Thank you,
> Sateesh
>

>>>
>


Re: How to parse multiple event types using Kafka

2015-08-23 Thread Cody Koeninger
Each Spark partition will contain messages only from a single Kafka
topic/partition.  Use HasOffsetRanges to tell which Kafka topic/partition
it's from.  See the docs:
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
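
A sketch in Scala (the Java API is analogous), assuming a direct stream of
(key, message) pairs like the one in the question below; the topic name in the
filter is a placeholder:

import org.apache.spark.streaming.kafka.HasOffsetRanges

// transform must be the first operation on the stream so the underlying
// KafkaRDD (which carries the offset ranges) is still visible.
val tagged = inputDStream.transform { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.mapPartitionsWithIndex { (i, iter) =>
    val topic = offsetRanges(i).topic          // spark partition i maps to offsetRanges(i)
    iter.map { case (_, message) => (topic, message) }
  }
}
val type1Events = tagged.filter(_._1 == "type1-topic").map(_._2)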

On Sun, Aug 23, 2015 at 10:56 AM, Spark Enthusiast  wrote:

> Folks,
>
> I use the following Streaming API from KafkaUtils :
>
> public JavaPairInputDStream inputDStream() {
>
> HashSet topicsSet = new 
> HashSet(Arrays.asList(topics.split(",")));
> HashMap kafkaParams = new HashMap();
> kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), 
> brokers);
>
> return KafkaUtils.createDirectStream(
> streamingContext,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams,
> topicsSet
> );
>
> }
>
>
> I catch the messages using :
>
> JavaDStream messages = inputDStream.map(new Function String>, String>() {
> @Override
> public String call(Tuple2 tuple2) {
> return tuple2._2();
> }
> });
>
>
> My problem is, each of these Kafka Topics stream in different message types. 
> How do I distinguish messages that are of type1, messages that are of type2, 
> . ?
>
>
> I tried the following:
>
>
> private class ParseEvents implements Function {
> final Class parameterClass;
>
> private ParseEvents(Class parameterClass) {
> this.parameterClass = parameterClass;
> }
>
> public T call(String message) throws Exception {
> ObjectMapper mapper = new ObjectMapper();
>
> T parsedMessage = null;
>
> try {
> parsedMessage = mapper.readValue(message, 
> this.parameterClass);
> } catch (Exception e1) {
> logger.error("Ignoring Unknown Message %s", message);
>
> }
> return parsedMessage;
> }
> }
>
> JavaDStream type1Events = messages.map(new 
> ParseEvents(Type1.class));
>
> JavaDStream type2Events = messages.map(new 
> ParseEvents(Type2.class));
>
> JavaDStream type3Events = messages.map(new 
> ParseEvents(Type3.class));
>
>
> But this does not work because type1 catches type2 messages and ignores them. 
> Is there a clean way of handling this ?
>
>
>
>
>


How to parse multiple event types using Kafka

2015-08-23 Thread Spark Enthusiast
Folks,

I use the following Streaming API from KafkaUtils:

public JavaPairInputDStream<String, String> inputDStream() {

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put(Tokens.KAFKA_BROKER_LIST_TOKEN.getRealTokenName(), brokers);

    return KafkaUtils.createDirectStream(
            streamingContext,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet
    );
}

I catch the messages using:

JavaDStream<String> messages = inputDStream.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

My problem is, each of these Kafka topics streams in a different message type.
How do I distinguish messages that are of type1, messages that are of type2, ...?

I tried the following:

private class ParseEvents<T> implements Function<String, T> {
    final Class<T> parameterClass;

    private ParseEvents(Class<T> parameterClass) {
        this.parameterClass = parameterClass;
    }

    public T call(String message) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        T parsedMessage = null;

        try {
            parsedMessage = mapper.readValue(message, this.parameterClass);
        } catch (Exception e1) {
            logger.error("Ignoring Unknown Message %s", message);
        }
        return parsedMessage;
    }
}

JavaDStream<Type1> type1Events = messages.map(new ParseEvents<Type1>(Type1.class));
JavaDStream<Type2> type2Events = messages.map(new ParseEvents<Type2>(Type2.class));
JavaDStream<Type3> type3Events = messages.map(new ParseEvents<Type3>(Type3.class));

But this does not work, because type1 catches type2 messages and ignores them.
Is there a clean way of handling this?




Re: How to set environment of worker applications

2015-08-23 Thread Raghavendra Pandey
I think the only way to pass environment variables to a worker node is to
write them in the spark-env.sh file on each worker node.

On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat 
wrote:

> Check for spark.driver.extraJavaOptions and
> spark.executor.extraJavaOptions in the following article. I think you can
> use -D to pass system vars:
>
> spark.apache.org/docs/latest/configuration.html#runtime-environment
> Hi,
>
> I am starting a spark streaming job in standalone mode with spark-submit.
>
> Is there a way to make the UNIX environment variables with which
> spark-submit is started available to the processes started on the worker
> nodes?
>
> Jan
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Mesos Dispatcher

2015-08-23 Thread Timothy Chen
Hi Bcjaes,

Sorry I didn't see the previous thread so not sure what issues you are running 
into.

In cluster mode the driver logs and results are all available through the Mesos
UI; you need to look at terminated frameworks if the job has already
finished.

I'll try to add more docs as we are still completing some other features around 
cluster mode on Mesos.

Tim


> On Aug 23, 2015, at 7:22 AM, bcajes  wrote:
> 
> I'm currently having the same issues.  The documentation for Mesos dispatcher
> is sparse.  I'll also add that I'm able to see the framework running in the
> mesos and spark driver UIs, but when viewing the spark job ui on a slave, no
> job is seen.
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-Dispatcher-tp24238p24404.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to set environment of worker applications

2015-08-23 Thread Hemant Bhanawat
Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions
in the following article. I think you can use -D to pass system vars:

spark.apache.org/docs/latest/configuration.html#runtime-environment
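
A minimal sketch of the -D approach ("my.setting" is a hypothetical property name):

import org.apache.spark.SparkConf

// spark.driver.extraJavaOptions normally has to be given at submit time
// (e.g. spark-submit --conf ...), since the driver JVM is already running
// when this code executes; the executor option can be set here.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Dmy.setting=foo")
// On the executors, read the value back with sys.props("my.setting").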
Hi,

I am starting a spark streaming job in standalone mode with spark-submit.

Is there a way to make the UNIX environment variables with which
spark-submit is started available to the processes started on the worker
nodes?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


Re: Spark Mesos Dispatcher

2015-08-23 Thread bcajes
I'm currently having the same issues.  The documentation for Mesos dispatcher
is sparse.  I'll also add that I'm able to see the framework running in the
mesos and spark driver UIs, but when viewing the spark job ui on a slave, no
job is seen.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Mesos-Dispatcher-tp24238p24404.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Error when saving a dataframe as ORC file

2015-08-23 Thread Ted Yu
You may have seen this:
http://search-hadoop.com/m/q3RTtdSyM52urAyI



> On Aug 23, 2015, at 1:01 AM, lostrain A  
> wrote:
> 
> Hi,
>   I'm trying to save a simple dataframe to S3 in ORC format. The code is as 
> follows:
> 
> 
>>  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>   import sqlContext.implicits._
>>   val df=sc.parallelize(1 to 1000).toDF()
>>   df.write.format("orc").save("s3://logs/dummy)
> 
> I ran the above code in spark-shell and only the _SUCCESS file was saved 
> under the directory.
> The last part of the spark-shell log said:
> 
>> 15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task 
>> 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)
>  
>> 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage 2 
>> (save at :29) finished in 0.834 s
>  
>> 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet 
>> 2.0, whose tasks have all completed, from pool
>  
>> 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at 
>> :29, took 0.895912 s
>  
>> 15/08/23 07:38:24 main INFO 
>> LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory: 
>> /media/ephemeral0/s3/output-
>  
>> 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for dummy/_SUCCESS 
>> is [-44, 29, -128, -39, -113, 0, -78,
>>  4, -23, -103, 9, -104, -20, -8, 66, 126]
>  
>> 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__ 
>> committed.
> 
> Anyone has experienced this before?
> Thanks!
>  


How to set environment of worker applications

2015-08-23 Thread Jan Algermissen
Hi,

I am starting a spark streaming job in standalone mode with spark-submit.

Is there a way to make the UNIX environment variables with which spark-submit 
is started available to the processes started on the worker nodes?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Jerrick Hoang
anybody has any suggestions?

On Fri, Aug 21, 2015 at 3:14 PM, Jerrick Hoang 
wrote:

> Is there a workaround without updating Hadoop? Would really appreciate if
> someone can explain what spark is trying to do here and what is an easy way
> to turn this off. Thanks all!
>
> On Fri, Aug 21, 2015 at 11:09 AM, Raghavendra Pandey <
> raghavendra.pan...@gmail.com> wrote:
>
>> Did you try with hadoop version 2.7.1 .. It is known that s3a works
>> really well with parquet which is available in 2.7. They fixed lot of
>> issues related to metadata reading there...
>> On Aug 21, 2015 11:24 PM, "Jerrick Hoang"  wrote:
>>
>>> @Cheng, Hao : Physical plans show that it got stuck on scanning S3!
>>>
>>> (table is partitioned by date_prefix and hour)
>>> explain select count(*) from test_table where date_prefix='20150819' and
>>> hour='00';
>>>
>>> TungstenAggregate(key=[], value=[(count(1),mode=Final,isDistinct=false)]
>>>  TungstenExchange SinglePartition
>>>   TungstenAggregate(key=[],
>>> value=[(count(1),mode=Partial,isDistinct=false)]
>>>Scan ParquetRelation[ ..  ]
>>>
>>> Why does spark have to scan all partitions when the query only concerns
>>> with 1 partitions? Doesn't it defeat the purpose of partitioning?
>>>
>>> Thanks!
>>>
>>> On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver 
>>> wrote:
>>>
 I hadn't heard of spark.sql.sources.partitionDiscovery.enabled before,
 and I couldn't find much information about it online. What does it mean
 exactly to disable it? Are there any negative consequences to disabling it?

 On Wed, Aug 19, 2015 at 10:53 PM, Cheng, Hao 
 wrote:

> Can you make some more profiling? I am wondering if the driver is busy
> with scanning the HDFS / S3.
>
> Like jstack 
>
>
>
> And also, it’s will be great if you can paste the physical plan for
> the simple query.
>
>
>
> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
> *Sent:* Thursday, August 20, 2015 1:46 PM
> *To:* Cheng, Hao
> *Cc:* Philip Weaver; user
> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
> partitions
>
>
>
> I cloned from TOT after 1.5.0 cut off. I noticed there were a couple
> of CLs trying to speed up spark sql with tables with a huge number of
> partitions, I've made sure that those CLs are included but it's still very
> slow
>
>
>
> On Wed, Aug 19, 2015 at 10:43 PM, Cheng, Hao 
> wrote:
>
> Yes, you can try set the spark.sql.sources.partitionDiscovery.enabled
> to false.
>
>
>
> BTW, which version are you using?
>
>
>
> Hao
>
>
>
> *From:* Jerrick Hoang [mailto:jerrickho...@gmail.com]
> *Sent:* Thursday, August 20, 2015 12:16 PM
> *To:* Philip Weaver
> *Cc:* user
> *Subject:* Re: Spark Sql behaves strangely with tables with a lot of
> partitions
>
>
>
> I guess the question is why does spark have to do partition discovery
> with all partitions when the query only needs to look at one partition? Is
> there a conf flag to turn this off?
>
>
>
> On Wed, Aug 19, 2015 at 9:02 PM, Philip Weaver <
> philip.wea...@gmail.com> wrote:
>
> I've had the same problem. It turns out that Spark (specifically
> parquet) is very slow at partition discovery. It got better in 1.5 (not 
> yet
> released), but was still unacceptably slow. Sadly, we ended up reading
> parquet files manually in Python (via C++) and had to abandon Spark SQL
> because of this problem.
>
>
>
> On Wed, Aug 19, 2015 at 7:51 PM, Jerrick Hoang 
> wrote:
>
> Hi all,
>
>
>
> I did a simple experiment with Spark SQL. I created a partitioned
> parquet table with only one partition (date=20140701). A simple `select
> count(*) from table where date=20140701` would run very fast (0.1 
> seconds).
> However, as I added more partitions the query takes longer and longer. 
> When
> I added about 10,000 partitions, the query took way too long. I feel like
> querying for a single partition should not be affected by having more
> partitions. Is this a known behaviour? What does spark try to do here?
>
>
>
> Thanks,
>
> Jerrick
>
>
>
>
>
>
>


>>>
>


Error when saving a dataframe as ORC file

2015-08-23 Thread lostrain A
Hi,
  I'm trying to save a simple dataframe to S3 in ORC format. The code is as
follows:


 val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
>   import sqlContext.implicits._
>   val df=sc.parallelize(1 to 1000).toDF()
>   df.write.format("orc").save("s3://logs/dummy")


I ran the above code in spark-shell and only the _SUCCESS file was saved
under the directory.
The last part of the spark-shell log said:

15/08/23 07:38:23 task-result-getter-1 INFO TaskSetManager: Finished task
> 95.0 in stage 2.0 (TID 295) in 801 ms on ip-*-*-*-*.ec2.internal (100/100)
>


> 15/08/23 07:38:23 dag-scheduler-event-loop INFO DAGScheduler: ResultStage
> 2 (save at :29) finished in 0.834 s
>


> 15/08/23 07:38:23 task-result-getter-1 INFO YarnScheduler: Removed TaskSet
> 2.0, whose tasks have all completed, from pool
>


> 15/08/23 07:38:23 main INFO DAGScheduler: Job 2 finished: save at
> :29, took 0.895912 s
>


> 15/08/23 07:38:24 main INFO
> LocalDirAllocator$AllocatorPerContext$DirSelector: Returning directory:
> /media/ephemeral0/s3/output-
>


> 15/08/23 07:38:24 main ERROR NativeS3FileSystem: md5Hash for
> dummy/_SUCCESS is [-44, 29, -128, -39, -113, 0, -78,
>  4, -23, -103, 9, -104, -20, -8, 66, 126]
>


> 15/08/23 07:38:24 main INFO DefaultWriterContainer: Job job__
> committed.


Has anyone experienced this before?
Thanks!