Re: Error joining dataframes

2016-05-18 Thread Takeshi Yamamuro
Look weird, seems spark-v1.5.x can accept the query.
What's the difference between the example and your query?



Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
      /_/

scala> :paste

// Entering paste mode (ctrl-D to finish)

val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")

val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")

df1.join(df2, df1("id") === df2("id"), "outer").show


// Exiting paste mode, now interpreting.


+----+----+----+----+
|  id|   A|  id|   B|
+----+----+----+----+
|   1|   0|null|null|
|   2|   0|   2|   0|
|null|null|   3|   0|
+----+----+----+----+


df1: org.apache.spark.sql.DataFrame = [id: int, A: int]

df2: org.apache.spark.sql.DataFrame = [id: int, B: int]





On Wed, May 18, 2016 at 3:52 PM, ram kumar  wrote:

> I tried
> df1.join(df2, df1("id") === df2("id"), "outer").show
>
> But there is a duplicate "id" and when I query the "id", I get
> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>
> I am currently using spark 1.5.2.
> Is there any alternative way in 1.5
>
> Thanks
>
> On Wed, May 18, 2016 at 12:12 PM, Takeshi Yamamuro 
> wrote:
>
>> Also, you can pass the query that you'd like to use in spark-v1.6+;
>>
>> val df1 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "A")
>> val df2 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "B")
>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>
>> // maropu
>>
>>
>> On Wed, May 18, 2016 at 3:29 PM, ram kumar 
>> wrote:
>>
>>> If I run as
>>> val rs = s.join(t,"time_id").join(c,"channel_id")
>>>
>>> It takes as inner join.
>>>
>>>
>>> On Wed, May 18, 2016 at 2:31 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 pretty simple, a similar construct to tables projected as DF

 val c =
 HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
 val t =
 HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
 val rs = s.join(t,"time_id").join(c,"channel_id")

 HTH

 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 17 May 2016 at 21:52, Bijay Kumar Pathak  wrote:

> Hi,
>
> Try this one:
>
>
> df_join = df1.*join*(df2, 'Id', "fullouter")
>
> Thanks,
> Bijay
>
>
> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
> wrote:
>
>> Hi,
>>
>> I tried to join two dataframe
>>
>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>
>> df_join.registerTempTable("join_test")
>>
>>
>> When querying "Id" from "join_test"
>>
>> 0: jdbc:hive2://> *select Id from join_test;*
>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>> 0: jdbc:hive2://>
>>
>> Is there a way to merge the value of df1("Id") and df2("Id") into one
>> "Id"
>>
>> Thanks
>>
>
>

>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Error joining dataframes

2016-05-18 Thread ram kumar
When you register a temp table from the dataframe

eg:
var df_join = df1.join(df2, df1("id") === df2("id"), "outer")
df_join.registerTempTable("test")

sqlContext.sql("select * from test")

+----+----+----+----+
|  id|   A|  id|   B|
+----+----+----+----+
|   1|   0|null|null|
|   2|   0|   2|   0|
|null|null|   3|   0|
+----+----+----+----+


but, when you query the "id"


sqlContext.sql("select id from test")

*Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
*ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)

On Wed, May 18, 2016 at 12:44 PM, Takeshi Yamamuro 
wrote:

> Look weird, seems spark-v1.5.x can accept the query.
> What's the difference between the example and your query?
>
> 
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>       /_/
>
> scala> :paste
>
> // Entering paste mode (ctrl-D to finish)
>
> val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")
>
> val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")
>
> df1.join(df2, df1("id") === df2("id"), "outer").show
>
>
> // Exiting paste mode, now interpreting.
>
>
> +----+----+----+----+
> |  id|   A|  id|   B|
> +----+----+----+----+
> |   1|   0|null|null|
> |   2|   0|   2|   0|
> |null|null|   3|   0|
> +----+----+----+----+
>
>
> df1: org.apache.spark.sql.DataFrame = [id: int, A: int]
>
> df2: org.apache.spark.sql.DataFrame = [id: int, B: int]
>
>
>
>
>
> On Wed, May 18, 2016 at 3:52 PM, ram kumar 
> wrote:
>
>> I tried
>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>
>> But there is a duplicate "id" and when I query the "id", I get
>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>
>> I am currently using spark 1.5.2.
>> Is there any alternative way in 1.5
>>
>> Thanks
>>
>> On Wed, May 18, 2016 at 12:12 PM, Takeshi Yamamuro > > wrote:
>>
>>> Also, you can pass the query that you'd like to use in spark-v1.6+;
>>>
>>> val df1 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "A")
>>> val df2 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "B")
>>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>>
>>> // maropu
>>>
>>>
>>> On Wed, May 18, 2016 at 3:29 PM, ram kumar 
>>> wrote:
>>>
 If I run as
 val rs = s.join(t,"time_id").join(c,"channel_id")

 It takes as inner join.


 On Wed, May 18, 2016 at 2:31 AM, Mich Talebzadeh <
 mich.talebza...@gmail.com> wrote:

> pretty simple, a similar construct to tables projected as DF
>
> val c =
> HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
> val t =
> HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
> val rs = s.join(t,"time_id").join(c,"channel_id")
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 May 2016 at 21:52, Bijay Kumar Pathak  wrote:
>
>> Hi,
>>
>> Try this one:
>>
>>
>> df_join = df1.*join*(df2, 'Id', "fullouter")
>>
>> Thanks,
>> Bijay
>>
>>
>> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> I tried to join two dataframe
>>>
>>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>>
>>> df_join.registerTempTable("join_test")
>>>
>>>
>>> When querying "Id" from "join_test"
>>>
>>> 0: jdbc:hive2://> *select Id from join_test;*
>>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>> 0: jdbc:hive2://>
>>>
>>> Is there a way to merge the value of df1("Id") and df2("Id") into
>>> one "Id"
>>>
>>> Thanks
>>>
>>
>>
>

>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Error joining dataframes

2016-05-18 Thread Takeshi Yamamuro
Ah, yes. `df_join` has the two `id` columns, so you need to select which one you use;

scala> :paste

// Entering paste mode (ctrl-D to finish)


val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")

val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")

val df3 = df1.join(df2, df1("id") === df2("id"), "outer")

df3.printSchema

df3.select(df1("id")).show


// Exiting paste mode, now interpreting.


root

 |-- id: integer (nullable = true)

 |-- A: integer (nullable = true)

 |-- id: integer (nullable = true)

 |-- B: integer (nullable = true)


+----+
|  id|
+----+
|   1|
|   2|
|null|
+----+
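
If upgrading is an option, the usingColumns variant of join (available from Spark 1.6, if I recall correctly) keeps a single `id` column, so the ambiguity never appears. A rough, untested sketch with the same df1/df2:

// Spark 1.6+ only (not applicable to 1.5.2): join on the column name itself
val joined = df1.join(df2, Seq("id"), "outer")
joined.printSchema()                                 // a single `id` column, plus A and B
joined.registerTempTable("join_test")
sqlContext.sql("select id from join_test").show()    // no longer ambiguous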



On Wed, May 18, 2016 at 4:29 PM, ram kumar  wrote:

> When you register a temp table from the dataframe
>
> eg:
> var df_join = df1.join(df2, df1("id") === df2("id"), "outer")
> df_join.registerTempTable("test")
>
> sqlContext.sql("select * from test")
>
> +----+----+----+----+
> |  id|   A|  id|   B|
> +----+----+----+----+
> |   1|   0|null|null|
> |   2|   0|   2|   0|
> |null|null|   3|   0|
> +----+----+----+----+
>
>
> but, when you query the "id"
>
>
> sqlContext.sql("select id from test")
>
> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>
> On Wed, May 18, 2016 at 12:44 PM, Takeshi Yamamuro 
> wrote:
>
>> Look weird, seems spark-v1.5.x can accept the query.
>> What's the difference between the example and your query?
>>
>> 
>>
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 1.5.2
>>       /_/
>>
>> scala> :paste
>>
>> // Entering paste mode (ctrl-D to finish)
>>
>> val df1 = Seq((1, 0), (2, 0)).toDF("id", "A")
>>
>> val df2 = Seq((2, 0), (3, 0)).toDF("id", "B")
>>
>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>
>>
>> // Exiting paste mode, now interpreting.
>>
>>
>> +----+----+----+----+
>> |  id|   A|  id|   B|
>> +----+----+----+----+
>> |   1|   0|null|null|
>> |   2|   0|   2|   0|
>> |null|null|   3|   0|
>> +----+----+----+----+
>>
>>
>> df1: org.apache.spark.sql.DataFrame = [id: int, A: int]
>>
>> df2: org.apache.spark.sql.DataFrame = [id: int, B: int]
>>
>>
>>
>>
>>
>> On Wed, May 18, 2016 at 3:52 PM, ram kumar 
>> wrote:
>>
>>> I tried
>>> df1.join(df2, df1("id") === df2("id"), "outer").show
>>>
>>> But there is a duplicate "id" and when I query the "id", I get
>>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>>
>>> I am currently using spark 1.5.2.
>>> Is there any alternative way in 1.5
>>>
>>> Thanks
>>>
>>> On Wed, May 18, 2016 at 12:12 PM, Takeshi Yamamuro <
>>> linguin@gmail.com> wrote:
>>>
 Also, you can pass the query that you'd like to use in spark-v1.6+;

 val df1 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "A")
 val df2 = Seq((1, 0), (2, 0), (3, 0)).toDF("id", "B")
 df1.join(df2, df1("id") === df2("id"), "outer").show

 // maropu


 On Wed, May 18, 2016 at 3:29 PM, ram kumar 
 wrote:

> If I run as
> val rs = s.join(t,"time_id").join(c,"channel_id")
>
> It takes as inner join.
>
>
> On Wed, May 18, 2016 at 2:31 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> pretty simple, a similar construct to tables projected as DF
>>
>> val c =
>> HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
>> val t =
>> HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
>> val rs = s.join(t,"time_id").join(c,"channel_id")
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 May 2016 at 21:52, Bijay Kumar Pathak  wrote:
>>
>>> Hi,
>>>
>>> Try this one:
>>>
>>>
>>> df_join = df1.*join*(df2, 'Id', "fullouter")
>>>
>>> Thanks,
>>> Bijay
>>>
>>>
>>> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
>>> wrote:
>>>
 Hi,

 I tried to join two dataframe

 df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")

 df_join.registerTempTable("join_test")


 When querying "Id" from "join_test"

 0: jdbc:hive2://> *select Id from join_test;*
 *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
 *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7
 (state=,code=0)
 0: jdbc:hive2://>

 Is there a way to merge the value of df1("Id") and df2("Id") into
 one "Id"

 Thanks

>>>
>

HBase / Spark Kerberos problem

2016-05-18 Thread philipp.meyerhoefer
Hi all,

I have been puzzling over a Kerberos problem for a while now and wondered if 
anyone can help.

For spark-submit, I specify --keytab x --principal y, which creates my 
SparkContext fine.
Connections to Zookeeper Quorum to find the HBase master work well too.
But when it comes to a .count() action on the RDD, I am always presented with 
the stack trace at the end of this mail.

We are using CDH5.5.2 (spark 1.5.0), and com.cloudera.spark.hbase.HBaseContext 
is a wrapper around TableInputFormat/hadoopRDD (see 
https://github.com/cloudera-labs/SparkOnHBase), as you can see in the stack 
trace.

Am I doing something obviously wrong here?
A similar flow, inside test code, works well; only going via spark-submit
exposes this issue.

Code snippet (I have tried using the commented-out lines in various 
combinations, without success):

   val conf = new SparkConf().
  set("spark.shuffle.consolidateFiles", "true").
  set("spark.kryo.registrationRequired", "false").
  set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  set("spark.kryoserializer.buffer", "30m")
val sc = new SparkContext(conf)
val cfg = sc.hadoopConfiguration
//cfg.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"))
//UserGroupInformation.getCurrentUser.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS)
//cfg.set("hbase.security.authentication", "kerberos")
val hc = new HBaseContext(sc, cfg)
val scan = new Scan
scan.setTimeRange(startMillis, endMillis)
val matchesInRange = hc.hbaseRDD(MY_TABLE, scan, resultToMatch)
val cnt = matchesInRange.count()
log.info(s"matches in range $cnt")

Stack trace / log:

16/05/17 17:04:47 INFO SparkContext: Starting job: count at Analysis.scala:93
16/05/17 17:04:47 INFO DAGScheduler: Got job 0 (count at Analysis.scala:93) 
with 1 output partitions
16/05/17 17:04:47 INFO DAGScheduler: Final stage: ResultStage 0(count at 
Analysis.scala:93)
16/05/17 17:04:47 INFO DAGScheduler: Parents of final stage: List()
16/05/17 17:04:47 INFO DAGScheduler: Missing parents: List()
16/05/17 17:04:47 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at HBaseContext.scala:580), which has no missing 
parents
16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(3248) called with 
curMem=428022, maxMem=244187136
16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3 stored as values in 
memory (estimated size 3.2 KB, free 232.5 MB)
16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(2022) called with 
curMem=431270, maxMem=244187136
16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in 
memory (estimated size 2022.0 B, free 232.5 MB)
16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
10.6.164.40:33563 (size: 2022.0 B, free: 232.8 MB)
16/05/17 17:04:47 INFO SparkContext: Created broadcast 3 from broadcast at 
DAGScheduler.scala:861
16/05/17 17:04:47 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 0 (MapPartitionsRDD[1] at map at HBaseContext.scala:580)
16/05/17 17:04:47 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
16/05/17 17:04:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
hpg-dev-vm, partition 0,PROCESS_LOCAL, 2208 bytes)
16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
hpg-dev-vm:52698 (size: 2022.0 B, free: 388.4 MB)
16/05/17 17:04:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
hpg-dev-vm:52698 (size: 26.0 KB, free: 388.4 MB)
16/05/17 17:04:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
hpg-dev-vm): org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't 
get the location
at 
org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:155)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:63)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
at 
org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
at 
org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
at 
org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:161)
at 
org.apache.hadoop.hbase.client.ClientScanner.(ClientScanner.java:156)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
at 
org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
at 
org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
at 
org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:138)
at 
org.apache.

Re: Error joining dataframes

2016-05-18 Thread Divya Gehlot
Can you try var df_join = df1.join(df2,df1( "Id") ===df2("Id"),
"fullouter").drop(df1("Id"))
On May 18, 2016 2:16 PM, "ram kumar"  wrote:

I tried

scala> var df_join = df1.join(df2, "Id", "fullouter")
:27: error: type mismatch;
 found   : String("Id")
 required: org.apache.spark.sql.Column
   var df_join = df1.join(df2, "Id", "fullouter")
   ^

scala>

And I cant see the above method in
https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html#join(org.apache.spark.sql.DataFrame,%20org.apache.spark.sql.Column,%20java.lang.String)

On Wed, May 18, 2016 at 2:22 AM, Bijay Kumar Pathak 
wrote:

> Hi,
>
> Try this one:
>
>
> df_join = df1.*join*(df2, 'Id', "fullouter")
>
> Thanks,
> Bijay
>
>
> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
> wrote:
>
>> Hi,
>>
>> I tried to join two dataframe
>>
>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>
>> df_join.registerTempTable("join_test")
>>
>>
>> When querying "Id" from "join_test"
>>
>> 0: jdbc:hive2://> *select Id from join_test;*
>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>> 0: jdbc:hive2://>
>>
>> Is there a way to merge the value of df1("Id") and df2("Id") into one "Id"
>>
>> Thanks
>>
>
>


Re: Error joining dataframes

2016-05-18 Thread ram kumar
I tried it,

eg:
 df_join = df1.join(df2,df1( "Id") ===df2("Id"), "fullouter")

+----+----+----+----+
|  id|   A|  id|   B|
+----+----+----+----+
|   1|   0|null|null|
|   2|   0|   2|   0|
|null|null|   3|   0|
+----+----+----+----+


if I try,
df_join = df1.join(df2,df1( "Id") ===df2("Id"), "fullouter").drop(df1("Id"))



+----+----+----+
|   A|  id|   B|
+----+----+----+
|   0|null|null|
|   0|   2|   0|
|null|   3|   0|
+----+----+----+

The "id" = 1 will be lost

On Wed, May 18, 2016 at 1:52 PM, Divya Gehlot 
wrote:

> Can you try var df_join = df1.join(df2,df1( "Id") ===df2("Id"),
> "fullouter").drop(df1("Id"))
> On May 18, 2016 2:16 PM, "ram kumar"  wrote:
>
> I tried
>
> scala> var df_join = df1.join(df2, "Id", "fullouter")
> :27: error: type mismatch;
>  found   : String("Id")
>  required: org.apache.spark.sql.Column
>var df_join = df1.join(df2, "Id", "fullouter")
>^
>
> scala>
>
> And I cant see the above method in
>
> https://spark.apache.org/docs/1.5.1/api/java/org/apache/spark/sql/DataFrame.html#join(org.apache.spark.sql.DataFrame,%20org.apache.spark.sql.Column,%20java.lang.String)
>
> On Wed, May 18, 2016 at 2:22 AM, Bijay Kumar Pathak 
> wrote:
>
>> Hi,
>>
>> Try this one:
>>
>>
>> df_join = df1.*join*(df2, 'Id', "fullouter")
>>
>> Thanks,
>> Bijay
>>
>>
>> On Tue, May 17, 2016 at 9:39 AM, ram kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> I tried to join two dataframe
>>>
>>> df_join = df1.*join*(df2, ((df1("Id") === df2("Id")), "fullouter")
>>>
>>> df_join.registerTempTable("join_test")
>>>
>>>
>>> When querying "Id" from "join_test"
>>>
>>> 0: jdbc:hive2://> *select Id from join_test;*
>>> *Error*: org.apache.spark.sql.AnalysisException: Reference 'Id' is
>>> *ambiguous*, could be: Id#128, Id#155.; line 1 pos 7 (state=,code=0)
>>> 0: jdbc:hive2://>
>>>
>>> Is there a way to merge the value of df1("Id") and df2("Id") into one
>>> "Id"
>>>
>>> Thanks
>>>
>>
>>
>


Re: Does Structured Streaming support count(distinct) over all the streaming data?

2016-05-18 Thread Sean Owen
Late to the thread, but, why is counting distinct elements over a
24-hour window not possible? you can certainly do it now, and I'd
presume it's possible with structured streaming with a window.

countByValueAndWindow should do it right? the keys (with non-zero
counts, I suppose) in a window are the distinct values from the stream
in that window. Your example looks right.
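
For reference, a rough DStream sketch of that idea, assuming a checkpointed StreamingContext and an existing DStream called `values` (the name and window sizes are placeholders, not from the original thread):

import org.apache.spark.streaming.Seconds

// counts per distinct value over the last 24 hours, re-evaluated every minute
val countsPerValue = values.countByValueAndWindow(Seconds(24 * 3600), Seconds(60))
// keys with a non-zero count are the distinct values seen in the window
val distinctInWindow = countsPerValue.filter { case (_, c) => c > 0 }.count()
distinctInWindow.print()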

On Wed, May 18, 2016 at 12:17 AM, Mich Talebzadeh
 wrote:
>
> Ok What can be used here below
>
> //val countDistinctByValueAndWindow = price.filter(_ > 0.0).reduceByKey((t1, 
> t2) -> t1).countByValueAndWindow(Seconds(windowLength), 
> Seconds(slidingInterval))
> //countDistinctByValueAndWindow.print()
>
>>> On 17 May 2016 at 20:02, Michael Armbrust  wrote:
 In 2.0 you won't be able to do this.  The long term vision would be to 
 make this possible, but a window will be required (like the 24 hours you 
 suggest).

 On Tue, May 17, 2016 at 1:36 AM, Todd  wrote:
>
> Hi,
> We have a requirement to do count(distinct) in a processing batch against 
> all the streaming data(eg, last 24 hours' data),that is,when we do 
> count(distinct),we actually want to compute distinct against last 24 
> hours' data.
> Does structured streaming support this scenario?Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



File not found exception while reading from folder using textFileStream

2016-05-18 Thread Yogesh Vyas
Hi,
I am trying to read the files in a streaming way using Spark
Streaming. For this I am copying files from my local folder to the
source folder from where spark reads the file.
After reading and printing some of the files, it gives the following error:

Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
File does not exist: /user/hadoop/file17.xml._COPYING_

I guess Spark Streaming is trying to read the file before it
gets copied completely.

Does anyone knows how to handle such type of exception?

Regards,
Yogesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark 2.0 state store] Streaming wordcount using spark state store

2016-05-18 Thread Shekhar Bansal
Hi
What is the right way of using the spark2.0 state store feature in spark streaming??
I referred test cases in this (https://github.com/apache/spark/pull/11645/files) pull request and
implemented word count using state store.
My source is kafka (1 topic, 10 partitions). My data pump is pushing numbers into random partition.
I understand that state store maintains state per partition, so I am applying partitionBy
before calling mapPartitionsWithStateStore.

Problem I am facing is that after some time, I start getting wrong running count.
My data pump is pushing number 1 every 5 seconds, which is same as microbatch duration.
First 20 micro batches ran fine but in 21st microbatch state of 1 somehow got reset
and I got count=1, please see console output.

Code of my streaming app
    val keySchema = StructType(Seq(StructField("key", StringType, true)))
    val valueSchema = StructType(Seq(StructField("value", IntegerType, true)))

    val stateStoreCheckpointPath = "/data/spark/stateStoreCheckpoints/"
    var stateStoreVersion:Long = 0

    val stateStoreWordCount = (store: StateStore, iter: Iterator[String]) => {
      val out = new ListBuffer[(String, Int)]
      iter.foreach { s =>
        val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0) + 1
        store.put(stringToRow(s), intToRow(current))
        out.append((s,current))
      }

      store.commit
      out.iterator
    }

    val opId = 100
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .flatMap(r=>{r._2.split(" ")})
      .foreachRDD((rdd, time) =>{
        rdd
        .map(r=>(r,null))
        .partitionBy(new HashPartitioner(20))
        .map(r=>r._1)
        .mapPartitionsWithStateStore(sqlContet, stateStoreCheckpointPath, opId,
          storeVersion = stateStoreVersion, keySchema, valueSchema)(stateStoreWordCount)
        .collect foreach(r=> println(time  + " - " + r))
        stateStoreVersion+=1
        println(time + " batch finished")
        }
      )

Code of my Data pump
    val list = List(1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97)
    while (true) {
      list.foreach(r=>{
        if(count%r==0){
          val productRecord = new ProducerRecord(Configs.topic,new Random().nextInt(10), "" , r.toString)
          producer.send(productRecord)
        }
      })
      count+=1
      Thread.sleep(5000);
    }

Complete code is available here
(https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test)

I am using spark on yarn in client mode.
spark - spark-2.0.0-snapshot (git sha - 6c5768594fe8b910125f06e1308a8154a199447e)  - May 13, 2016
scala - 2.10.2
java - 1.8
hadoop - 2.7.1
kafka - 0.8.2.1

Spark config:
spark.executor.cores=12
spark.executor.instances=6

Console Output
146356664 ms - (1,1)
146356664 ms batch finished
1463566645000 ms - (1,2)
1463566645000 ms - (2,1)
1463566645000 ms batch finished
146356665 ms - (1,3)
146356665 ms - (3,1)
146356665 ms batch finished
1463566655000 ms - (1,4)
1463566655000 ms - (2,2)
1463566655000 ms batch finished
14635 ms - (1,5)
14635 ms - (5,1)
14635 ms batch finished
146355000 ms - (1,6)
146355000 ms - (2,3)
146355000 ms - (3,2)
146355000 ms batch finished
146356667 ms - (1,7)
146356667 ms - (7,1)
146356667 ms batch finished
1463566675000 ms - (1,8)
1463566675000 ms - (2,4)
1463566675000 ms batch finished
146356668 ms - (1,9)
146356668 ms - (3,3)
146356668 ms batch finished
1463566685000 ms - (1,10)
1463566685000 ms - (2,5)
1463566685000 ms - (5,2)
1463566685000 ms batch finished
146356669 ms - (11,1)
146356669 ms - (1,11)
146356669 ms batch finished
1463566695000 ms - (1,12)
1463566695000 ms - (2,6)
1463566695000 ms - (3,4)
1463566695000 ms batch finished
146356670 ms - (1,13)
146356670 ms - (13,1)
146356670 ms batch finished
1463566705000 ms - (1,14)
1463566705000 ms - (2,7)
1463566705000 ms - (7,2)
1463566705000 ms batch finished
146356671 ms - (1,15)
146356671 ms - (3,5)
146356671 ms - (5,3)
146356671 ms batch finished
1463566715000 ms - (1,16)
1463566715000 ms - (2,8)
1463566715000 ms batch finished
146356672 ms - (1,17)
146356672 ms - (17,1)
146356672 ms batch finished
1463566725000 ms - (1,18)
1463566725000 ms - (2,9)
1463566725000 ms - (3,6)
1463566725000 ms batch finished
146356673 ms - (1,19)
146356673 ms - (19,1)
146356673 ms batch finished
1463566735000 ms - (1,20)
1463566735000 ms - (2,10)
1463566735000 ms - (5,4)
1463566735000 ms batch finished
146356674 ms - (1,1) <-- count got reset
146356674 ms - (3,7)
146356674 ms - (7,1) <-- count got reset
146356674 ms batch finished
1463566745000 ms - (11,2)
1463566745000 ms - (1,2)
1463566745000 ms - (2,11)
1463566745000 ms batch finished
146356675 ms - (23,1)
146356675 ms - (1,3)
146356675 ms batch finished
1463566755000 ms - (1,4)
14635667

Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Serega Sheypak
Hi, please have a look at log snippet:
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint =
NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
locations
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
non-empty blocks out of 30 blocks
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
remote fetches in 3 ms
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map outputs
for shuffle 1, fetching them
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
tracker endpoint =
NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
locations
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
non-empty blocks out of 1500 blocks
16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
remote fetches in 1 ms
16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak detected;
size = 6685476 bytes, TID = 3405
16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in stage
6.0 (TID 3405)

Is it related to https://issues.apache.org/jira/browse/SPARK-11293

Is there any recommended workaround?


Re: File not found exception while reading from folder using textFileStream

2016-05-18 Thread Ted Yu
The following should handle the situation you encountered:

diff --git
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.sca
index ed93058..f79420b 100644
---
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -266,6 +266,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,
V]](
   logDebug(s"$pathStr already considered")
   return false
 }
+if (pathStr.endsWith("._COPYING_")) {
+  logDebug(s"$pathStr is being copied")
+  return false
+}
 logDebug(s"$pathStr accepted with mod time $modTime")
 return true
   }

On Wed, May 18, 2016 at 2:06 AM, Yogesh Vyas  wrote:

> Hi,
> I am trying to read the files in a streaming way using Spark
> Streaming. For this I am copying files from my local folder to the
> source folder from where spark reads the file.
> After reading and printing some of the files, it gives the following error:
>
> Caused by:
> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
> File does not exist: /user/hadoop/file17.xml._COPYING_
>
> I guess the Spark Streaming file is trying to read the file before it
> gets copied completely.
>
> Does anyone knows how to handle such type of exception?
>
> Regards,
> Yogesh
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: SPARK - DataFrame for BulkLoad

2016-05-18 Thread Ted Yu
Please see HBASE-14150

The hbase-spark module would be available in the upcoming hbase 2.0 release.

On Tue, May 17, 2016 at 11:48 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> Have you checked this?
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E
>
> // maropu
>
> On Wed, May 18, 2016 at 1:14 PM, Mohanraj Ragupathiraj <
> mohanaug...@gmail.com> wrote:
>
>> I have 100 million records to be inserted to a HBase table (PHOENIX) as a
>> result of a Spark Job. I would like to know if i convert it to a Dataframe
>> and save it, will it do Bulk load (or) it is not the efficient way to write
>> data to a HBase table
>>
>> --
>> Thanks and Regards
>> Mohan
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: SPARK - DataFrame for BulkLoad

2016-05-18 Thread Michael Segel
Yes, but he’s using Phoenix, which may not work cleanly with your HBase Spark
module.
The key issue here may be Phoenix, which is separate from HBase.


> On May 18, 2016, at 5:36 AM, Ted Yu  wrote:
> 
> Please see HBASE-14150
> 
> The hbase-spark module would be available in the upcoming hbase 2.0 release.
> 
> On Tue, May 17, 2016 at 11:48 PM, Takeshi Yamamuro  > wrote:
> Hi,
> 
> Have you checked this?
> http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3ccacyzca3askwd-tujhqi1805bn7sctguaoruhd5xtxcsul1a...@mail.gmail.com%3E
>  
> 
> 
> // maropu
> 
> On Wed, May 18, 2016 at 1:14 PM, Mohanraj Ragupathiraj  > wrote:
> I have 100 million records to be inserted to a HBase table (PHOENIX) as a 
> result of a Spark Job. I would like to know if i convert it to a Dataframe 
> and save it, will it do Bulk load (or) it is not the efficient way to write 
> data to a HBase table
> 
> -- 
> Thanks and Regards
> Mohan
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro
> 



Re: File not found exception while reading from folder using textFileStream

2016-05-18 Thread Saisai Shao
From my understanding, we should copy the file into another folder and move it
to the source folder after the copy is finished; otherwise we will read
half-copied data or hit the issue you mentioned above.
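
If patching Spark is not an option, StreamingContext.fileStream also accepts a path filter, so the job itself can skip in-flight copies; writing to a temporary directory and renaming into the watched directory (rename is atomic on HDFS) works as well. A hedged sketch of the filter approach (the directory name is a placeholder):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "/user/hadoop/input",
    (path: Path) => !path.getName.endsWith("._COPYING_"),  // ignore files still being copied
    newFilesOnly = true)
  .map(_._2.toString)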

On Wed, May 18, 2016 at 8:32 PM, Ted Yu  wrote:

> The following should handle the situation you encountered:
>
> diff --git
> a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
> b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.sca
> index ed93058..f79420b 100644
> ---
> a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
> +++
> b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
> @@ -266,6 +266,10 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,
> V]](
>logDebug(s"$pathStr already considered")
>return false
>  }
> +if (pathStr.endsWith("._COPYING_")) {
> +  logDebug(s"$pathStr is being copied")
> +  return false
> +}
>  logDebug(s"$pathStr accepted with mod time $modTime")
>  return true
>}
>
> On Wed, May 18, 2016 at 2:06 AM, Yogesh Vyas  wrote:
>
>> Hi,
>> I am trying to read the files in a streaming way using Spark
>> Streaming. For this I am copying files from my local folder to the
>> source folder from where spark reads the file.
>> After reading and printing some of the files, it gives the following
>> error:
>>
>> Caused by:
>> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException):
>> File does not exist: /user/hadoop/file17.xml._COPYING_
>>
>> I guess the Spark Streaming file is trying to read the file before it
>> gets copied completely.
>>
>> Does anyone knows how to handle such type of exception?
>>
>> Regards,
>> Yogesh
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Ted Yu
Please increase the number of partitions.

Cheers

On Wed, May 18, 2016 at 4:17 AM, Serega Sheypak 
wrote:

> Hi, please have a look at log snippet:
> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
> tracker endpoint =
> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
> locations
> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
> non-empty blocks out of 30 blocks
> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
> remote fetches in 3 ms
> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map
> outputs for shuffle 1, fetching them
> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
> tracker endpoint =
> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
> locations
> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
> non-empty blocks out of 1500 blocks
> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
> remote fetches in 1 ms
> 16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak detected;
> size = 6685476 bytes, TID = 3405
> 16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in
> stage 6.0 (TID 3405)
>
> Is it related to https://issues.apache.org/jira/browse/SPARK-11293
>
> Is there any recommended workaround?
>


Spark Task not serializable with lag Window function

2016-05-18 Thread luca_guerra
I've noticed that after I use a Window function over a DataFrame, if I then call
map() with a function, Spark returns a "Task not serializable" exception.
This is my code:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def f():String = "test"
case class P(name:String,surname:String)
val lag_result = lag($"name",1).over(Window.partitionBy($"surname"))
val lista = List(P("N1","S1"),P("N2","S2"),P("N2","S2"))
val data_frame = hc.createDataFrame(sc.parallelize(lista))
data_frame.withColumn("lag_result", lag_result).map(x => f)
//data_frame.withColumn("lag_result", lag_result).map{case x => def f():String = "test"; f}.collect // This works

And this is the Stack Trace:

org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
at 
... and more
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value:
'lag(name,1,null) windowspecdefinition(surname,UnspecifiedFrame))
- field (class:
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,
name: lag_result, type: class org.apache.spark.sql.Column)
... and more
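
What the commented-out line in the snippet hints at: in the shell, `lag_result` and `f` live in the same REPL wrapper object, so the closure passed to map() drags the non-serializable Column along with it. Keeping the Column expression inside withColumn and defining the mapped function locally avoids capturing it; a hedged, untested sketch:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

val result = data_frame
  .withColumn("lag_result", lag($"name", 1).over(Window.partitionBy($"surname")))
  .map { _ =>
    def f(): String = "test"   // defined inside the closure, so nothing REPL-bound is captured
    f()
  }
result.collect()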



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Task-not-serializable-with-lag-Window-function-tp26976.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark udf can not change a json string to a map

2016-05-18 Thread Ted Yu
Please take a look at JavaUtils#mapAsSerializableJavaMap

FYI
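
An alternative that sidesteps the Java type-erasure limitation is to register a native Spark SQL UDF in Scala, where a Map[String, String] return type can be inferred. A hedged sketch (the UDF name and the flattening of values to strings are assumptions, not from the original post):

import java.net.URLDecoder
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper

sqlContext.udf.register("jsonToMap", (s: String) => {
  if (s == null) null
  else {
    val decoded = URLDecoder.decode(s, "UTF-8")
    val mapper = new ObjectMapper()
    // read into a java.util.Map, then convert values to strings for a flat map<string,string>
    val parsed = mapper.readValue(decoded, classOf[java.util.Map[String, Object]])
    parsed.asScala.map { case (k, v) => (k, String.valueOf(v)) }.toMap
  }
})
// e.g. sqlContext.sql("insert into a_parquet select jsonToMap(parameters) from a_text")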

On Mon, May 16, 2016 at 3:24 AM, 喜之郎 <251922...@qq.com> wrote:

>
> hi, Ted.
> I found a built-in function called str_to_map, which can transform string
> to map.
> But it can not meet my need.
>
> Because my string may be a map with an array nested in its value,
> for example, map>.
> I think it will not work in my situation.
>
> Cheers
>
> -- Original Message --
> *From:* "喜之郎";<251922...@qq.com>;
> *Sent:* Monday, May 16, 2016, 10:00 AM
> *To:* "Ted Yu";
> *Cc:* "user";
> *Subject:* Re: spark udf can not change a json string to a map
>
> this is my usecase:
>Another system upload csv files to my system. In csv files, there are
> complicated data types such as map. In order to express complicated data
> types and ordinary string having special characters, we put urlencoded
> string in csv files.  So we use urlencoded json string to express
> map,string and array.
>
> second stage:
>   load csv files to spark text table.
> ###
> CREATE TABLE `a_text`(
>   parameters  string
> )
> load data inpath 'XXX' into table a_text;
> #
> Third stage:
>  insert into spark parquet table select from text table. In order to use
> advantage of complicated data types, we use udf to transform a json
> string to map , and put map to table.
>
> CREATE TABLE `a_parquet`(
>   parameters   map
> )
>
> insert into a_parquet select UDF(parameters ) from a_text;
>
> So do you have any suggestions?
>
>
>
>
>
>
> -- Original Message --
> *From:* "Ted Yu";;
> *Sent:* Monday, May 16, 2016, 00:44
> *To:* "喜之郎"<251922...@qq.com>;
> *Cc:* "user";
> *Subject:* Re: spark udf can not change a json string to a map
>
> Can you let us know more about your use case ?
>
> I wonder if you can structure your udf by not returning Map.
>
> Cheers
>
> On Sun, May 15, 2016 at 9:18 AM, 喜之郎 <251922...@qq.com> wrote:
>
>> Hi, all. I want to implement a udf which is used to change a json string
>> to a map.
>> But some problem occurs. My spark version:1.5.1.
>>
>>
>> my udf code:
>> 
>> public Map evaluate(final String s) {
>> if (s == null)
>> return null;
>> return getString(s);
>> }
>>
>> @SuppressWarnings("unchecked")
>> public static Map getString(String s) {
>> try {
>> String str =  URLDecoder.decode(s, "UTF-8");
>> ObjectMapper mapper = new ObjectMapper();
>> Map  map = mapper.readValue(str, Map.class);
>> return map;
>> } catch (Exception e) {
>> return new HashMap();
>> }
>> }
>> #
>> exception infos:
>>
>> 16/05/14 21:05:22 ERROR CliDriver:
>> org.apache.spark.sql.AnalysisException: Map type in java is unsupported
>> because JVM type erasure makes spark fail to catch key and value types in
>> Map<>; line 1 pos 352
>> at
>> org.apache.spark.sql.hive.HiveInspectors$class.javaClassToDataType(HiveInspectors.scala:230)
>> at
>> org.apache.spark.sql.hive.HiveSimpleUDF.javaClassToDataType(hiveUDFs.scala:107)
>> at org.apache.spark.sql.hive.HiveSimpleUDF.(hiveUDFs.scala:136)
>> 
>>
>>
>> I have saw that there is a testsuite in spark says spark did not support
>> this kind of udf.
>> But is there a method to implement this udf?
>>
>>
>>
>


Re: Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Serega Sheypak
Switching from snappy to lzf helped me:

*spark.io.compression.codec=lzf*

Do you know why? :) I can't find exact explanation...



2016-05-18 15:41 GMT+02:00 Ted Yu :

> Please increase the number of partitions.
>
> Cheers
>
> On Wed, May 18, 2016 at 4:17 AM, Serega Sheypak 
> wrote:
>
>> Hi, please have a look at log snippet:
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>> tracker endpoint =
>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>> locations
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
>> non-empty blocks out of 30 blocks
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
>> remote fetches in 3 ms
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map
>> outputs for shuffle 1, fetching them
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>> tracker endpoint =
>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>> locations
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
>> non-empty blocks out of 1500 blocks
>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
>> remote fetches in 1 ms
>> 16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak detected;
>> size = 6685476 bytes, TID = 3405
>> 16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in
>> stage 6.0 (TID 3405)
>>
>> Is it related to https://issues.apache.org/jira/browse/SPARK-11293
>>
>> Is there any recommended workaround?
>>
>
>


Re: Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Ted Yu
According to:
http://blog.erdemagaoglu.com/post/4605524309/lzo-vs-snappy-vs-lzf-vs-zlib-a-comparison-of

performance of snappy and lzf were on-par to each other.

Maybe lzf has lower memory requirement.

On Wed, May 18, 2016 at 7:22 AM, Serega Sheypak 
wrote:

> Switching from snappy to lzf helped me:
>
> *spark.io.compression.codec=lzf*
>
> Do you know why? :) I can't find exact explanation...
>
>
>
> 2016-05-18 15:41 GMT+02:00 Ted Yu :
>
>> Please increase the number of partitions.
>>
>> Cheers
>>
>> On Wed, May 18, 2016 at 4:17 AM, Serega Sheypak > > wrote:
>>
>>> Hi, please have a look at log snippet:
>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>>> tracker endpoint =
>>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>>> locations
>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
>>> non-empty blocks out of 30 blocks
>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
>>> remote fetches in 3 ms
>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map
>>> outputs for shuffle 1, fetching them
>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>>> tracker endpoint =
>>> NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
>>> 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
>>> locations
>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
>>> non-empty blocks out of 1500 blocks
>>> 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
>>> remote fetches in 1 ms
>>> 16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak detected;
>>> size = 6685476 bytes, TID = 3405
>>> 16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in
>>> stage 6.0 (TID 3405)
>>>
>>> Is it related to https://issues.apache.org/jira/browse/SPARK-11293
>>>
>>> Is there any recommended workaround?
>>>
>>
>>
>


Re: Managed memory leak detected.SPARK-11293 ?

2016-05-18 Thread Serega Sheypak
Ok, it happens only in YARN+cluster mode. It works with snappy in
YARN+client mode.
I've  started to hit this problem when I switched to cluster mode.
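
For anyone hitting the same thing, the two workarounds mentioned in this thread boil down to a codec switch and more shuffle partitions. A hedged config sketch (the parallelism value is a placeholder to tune, not a recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.io.compression.codec", "lzf")     // switch away from snappy, as reported above
  .set("spark.default.parallelism", "2000")     // placeholder: raise the shuffle partition count
// or repartition the problematic RDD explicitly, e.g. rdd.repartition(2000)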

2016-05-18 16:31 GMT+02:00 Ted Yu :

> According to:
>
> http://blog.erdemagaoglu.com/post/4605524309/lzo-vs-snappy-vs-lzf-vs-zlib-a-comparison-of
>
> performance of snappy and lzf were on-par to each other.
>
> Maybe lzf has lower memory requirement.
>
> On Wed, May 18, 2016 at 7:22 AM, Serega Sheypak 
> wrote:
>
>> Switching from snappy to lzf helped me:
>>
>> *spark.io.compression.codec=lzf*
>>
>> Do you know why? :) I can't find exact explanation...
>>
>>
>>
>> 2016-05-18 15:41 GMT+02:00 Ted Yu :
>>
>>> Please increase the number of partitions.
>>>
>>> Cheers
>>>
>>> On Wed, May 18, 2016 at 4:17 AM, Serega Sheypak <
>>> serega.shey...@gmail.com> wrote:
>>>
 Hi, please have a look at log snippet:
 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
 tracker endpoint =
 NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
 locations
 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 30
 non-empty blocks out of 30 blocks
 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 30
 remote fetches in 3 ms
 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Don't have map
 outputs for shuffle 1, fetching them
 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Doing the fetch;
 tracker endpoint =
 NettyRpcEndpointRef(spark://mapoutputtrac...@xxx.xxx.xxx.xxx:38128)
 16/05/18 03:27:16 INFO spark.MapOutputTrackerWorker: Got the output
 locations
 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Getting 1
 non-empty blocks out of 1500 blocks
 16/05/18 03:27:16 INFO storage.ShuffleBlockFetcherIterator: Started 1
 remote fetches in 1 ms
 16/05/18 03:27:17 ERROR executor.Executor: Managed memory leak
 detected; size = 6685476 bytes, TID = 3405
 16/05/18 03:27:17 ERROR executor.Executor: Exception in task 285.0 in
 stage 6.0 (TID 3405)

 Is it related to https://issues.apache.org/jira/browse/SPARK-11293

 Is there any recommended workaround?

>>>
>>>
>>
>


Submit python egg?

2016-05-18 Thread Darren Govoni


Hi,
I have a python egg with a __main__.py in it. I am able to execute the egg
by itself fine.
Is there a way to just submit the egg to Spark and have it run? It seems an
external .py script is needed, which would be unfortunate if true.
Thanks


Sent from my Verizon Wireless 4G LTE smartphone

Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-18 Thread Mail.com
Adding back users.



> On May 18, 2016, at 11:49 AM, Mail.com  wrote:
> 
> Hi Uladzimir,
> 
> I run is as below.
> 
> Spark-submit --class com.test --num-executors 4 --executor-cores 5 --queue 
> Dev --master yarn-client --driver-memory 512M --executor-memory 512M test.jar
> 
> Thanks,
> Pradeep
> 
> 
>> On May 18, 2016, at 5:45 AM, Vova Shelgunov  wrote:
>> 
>> Hi Pradeep,
>> 
>> How do you run your spark application? What is spark master? How many cores 
>> do you allocate?
>> 
>> Regards,
>> Uladzimir
>> 
>>> On May 17, 2016 7:33 AM, "Mail.com"  wrote:
>>> Hi Muthu,
>>> 
>>> Are you on spark 1.4.1 and Kafka 0.8.2? I have a similar issue even for 
>>> simple string messages.
>>> 
>>> Console producer and consumer work fine. But Spark always returns an empty RDD.
>>> I am using Receiver based Approach.
>>> 
>>> Thanks,
>>> Pradeep
>>> 
>>> > On May 16, 2016, at 8:19 PM, Ramaswamy, Muthuraman 
>>> >  wrote:
>>> >
>>> > Yes, I can see the messages. Also, I wrote a quick custom decoder for 
>>> > avro and it works fine for the following:
>>> >
>>> >>> kvs = KafkaUtils.createDirectStream(ssc, [topic], 
>>> >>> {"metadata.broker.list": brokers}, valueDecoder=decoder)
>>> >
>>> > But, when I use the Confluent Serializers to leverage the Schema Registry 
>>> > (based on the link shown below), it doesn’t work for me. I am not sure 
>>> > whether I need to configure any more details to consume the Schema 
>>> > Registry. I can fetch the schema from the schema registry based on is 
>>> > Ids. The decoder method is not returning any values for me.
>>> >
>>> > ~Muthu
>>> >
>>> >
>>> >
>>> >> On 5/16/16, 10:49 AM, "Cody Koeninger"  wrote:
>>> >>
>>> >> Have you checked to make sure you can receive messages just using a
>>> >> byte array for value?
>>> >>
>>> >> On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
>>> >>  wrote:
>>> >>> I am trying to consume AVRO formatted message through
>>> >>> KafkaUtils.createDirectStream. I followed the listed below example 
>>> >>> (refer
>>> >>> link) but the messages are not being fetched by the Stream.
>>> >>>
>>> >>> https://urldefense.proofpoint.com/v2/url?u=http-3A__stackoverflow.com_questions_30339636_spark-2Dpython-2Davro-2Dkafka-2Ddeserialiser&d=CwIBaQ&c=jcv3orpCsv7C4ly8-ubDob57ycZ4jvhoYZNDBA06fPk&r=NQ-dw5X8CJcqaXIvIdMUUdkL0fHDonD07FZzTY3CgiU&m=Nc-rPMFydyCrwOZuNWs2GmSL4NkN8eGoR-mkJUlkCx0&s=hwqxCKl3P4_9pKWeo1OGR134QegMRe3Xh22_WMy-5q8&e=
>>> >>>
>>> >>> Is there any code missing that I must add to make the above sample work.
>>> >>> Say, I am not sure how the confluent serializers would know the avro 
>>> >>> schema
>>> >>> info as it knows only the Schema Registry URL info.
>>> >>>
>>> >>> Appreciate your help.
>>> >>>
>>> >>> ~Muthu
>>> >  
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org


Re: 2 tables join happens at Hive but not in spark

2016-05-18 Thread Davies Liu
What the schema of the two tables looks like? Could you also show the
explain of the query?

On Sat, Feb 27, 2016 at 2:10 AM, Sandeep Khurana  wrote:
> Hello
>
> We have 2 tables  (tab1, tab2) exposed using hive. The data is in different
> hdfs folders. We are trying to join these 2 tables on certain single column
> using sparkR join. But inspite of join columns having same values, it
> returns zero rows.
>
> But when I run the same join sql in hive, from hive console, to get the
> count(*), I do get millions of records meeting the join criteria.
>
> The join columns are of 'int' type. Also, when I join 'tab1' from one of
> these 2 tables for which join is not working with another 3rd table 'tab3'
> separately, that join works.
>
> To debug , we selected just 1 row in the sparkR script from tab1 and also 1
> row row having the same value of join column from tab2 also. We used
> 'select' sparkR function for this. Now, our dataframes for tab1 and tab2
> have single row each and the join columns have same value in both, but still
> joining these 2 dataframes having single row each and with same join column,
> the join returned zero rows.
>
>
> We are running the script from rstudio. It does not give any error. It runs
> fine. But gives zero join results whereas on hive I do get many rows for
> same join. Any idea what might be the cause of this?
>
>
>
> --
> Architect
> Infoworks.io
> http://Infoworks.io

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark 2.0 state store] Streaming wordcount using spark state store

2016-05-18 Thread Michael Armbrust
The state store for structured streaming is an internal concept, and isn't
designed to be consumed by end users.  I'm hoping to write some
documentation on how to do aggregation, but support for reading from Kafka
and other sources will likely come in Spark 2.1+

On Wed, May 18, 2016 at 3:50 AM, Shekhar Bansal <
shekhar0...@yahoo.com.invalid> wrote:

> Hi
>
> What is the right way of using spark2.0 state store feature in spark
> streaming??
> I referred test cases in this(
> https://github.com/apache/spark/pull/11645/files) pull request and
> implemented word count using state store.
> My source is kafka(1 topic, 10 partitions). My data pump is pushing
> numbers into random partition.
> I understand that state store maintains state per partition, so I am
> applying partitionBy before calling mapPartitionsWithStateStore.
>
> Problem I am facing is that after some time, I start getting wrong running
> count.
> My data pump is pushing number 1 every 5 seconds, which is same as
> microbatch duration. First 20 micro batches ran fine but in 21st microbatch
> state of 1 somehow got reset and I got count=1, please see console output.
>
> Code of my streaming app
> val keySchema = StructType(Seq(StructField("key", StringType, true)))
> val valueSchema = StructType(Seq(StructField("value", IntegerType,
> true)))
>
> val stateStoreCheckpointPath = "/data/spark/stateStoreCheckpoints/"
> var stateStoreVersion:Long = 0
>
> val stateStoreWordCount = (store: StateStore, iter: Iterator[String])
> => {
>   val out = new ListBuffer[(String, Int)]
>   iter.foreach { s =>
> val current = store.get(stringToRow(s)).map(rowToInt).getOrElse(0)
> + 1
> store.put(stringToRow(s), intToRow(current))
> out.append((s,current))
>   }
>
>   store.commit
>   out.iterator
> }
>
> val opId = 100
> KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
>   .flatMap(r=>{r._2.split(" ")})
>   .foreachRDD((rdd, time) =>{
> rdd
> .map(r=>(r,null))
> .partitionBy(new HashPartitioner(20))
> .map(r=>r._1)
> .mapPartitionsWithStateStore(sqlContet, stateStoreCheckpointPath,
> opId, storeVersion = stateStoreVersion, keySchema,
> valueSchema)(stateStoreWordCount)
> .collect foreach(r=> println(time  + " - " + r))
> stateStoreVersion+=1
> println(time + " batch finished")
> }
>   )
>
> Code of my Data pump
> val list =
> List(1,2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97)
> while (true) {
>   list.foreach(r=>{
> if(count%r==0){
>   val productRecord = new ProducerRecord(Configs.topic,new
> Random().nextInt(10), "" , r.toString)
>   producer.send(productRecord)
> }
>   })
>   count+=1
>   Thread.sleep(5000);
> }
>
>
> Complete code is available here(
> https://github.com/zuxqoj/HelloWorld/tree/master/SparkStreamingStateStore/src/main/scala/spark/streaming/statestore/test
> )
>
>
> I am using spark on yarn in client mode.
> spark - spark-2.0.0-snapshot (git sha -
> 6c5768594fe8b910125f06e1308a8154a199447e)  - May 13, 2016
> scala - 2.10.2
> java - 1.8
> hadoop - 2.7.1
> kafka - 0.8.2.1
>
> Spark config:
> spark.executor.cores=12
> spark.executor.instances=6
>
>
> Console Output
> 146356664 ms - (1,1)
> 146356664 ms batch finished
> 1463566645000 ms - (1,2)
> 1463566645000 ms - (2,1)
> 1463566645000 ms batch finished
> 146356665 ms - (1,3)
> 146356665 ms - (3,1)
> 146356665 ms batch finished
> 1463566655000 ms - (1,4)
> 1463566655000 ms - (2,2)
> 1463566655000 ms batch finished
> 14635 ms - (1,5)
> 14635 ms - (5,1)
> 14635 ms batch finished
> 146355000 ms - (1,6)
> 146355000 ms - (2,3)
> 146355000 ms - (3,2)
> 146355000 ms batch finished
> 146356667 ms - (1,7)
> 146356667 ms - (7,1)
> 146356667 ms batch finished
> 1463566675000 ms - (1,8)
> 1463566675000 ms - (2,4)
> 1463566675000 ms batch finished
> 146356668 ms - (1,9)
> 146356668 ms - (3,3)
> 146356668 ms batch finished
> 1463566685000 ms - (1,10)
> 1463566685000 ms - (2,5)
> 1463566685000 ms - (5,2)
> 1463566685000 ms batch finished
> 146356669 ms - (11,1)
> 146356669 ms - (1,11)
> 146356669 ms batch finished
> 1463566695000 ms - (1,12)
> 1463566695000 ms - (2,6)
> 1463566695000 ms - (3,4)
> 1463566695000 ms batch finished
> 146356670 ms - (1,13)
> 146356670 ms - (13,1)
> 146356670 ms batch finished
> 1463566705000 ms - (1,14)
> 1463566705000 ms - (2,7)
> 1463566705000 ms - (7,2)
> 1463566705000 ms batch finished
> 146356671 ms - (1,15)
> 146356671 ms - (3,5)
> 146356671 ms - (5,3)
> 146356671 ms batch finished
> 1463566715000 ms - (1,16)
> 1463566715000 ms - (2,8)
> 1463566715000 ms batch finished
> 146356672 ms - (1,17)
> 146356672 ms - (17,1)
> 146356672 ms batch finished
>

SLF4J binding error while running Spark using YARN as Cluster Manager

2016-05-18 Thread Anubhav Agarwal
Hi,
I am having log4j trouble while running Spark using YARN as cluster manager
in CDH 5.3.3.
I get the following error:-

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data/12/yarn/nm/filecache/34/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/hadoop/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]


I know this is not a world-ending situation, but I still need to remove
one of the StaticLoggerBinder bindings from the classpath, as neither MDC
nor Marker works.

I need custom fields in my log.


Has anybody had any success in this regard? Any workarounds or
suggestions are welcome.


Thank You,

Anu


Re: SLF4J binding error while running Spark using YARN as Cluster Manager

2016-05-18 Thread Marcelo Vanzin
Hi Anubhav,

This is happening because you're trying to use the configuration
generated for CDH with upstream Spark. The CDH configuration will add
extra needed jars that we don't include in our build of Spark, so
you'll end up getting duplicate classes.

You can either try to use a different Spark configuration directory
for your upstream version, or try to use the "hadoop provided"
distribution of Apache Spark; note that the latter doesn't include
certain features like Hive support.


On Wed, May 18, 2016 at 10:59 AM, Anubhav Agarwal  wrote:
> Hi,
> I am having log4j trouble while running Spark using YARN as cluster manager
> in CDH 5.3.3.
> I get the following error:-
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/data/12/yarn/nm/filecache/34/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/hadoop/cloudera/parcels/CDH-5.3.3-1.cdh5.3.3.p0.5/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>
>
> I know this is not world ending situation but I still need to remove one of
> the StaticLoggerBinder from classpath as neither MDC nor Marker work.
>
> I need custom fields in my log.
>
>
> Has anybody made any success in this regard? Any workaround or suggestions
> are welcome.
>
>
> Thank You,
>
> Anu



-- 
Marcelo




RE: Accessing Cassandra data from Spark Shell

2016-05-18 Thread Mohammed Guller
As Ben mentioned, Spark 1.5.2 does work with C*.  Make sure that you are using 
the correct version of the Spark Cassandra Connector.


Mohammed
Author: Big Data Analytics with Spark

From: Ben Slater [mailto:ben.sla...@instaclustr.com]
Sent: Tuesday, May 17, 2016 11:00 PM
To: u...@cassandra.apache.org; Mohammed Guller
Cc: user
Subject: Re: Accessing Cassandra data from Spark Shell

It definitely should be possible for 1.5.2 (I have used it with spark-shell and 
cassandra connector with 1.4.x). The main trick is in lining up all the 
versions and building an appropriate connector jar.

Cheers
Ben

On Wed, 18 May 2016 at 15:40 Cassa L <lcas...@gmail.com> wrote:
Hi,
I followed instructions to run SparkShell with Spark-1.6. It works fine. 
However, I need to use spark-1.5.2 version. With it, it does not work. I keep 
getting NoSuchMethod Errors. Is there any issue running Spark Shell for 
Cassandra using older version of Spark?


Regards,
LCassa

On Tue, May 10, 2016 at 6:48 PM, Mohammed Guller <moham...@glassbeam.com> wrote:
Yes, it is very simple to access Cassandra data using Spark shell.

Step 1: Launch the spark-shell with the spark-cassandra-connector package
$SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0

Step 2: Create a DataFrame pointing to your Cassandra table
val dfCassTable = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "your_column_family", "keyspace" -> "your_keyspace"))
  .load()

From this point onward, you have complete access to the DataFrame API. You can 
even register it as a temporary table, if you would prefer to use SQL/HiveQL.
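
For example, a minimal sketch of the temporary-table route, reusing the dfCassTable 
DataFrame created above (the table name "cass_table" and the query are just placeholders):

    // Register the Cassandra-backed DataFrame and query it with SQL/HiveQL.
    dfCassTable.registerTempTable("cass_table")
    sqlContext.sql("SELECT COUNT(*) FROM cass_table").show()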

Mohammed
Author: Big Data Analytics with Spark

From: Ben Slater 
[mailto:ben.sla...@instaclustr.com]
Sent: Monday, May 9, 2016 9:28 PM
To: u...@cassandra.apache.org; user
Subject: Re: Accessing Cassandra data from Spark Shell

You can use SparkShell to access Cassandra via the Spark Cassandra connector. 
The getting started article on our support page will probably give you a good 
steer to get started even if you’re not using Instaclustr: 
https://support.instaclustr.com/hc/en-us/articles/213097877-Getting-Started-with-Instaclustr-Spark-Cassandra-

Cheers
Ben

On Tue, 10 May 2016 at 14:08 Cassa L <lcas...@gmail.com> wrote:
Hi,
Has anyone tried accessing Cassandra data using SparkShell? How do you do it? 
Can you use HiveContext for Cassandra data? I'm using community version of 
Cassandra-3.0

Thanks,
LCassa
--

Ben Slater
Chief Product Officer, Instaclustr
+61 437 929 798

--

Ben Slater
Chief Product Officer, Instaclustr
+61 437 929 798


Re: Unit testing framework for Spark Jobs?

2016-05-18 Thread swetha kasireddy
Hi Lars,

Do you have any examples for the methods that you described for Spark batch
and Streaming?

Thanks!

On Wed, Mar 30, 2016 at 2:41 AM, Lars Albertsson  wrote:

> Thanks!
>
> It is on my backlog to write a couple of blog posts on the topic, and
> eventually some example code, but I am currently busy with clients.
>
> Thanks for the pointer to Eventually - I was unaware. Fast exit on
> exception would be a useful addition, indeed.
>
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> +46 70 7687109
>
> On Mon, Mar 28, 2016 at 2:00 PM, Steve Loughran 
> wrote:
> > this is a good summary -Have you thought of publishing it at the end of
> a URL for others to refer to
> >
> >> On 18 Mar 2016, at 07:05, Lars Albertsson  wrote:
> >>
> >> I would recommend against writing unit tests for Spark programs, and
> >> instead focus on integration tests of jobs or pipelines of several
> >> jobs. You can still use a unit test framework to execute them. Perhaps
> >> this is what you meant.
> >>
> >> You can use any of the popular unit test frameworks to drive your
> >> tests, e.g. JUnit, Scalatest, Specs2. I prefer Scalatest, since it
> >> gives you choice of TDD vs BDD, and it is also well integrated with
> >> IntelliJ.
> >>
> >> I would also recommend against using testing frameworks tied to a
> >> processing technology, such as Spark Testing Base. Although it does
> >> seem well crafted, and makes it easy to get started with testing,
> >> there are drawbacks:
> >>
> >> 1. I/O routines are not tested. Bundled test frameworks typically do
> >> not materialise datasets on storage, but pass them directly in memory.
> >> (I have not verified this for Spark Testing Base, but it looks so.)
> >> I/O routines are therefore not exercised, and they often hide bugs,
> >> e.g. related to serialisation.
> >>
> >> 2. You create a strong coupling between processing technology and your
> >> tests. If you decide to change processing technology (which can happen
> >> soon in this fast paced world...), you need to rewrite your tests.
> >> Therefore, during a migration process, the tests cannot detect bugs
> >> introduced in migration, and help you migrate fast.
> >>
> >> I recommend that you instead materialise input datasets on local disk,
> >> run your Spark job, which writes output datasets to local disk, read
> >> output from disk, and verify the results. You can still use Spark
> >> routines to read and write input and output datasets. A Spark context
> >> is expensive to create, so for speed, I would recommend reusing the
> >> Spark context between input generation, running the job, and reading
> >> output.
> >>
> >> This is easy to set up, so you don't need a dedicated framework for
> >> it. Just put your common boilerplate in a shared test trait or base
> >> class.
> >>
> >> In the future, when you want to replace your Spark job with something
> >> shinier, you can still use the old tests, and only replace the part
> >> that runs your job, giving you some protection from regression bugs.
> >>
> >>
> >> Testing Spark Streaming applications is a different beast, and you can
> >> probably not reuse much from your batch testing.
> >>
> >> For testing streaming applications, I recommend that you run your
> >> application inside a unit test framework, e.g, Scalatest, and have the
> >> test setup create a fixture that includes your input and output
> >> components. For example, if your streaming application consumes from
> >> Kafka and updates tables in Cassandra, spin up single node instances
> >> of Kafka and Cassandra on your local machine, and connect your
> >> application to them. Then feed input to a Kafka topic, and wait for
> >> the result to appear in Cassandra.
> >>
> >> With this setup, your application still runs in Scalatest, the tests
> >> run without custom setup in maven/sbt/gradle, and you can easily run
> >> and debug inside IntelliJ.
> >>
> >> Docker is suitable for spinning up external components. If you use
> >> Kafka, the Docker image spotify/kafka is useful, since it bundles
> >> Zookeeper.
> >>
> >> When waiting for output to appear, don't sleep for a long time and
> >> then check, since it will slow down your tests. Instead enter a loop
> >> where you poll for the results and sleep for a few milliseconds in
> >> between, with a long timeout (~30s) before the test fails with a
> >> timeout.
> >
> > org.scalatest.concurrent.Eventually is your friend there
> >
> > eventually(stdTimeout, stdInterval) {
> >   listRestAPIApplications(connector, webUI, true) should contain(expectedAppId)
> > }
> >
> > It has good exponential backoff, for fast initial success without using
> too much CPU later, and is simple to use
> >
> > If it has weaknesses in my tests, they are
> >
> > 1. it will retry on all exceptions, rather than assertions. If there's a
> bug in the test code then it manifests as a timeout. ( I think I could play
> with Suite.anExceptionThatShouldCauseAnAbort()) here.
> > 2. it's timeout act

Re: Can Pyspark access Scala API?

2016-05-18 Thread Ted Yu
Not sure if you have seen this (for 2.0):

[SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only value

Can you tell us your use case ?

On Tue, May 17, 2016 at 9:16 PM, Abi  wrote:

> Can Pyspark access Scala API? The accumulator in pysPark does not have
> local variable available . The Scala API does have it available


Re: Can Pyspark access Scala API?

2016-05-18 Thread Abi
Thanks for that. But the question is more general. Can pyspark access Scala 
somehow ?

On May 18, 2016 3:53:50 PM EDT, Ted Yu  wrote:
>Not sure if you have seen this (for 2.0):
>
>[SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only
>value
>
>Can you tell us your use case ?
>
>On Tue, May 17, 2016 at 9:16 PM, Abi 
>wrote:
>
>> Can Pyspark access Scala API? The accumulator in pysPark does not
>have
>> local variable available . The Scala API does have it available


Re: Can Pyspark access Scala API?

2016-05-18 Thread Ted Yu
Please take a look at python/pyspark/ml/wrapper.py

On Wed, May 18, 2016 at 1:08 PM, Abi  wrote:

> Thanks for that. But the question is more general. Can pyspark access
> Scala somehow ?
>
>
> On May 18, 2016 3:53:50 PM EDT, Ted Yu  wrote:
>>
>> Not sure if you have seen this (for 2.0):
>>
>> [SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only
>> value
>>
>> Can you tell us your use case ?
>>
>> On Tue, May 17, 2016 at 9:16 PM, Abi  wrote:
>>
>>> Can Pyspark access Scala API? The accumulator in pysPark does not have
>>> local variable available . The Scala API does have it available
>>
>>
>>


Couldn't find leader offsets

2016-05-18 Thread samsayiam
I have seen questions posted about this on SO and on this list but haven't
seen a response that addresses my issue.  I am trying to create a direct
stream connection to a kafka topic but it fails with Couldn't find leader
offsets for Set(...).  If I run a kafka consumer I can read the topic but
can't do it with spark.  Can someone tell me where I'm going wrong here?

Test topic info:
vagrant@broker1$ ./bin/kafka-topics.sh --describe --zookeeper 10.30.3.2:2181
--topic footopic
Topic:footopic  PartitionCount:1ReplicationFactor:1 Configs:
Topic: footopic Partition: 0Leader: 0   Replicas: 0   Isr: 0

consuming from kafka:
vagrant@broker1$ bin/kafka-console-consumer.sh --zookeeper 10.30.3.2:2181
--from-beginning --topic footopic
this is a test
and so is this
goodbye

Attempting from spark:
spark-submit --class com.foo.Experiment --master local[*] --jars
/vagrant/spark-streaming-kafka-assembly_2.10-1.6.1.jar
/vagrant/spark-app-1.0-SNAPSHOT.jar 10.0.7.34:9092

...

Using kafkaparams: {auto.offset.reset=smallest,
metadata.broker.list=10.0.7.34:9092}
16/05/18 20:27:21 INFO utils.VerifiableProperties: Verifying properties
16/05/18 20:27:21 INFO utils.VerifiableProperties: Property
auto.offset.reset is overridden to smallest
16/05/18 20:27:21 INFO utils.VerifiableProperties: Property group.id is
overridden to 
16/05/18 20:27:21 INFO utils.VerifiableProperties: Property
zookeeper.connect is overridden to 
16/05/18 20:27:21 INFO consumer.SimpleConsumer: Reconnect due to socket
error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException:
java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([footopic,0])
...


Any help is appreciated.

Thanks,
ch.
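
For reference, a minimal sketch of the kind of direct-stream setup described above, 
with the broker address and topic taken from the log (ssc is assumed to be an existing 
StreamingContext, and the comment reflects a common cause rather than a confirmed diagnosis):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // "Couldn't find leader offsets" often means the address in metadata.broker.list
    // is not reachable from the driver, or does not match the host/port the broker
    // advertises (advertised.host.name / advertised.port in the broker config).
    val kafkaParams = Map(
      "metadata.broker.list" -> "10.0.7.34:9092",
      "auto.offset.reset" -> "smallest")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("footopic"))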





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Couldn-t-find-leader-offsets-tp26978.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Is there a way to run a jar built for scala 2.11 on spark 1.6.1 (which is using 2.10?)

2016-05-18 Thread Sergey Zelvenskiy



Re: Unit testing framework for Spark Jobs?

2016-05-18 Thread Todd Nist
Perhaps these may be of some use:

https://github.com/mkuthan/example-spark
http://mkuthan.github.io/blog/2015/03/01/spark-unit-testing/
https://github.com/holdenk/spark-testing-base

On Wed, May 18, 2016 at 2:14 PM, swetha kasireddy  wrote:

> Hi Lars,
>
> Do you have any examples for the methods that you described for Spark
> batch and Streaming?
>
> Thanks!
>
> On Wed, Mar 30, 2016 at 2:41 AM, Lars Albertsson 
> wrote:
>
>> Thanks!
>>
>> It is on my backlog to write a couple of blog posts on the topic, and
>> eventually some example code, but I am currently busy with clients.
>>
>> Thanks for the pointer to Eventually - I was unaware. Fast exit on
>> exception would be a useful addition, indeed.
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> +46 70 7687109
>>
>> On Mon, Mar 28, 2016 at 2:00 PM, Steve Loughran 
>> wrote:
>> > this is a good summary -Have you thought of publishing it at the end of
>> a URL for others to refer to
>> >
>> >> On 18 Mar 2016, at 07:05, Lars Albertsson  wrote:
>> >>
>> >> I would recommend against writing unit tests for Spark programs, and
>> >> instead focus on integration tests of jobs or pipelines of several
>> >> jobs. You can still use a unit test framework to execute them. Perhaps
>> >> this is what you meant.
>> >>
>> >> You can use any of the popular unit test frameworks to drive your
>> >> tests, e.g. JUnit, Scalatest, Specs2. I prefer Scalatest, since it
>> >> gives you choice of TDD vs BDD, and it is also well integrated with
>> >> IntelliJ.
>> >>
>> >> I would also recommend against using testing frameworks tied to a
>> >> processing technology, such as Spark Testing Base. Although it does
>> >> seem well crafted, and makes it easy to get started with testing,
>> >> there are drawbacks:
>> >>
>> >> 1. I/O routines are not tested. Bundled test frameworks typically do
>> >> not materialise datasets on storage, but pass them directly in memory.
>> >> (I have not verified this for Spark Testing Base, but it looks so.)
>> >> I/O routines are therefore not exercised, and they often hide bugs,
>> >> e.g. related to serialisation.
>> >>
>> >> 2. You create a strong coupling between processing technology and your
>> >> tests. If you decide to change processing technology (which can happen
>> >> soon in this fast paced world...), you need to rewrite your tests.
>> >> Therefore, during a migration process, the tests cannot detect bugs
>> >> introduced in migration, and help you migrate fast.
>> >>
>> >> I recommend that you instead materialise input datasets on local disk,
>> >> run your Spark job, which writes output datasets to local disk, read
>> >> output from disk, and verify the results. You can still use Spark
>> >> routines to read and write input and output datasets. A Spark context
>> >> is expensive to create, so for speed, I would recommend reusing the
>> >> Spark context between input generation, running the job, and reading
>> >> output.
>> >>
>> >> This is easy to set up, so you don't need a dedicated framework for
>> >> it. Just put your common boilerplate in a shared test trait or base
>> >> class.
>> >>
>> >> In the future, when you want to replace your Spark job with something
>> >> shinier, you can still use the old tests, and only replace the part
>> >> that runs your job, giving you some protection from regression bugs.
>> >>
>> >>
>> >> Testing Spark Streaming applications is a different beast, and you can
>> >> probably not reuse much from your batch testing.
>> >>
>> >> For testing streaming applications, I recommend that you run your
>> >> application inside a unit test framework, e.g, Scalatest, and have the
>> >> test setup create a fixture that includes your input and output
>> >> components. For example, if your streaming application consumes from
>> >> Kafka and updates tables in Cassandra, spin up single node instances
>> >> of Kafka and Cassandra on your local machine, and connect your
>> >> application to them. Then feed input to a Kafka topic, and wait for
>> >> the result to appear in Cassandra.
>> >>
>> >> With this setup, your application still runs in Scalatest, the tests
>> >> run without custom setup in maven/sbt/gradle, and you can easily run
>> >> and debug inside IntelliJ.
>> >>
>> >> Docker is suitable for spinning up external components. If you use
>> >> Kafka, the Docker image spotify/kafka is useful, since it bundles
>> >> Zookeeper.
>> >>
>> >> When waiting for output to appear, don't sleep for a long time and
>> >> then check, since it will slow down your tests. Instead enter a loop
>> >> where you poll for the results and sleep for a few milliseconds in
>> >> between, with a long timeout (~30s) before the test fails with a
>> >> timeout.
>> >
>> > org.scalatest.concurrent.Eventually is your friend there
>> >
>> > eventually(stdTimeout, stdInterval) {
>> >   listRestAPIApplications(connector, webUI, true) should contain(expectedAppId)
>> > }
>> >
>> > It has good exponential backoff, for fast initi

Re: Is there a way to run a jar built for scala 2.11 on spark 1.6.1 (which is using 2.10?)

2016-05-18 Thread Koert Kuipers
no but you can trivially build spark 1.6.1 for scala 2.11

On Wed, May 18, 2016 at 6:11 PM, Sergey Zelvenskiy 
wrote:

>
>


Re: Is there a way to run a jar built for scala 2.11 on spark 1.6.1 (which is using 2.10?)

2016-05-18 Thread Ted Yu
Depending on the version of hadoop you use, you may find tar ball prebuilt
with Scala 2.11:

https://s3.amazonaws.com/spark-related-packages

FYI

On Wed, May 18, 2016 at 3:35 PM, Koert Kuipers  wrote:

> no but you can trivially build spark 1.6.1 for scala 2.11
>
> On Wed, May 18, 2016 at 6:11 PM, Sergey Zelvenskiy 
> wrote:
>
>>
>>
>


Spark UI metrics - Task execution time and number of records processed

2016-05-18 Thread Nirav Patel
Hi,

I have been noticing that for shuffled tasks (groupBy, join) the reducer tasks
are not evenly loaded. Most of them (90%) finish super fast, but there are
some outliers that take much longer, as you can see from the "Max" value in the
following metric. The metric is from a join operation done on two RDDs. I tried
repartitioning both RDDs with a HashPartitioner before the join. It's certainly
faster than before, when I was not repartitioning, but it is still slow, and it
looks like it's not allocating an equal number of records to each partition.
Could this just be the result of data skew, or can something else be done here?

Summary Metrics for 4000 Completed Tasks
Metric      Min      25th percentile   Median   75th percentile   Max
Duration    89 ms    3 s               7 s      14 s              5.9 min
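
For reference, a minimal sketch of the repartition-before-join pattern described above 
(rdd1, rdd2 and the partition count of 200 are placeholders for the two pair RDDs being joined):

    import org.apache.spark.HashPartitioner

    // Partition both pair RDDs with the same partitioner so the join itself
    // does not reshuffle; this does not fix skew if many records share one key.
    val partitioner = new HashPartitioner(200)
    val left  = rdd1.partitionBy(partitioner)
    val right = rdd2.partitionBy(partitioner)
    val joined = left.join(right)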




Does Structured Streaming support Kafka as data source?

2016-05-18 Thread Todd
Hi,
I had a brief look at the Spark code, and it looks like Structured Streaming doesn't
support Kafka as a data source yet?


How to perform reduce operation in the same order as partition indexes

2016-05-18 Thread Pulasthi Supun Wickramasinghe
Hi Devs/All,

I am pretty new to Spark. I have a program which does some map-reduce
operations with matrices. Here *shortrddFinal* is of type
*RDD[Array[Short]]* and consists of several partitions:

*var BC =
shortrddFinal.mapPartitionsWithIndex(calculateBCInternal).reduce(mergeBC)*

The map function produces an Array[Array[Double]], and at the reduce step I
need to merge all the 2-dimensional double arrays produced for each
partition into one big matrix. But I also need to keep the same order as
the partitions: the 2D double array produced for partition 0 should be the
first set of rows in the matrix, then the 2D double array produced for
partition 1, and so on. Is there a way to enforce the order in the reduce
step?
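
One possible sketch of keeping the partition order, assuming the per-partition blocks 
fit on the driver (calculateBC is a hypothetical stand-in for the per-partition 
computation in calculateBCInternal):

    // Tag each partition's 2-D block with its partition index, collect,
    // and reassemble on the driver in index order instead of relying on reduce.
    val blocks = shortrddFinal
      .mapPartitionsWithIndex { (idx, it) => Iterator((idx, calculateBC(it))) }
      .collect()
    val BC: Array[Array[Double]] = blocks.sortBy(_._1).flatMap(_._2)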

Thanks in advance

Best Regards,
Pulasthi
-- 
Pulasthi S. Wickramasinghe
Graduate Student  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


RE: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-18 Thread Ramaswamy, Muthuraman
I am using Spark 1.6.1 and Kafka 0.9+. It works in both receiver and
receiver-less mode.

One thing I noticed: when you specify an invalid topic name, KafkaUtils doesn't
fetch any messages. So, check that you have specified the topic name correctly.

~Muthu

From: Mail.com [pradeep.mi...@mail.com]
Sent: Monday, May 16, 2016 9:33 PM
To: Ramaswamy, Muthuraman
Cc: Cody Koeninger; spark users
Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent 
Serializers as Value Decoder.

Hi Muthu,

Are you on spark 1.4.1 and Kafka 0.8.2? I have a similar issue even for simple 
string messages.

Console producer and consumer work fine, but Spark always returns an empty RDD.
I am using the receiver-based approach.

Thanks,
Pradeep

> On May 16, 2016, at 8:19 PM, Ramaswamy, Muthuraman 
>  wrote:
>
> Yes, I can see the messages. Also, I wrote a quick custom decoder for avro 
> and it works fine for the following:
>
>>> kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": 
>>> brokers}, valueDecoder=decoder)
>
> But, when I use the Confluent Serializers to leverage the Schema Registry 
> (based on the link shown below), it doesn’t work for me. I am not sure 
> whether I need to configure any more details to consume the Schema Registry. 
> I can fetch the schema from the schema registry based on its IDs. The decoder 
> method is not returning any values for me.
>
> ~Muthu
>
>
>
>> On 5/16/16, 10:49 AM, "Cody Koeninger"  wrote:
>>
>> Have you checked to make sure you can receive messages just using a
>> byte array for value?
>>
>> On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
>>  wrote:
>>> I am trying to consume AVRO formatted message through
>>> KafkaUtils.createDirectStream. I followed the listed below example (refer
>>> link) but the messages are not being fetched by the Stream.
>>>
>>> https://urldefense.proofpoint.com/v2/url?u=http-3A__stackoverflow.com_questions_30339636_spark-2Dpython-2Davro-2Dkafka-2Ddeserialiser&d=CwIBaQ&c=jcv3orpCsv7C4ly8-ubDob57ycZ4jvhoYZNDBA06fPk&r=NQ-dw5X8CJcqaXIvIdMUUdkL0fHDonD07FZzTY3CgiU&m=Nc-rPMFydyCrwOZuNWs2GmSL4NkN8eGoR-mkJUlkCx0&s=hwqxCKl3P4_9pKWeo1OGR134QegMRe3Xh22_WMy-5q8&e=
>>>
>>> Is there any code missing that I must add to make the above sample work.
>>> Say, I am not sure how the confluent serializers would know the avro schema
>>> info as it knows only the Schema Registry URL info.
>>>
>>> Appreciate your help.
>>>
>>> ~Muthu




Latency experiment without losing executors

2016-05-18 Thread gkumar7
I would like to test the latency (tasks/s) perceived in a simple application
on Apache Spark.

The idea: The workers will generate random data to be placed in a list. The
final action (count) will count the total number of data points generated.

Below, the numberOfPartitions is equal to the number of datapoints which
need to be generated (datapoints are integers). 

Although the code works as expected, a total of 119 Spark executors were
killed while running with 64 slaves. I suspect this is because, since Spark
assigns executors to each node, the total number of partitions each node is
assigned to compute may require more memory than is available on that node.
This causes those executors to be killed, and therefore the latency
measurement I would like to analyze is inaccurate.

Any assistance with cleaning up the code below, or with fixing the above issue
to decrease the number of killed executors, would be much appreciated.
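
The code itself did not come through in the archive; a minimal sketch of the experiment 
as described, assuming one random datapoint is generated per partition (the partition 
count is a placeholder):

    import scala.util.Random

    // Each partition generates one random integer; count() triggers the job
    // and returns the total number of datapoints generated.
    val numberOfPartitions = 100000  // placeholder
    val total = sc
      .parallelize(1 to numberOfPartitions, numberOfPartitions)
      .map(_ => Random.nextInt())
      .count()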





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Latency-experiment-without-losing-executors-tp26981.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Accessing Cassandra data from Spark Shell

2016-05-18 Thread Cassa L
I tried all combinations of spark-cassandra connector. Didn't work.
Finally, I downgraded spark to 1.5.1 and now it works.
LCassa

On Wed, May 18, 2016 at 11:11 AM, Mohammed Guller 
wrote:

> As Ben mentioned, Spark 1.5.2 does work with C*.  Make sure that you are
> using the correct version of the Spark Cassandra Connector.
>
>
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Ben Slater [mailto:ben.sla...@instaclustr.com]
> *Sent:* Tuesday, May 17, 2016 11:00 PM
> *To:* u...@cassandra.apache.org; Mohammed Guller
> *Cc:* user
>
> *Subject:* Re: Accessing Cassandra data from Spark Shell
>
>
>
> It definitely should be possible for 1.5.2 (I have used it with
> spark-shell and cassandra connector with 1.4.x). The main trick is in
> lining up all the versions and building an appropriate connector jar.
>
>
>
> Cheers
>
> Ben
>
>
>
> On Wed, 18 May 2016 at 15:40 Cassa L  wrote:
>
> Hi,
>
> I followed instructions to run SparkShell with Spark-1.6. It works fine.
> However, I need to use spark-1.5.2 version. With it, it does not work. I
> keep getting NoSuchMethod Errors. Is there any issue running Spark Shell
> for Cassandra using older version of Spark?
>
>
>
>
>
> Regards,
>
> LCassa
>
>
>
> On Tue, May 10, 2016 at 6:48 PM, Mohammed Guller 
> wrote:
>
> Yes, it is very simple to access Cassandra data using Spark shell.
>
>
>
> Step 1: Launch the spark-shell with the spark-cassandra-connector package
>
> $SPARK_HOME/bin/spark-shell --packages
> com.datastax.spark:spark-cassandra-connector_2.10:1.5.0
>
>
>
> Step 2: Create a DataFrame pointing to your Cassandra table
>
> val dfCassTable = sqlContext.read
>   .format("org.apache.spark.sql.cassandra")
>   .options(Map("table" -> "your_column_family", "keyspace" -> "your_keyspace"))
>   .load()
>
>
>
> From this point onward, you have complete access to the DataFrame API. You
> can even register it as a temporary table, if you would prefer to use
> SQL/HiveQL.
>
>
>
> Mohammed
>
> Author: Big Data Analytics with Spark
> 
>
>
>
> *From:* Ben Slater [mailto:ben.sla...@instaclustr.com]
> *Sent:* Monday, May 9, 2016 9:28 PM
> *To:* u...@cassandra.apache.org; user
> *Subject:* Re: Accessing Cassandra data from Spark Shell
>
>
>
> You can use SparkShell to access Cassandra via the Spark Cassandra
> connector. The getting started article on our support page will probably
> give you a good steer to get started even if you’re not using Instaclustr:
> https://support.instaclustr.com/hc/en-us/articles/213097877-Getting-Started-with-Instaclustr-Spark-Cassandra-
>
>
>
> Cheers
>
> Ben
>
>
>
> On Tue, 10 May 2016 at 14:08 Cassa L  wrote:
>
> Hi,
>
> Has anyone tried accessing Cassandra data using SparkShell? How do you do
> it? Can you use HiveContext for Cassandra data? I'm using community version
> of Cassandra-3.0
>
>
>
> Thanks,
>
> LCassa
>
> --
>
> 
>
> Ben Slater
>
> Chief Product Officer, Instaclustr
>
> +61 437 929 798
>
>
>
> --
>
> 
>
> Ben Slater
>
> Chief Product Officer, Instaclustr
>
> +61 437 929 798
>


Any way to pass custom hadoop conf to through spark thrift server ?

2016-05-18 Thread Jeff Zhang
I want to pass a custom Hadoop conf to the Spark thrift server so that both the
driver and the executor side can see it. I also want this custom Hadoop conf to
be visible only to the jobs of the user who set it. Is this possible with the
Spark thrift server now? Thanks



-- 
Best Regards

Jeff Zhang


Tar File: On Spark

2016-05-18 Thread ayan guha
Hi

I have a few tar files in HDFS in a single folder. Each tar file has multiple
files in it.

tar1:
  - f1.txt
  - f2.txt
tar2:
  - f1.txt
  - f2.txt

(each tar file will have exact same number of files, same name)

I am trying to find a way (spark or pig) to extract them to their own
folders.

f1
  - tar1_f1.txt
  - tar2_f1.txt
f2:
   - tar1_f2.txt
   - tar2_f2.txt

Any help?



-- 
Best Regards,
Ayan Guha


Re: Tar File: On Spark

2016-05-18 Thread Sun Rui
1. create a temp dir on HDFS, say “/tmp”
2. Write a script that creates, in the temp dir, one file per tar file. Each
file has only one line: the HDFS path of the corresponding tar file.

3. Write a spark application. It is like:
  val rdd = sc.textFile()
  rdd.map { line =>
    // construct an untar command using the path information in "line" and launch the command
  }
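
A minimal sketch of step 3, assuming the hdfs CLI and a writable local temp dir are 
available on the executors ("/tmp/tar-list" and "/extracted" are placeholder paths):

    import scala.sys.process._

    val rdd = sc.textFile("/tmp/tar-list")   // one HDFS tar path per line
    val exitCodes = rdd.map { tarPath =>
      val name = tarPath.split("/").last.stripSuffix(".tar")
      // Stream the tar out of HDFS, untar locally, then push the files back to HDFS.
      Seq("bash", "-c",
        s"mkdir -p /tmp/$name && hdfs dfs -cat $tarPath | tar -x -C /tmp/$name && " +
        s"hdfs dfs -put -f /tmp/$name /extracted/").!
    }
    exitCodes.collect()  // force execution and gather the exit codes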

> On May 19, 2016, at 14:42, ayan guha  wrote:
> 
> Hi
> 
> I have few tar files in HDFS in a single folder. each file has multiple files 
> in it. 
> 
> tar1:
>   - f1.txt
>   - f2.txt
> tar2:
>   - f1.txt
>   - f2.txt
> 
> (each tar file will have exact same number of files, same name)
> 
> I am trying to find a way (spark or pig) to extract them to their own 
> folders. 
> 
> f1
>   - tar1_f1.txt
>   - tar2_f1.txt
> f2:
>- tar1_f2.txt
>- tar1_f2.txt
> 
> Any help? 
> 
> 
> 
> -- 
> Best Regards,
> Ayan Guha






HBase / Spark Kerberos problem

2016-05-18 Thread philipp.meyerhoefer
Hi all,

I have been puzzling over a Kerberos problem for a while now and wondered if 
anyone can help.

For spark-submit, I specify --master yarn-client --keytab x --principal y, 
which creates my SparkContext fine.
Connections to Zookeeper Quorum to find the HBase master work well too.
But when it comes to a .count() action on the RDD, I am always presented with 
the stack trace at the end of this mail.

We are using CDH5.5.2 (spark 1.5.0), and com.cloudera.spark.hbase.HBaseContext 
is a wrapper around TableInputFormat/hadoopRDD (see 
https://github.com/cloudera-labs/SparkOnHBase), as you can see in the stack 
trace.

Am I doing something obviously wrong here?
A similar flow, inside test code, works well, only going via spark-submit 
exposes this issue.

Code snippet (I have tried using the commented-out lines in various 
combinations, without success):

    val conf = new SparkConf().
      set("spark.shuffle.consolidateFiles", "true").
      set("spark.kryo.registrationRequired", "false").
      set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      set("spark.kryoserializer.buffer", "30m")
    val sc = new SparkContext(conf)
    val cfg = sc.hadoopConfiguration
    //cfg.addResource(new org.apache.hadoop.fs.Path("/etc/hbase/conf/hbase-site.xml"))
    //UserGroupInformation.getCurrentUser.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS)
    //cfg.set("hbase.security.authentication", "kerberos")
    val hc = new HBaseContext(sc, cfg)
    val scan = new Scan
    scan.setTimeRange(startMillis, endMillis)
    val matchesInRange = hc.hbaseRDD(MY_TABLE, scan, resultToMatch)
    val cnt = matchesInRange.count()
    log.info(s"matches in range $cnt")

Stack trace / log:

16/05/17 17:04:47 INFO SparkContext: Starting job: count at Analysis.scala:93
16/05/17 17:04:47 INFO DAGScheduler: Got job 0 (count at Analysis.scala:93) 
with 1 output partitions
16/05/17 17:04:47 INFO DAGScheduler: Final stage: ResultStage 0(count at 
Analysis.scala:93)
16/05/17 17:04:47 INFO DAGScheduler: Parents of final stage: List()
16/05/17 17:04:47 INFO DAGScheduler: Missing parents: List()
16/05/17 17:04:47 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[1] at map at HBaseContext.scala:580), which has no missing 
parents
16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(3248) called with 
curMem=428022, maxMem=244187136
16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3 stored as values in 
memory (estimated size 3.2 KB, free 232.5 MB)
16/05/17 17:04:47 INFO MemoryStore: ensureFreeSpace(2022) called with 
curMem=431270, maxMem=244187136
16/05/17 17:04:47 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in 
memory (estimated size 2022.0 B, free 232.5 MB)
16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
10.6.164.40:33563 (size: 2022.0 B, free: 232.8 MB)
16/05/17 17:04:47 INFO SparkContext: Created broadcast 3 from broadcast at 
DAGScheduler.scala:861
16/05/17 17:04:47 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 0 (MapPartitionsRDD[1] at map at HBaseContext.scala:580)
16/05/17 17:04:47 INFO YarnScheduler: Adding task set 0.0 with 1 tasks
16/05/17 17:04:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
hpg-dev-vm, partition 0,PROCESS_LOCAL, 2208 bytes)
16/05/17 17:04:47 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
hpg-dev-vm:52698 (size: 2022.0 B, free: 388.4 MB)
16/05/17 17:04:48 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
hpg-dev-vm:52698 (size: 26.0 KB, free: 388.4 MB)
16/05/17 17:04:57 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
hpg-dev-vm): org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't 
get the location
at 
org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:155)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:63)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
at 
org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:314)
at 
org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:289)
at 
org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:161)
at 
org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:156)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
at 
org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
at 
org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
at 
org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:138)