Fwd: Spark partition size tuning

2016-01-25 Thread Jia Zou
Dear all,

First, an update: the partition size for local file system data can be tuned
with:
sc.hadoopConfiguration().setLong("fs.local.block.size", blocksize)
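
To make the effect on task count concrete, here is a rough sketch of the arithmetic. It assumes one task per input split and that the number of splits is roughly the file size divided by the block size, rounded up; the real Hadoop split logic has extra rules. The class name PartitionMath and the 1 GB file size are made up for illustration.

```java
final class PartitionMath {
    /** Approximate split count: ceiling of fileBytes / splitBytes (an assumption, not Hadoop's exact rule). */
    static long approxSplits(long fileBytes, long splitBytes) {
        if (fileBytes < 0 || splitBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        return (fileBytes + splitBytes - 1) / splitBytes;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long fileBytes = 1024L * mb; // hypothetical 1 GB input file
        System.out.println(approxSplits(fileBytes, 32L * mb));  // 32
        System.out.println(approxSplits(fileBytes, 128L * mb)); // 8
    }
}
```

With these assumed numbers, going from 32MB to 128MB blocks cuts the estimate from 32 tasks to 8.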

However, I also need to tune the Spark partition size for input data that
is stored in Tachyon (the default is 512MB), and the method above does not
work for Tachyon data.

Do you have any suggestions? Thanks very much!

Best Regards,
Jia


-- Forwarded message --
From: Jia Zou 
Date: Thu, Jan 21, 2016 at 10:05 PM
Subject: Spark partition size tuning
To: "user @spark" 


Dear all,

When using Spark to read from the local file system, the default partition
size is 32MB. How can I increase the partition size to 128MB to reduce the
number of tasks?

Thank you very much!

Best Regards,
Jia


spark-sql[1.4.0] not compatible hive sql when using in with date_sub or regexp_replace

2016-01-25 Thread our...@cnsuning.com
Hi all,
When migrating Hive SQL to Spark SQL I encountered an incompatibility
problem. Please give me some suggestions.


The Hive table definition and data format are as follows:
use spark;
drop table spark.test_or1;
CREATE TABLE `spark.test_or1`( 
`statis_date` string, 
`lbl_nm` string)  row format delimited fields terminated by ',';

Example data (data.txt):
20160110 , item_XX_tab_buy03
20160114  , item_XX_tab_buy01   
20160115 , item_XX_tab_buy11
20160118 , item_XX_tab_buy01
20160101 , item_XX_tab_buy01
20160102 , item_XX_tab_buy03
20160103 , item_XX_tab_buy04

load data local inpath 'data.txt' into table  spark.test_or1; 


When this Hive query is executed in spark-sql (1.4.0), it fails with
UnresolvedException: Invalid call to dataType on unresolved object, tree:
'ta.lbl_nm. However, the same query executes successfully in the Hive shell:
select case when ta.lbl_nm in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > 
regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
'MMdd'), '-MM-dd'), 20), '-', '')  limit 10 ;  
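
For reference, the lower bound that the nested expression is meant to produce can be checked outside SQL. The sketch below is plain Java rather than Hive or Spark SQL, and it assumes the intended format strings are yyyyMMdd and yyyy-MM-dd; the class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

final class StatisDateBound {
    /** Returns the date `days` before a compact yyyyMMdd date, in the same compact form. */
    static String daysBefore(String yyyymmdd, int days) {
        DateTimeFormatter compact = DateTimeFormatter.BASIC_ISO_DATE; // yyyyMMdd
        return LocalDate.parse(yyyymmdd, compact).minusDays(days).format(compact);
    }

    public static void main(String[] args) {
        // Stand-in for regexp_replace(date_sub(from_unixtime(...), 20), '-', '')
        System.out.println(daysBefore("20160118", 20)); // 20151229
    }
}
```

Because the dates are zero-padded strings in yyyyMMdd form, comparing them as strings gives the same ordering as comparing them as dates.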



This query fails with the same error (UnresolvedException: Invalid call to
dataType on unresolved object, tree: 'ta.lbl_nm), although it executes
successfully in the Hive shell:
select case when ta.lbl_nm in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > date_sub(from_unixtime(to_unix_timestamp('20160118', 
'MMdd'), 'MMdd'), 20)  limit 10 ;  


This query also fails with the same error (UnresolvedException: Invalid call
to dataType on unresolved object, tree: 'ta.lbl_nm), although it executes
successfully in the Hive shell:
select case when ta.lbl_nm in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > regexp_replace(2016-01-18, '-', '')  limit 10 ;   


After changing `in` to equality comparisons (`=`), the query executes
successfully in both the Hive shell and Spark SQL:
select case when ta.lbl_nm='item_XX_tab_buy03' and ta.lbl_nm = 
'item_XX_gmq_buy01' 
then 1 
else 0 
end as  defaultVaule
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > 
regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
'MMdd'), '-MM-dd'), 20), '-', '')  limit 10 ;  


spark-sql> 
> 
> select case when ta.lbl_nm='item_XX_tab_buy03' and ta.lbl_nm = 
> 'item_XX_gmq_buy01' 
> then 1 
> else 0 
> end as defaultVaule 
> from spark.test_or1 ta 
> where ta.statis_date <= '20160118' 
> and ta.statis_date > 
> regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
> 'MMdd'), '-MM-dd'), 20), '-', '') limit 10 ; 
0 
0 
0 
0 
0 
0 
Time taken: 3.725 seconds, Fetched 6 row(s)


Detailed error log:
16/01/26 11:10:36 INFO ParseDriver: Parse Completed 
16/01/26 11:10:36 INFO HiveMetaStore: 0: get_table : db=spark tbl=test_or1 
16/01/26 11:10:36 INFO audit: ugi=spark ip=unknown-ip-addr cmd=get_table : 
db=spark tbl=test_or1 
16/01/26 11:10:37 ERROR SparkSQLDriver: Failed in [ select case when ta.lbl_nm 
in ('item_XX_tab_buy03','item_XX_gmq_buy01') 
then 1 
else 0 
end as defaultVaule 
from spark.test_or1 ta 
where ta.statis_date <= '20160118' 
and ta.statis_date > 
regexp_replace(date_sub(from_unixtime(to_unix_timestamp('20160118', 
'MMdd'), '-MM-dd'), 20), '-', '') limit 10 ] 
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'ta.lbl_nm
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:59)
    at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
    at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5$$anonfun$applyOrElse$15.apply(HiveTypeCoercion.scala:299)
    at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:80)
    at scala.collection.immutable.List.exists(List.scala:84)
    at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:299)
    at org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion$InConversion$$anonfun$apply$5.applyOrElse(HiveTypeCoercion.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$7.apply(TreeNode.scala:261)
at 

multi-threaded Spark jobs

2016-01-25 Thread Elango Cheran
Hi everyone,
I've gone through the effort of figuring out how to modify a Spark job to
have an operation become multi-threaded inside an executor.  I've written
up an explanation of what worked, what didn't work, and why:

http://www.elangocheran.com/blog/2016/01/using-clojure-to-create-multi-threaded-spark-jobs/

I think the ideas there should be applicable generally -- which would
include Scala and Java since the JVM is genuinely multi-threaded -- and
therefore may be of interest to others.  I will need to convert this code
to Scala for personal requirements in the near future, anyways.

I hope this helps.

-- Elango


Re: multi-threaded Spark jobs

2016-01-25 Thread Igor Berman
IMHO, you are making a mistake.
Spark manages tasks and cores internally. When you open new threads inside
an executor, you are over-provisioning that executor (e.g. tasks on other
cores will be preempted).
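
If a task really does need its own threads, one way to limit the over-provisioning described above is to bound the pool and always shut it down when the partition is done. This is only a sketch in plain Java with no Spark dependency: the Iterator stands in for a partition, and BoundedPartitionWorker, processPartition and the thread count are hypothetical names and values. Spark's scheduler still does not know about these threads, so a setting such as spark.task.cpus may also need adjusting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class BoundedPartitionWorker {
    /** Applies `work` to every item of one partition using at most `threads` threads; keeps input order. */
    static <T, R> List<R> processPartition(Iterator<T> partition, Function<T, R> work, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<R>> pending = new ArrayList<>();
            while (partition.hasNext()) {
                T item = partition.next();
                Callable<R> job = () -> work.apply(item);
                pending.add(pool.submit(job));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> f : pending) {
                results.add(f.get()); // wait for each item in submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for work", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("work item failed", e.getCause());
        } finally {
            pool.shutdown(); // never leave threads behind once the partition is done
        }
    }

    public static void main(String[] args) {
        List<Integer> out = processPartition(Arrays.asList(1, 2, 3).iterator(), x -> x * 2, 2);
        System.out.println(out); // [2, 4, 6]
    }
}
```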



On 26 January 2016 at 07:59, Elango Cheran  wrote:

> Hi everyone,
> I've gone through the effort of figuring out how to modify a Spark job to
> have an operation become multi-threaded inside an executor.  I've written
> up an explanation of what worked, what didn't work, and why:
>
>
> http://www.elangocheran.com/blog/2016/01/using-clojure-to-create-multi-threaded-spark-jobs/
>
> I think the ideas there should be applicable generally -- which would
> include Scala and Java since the JVM is genuinely multi-threaded -- and
> therefore may be of interest to others.  I will need to convert this code
> to Scala for personal requirements in the near future, anyways.
>
> I hope this helps.
>
> -- Elango
>


hivethriftserver2 problems on upgrade to 1.6.0

2016-01-25 Thread james.gre...@baesystems.com
After upgrading from 1.5.0 to 1.6.0 I have a problem with HiveThriftServer2.
I have this code:

val hiveContext = new HiveContext(SparkContext.getOrCreate(conf))

val thing = hiveContext.read.parquet("hdfs://dkclusterm1.imp.net:8020/user/jegreen1/ex208")

thing.registerTempTable("thing")

HiveThriftServer2.startWithContext(hiveContext)


When I start things up on the cluster my hive-site.xml is found – I can see 
that the metastore connects:


INFO  metastore - Trying to connect to metastore with URI thrift://dkclusterm2.imp.net:9083
INFO  metastore - Connected to metastore.


But then later on the thrift server seems not to connect to the remote hive 
metastore but to start a derby instance instead:

INFO  AbstractService - Service:CLIService is started.
INFO  ObjectStore - ObjectStore, initialize called
INFO  Query - Reading in results for query "org.datanucleus.store.rdbms.query.SQLQuery@0" since the connection used is closing
INFO  MetaStoreDirectSql - Using direct SQL, underlying DB is DERBY
INFO  ObjectStore - Initialized ObjectStore
INFO  HiveMetaStore - 0: get_databases: default
INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=get_databases: default
INFO  HiveMetaStore - 0: Shutting down the object store...
INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=Shutting down the object store...
INFO  HiveMetaStore - 0: Metastore shutdown complete.
INFO  audit - ugi=jegreen1  ip=unknown-ip-addr  cmd=Metastore shutdown complete.
INFO  AbstractService - Service:ThriftBinaryCLIService is started.
INFO  AbstractService - Service:HiveServer2 is started.


So if I connect to this with JDBC I can see all the tables on the hive server – 
but not anything temporary – I guess they are going to derby.

I see someone on the databricks website is also having this problem.


Thanks

James






From: patcharee [mailto:patcharee.thong...@uni.no]
Sent: 25 January 2016 14:31
To: user@spark.apache.org
Cc: Eirik Thorsnes
Subject: streaming textFileStream problem - got only ONE line

Hi,

My streaming application receives data from the file system and just prints the
input count at every 1-second interval, as in the code below:

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms))
val lines = ssc.textFileStream(args(0))
lines.count().print()

The problem is that sometimes the data received from ssc.textFileStream is ONLY
ONE line, although the new file found in that interval actually contains
multiple lines. See the log below, which shows three intervals. In the 2nd
interval, the new file is
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file contains
6288 lines, but ssc.textFileStream returns ONLY ONE line (the header).

Any ideas/suggestions what the problem is?
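
One possible cause, not confirmed in this thread, is that the file is picked up while it is still being written. The Spark Streaming guide asks for files to be moved or renamed atomically into the monitored directory. The sketch below shows that pattern on a local file system with java.nio; for HDFS the equivalent would be a rename through the Hadoop FileSystem API, which is not shown. AtomicDrop, publish, demoLineCount and the file name are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

final class AtomicDrop {
    /** Writes the whole file in a staging directory, then moves it into the watched directory in one step. */
    static Path publish(Path stagingDir, Path watchedDir, String name, List<String> lines) {
        try {
            Path tmp = stagingDir.resolve(name + ".tmp");
            Files.write(tmp, lines, StandardCharsets.UTF_8);
            // Readers of watchedDir see either no file or the complete file.
            return Files.move(tmp, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Small self-check: publishes three lines into temporary directories and counts them back. */
    static int demoLineCount() {
        try {
            Path root = Files.createTempDirectory("atomic-drop");
            Path staging = Files.createDirectory(root.resolve("staging"));
            Path watched = Files.createDirectory(root.resolve("watched"));
            Path done = publish(staging, watched, "datetime_demo.txt",
                    Arrays.asList("header", "row1", "row2"));
            return Files.readAllLines(done, StandardCharsets.UTF_8).size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoLineCount()); // 3
    }
}
```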

-
SPARK LOG
-

16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were older 
than 1453731011000 ms: 145373101 ms
16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were older 
than 1453731011000 ms:
16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms
16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000 ms:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt
---
Time: 1453731072000 ms
---
6288

16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were older 
than 1453731012000 ms: 1453731011000 ms
16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were older 
than 1453731012000 ms:
16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms
16/01/25 15:11:13 INFO FileInputDStream: New files at time 1453731073000 ms:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt
---
Time: 1453731073000 ms
---
1

16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that were older 
than 1453731013000 ms: 1453731012000 ms
16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that were older 
than 1453731013000 ms:
16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms
16/01/25 15:11:14 INFO FileInputDStream: New files at time 1453731074000 ms:
hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt
---
Time: 1453731074000 ms
---
6288


Thanks,
Patcharee

Re: How to discretize Continuous Variable with Spark DataFrames

2016-01-25 Thread Jeff Zhang
It's very straightforward; please refer to the documentation here:
http://spark.apache.org/docs/latest/ml-features.html#bucketizer
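
On the question of what Bucketizer actually does: according to the linked docs, the user supplies the split points and each value is mapped to the bucket whose range contains it, so there is no learning step. The sketch below imitates that lookup in plain Java and adds one simple way to derive equal-frequency splits from sorted data. It is not Spark's implementation; BucketSketch, bucketOf and equalFrequencySplits are hypothetical names, and the quantile rule used here is an assumption.

```java
import java.util.Arrays;

final class BucketSketch {
    /**
     * Index of the bucket that holds `value`, for strictly increasing splits.
     * Bucket i covers [splits[i], splits[i+1]); the last bucket also includes its upper bound.
     */
    static int bucketOf(double[] splits, double value) {
        int last = splits.length - 1;
        if (last < 1 || value < splits[0] || value > splits[last]) {
            throw new IllegalArgumentException("value outside splits: " + value);
        }
        if (value == splits[last]) {
            return last - 1;
        }
        int pos = Arrays.binarySearch(splits, value);
        // Not found: binarySearch returns -(insertionPoint) - 1, and the bucket is insertionPoint - 1.
        return pos >= 0 ? pos : -pos - 2;
    }

    /** Equal-frequency split points taken from the sorted data (assumed rule; ties can produce duplicate splits). */
    static double[] equalFrequencySplits(double[] data, int buckets) {
        if (data.length == 0 || buckets < 1) {
            throw new IllegalArgumentException("need data and at least one bucket");
        }
        double[] sorted = data.clone();
        Arrays.sort(sorted);
        double[] splits = new double[buckets + 1];
        for (int i = 0; i <= buckets; i++) {
            int idx = (int) Math.round((double) i * (sorted.length - 1) / buckets);
            splits[i] = sorted[idx];
        }
        return splits;
    }

    public static void main(String[] args) {
        double[] data = {7, 2, 9, 4, 1, 10, 3, 8, 5, 6};
        double[] splits = equalFrequencySplits(data, 2);
        System.out.println(Arrays.toString(splits)); // [1.0, 6.0, 10.0]
        System.out.println(bucketOf(splits, 5.0));   // 0
        System.out.println(bucketOf(splits, 6.0));   // 1
    }
}
```

The k-means variant from the R example would need a clustering step to choose the split points and is not covered by this sketch.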


On Mon, Jan 25, 2016 at 10:09 PM, Eli Super  wrote:

> Thanks Joshua,
>
> I can't understand what algorithm is behind Bucketizer. How is the
> discretization done?
>
> Best Regards
>
>
> On Mon, Jan 25, 2016 at 3:40 PM, Joshua TAYLOR 
> wrote:
>
>> It sounds like you may want the Bucketizer in SparkML.  The overview docs
>> [1] include, "Bucketizer transforms a column of continuous features to a
>> column of feature buckets, where the buckets are specified by users."
>>
>> [1]: http://spark.apache.org/docs/latest/ml-features.html#bucketizer
>>
>> On Mon, Jan 25, 2016 at 5:34 AM, Eli Super  wrote:
>>
>>> Hi
>>>
>>> What is a best way to discretize Continuous Variable within  Spark
>>> DataFrames ?
>>>
>>> I want to discretize some variable 1) by equal frequency 2) by k-means
>>>
>>> I usually use R for this purpose:
>>>
>>> http://www.inside-r.org/packages/cran/arules/docs/discretize
>>>
>>> R code for example :
>>>
>>> ### equal frequency
>>> table(discretize(data$some_column, "frequency", categories=10))
>>>
>>>
>>> #k-means
>>> table(discretize(data$some_column, "cluster", categories=10))
>>>
>>> Thanks a lot !
>>>
>>
>>
>>
>> --
>> Joshua Taylor, http://www.cs.rpi.edu/~tayloj/
>>
>
>


-- 
Best Regards

Jeff Zhang


[Spark] Reading avro file in Spark 1.3.0

2016-01-25 Thread diplomatic Guru
Hello guys,

I've been trying to read avro file using Spark's DataFrame but it's
throwing this error:
java.lang.NoSuchMethodError:
org.apache.spark.sql.SQLContext.read()Lorg/apache/spark/sql/DataFrameReader;

This is what I've done so far:

I've added the dependency to pom.xml:


<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>1.0.0</version>
</dependency>


Java code:

JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
DataFrame df =
sqlContext.read().format("com.databricks.spark.avro").load(args[0]);

Could you please let me know what I am doing wrong?

Thanks.


Re: Launching EC2 instances with Spark compiled for Scala 2.11

2016-01-25 Thread Darren Govoni


Why not deploy it, then build a custom distribution with Scala 2.11 and just
overlay it?


Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: Nuno Santos  
Date: 01/25/2016  7:38 AM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Re: Launching EC2 instances with Spark compiled for Scala 2.11 

Hello, 

Any updates on this question? I'm also very interested in a solution, as I'm
trying to use Spark on EC2 but need Scala 2.11 support. The scripts in the
ec2 directory of the Spark distribution install use Scala 2.10 by default
and I can't see any obvious option to change to Scala 2.11. 

Regards, 
Nuno



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Launching-EC2-instances-with-Spark-compiled-for-Scala-2-11-tp24979p26059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Sharing HiveContext in Spark JobServer / getOrCreate

2016-01-25 Thread Deenar Toraskar
Hi

I am using a shared sparkContext for all of my Spark jobs. Some of the jobs
use HiveContext, but there isn't a getOrCreate method on HiveContext which
will allow reuse of an existing HiveContext. Such a method exists on
SQLContext only (def getOrCreate(sparkContext: SparkContext): SQLContext).
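
Until something like that exists for HiveContext, one possible workaround is to keep a single instance in a small holder owned by the driver. The sketch below is a generic, thread-safe lazy holder in plain Java; SharedContextHolder is a hypothetical helper, not a Spark API. In a real job the factory would presumably be something like `() -> new HiveContext(sc)`, which is left out here so the example has no Spark dependency.

```java
import java.util.function.Supplier;

final class SharedContextHolder<C> {
    private final Supplier<C> factory;
    private volatile C instance;

    SharedContextHolder(Supplier<C> factory) {
        this.factory = factory;
    }

    /** Returns the shared instance, creating it at most once even when called from several threads. */
    C getOrCreate() {
        C local = instance;
        if (local == null) {
            synchronized (this) {
                local = instance;
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // A StringBuilder stands in for the context object in this demo.
        SharedContextHolder<StringBuilder> holder =
                new SharedContextHolder<>(() -> new StringBuilder("context"));
        System.out.println(holder.getOrCreate() == holder.getOrCreate()); // true
    }
}
```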

Is there any reason that a HiveContext cannot be shared amongst multiple
threads within the same Spark driver process?

In addition, I cannot seem to cast a HiveContext to a SQLContext, but this
works fine in the Spark shell. Am I doing something wrong here?

scala> sqlContext

res19: org.apache.spark.sql.SQLContext =
org.apache.spark.sql.hive.HiveContext@383b3357

scala> import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SQLContext

scala> SQLContext.getOrCreate(sc)

res18: org.apache.spark.sql.SQLContext =
org.apache.spark.sql.hive.HiveContext@383b3357



Regards
Deenar


Re: Launching EC2 instances with Spark compiled for Scala 2.11

2016-01-25 Thread Nuno Santos
Hello, 

Any updates on this question? I'm also very interested in a solution, as I'm
trying to use Spark on EC2 but need Scala 2.11 support. The scripts in the
ec2 directory of the Spark distribution install use Scala 2.10 by default
and I can't see any obvious option to change to Scala 2.11. 

Regards, 
Nuno






Re: How to discretize Continuous Variable with Spark DataFrames

2016-01-25 Thread Eli Super
Thanks Joshua,

I can't understand what algorithm is behind Bucketizer. How is the
discretization done?

Best Regards


On Mon, Jan 25, 2016 at 3:40 PM, Joshua TAYLOR 
wrote:

> It sounds like you may want the Bucketizer in SparkML.  The overview docs
> [1] include, "Bucketizer transforms a column of continuous features to a
> column of feature buckets, where the buckets are specified by users."
>
> [1]: http://spark.apache.org/docs/latest/ml-features.html#bucketizer
>
> On Mon, Jan 25, 2016 at 5:34 AM, Eli Super  wrote:
>
>> Hi
>>
>> What is a best way to discretize Continuous Variable within  Spark
>> DataFrames ?
>>
>> I want to discretize some variable 1) by equal frequency 2) by k-means
>>
>> I usually use R for this purpose:
>>
>> http://www.inside-r.org/packages/cran/arules/docs/discretize
>>
>> R code for example :
>>
>> ### equal frequency
>> table(discretize(data$some_column, "frequency", categories=10))
>>
>>
>> #k-means
>> table(discretize(data$some_column, "cluster", categories=10))
>>
>> Thanks a lot !
>>
>
>
>
> --
> Joshua Taylor, http://www.cs.rpi.edu/~tayloj/
>


Re: 10hrs of Scheduler Delay

2016-01-25 Thread Ted Yu
Yes, thread dump plus log would be helpful for debugging. 

Thanks

> On Jan 25, 2016, at 5:59 AM, Sanders, Isaac B  
> wrote:
> 
> Is the thread dump the stack trace you are talking about? If so, I will see 
> if I can capture the few different stages I have seen it in.
> 
> Thanks for the help, I was able to do it for 0.1% of my data. I will create 
> the JIRA.
> 
> Thanks,
> Isaac
> 
> On Jan 25, 2016, at 8:51 AM, Ted Yu  wrote:
> 
>> Opening a JIRA is fine. 
>> 
>> See if you can capture stack trace during the hung stage and attach to JIRA 
>> so that we have more clue. 
>> 
>> Thanks
>> 
>> On Jan 25, 2016, at 4:25 AM, Darren Govoni  wrote:
>> 
>>> Probably we should open a ticket for this.
>>> There's definitely a deadlock situation occurring in spark under certain 
>>> conditions.
>>> 
>>> The only clue I have is it always happens on the last stage. And it does 
>>> seem sensitive to scale. If my job has 300mb of data I'll see the deadlock. 
>>> But if I only run 10mb of it it will succeed. This suggest a serious 
>>> fundamental scaling problem.
>>> 
>>> Workers have plenty of resources.
>>> 
>>> 
>>> 
>>> Sent from my Verizon Wireless 4G LTE smartphone
>>> 
>>> 
>>>  Original message 
>>> From: "Sanders, Isaac B"  
>>> Date: 01/24/2016 2:54 PM (GMT-05:00) 
>>> To: Renu Yadav  
>>> Cc: Darren Govoni , Muthu Jayakumar 
>>> , Ted Yu , user@spark.apache.org 
>>> Subject: Re: 10hrs of Scheduler Delay 
>>> 
>>> I am not getting anywhere with any of the suggestions so far. :(
>>> 
>>> Trying some more outlets, I will share any solution I find.
>>> 
>>> - Isaac
>>> 
>>>> On Jan 23, 2016, at 1:48 AM, Renu Yadav wrote:
>>>>
>>>> If you turn on spark.speculation on then that might help. it worked  for me
>>>>
>>>>> On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni wrote:
>>>>> Thanks for the tip. I will try it. But this is the kind of thing spark is
>>>>> supposed to figure out and handle. Or at least not get stuck forever.
>>>>>
>>>>> Sent from my Verizon Wireless 4G LTE smartphone
>>>>>
>>>>> Original message
>>>>> From: Muthu Jayakumar
>>>>> Date: 01/22/2016 3:50 PM (GMT-05:00)
>>>>> To: Darren Govoni, "Sanders, Isaac B", Ted Yu
>>>>> Cc: user@spark.apache.org
>>>>> Subject: Re: 10hrs of Scheduler Delay
>>>>>
>>>>> Does increasing the number of partition helps? You could try out
>>>>> something 3 times what you currently have.
>>>>> Another trick i used was to partition the problem into multiple
>>>>> dataframes and run them sequentially and persistent the result and then
>>>>> run a union on the results.
>>>>>
>>>>> Hope this helps.
>>>>>
>>>>>> On Fri, Jan 22, 2016, 3:48 AM Darren Govoni wrote:
>>>>>> Me too. I had to shrink my dataset to get it to work. For us at least
>>>>>> Spark seems to have scaling issues.
>>>>>>
>>>>>> Sent from my Verizon Wireless 4G LTE smartphone
>>>>>>
>>>>>> Original message
>>>>>> From: "Sanders, Isaac B"
>>>>>> Date: 01/21/2016 11:18 PM (GMT-05:00)
>>>>>> To: Ted Yu
>>>>>> Cc: user@spark.apache.org
>>>>>> Subject: Re: 10hrs of Scheduler Delay
>>>>>>
>>>>>> I have run the driver on a smaller dataset (k=2, n=5000) and it worked
>>>>>> quickly and didn’t hang like this. This dataset is closer to k=10,
>>>>>> n=4.4m, but I am using more resources on this one.
>>>>>>
>>>>>> - Isaac
>>>>>>
>>>>>>> On Jan 21, 2016, at 11:06 PM, Ted Yu wrote:
>>>>>>>
>>>>>>> You may have seen the following on github page:
>>>>>>>
>>>>>>> Latest commit 50fdf0e  on Feb 22, 2015
>>>>>>>
>>>>>>> That was 11 months ago.
>>>>>>>
>>>>>>> Can you search for similar algorithm which runs on Spark and is newer ?
>>>>>>>
>>>>>>> If nothing found, consider running the tests coming from the project to
>>>>>>> determine whether the delay is intrinsic.
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>>> On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B wrote:
>>>>>>>> That thread seems to be moving, it oscillates between a few different
>>>>>>>> traces… Maybe it is working. It seems odd that it would take that long.
>>>>>>>>
>>>>>>>> This is 3rd party code, and after looking at some of it, I think it
>>>>>>>> might not be as Spark-y as it could be.
>>>>>>>>
>>>>>>>> I linked it below. I don’t know a lot about spark, so it might be
>>>>>>>> fine, but I have my suspicions.
>>>>>>>>
>>>>>>>> https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala

Re: How to discretize Continuous Variable with Spark DataFrames

2016-01-25 Thread Joshua TAYLOR
It sounds like you may want the Bucketizer in SparkML.  The overview docs
[1] include, "Bucketizer transforms a column of continuous features to a
column of feature buckets, where the buckets are specified by users."

[1]: http://spark.apache.org/docs/latest/ml-features.html#bucketizer

On Mon, Jan 25, 2016 at 5:34 AM, Eli Super  wrote:

> Hi
>
> What is a best way to discretize Continuous Variable within  Spark
> DataFrames ?
>
> I want to discretize some variable 1) by equal frequency 2) by k-means
>
> I usually use R for this purpose:
>
> http://www.inside-r.org/packages/cran/arules/docs/discretize
>
> R code for example :
>
> ### equal frequency
> table(discretize(data$some_column, "frequency", categories=10))
>
>
> #k-means
> table(discretize(data$some_column, "cluster", categories=10))
>
> Thanks a lot !
>



-- 
Joshua Taylor, http://www.cs.rpi.edu/~tayloj/


Re: 10hrs of Scheduler Delay

2016-01-25 Thread Darren Govoni


Yeah. I have screenshots and stack traces. I will post them to the ticket. 
Nothing informative.
I should also mention I'm using pyspark but I think the deadlock is inside the 
Java scheduler code.



Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: "Sanders, Isaac B"  
Date: 01/25/2016  8:59 AM  (GMT-05:00) 
To: Ted Yu  
Cc: Darren Govoni , Renu Yadav , Muthu 
Jayakumar , user@spark.apache.org 
Subject: Re: 10hrs of Scheduler Delay

Is the thread dump the stack trace you are talking about? If so, I will see if
I can capture the few different stages I have seen it in.

Thanks for the help, I was able to do it for 0.1% of my data. I will create the
JIRA.

Thanks,
Isaac

On Jan 25, 2016, at 8:51 AM, Ted Yu wrote:

Opening a JIRA is fine.

See if you can capture stack trace during the hung stage and attach to JIRA so
that we have more clue.

Thanks

On Jan 25, 2016, at 4:25 AM, Darren Govoni wrote:

Probably we should open a ticket for this.
There's definitely a deadlock situation occurring in spark under certain
conditions.

The only clue I have is it always happens on the last stage. And it does seem
sensitive to scale. If my job has 300mb of data I'll see the deadlock. But if I
only run 10mb of it it will succeed. This suggest a serious fundamental scaling
problem.

Workers have plenty of resources.

Sent from my Verizon Wireless 4G LTE smartphone

Original message
From: "Sanders, Isaac B"
Date: 01/24/2016 2:54 PM (GMT-05:00)
To: Renu Yadav
Cc: Darren Govoni, Muthu Jayakumar, Ted Yu, user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay

I am not getting anywhere with any of the suggestions so far. :(

Trying some more outlets, I will share any solution I find.

- Isaac

On Jan 23, 2016, at 1:48 AM, Renu Yadav wrote:

If you turn on spark.speculation on then that might help. it worked  for me

On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni wrote:

Thanks for the tip. I will try it. But this is the kind of thing spark is
supposed to figure out and handle. Or at least not get stuck forever.

Sent from my Verizon Wireless 4G LTE smartphone

Original message
From: Muthu Jayakumar
Date: 01/22/2016 3:50 PM (GMT-05:00)
To: Darren Govoni, "Sanders, Isaac B", Ted Yu
Cc: user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay

Does increasing the number of partition helps? You could try out something 3
times what you currently have.
Another trick i used was to partition the problem into multiple dataframes and
run them sequentially and persistent the result and then run a union on the
results.

Hope this helps.

On Fri, Jan 22, 2016, 3:48 AM Darren Govoni wrote:

Me too. I had to shrink my dataset to get it to work. For us at least Spark
seems to have scaling issues.

Sent from my Verizon Wireless 4G LTE smartphone

Original message
From: "Sanders, Isaac B"
Date: 01/21/2016 11:18 PM (GMT-05:00)
To: Ted Yu
Cc: user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay

I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly
and didn’t hang like this. This dataset is closer to k=10, n=4.4m, but I am
using more resources on this one.

- Isaac

On Jan 21, 2016, at 11:06 PM, Ted Yu wrote:

You may have seen the following on github page:

Latest commit 50fdf0e  on Feb 22, 2015

That was 11 months ago.

Can you search for similar algorithm which runs on Spark and is newer ?

If nothing found, consider running the tests coming from the project to
determine whether the delay is intrinsic.

Cheers

On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B wrote:

That thread seems to be moving, it oscillates between a few different traces…
Maybe it is working. It seems odd that it would take that long.

This is 3rd party code, and after looking at some of it, I think it might not
be as Spark-y as it could be.

I linked it below. I don’t know a lot about spark, so it might be fine, but I
have my suspicions.

https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala

- Isaac

On Jan 21, 2016, at 10:08 PM, Ted Yu wrote:

You may have noticed the following - did this indicate prolonged 

Re: 10hrs of Scheduler Delay

2016-01-25 Thread Ted Yu
Opening a JIRA is fine. 

See if you can capture stack trace during the hung stage and attach to JIRA so 
that we have more clue. 

Thanks

> On Jan 25, 2016, at 4:25 AM, Darren Govoni  wrote:
> 
> Probably we should open a ticket for this.
> There's definitely a deadlock situation occurring in spark under certain 
> conditions.
> 
> The only clue I have is it always happens on the last stage. And it does seem 
> sensitive to scale. If my job has 300mb of data I'll see the deadlock. But if 
> I only run 10mb of it it will succeed. This suggest a serious fundamental 
> scaling problem.
> 
> Workers have plenty of resources.
> 
> 
> 
> Sent from my Verizon Wireless 4G LTE smartphone
> 
> 
>  Original message 
> From: "Sanders, Isaac B"  
> Date: 01/24/2016 2:54 PM (GMT-05:00) 
> To: Renu Yadav  
> Cc: Darren Govoni , Muthu Jayakumar 
> , Ted Yu , user@spark.apache.org 
> Subject: Re: 10hrs of Scheduler Delay 
> 
> I am not getting anywhere with any of the suggestions so far. :(
> 
> Trying some more outlets, I will share any solution I find.
> 
> - Isaac
> 
>> On Jan 23, 2016, at 1:48 AM, Renu Yadav  wrote:
>> 
>> If you turn on spark.speculation on then that might help. it worked  for me
>> 
>>> On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni  wrote:
>>> Thanks for the tip. I will try it. But this is the kind of thing spark is 
>>> supposed to figure out and handle. Or at least not get stuck forever.
>>> 
>>> 
>>> 
>>> Sent from my Verizon Wireless 4G LTE smartphone
>>> 
>>> 
>>>  Original message 
>>> From: Muthu Jayakumar  
>>> Date: 01/22/2016 3:50 PM (GMT-05:00) 
>>> To: Darren Govoni , "Sanders, Isaac B" 
>>> , Ted Yu  
>>> Cc: user@spark.apache.org 
>>> Subject: Re: 10hrs of Scheduler Delay 
>>> 
>>> Does increasing the number of partition helps? You could try out something 
>>> 3 times what you currently have. 
>>> Another trick i used was to partition the problem into multiple dataframes 
>>> and run them sequentially and persistent the result and then run a union on 
>>> the results. 
>>> 
>>> Hope this helps. 
>>> 
>>>> On Fri, Jan 22, 2016, 3:48 AM Darren Govoni wrote:
>>>> Me too. I had to shrink my dataset to get it to work. For us at least
>>>> Spark seems to have scaling issues.
>>>>
>>>> Sent from my Verizon Wireless 4G LTE smartphone
>>>>
>>>> Original message
>>>> From: "Sanders, Isaac B"
>>>> Date: 01/21/2016 11:18 PM (GMT-05:00)
>>>> To: Ted Yu
>>>> Cc: user@spark.apache.org
>>>> Subject: Re: 10hrs of Scheduler Delay
>>>>
>>>> I have run the driver on a smaller dataset (k=2, n=5000) and it worked
>>>> quickly and didn’t hang like this. This dataset is closer to k=10, n=4.4m,
>>>> but I am using more resources on this one.
>>>>
>>>> - Isaac
>>>>
>>>>> On Jan 21, 2016, at 11:06 PM, Ted Yu wrote:
>>>>>
>>>>> You may have seen the following on github page:
>>>>>
>>>>> Latest commit 50fdf0e  on Feb 22, 2015
>>>>>
>>>>> That was 11 months ago.
>>>>>
>>>>> Can you search for similar algorithm which runs on Spark and is newer ?
>>>>>
>>>>> If nothing found, consider running the tests coming from the project to
>>>>> determine whether the delay is intrinsic.
>>>>>
>>>>> Cheers
>>>>>
>>>>>> On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B wrote:
>>>>>> That thread seems to be moving, it oscillates between a few different
>>>>>> traces… Maybe it is working. It seems odd that it would take that long.
>>>>>>
>>>>>> This is 3rd party code, and after looking at some of it, I think it
>>>>>> might not be as Spark-y as it could be.
>>>>>>
>>>>>> I linked it below. I don’t know a lot about spark, so it might be fine,
>>>>>> but I have my suspicions.
>>>>>>
>>>>>> https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala
>>>>>>
>>>>>> - Isaac
>>>>>>
>>>>>>> On Jan 21, 2016, at 10:08 PM, Ted Yu wrote:
>>>>>>>
>>>>>>> You may have noticed the following - did this indicate prolonged
>>>>>>> computation in your code ?


Re: 10hrs of Scheduler Delay

2016-01-25 Thread Sanders, Isaac B
Is the thread dump the stack trace you are talking about? If so, I will see if 
I can capture the few different stages I have seen it in.

Thanks for the help, I was able to do it for 0.1% of my data. I will create the 
JIRA.

Thanks,
Isaac

On Jan 25, 2016, at 8:51 AM, Ted Yu wrote:

Opening a JIRA is fine.

See if you can capture a stack trace during the hung stage and attach it to
the JIRA so that we have more clues.

Thanks

On Jan 25, 2016, at 4:25 AM, Darren Govoni wrote:

Probably we should open a ticket for this.
There's definitely a deadlock situation occurring in spark under certain 
conditions.

The only clue I have is that it always happens on the last stage. It also seems
sensitive to scale: if my job has 300 MB of data I see the deadlock, but if I
run only 10 MB of it, it succeeds. This suggests a serious, fundamental scaling
problem.

Workers have plenty of resources.





 Original message 
From: "Sanders, Isaac B"
Date: 01/24/2016 2:54 PM (GMT-05:00)
To: Renu Yadav
Cc: Darren Govoni, Muthu Jayakumar, Ted Yu, user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay

I am not getting anywhere with any of the suggestions so far. :(

Trying some more outlets, I will share any solution I find.

- Isaac

On Jan 23, 2016, at 1:48 AM, Renu Yadav wrote:

If you turn spark.speculation on, that might help. It worked for me.

On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni wrote:
Thanks for the tip. I will try it. But this is the kind of thing spark is 
supposed to figure out and handle. Or at least not get stuck forever.





 Original message 
From: Muthu Jayakumar
Date: 01/22/2016 3:50 PM (GMT-05:00)
To: Darren Govoni, "Sanders, Isaac B", Ted Yu
Cc: user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay

Does increasing the number of partitions help? You could try something like 3
times what you currently have.
Another trick I used was to partition the problem into multiple dataframes,
run them sequentially, persist each result, and then run a union on the
results.
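
As a rough illustration of that second trick, here is a plain-Python sketch (not Spark code) of splitting a job into slices, processing them one at a time, keeping each result, and combining the results at the end. The names `split_into_slices`, `run_sequentially`, and `square_all` are hypothetical; in Spark the slices would be DataFrames, the kept results would be persisted, and the last step would be a union.

```python
def split_into_slices(rows, n_slices):
    """Split rows into n_slices roughly equal, contiguous slices."""
    if n_slices < 1:
        raise ValueError("n_slices must be at least 1")
    size, extra = divmod(len(rows), n_slices)
    slices, start = [], 0
    for i in range(n_slices):
        # The first `extra` slices take one additional row each.
        end = start + size + (1 if i < extra else 0)
        slices.append(list(rows[start:end]))
        start = end
    return slices


def run_sequentially(rows, n_slices, process_slice):
    """Process each slice on its own, keep the result, then combine them."""
    kept = []
    for chunk in split_into_slices(rows, n_slices):
        kept.append(process_slice(chunk))  # stand-in for "run and persist"
    combined = []
    for partial in kept:                   # stand-in for "union"
        combined.extend(partial)
    return combined


def square_all(chunk):
    """Hypothetical per-slice computation."""
    return [x * x for x in chunk]


result = run_sequentially(list(range(10)), 3, square_all)
print(result)
```

Whether this helps with the hang is not established in this thread; it only limits how much data a single run has to handle at once.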

Hope this helps.

On Fri, Jan 22, 2016, 3:48 AM Darren Govoni wrote:
Me too. I had to shrink my dataset to get it to work. For us at least Spark 
seems to have scaling issues.





 Original message 
From: "Sanders, Isaac B"
Date: 01/21/2016 11:18 PM (GMT-05:00)
To: Ted Yu
Cc: user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay

I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly 
and didn't hang like this. This dataset is closer to k=10, n=4.4m, but I am 
using more resources on this one.

- Isaac

On Jan 21, 2016, at 11:06 PM, Ted Yu wrote:

You may have seen the following on github page:

Latest commit 50fdf0e  on Feb 22, 2015

That was 11 months ago.

Can you search for a similar algorithm that runs on Spark and is newer?

If nothing is found, consider running the tests that come with the project to
determine whether the delay is intrinsic.

Cheers

On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B wrote:
That thread seems to be moving, it oscillates between a few different traces... 
Maybe it is working. It seems odd that it would take that long.

This is 3rd party code, and after looking at some of it, I think it might not 
be as Spark-y as it could be.

I linked it below. I don't know a lot about spark, so it might be fine, but I 
have my suspicions.

https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala

- Isaac

On Jan 21, 2016, at 10:08 PM, Ted Yu wrote:

You may have noticed the following - did this indicate prolonged computation in 
your code ?


Running kafka consumer in local mode - error - connection timed out

2016-01-25 Thread Supreeth
We are running a Kafka Consumer in local mode using Spark Streaming,
KafkaUtils.createDirectStream. 

The job runs as expected; however, once in a very long while I see the
following exception.

I wanted to check whether others have faced a similar issue, and which
timeout parameters are the right ones to change to avoid it.

Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307, localhost):
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.Net.connect(Net.java:449)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at
kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
at
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


When this failure happens, the executor is not retried either.
Any help appreciated.

-S

ps: My last attempt to post did not succeed; apologies if this turns out to be
a re-post of my earlier post from about 40 minutes ago.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-kafka-consumer-in-local-mode-error-connection-timed-out-tp26063.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark integration with HCatalog (specifically regarding partitions)

2016-01-25 Thread Elliot West
Thanks for your response Jorge and apologies for my delay in replying. I
took your advice with case 5 and declared the column names explicitly
instead of the wildcard. This did the trick and I can now add partitions to
an existing table. I also tried removing the 'partitionBy("id")' call as
suggested, but this gave me a NoSuchMethodError. I suspect this would work
if I were running a newer version of the Hive metastore. Oddly I'm still
unable to create a new partitioned table, although I have since found a
somewhat confusing warning while running case 4:

Persisting partitioned data source relation `raboof` into Hive metastore in
Spark SQL specific format, which is NOT compatible with Hive. Input
path(s):


If you have any thoughts, please let me know.
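
One possible reading of why the wildcard failed in case 5, offered as a guess rather than a confirmed diagnosis: if the append matches columns by position rather than by name, `select *` from `new_record_source` (id, name) lines up against `foobar` (name, id) the wrong way round. The plain-Python sketch below mimics that positional matching; `insert_by_position` and `to_int_or_none` are hypothetical stand-ins, the latter for a cast that yields null when it fails.

```python
source_columns = ["id", "name"]  # new_record_source, in declaration order
target_columns = ["name", "id"]  # foobar: data column, then partition column
row = {"id": 3, "name": "zzz"}


def to_int_or_none(value):
    """Hypothetical stand-in for a cast that produces null when it fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def insert_by_position(selected_columns):
    """Pair the selected values with the target columns purely by position."""
    values = [row[c] for c in selected_columns]
    written = dict(zip(target_columns, values))
    written["name"] = str(written["name"])         # name is a string column
    written["id"] = to_int_or_none(written["id"])  # id is an int column
    return written


wildcard = insert_by_position(source_columns)  # what "select *" would yield
explicit = insert_by_position(["name", "id"])  # explicit column list
print(wildcard)
print(explicit)
```

Under this assumption the wildcard version ends up with a null partition value, which would be consistent with the `id=__HIVE_DEFAULT_PARTITION__` entry seen in the earlier output, while the explicit column list puts each value where it belongs.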

Thanks - Elliot.


On 13 January 2016 at 20:56, Jorge Machado  wrote:

> Hi Elliot,
>
> I'm no expert, but for case 5, could it be that you changed the column order
> in the second insert?
> And why do you call partitionBy again if the table was already created with
> partitions?
>
>
>
> insert into table foobar PARTITION (id)
>   > values ("xxx", 1), ("yyy", 2);
>
>
>
> hive (default)> insert into table new_record_source
>   > values (3, "zzz");
>
>
>
> Regards
>
>
> On 11/01/2016, at 13:36, Elliot West  wrote:
>
> Hello,
>
> I am in the process of evaluating Spark (1.5.2) for a wide range of use
> cases. In particular I'm keen to understand the depth of the integration
> with HCatalog (aka the Hive Metastore). I am very encouraged when browsing
> the source contained within the org.apache.spark.sql.hive package. My goals
> are to evaluate how effectively Spark handles the following scenarios:
>
>1. Reading from an unpartitioned HCatalog table.
>2. Reading from a partitioned HCatalog table with partition pruning
>from filter pushdown.
>3. Writing to a new unpartitioned HCatalog table.
>4. Writing to a new partitioned HCatalog table.
>5. Adding a partition to a partitioned HCatalog table.
>
> I found that the first three cases appear to function beautifully.
> However, I cannot seem to effectively create new HCatalog aware partitions
> either in a new table or on an existing table (cases 4 & 5). I suspect
> this may be due to my inexperience with Spark so wonder if you could advise
> me on what to try next. Here's what I have:
>
> *Case 4: Writing to a new partitioned HCatalog table*
>
> Create a source in Hive (could be plain data file also):
>
>
> hive (default)> create table foobar ( id int, name string );
> hive (default)> insert into table foobar values (1, "xxx"), (2, "zzz");
>
> Read the source with Spark, partition the data, and write to a new table:
>
> sqlContext.sql("select * from foobar").write.format("orc").partitionBy("id").saveAsTable("raboof")
>
>
> Check for the new table in Hive, it is partitioned correctly although the
> formats and schema are unexpected:
>
> hive (default)> show table extended like 'raboof';
> OK
> tab_name
> tableName: raboof
> location:hdfs://host:port/user/hive/warehouse/raboof
> inputformat:org.apache.hadoop.mapred.SequenceFileInputFormat
> outputformat:org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
> columns:struct columns { list col}
> partitioned:true
> partitionColumns:struct partition_columns { i32 id}
>
>
> Check for correctly partitioned data on HDFS, it appears to be there:
>
> [me@host]$ hdfs dfs -ls -R /user/hive/warehouse/raboof
> /user/hive/warehouse/raboof/_SUCCESS
> /user/hive/warehouse/raboof/id=1
> /user/hive/warehouse/raboof/id=1/part-r-0-.orc
> /user/hive/warehouse/raboof/id=2
> /user/hive/warehouse/raboof/id=2/part-r-0-.orc
>
>
> Something is wrong however, no data is returned from this query and the
> column names appear incorrect:
>
> hive (default)> select * from default.raboof;
> OK
> col id
>
> HCatalog reports no partitions for the table:
>
> hive (default)> show partitions default.raboof;
> OK
> partition
>
> *Case 5: Adding a partition to a partitioned HCatalog table*
>
> Created partitioned source table in Hive:
>
> hive (default)> create table foobar ( name string )
>   > partitioned by ( id int )
>   > stored as orc;
> hive (default)> insert into table foobar PARTITION (id)
>   > values ("xxx", 1), ("yyy", 2);
>
>
> Created a source for a new record to add to new_record_source:
>
> hive (default)> create table new_record_source ( id int, name string )
>   > stored as orc;
> hive (default)> insert into table new_record_source
>   > values (3, "zzz");
>
>
> Trying to add a partition with:
>
> sqlContext.sql("select * from new_record_source").write.mode("append").partitionBy("id").saveAsTable("foobar")
>
>
> This almost did what I wanted:
>
> hive (default)> show partitions default.foobar;
> partition
> id=1
> id=2
> id=__HIVE_DEFAULT_PARTITION__
>
> hive (default)> select * from default.foobar;
> name id
> xxx 1
> 

Re: Running kafka consumer in local mode - error - connection timed out

2016-01-25 Thread Cody Koeninger
It should be socket.timeout.ms in the map of Kafka config parameters.  The
lack of retry is probably due to the differences between running spark in
local mode vs standalone / mesos / yarn.
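
A minimal sketch of both points, written as plain Python so that it runs without Spark or Kafka installed. The broker address, the 60-second value, and the helper names `build_kafka_params` and `local_master` are assumptions for illustration. `metadata.broker.list` and `socket.timeout.ms` are Kafka 0.8 consumer settings, and the `local[N,F]` master URL form (N threads, F allowed task failures) is, to my knowledge, how local mode can be asked to retry failed tasks; please check both against the versions in use.

```python
def build_kafka_params(brokers, socket_timeout_ms):
    """Build the string-to-string map handed to createDirectStream."""
    if socket_timeout_ms <= 0:
        raise ValueError("socket_timeout_ms must be positive")
    return {
        "metadata.broker.list": brokers,
        # Kafka config values are passed as strings, not integers.
        "socket.timeout.ms": str(socket_timeout_ms),
    }


def local_master(threads, max_failures):
    """Master URL for local mode with task retries, e.g. local[4,3]."""
    return "local[{},{}]".format(threads, max_failures)


kafka_params = build_kafka_params("localhost:9092", 60000)  # assumed values
master = local_master(4, 3)                                 # assumed values
print(kafka_params, master)
```

A longer socket timeout only gives a slow broker more time to respond; if the broker is unreachable, the task will still fail, which is where the retry setting matters.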



On Mon, Jan 25, 2016 at 1:19 PM, Supreeth  wrote:

> We are running a Kafka Consumer in local mode using Spark Streaming,
> KafkaUtils.createDirectStream.
>
> The job runs as expected, however once in a very long time time, I see the
> following exception.
>
> Wanted to check if others have faced a similar issue, and what are the
> right
> timeout parameters to change to avoid this issue.
>
> Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
> most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307,
> localhost):
> java.net.ConnectException: Connection timed out
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.Net.connect(Net.java:449)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
> at
> kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
> at
>
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at
>
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
>
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
> at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> When this failure happens, the executor is not retried either.
> Any help appreciated.
>
> -S
>
> ps: My last attempt to post did not succeed, apologies if this happens to
> be
> a re-post of my earlier post a 40 mins ago
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-kafka-consumer-in-local-mode-error-connection-timed-out-tp26063.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: bug for large textfiles on windows

2016-01-25 Thread Josh Rosen
Hi Christopher,

What would be super helpful here is a standalone reproduction. Ideally this
would be a single Scala file or set of commands that I can run in
`spark-shell` in order to reproduce this. Ideally, this code would generate
a giant file, then try to read it in a way that demonstrates the bug. If
you have such a reproduction, could you attach it to that JIRA ticket?
Thanks!

On Mon, Jan 25, 2016 at 7:53 AM Christopher Bourez <
christopher.bou...@gmail.com> wrote:

> Dears,
>
> I would like to re-open a case for a potential bug (current status is
> resolved but it sounds not) :
>
> https://issues.apache.org/jira/browse/SPARK-12261
>
> I believe there is something wrong about the memory management under
> windows
>
> It has no sense to work with files smaller than a few Mo...
>
> Do not hesitate to ask me questions if you try to help and reproduce the
> bug,
>
> Best
>
> Christopher Bourez
> 06 17 17 50 60
>


Re: How to setup a long running spark streaming job with continuous window refresh

2016-01-25 Thread Tathagata Das
You can use a 1 minute tumbling window

dstream.window(Minutes(1), Minutes(1)).foreachRDD { rdd =>
   // calculate stats per key
}
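
To make the tumbling-window idea concrete, here is a plain-Python sketch (not Spark Streaming code) of what "stats per key" could compute for each one-minute window. Each event is assumed to be a `(timestamp_seconds, key, value)` tuple, and the variance is the population variance; both are assumptions, since the original question does not specify them.

```python
from collections import defaultdict
from statistics import mean, pvariance

WINDOW_SECONDS = 60  # one-minute tumbling window


def stats_per_window(events):
    """Group events into non-overlapping windows, then compute
    (mean, population variance) per key inside each window."""
    buckets = defaultdict(lambda: defaultdict(list))
    for ts, key, value in events:
        window_start = ts - (ts % WINDOW_SECONDS)
        buckets[window_start][key].append(value)
    return {
        window_start: {k: (mean(v), pvariance(v)) for k, v in per_key.items()}
        for window_start, per_key in buckets.items()
    }


events = [
    (0, "a", 8.0), (30, "a", 12.0),   # first minute
    (61, "a", 11.0), (90, "b", 5.0),  # second minute, independent of the first
]
result = stats_per_window(events)
print(result)
```

Because window length and slide interval are equal, no event belongs to two windows, so each minute starts from scratch as the question requires.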

On Thu, Jan 21, 2016 at 4:59 AM, Santoshakhilesh <
santosh.akhil...@huawei.com> wrote:

> Hi,
>
> I have following scenario in my project;
>
> 1.I will continue to get a stream of data from a source
>
> 2.I need to calculate mean and variance for a key every minute
>
> 3.After minute is over I should restart fresh computing the values
> for new minute
>
> Example:
>
> *10:00:00 computation and output*
>
> 10:00:00 key =1 , mean = 10 , variance =2
>
> 10:00:00 key =N , mean = 10 , variance =2
>
> *10:00:01 computation and output*
>
> 10:00:00 key =1 , mean = 11 , variance =2
>
> 10:00:00 key =N , mean = 12 , variance =2
>
> *10:00:01 data has no dependency with 10:00:00*
>
> How to setup such jobs in a single java spark streaming application.
>
> Regards,
>
> Santosh Akhilesh
>
>
>


Re: how to build spark with out hive

2016-01-25 Thread Ted Yu
Spark 1.5.2 depends on slf4j 1.7.10

Looks like there was another version of slf4j on the classpath.

FYI

On Mon, Jan 25, 2016 at 12:19 AM, kevin  wrote:

> Hi all,
> I need to test Hive on Spark, using Spark as Hive's execution
> engine.
> I downloaded the Spark 1.5.2 source from the Apache website.
> I have installed Maven 3.3.9 and Scala 2.10.6, so I changed
> make-distribution.sh
> to point to the location where I installed mvn.
>
> Then I ran the command:
> ./make-distribution.sh --name "hadoop2-without-hive" --tgz
> "-Pyarn,hadoop-2.7,hadoop-provided,parquet-provided" -DskipTests
> -Dhadoop.version=2.7.1
>
> Is this all right? When I start the Spark cluster, I get this error:
>
> Spark Command: /usr/lib/jdk/bin/java -cp
> /dcos/spark/sbin/../conf/:/dcos/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar:/dcos/hadoop/etc/hadoop/
> -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip
> 10.1.3.107 --port 7077 --webui-port 8080
> 
> Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/Logger
> at java.lang.Class.getDeclaredMethods0(Native Method)
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
> at java.lang.Class.getMethod0(Class.java:2856)
> at java.lang.Class.getMethod(Class.java:1668)
> at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
> at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
> Caused by: java.lang.ClassNotFoundException: org.slf4j.Logger
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 6 more
>
>
> I need some advice.
>
>


Re: NA value handling in sparkR

2016-01-25 Thread Deborah Siegel
Maybe not ideal, but since read.df infers every CSV column containing "NA"
as a string column, one could filter those rows out rather than using
dropna().

filtered_aq <- filter(aq, aq$Ozone != "NA" & aq$Solar_R != "NA")
head(filtered_aq)

Perhaps it would be better to have an option for read.df to convert any
"NA" it encounters into null, as createDataFrame does for R's NA values, and
then one would be able to use dropna() etc.
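
The distinction can be shown without Spark. In this plain-Python sketch, the inline CSV text and the helper names are made up for illustration; it shows that a literal "NA" read from a CSV is just a string, so a null-based drop leaves it in place until it is converted to a real missing value.

```python
import csv
import io

CSV_TEXT = "Ozone,Solar_R,Wind\n41,190,7.4\nNA,NA,14.3\n28,NA,14.9\n"


def read_rows(text):
    """Read CSV rows as dictionaries of strings, as a naive reader would."""
    return list(csv.DictReader(io.StringIO(text)))


def na_to_none(row):
    """Convert the literal string "NA" to None (a real missing value)."""
    return {k: (None if v == "NA" else v) for k, v in row.items()}


def drop_missing(rows):
    """Keep only rows without any None, analogous to dropping nulls."""
    return [r for r in rows if all(v is not None for v in r.values())]


raw = read_rows(CSV_TEXT)
kept_raw = drop_missing(raw)                                # "NA" strings survive
kept_converted = drop_missing([na_to_none(r) for r in raw]) # NA rows are dropped
print(len(kept_raw), len(kept_converted))
```

This only mirrors the suspected behaviour described above; it does not confirm how read.df or spark-csv treats "NA" internally.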



On Mon, Jan 25, 2016 at 3:24 AM, Devesh Raj Singh 
wrote:

> Hi,
>
> Yes you are right.
>
> I think the problem is with reading of csv files. read.df is not
> considering NAs in the CSV file
>
> So what would be a workable solution in dealing with NAs in csv files?
>
>
>
> On Mon, Jan 25, 2016 at 2:31 PM, Deborah Siegel 
> wrote:
>
>> Hi Devesh,
>>
>> I'm not certain why that's happening, and it looks like it doesn't happen
>> if you use createDataFrame directly:
>> aq <- createDataFrame(sqlContext,airquality)
>> head(dropna(aq,how="any"))
>>
>> If I had to guess: dropna(), I believe, drops null values. I suppose it's
>> possible that createDataFrame converts R's NA values to null, so dropna()
>> works with that. But perhaps read.df() does not convert NAs to null, as
>> those are most likely interpreted as strings when they come in from the
>> csv. Just a guess, can anyone confirm?
>>
>> Deb
>>
>>
>>
>>
>>
>>
>> On Sun, Jan 24, 2016 at 11:05 PM, Devesh Raj Singh <
>> raj.deves...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have applied the following code on airquality dataset available in R ,
>>> which has some missing values. I want to omit the rows which has NAs
>>>
>>> library(SparkR)
>>> Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')
>>>
>>> sc <- sparkR.init("local",sparkHome =
>>> "/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6")
>>>
>>> sqlContext <- sparkRSQL.init(sc)
>>>
>>> path<-"/Users/devesh/work/airquality/"
>>>
>>> aq <- read.df(sqlContext,path,source = "com.databricks.spark.csv",
>>> header="true", inferSchema="true")
>>>
>>> head(dropna(aq,how="any"))
>>>
>>> I am getting the output as
>>>
>>>   Ozone Solar_R Wind Temp Month Day
>>> 1    41     190  7.4   67     5   1
>>> 2    36     118  8.0   72     5   2
>>> 3    12     149 12.6   74     5   3
>>> 4    18     313 11.5   62     5   4
>>> 5    NA      NA 14.3   56     5   5
>>> 6    28      NA 14.9   66     5   6
>>>
>>> The NAs still exist in the output. Am I missing something here?
>>>
>>> --
>>> Warm regards,
>>> Devesh.
>>>
>>
>>
>
>
> --
> Warm regards,
> Devesh.
>


bug for large textfiles on windows

2016-01-25 Thread Christopher Bourez
Dears,

I would like to re-open a case for a potential bug (its current status is
Resolved, but it does not appear to be fixed):

https://issues.apache.org/jira/browse/SPARK-12261

I believe there is something wrong with the memory management under
Windows.

It makes no sense to be able to work only with files smaller than a few MB...

Do not hesitate to ask me questions if you try to help and reproduce the
bug.

Best

Christopher Bourez
06 17 17 50 60


Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Ashish Soni
Hi All,

What is the best way to tell a Spark Streaming job the number of partitions
of a given topic?

Should that be provided as a parameter or command-line argument,
or
should we connect to Kafka in the driver program and query it?

Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);

Thanks,
Ashish


a question about web ui log

2016-01-25 Thread Philip Lee
Hello, a question about the web UI log.

I could see the web interface log after forwarding the port on my cluster to
my local machine and clicking the completed application, but when I clicked
"application detail UI"

[image: Inline image 1]

This happened to me, and I do not know why. I also checked the specific log
folder, and it has a log file in it. That is why I could click the
completed application link, right?

So is it okay for me to copy the log file from my cluster to my local machine?
And after starting the Spark Job Manager on my local machine myself, could I
see the application details UI there?

Best,
Phil


Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Ashish Soni
Correct. What I am trying to achieve is to query the topic metadata from
Kafka before the streaming job starts, determine all the partitions, and
provide those to the direct API.

So my question is: should I pass all the partitions on the command line, or
query Kafka to find them and provide them? What is the correct approach?

Ashish

On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas  wrote:

> What are you trying to achieve?
>
> Looks like you want to provide offsets but you're not managing them
> and I'm assuming you're using the direct stream approach.
>
> In that case, use the simpler constructor that takes the kafka config and
> the topics. Let it figure it out the offsets (it will contact kafka and
> request the partitions for the topics provided)
>
> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>
>  -kr, Gerard
>
> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni 
> wrote:
>
>> Hi All ,
>>
>> What is the best way to tell spark streaming job for the no of partition
>> to to a given topic -
>>
>> Should that be provided as a parameter or command line argument
>> or
>> We should connect to kafka in the driver program and query it
>>
>> Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>
>> Thanks,
>> Ashish
>>
>
>


Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Gerard Maas
What are you trying to achieve?

Looks like you want to provide offsets but you're not managing them and I'm
assuming you're using the direct stream approach.

In that case, use the simpler constructor that takes the kafka config and
the topics. Let it figure out the offsets (it will contact Kafka and
request the partitions for the topics provided)

KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)

 -kr, Gerard

On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni  wrote:

> Hi All ,
>
> What is the best way to tell spark streaming job for the no of partition
> to to a given topic -
>
> Should that be provided as a parameter or command line argument
> or
> We should connect to kafka in the driver program and query it
>
> Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>
> Thanks,
> Ashish
>


Re: Determine Topic MetaData Spark Streaming Job

2016-01-25 Thread Gerard Maas
That's precisely what this constructor does:
KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)

Is there a reason to do that yourself?  In that case, look at how it's done
in Spark Streaming for inspiration:
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L204
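
If the offsets do have to be built by hand, the shape of the map is simple once the partition count is known. The plain-Python sketch below uses `(topic, partition)` tuples as a stand-in for `TopicAndPartition`; the helper name `build_from_offsets`, the topic name, and the partition count are placeholders, and obtaining the real count from Kafka is deliberately left out.

```python
def build_from_offsets(topic, num_partitions, start_offset=0):
    """Map every (topic, partition) pair to the offset to start reading from."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    return {(topic, p): start_offset for p in range(num_partitions)}


from_offsets = build_from_offsets("input-topic", 3)  # placeholder values
for (topic, partition), offset in sorted(from_offsets.items()):
    print(topic, partition, offset)
```

Hard-coding the count, as in the single `put` for partition 0 in the original snippet, silently ignores every other partition, which is one reason to let the simpler constructor discover them.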

-kr, Gerard.




On Mon, Jan 25, 2016 at 5:53 PM, Ashish Soni  wrote:

> Correct what i am trying to achieve is that before the streaming job
> starts query the topic meta data from kafka , determine all the partition
> and provide those to direct API.
>
> So my question is should i consider passing all the partition from command
> line and query kafka and find and provide , what is the correct approach.
>
> Ashish
>
> On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas 
> wrote:
>
>> What are you trying to achieve?
>>
>> Looks like you want to provide offsets but you're not managing them
>> and I'm assuming you're using the direct stream approach.
>>
>> In that case, use the simpler constructor that takes the kafka config and
>> the topics. Let it figure it out the offsets (it will contact kafka and
>> request the partitions for the topics provided)
>>
>> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>>
>>  -kr, Gerard
>>
>> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni 
>> wrote:
>>
>>> Hi All ,
>>>
>>> What is the best way to tell spark streaming job for the no of partition
>>> to to a given topic -
>>>
>>> Should that be provided as a parameter or command line argument
>>> or
>>> We should connect to kafka in the driver program and query it
>>>
>>> Map<TopicAndPartition, Long> fromOffsets = new
>>> HashMap<TopicAndPartition, Long>();
>>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>>
>>> Thanks,
>>> Ashish
>>>
>>
>>
>


Re: [Spark] Reading avro file in Spark 1.3.0

2016-01-25 Thread Kevin Mellott
I think that you may be looking at documentation pertaining to more
recent versions of Spark. Try looking at the examples linked below, which
apply to Spark 1.3. There aren't many Java examples, but the
code should be very similar to the Scala ones (i.e. using "load" instead of
"read" on the SQLContext).

https://github.com/databricks/spark-avro/tree/branch-1.0

On Mon, Jan 25, 2016 at 4:38 AM, diplomatic Guru 
wrote:

> Hello guys,
>
> I've been trying to read avro file using Spark's DataFrame but it's
> throwing this error:
> java.lang.NoSuchMethodError:
> org.apache.spark.sql.SQLContext.read()Lorg/apache/spark/sql/DataFrameReader;
>
> This is what I've done so far:
>
> I've added the dependency to pom.xml:
>
> <dependency>
>     <groupId>com.databricks</groupId>
>     <artifactId>spark-avro_2.10</artifactId>
>     <version>1.0.0</version>
> </dependency>
>
> Java code:
>
> JavaSparkContext sc = new JavaSparkContext(sparkConf);
> SQLContext sqlContext = new SQLContext(sc);
> DataFrame df =
> sqlContext.read().format("com.databricks.spark.avro").load(args[0]);
>
> Could you please let me know what am I doing wrong?
>
> Thanks.
>
>
>
>
>


Re: a question about web ui log

2016-01-25 Thread Philip Lee
As I mentioned before, I am trying to see the Spark log on a cluster via an
SSH tunnel.

1) The error on the application details UI is probably from monitoring port
4044. The web UI port is 8088, right? So how can I see the job web UI view and
the application details UI view in the web UI on my local machine?

2) I am still wondering how to view the log after copying the log file to my
local machine.

The error was mentioned in my previous mail.

Thanks,
Phil



On Mon, Jan 25, 2016 at 5:36 PM, Philip Lee  wrote:

> ​Hello, a questino about web UI log.
>
> ​I could see web interface log after forwarding the port on my cluster to
> my local and click completed application, but when I clicked "application
> detail UI"
>
> [image: Inline image 1]
>
> It happened to me. I do not know why. I also checked the specific log
> folder. It has a log file in it. Actually, that's why I could click the
> completed application link, right?
>
> So is it okay for me to copy the log file in my cluster to my local
> machine.
> And after turning on spark Job Manger on my local by myself, I could see
> application deatils UI in my local machine?
>
> Best,
> Phil
>


Re: Spark master takes more time with local[8] than local[1]

2016-01-25 Thread nsalian
Hi,

Thanks for the question.
Is it possible for you to elaborate on your application?
The flow of the application will help to understand what could potentially
cause things to slow down?

Do logs give you any idea what goes on? Have you had a chance to look?

Thank you.



-
Neelesh S. Salian
Cloudera
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-master-takes-more-time-with-local-8-than-local-1-tp26052p26061.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-01-25 Thread Nirav Patel
Hi,

Perhaps I should write a blog post about why Spark is focusing more on
making Spark jobs easier to write and on hiding the underlying performance
optimization details from seasoned Spark users. It is one thing to provide an
abstract framework that does the optimization for you, so that you don't have
to worry about it as a data scientist or data analyst. But what about
developers who do not want the overhead of SQL, optimizers, and unnecessary
abstractions? Application designers who know their data and queries should
be able to optimize at the level of RDD transformations and actions. Does Spark
provide a way to achieve the same level of optimization using either SQL
Catalyst or raw RDD transformations?

Thanks



Running kafka consumer in local mode - error - connection timed out

2016-01-25 Thread Supreeth
We are running a Kafka Consumer in local mode using Spark Streaming,
KafkaUtils.createDirectStream.

The job runs as expected; however, once in a very long while, I see the
following exception.

I wanted to check whether others have faced a similar issue, and which
timeout parameters should be changed to avoid it.

Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1
times, most recent failure: Lost task 5.0 in stage 30499.0 (TID
203307, localhost): java.net.ConnectException: Connection timed out
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.Net.connect(Net.java:449)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
    at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
    at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


When this failure happens, the executor is not retried either.

Any help appreciated.

-S


Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-01-25 Thread Mark Hamstra
What do you think is preventing you from optimizing your own RDD-level
transformations and actions?  AFAIK, nothing that has been added in
Catalyst precludes you from doing that.  The fact of the matter is, though,
that there is less type and semantic information available to Spark from
the raw RDD API than from using Spark SQL, DataFrames or DataSets.  That
means that Spark itself can't optimize for raw RDDs the same way that it
can for higher-level constructs that can leverage Catalyst; but if you want
to write your own optimizations based on your own knowledge of the data
types and semantics that are hiding in your raw RDDs, there's no reason
that you can't do that.
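
As an illustration of the kind of hand-written optimization meant here, the
following is a minimal sketch in plain Python (not the Spark API) of pushing
a filter below a join yourself, which is something Catalyst can do
automatically for DataFrames but which you have to do by hand for raw RDDs.
The sample data, the hash_join helper and the amount threshold are all made
up for the example.

```python
from collections import defaultdict

def hash_join(left, right):
    """Inner-join two lists of (key, value) pairs on the key."""
    index = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]

# Hypothetical data: (customer_id, amount) and (customer_id, name).
orders = [(1, 250), (2, 40), (1, 15), (3, 900)]
customers = [(1, "ann"), (2, "bob"), (3, "cy")]

# Naive plan: join everything, then filter on the order amount.
late = [row for row in hash_join(orders, customers) if row[1][0] >= 100]

# Hand-optimized plan: filter first, so fewer rows enter the join.
big_orders = [(key, amount) for key, amount in orders if amount >= 100]
early = hash_join(big_orders, customers)
```

Both plans return the same rows; the second one simply feeds a smaller input
into the expensive step, which is the sort of decision that knowledge of your
own data lets you make at the RDD level.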

On Mon, Jan 25, 2016 at 9:35 AM, Nirav Patel  wrote:

> Hi,
>
> Perhaps I should write a blog post about why Spark is focusing more on
> making Spark jobs easier to write and hiding the underlying performance
> optimization details from seasoned Spark users. It's one thing to provide
> an abstract framework that does the optimization for you, so that you don't
> have to worry about it as a data scientist or data analyst, but what about
> developers who do not want the overhead of SQL, optimizers, and unnecessary
> abstractions? An application designer who knows their data and queries
> should be able to optimize at the level of RDD transformations and actions.
> Does Spark provide a way to achieve the same level of optimization using
> either SQL Catalyst or raw RDD transformations?
>
> Thanks


Re: Sharing HiveContext in Spark JobServer / getOrCreate

2016-01-25 Thread Ted Yu
Have you noticed the following method of HiveContext?

  /**
   * Returns a new HiveContext as new session, which will have separated SQLConf, UDF/UDAF,
   * temporary tables and SessionState, but sharing the same CacheManager, IsolatedClientLoader
   * and Hive client (both of execution and metadata) with existing HiveContext.
   */
  override def newSession(): HiveContext = {

Cheers

On Mon, Jan 25, 2016 at 7:22 AM, Deenar Toraskar 
wrote:

> Hi
>
> I am using a shared sparkContext for all of my Spark jobs. Some of the
> jobs use HiveContext, but there isn't a getOrCreate method on HiveContext
> which will allow reuse of an existing HiveContext. Such a method exists on
> SQLContext only (def getOrCreate(sparkContext: SparkContext): SQLContext).
>
> Is there any reason that a HiveContext cannot be shared amongst multiple
> threads within the same Spark driver process?
>
> In addition I cannot seem to be able to cast a HiveContext to a
> SQLContext, but this works fine in the spark shell. Am I doing something
> wrong here?
>
> scala> sqlContext
>
> res19: org.apache.spark.sql.SQLContext =
> org.apache.spark.sql.hive.HiveContext@383b3357
>
> scala> import org.apache.spark.sql.SQLContext
>
> import org.apache.spark.sql.SQLContext
>
> scala> SQLContext.getOrCreate(sc)
>
> res18: org.apache.spark.sql.SQLContext =
> org.apache.spark.sql.hive.HiveContext@383b3357
>
>
>
> Regards
> Deenar
>


Re: bug for large textfiles on windows

2016-01-25 Thread Christopher Bourez
The same problem occurs on my desktop at work.
What's great with AWS Workspace is that you can easily reproduce it.

I created the test file with commands :

for i in {0..30}; do
  VALUE="$RANDOM"
  for j in {0..6}; do
    VALUE="$VALUE;$RANDOM"
  done
  echo "$VALUE" >> test.csv
done

Christopher Bourez
06 17 17 50 60

On Mon, Jan 25, 2016 at 10:01 PM, Christopher Bourez <
christopher.bou...@gmail.com> wrote:

> Josh,
>
> Thanks a lot !
>
> You can download a video I created :
> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/video.mov
>
> I created a sample file of 13 MB as explained :
> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv
>
> Here are the commands I did :
>
> I created an AWS Workspace with Windows 7 (which I can share with you if
> you'd like) with a Standard instance, 2 GiB RAM.
> On this instance:
> I downloaded Spark (1.5 or 1.6, same problem) with Hadoop 2.6
> installed the Java 8 JDK
> downloaded Python 2.7.8
>
> downloaded the sample file
> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv
>
> And then the command lines I launch are :
> bin\pyspark --master local[1]
> sc.textFile("test.csv").take(1)
>
> As you can see, sc.textFile("test.csv", 2000).take(1) works well
>
> Thanks a lot !
>
>
> Christopher Bourez
> 06 17 17 50 60
>
> On Mon, Jan 25, 2016 at 8:02 PM, Josh Rosen 
> wrote:
>
>> Hi Christopher,
>>
>> What would be super helpful here is a standalone reproduction. Ideally
>> this would be a single Scala file or set of commands that I can run in
>> `spark-shell` in order to reproduce this. Ideally, this code would generate
>> a giant file, then try to read it in a way that demonstrates the bug. If
>> you have such a reproduction, could you attach it to that JIRA ticket?
>> Thanks!
>>
>> On Mon, Jan 25, 2016 at 7:53 AM Christopher Bourez <
>> christopher.bou...@gmail.com> wrote:
>>
>>> Dears,
>>>
>>> I would like to re-open a case for a potential bug (its current status is
>>> resolved, but it does not appear to be):
>>>
>>> https://issues.apache.org/jira/browse/SPARK-12261
>>>
>>> I believe there is something wrong with the memory management under
>>> Windows.
>>>
>>> It makes no sense to be limited to files smaller than a few MB...
>>>
>>> Do not hesitate to ask me questions if you try to help and reproduce the
>>> bug,
>>>
>>> Best
>>>
>>> Christopher Bourez
>>> 06 17 17 50 60
>>>
>>
>


Re: Sharing HiveContext in Spark JobServer / getOrCreate

2016-01-25 Thread Deenar Toraskar
On 25 January 2016 at 21:09, Deenar Toraskar <
deenar.toras...@thinkreactive.co.uk> wrote:

> No, I hadn't. This is useful, but in some cases we do want to share the
> same temporary tables between jobs, so I really wanted a getOrCreate
> equivalent on HiveContext.
>
> Deenar
>
>
>
> On 25 January 2016 at 18:10, Ted Yu  wrote:
>
>> Have you noticed the following method of HiveContext ?
>>
>>   /**
>>    * Returns a new HiveContext as new session, which will have separated
>>    * SQLConf, UDF/UDAF, temporary tables and SessionState, but sharing the
>>    * same CacheManager, IsolatedClientLoader and Hive client (both of
>>    * execution and metadata) with existing HiveContext.
>>    */
>>   override def newSession(): HiveContext = {
>>
>> Cheers
>>
>> On Mon, Jan 25, 2016 at 7:22 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I am using a shared sparkContext for all of my Spark jobs. Some of the
>>> jobs use HiveContext, but there isn't a getOrCreate method on HiveContext
>>> which will allow reuse of an existing HiveContext. Such a method exists on
>>> SQLContext only (def getOrCreate(sparkContext: SparkContext):
>>> SQLContext).
>>>
>>> Is there any reason that a HiveContext cannot be shared amongst multiple
>>> threads within the same Spark driver process?
>>>
>>> In addition I cannot seem to be able to cast a HiveContext to a
>>> SQLContext, but this works fine in the spark shell. Am I doing something
>>> wrong here?
>>>
>>> scala> sqlContext
>>>
>>> res19: org.apache.spark.sql.SQLContext =
>>> org.apache.spark.sql.hive.HiveContext@383b3357
>>>
>>> scala> import org.apache.spark.sql.SQLContext
>>>
>>> import org.apache.spark.sql.SQLContext
>>>
>>> scala> SQLContext.getOrCreate(sc)
>>>
>>> res18: org.apache.spark.sql.SQLContext =
>>> org.apache.spark.sql.hive.HiveContext@383b3357
>>>
>>>
>>>
>>> Regards
>>> Deenar
>>>
>>
>>
>


Re: Running kafka consumer in local mode - error - connection timed out

2016-01-25 Thread Supreeth
Hmm, thanks for the response.

The current value I have for socket.timeout.ms is 12. I am not sure whether
this needs a higher value; the logs do not show much.

The retry aspect makes sense; I can work around that.

-S

On Mon, Jan 25, 2016 at 11:51 AM, Cody Koeninger  wrote:

> Should be socket.timeout.ms on the map of kafka config parameters.  The
> lack of retry is probably due to the differences between running spark in
> local mode vs standalone / mesos / yarn.
>
>
>
> On Mon, Jan 25, 2016 at 1:19 PM, Supreeth  wrote:
>
>> We are running a Kafka Consumer in local mode using Spark Streaming,
>> KafkaUtils.createDirectStream.
>>
>> The job runs as expected, however once in a very long time time, I see the
>> following exception.
>>
>> Wanted to check if others have faced a similar issue, and what are the
>> right
>> timeout parameters to change to avoid this issue.
>>
>> Job aborted due to stage failure: Task 5 in stage 30499.0 failed 1 times,
>> most recent failure: Lost task 5.0 in stage 30499.0 (TID 203307,
>> localhost):
>> java.net.ConnectException: Connection timed out
>> at sun.nio.ch.Net.connect0(Native Method)
>> at sun.nio.ch.Net.connect(Net.java:457)
>> at sun.nio.ch.Net.connect(Net.java:449)
>> at
>> sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
>> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>> at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
>> at
>>
>> kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
>> at
>>
>> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
>> at
>>
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
>> at
>>
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> at
>>
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> at
>>
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
>> at
>>
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> at
>>
>> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
>> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
>> at
>>
>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
>> at
>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> at
>>
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>> at
>>
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> When this failure happens, the executor is not retried either.
>> Any help appreciated.
>>
>> -S
>>
>> ps: My last attempt to post did not succeed, apologies if this happens to
>> be
>> a re-post of my earlier post a 40 mins ago
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Running-kafka-consumer-in-local-mode-error-connection-timed-out-tp26063.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
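
For reference, the suggestion in the quoted reply (setting socket.timeout.ms
in the map of Kafka config parameters) might look roughly like the sketch
below. The broker addresses and the 60-second value are placeholders, not
recommendations, and whether a larger timeout actually avoids the exception
above has not been verified.

```python
# Hypothetical broker list and timeout; adjust both for your cluster.
kafka_params = {
    "metadata.broker.list": "broker1:9092,broker2:9092",
    # Values in the Kafka config map are passed as strings.
    "socket.timeout.ms": str(60 * 1000),
}

# In PySpark 1.x this map would be handed to the direct stream, e.g.
#   KafkaUtils.createDirectStream(ssc, ["my_topic"], kafka_params)
# where ssc is an existing StreamingContext and "my_topic" is a placeholder.
```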


Re: Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-25 Thread Tathagata Das
First of all, if you are running batches of 15 minutes and you don't need
second-level latencies, it might be easier just to run batch jobs in a for
loop; you will have greater control over what is going on.
And if you are using reduceByKeyAndWindow without the
inverseReduceFunction, then after recovering the driver from checkpoint
files Spark has to read all 2 hours of data back in order to calculate
aggregations over the last two hours of data (the data read earlier is lost
when the driver dies).
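
To make the difference concrete, here is a minimal sketch in plain Python
(not the Spark API) of what an inverse reduce function buys you: the windowed
total is updated by adding the batch that enters the window and subtracting
the batch that leaves it, instead of re-reading every batch in the window.
The batch values are made up, and the window of 8 batches stands in for 2
hours of 15-minute batches.

```python
from collections import deque

def windowed_sums(batch_sums, window_len):
    """Yield the sum over the last `window_len` batches after each new batch."""
    window = deque()
    total = 0
    for batch_sum in batch_sums:
        window.append(batch_sum)
        total += batch_sum                 # reduce function: a + b
        if len(window) > window_len:
            total -= window.popleft()      # inverse reduce function: a - b
        yield total

# Hypothetical per-batch aggregates, one per 15-minute batch.
batch_sums = [5, 1, 4, 2, 8, 3, 7, 6, 9, 10]

incremental = list(windowed_sums(batch_sums, window_len=8))

# Recomputing from scratch touches every batch in the window each time.
recomputed = [sum(batch_sums[max(0, i - 7): i + 1]) for i in range(len(batch_sums))]
```

Both lists hold the same totals, but the incremental version only ever needs
the previous total plus the entering and leaving batches, which is why the
variant of reduceByKeyAndWindow that takes an inverse function does not need
to keep or re-read the raw input for the whole window.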

On Sat, Jan 23, 2016 at 1:47 PM, gaurav sharma 
wrote:

> Hi Tathagata/Cody,
>
> I am facing a challenge in Production with DAG behaviour during
> checkpointing in spark streaming -
>
> Step 1 : Read data from Kafka every 15 min -  call this KafkaStreamRDD ~
> 100 GB of data
>
> Step 2 : Repartition KafkaStreamRdd from 5 to 100 partitions to
> parallelise processing - call this RepartitionedKafkaStreamRdd
>
> Step 3 : on this RepartitionedKafkaStreamRdd I run map and
> reduceByKeyAndWindow over a window of 2 hours, call this RDD1 ~ 100 MB of
> data
>
> Checkpointing is enabled.
>
> If i restart my streaming context, it picks up from last checkpointed
> state,
>
> READS data for all the 8 SUCCESSFULLY FINISHED 15 minute batches from
> Kafka , re-performs Repartition of all the data of all these 8 , 15 minute
> batches.
>
> Then reads data for current 15 minute batch and runs map and
> reduceByKeyAndWindow over a window of 2 hours.
>
> Challenge -
> 1> I cant checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRdd as this
> is huge data around 800GB for 2 hours, reading and writing (checkpointing)
> this at every 15 minutes would be very slow.
>
> 2> When i have checkpointed data of RDD1 at every 15 minutes, and map and
> reduceByKeyAndWindow is being run over RDD1 only, and i have snapshot of
> all of the last 8, 15 minute batches of RDD1,
> why is spark reading all the data for last 8 successfully completed
> batches from Kafka again(Step 1) and again performing re-partitioning(Step
> 2) and then again running map and reduceByKeyandWindow over these newly
> fetched kafkaStreamRdd data of last 8 , 15 minute batches.
>
> Because of above mentioned challenges, i am not able to exploit
> checkpointing, in case streaming context is restarted at high load.
>
> Please help out in understanding, if there is something that i am missing
>
> Regards,
> Gaurav
>


Re: Trouble dropping columns from a DataFrame that has other columns with dots in their names

2016-01-25 Thread Michael Armbrust
Looks like you found a couple of bugs.  I've filed them here:

SPARK-12987 - Drop fails when columns contain dots

SPARK-12988 - Can't drop columns that contain dots


On Fri, Jan 22, 2016 at 3:18 PM, Joshua TAYLOR 
wrote:

> (Apologies if this comes through twice;  I sent it once before I'd
> confirmed by mailing list subscription.)
>
>
> I've been having lots of trouble with DataFrames whose columns have dots
> in their names today.  I know that in many places, backticks can be used to
> quote column names, but the problem I'm running into now is that I can't
> drop a column that has *no* dots in its name when there are *other* columns
> in the table that do.  Here's some code that tries four ways of dropping
> the column.  One throws a weird exception, one is a semi-expected no-op,
> and the other two work.
>
> public class SparkExample {
> public static void main(String[] args) {
> /* Get the spark and sql contexts.  Setting spark.ui.enabled to
> false
>  * keeps Spark from using its built in dependency on Jersey. */
> SparkConf conf = new SparkConf()
> .setMaster("local[*]")
> .setAppName("test")
> .set("spark.ui.enabled", "false");
> JavaSparkContext sparkContext = new JavaSparkContext(conf);
> SQLContext sqlContext = new SQLContext(sparkContext);
>
> /* Create a schema with two columns, one of which as no dots (a_b),
>  * and the other which does (a.b). */
> StructType schema = new StructType(new StructField[] {
> DataTypes.createStructField("a_b", DataTypes.StringType,
> false),
> DataTypes.createStructField("a.c", DataTypes.IntegerType,
> false)
> });
>
> /* Create an RDD of Rows, and then convert it into a DataFrame. */
> List rows = Arrays.asList(
> RowFactory.create("t", 2),
> RowFactory.create("u", 4));
> JavaRDD rdd = sparkContext.parallelize(rows);
> DataFrame df = sqlContext.createDataFrame(rdd, schema);
>
> /* Four ways to attempt dropping a_b from the DataFrame.
>  * We'll try calling each one of these and looking at
>  * the results (or the resulting exception). */
> Function x1 = d -> d.drop("a_b");  //
> exception
> Function x2 = d -> d.drop("`a_b`");//
> no-op
> Function x3 = d -> d.drop(d.col("a_b"));   //
> works
> Function x4 = d -> d.drop(d.col("`a_b`")); //
> works
>
> int i=0;
> for (Function x : Arrays.asList(x1, x2, x3,
> x4)) {
> System.out.println("Case "+i++);
> try {
> x.apply(df).show();
> } catch (Exception e) {
> e.printStackTrace(System.out);
> }
> }
> }
> }
>
> Here's the output.  Case 1 is a no-op, which I think I can understand,
> because DataFrame.drop(String) doesn't do any resolution (it doesn't need
> to), so d.drop("`a_b`") doesn't do anything because there's no column whose
> name is literally "`a_b`".  The third and fourth cases work, because
> DataFrame.col() does do resolution, and both "a_b" and "`a_b`" resolve
> correctly.  But why does the first case fail?  And why with the message
> that it does?  Why is it trying to resolve "a.c" at all in this case?
>
> Case 0
> org.apache.spark.sql.AnalysisException: cannot resolve 'a.c' given input
> columns a_b, a.c;
> at
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
> at
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.org
> $apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117)
> at
> org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> 

Datasets and columns

2016-01-25 Thread Steve Lewis
Assume I have the following code:

SparkConf sparkConf = new SparkConf();

JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlCtx = new SQLContext(sc); // createDataset is defined on SQLContext

JavaRDD<MyType> rddMyType = generateRDD(); // some code

Encoder<MyType> evidence = Encoders.kryo(MyType.class);
Dataset<MyType> datasetMyType = sqlCtx.createDataset(rddMyType.rdd(), evidence);

Now I have a Dataset of MyType; assume there is some data.

Assume MyType has bean fields with getters and setters as well as some
internal collections and other data. What can I say about
datasetMyType?

Does datasetMyType have columns and, if so, which ones?

If not, are there other ways to make a Dataset with columns and, if so,
what are they?


Re: streaming textFileStream problem - got only ONE line

2016-01-25 Thread Shixiong(Ryan) Zhu
Did you move the file into "hdfs://helmhdfs/user/patcharee/cerdata/", or
write into it directly? `textFileStream` requires that files must be
written to the monitored directory by "moving" them from another location
within the same file system.
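
A minimal sketch of that "write elsewhere, then move" pattern, using the
local file system and plain Python for illustration: the file is written in
full in a staging directory and only then renamed into the monitored one, so
a reader never sees a half-written file. The directory and file names are
hypothetical; on HDFS the equivalent step would be a move within the same
file system, for example with hdfs dfs -mv.

```python
import os
import tempfile

def publish(lines, staging_dir, monitored_dir, name):
    """Write the file completely in staging_dir, then rename it into monitored_dir."""
    staging_path = os.path.join(staging_dir, name)
    final_path = os.path.join(monitored_dir, name)
    with open(staging_path, "w") as f:
        f.writelines(line + "\n" for line in lines)
    # A rename within one file system is atomic on POSIX systems.
    os.rename(staging_path, final_path)
    return final_path

# Usage with throwaway local directories (hypothetical names).
root = tempfile.mkdtemp()
staging = os.path.join(root, "staging")
monitored = os.path.join(root, "cerdata")
os.mkdir(staging)
os.mkdir(monitored)
path = publish(["header", "row 1", "row 2"], staging, monitored, "datetime_0001.txt")
```

If the file is instead created directly in the monitored directory, the
stream may pick it up after only part of it (for example just the header
line) has been written.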

On Mon, Jan 25, 2016 at 6:30 AM, patcharee 
wrote:

> Hi,
>
> My streaming application is receiving data from file system and just
> prints the input count every 1 sec interval, as the code below:
>
> val sparkConf = new SparkConf()
> val ssc = new StreamingContext(sparkConf, Milliseconds(interval_ms))
> val lines = ssc.textFileStream(args(0))
> lines.count().print()
>
> The problem is that sometimes the data received from ssc.textFileStream is
> ONLY ONE line, although there are in fact multiple lines in the new file
> found in that interval. See the log below, which shows three intervals. In
> the 2nd interval, the new file is:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt. This file
> contains 6288 lines. The ssc.textFileStream returns ONLY ONE line (the
> header).
>
> Any ideas/suggestions what the problem is?
>
>
> -
> SPARK LOG
>
> -
>
> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731011000 ms: 145373101 ms
> 16/01/25 15:11:11 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731011000 ms:
> 16/01/25 15:11:12 INFO FileInputDStream: Finding new files took 4 ms
> 16/01/25 15:11:12 INFO FileInputDStream: New files at time 1453731072000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19616.txt
> ---
> Time: 1453731072000 ms
> ---
> 6288
>
> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731012000 ms: 1453731011000 ms
> 16/01/25 15:11:12 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731012000 ms:
> 16/01/25 15:11:13 INFO FileInputDStream: Finding new files took 4 ms
> 16/01/25 15:11:13 INFO FileInputDStream: New files at time 1453731073000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19617.txt
> ---
> Time: 1453731073000 ms
> ---
> 1
>
> 16/01/25 15:11:13 INFO FileInputDStream: Cleared 1 old files that were
> older than 1453731013000 ms: 1453731012000 ms
> 16/01/25 15:11:13 INFO FileInputDStream: Cleared 0 old files that were
> older than 1453731013000 ms:
> 16/01/25 15:11:14 INFO FileInputDStream: Finding new files took 3 ms
> 16/01/25 15:11:14 INFO FileInputDStream: New files at time 1453731074000
> ms:
> hdfs://helmhdfs/user/patcharee/cerdata/datetime_19618.txt
> ---
> Time: 1453731074000 ms
> ---
> 6288
>
>
> Thanks,
> Patcharee
>


Trouble dropping columns from a DataFrame that has other columns with dots in their names

2016-01-25 Thread Joshua TAYLOR
(Apologies if this has arrived more than once.  I've subscribed to the list,
and tried posting via email with no success.  This is an intentional
repost to see if things are going through yet.)

I've been having lots of trouble with DataFrames whose columns have dots in
their names today.  I know that in many places, backticks can be used to
quote column names, but the problem I'm running into now is that I can't
drop a column that has *no* dots in its name when there are *other* columns
in the table that do.  Here's some code that tries four ways of dropping the
column.  One throws a weird exception, one is a semi-expected no-op, and the
other two work.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkExample {
    public static void main(String[] args) {
        /* Get the Spark and SQL contexts.  Setting spark.ui.enabled to false
         * keeps Spark from using its built-in dependency on Jersey. */
        SparkConf conf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("test")
            .set("spark.ui.enabled", "false");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        /* Create a schema with two columns, one of which has no dots (a_b),
         * and the other which does (a.c). */
        StructType schema = new StructType(new StructField[] {
            DataTypes.createStructField("a_b", DataTypes.StringType, false),
            DataTypes.createStructField("a.c", DataTypes.IntegerType, false)
        });

        /* Create an RDD of Rows, and then convert it into a DataFrame. */
        List<Row> rows = Arrays.asList(
            RowFactory.create("t", 2),
            RowFactory.create("u", 4));
        JavaRDD<Row> rdd = sparkContext.parallelize(rows);
        DataFrame df = sqlContext.createDataFrame(rdd, schema);

        /* Four ways to attempt dropping a_b from the DataFrame.
         * We'll try calling each one of these and looking at
         * the results (or the resulting exception). */
        Function<DataFrame, DataFrame> x1 = d -> d.drop("a_b");            // exception
        Function<DataFrame, DataFrame> x2 = d -> d.drop("`a_b`");          // no-op
        Function<DataFrame, DataFrame> x3 = d -> d.drop(d.col("a_b"));     // works
        Function<DataFrame, DataFrame> x4 = d -> d.drop(d.col("`a_b`"));   // works

        int i = 0;
        for (Function<DataFrame, DataFrame> x : Arrays.asList(x1, x2, x3, x4)) {
            System.out.println("Case " + i++);
            try {
                x.apply(df).show();
            } catch (Exception e) {
                e.printStackTrace(System.out);
            }
        }
    }
}

Here's the output.  Case 1 is a no-op, which I think I can understand,
because DataFrame.drop(String) doesn't do any resolution (it doesn't need
to), so d.drop("`a_b`") doesn't do anything because there's no column whose
name is literally "`a_b`".  The third and fourth cases work, because
DataFrame.col() does do resolution, and both "a_b" and "`a_b`" resolve
correctly.  But why does the first case fail?  And why with the message that
it does?  Why is it trying to resolve "a.c" at all in this case?

Case 0
org.apache.spark.sql.AnalysisException: cannot resolve 'a.c' given input
columns a_b, a.c;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:121)
at

Can Spark read input data from HDFS centralized cache?

2016-01-25 Thread Jia Zou
I configured HDFS to cache files in the HDFS centralized cache, as follows:

hdfs cacheadmin -addPool hibench

hdfs cacheadmin -addDirective -path /HiBench/Kmeans/Input -pool hibench


But I didn't see much performance impact, no matter how I configured
dfs.datanode.max.locked.memory.


Is it possible that Spark doesn't know the data is in the HDFS cache, and still
reads the data from disk instead of from the HDFS cache?


Thanks!

Jia


Re: mapWithState and context start when checkpoint exists

2016-01-25 Thread Shixiong(Ryan) Zhu
Hey Andrey,

`ConstantInputDStream` doesn't support checkpointing because it contains an
RDD field, so it cannot resume from checkpoints.

On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov 
wrote:

> Hi,
>
> I am new to spark (and scala) and hope someone can help me with the issue
> I got stuck on in my experiments/learning.
>
> mapWithState from spark 1.6 seems to be a great way for the task I want to
> implement with spark but unfortunately I am getting error "RDD
> transformations and actions can only be invoked by the driver, not inside
> of other transformations" on job restart when checkpoint already exists.
> Job starts and works ok if checkpoint is empty (this kind of defeats the
> point of having the checkpoint).
>
> I can reproduce it with ~65 lines of test code, see below.
> Is there something that I am doing wrong there?
>
> code:
> 
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.dstream.ConstantInputDStream
> import org.apache.spark.streaming.{Durations, StreamingContext, _}
>
> object TestJob {
>   def stateFunc(id: String,
> txt: Option[Iterable[String]],
> state: State[String]) : (String, Long) = {
> if (txt.nonEmpty) {
>   val aggregatedString = state.getOption().getOrElse("") + txt
>   state.update(aggregatedString)
>   (id, aggregatedString.length)
> } else { // happens when state is timing out? any other cases?
>   (id, 0)
> }
>   }
>
>   def createContext(checkpointDirectory: String): StreamingContext = {
> val sparkConf = new
> SparkConf().setMaster("local[2]").setAppName("test")
>
> val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
> ssc.checkpoint(checkpointDirectory)
>
> val input = Seq("1", "21", "321", "41", "42", "543", "67")
> val inputRdd = ssc.sparkContext.parallelize(input)
> val testStream = new ConstantInputDStream(ssc, inputRdd)
>
> val streamWithIds = testStream.map(x => (x.substring(0,1), x))
> val batched = streamWithIds.groupByKey()
>
> val stateSpec = StateSpec.function(stateFunc
> _).numPartitions(3).timeout(Minutes(3 * 60 * 24)) // 3 days
>
> val result = batched.mapWithState(stateSpec)
> result.print
> ssc
>   }
>
>   def main(args: Array[String]): Unit = {
> val checkpointDirectory = com.google.common.io.Files.createTempDir()
> checkpointDirectory.deleteOnExit()
> val checkpointDirectoryName = checkpointDirectory.getAbsolutePath
>
> val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
>   () => {
> createContext(checkpointDirectoryName)
>   })
>
> ssc.start()
> ssc.awaitTerminationOrTimeout(7000)
> ssc.stop()
> Thread.sleep(5000)
>
> val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
>   () => {
> createContext(checkpointDirectoryName)
>   })
>
> // terminates here with
> // Exception in thread "main" org.apache.spark.SparkException: RDD
> transformations and actions can only be invoked by the driver, not inside
> of other transformations; for example, rdd1.map(x => rdd2.values.count() *
> x) is invalid because the values transformation and count action cannot be
> performed inside of the rdd1.map transformation. For more information, see
> SPARK-5063.
> ssc2.start()
> ssc2.awaitTerminationOrTimeout(7000)
> ssc2.stop()
>   }
> }
>
> --
> Andrey Yegorov
>


Re: Can Spark read input data from HDFS centralized cache?

2016-01-25 Thread Ted Yu
Have you read this thread ?

http://search-hadoop.com/m/uOzYttXZcg1M6oKf2/HDFS+cache=RE+hadoop+hdfs+cache+question+do+client+processes+share+cache+

Cheers

On Mon, Jan 25, 2016 at 1:23 PM, Jia Zou  wrote:

> I configured HDFS to cache file in HDFS's cache, like following:
>
> hdfs cacheadmin -addPool hibench
>
> hdfs cacheadmin -addDirective -path /HiBench/Kmeans/Input -pool hibench
>
>
> But I didn't see much performance impacts, no matter how I configure
> dfs.datanode.max.locked.memory
>
>
> Is it possible that Spark doesn't know the data is in HDFS cache, and
> still read data from disk, instead of from HDFS cache?
>
>
> Thanks!
>
> Jia
>


Re: bug for large textfiles on windows

2016-01-25 Thread Christopher Bourez
Josh,

Thanks a lot !

You can download a video I recorded:
https://s3-eu-west-1.amazonaws.com/christopherbourez/public/video.mov

I created a 13 MB sample file, available here:
https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv

Here are the steps I followed:

I created an AWS WorkSpace with Windows 7 (which I can share with you if
you'd like), Standard instance, 2 GiB RAM.
On this instance, I:
downloaded Spark (1.5 or 1.6, same problem) built for Hadoop 2.6
installed the Java 8 JDK
downloaded Python 2.7.8

downloaded the sample file
https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv

The commands I then run are:
bin\pyspark --master local[1]
sc.textFile("test.csv").take(1)

As you can see, sc.textFile("test.csv", 2000).take(1) works well
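
For a standalone reproduction of the kind Josh asked for, the sample file can also be generated locally instead of downloaded. The following is only a sketch: the function name `write_test_csv`, the 8 fields per line, and the 0..32767 value range are my assumptions, modeled on the shell loop quoted in this thread, and the 13 MB target is the size mentioned above.

```python
import os
import random


def write_test_csv(path, target_bytes, fields_per_line=8, seed=0):
    """Write lines of ';'-separated random integers until the file reaches target_bytes."""
    rng = random.Random(seed)  # fixed seed so the generated file is reproducible
    written = 0
    # newline="\n" stops Windows from rewriting "\n" as "\r\n", so the count stays exact
    with open(path, "w", newline="\n") as f:
        while written < target_bytes:
            fields = (str(rng.randint(0, 32767)) for _ in range(fields_per_line))
            line = ";".join(fields) + "\n"
            f.write(line)
            written += len(line)  # ASCII only, so characters == bytes
    return written


if __name__ == "__main__":
    size = write_test_csv("test.csv", 13 * 1024 * 1024)
    print("wrote", size, "bytes to test.csv")
```

Running this in the Spark directory and then launching the two pyspark commands above should exercise the same code path, assuming the problem depends on file size rather than on the exact contents.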

Thanks a lot !


Christopher Bourez
06 17 17 50 60

On Mon, Jan 25, 2016 at 8:02 PM, Josh Rosen 
wrote:

> Hi Christopher,
>
> What would be super helpful here is a standalone reproduction. Ideally
> this would be a single Scala file or set of commands that I can run in
> `spark-shell` in order to reproduce this. Ideally, this code would generate
> a giant file, then try to read it in a way that demonstrates the bug. If
> you have such a reproduction, could you attach it to that JIRA ticket?
> Thanks!
>
> On Mon, Jan 25, 2016 at 7:53 AM Christopher Bourez <
> christopher.bou...@gmail.com> wrote:
>
>> Dears,
>>
>> I would like to re-open a case for a potential bug (current status is
>> resolved but it sounds not) :
>>
>> *https://issues.apache.org/jira/browse/SPARK-12261
>> *
>>
>> I believe there is something wrong about the memory management under
>> windows
>>
>> It has no sense to work with files smaller than a few Mo...
>>
>> Do not hesitate to ask me questions if you try to help and reproduce the
>> bug,
>>
>> Best
>>
>> Christopher Bourez
>> 06 17 17 50 60
>>
>


Re: bug for large textfiles on windows

2016-01-25 Thread Christopher Bourez
Here is a picture of the memory.
If I pass --conf spark.driver.memory=3g, the displayed memory increases,
but the problem remains... for a file that is only 13 MB.

Christopher Bourez
06 17 17 50 60

On Mon, Jan 25, 2016 at 10:06 PM, Christopher Bourez <
christopher.bou...@gmail.com> wrote:

> The same problem occurs on my desktop at work.
> What's great with AWS Workspace is that you can easily reproduce it.
>
> I created the test file with commands :
>
> for i in {0..30}; do
>   VALUE="$RANDOM"
>   for j in {0..6}; do
>     VALUE="$VALUE;$RANDOM";
>   done
>   echo $VALUE >> test.csv
> done
>
> Christopher Bourez
> 06 17 17 50 60
>
> On Mon, Jan 25, 2016 at 10:01 PM, Christopher Bourez <
> christopher.bou...@gmail.com> wrote:
>
>> Josh,
>>
>> Thanks a lot !
>>
>> You can download a video I created :
>> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/video.mov
>>
>> I created a sample file of 13 MB as explained :
>> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv
>>
>> Here are the commands I did :
>>
>> I created an Aws Workspace with Windows 7 (that I can share you if you'd
>> like) with Standard instance, 2GiB RAM
>> On this instance :
>> I downloaded spark (1.5 or 1.6 same pb) with hadoop 2.6
>> installed java 8 jdk
>> downloaded python 2.7.8
>>
>> downloaded the sample file
>> https://s3-eu-west-1.amazonaws.com/christopherbourez/public/test.csv
>>
>> And then the command lines I launch are :
>> bin\pyspark --master local[1]
>> sc.textFile("test.csv").take(1)
>>
>> As you can see, sc.textFile("test.csv", 2000).take(1) works well
>>
>> Thanks a lot !
>>
>>
>> Christopher Bourez
>> 06 17 17 50 60
>>
>> On Mon, Jan 25, 2016 at 8:02 PM, Josh Rosen 
>> wrote:
>>
>>> Hi Christopher,
>>>
>>> What would be super helpful here is a standalone reproduction. Ideally
>>> this would be a single Scala file or set of commands that I can run in
>>> `spark-shell` in order to reproduce this. Ideally, this code would generate
>>> a giant file, then try to read it in a way that demonstrates the bug. If
>>> you have such a reproduction, could you attach it to that JIRA ticket?
>>> Thanks!
>>>
>>> On Mon, Jan 25, 2016 at 7:53 AM Christopher Bourez <
>>> christopher.bou...@gmail.com> wrote:
>>>
>>>> Dears,
>>>>
>>>> I would like to re-open a case for a potential bug (current status is
>>>> resolved but it sounds not) :
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-12261
>>>>
>>>> I believe there is something wrong about the memory management under
>>>> windows
>>>>
>>>> It has no sense to work with files smaller than a few Mo...
>>>>
>>>> Do not hesitate to ask me questions if you try to help and reproduce
>>>> the bug,
>>>>
>>>> Best
>>>>
>>>> Christopher Bourez
>>>> 06 17 17 50 60
>>>>
>>>
>>
>


Re: Trouble dropping columns from a DataFrame that has other columns with dots in their names

2016-01-25 Thread Joshua TAYLOR
Thanks Michael, hopefully those will get some attention for a not too
distant release.  Do you think that this is related to, or separate
from, a similar issue [1] that I filed a bit earlier, regarding the
way that StringIndexer (and perhaps other ML components) handles some
of these columns?  (I've dug through a bit of the source, but it's not
entirely clear to me (I'm not a Scala hacker) how transparently (or
non-transparently) column names are passed through to underlying
DataFrame methods.)

Joshua

[1]: https://issues.apache.org/jira/browse/SPARK-12965

On Mon, Jan 25, 2016 at 4:08 PM, Michael Armbrust
 wrote:
> Looks like you found a bug.  I've filed them here:
>
> SPARK-12987 - Drop fails when columns contain dots
> SPARK-12988 - Can't drop columns that contain dots
>
> On Fri, Jan 22, 2016 at 3:18 PM, Joshua TAYLOR 
> wrote:
>>
>> (Apologies if this comes through twice;  I sent it once before I'd
>> confirmed my mailing list subscription.)
>>
>>
>> I've been having lots of trouble with DataFrames whose columns have dots
>> in their names today.  I know that in many places, backticks can be used to
>> quote column names, but the problem I'm running into now is that I can't
>> drop a column that has *no* dots in its name when there are *other* columns
>> in the table that do.  Here's some code that tries four ways of dropping the
>> column.  One throws a weird exception, one is a semi-expected no-op, and the
>> other two work.
>>
>> public class SparkExample {
>>     public static void main(String[] args) {
>>         /* Get the spark and sql contexts.  Setting spark.ui.enabled to false
>>          * keeps Spark from using its built in dependency on Jersey. */
>>         SparkConf conf = new SparkConf()
>>             .setMaster("local[*]")
>>             .setAppName("test")
>>             .set("spark.ui.enabled", "false");
>>         JavaSparkContext sparkContext = new JavaSparkContext(conf);
>>         SQLContext sqlContext = new SQLContext(sparkContext);
>>
>>         /* Create a schema with two columns, one of which has no dots (a_b),
>>          * and the other which does (a.c). */
>>         StructType schema = new StructType(new StructField[] {
>>             DataTypes.createStructField("a_b", DataTypes.StringType, false),
>>             DataTypes.createStructField("a.c", DataTypes.IntegerType, false)
>>         });
>>
>>         /* Create an RDD of Rows, and then convert it into a DataFrame. */
>>         List<Row> rows = Arrays.asList(
>>             RowFactory.create("t", 2),
>>             RowFactory.create("u", 4));
>>         JavaRDD<Row> rdd = sparkContext.parallelize(rows);
>>         DataFrame df = sqlContext.createDataFrame(rdd, schema);
>>
>>         /* Four ways to attempt dropping a_b from the DataFrame.
>>          * We'll try calling each one of these and looking at
>>          * the results (or the resulting exception). */
>>         Function<DataFrame, DataFrame> x1 = d -> d.drop("a_b");            // exception
>>         Function<DataFrame, DataFrame> x2 = d -> d.drop("`a_b`");          // no-op
>>         Function<DataFrame, DataFrame> x3 = d -> d.drop(d.col("a_b"));     // works
>>         Function<DataFrame, DataFrame> x4 = d -> d.drop(d.col("`a_b`"));   // works
>>
>>         int i = 0;
>>         for (Function<DataFrame, DataFrame> x : Arrays.asList(x1, x2, x3, x4)) {
>>             System.out.println("Case " + i++);
>>             try {
>>                 x.apply(df).show();
>>             } catch (Exception e) {
>>                 e.printStackTrace(System.out);
>>             }
>>         }
>>     }
>> }
>>
>> Here's the output.  Case 1 is a no-op, which I think I can understand,
>> because DataFrame.drop(String) doesn't do any resolution (it doesn't need
>> to), so d.drop("`a_b`") doesn't do anything because there's no column whose
>> name is literally "`a_b`".  The third and fourth cases work, because
>> DataFrame.col() does do resolution, and both "a_b" and "`a_b`" resolve
>> correctly.  But why does the first case fail?  And why with the message that
>> it does?  Why is it trying to resolve "a.c" at all in this case?
>>
>> Case 0
>> org.apache.spark.sql.AnalysisException: cannot resolve 'a.c' given input columns a_b, a.c;
>> at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
>> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
>> at
>> 

Re: Datasets and columns

2016-01-25 Thread Michael Armbrust
The encoder is responsible for mapping your class onto some set of
columns.  Try running: datasetMyType.printSchema()

On Mon, Jan 25, 2016 at 1:16 PM, Steve Lewis  wrote:

> assume I have the following code
>
> SparkConf sparkConf = new SparkConf();
>
> JavaSparkContext sqlCtx= new JavaSparkContext(sparkConf);
>
> JavaRDD<MyType> rddMyType= generateRDD(); // some code
>
> Encoder<MyType> evidence = Encoders.kryo(MyType.class);
> Dataset<MyType> datasetMyType= sqlCtx.createDataset( rddMyType.rdd(), evidence);
>
> Now I have a Dataset of MyType and assume there is some data.
>
> Assume MyType has bean fields with getters and setters as well as some
> internal collections and other data. What can I say about datasetMyType?
>
> Does datasetMyType have columns and if so what?
>
> If not, are there other ways to make a Dataset with columns, and if so what are
> they?
>
>
>


mapWithState and context start when checkpoint exists

2016-01-25 Thread Andrey Yegorov
Hi,

I am new to spark (and scala) and hope someone can help me with the issue I
got stuck on in my experiments/learning.

mapWithState from spark 1.6 seems to be a great way for the task I want to
implement with spark but unfortunately I am getting error "RDD
transformations and actions can only be invoked by the driver, not inside
of other transformations" on job restart when checkpoint already exists.
Job starts and works ok if checkpoint is empty (this kind of defeats the
point of having the checkpoint).

I can reproduce it with ~65 lines of test code, see below.
Is there something that I am doing wrong there?

code:


import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext, _}

object TestJob {
  def stateFunc(id: String,
                txt: Option[Iterable[String]],
                state: State[String]): (String, Long) = {
    if (txt.nonEmpty) {
      val aggregatedString = state.getOption().getOrElse("") + txt
      state.update(aggregatedString)
      (id, aggregatedString.length)
    } else { // happens when state is timing out? any other cases?
      (id, 0)
    }
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")

    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.checkpoint(checkpointDirectory)

    val input = Seq("1", "21", "321", "41", "42", "543", "67")
    val inputRdd = ssc.sparkContext.parallelize(input)
    val testStream = new ConstantInputDStream(ssc, inputRdd)

    val streamWithIds = testStream.map(x => (x.substring(0,1), x))
    val batched = streamWithIds.groupByKey()

    val stateSpec = StateSpec.function(stateFunc _)
      .numPartitions(3).timeout(Minutes(3 * 60 * 24)) // 3 days

    val result = batched.mapWithState(stateSpec)
    result.print
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = com.google.common.io.Files.createTempDir()
    checkpointDirectory.deleteOnExit()
    val checkpointDirectoryName = checkpointDirectory.getAbsolutePath

    val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    ssc.start()
    ssc.awaitTerminationOrTimeout(7000)
    ssc.stop()
    Thread.sleep(5000)

    val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    // terminates here with
    // Exception in thread "main" org.apache.spark.SparkException: RDD
    // transformations and actions can only be invoked by the driver, not inside
    // of other transformations; for example, rdd1.map(x => rdd2.values.count() *
    // x) is invalid because the values transformation and count action cannot be
    // performed inside of the rdd1.map transformation. For more information, see
    // SPARK-5063.
    ssc2.start()
    ssc2.awaitTerminationOrTimeout(7000)
    ssc2.stop()
  }
}

--
Andrey Yegorov


Re: mapWithState and context start when checkpoint exists

2016-01-25 Thread Andrey Yegorov
Thank you!

What would be the best alternative for simulating a stream for testing
purposes, e.g. from a sequence or a text file?
In production I'll use Kafka as a source, but locally I wanted to mock it.
Worst case, I'll have to set up and tear down a Kafka cluster in tests, but I
think having a mock will be faster.


--
Andrey Yegorov

On Mon, Jan 25, 2016 at 1:26 PM, Shixiong(Ryan) Zhu  wrote:

> Hey Andrey,
>
> `ConstantInputDStream` doesn't support checkpoint as it contains an RDD
> field. It cannot resume from checkpoints.
>
> On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov 
> wrote:
>
>> Hi,
>>
>> I am new to spark (and scala) and hope someone can help me with the issue
>> I got stuck on in my experiments/learning.
>>
>> mapWithState from spark 1.6 seems to be a great way for the task I want
>> to implement with spark but unfortunately I am getting error "RDD
>> transformations and actions can only be invoked by the driver, not inside
>> of other transformations" on job restart when checkpoint already exists.
>> Job starts and works ok if checkpoint is empty (this kind of defeats the
>> point of having the checkpoint).
>>
>> I can reproduce it with ~65 lines of test code, see below.
>> Is there something that I am doing wrong there?
>>
>> code:
>> 
>>
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming.dstream.ConstantInputDStream
>> import org.apache.spark.streaming.{Durations, StreamingContext, _}
>>
>> object TestJob {
>>   def stateFunc(id: String,
>> txt: Option[Iterable[String]],
>> state: State[String]) : (String, Long) = {
>> if (txt.nonEmpty) {
>>   val aggregatedString = state.getOption().getOrElse("") + txt
>>   state.update(aggregatedString)
>>   (id, aggregatedString.length)
>> } else { // happens when state is timing out? any other cases?
>>   (id, 0)
>> }
>>   }
>>
>>   def createContext(checkpointDirectory: String): StreamingContext = {
>> val sparkConf = new
>> SparkConf().setMaster("local[2]").setAppName("test")
>>
>> val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
>> ssc.checkpoint(checkpointDirectory)
>>
>> val input = Seq("1", "21", "321", "41", "42", "543", "67")
>> val inputRdd = ssc.sparkContext.parallelize(input)
>> val testStream = new ConstantInputDStream(ssc, inputRdd)
>>
>> val streamWithIds = testStream.map(x => (x.substring(0,1), x))
>> val batched = streamWithIds.groupByKey()
>>
>> val stateSpec = StateSpec.function(stateFunc
>> _).numPartitions(3).timeout(Minutes(3 * 60 * 24)) // 3 days
>>
>> val result = batched.mapWithState(stateSpec)
>> result.print
>> ssc
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>> val checkpointDirectory = com.google.common.io.Files.createTempDir()
>> checkpointDirectory.deleteOnExit()
>> val checkpointDirectoryName = checkpointDirectory.getAbsolutePath
>>
>> val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
>>   () => {
>> createContext(checkpointDirectoryName)
>>   })
>>
>> ssc.start()
>> ssc.awaitTerminationOrTimeout(7000)
>> ssc.stop()
>> Thread.sleep(5000)
>>
>> val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
>>   () => {
>> createContext(checkpointDirectoryName)
>>   })
>>
>> // terminates here with
>> // Exception in thread "main" org.apache.spark.SparkException: RDD
>> transformations and actions can only be invoked by the driver, not inside
>> of other transformations; for example, rdd1.map(x => rdd2.values.count() *
>> x) is invalid because the values transformation and count action cannot be
>> performed inside of the rdd1.map transformation. For more information, see
>> SPARK-5063.
>> ssc2.start()
>> ssc2.awaitTerminationOrTimeout(7000)
>> ssc2.stop()
>>   }
>> }
>>
>> --
>> Andrey Yegorov
>>
>
>


Re: Datasets and columns

2016-01-25 Thread Michael Armbrust
There is no public API for custom encoders yet, but since your class looks
like a bean you should be able to use the `bean` method instead of `kryo`.
This will expose the actual columns.

On Mon, Jan 25, 2016 at 2:04 PM, Steve Lewis  wrote:

> Ok when I look at the schema it looks like KRYO makes one column is there
> a way to do a custom encoder with my own columns
> On Jan 25, 2016 1:30 PM, "Michael Armbrust" 
> wrote:
>
>> The encoder is responsible for mapping your class onto some set of
>> columns.  Try running: datasetMyType.printSchema()
>>
>> On Mon, Jan 25, 2016 at 1:16 PM, Steve Lewis 
>> wrote:
>>
>>> assume I have the following code
>>>
>>> SparkConf sparkConf = new SparkConf();
>>>
>>> JavaSparkContext sqlCtx= new JavaSparkContext(sparkConf);
>>>
>>> JavaRDD rddMyType= generateRDD(); // some code
>>>
>>> Encoder evidence = Encoders.kryo(MyType.class);
>>> Dataset datasetMyType= sqlCtx.createDataset( rddMyType.rdd(), 
>>> evidence);
>>>
>>> Now I have a Dataset of MyType and assume there is some data.
>>>
>>> Assume MyType has bean fields with getters and setters as well as some 
>>> internal collections and other data. What can I say about datasetMyType??
>>>
>>> Does datasetMyType have columns and if so what?
>>>
>>> If not are there other ways to maka a DataSet with columns and if so what 
>>> are they
>>>
>>>
>>>
>>


Re: cast column string -> timestamp in Parquet file

2016-01-25 Thread Cheng Lian

The following snippet may help:

  sqlContext.read.parquet(path).withColumn("col_ts", 
$"col".cast(TimestampType)).drop("col")


Cheng

On 1/21/16 6:58 AM, Muthu Jayakumar wrote:
DataFrame and udf. This may be more performant than doing an RDD 
transformation as you'll only transform just the column that requires 
to be changed.


Hope this helps.


On Thu, Jan 21, 2016 at 6:17 AM, Eli Super wrote:


Hi

I have a large Parquet file.

I need to cast the whole column to timestamp format, then save the result.

What is the right way to do it?






Re: MLlib OneVsRest causing intermittent exceptions

2016-01-25 Thread Ram Sriharsha
Hi David

What happens if you provide the class labels via metadata instead of
letting OneVsRest determine the labels?

Ram

On Mon, Jan 25, 2016 at 3:06 PM, David Brooks  wrote:

> Hi,
>
> I've run into an exception using MLlib OneVsRest with logistic regression
> (v1.6.0, but also in previous versions).
>
> The issue is intermittent.  When running multiclass classification with
> K-fold cross validation, there are scenarios where the split does not
> contain instances for every target label.  In such cases, an
> ArrayIndexOutOfBoundsException is generated.
>
> I've tried to reproduce the problem in a simple SBT project here:
>
>https://github.com/junglebarry/SparkOneVsRestTest
>
> I don't imagine this is typical - it first surfaced when running over a
> dataset with some very rare classes.
>
> I'm happy to look into patching the code, but I first wanted to confirm
> that the problem was real, and that I wasn't somehow misunderstanding how I
> should be using OneVsRest.
>
> Any guidance would be appreciated - I'm new to the list.
>
> Many thanks,
> David
>



-- 
Ram Sriharsha
Architect, Spark and Data Science
Hortonworks, 2550 Great America Way, 2nd Floor
Santa Clara, CA 95054
Ph: 408-510-8635
email: har...@apache.org




Standalone scheduler issue - one job occupies the whole cluster somehow

2016-01-25 Thread Mikhail Strebkov
Hi all,

Recently we started having issues with one of our background processing
scripts which we run on Spark. The cluster runs only two jobs. One job runs
for days, and the other usually takes a couple of hours. Both jobs run on a
cron schedule. The cluster is small: just 2 slaves, 24 cores, and 25.4 GB of
memory. Each job takes 6 cores in total and 6 GB of memory on each worker, so
when both jobs are running they use 12 of the 24 cores and 24 GB of the
25.4 GB. But sometimes I see this:

https://www.dropbox.com/s/6uad4hrchqpihp4/Screen%20Shot%202016-01-25%20at%201.16.19%20PM.png

So basically the long running job somehow occupied the whole cluster and
the fast one can't make any progress because the cluster doesn't have
resources. That's what I see in the logs:

16/01/25 21:26:48 WARN TaskSchedulerImpl: Initial job has not accepted any
> resources; check your cluster UI to ensure that workers are registered and
> have sufficient resources


When I log in to the slaves I see this:

slave 1:

> /usr/lib/jvm/java/bin/java -cp  -Xms6144M -Xmx6144M
> -Dspark.driver.port=42548 -Drun.mode=production -XX:MaxPermSize=256m
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.233.17.48:42548/user/CoarseGrainedScheduler
> --executor-id 450 --hostname 10.191.4.151 *--cores 1 --app-id
> app-20160124152439-1468* --worker-url akka.tcp://
> sparkWorker@10.191.4.151:53144/user/Worker
> /usr/lib/jvm/java/bin/java -cp -cp  -Xms6144M -Xmx6144M
> -Dspark.driver.port=42548 -Drun.mode=production -XX:MaxPermSize=256m
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.233.17.48:42548/user/CoarseGrainedScheduler
> --executor-id 451 --hostname 10.191.4.151 *--cores 1 --app-id
> app-20160124152439-1468* --worker-url akka.tcp://
> sparkWorker@10.191.4.151:53144/user/Worker


slave 2:

> /usr/lib/jvm/java/bin/java -cp  -Xms6144M -Xmx6144M
> -Dspark.driver.port=42548 -Drun.mode=production -XX:MaxPermSize=256m
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.233.17.48:42548/user/CoarseGrainedScheduler
> --executor-id 1 --hostname 10.253.142.59 *--cores 3 --app-id
> app-20160124152439-1468* --worker-url akka.tcp://
> sparkWorker@10.253.142.59:33265/user/Worker
> /usr/lib/jvm/java/bin/java -cp -cp  -Xms6144M -Xmx6144M
> -Dspark.driver.port=42548 -Drun.mode=production -XX:MaxPermSize=256m
> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
> akka.tcp://sparkDriver@10.233.17.48:42548/user/CoarseGrainedScheduler
> --executor-id 448 --hostname 10.253.142.59 *--cores 1 --app-id
> app-20160124152439-1468* --worker-url akka.tcp://
> sparkWorker@10.253.142.59:33265/user/Worker


So somehow Spark created 4 executors, 2 on each machine, with 1 core + 1 core
and 3 cores + 1 core, for a total of 6 cores. But because the 6 GB setting
is per executor, the job ends up occupying 24 GB instead of 12 GB (2 executors,
3 cores + 3 cores) and blocks the other Spark job.
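
To make the arithmetic explicit, here is a small Python sketch. The helper name `memory_in_use` is made up for illustration, and it assumes that every executor reserves the full 6 GB regardless of how many cores it was given, which is what the -Xmx6144M flags above suggest.

```python
def memory_in_use(executor_cores, memory_per_executor_gb):
    """Return (total cores, total memory in GB) for a list of per-executor core counts."""
    total_cores = sum(executor_cores)
    # Memory is reserved once per executor, no matter how many cores it holds.
    total_memory_gb = len(executor_cores) * memory_per_executor_gb
    return total_cores, total_memory_gb


expected = memory_in_use([3, 3], 6)        # one 3-core executor per slave
observed = memory_in_use([1, 1, 3, 1], 6)  # what the process lists show
print(expected)  # (6, 12)
print(observed)  # (6, 24)
```

Both layouts use the same 6 cores, but the fragmented one reserves twice the memory, which leaves only 1.4 GB of the 25.4 GB for the other job.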

My wild guess is that for some reason 1 executor of the long job failed, so
the job became 3 cores short and asked the scheduler for 3 more cores. The
scheduler then distributed them across the slaves, 2 cores + 1 core, but this
distribution can't take effect until the short job finishes (because the
short job holds the rest of the memory). This explains 3 + 1 on one slave
but doesn't explain 1 + 1 on the other.

Did anyone experience anything similar to this? Any ideas how to avoid it?

Thanks,
Mikhail


MLlib OneVsRest causing intermittent exceptions

2016-01-25 Thread David Brooks
Hi,

I've run into an exception using MLlib OneVsRest with logistic regression
(v1.6.0, but also in previous versions).

The issue is intermittent.  When running multiclass classification with
K-fold cross validation, there are scenarios where the split does not
contain instances for every target label.  In such cases, an
ArrayIndexOutOfBoundsException is generated.
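
The situation is easy to illustrate without Spark. The Python sketch below is only a model: the function names are invented, and it assigns items to folds round-robin so the result is deterministic, whereas Spark's cross validation samples the folds randomly (which is why the failure is intermittent).

```python
def k_fold_indices(n, k):
    """Assign item i to fold i % k and yield (train, validation) index lists."""
    for fold in range(k):
        validation = [i for i in range(n) if i % k == fold]
        train = [i for i in range(n) if i % k != fold]
        yield train, validation


def folds_missing_labels(labels, k):
    """Return {fold: labels absent from that fold's training split}."""
    all_labels = set(labels)
    missing = {}
    for fold, (train, _) in enumerate(k_fold_indices(len(labels), k)):
        absent = all_labels - {labels[i] for i in train}
        if absent:
            missing[fold] = sorted(absent)
    return missing


labels = [0, 1] * 10 + [2]  # class 2 occurs only once, at index 20
print(folds_missing_labels(labels, 3))  # {2: [2]}
```

Here the only instance of class 2 lands in the validation part of fold 2, so the model for that fold is trained on two classes while the full dataset has three.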

I've tried to reproduce the problem in a simple SBT project here:

   https://github.com/junglebarry/SparkOneVsRestTest

I don't imagine this is typical - it first surfaced when running over a
dataset with some very rare classes.

I'm happy to look into patching the code, but I first wanted to confirm
that the problem was real, and that I wasn't somehow misunderstanding how I
should be using OneVsRest.

Any guidance would be appreciated - I'm new to the list.

Many thanks,
David


Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-25 Thread Shixiong(Ryan) Zhu
You need to define a create function and use StreamingContext.getOrCreate.
See the example here:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing

On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin 
wrote:

> Hi all,
>
> To have a simple way of testing the Spark Streaming Write Ahead Log I
> created a very simple Custom Input Receiver, which will generate strings
> and store those:
>
> class InMemoryStringReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {
>
>   val batchID = System.currentTimeMillis()
>
>   def onStart() {
>     new Thread("InMemoryStringReceiver") {
>       override def run(): Unit = {
>         var i = 0
>         while(true) {
>           // http://spark.apache.org/docs/latest/streaming-custom-receivers.html
>           // To implement a reliable receiver, you have to use
>           // store(multiple-records) to store data.
>           store(ArrayBuffer(s"$batchID-$i"))
>           println(s"Stored => [$batchID-$i)]")
>           Thread.sleep(1000L)
>           i = i + 1
>         }
>       }
>     }.start()
>   }
>
>   def onStop() {}
> }
>
> I then created a simple Application which will use the Custom Receiver to
> stream the data and process it:
>
> object DStreamResilienceTest extends App {
>
>   val conf = new SparkConf()
>     .setMaster("local[*]")
>     .setAppName("DStreamResilienceTest")
>     .set("spark.streaming.receiver.writeAheadLog.enable", "true")
>   val ssc = new StreamingContext(conf, Seconds(1))
>   ssc.checkpoint("hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest")
>   val customReceiverStream: ReceiverInputDStream[String] =
>     ssc.receiverStream(new InMemoryStringReceiver())
>   customReceiverStream.foreachRDD { (rdd: RDD[String]) =>
>     println(s"processed => [${rdd.collect().toList}]")
>     Thread.sleep(2000L)
>   }
>   ssc.start()
>   ssc.awaitTermination()
>
> }
>
> As you can see the processing of each received RDD has sleep of 2 seconds
> while the Strings are stored every second. This creates a backlog and the
> new strings pile up, and should be stored in the WAL. Indeed, I can see the
> files in the checkpoint dirs getting updated. Running the app I get output
> like this:
>
> [info] Stored => [1453374654941-0)]
> [info] processed => [List(1453374654941-0)]
> [info] Stored => [1453374654941-1)]
> [info] Stored => [1453374654941-2)]
> [info] processed => [List(1453374654941-1)]
> [info] Stored => [1453374654941-3)]
> [info] Stored => [1453374654941-4)]
> [info] processed => [List(1453374654941-2)]
> [info] Stored => [1453374654941-5)]
> [info] Stored => [1453374654941-6)]
> [info] processed => [List(1453374654941-3)]
> [info] Stored => [1453374654941-7)]
> [info] Stored => [1453374654941-8)]
> [info] processed => [List(1453374654941-4)]
> [info] Stored => [1453374654941-9)]
> [info] Stored => [1453374654941-10)]
>
> As you would expect, the storing is out pacing the processing. So I kill
> the application and restart it. This time I commented out the sleep in the
> foreachRDD so that the processing can clear any backlog:
>
> [info] Stored => [1453374753946-0)]
> [info] processed => [List(1453374753946-0)]
> [info] Stored => [1453374753946-1)]
> [info] processed => [List(1453374753946-1)]
> [info] Stored => [1453374753946-2)]
> [info] processed => [List(1453374753946-2)]
> [info] Stored => [1453374753946-3)]
> [info] processed => [List(1453374753946-3)]
> [info] Stored => [1453374753946-4)]
> [info] processed => [List(1453374753946-4)]
>
> As you can see the new events are processed but none from the previous
> batch. The old WAL logs are cleared and I see log messages like this but
> the old data does not get processed.
>
> INFO WriteAheadLogManager : Recovered 1 write ahead log files from 
> hdfs://myhdfsserver/user/spark/checkpoint/DStreamResilienceTest/receivedData/0
>
> What am I doing wrong? I am using Spark 1.5.2.
>
> Best regards,
>
> Patrick
>


Re: Datasets and columns

2016-01-25 Thread Steve Lewis
OK, when I look at the schema it looks like Kryo produces a single column. Is
there a way to write a custom encoder with my own columns?
On Jan 25, 2016 1:30 PM, "Michael Armbrust"  wrote:

> The encoder is responsible for mapping your class onto some set of
> columns.  Try running: datasetMyType.printSchema()
>
> On Mon, Jan 25, 2016 at 1:16 PM, Steve Lewis 
> wrote:
>
>> assume I have the following code
>>
>> SparkConf sparkConf = new SparkConf();
>>
>> JavaSparkContext sc = new JavaSparkContext(sparkConf);
>> // createDataset is defined on SQLContext, not on JavaSparkContext
>> SQLContext sqlCtx = new SQLContext(sc);
>>
>> JavaRDD<MyType> rddMyType = generateRDD(); // some code
>>
>> Encoder<MyType> evidence = Encoders.kryo(MyType.class);
>> Dataset<MyType> datasetMyType = sqlCtx.createDataset(rddMyType.rdd(), evidence);
>>
>> Now I have a Dataset of MyType and assume there is some data.
>>
>> Assume MyType has bean fields with getters and setters as well as some 
>> internal collections and other data. What can I say about datasetMyType??
>>
>> Does datasetMyType have columns, and if so, which ones?
>>
>> If not, are there other ways to make a Dataset with columns, and if so, what
>> are they?
>>
>>
>>
>


understanding iterative algorithms in Spark

2016-01-25 Thread Raghava
Hello All,

I am new to Spark and I am trying to understand how the iterative application of
operations is handled in Spark. Consider the following Scala program.

var u = sc.textFile(args(0) + "s1.txt").map(line => {
  line.split("\\|") match { case Array(x, y) => (y.toInt, x.toInt) }
})
u.cache()
println("Before iteration u count: " + u.count())

val type1 = sc.textFile(args(0) + "Type1.txt").map(line => {
  line.split("\\|") match { case Array(x, y) => (x.toInt, y.toInt) }
})
type1.cache()
println("Type1 count: " + type1.count())

var counter = 0
while (counter < 20) {
  val r1Join = type1.join(u).map({ case (k, v) => v }).cache()
  u = u.union(r1Join).distinct.cache()
  // testing checkpoint
  if (counter == 4)
    u.checkpoint()
  println("u count: " + u.count())
  counter += 1
}

From the UI, I have attached the DAG visualizations at various iterations.

I have the following questions. It would be of great help if someone can
answer them.

1) When we cache an RDD, is it safe to say that it will not be recomputed?
For example in dag1.png, all the green map dots will not be recomputed.

2) In dag1.png, for stage4 join, we expected one input to be the output of
stage3 (this is as per our expectation) and the other input to be the output
of stage2. The latter does not happen. Why is this the case?

3) In dag1.png, why is stage5 not part of stage4? Why are distinct and
distinct + cache separated? Will distinct be run twice?

4) In dag4.png, we expected the input of the join in stage21 to come from the
output of stage19, but instead it gets recomputed at the beginning of
stage21. Why would distinct get recomputed at the beginning of each
iteration?

5) In dag2.png, the join operation is represented by 3 boxes. What does this
mean?

6) In dag4.png, there are several "skipped" stages. Is it safe to assume
that the skipped stages are not recomputed?
Thanks in advance.

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/understanding-iterative-algorithms-in-Spark-tp26064.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Generic Dataset Aggregator

2016-01-25 Thread Deenar Toraskar
Hi All

https://docs.cloud.databricks.com/docs/spark/1.6/index.html#examples/Dataset%20Aggregator.html

I have been converting my UDAFs to Dataset Aggregators (Datasets are cool, BTW).
I have an ArraySum aggregator that does an element-wise sum of
arrays. I have the simple version working, but the generic version
fails with the following error; I am not sure what I am doing wrong.

scala> import sqlContext.implicits._

scala> def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I, N]
= new GenericArraySumAggregator(f).toColumn

<console>:34: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing sqlContext.implicits._  Support for serializing
other types will be added in future releases.

 def arraySum[I, N : Numeric : Encoder](f: I => N): TypedColumn[I,
N] = new GenericArraySumAggregator(f).toColumn


  ^

object ArraySumAggregator extends Aggregator[Seq[Float], Seq[Float],
  Seq[Float]] with Serializable {
  // The initial value.
  def zero: Seq[Float] = Nil
  def reduce(currentSum: Seq[Float], currentRow: Seq[Float]) =
    sumArray(currentSum, currentRow)
  def merge(sum: Seq[Float], row: Seq[Float]) = sumArray(sum, row)
  def finish(b: Seq[Float]) = b // Return the final result.
  def sumArray(a: Seq[Float], b: Seq[Float]): Seq[Float] = {
    (a, b) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (a, b).zipped.map { case (a, b) => a + b }
    }
  }
}

class GenericArraySumAggregator[I, N : Numeric](f: I => N) extends
  Aggregator[Seq[I], Seq[N], Seq[N]] with Serializable {
  val numeric = implicitly[Numeric[N]]
  override def zero: Seq[N] = Nil
  override def reduce(b: Seq[N], a: Seq[I]): Seq[N] =
    sumArray(b, a.map(x => f(x))) //numeric.plus(b, f(a))
  override def merge(b1: Seq[N], b2: Seq[N]): Seq[N] = sumArray(b1, b2)
  override def finish(reduction: Seq[N]): Seq[N] = reduction
  def sumArray(a: Seq[N], b: Seq[N]): Seq[N] = {
    (a, b) match {
      case (Nil, Nil) => Nil
      case (Nil, row) => row
      case (sum, Nil) => sum
      case (sum, row) => (a, b).zipped.map { case (a, b) => numeric.plus(a, b) }
    }
  }
}
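
For readers less familiar with the Aggregator contract, here is a minimal plain-Python sketch (not Spark code) of the zero / reduce / merge / finish steps for an element-wise array sum. The function name, the `ZERO` constant, and the sample partitions are illustrative assumptions; an empty list plays the role of `Nil`.

```python
from functools import reduce

ZERO = []  # identity element, playing the role of `zero = Nil`


def sum_arrays(a, b):
    """Element-wise sum; an empty list acts as the identity."""
    if not a:
        return list(b)
    if not b:
        return list(a)
    # zip stops at the shorter input, like (a, b).zipped in Scala
    return [x + y for x, y in zip(a, b)]


# Two hypothetical partitions of rows.
partitions = [
    [[1.0, 2.0], [3.0, 4.0]],
    [[5.0, 6.0]],
]

# reduce: fold each partition's rows into a partial sum, starting from ZERO
partials = [reduce(sum_arrays, rows, ZERO) for rows in partitions]
# merge: combine the partial sums across partitions
total = reduce(sum_arrays, partials, ZERO)
# finish: the buffer is already the result, so it is returned unchanged
print(total)
```

Note that this sketch only illustrates the aggregation steps; it does not address the encoder error reported above.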

Regards

Deenar


RE: a question about web ui log

2016-01-25 Thread Mohammed Guller
I am not sure whether you can copy the log files from Spark workers to your 
local machine and view it from the Web UI. In fact, if you are able to copy the 
log files locally, you can just view them directly in any text editor.

I suspect what you really want to see is the application history. Here is the 
relevant information from Spark’s monitoring page 
(http://spark.apache.org/docs/latest/monitoring.html)

To view the web UI after the fact, set spark.eventLog.enabled to true before 
starting the application. This configures Spark to log Spark events that encode 
the information displayed in the UI to persisted storage.

Mohammed
Author: Big Data Analytics with 
Spark

From: Philip Lee [mailto:philjj...@gmail.com]
Sent: Monday, January 25, 2016 9:51 AM
To: user@spark.apache.org
Subject: Re: a question about web ui log

As I mentioned before, I am trying to see the Spark log on a cluster via an
SSH tunnel.

1) The error on the application details UI is probably from monitoring port
4044. The web UI port is 8088, right? So how could I see the job web UI view and
the application details UI view in the web UI on my local machine?

2) I am still wondering how to see the log after copying the log file to my local machine.

The error was mentioned in my previous mail.

Thanks,
Phil



On Mon, Jan 25, 2016 at 5:36 PM, Philip Lee wrote:
Hello, a question about the web UI log.

I could see the web interface log after forwarding the port on my cluster to my
local machine and clicking a completed application, but when I clicked "application
detail UI", the following happened:

[Inline image 1]

I do not know why this happened. I also checked the specific log folder.
It has a log file in it. Actually, that's why I could click the completed
application link, right?

So is it okay for me to copy the log file from my cluster to my local machine?
And after starting the Spark job manager on my local machine myself, could I see the
application details UI on my local machine?

Best,
Phil



Re: Spark Streaming - Custom ReceiverInputDStream ( Custom Source) In java

2016-01-25 Thread Tathagata Das
See how other Java wrapper classes use JavaSparkContext.fakeClassTag.

Example:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaMapWithStateDStream.scala

On Fri, Jan 22, 2016 at 2:00 AM, Nagu Kothapalli 
wrote:

> Hi
>
> Anyone have any idea on *ClassTag in spark context..*
>
> On Fri, Jan 22, 2016 at 12:42 PM, Nagu Kothapalli <
> nagukothapal...@gmail.com> wrote:
>
>> Hi All
>>
>> I am facing an issue with a CustomInputDStream object in Java:
>>
>> public CustomInputDStream(StreamingContext ssc_, ClassTag classTag)
>> {
>>     super(ssc_, classTag);
>> }
>>
>> Can you help me create an instance of the above class with a ClassTag in
>> Java?
>>
>
>


Re: Concurrent Spark jobs

2016-01-25 Thread emlyn
Jean wrote
> Have you considered using pools?
> http://spark.apache.org/docs/latest/job-scheduling.html#fair-scheduler-pools
> 
> I haven't tried that myself, but it looks like the pool setting is applied
> per thread, so it should be possible to configure the fair scheduler so
> that more than one job runs at a time. Each of them would probably
> use fewer workers, though...

Thanks for the tip, but I don't think that would work in this case - while
writing to Redshift, the cluster is sitting idle without the new tasks even
appearing on the pending queue yet, so changing how it executes the jobs on
the queue won't help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Concurrent-Spark-jobs-tp26011p26062.html



Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-01-25 Thread Nirav Patel
I haven't gone through many details of the Spark Catalyst optimizer and the
Tungsten project, but we have been advised by Databricks support to use
DataFrames to resolve the OOM errors that we are getting during join
and groupBy operations. We use Spark 1.3.1, and it looks like it cannot
perform an external sort and blows up with OOM.
https://forums.databricks.com/questions/2082/i-got-the-akka-frame-size-exceeded-exception.html

Now it's great that this has been addressed in the Spark 1.5 release, but why
is Databricks advocating a switch to DataFrames? It may make sense for batch
jobs or near-real-time jobs, but I am not sure it does when you are developing
real-time analytics where you want to optimize every millisecond that you
can. Again, I am still familiarizing myself with the DataFrame APIs and
optimizations, and I will benchmark them against RDDs for our batch and
real-time use cases as well.

On Mon, Jan 25, 2016 at 9:47 AM, Mark Hamstra 
wrote:

> What do you think is preventing you from optimizing your own RDD-level
> transformations and actions?  AFAIK, nothing that has been added in
> Catalyst precludes you from doing that.  The fact of the matter is, though,
> that there is less type and semantic information available to Spark from
> the raw RDD API than from using Spark SQL, DataFrames or DataSets.  That
> means that Spark itself can't optimize for raw RDDs the same way that it
> can for higher-level constructs that can leverage Catalyst; but if you want
> to write your own optimizations based on your own knowledge of the data
> types and semantics that are hiding in your raw RDDs, there's no reason
> that you can't do that.
>
> On Mon, Jan 25, 2016 at 9:35 AM, Nirav Patel 
> wrote:
>
>> Hi,
>>
>> Perhaps I should write a blog post about why Spark is focusing more
>> on making Spark jobs easier to write and hiding the underlying performance
>> optimization details from seasoned Spark users. It's one thing to provide
>> an abstract framework that does optimization for you so you don't have to
>> worry about it as a data scientist or data analyst, but what about
>> developers who do not want the overhead of SQL, optimizers, and unnecessary
>> abstractions? Application designers who know their data and queries should
>> be able to optimize at the level of RDD transformations and actions. Does Spark
>> provide a way to achieve the same level of optimization by using either SQL
>> Catalyst or raw RDD transformations?
>>
>> Thanks
>>
>
>
>



Re: Spark RDD DAG behaviour understanding in case of checkpointing

2016-01-25 Thread Cody Koeninger
Where are you calling checkpoint? Metadata checkpointing for a Kafka
direct stream should just be the offsets, not the data.

TD can better speak to reduceByKeyAndWindow behavior when restoring from a
checkpoint, but ultimately the only available choices would be replay the
prior window data from kafka; replay the prior window data from checkpoint
/ other storage (not much reason for this, since it's stored in kafka); or
lose the prior window data.



On Sat, Jan 23, 2016 at 3:47 PM, gaurav sharma 
wrote:

> Hi Tathagata/Cody,
>
> I am facing a challenge in Production with DAG behaviour during
> checkpointing in spark streaming -
>
> Step 1 : Read data from Kafka every 15 min -  call this KafkaStreamRDD ~
> 100 GB of data
>
> Step 2 : Repartition KafkaStreamRdd from 5 to 100 partitions to
> parallelise processing - call this RepartitionedKafkaStreamRdd
>
> Step 3 : on this RepartitionedKafkaStreamRdd I run map and
> reduceByKeyAndWindow over a window of 2 hours, call this RDD1 ~ 100 MB of
> data
>
> Checkpointing is enabled.
>
> If i restart my streaming context, it picks up from last checkpointed
> state,
>
> READS data for all the 8 SUCCESSFULLY FINISHED 15 minute batches from
> Kafka , re-performs Repartition of all the data of all these 8 , 15 minute
> batches.
>
> Then reads data for current 15 minute batch and runs map and
> reduceByKeyAndWindow over a window of 2 hours.
>
> Challenge -
> 1> I cant checkpoint KafkaStreamRDD or RepartitionedKafkaStreamRdd as this
> is huge data around 800GB for 2 hours, reading and writing (checkpointing)
> this at every 15 minutes would be very slow.
>
> 2> When i have checkpointed data of RDD1 at every 15 minutes, and map and
> reduceByKeyAndWindow is being run over RDD1 only, and i have snapshot of
> all of the last 8, 15 minute batches of RDD1,
> why is spark reading all the data for last 8 successfully completed
> batches from Kafka again(Step 1) and again performing re-partitioning(Step
> 2) and then again running map and reduceByKeyandWindow over these newly
> fetched kafkaStreamRdd data of last 8 , 15 minute batches.
>
> Because of above mentioned challenges, i am not able to exploit
> checkpointing, in case streaming context is restarted at high load.
>
> Please help out in understanding, if there is something that i am missing
>
> Regards,
> Gaurav
>


Re: Can Spark read input data from HDFS centralized cache?

2016-01-25 Thread Ted Yu
Please see also:
http://hadoop.apache.org/docs/r2.7.1/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html

According to Chris Nauroth, an hdfs committer, it's extremely difficult to
use the feature correctly.

The feature also brings operational complexity. Since off-heap memory is
used, you can accidentally use too much RAM on the host, resulting in OOM
in the JVM which is hard to debug.

Cheers

On Mon, Jan 25, 2016 at 1:39 PM, Ted Yu  wrote:

> Have you read this thread ?
>
>
> http://search-hadoop.com/m/uOzYttXZcg1M6oKf2/HDFS+cache=RE+hadoop+hdfs+cache+question+do+client+processes+share+cache+
>
> Cheers
>
> On Mon, Jan 25, 2016 at 1:23 PM, Jia Zou  wrote:
>
>> I configured HDFS to cache file in HDFS's cache, like following:
>>
>> hdfs cacheadmin -addPool hibench
>>
>> hdfs cacheadmin -addDirective -path /HiBench/Kmeans/Input -pool hibench
>>
>>
>> But I didn't see much performance impacts, no matter how I configure
>> dfs.datanode.max.locked.memory
>>
>>
>> Is it possible that Spark doesn't know the data is in HDFS cache, and
>> still read data from disk, instead of from HDFS cache?
>>
>>
>> Thanks!
>>
>> Jia
>>
>
>


how to build spark with out hive

2016-01-25 Thread kevin
Hi all,
I need to test Hive on Spark, using Spark as Hive's execution engine.
I downloaded the Spark 1.5.2 source from the Apache website.
I have installed Maven 3.3.9 and Scala 2.10.6, so I changed
the make-distribution.sh script
to point to the location where I installed mvn.

Then I ran the command:
./make-distribution.sh --name "hadoop2-without-hive" --tgz
"-Pyarn,hadoop-2.7,hadoop-provided,parquet-provided" -DskipTests
-Dhadoop.version=2.7.1

Is this all right? When I start the Spark cluster, I get this error:

Spark Command: /usr/lib/jdk/bin/java -cp
/dcos/spark/sbin/../conf/:/dcos/spark/lib/spark-assembly-1.5.2-hadoop2.7.1.jar:/dcos/hadoop/etc/hadoop/
-Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip
10.1.3.107 --port 7077 --webui-port 8080

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/Logger
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2615)
at java.lang.Class.getMethod0(Class.java:2856)
at java.lang.Class.getMethod(Class.java:1668)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: org.slf4j.Logger
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more


I need some advice.


SparkSQL : "select non null values from column"

2016-01-25 Thread Eli Super
Hi

I am trying to select all non-NULL values from a column that contains NULL
values

with

sqlContext.sql("select my_column from my_table where my_column <> null
").show(15)

or

sqlContext.sql("select my_column from my_table where my_column != null
").show(15)

but I get an empty result.


Thanks !


RangePartitioning skewed data

2016-01-25 Thread jluan
Let's say I have a dataset of (K,V) where the keys are really skewed:

myDataRDD = 
[(8, 1), (8, 13), (1,1), (2,4)]
[(8, 12), (8, 15), (8, 7), (8, 6), (8, 4), (8, 3), (8, 4), (10,2)]

If I applied a RangePartitioner to this set of data, say val rangePart = new
RangePartitioner(4, myDataRDD) and then repartitioned the data, would I be
able to get back 4 equally distributed partitions where Key=8 would be split
across multiple partitions, or would all the 8 keys end up in one partition?

Also, does myDataRDD need to be sorted in order to correctly create the
range partitioner? My research shows this may be the case.
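
For reference, the general behavior of range partitioning can be sketched in plain Python (this is a toy model, not Spark's `RangePartitioner` implementation; `range_bounds` and `partition_for` are hypothetical helpers). A partitioner maps each key to exactly one partition, so all records with key 8 land together and the skew remains. The bounds here are computed from a sorted copy of the keys, so the input itself does not need to be sorted.

```python
from bisect import bisect_left
from collections import Counter


def range_bounds(keys, num_partitions):
    """Pick up to num_partitions - 1 distinct upper bounds from the keys."""
    ordered = sorted(keys)
    step = len(ordered) / num_partitions
    bounds = []
    for i in range(1, num_partitions):
        candidate = ordered[min(int(i * step), len(ordered) - 1)]
        # Duplicate bounds are dropped, so skewed keys yield fewer partitions.
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def partition_for(key, bounds):
    """A key goes to the first range whose upper bound is >= the key."""
    return bisect_left(bounds, key)


data = [(8, 1), (8, 13), (1, 1), (2, 4),
        (8, 12), (8, 15), (8, 7), (8, 6), (8, 4), (8, 3), (8, 4), (10, 2)]

bounds = range_bounds([k for k, _ in data], 4)
sizes = Counter(partition_for(k, bounds) for k, _ in data)
print(bounds, dict(sizes))
```

In this toy model the four requested ranges collapse into two, because the same key would otherwise be chosen as a bound several times. Splitting a hot key across partitions usually requires changing the key itself, for example by appending a random suffix ("salting").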





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RangePartitioning-skewed-data-tp26055.html



How to discretize Continuous Variable with Spark DataFrames

2016-01-25 Thread Eli Super
Hi

What is the best way to discretize a continuous variable within Spark
DataFrames?

I want to discretize some variables 1) by equal frequency and 2) by k-means.

I usually use R for this purpose:

http://www.inside-r.org/packages/cran/arules/docs/discretize

R code for example :

### equal frequency
table(discretize(data$some_column, "frequency", categories=10))


#k-means
table(discretize(data$some_column, "cluster", categories=10))
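
To make the equal-frequency case concrete, here is a minimal plain-Python sketch of the idea (not Spark or R code; `equal_frequency_edges` and `assign_bin` are hypothetical helper names). Cut points are taken at evenly spaced ranks of the sorted values, so each bin receives roughly the same number of observations.

```python
from bisect import bisect_right
from collections import Counter


def equal_frequency_edges(values, bins):
    """Return bins - 1 cut points taken at evenly spaced ranks."""
    ordered = sorted(values)
    n = len(ordered)
    return [ordered[(i * n) // bins] for i in range(1, bins)]


def assign_bin(value, edges):
    """Bin index = number of cut points that are <= value."""
    return bisect_right(edges, value)


values = list(range(1, 101))             # sample data: 1..100
edges = equal_frequency_edges(values, 4)
histogram = Counter(assign_bin(v, edges) for v in values)
print(edges, dict(histogram))
```

On the Spark side, I believe the `spark.ml` feature package in recent releases includes a `QuantileDiscretizer` that follows the same idea, and the k-means variant could be built by clustering the single column with MLlib's k-means; both are worth checking against the documentation for your Spark version.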

Thanks a lot !


Re: SparkSQL : "select non null values from column"

2016-01-25 Thread Deng Ching-Mallete
Hi,

Have you tried using IS NOT NULL for the where condition?
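
The reason behind this suggestion is standard SQL three-valued logic: any comparison with NULL evaluates to NULL (unknown), so a `<> null` filter never passes a row. The sketch below demonstrates this with Python's built-in `sqlite3` module rather than Spark, purely as an illustration; the table contents are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE my_table (my_column TEXT)")
conn.executemany("INSERT INTO my_table VALUES (?)", [("a",), (None,), ("b",)])

# Comparing with NULL yields NULL (unknown), so no row passes the filter.
neq = conn.execute(
    "SELECT my_column FROM my_table WHERE my_column <> NULL"
).fetchall()

# IS NOT NULL is the null-aware predicate.
not_null = conn.execute(
    "SELECT my_column FROM my_table WHERE my_column IS NOT NULL "
    "ORDER BY my_column"
).fetchall()

print(neq, not_null)
conn.close()
```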

Thanks,
Deng

On Mon, Jan 25, 2016 at 7:00 PM, Eli Super  wrote:

> Hi
>
> I try to select all values but not NULL values from column contains NULL
> values
>
> with
>
> sqlContext.sql("select my_column from my_table where my_column <> null
> ").show(15)
>
> or
>
> sqlContext.sql("select my_column from my_table where my_column != null
> ").show(15)
>
> I get empty result
>
>
> Thanks !
>


Re: NA value handling in sparkR

2016-01-25 Thread Devesh Raj Singh
Hi,

Yes you are right.

I think the problem is with reading of csv files. read.df is not
considering NAs in the CSV file

So what would be a workable solution in dealing with NAs in csv files?



On Mon, Jan 25, 2016 at 2:31 PM, Deborah Siegel 
wrote:

> Hi Devesh,
>
> I'm not certain why that's happening, and it looks like it doesn't happen
> if you use createDataFrame directly:
> aq <- createDataFrame(sqlContext,airquality)
> head(dropna(aq,how="any"))
>
> If I had to guess.. dropna(), I believe, drops null values. I suppose it's
> possible that createDataFrame converts R's NA values to null, so dropna()
> works with that. But perhaps read.df() does not convert R NAs to null, as
> those are most likely interpreted as strings when they come in from the
> csv. Just a guess, can anyone confirm?
>
> Deb
>
>
>
>
>
>
> On Sun, Jan 24, 2016 at 11:05 PM, Devesh Raj Singh  > wrote:
>
>> Hi,
>>
>> I have applied the following code on airquality dataset available in R ,
>> which has some missing values. I want to omit the rows which has NAs
>>
>> library(SparkR)
>> Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.2.0" "sparkr-shell"')
>>
>> sc <- sparkR.init("local",sparkHome =
>> "/Users/devesh/Downloads/spark-1.5.1-bin-hadoop2.6")
>>
>> sqlContext <- sparkRSQL.init(sc)
>>
>> path<-"/Users/devesh/work/airquality/"
>>
>> aq <- read.df(sqlContext,path,source = "com.databricks.spark.csv",
>> header="true", inferSchema="true")
>>
>> head(dropna(aq,how="any"))
>>
>> I am getting the output as
>>
>>   Ozone Solar_R Wind Temp Month Day
>> 1    41     190  7.4   67     5   1
>> 2    36     118  8.0   72     5   2
>> 3    12     149 12.6   74     5   3
>> 4    18     313 11.5   62     5   4
>> 5    NA      NA 14.3   56     5   5
>> 6    28      NA 14.9   66     5   6
>>
>> The NAs still exist in the output. Am I missing something here?
>>
>> --
>> Warm regards,
>> Devesh.
>>
>
>


-- 
Warm regards,
Devesh.


SparkSQL return all null fields when FIELDS TERMINATED BY '\t' and have a partition.

2016-01-25 Thread Liu Yiding
Hi, all

I am using CDH 5.5 (Spark 1.5 and Hive 1.1). I ran into a strange problem.
In hive:
hive> create table `tmp.test_d`(`id` int, `name` string) PARTITIONED BY
(`dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'; 
hive> load data local inpath
'/var/lib/hive/dataimport/mendian/target/test.txt' OVERWRITE into table
tmp.test_d partition(dt='2016-01-25');
hive> select * from tmp.test_d;
1   test2016-01-25
2   2016-01-25
Time taken: 0.267 seconds, Fetched: 2 row(s)

But in spark:
scala> sqlContext.sql("select * from tmp.test_d").collect
res9: Array[org.apache.spark.sql.Row] = Array([null,null,2016-01-25],
[null,null,2016-01-25])

All fields return null.

But if I change the field delimiter to '\u0001' or create a table without
partitions, the results are normal.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-return-all-null-fields-when-FIELDS-TERMINATED-BY-t-and-have-a-partition-tp26056.html



Re: [Streaming-Kafka] How to start from topic offset when streamcontext is using checkpoint

2016-01-25 Thread Yash Sharma
For specific offsets you can directly pass the offset ranges to
KafkaUtils.createRDD to get the events that were missed in the DStream.

- Thanks, via mobile,  excuse brevity.
On Jan 25, 2016 3:33 PM, "Raju Bairishetti"  wrote:

> Hi Yash,
> Basically, my question is how to avoid storing the Kafka offsets in the
> Spark checkpoint directory. The streaming context is built from the
> checkpoint directory and proceeds with the offsets in the checkpointed RDD.
>
> I want to consume data from Kafka from specific offsets along with the
> Spark checkpoints. The streaming context is prepared from the
> checkpoint directory and starts consuming from the topic offsets that
> were stored in the checkpoint directory.
>
>
> On Sat, Jan 23, 2016 at 3:44 PM, Yash Sharma  wrote:
>
>> Hi Raju,
>> Could you please explain your expected behavior with the DStream. The
>> DStream will have event only from the 'fromOffsets' that you provided in
>> the createDirectStream (which I think is the expected behavior).
>>
>> For the smaller files, you will have to deal with smaller files if you
>> intend to write it immediately. Alternately what we do sometimes is-
>>
>> 1.  Maintain couple of iterations for some 30-40 seconds in application
>> until we have substantial data and then we write them to disk.
>> 2. Push smaller data back to kafka, and a different job handles the save
>> to disk.
>>
>> On Sat, Jan 23, 2016 at 7:01 PM, Raju Bairishetti 
>> wrote:
>>
>>> Thanks for quick reply.
>>> I am creating Kafka Dstream by passing offsets map. I have pasted code
>>> snippet in my earlier mail. Let me know am I missing something.
>>>
>>> I want to use the Spark checkpoint only for handling driver/executor
>>> failures.
>>> On Jan 22, 2016 10:08 PM, "Cody Koeninger"  wrote:
>>>
>>>> Offsets are stored in the checkpoint.  If you want to manage offsets
>>>> yourself, don't restart from the checkpoint, specify the starting offsets
>>>> when you create the stream.
>>>>
>>>> Have you read / watched the materials linked from
>>>>
>>>> https://github.com/koeninger/kafka-exactly-once
>>>>
>>>> Regarding the small files problem, either don't use HDFS, or use
>>>> something like filecrush for merging.
>>>>
>>>> On Fri, Jan 22, 2016 at 3:03 AM, Raju Bairishetti
>>>> wrote:

>>>>> Hi,
>>>>>
>>>>> I am very new to Spark and Spark Streaming. I am planning to use
>>>>> Spark Streaming for real-time processing.
>>>>>
>>>>> I have created a streaming context and am checkpointing to an HDFS
>>>>> directory for recovery purposes in case of executor failures and driver
>>>>> failures.
>>>>>
>>>>> I am creating a DStream with an offset map for getting the data from Kafka.
>>>>> I am simply ignoring the offsets to understand the behavior. Whenever I
>>>>> restart the application, the driver is restored from the checkpoint as
>>>>> expected, but the DStream does not start from the initial offsets. The
>>>>> DStream is created with the last consumed offsets instead of starting from
>>>>> offset 0 for each topic partition, even though I am not storing the offsets
>>>>> anywhere.
>>>>>
>>>>> def main : Unit = {
>>>>>
>>>>> var sparkStreamingContext =
>>>>> StreamingContext.getOrCreate(SparkConstants.CHECKPOINT_DIR_LOCATION,
>>>>>   () => creatingFunc())
>>>>>
>>>>> ...
>>>>>
>>>>>
>>>>> }
>>>>>
>>>>> def creatingFunc(): Unit = {
>>>>>
>>>>> ...
>>>>>
>>>>> var offsets:Map[TopicAndPartition, Long] =
>>>>> Map(TopicAndPartition("sample_sample3_json",0) -> 0)
>>>>>
>>>>> KafkaUtils.createDirectStream[String,String, StringDecoder,
>>>>> StringDecoder,
>>>>> String](sparkStreamingContext, kafkaParams, offsets, messageHandler)
>>>>>
>>>>> ...
>>>>> }
>>>>>
>>>>> I want to get control over offset management at the event level instead of
>>>>> the RDD level, to ensure at-least-once delivery to the end system.
>>>>>
>>>>> As per my understanding, every RDD or RDD partition will be stored in
>>>>> HDFS as a file if I choose to use HDFS as output. If I use 1 sec as the batch
>>>>> interval, I will end up with a huge number of small files in
>>>>> HDFS. Having small files in HDFS leads to lots of other issues.
>>>>> Is there any way to write multiple RDDs into a single file? I don't have
>>>>> much idea about *coalesce* usage. In the worst case, I can merge all small
>>>>> files in HDFS at regular intervals.
>>>>>
>>>>> Thanks...
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Raju Bairishetti
>>>>> www.lazada.com

>>
>
>
> --
>
> --
> Thanks
> Raju Bairishetti
> www.lazada.com
>


Undefined job output-path error in Spark on hive

2016-01-25 Thread Akhilesh Pathodia
Hi,

I am getting following exception in Spark while writing to hive partitioned
table in parquet format:

16/01/25 03:56:40 ERROR executor.Executor: Exception in task 0.2 in stage 1.0 (TID 3)
java.io.IOException: Undefined job output-path
at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:232)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org$apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:237)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:250)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:250)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:250)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:85)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

Spark version:1.5.0

Please let me know if anybody has idea about this error.

Thanks,

Akhilesh


Getting top distinct strings from arraylist

2016-01-25 Thread Patrick Plaatje
Hi, 

I’m quite new to Spark and MR, but have a requirement to get all distinct 
values with their respective counts from a transactional file. Let’s assume the 
following file format:

0 1 2 3 4 5 6 7
1 3 4 5 8 9
9 10 11 12 13 14 15 16 17 18
1 4 7 11 12 13 19 20
3 4 7 11 15 20 21 22 23
1 2 5 9 11 12 16

Given this, I would like an ArrayList of (String, Integer) pairs back, where the String
is the item identifier and the Integer the count of that item identifier in the
file. The following is what I came up with to map the values, but I can’t figure
out how to do the counting :(

// create an RDD of ArrayLists of strings
JavaRDD<ArrayList<String>> transactions = sc.textFile(dataPath).map(
    new Function<String, ArrayList<String>>() {
        private static final long serialVersionUID = 1L;

        @Override
        public ArrayList<String> call(String s) {
            // split each transaction line into its item identifiers
            return Lists.newArrayList(s.split(" "));
        }
    }
);


Any ideas?
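
For what it is worth, the counting logic itself can be sketched in plain Python using the sample lines above (this is not Spark code; it only shows the shape of the computation). Each line is flattened into items, and each item is tallied. In Spark, the same shape would typically be expressed with `flatMap` followed by `mapToPair` and `reduceByKey`, or with `countByValue`.

```python
from collections import Counter

lines = [
    "0 1 2 3 4 5 6 7",
    "1 3 4 5 8 9",
    "9 10 11 12 13 14 15 16 17 18",
    "1 4 7 11 12 13 19 20",
    "3 4 7 11 15 20 21 22 23",
    "1 2 5 9 11 12 16",
]

# "flatMap": split every line into items; "reduceByKey": tally each item.
counts = Counter(item for line in lines for item in line.split())

# List of (item, count) pairs, most frequent first.
pairs = counts.most_common()
print(pairs[:3])
```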

Thanks!
Patrick



Re: 10hrs of Scheduler Delay

2016-01-25 Thread Darren Govoni


Probably we should open a ticket for this. There's definitely a deadlock
situation occurring in Spark under certain conditions.
The only clue I have is that it always happens on the last stage. And it does seem
sensitive to scale. If my job has 300 MB of data I'll see the deadlock. But if I
only run 10 MB of it, it will succeed. This suggests a serious fundamental scaling
problem.
Workers have plenty of resources.


Sent from my Verizon Wireless 4G LTE smartphone

 Original message 
From: "Sanders, Isaac B"  
Date: 01/24/2016  2:54 PM  (GMT-05:00) 
To: Renu Yadav  
Cc: Darren Govoni , Muthu Jayakumar , 
Ted Yu , user@spark.apache.org 
Subject: Re: 10hrs of Scheduler Delay 






I am not getting anywhere with any of the suggestions so far. :(



Trying some more outlets, I will share any solution I find.



- Isaac




On Jan 23, 2016, at 1:48 AM, Renu Yadav  wrote:



If you turn spark.speculation on, that might help. It worked for me.




On Sat, Jan 23, 2016 at 3:21 AM, Darren Govoni wrote:

Thanks for the tip. I will try it. But this is the kind of thing Spark is 
supposed to figure out and handle. Or at least not get stuck forever.

 Original message 
From: Muthu Jayakumar 
Date: 01/22/2016 3:50 PM (GMT-05:00) 
To: Darren Govoni, "Sanders, Isaac B", Ted Yu 
Cc: user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay 

Does increasing the number of partitions help? You could try something like 3 
times what you currently have. 
Another trick I used was to split the problem into multiple DataFrames, run 
them sequentially, persist each result, and then run a union on the 
results. 

Hope this helps. 

On Fri, Jan 22, 2016, 3:48 AM Darren Govoni wrote:

Me too. I had to shrink my dataset to get it to work. For us at least, Spark 
seems to have scaling issues.

 Original message 
From: "Sanders, Isaac B" 
Date: 01/21/2016 11:18 PM (GMT-05:00) 
To: Ted Yu 
Cc: user@spark.apache.org
Subject: Re: 10hrs of Scheduler Delay 

I have run the driver on a smaller dataset (k=2, n=5000) and it worked quickly 
and didn’t hang like this. This dataset is closer to k=10, n=4.4m, but I am 
using more resources on this one.

- Isaac

On Jan 21, 2016, at 11:06 PM, Ted Yu wrote:

You may have seen the following on the GitHub page:

Latest commit 50fdf0e on Feb 22, 2015

That was 11 months ago.

Can you search for a similar algorithm that runs on Spark and is newer?

If nothing is found, consider running the tests that come with the project to 
determine whether the delay is intrinsic.

Cheers

On Thu, Jan 21, 2016 at 7:46 PM, Sanders, Isaac B wrote:

That thread seems to be moving; it oscillates between a few different traces… 
Maybe it is working. It seems odd that it would take that long.

This is 3rd party code, and after looking at some of it, I think it might not 
be as Spark-y as it could be.

I linked it below. I don’t know a lot about Spark, so it might be fine, but I 
have my suspicions.

https://github.com/alitouka/spark_dbscan/blob/master/src/src/main/scala/org/alitouka/spark/dbscan/exploratoryAnalysis/DistanceToNearestNeighborDriver.scala

- Isaac

On Jan 21, 2016, at 10:08 PM, Ted Yu wrote:

You may have noticed the following - does this indicate prolonged computation in 
your code?

Re: Dataframe, Spark SQL - Drops First 8 Characters of String on Amazon EMR

2016-01-25 Thread awzurn
Sorry for the bump, but I am wondering if anyone else has seen this before. We're
hoping to either resolve this soon or take the next steps to file it as an
issue.

Thanks in advance,

Andrew Zurn



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Dataframe-Spark-SQL-Drops-First-8-Characters-of-String-on-Amazon-EMR-tp26022p26065.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
