Re: Best way to read XML data from RDD

2016-08-19 Thread Felix Cheung
Ah. Have you tried Jackson?
https://github.com/FasterXML/jackson-dataformat-xml/blob/master/README.md
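
For anyone landing on this thread later: a minimal sketch of what parsing with jackson-dataformat-xml inside an RDD could look like, given the RDD-of-XML-strings situation described below. This is an illustration only; the RDD name, the element name being extracted, and the assumption that the XML payloads have already been pulled out of the JSON key-value pairs are mine, not code from this thread.

import com.fasterxml.jackson.dataformat.xml.XmlMapper
import org.apache.spark.rdd.RDD

// xmlStrings is assumed to be an RDD[String] holding the XML payloads already
// extracted from the JSON messages.
def parseIds(xmlStrings: RDD[String]): RDD[String] =
  xmlStrings.mapPartitions { iter =>
    // One mapper per partition avoids re-creating the parser for every record.
    val mapper = new XmlMapper()
    iter.map { xml =>
      val root = mapper.readTree(xml)   // parse the XML document into a tree
      root.path("id").asText()          // "id" is a placeholder element name
    }
  }

Creating the parser per partition rather than per record is one of the cheaper levers for the per-message parsing cost mentioned below.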


_________________________________
From: Diwakar Dhanuskodi
Sent: Friday, August 19, 2016 9:41 PM
Subject: Re: Best way to read XML data from RDD
To: Felix Cheung, user


Yes. It accepts an XML file as source, but not an RDD. The XML data embedded
inside the JSON is streamed from a Kafka cluster, so I can only get it as an RDD.
Right now I am using the spark.xml XML.loadString method inside an RDD map
function, but performance-wise I am not happy, as it takes 4 minutes to parse the
XML from 2 million messages in a 3-node environment (100G and 4 CPUs each).


Sent from Samsung Mobile.


 Original message 
From: Felix Cheung
Date: 20/08/2016 09:49 (GMT+05:30)
To: Diwakar Dhanuskodi, user
Cc:
Subject: Re: Best way to read XML data from RDD

Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" wrote:

Hi,

There is an RDD with JSON data. I could read the JSON data using rdd.read.json. The
JSON data has XML data in a couple of key-value pairs.

Which is the best method to read and parse the XML from the RDD? Are there any specific
XML libraries for Spark? Could anyone help with this?

Thanks.




Re: Best way to read XML data from RDD

2016-08-19 Thread Diwakar Dhanuskodi
Yes. It accepts an XML file as source, but not an RDD. The XML data embedded
inside the JSON is streamed from a Kafka cluster, so I can only get it as an RDD.
Right now I am using the spark.xml XML.loadString method inside an RDD map
function, but performance-wise I am not happy, as it takes 4 minutes to parse the
XML from 2 million messages in a 3-node environment (100G and 4 CPUs each).


Sent from Samsung Mobile.

 Original message 
From: Felix Cheung
Date: 20/08/2016 09:49 (GMT+05:30)
To: Diwakar Dhanuskodi, user
Cc:
Subject: Re: Best way to read XML data from RDD
Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" wrote:

Hi, 

There is an RDD with JSON data. I could read the JSON data using rdd.read.json. The
JSON data has XML data in a couple of key-value pairs.

Which is the best method to read and parse the XML from the RDD? Are there any specific
XML libraries for Spark? Could anyone help with this?

Thanks. 

Re: Best way to read XML data from RDD

2016-08-19 Thread Felix Cheung
Have you tried

https://github.com/databricks/spark-xml
?
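
For context, the file-based usage that spark-xml documents looks roughly like the sketch below, assuming a Spark 2.0 session (e.g. spark-shell); the row tag and path are placeholders. As the rest of the thread notes, this entry point reads files rather than XML strings already sitting in an RDD.

// Minimal spark-xml sketch: read an XML file into a DataFrame, one row per <book> element.
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")          // placeholder row tag
  .load("/data/books.xml")           // placeholder path

df.printSchema()
df.show(5)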




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" wrote:

Hi,

There is an RDD with JSON data. I could read the JSON data using rdd.read.json. The
JSON data has XML data in a couple of key-value pairs.

Which is the best method to read and parse the XML from the RDD? Are there any specific
XML libraries for Spark? Could anyone help with this?

Thanks.


Re: How Spark HA works

2016-08-19 Thread Charles Nnamdi Akalugwu
I am experiencing this exact issue. Does anyone know what's going on with
the zookeeper setup?

On Jul 5, 2016 10:34 AM, "Akmal Abbasov"  wrote:
>
> Hi,
> I'm trying to understand how Spark HA works. I'm using Spark 1.6.1 and
Zookeeper 3.4.6.
> I've added the following line to $SPARK_HOME/conf/spark-env.sh:
> export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181
> -Dspark.deploy.zookeeper.dir=/spark"
> It's working so far.
> I'd like to set up a link which will always go to the active master UI (I'm
> using Spark in standalone mode).
> I've checked the znode /spark, and it contains
> [leader_election, master_status]
> I'm assuming that the master_status znode will contain the IP address of the
> current active master; is that true? Because in my case this znode isn't
> updated after failover.
> And how does /spark/leader_election work, since it doesn't contain any data?
> Thank you.
>
> Regards,
> Akmal
>
>


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-19 Thread chandan prakash
Ohh, that explains the reason.
My use case does not need state management,
so I guess I am better off without checkpointing.
Thank you for clarification.

Regards,
Chandan

On Sat, Aug 20, 2016 at 8:24 AM, Cody Koeninger  wrote:

> Checkpointing is required to be turned on in certain situations (e.g.
> updateStateByKey), but you're certainly not required to rely on it for
> fault tolerance.  I try not to.
>
> On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Thanks Cody for the pointer.
>>
>> I am able to do this now. Not using checkpointing. Rather storing offsets
>> in zookeeper for fault tolerance.
>> Spark Config changes now getting reflected in code deployment.
>> *Using this api :*
>> *KafkaUtils.createDirectStream[String, String, StringDecoder,
>> StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
>> messageHandler)*
>> *instead of :*
>> *KafkaUtils.createDirectStream[String, String, StringDecoder,
>> StringDecoder](ssc, kafkaParams, topicsSet)*
>>
>> *One quick question:*
>> What is the need for checkpointing if we can achieve both fault tolerance and
>> application code/config changes without checkpointing? Is there anything
>> else that checkpointing gives? I might be missing something.
>>
>>
>> Regards,
>> Chandan
>>
>>
>> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger 
>> wrote:
>>
>>> Yeah the solutions are outlined in the doc link.  Or just don't rely on
>>> checkpoints
>>> On Aug 18, 2016 8:53 AM, "chandan prakash" 
>>> wrote:
>>>
 Yes,
  i looked into the source code implementation.  sparkConf is serialized
 and saved during checkpointing and re-created from the checkpoint directory
 at time of restart. So any sparkConf parameter which you load from
 application.config and set in sparkConf object in code cannot be changed
 and reflected with checkpointing.  :(

 Is there any workaround for reading a changed sparkConf parameter
 value while using checkpointing?
 p.s. I am not adding a new parameter, I am just changing the values of some
 existing sparkConf params.

 This is a common case and there must be some solution for this.

 On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger 
 wrote:

> Checkpointing is not kafka-specific.  It encompasses metadata about
> the application.  You can't re-use a checkpoint if your application has
> changed.
>
> http://spark.apache.org/docs/latest/streaming-programming-gu
> ide.html#upgrading-application-code
>
>
> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Is it possible that i use checkpoint directory to restart streaming
>> but with modified parameter value in config file (e.g.  username/password
>> for db connection)  ?
>> Thanks in advance.
>>
>> Regards,
>> Chandan
>>
>> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
>> chandanbaran...@gmail.com> wrote:
>>
>>> Hi,
>>> I am using direct kafka with checkpointing of offsets same as :
>>> https://github.com/koeninger/kafka-exactly-once/blob/master/
>>> src/main/scala/example/IdempotentExample.scala
>>>
>>> I need to change some parameters like db connection params :
>>> username/password for db connection .
>>> I stopped streaming gracefully ,changed parameters in config file
>>> and restarted streaming.
>>> *Issue : changed parameters  username/password are not being
>>> considered.*
>>>
>>> *Question* :
>>> As per my understanding , Checkpointing should only save offsets of
>>> kafka partitions and not the credentials of the db connection.
>>> Why its picking old db connection params ?
>>>
>>> I am declaring the params in the main method and not in the setupSsc() method.
>>> My code is identical to that in the above program link, as below:
>>> val jdbcDriver = conf.getString("jdbc.driver")
>>> val jdbcUrl = conf.getString("jdbc.url")
>>> *val jdbcUser = conf.getString("jdbc.user")*
>>> * val jdbcPassword = conf.getString("jdbc.password")*
>>> // while the job doesn't strictly need checkpointing,
>>> // we'll checkpoint to avoid replaying the whole kafka log in case
>>> of failure
>>> val checkpointDir = conf.getString("checkpointDir")
>>> val ssc = StreamingContext.getOrCreate(
>>> checkpointDir,
>>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>>> *jdbcPassword*, checkpointDir) _
>>> )
>>>
>>>
>>>
>>> --
>>> Chandan Prakash
>>>
>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>


 --
 Chandan Prakash


>>
>>
>> --
>> Chandan Prakash
>>
>>
>


-- 
Chandan Prakash
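
For readers of this thread: below is a minimal sketch, not taken from Chandan's code, of the direct-stream pattern being discussed with Spark 1.6 and the Kafka 0.8 API: seed the stream with offsets loaded from an external store and commit the per-batch offset ranges back after processing, instead of relying on checkpointing. The broker, topic, and the ZooKeeper read/write helpers are stubs standing in for whatever client code the application actually uses.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val conf = new SparkConf().setAppName("direct-kafka-external-offsets")
val ssc = new StreamingContext(conf, Seconds(30))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder broker

// Stubs: replace with the application's real ZooKeeper (or DB) reads and writes.
def readOffsets(): Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("mytopic", 0) -> 0L)                      // placeholder topic/offsets
def saveOffsets(ranges: Array[OffsetRange]): Unit = ()

val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, readOffsets(), messageHandler)

stream.foreachRDD { rdd =>
  // The direct stream exposes exactly which offsets each batch covers.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd here ...
  saveOffsets(offsetRanges)   // commit offsets only after the batch succeeds
}

ssc.start()
ssc.awaitTermination()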


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-19 Thread Cody Koeninger
Checkpointing is required to be turned on in certain situations (e.g.
updateStateByKey), but you're certainly not required to rely on it for
fault tolerance.  I try not to.

On Fri, Aug 19, 2016 at 1:51 PM, chandan prakash 
wrote:

> Thanks Cody for the pointer.
>
> I am able to do this now. Not using checkpointing. Rather storing offsets
> in zookeeper for fault tolerance.
> Spark Config changes now getting reflected in code deployment.
> *Using this api :*
> *KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
> messageHandler)*
> *instead of :*
> *KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)*
>
> *One quick question:*
> What is the need for checkpointing if we can achieve both fault tolerance and
> application code/config changes without checkpointing? Is there anything
> else that checkpointing gives? I might be missing something.
>
>
> Regards,
> Chandan
>
>
> On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger 
> wrote:
>
>> Yeah the solutions are outlined in the doc link.  Or just don't rely on
>> checkpoints
>> On Aug 18, 2016 8:53 AM, "chandan prakash" 
>> wrote:
>>
>>> Yes,
>>>  i looked into the source code implementation.  sparkConf is serialized
>>> and saved during checkpointing and re-created from the checkpoint directory
>>> at time of restart. So any sparkConf parameter which you load from
>>> application.config and set in sparkConf object in code cannot be changed
>>> and reflected with checkpointing.  :(
>>>
>>> Is there any workaround for reading a changed sparkConf parameter value
>>> while using checkpointing?
>>> p.s. I am not adding a new parameter, I am just changing the values of some
>>> existing sparkConf params.
>>>
>>> This is a common case and there must be some solution for this.
>>>
>>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger 
>>> wrote:
>>>
 Checkpointing is not kafka-specific.  It encompasses metadata about the
 application.  You can't re-use a checkpoint if your application has 
 changed.

 http://spark.apache.org/docs/latest/streaming-programming-gu
 ide.html#upgrading-application-code


 On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
 chandanbaran...@gmail.com> wrote:

> Is it possible that i use checkpoint directory to restart streaming
> but with modified parameter value in config file (e.g.  username/password
> for db connection)  ?
> Thanks in advance.
>
> Regards,
> Chandan
>
> On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
> chandanbaran...@gmail.com> wrote:
>
>> Hi,
>> I am using direct kafka with checkpointing of offsets same as :
>> https://github.com/koeninger/kafka-exactly-once/blob/master/
>> src/main/scala/example/IdempotentExample.scala
>>
>> I need to change some parameters like db connection params :
>> username/password for db connection .
>> I stopped streaming gracefully ,changed parameters in config file and
>> restarted streaming.
>> *Issue : changed parameters  username/password are not being
>> considered.*
>>
>> *Question* :
>> As per my understanding , Checkpointing should only save offsets of
>> kafka partitions and not the credentials of the db connection.
>> Why its picking old db connection params ?
>>
>> I am declaring the params in the main method and not in the setupSsc() method.
>> My code is identical to that in the above program link, as below:
>> val jdbcDriver = conf.getString("jdbc.driver")
>> val jdbcUrl = conf.getString("jdbc.url")
>> *val jdbcUser = conf.getString("jdbc.user")*
>> * val jdbcPassword = conf.getString("jdbc.password")*
>> // while the job doesn't strictly need checkpointing,
>> // we'll checkpoint to avoid replaying the whole kafka log in case of
>> failure
>> val checkpointDir = conf.getString("checkpointDir")
>> val ssc = StreamingContext.getOrCreate(
>> checkpointDir,
>> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
>> *jdbcPassword*, checkpointDir) _
>> )
>>
>>
>>
>> --
>> Chandan Prakash
>>
>>
>
>
> --
> Chandan Prakash
>
>

>>>
>>>
>>> --
>>> Chandan Prakash
>>>
>>>
>
>
> --
> Chandan Prakash
>
>


Re: Spark 2.0 regression when querying very wide data frames

2016-08-19 Thread mhornbech
I did some extra digging. Running the query "select column1 from myTable" I
can reproduce the problem on a frame with a single row - it occurs exactly
when the frame has more than 200 columns, which smells a bit like a
hardcoded limit.

Interestingly the problem disappears when replacing the query with "select
column1 from myTable limit N" where N is arbitrary. However it appears again
when running "select * from myTable limit N" with sufficiently many columns
(haven't determined the exact threshold here).
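
For anyone trying to reproduce this, a rough sketch of the setup described above (a single-row frame with more than 200 columns), assuming a Spark 2.0 session; the column names and counts are arbitrary.

import org.apache.spark.sql.functions.lit

// Build a one-row DataFrame with 251 columns, register it, and run the failing query shape.
val nExtraCols = 250
val wide = (1 to nExtraCols).foldLeft(spark.range(1).toDF("column1")) {
  (df, i) => df.withColumn(s"c$i", lit(i))
}
wide.createOrReplaceTempView("myTable")

spark.sql("select column1 from myTable").show()          // reported above to misbehave past 200 columns
spark.sql("select column1 from myTable limit 5").show()  // reported above to work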



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-regression-when-querying-very-wide-data-frames-tp27567p27568.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-19 Thread Everett Anderson
Hi!

Just following up on this --

When people talk about a shared session/context for testing like this, I
assume it's still within one test class. So it's still the case that if you
have a lot of test classes that test Spark-related things, you must
configure your build system to not run them in parallel. You'll get the
benefit of not creating and tearing down a Spark session/context between
test cases within a test class, though.

Is that right?

Or have people figured out a way to have sbt (or Maven/Gradle/etc) share
Spark sessions/contexts across integration tests in a safe way?
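
To make the question concrete, here is a minimal sketch of the kind of shared-session setup being discussed, using ScalaTest; the trait and suite names are made up for illustration and this is not spark-testing-base's API.

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite, Suite}

// Share one local SparkSession across the test cases of a suite instead of
// creating and tearing one down per test.
trait SharedSparkSession extends BeforeAndAfterAll { self: Suite =>
  @transient lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("unit-tests")
    .getOrCreate()

  override def afterAll(): Unit = {
    try spark.stop() finally super.afterAll()
  }
}

class ExampleSuite extends FunSuite with SharedSparkSession {
  test("count a tiny Dataset") {
    import spark.implicits._
    assert(Seq(1, 2, 3).toDS().count() === 3)
  }
}

Note that getOrCreate() returns the already-running session within a JVM, so suites executing in the same JVM end up sharing it; forking suites into separate JVMs is what usually forces the serial execution discussed above.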


On Mon, Aug 1, 2016 at 3:23 PM, Holden Karau  wrote:

> Thats a good point - there is an open issue for spark-testing-base to
> support this shared sparksession approach - but I haven't had the time (
> https://github.com/holdenk/spark-testing-base/issues/123 ). I'll try and
> include this in the next release :)
>
> On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers  wrote:
>
>> we share a single SparkSession across tests, and they can run in
>> parallel. It is pretty fast.
>>
>> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson <
>> ever...@nuna.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> Right now, if any code uses DataFrame/Dataset, I need a test setup that
>>> brings up a local master as in this article.
>>>
>>> That's a lot of overhead for unit testing and the tests can't run in
>>> parallel, so testing is slow -- this is more like what I'd call an
>>> integration test.
>>>
>>> Do people have any tricks to get around this? Maybe using spy mocks on
>>> fake DataFrame/Datasets?
>>>
>>> Anyone know if there are plans to make more traditional unit testing
>>> possible with Spark SQL, perhaps with a stripped down in-memory
>>> implementation? (I admit this does seem quite hard since there's so much
>>> functionality in these classes!)
>>>
>>> Thanks!
>>>
>>> - Everett
>>>
>>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Spark 2.0 regression when querying very wide data frames

2016-08-19 Thread mhornbech
Hi

We currently have some workloads in Spark 1.6.2 with queries operating on a
data frame with 1500+ columns (17000 rows). This has never been quite
stable, and some queries, such as "select *", would yield empty result sets,
but queries restricting to specific columns have mostly worked. Needless to
say, 1500+ columns isn't "desirable", but that's what the client's data
looks like, and our preference has been to load it and normalize it through
Spark.

We have been waiting to see how this would work with Spark 2.0, and
unfortunately the problem has gotten worse. Almost all queries on this large
data frame that worked before will now return data frames with only null
values.

Is this a known issue with Spark? If yes, does anyone know why it has been
left untouched / made worse in Spark 2.0? If data frames with many columns
are a limitation that goes deep into Spark, I would prefer hard errors rather
than queries that run with meaningless results. The problem is easy to
reproduce, but I am not familiar enough with the Spark source code to
debug it and find the root cause.

Hope some of you can enlighten me :-)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-2-0-regression-when-querying-very-wide-data-frames-tp27567.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: "Schemaless" Spark

2016-08-19 Thread Sebastian Piu
You can do operations without a schema just fine; obviously, the more you
know about your data, the more tools you will have. It is hard to say more
without more context on what you are trying to achieve.
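
As a concrete illustration of operating without a declared schema: Spark can infer the schema from JSON at load time, so nothing has to be mapped to relational tables up front. The path and field names below are placeholders, assuming a Spark 2.0 session.

// Schema is inferred from the JSON data itself at read time.
val events = spark.read.json("/data/incoming/*.json")

events.printSchema()                      // shows whatever structure was discovered
events.select("payload.customer.id")      // nested fields addressed by dotted path
  .show(5)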

On Fri, 19 Aug 2016, 22:55 Efe Selcuk,  wrote:

> Hi Spark community,
>
> This is a bit of a high level question as frankly I'm not well versed in
> Spark or related tech.
>
> We have a system in place that reads columnar data in through CSV and
> represents the data in relational tables as it operates. It's essentially
> schema-based ETL. This restricts our input data so we either have to
> restrict what the data looks like coming in, or we have to transform and
> map it to some relational representation before we work on it.
>
> One of our goals with the Spark application we're building is to make our
> input and operations more generic. So we can accept data in say JSON
> format, operate on it without a schema, and output that way as well.
>
> My question is on whether Spark supports this view and what facilities it
> provides. Unless I've been interpreting things incorrectly, the various
> data formats that Spark operates on still assume specified fields. I don't
> know what this approach would look like in terms of data types, operations,
> etc.
>
> I realize that this is lacking in detail but I imagine this may be more of
> a conversation than just an answer to a question.
>
> Efe
>


"Schemaless" Spark

2016-08-19 Thread Efe Selcuk
Hi Spark community,

This is a bit of a high level question as frankly I'm not well versed in
Spark or related tech.

We have a system in place that reads columnar data in through CSV and
represents the data in relational tables as it operates. It's essentially
schema-based ETL. This restricts our input data so we either have to
restrict what the data looks like coming in, or we have to transform and
map it to some relational representation before we work on it.

One of our goals with the Spark application we're building is to make our
input and operations more generic. So we can accept data in say JSON
format, operate on it without a schema, and output that way as well.

My question is on whether Spark supports this view and what facilities it
provides. Unless I've been interpreting things incorrectly, the various
data formats that Spark operates on still assume specified fields. I don't
know what this approach would look like in terms of data types, operations,
etc.

I realize that this is lacking in detail but I imagine this may be more of
a conversation than just an answer to a question.

Efe


Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Davies Liu
The OOM happens in the driver; you may also need more memory for the driver.

On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu  wrote:
> You are using lots of tiny executors (128 executors with only 2G
> memory each); could you try bigger executors (for example 16G x 16)?
>
> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>>
>> So I wrote some code to reproduce the problem.
>>
>> I assume here that a pipeline should be able to transform a categorical 
>> feature with a few million levels.
>> So I create a dataframe with the categorical feature (‘id’), apply a 
>> StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
>> the amount of levels.
>> It breaks at 1.276.000 levels.
>>
>> Shall I report this as a ticket in JIRA?
>>
>> 
>>
>>
>> from pyspark.sql.functions import rand
>> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
>> from pyspark.ml import Pipeline
>>
>> start_id = 10
>> n = 500
>> step = (n - start_id) / 25
>>
>> for i in xrange(start_id,start_id + n,step):
>> print "#\n {}".format(i)
>> dfr = (sqlContext
>>.range(start_id, start_id + i)
>>.withColumn('label', rand(seed=10))
>>.withColumn('feat2', rand(seed=101))
>> #.withColumn('normal', randn(seed=27))
>>).repartition(32).cache()
>> # dfr.select("id", rand(seed=10).alias("uniform"), 
>> randn(seed=27).alias("normal")).show()
>> dfr.show(1)
>> print "This dataframe has {0} rows (and therefore {0} levels will be one 
>> hot encoded)".format(dfr.count())
>>
>> categorical_feature  = ['id']
>> stages = []
>>
>> for c in categorical_feature:
>> stages.append(StringIndexer(inputCol=c, 
>> outputCol="{}Index".format(c)))
>> stages.append(OneHotEncoder(dropLast= False, inputCol = 
>> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>>
>> columns = ["{}OHE".format(x) for x in categorical_feature]
>> columns.append('feat2')
>>
>> assembler = VectorAssembler(
>> inputCols=columns,
>> outputCol="features")
>> stages.append(assembler)
>>
>> df2 = dfr
>>
>> pipeline = Pipeline(stages=stages)
>> pipeline_fitted = pipeline.fit(df2)
>> df3 = pipeline_fitted.transform(df2)
>> df3.show(1)
>> dfr.unpersist()
>>
>>
>> 
>>
>> Output:
>>
>>
>> #
>>  10
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |183601|0.38693226548356197|0.04485291680169634|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 10 rows (and therefore 10 levels will be one hot 
>> encoded)
>> +--+---+---+---+++
>> |id|label  |  feat2|idIndex| 
>>   idOHE|features|
>> +--+---+---+---+++
>> |183601|
>> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
>> +--+---+---+---+++
>> only showing top 1 row
>>
>> #
>>  296000
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |137008| 0.2996020619810592|0.38693226548356197|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
>> encoded)
>> +--+---+---+---+++
>> |id|label  |  feat2|idIndex| 
>>   idOHE|features|
>> +--+---+---+---+++
>> |137008| 
>> 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
>> +--+---+---+---+++
>> only showing top 1 row
>>
>> #
>>  492000
>> +--+---+---+
>> |id|label  |  feat2|
>> +--+---+---+
>> |534351| 0.9450641392552516|0.23472935141246665|
>> +--+---+---+
>> only showing top 1 row
>>
>> This dataframe has 492000 rows (and therefore 492000 levels will be one hot 
>> encoded)
>> 

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Davies Liu
You are using lots of tiny executors (128 executors with only 2G
memory each); could you try bigger executors (for example 16G x 16)?

On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen  wrote:
>
> So I wrote some code to reproduce the problem.
>
> I assume here that a pipeline should be able to transform a categorical 
> feature with a few million levels.
> So I create a dataframe with the categorical feature (‘id’), apply a 
> StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
> the amount of levels.
> It breaks at 1.276.000 levels.
>
> Shall I report this as a ticket in JIRA?
>
> 
>
>
> from pyspark.sql.functions import rand
> from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
> from pyspark.ml import Pipeline
>
> start_id = 10
> n = 500
> step = (n - start_id) / 25
>
> for i in xrange(start_id,start_id + n,step):
> print "#\n {}".format(i)
> dfr = (sqlContext
>.range(start_id, start_id + i)
>.withColumn('label', rand(seed=10))
>.withColumn('feat2', rand(seed=101))
> #.withColumn('normal', randn(seed=27))
>).repartition(32).cache()
> # dfr.select("id", rand(seed=10).alias("uniform"), 
> randn(seed=27).alias("normal")).show()
> dfr.show(1)
> print "This dataframe has {0} rows (and therefore {0} levels will be one 
> hot encoded)".format(dfr.count())
>
> categorical_feature  = ['id']
> stages = []
>
> for c in categorical_feature:
> stages.append(StringIndexer(inputCol=c, 
> outputCol="{}Index".format(c)))
> stages.append(OneHotEncoder(dropLast= False, inputCol = 
> "{}Index".format(c), outputCol = "{}OHE".format(c)))
>
> columns = ["{}OHE".format(x) for x in categorical_feature]
> columns.append('feat2')
>
> assembler = VectorAssembler(
> inputCols=columns,
> outputCol="features")
> stages.append(assembler)
>
> df2 = dfr
>
> pipeline = Pipeline(stages=stages)
> pipeline_fitted = pipeline.fit(df2)
> df3 = pipeline_fitted.transform(df2)
> df3.show(1)
> dfr.unpersist()
>
>
> 
>
> Output:
>
>
> #
>  10
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |183601|0.38693226548356197|0.04485291680169634|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 10 rows (and therefore 10 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |183601|
> 0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
> +--+---+---+---+++
> only showing top 1 row
>
> #
>  296000
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |137008| 0.2996020619810592|0.38693226548356197|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |137008| 
> 0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
> +--+---+---+---+++
> only showing top 1 row
>
> #
>  492000
> +--+---+---+
> |id|label  |  feat2|
> +--+---+---+
> |534351| 0.9450641392552516|0.23472935141246665|
> +--+---+---+
> only showing top 1 row
>
> This dataframe has 492000 rows (and therefore 492000 levels will be one hot 
> encoded)
> +--+---+---+---+++
> |id|label  |  feat2|idIndex|  
>  idOHE|features|
> +--+---+---+---+++
> |534351| 

Re: Spark SQL concurrent runs fails with java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]

2016-08-19 Thread Davies Liu
The query failed to finish the broadcast within 5 minutes. You could decrease
the broadcast threshold (spark.sql.autoBroadcastJoinThreshold) or
increase the conf spark.sql.broadcastTimeout.
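
For reference, a sketch of the two knobs mentioned above, set programmatically on a Spark 2.0 session; the values are only examples, and the same keys can equally be passed with --conf at submit time.

// Give broadcasts more time (seconds; the default is 300)...
spark.conf.set("spark.sql.broadcastTimeout", "1200")

// ...or lower / disable the automatic broadcast-join threshold (bytes; -1 disables it).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")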

On Tue, Jun 28, 2016 at 3:35 PM, Jesse F Chen  wrote:
>
> With the Spark 2.0 build from 0615, when running 4-user concurrent SQL tests 
> against Spark SQL on 1TB TPCDS, we are seeing
> consistently the following exceptions:
>
> 10:35:33 AM: 16/06/27 23:40:37 INFO scheduler.TaskSetManager: Finished task 
> 412.0 in stage 819.0 (TID 270396) in 8468 ms on 9.30.148.101 (417/581)
> 16/06/27 23:40:37 ERROR thriftserver.SparkExecuteStatementOperation: Error 
> executing query, currentState RUNNING,
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange SinglePartition
> +- *HashAggregate(key=[], 
> functions=[partial_sum(cs_ext_discount_amt#100849)], output=[sum#101124])
> +- *Project [cs_ext_discount_amt#100849]
>
> 
> at 
> org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:113)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:49)
> ... 40 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
> [300 seconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:190)
>
>
> The longest query would complete in about 700 seconds, and I think we need to 
> increase the futures timeout value. However,
> I tried the 'spark.network.timeout' setting to 700 via the '--conf' facility 
> but it does not seem to control this particular timeout value.
> In other words, it stays at "300 seconds" no matter what value I give it. I 
> also played with the spark.rpc.askTimeout setting which
> does not affect this 300-second value.
>
> Could someone tell me which parameter I need to change in order to control it?
>
>
> JESSE CHEN
> Big Data Performance | IBM Analytics
>
> Office: 408 463 2296
> Mobile: 408 828 9068
> Email: jfc...@us.ibm.com
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: HiveThriftServer and spark.sql.hive.thriftServer.singleSession setting

2016-08-19 Thread Richard M
I was using the 1.1 driver. I upgraded that library to 2.1 and it resolved my
problem.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-and-spark-sql-hive-thriftServer-singleSession-setting-tp27340p27566.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Best way to read XML data from RDD

2016-08-19 Thread Diwakar Dhanuskodi
Hi,

There is an RDD with JSON data. I could read the JSON data using rdd.read.json.
The JSON data has XML data in a couple of key-value pairs.

Which is the best method to read and parse the XML from the RDD? Are there any
specific XML libraries for Spark? Could anyone help with this?

Thanks.


Re: Spark streaming 2, giving error ClassNotFoundException: scala.collection.GenTraversableOnce$class

2016-08-19 Thread Mich Talebzadeh
Thanks

--jars /home/hduser/jars/spark-streaming-kafka-assembly_*2.11*-1.6.1.jar

sorted it out

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 19 August 2016 at 20:19, Tathagata Das 
wrote:

> You seem to be combining Scala 2.10 and 2.11 libraries - your sbt project is
> 2.11, whereas you are trying to pull in spark-streaming-kafka-assembly_
> *2.10*-1.6.1.jar.
>
> On Fri, Aug 19, 2016 at 11:24 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> My spark streaming app with 1.6.1 used to work.
>>
>> Now with
>>
>> scala> sc version
>> res0: String = 2.0.0
>>
>> Compiling with sbt assembly as before, with the following:
>>
>> version := "1.0",
>> scalaVersion := "2.11.8",
>> mainClass in Compile := Some("myPackage.${APPLICATION}")
>>   )
>> libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.0.0" %
>> "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0"
>> % "provided"
>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
>> "1.6.1" % "provided"
>>
>>
>> I downgraded scalaVersion to 2.10.4; it did not change anything.
>>
>> It compiles OK but at run time it fails
>>
>> This jar is added to spark-submit
>>
>> --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>>
>> And this is the error
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> scala/collection/GenTraversableOnce$class
>> at kafka.utils.Pool.(Pool.scala:28)
>> at kafka.consumer.FetchRequestAndResponseStatsRegistry$.(
>> FetchRequestAndResponseStats.scala:60)
>> at kafka.consumer.FetchRequestAndResponseStatsRegistry$.<
>> clinit>(FetchRequestAndResponseStats.scala)
>> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
>> at org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaC
>> luster.scala:52)
>> at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$
>> apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.
>> apply(KafkaCluster.scala:345)
>> at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$
>> apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.
>> apply(KafkaCluster.scala:342)
>> at scala.collection.IndexedSeqOptimized$class.foreach(
>> IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.
>> scala:35)
>> at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spa
>> rk$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>> at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMe
>> tadata(KafkaCluster.scala:125)
>> at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(
>> KafkaCluster.scala:112)
>> at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(
>> KafkaUtils.scala:211)
>> at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStr
>> eam(KafkaUtils.scala:484)
>> at CEP_streaming$.main(CEP_streaming.scala:123)
>> at CEP_streaming.main(CEP_streaming.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>> ssorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>> thodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
>> $SparkSubmit$$runMain(SparkSubmit.scala:729)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit
>> .scala:185)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.
>> scala:210)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:
>> 124)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Caused by: java.lang.ClassNotFoundException:
>> scala.collection.GenTraversableOnce$class
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>
>>
>> Any ideas appreciated
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> 

Re: Spark streaming 2, giving error ClassNotFoundException: scala.collection.GenTraversableOnce$class

2016-08-19 Thread Tathagata Das
You seem to be combining Scala 2.10 and 2.11 libraries - your sbt project is
2.11, whereas you are trying to pull in spark-streaming-kafka-assembly_
*2.10*-1.6.1.jar.
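
In other words, the %% operator in sbt appends the Scala binary suffix taken from scalaVersion, so a build like the one quoted below resolves _2.11 artifacts, and any assembly jar passed separately via --jars must carry the same suffix. A minimal illustration (build.sbt fragment, versions taken from this thread):

// With %%, sbt resolves spark-streaming-kafka_2.11 because scalaVersion is 2.11.x.
// Mixing this build with a manually supplied *_2.10 assembly jar on --jars produces
// binary-incompatibility errors like the NoClassDefFoundError in this thread.
scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.1" % "provided"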

On Fri, Aug 19, 2016 at 11:24 AM, Mich Talebzadeh  wrote:

> Hi,
>
> My spark streaming app with 1.6.1 used to work.
>
> Now with
>
> scala> sc version
> res0: String = 2.0.0
>
> Compiling with sbt assembly as before, with the following:
>
> version := "1.0",
> scalaVersion := "2.11.8",
> mainClass in Compile := Some("myPackage.${APPLICATION}")
>   )
> libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.0.0" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0" %
> "provided"
> libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
> "1.6.1" % "provided"
>
>
> I downgraded scalaVersion to 2.10.4; it did not change anything.
>
> It compiles OK but at run time it fails
>
> This jar is added to spark-submit
>
> --jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \
>
> And this is the error
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> scala/collection/GenTraversableOnce$class
> at kafka.utils.Pool.(Pool.scala:28)
> at kafka.consumer.FetchRequestAndResponseStatsRegistry$.(
> FetchRequestAndResponseStats.scala:60)
> at kafka.consumer.FetchRequestAndResponseStatsRegistry$.(
> FetchRequestAndResponseStats.scala)
> at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
> at org.apache.spark.streaming.kafka.KafkaCluster.connect(
> KafkaCluster.scala:52)
> at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:345)
> at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:342)
> at scala.collection.IndexedSeqOptimized$class.
> foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(
> WrappedArray.scala:35)
> at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$
> spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
> at org.apache.spark.streaming.kafka.KafkaCluster.
> getPartitionMetadata(KafkaCluster.scala:125)
> at org.apache.spark.streaming.kafka.KafkaCluster.
> getPartitions(KafkaCluster.scala:112)
> at org.apache.spark.streaming.kafka.KafkaUtils$.
> getFromOffsets(KafkaUtils.scala:211)
> at org.apache.spark.streaming.kafka.KafkaUtils$.
> createDirectStream(KafkaUtils.scala:484)
> at CEP_streaming$.main(CEP_streaming.scala:123)
> at CEP_streaming.main(CEP_streaming.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
> SparkSubmit.scala:185)
> at org.apache.spark.deploy.SparkSubmit$.submit(
> SparkSubmit.scala:210)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.
> scala:124)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: scala.collection.
> GenTraversableOnce$class
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
> Any ideas appreciated
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: HiveThriftServer and spark.sql.hive.thriftServer.singleSession setting

2016-08-19 Thread Chang Lim
What command did you use to connect?  Try this:

beeline>  !connect
jdbc:hive2://localhost:1?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice


On Thu, Aug 11, 2016 at 9:23 AM, Richard M [via Apache Spark User List] <
ml-node+s1001560n27513...@n3.nabble.com> wrote:

> I am running HiveServer2 as well and when I connect with beeline I get the
> following:
>
>  org.apache.spark.sql.internal.SessionState cannot be cast to
> org.apache.spark.sql.hive.HiveSessionState
>
>
> Do you know how to resolve this?
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-spark-user-list.1001560.n3.nabble.com/
> HiveThriftServer-and-spark-sql-hive-thriftServer-singleSession-setting-
> tp27340p27513.html
> To unsubscribe from HiveThriftServer and 
> spark.sql.hive.thriftServer.singleSession
> setting, click here
> 
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveThriftServer-and-spark-sql-hive-thriftServer-singleSession-setting-tp27340p27565.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark streaming 2, giving error ClassNotFoundException: scala.collection.GenTraversableOnce$class

2016-08-19 Thread Mich Talebzadeh
Hi,

My spark streaming app with 1.6.1 used to work.

Now with

scala> sc version
res0: String = 2.0.0

Compiling with sbt assembly as before, with the following:

version := "1.0",
scalaVersion := "2.11.8",
mainClass in Compile := Some("myPackage.${APPLICATION}")
  )
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.0.0" %
"provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" %
"1.6.1" % "provided"


I downgraded scalaVersion to 2.10.4; it did not change anything.

It compiles OK but at run time it fails

This jar is added to spark-submit

--jars /home/hduser/jars/spark-streaming-kafka-assembly_2.10-1.6.1.jar \

And this is the error

Exception in thread "main" java.lang.NoClassDefFoundError:
scala/collection/GenTraversableOnce$class
at kafka.utils.Pool.(Pool.scala:28)
at
kafka.consumer.FetchRequestAndResponseStatsRegistry$.(FetchRequestAndResponseStats.scala:60)
at
kafka.consumer.FetchRequestAndResponseStatsRegistry$.(FetchRequestAndResponseStats.scala)
at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
at
org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.streaming.kafka.KafkaCluster.org
$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
at
org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
at
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at CEP_streaming$.main(CEP_streaming.scala:123)
at CEP_streaming.main(CEP_streaming.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain
(SparkSubmit.scala:729)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
scala.collection.GenTraversableOnce$class
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)


Any ideas appreciated

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: 2.0.1/2.1.x release dates

2016-08-19 Thread Michael Gummelt
Adrian,

We haven't had any reports of hangs on Mesos in 2.0, so it's likely that if
you wait until the release, your problem still won't be solved unless you
file a bug.  Can you create a JIRA so we can look into it?

On Thu, Aug 18, 2016 at 2:40 AM, Sean Owen  wrote:

> Historically, minor releases happen every ~4 months, and maintenance
> releases are a bit ad hoc but come about a month after the minor
> release. It's up to the release manager to decide to do them but maybe
> realistic to expect 2.0.1 in early September.
>
> On Thu, Aug 18, 2016 at 10:35 AM, Adrian Bridgett 
> wrote:
> > Just wondering if there were any rumoured release dates for either of the
> > above.  I'm seeing some odd hangs with 2.0.0 and mesos (and I know that
> the
> > mesos integration has had a bit of updating in 2.1.x).   Looking at JIRA,
> > there's no suggested release date and issues seem to be added to a
> release
> > version once resolved so the usual trick of looking at the
> > resolved/unresolved ratio isn't helping :-)  The wiki only mentions
> 2.0.0 so
> > no joy there either.
> >
> > Still doing testing but then I don't want to test with 2.1.x if it's
> going
> > to be under heavy development for a while longer.
> >
> > Thanks for any info,
> >
> > Adrian
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Michael Gummelt
Software Engineer
Mesosphere


Re: Attempting to accept an unknown offer

2016-08-19 Thread Michael Gummelt
That error message occurs when the Mesos scheduler tries to accept an offer
that doesn't exist.  It should never happen.  Can you submit a JIRA and cc
me to it?  Also, what libmesos and mesos master version are you running?

On Wed, Aug 17, 2016 at 9:23 AM, vr spark  wrote:

> My code is very simple; if I use other Hive tables, my code works fine. This
> particular table (a virtual view) is huge and might have more metadata.
>
> It has only two columns.
>
> virtual view name is : cluster_table
>
> # col_name    data_type
>
>  ln   string
>
>  parti  int
>
>
> here is snippet...
>
> from pyspark import SparkConf, SparkContext
>
> from pyspark.sql import HiveContext
>
> import pyspark.sql
>
> import json
>
> myconf=SparkConf().setAppName("sql")
>
> spcont=SparkContext(conf=myconf)
>
> sqlcont=HiveContext(spcont)
>
> res=sqlcont.sql("select  parti FROM h.cluster_table WHERE parti > 408910
> and parti <408911  limit 10")
>
> print res.printSchema()
>
> print 'res'
>
> print res
>
> df=res.collect()
>
> print 'after collect'
>
> print df
>
>
> Here is the ouput after i submit the job
>
> I0817 09:18:40.606465 31409 sched.cpp:262] New master detected at
> master@x.y.17.56:6750
>
> I0817 09:18:40.607461 31409 sched.cpp:272] No credentials provided.
> Attempting to register without authentication
>
> I0817 09:18:40.612763 31409 sched.cpp:641] Framework registered with
> b859f2f3-7484-482d-8c0d-35bd91c1ad0a-6336
>
> 16/08/17 09:18:57 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
>
> 16/08/17 09:18:57 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
>
> root
>
>  |-- parti: integer (nullable = true)
>
>
> None
>
> res
>
> DataFrame[partition_epoch_hourtenth: int]
>
> 2016-08-17 09:19:20,648:31315(0x7fafebfb1700):ZOO_WARN@
> zookeeper_interest@1557: Exceeded deadline by 19ms
>
> 2016-08-17 09:19:30,662:31315(0x7fafebfb1700):ZOO_WARN@
> zookeeper_interest@1557: Exceeded deadline by 13ms
>
> W0817 09:20:01.715824 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676564
>
> W0817 09:20:01.716455 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676630
>
> W0817 09:20:01.716645 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676713
>
> W0817 09:20:01.724409 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676554
>
> W0817 09:20:01.724728 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676555
>
> W0817 09:20:01.724936 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676556
>
> W0817 09:20:01.725126 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676557
>
> W0817 09:20:01.725309 31412 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O168676558.
>
> and many more lines like this on the screen with similar message
>
>
>
> On Wed, Aug 17, 2016 at 9:08 AM, Ted Yu  wrote:
>
>> Please include user@ in your reply.
>>
>> Can you reveal the snippet of hive sql ?
>>
>> On Wed, Aug 17, 2016 at 9:04 AM, vr spark  wrote:
>>
>>> spark 1.6.1
>>> mesos
>>> job is running for like 10-15 minutes and giving this message and i
>>> killed it.
>>>
>>> In this job, i am creating data frame from a hive sql. There are other
>>> similar jobs which work fine
>>>
>>> On Wed, Aug 17, 2016 at 8:52 AM, Ted Yu  wrote:
>>>
 Can you provide more information ?

 Were you running on YARN ?
 Which version of Spark are you using ?

 Was your job failing ?

 Thanks

 On Wed, Aug 17, 2016 at 8:46 AM, vr spark  wrote:

>
> W0816 23:17:01.984846 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910492
>
> W0816 23:17:01.984987 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910493
>
> W0816 23:17:01.985124 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910494
>
> W0816 23:17:01.985339 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910495
>
> W0816 23:17:01.985508 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910496
>
> W0816 23:17:01.985651 16360 sched.cpp:1195] Attempting to accept an
> unknown offer b859f2f3-7484-482d-8c0d-35bd91c1ad0a-O162910497
>
> W0816 

Re: [Spark2] Error writing "complex" type to CSV

2016-08-19 Thread Efe Selcuk
Okay so this is partially PEBKAC. I just noticed that there's a debugging
field at the end that's another case class with its own simple fields -
*that's* the struct that was showing up in the error, not the entry itself.

This raises a different question. What has changed that this is no longer
possible? The pull request said that it prints garbage. Was that some
regression in 2.0? The same code prints fine in 1.6.1. The field prints as
an array of the values of its fields.

On Thu, Aug 18, 2016 at 5:56 PM, Hyukjin Kwon  wrote:

> Ah, BTW, there is an issue, SPARK-16216, about printing dates and
> timestamps here. So please ignore the integer values for dates
>
> 2016-08-19 9:54 GMT+09:00 Hyukjin Kwon :
>
>> Ah, sorry, I should have read this carefully. Do you mind if I ask for your
>> code so I can test?
>>
>> I would like to reproduce.
>>
>>
>> I just tested this myself but I couldn't reproduce it, as shown below (this is
>> what you're doing, right?):
>>
>> case class ClassData(a: String, b: Date)
>>
>> val ds: Dataset[ClassData] = Seq(
>>   ("a", Date.valueOf("1990-12-13")),
>>   ("a", Date.valueOf("1990-12-13")),
>>   ("a", Date.valueOf("1990-12-13"))
>> ).toDF("a", "b").as[ClassData]
>> ds.write.csv("/tmp/data.csv")
>> spark.read.csv("/tmp/data.csv").show()
>>
>> prints as below:
>>
>> +---++
>> |_c0| _c1|
>> +---++
>> |  a|7651|
>> |  a|7651|
>> |  a|7651|
>> +---++
>>
>> ​
>>
>> 2016-08-19 9:27 GMT+09:00 Efe Selcuk :
>>
>>> Thanks for the response. The problem with that thought is that I don't
>>> think I'm dealing with a complex nested type. It's just a dataset where
>>> every record is a case class with only simple types as fields, strings and
>>> dates. There's no nesting.
>>>
>>> That's what confuses me about how it's interpreting the schema. The
>>> schema seems to be one complex field rather than a bunch of simple fields.
>>>
>>> On Thu, Aug 18, 2016, 5:07 PM Hyukjin Kwon  wrote:
>>>
 Hi Efe,

 If my understanding is correct, writing/reading complex types
 is not supported, because the CSV format can't represent nested types in its
 own format.

 I guess supporting them when writing external CSV is rather a bug.

 I think it'd be great if we could write and read back CSV in its own
 format, but I guess we can't.

 Thanks!

 On 19 Aug 2016 6:33 a.m., "Efe Selcuk"  wrote:

> We have an application working in Spark 1.6. It uses the databricks
> csv library for the output format when writing out.
>
> I'm attempting an upgrade to Spark 2. When writing with both the
> native DataFrameWriter#csv() method and with first specifying the
> "com.databricks.spark.csv" format (I suspect underlying format is the same
> but I don't know how to verify), I get the following error:
>
> java.lang.UnsupportedOperationException: CSV data source does not
> support struct<[bunch of field names and types]> data type
>
> There are 20 fields, mostly plain strings with a couple of dates. The
> source object is a Dataset[T] where T is a case class with various fields
> The line just looks like: someDataset.write.csv(outputPath)
>
> Googling returned this fairly recent pull request:
> https://mail-archives.apache.org/mod_mbox/spark-com
> mits/201605.mbox/%3c65d35a72bd05483392857098a2635...@git.apache.org%3E
>
> If I'm reading that correctly, the schema shows that each record has
> one field of this complex struct type? And the validation thinks it's
> something that it can't serialize. I would expect the schema to have a
> bunch of fields in it matching the case class, so maybe there's something
> I'm misunderstanding.
>
> Efe
>

>>
>


Re: spark streaming Directkafka with checkpointing : changed parameters not considered

2016-08-19 Thread chandan prakash
Thanks Cody for the pointer.

I am able to do this now. Not using checkpointing. Rather storing offsets
in zookeeper for fault tolerance.
Spark Config changes now getting reflected in code deployment.
*Using this api :*
*KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets,
messageHandler)*
*instead of :*
*KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)*

*One quick question:*
What is the need for checkpointing if we can achieve both fault tolerance
and application code/config changes without checkpointing? Is there
anything else that checkpointing gives? I might be missing something.


Regards,
Chandan


On Thu, Aug 18, 2016 at 8:27 PM, Cody Koeninger  wrote:

> Yeah the solutions are outlined in the doc link.  Or just don't rely on
> checkpoints
> On Aug 18, 2016 8:53 AM, "chandan prakash" 
> wrote:
>
>> Yes,
>>  i looked into the source code implementation.  sparkConf is serialized
>> and saved during checkpointing and re-created from the checkpoint directory
>> at time of restart. So any sparkConf parameter which you load from
>> application.config and set in sparkConf object in code cannot be changed
>> and reflected with checkpointing.  :(
>>
>> Is there any workaround for reading a changed sparkConf parameter value
>> while using checkpointing?
>> p.s. I am not adding a new parameter, I am just changing the values of some
>> existing sparkConf params.
>>
>> This is a common case and there must be some solution for this.
>>
>> On Thu, Aug 18, 2016 at 6:07 PM, Cody Koeninger 
>> wrote:
>>
>>> Checkpointing is not kafka-specific.  It encompasses metadata about the
>>> application.  You can't re-use a checkpoint if your application has changed.
>>>
>>> http://spark.apache.org/docs/latest/streaming-programming-gu
>>> ide.html#upgrading-application-code
>>>
>>>
>>> On Thu, Aug 18, 2016 at 4:39 AM, chandan prakash <
>>> chandanbaran...@gmail.com> wrote:
>>>
 Is it possible that i use checkpoint directory to restart streaming but
 with modified parameter value in config file (e.g.  username/password for
 db connection)  ?
 Thanks in advance.

 Regards,
 Chandan

 On Thu, Aug 18, 2016 at 1:10 PM, chandan prakash <
 chandanbaran...@gmail.com> wrote:

> Hi,
> I am using direct kafka with checkpointing of offsets same as :
> https://github.com/koeninger/kafka-exactly-once/blob/master/
> src/main/scala/example/IdempotentExample.scala
>
> I need to change some parameters like db connection params :
> username/password for db connection .
> I stopped streaming gracefully ,changed parameters in config file and
> restarted streaming.
> *Issue : changed parameters  username/password are not being
> considered.*
>
> *Question* :
> As per my understanding , Checkpointing should only save offsets of
> kafka partitions and not the credentials of the db connection.
> Why its picking old db connection params ?
>
> I am declaring the params in the main method and not in the setupSsc() method.
> My code is identical to that in the above program link, as below:
> val jdbcDriver = conf.getString("jdbc.driver")
> val jdbcUrl = conf.getString("jdbc.url")
> *val jdbcUser = conf.getString("jdbc.user")*
> * val jdbcPassword = conf.getString("jdbc.password")*
> // while the job doesn't strictly need checkpointing,
> // we'll checkpoint to avoid replaying the whole kafka log in case of
> failure
> val checkpointDir = conf.getString("checkpointDir")
> val ssc = StreamingContext.getOrCreate(
> checkpointDir,
> setupSsc(topics, kafkaParams, jdbcDriver, jdbcUrl, *jdbcUser*,
> *jdbcPassword*, checkpointDir) _
> )
>
>
>
> --
> Chandan Prakash
>
>


 --
 Chandan Prakash


>>>
>>
>>
>> --
>> Chandan Prakash
>>
>>


-- 
Chandan Prakash


Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Ben Teeuwen
So I wrote some code to reproduce the problem.

I assume here that a pipeline should be able to transform a categorical feature 
with a few million levels.
So I create a dataframe with the categorical feature (‘id’), apply a 
StringIndexer and OneHotEncoder transformer, and run a loop where I increase 
the amount of levels.
It breaks at 1.276.000 levels.

Shall I report this as a ticket in JIRA?




from pyspark.sql.functions import rand
from pyspark.ml.feature import OneHotEncoder, StringIndexer,VectorAssembler
from pyspark.ml import Pipeline

start_id = 10
n = 500
step = (n - start_id) / 25

for i in xrange(start_id,start_id + n,step):
print "#\n {}".format(i)
dfr = (sqlContext
   .range(start_id, start_id + i)
   .withColumn('label', rand(seed=10))
   .withColumn('feat2', rand(seed=101))
#.withColumn('normal', randn(seed=27))
   ).repartition(32).cache()
# dfr.select("id", rand(seed=10).alias("uniform"), 
randn(seed=27).alias("normal")).show()
dfr.show(1)
print "This dataframe has {0} rows (and therefore {0} levels will be one 
hot encoded)".format(dfr.count())

categorical_feature  = ['id'] 
stages = []

for c in categorical_feature:
stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
stages.append(OneHotEncoder(dropLast= False, inputCol = 
"{}Index".format(c), outputCol = "{}OHE".format(c)))

columns = ["{}OHE".format(x) for x in categorical_feature]
columns.append('feat2')

assembler = VectorAssembler(
inputCols=columns,
outputCol="features")
stages.append(assembler)

df2 = dfr

pipeline = Pipeline(stages=stages)
pipeline_fitted = pipeline.fit(df2)
df3 = pipeline_fitted.transform(df2)
df3.show(1)
dfr.unpersist()




Output:

#
 10
+--+---+---+
|id|label  |  feat2|
+--+---+---+
|183601|0.38693226548356197|0.04485291680169634|
+--+---+---+
only showing top 1 row

This dataframe has 10 rows (and therefore 10 levels will be one hot 
encoded)
+--+---+---+---+++
|id|label  |  feat2|idIndex|   
idOHE|features|
+--+---+---+---+++
|183601|
0.38693226548356197|0.04485291680169634|83240.0|(10,[83240],[...|(11,[83240,10...|
+--+---+---+---+++
only showing top 1 row

#
 296000
+--+---+---+
|id|label  |  feat2|
+--+---+---+
|137008| 0.2996020619810592|0.38693226548356197|
+--+---+---+
only showing top 1 row

This dataframe has 296000 rows (and therefore 296000 levels will be one hot 
encoded)
+--+---+---+---+++
|id|label  |  feat2|idIndex|   
idOHE|features|
+--+---+---+---+++
|137008| 
0.2996020619810592|0.38693226548356197|35347.0|(296000,[35347],[...|(296001,[35347,29...|
+--+---+---+---+++
only showing top 1 row

#
 492000
+--+---+---+
|id|label  |  feat2|
+--+---+---+
|534351| 0.9450641392552516|0.23472935141246665|
+--+---+---+
only showing top 1 row

This dataframe has 492000 rows (and therefore 492000 levels will be one hot 
encoded)
+--+---+---+---+++
|id|label  |  feat2|idIndex|   
idOHE|features|
+--+---+---+---+++
|534351| 0.9450641392552516|0.23472935141246665| 
3656.0|(492000,[3656],[1...|(492001,[3656,492...|
+--+---+---+---+++
only showing top 1 row

#
 688000
+--+---+--+
|id|label  | feat2|
+--+---+--+
|573008| 

How to continuous update or refresh RandomForestClassificationModel

2016-08-19 Thread 陈哲
Hi All,
   I'm using my training data to generate
the RandomForestClassificationModel, and I can use this to predict on the
upcoming data.
   But if a prediction fails, I'll put the failed features into the training
data. Here is my question: how can I update or refresh the model? Which
API should I use? Do I need to generate this model from scratch again?

Thanks