SparkML RandomForest

2016-08-10 Thread Pengcheng
Hi There,

I was comparing RandomForest in spark.ml (org.apache.spark.ml.classification)
and spark.mllib (org.apache.spark.mllib.tree) using the same datasets and the
same parameter settings, and spark.mllib always gives me better results on the
test data sets.
I was wondering:

1. Did anyone notice a similar performance difference?
2. How can I output parameters for a PipelineModel?

For example, I want to output the parameters of the trained
RandomForestClassifier. None of model.params.toString, model.explainParams(),
or model.extractParamMap() outputs meaningful values such as totalNumNodes.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.StringIndexer

val rf = new RandomForestClassifier()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setNumTrees(100)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("entropy")
  .setMaxDepth(4)
  .setMaxBins(32)

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")

val pipeline = new Pipeline().setStages(Array(indexer, rf))

val model: PipelineModel = pipeline.fit(trainingData)
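
For illustration, one way to reach per-stage values such as totalNumNodes is to
pull the fitted forest out of the PipelineModel; a minimal sketch, assuming the
forest is the last stage of the pipeline above:

import org.apache.spark.ml.classification.RandomForestClassificationModel

// PipelineModel.stages holds the fitted transformers in pipeline order.
val rfModel = model.stages.last.asInstanceOf[RandomForestClassificationModel]
println(rfModel.totalNumNodes)  // total node count across all trees
println(rfModel.trees.length)   // number of fitted trees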


thanks,
pengcheng


Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-10 Thread Holden Karau
Hi Luis,

You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you
can do groupBy followed by a reduce on the GroupedDataset (
http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset
) - this works on a per-key basis despite the different name. In Spark 2.0
you would use groupByKey on the Dataset followed by reduceGroups (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset
).
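
For a concrete picture, a minimal sketch of the Spark 2.0 route (the case class
and column names are illustrative, mirroring the col1/col2 example from the
original question):

import org.apache.spark.sql.SparkSession

// Hypothetical record type standing in for the (col1, col2) rows.
case class Record(col1: String, col2: Int)

val spark = SparkSession.builder().appName("reduceGroups-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Record("a", 3), Record("a", 1), Record("b", 7)).toDS()

// groupByKey + reduceGroups combines values per key, in the spirit of
// RDD.reduceByKey, here keeping the row with the minimum col2 per col1.
val minPerKey = ds.groupByKey(_.col1)
  .reduceGroups((x, y) => if (x.col2 <= y.col2) x else y)

minPerKey.show()   // Dataset[(String, Record)]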

Cheers,

Holden :)

On Wed, Aug 10, 2016 at 5:15 PM, luismattor  wrote:

> Hi everyone,
>
> Consider the following code:
>
> val result = df.groupBy("col1").agg(min("col2"))
>
> I know that rdd.reduceByKey(func) produces the same RDD as
> rdd.groupByKey().mapValues(value => value.reduce(func)). However, reduceByKey
> is more efficient as it avoids shipping each value to the reducer doing the
> aggregation (it ships partial aggregations instead).
>
> I wonder whether the DataFrame API optimizes the code doing something
> similar to what RDD.reduceByKey does.
>
> I am using Spark 1.6.2.
>
> Regards,
> Luis
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Arun Luthra
Thanks, pair_rdd.rdd.groupByKey() did the trick.

On Wed, Aug 10, 2016 at 8:24 PM, Holden Karau  wrote:

> So it looks like (despite the name) pair_rdd is actually a Dataset - my
> guess is you might have a map on a dataset up above which used to return an
> RDD but now returns another dataset or an unexpected implicit conversion.
> Just add rdd() before the groupByKey call to push it into an RDD. That
> being said - groupByKey generally is an anti-pattern so please be careful
> with it.
>
> On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra 
> wrote:
>
>> Here is the offending line:
>>
>> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
>> Iterable[MyData]) => {
>> ...
>>
>>
>> [error]  .scala:249: overloaded method value groupByKey with
>> alternatives:
>> [error]   [K](func: 
>> org.apache.spark.api.java.function.MapFunction[(aaa.MyKey,
>> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[K
>> ])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
>> 
>> [error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
>> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
>> aaa.MyData)]
>> [error]  cannot be applied to ()
>> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
>> MyKey, hd_iter: Iterable[MyData]) => {
>> [error] ^
>> [warn] .scala:249: non-variable type argument aaa.MyData in
>> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
>> is unchecked since it is eliminated by erasure
>> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
>> MyKey, hd_iter: Iterable[MyData]) => {
>> [warn]
>>   ^
>> [warn] one warning found
>>
>>
>> I can't see any obvious API change... what is the problem?
>>
>> Thanks,
>> Arun
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Holden Karau
So it looks like (despite the name) pair_rdd is actually a Dataset - my
guess is you might have a map on a dataset up above which used to return an
RDD but now returns another dataset or an unexpected implicit conversion.
Just add rdd() before the groupByKey call to push it into an RDD. That
being said - groupByKey generally is an anti-pattern so please be careful
with it.
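
A minimal self-contained sketch of that fix (the case classes here are
hypothetical stand-ins for aaa.MyKey and aaa.MyData):

import org.apache.spark.sql.SparkSession

// Hypothetical stand-ins for the user's key/value types.
case class MyKey(id: Int)
case class MyData(v: Double)

val spark = SparkSession.builder().appName("groupByKey-fix-sketch").getOrCreate()
import spark.implicits._

// After the 2.0 upgrade pair_ds is a Dataset[(MyKey, MyData)]; calling .rdd
// drops back to an RDD, where the zero-argument PairRDDFunctions.groupByKey()
// still applies.
val pair_ds = Seq((MyKey(1), MyData(2.0)), (MyKey(1), MyData(3.0))).toDS()

val some_rdd = pair_ds.rdd.groupByKey().flatMap { case (mk, mdIter) =>
  mdIter.map(md => (mk.id, md.v))   // ... original per-key logic goes here ...
}

some_rdd.collect().foreach(println)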

On Wed, Aug 10, 2016 at 8:07 PM, Arun Luthra  wrote:

> Here is the offending line:
>
> val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
> Iterable[MyData]) => {
> ...
>
>
> [error]  .scala:249: overloaded method value groupByKey with
> alternatives:
> [error]   [K](func: org.apache.spark.api.java.function.MapFunction[(aaa.MyKey,
> aaa.MyData),K], encoder: org.apache.spark.sql.Encoder[
> K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
> 
> [error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
> org.apache.spark.sql.Encoder[K])org.apache.spark.sql.
> KeyValueGroupedDataset[K,(aaa.MyKey, aaa.MyData)]
> [error]  cannot be applied to ()
> [error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk:
> MyKey, hd_iter: Iterable[MyData]) => {
> [error] ^
> [warn] .scala:249: non-variable type argument aaa.MyData in
> type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
> is unchecked since it is eliminated by erasure
> [warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
> hd_iter: Iterable[MyData]) => {
> [warn]
> ^
> [warn] one warning found
>
>
> I can't see any obvious API change... what is the problem?
>
> Thanks,
> Arun
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


groupByKey() compile error after upgrading from 1.6.2 to 2.0.0

2016-08-10 Thread Arun Luthra
Here is the offending line:

val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey, md_iter:
Iterable[MyData]) => {
...


[error]  .scala:249: overloaded method value groupByKey with
alternatives:
[error]   [K](func:
org.apache.spark.api.java.function.MapFunction[(aaa.MyKey, aaa.MyData),K],
encoder:
org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
aaa.MyData)] 
[error]   [K](func: ((aaa.MyKey, aaa.MyData)) => K)(implicit evidence$4:
org.apache.spark.sql.Encoder[K])org.apache.spark.sql.KeyValueGroupedDataset[K,(aaa.MyKey,
aaa.MyData)]
[error]  cannot be applied to ()
[error] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
hd_iter: Iterable[MyData]) => {
[error] ^
[warn] .scala:249: non-variable type argument aaa.MyData in
type pattern Iterable[aaa.MyData] (the underlying of Iterable[aaa.MyData])
is unchecked since it is eliminated by erasure
[warn] val some_rdd = pair_rdd.groupByKey().flatMap { case (mk: MyKey,
hd_iter: Iterable[MyData]) => {
[warn]
^
[warn] one warning found


I can't see any obvious API change... what is the problem?

Thanks,
Arun


na.fill doesn't work

2016-08-10 Thread Javier Rey
Hi everybody,

I have a data frame after many transformations, and my final task is to fill
the NAs with zeros. I run this command: df_fil1 = df_fil.na.fill(0), but it
doesn't work; the nulls don't disappear.

I did a toy test and it works correctly.

I don't understand what happened.
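
For comparison, a minimal Scala sketch of the same call on a toy frame (column
names are hypothetical); one thing worth noting is that the numeric fill(0)
only touches numeric columns, while string columns need a string fill:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("na-fill-sketch").getOrCreate()
import spark.implicits._

// Hypothetical frame with a null in a numeric column and one in a string column.
val df = Seq((Some(1), Some("a")), (None, None)).toDF("num", "str")

val filledNums = df.na.fill(0)            // replaces nulls in numeric columns only
val filledAll  = filledNums.na.fill("")   // string columns need a string value

filledAll.show()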

Thanks in advance.

Samir


Re: Spark-2.0.0 fails reading a parquet dataset generated by Spark-1.6.2

2016-08-10 Thread Cheng Lian
Haven't figured out exactly how it failed yet, but the leading
underscore in the partition directory name looks suspicious. Could you 
please try this PR to see whether it fixes the issue: 
https://github.com/apache/spark/pull/14585/files


Cheng


On 8/9/16 5:38 PM, immerrr again wrote:

Another follow-up: I have narrowed it down to the first 32 partitions,
but from that point it gets strange.

Here's the error:

In [68]: spark.read.parquet(*subdirs[:32])
...
AnalysisException: u'Unable to infer schema for ParquetFormat at
/path/to/data/_locality_code=AQ,/path/to/data/_locality_code=AI. It
must be specified manually;'


Removing *any* of the subdirs from that set removes the error.

In [69]: for i in range(32): spark.read.parquet(*(subdirs[:i] +
subdirs[i+1:32]))


Here's the punchline: schemas for the first 31 and for the last 31 of
those 32 subdirs are the same:

In [70]: spark.read.parquet(*subdirs[:31]).schema.jsonValue() ==
spark.read.parquet(*subdirs[1:32]).schema.jsonValue()
Out[70]: True

Any idea why that might be happening?

On Tue, Aug 9, 2016 at 12:12 PM, immerrr again  wrote:

Some follow-up information:

- dataset size is ~150G

- the data is partitioned by one of the columns, _locality_code:
$ ls -1
_locality_code=AD
_locality_code=AE
_locality_code=AF
_locality_code=AG
_locality_code=AI
_locality_code=AL
_locality_code=AM
_locality_code=AN

_locality_code=YE
_locality_code=YT
_locality_code=YU
_locality_code=ZA
_locality_code=ZM
_locality_code=ZW
_SUCCESS

- some of the partitions contain only one row, but all partitions are
in place (i.e. the number of directories matches the number of distinct
localities):
val counts = 
sqlContext.read.parquet("/path-to-data").groupBy("_locality_code").count().orderBy($"count").collect()

scala> counts.slice(counts.length-10, counts.length)
res13: Array[org.apache.spark.sql.Row] = Array([CN,5682255],
[AU,6090561], [ES,6184507], [IT,7093401], [FR,8814435], [CA,10005467],
[UK,15375397], [BR,15829260], [IN,22404143], [US,98585175])

scala> counts.slice(0, 10)
res14: Array[org.apache.spark.sql.Row] = Array([UM,1], [JB,1], [JK,1],
[WP,1], [JT,1], [SX,9], [BL,52], [BQ,70], [BV,115], [MF,115])


On Tue, Aug 9, 2016 at 11:10 AM, immerrr again  wrote:

Hi everyone

I tried upgrading Spark-1.6.2 to Spark-2.0.0 but ran into an issue
reading the existing data. Here's how the traceback looks in
spark-shell:

scala> spark.read.parquet("/path/to/data")
org.apache.spark.sql.AnalysisException: Unable to infer schema for
ParquetFormat at /path/to/data. It must be specified manually;
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)
   at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$16.apply(DataSource.scala:397)
   at scala.Option.getOrElse(Option.scala:121)
   at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:396)
   at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:427)
   at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:411)
   ... 48 elided

If I enable DEBUG log with sc.setLogLevel("DEBUG"), here's what I
additionally see in the output:
https://gist.github.com/immerrr/4474021ae70f35b7b9e262251c0abc59. Of
course, that same data is read and processed by spark-1.6.2 correctly.

Any idea what might be wrong here?

Cheers,
immerrr

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Is there a reduceByKey functionality in DataFrame API?

2016-08-10 Thread luismattor
Hi everyone,

Consider the following code:

val result = df.groupBy("col1").agg(min("col2"))

I know that rdd.reduceByKey(func) produces the same RDD as
rdd.groupByKey().mapValues(value => value.reduce(func)). However, reduceByKey
is more efficient as it avoids shipping each value to the reducer doing the
aggregation (it ships partial aggregations instead).

I wonder whether the DataFrame API optimizes the code doing something
similar to what RDD.reduceByKey does. 

I am using Spark 1.6.2. 

Regards,
Luis



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark2 SBT Assembly

2016-08-10 Thread Efe Selcuk
Thanks for the replies, folks.

My specific use case is maybe unusual. I'm working in the context of the
build environment in my company. Spark was being used in such a way that
the fat assembly jar that the old 'sbt assembly' command outputs was used
when building a Spark application. I'm trying to figure out if I can just
use the many library jars instead, but in the meantime I'm hoping to get a
fat assembly in the old way to get us unblocked in updating our application
to use 2.0. It's a proprietary build system, not maven or sbt, so it's not
straightforward and the dependencies are modeled differently.

To be a bit more clear: the fat assembly was not used for any reason other
than to get the spark application to build. This is in the context of
running in Amazon EMR, so we don't send that spark assembly over for runs.

Efe

On Wed, Aug 10, 2016 at 2:15 PM, Mich Talebzadeh 
wrote:

> Hi Efe,
>
> Are you talking about creating an uber/fat jar file for your specific
> application? Then you can distribute it to another node just to use the jar
> file without assembling it.
>
> I can still do it in Spark 2 as before if I understand your special use
> case.
>
> [warn] Strategy 'discard' was applied to 349 files
> [warn] Strategy 'first' was applied to 450 files
>
> *[info] Assembly up to date:
> /data6/hduser/scala/CEP_streaming/target/scala-2.10/scala-assembly-1.0.jar*[success]
> Total time: 117 s, completed Aug 10, 2016 9:31:24 PM
> Submiting the job
> Ivy Default Cache set to: /home/hduser/.ivy2/cache
> The jars for the packages stored in: /home/hduser/.ivy2/jars
>
>
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 August 2016 at 20:35, Holden Karau  wrote:
>
>> What are you looking to use the assembly jar for - maybe we can think of
>> a workaround :)
>>
>>
>> On Wednesday, August 10, 2016, Efe Selcuk  wrote:
>>
>>> Sorry, I should have specified that I'm specifically looking for that
>>> fat assembly behavior. Is it no longer possible?
>>>
>>> On Wed, Aug 10, 2016 at 10:46 AM, Nick Pentreath <
>>> nick.pentre...@gmail.com> wrote:
>>>
 You're correct - Spark packaging has been shifted to not use the
 assembly jar.

 To build now use "build/sbt package"



 On Wed, 10 Aug 2016 at 19:40, Efe Selcuk  wrote:

> Hi Spark folks,
>
> With Spark 1.6 the 'assembly' target for sbt would build a fat jar
> with all of the main Spark dependencies for building an application.
> Against Spark 2, that target is no longer building a spark assembly, just
> ones for e.g. Flume and Kafka.
>
> I'm not well versed with maven and sbt, so I don't know how to go
> about figuring this out.
>
> Is this intended? Or am I missing something?
>
> Thanks.
>

>>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>
>


Re: Spark submit job that points to URL of a jar

2016-08-10 Thread Mich Talebzadeh
you can build your uber jar file on an NFS mounted file system accessible
to all nodes in the cluster. Any node can then spark-submit and run the app
referring to the jar file.

sounds doable.

Having thought about it, it is feasible to place Spark binaries on the NFS
mount as well so any host can start it. The NFS directory will be treated
as a local mount.
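
For illustration, a submission that points at a globally visible jar instead of
pushing a local one (the class name and paths are hypothetical):

spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  hdfs:///apps/myapp-assembly-1.0.jar

With an NFS mount visible to every node, a file:// URL such as
file:///nfs/apps/myapp-assembly-1.0.jar should work the same way.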

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 August 2016 at 22:16, Zlati Gardev  wrote:

> Hello,
>
> Is there a way to run a spark submit job that points to the URL of a jar
> file (instead of pushing the jar from local)?
>
> The documentation  at
> *http://spark.apache.org/docs/latest/submitting-applications.html*
> 
> implies that this may be possible.
>
> "*application-jar: Path to a bundled jar including your application and
> all dependencies. The URL must be globally visible inside of your cluster,
> for instance, an hdfs:// path or a file:// path that is present on all
> nodes*"
>
> Thank you,
> Zlati
>
>
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Spark submit job that points to URL of a jar

2016-08-10 Thread Zlati Gardev
Hello,
 
Is there a way to run a spark submit job that points to the URL of a jar file (instead of pushing the jar from local)? 
 
The documentation  at http://spark.apache.org/docs/latest/submitting-applications.html  implies that this may be possible.
 
"application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes"
 
Thank you,
Zlati
 
 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark2 SBT Assembly

2016-08-10 Thread Mich Talebzadeh
Hi Efe,

Are you talking about creating an uber/fat jar file for your specific
application? Then you can distribute it to another node just to use the jar
file without assembling it.

I can still do it in Spark 2 as before if I understand your special use
case.

[warn] Strategy 'discard' was applied to 349 files
[warn] Strategy 'first' was applied to 450 files

[info] Assembly up to date:
/data6/hduser/scala/CEP_streaming/target/scala-2.10/scala-assembly-1.0.jar
[success] Total time: 117 s, completed Aug 10, 2016 9:31:24 PM
Submiting the job
Ivy Default Cache set to: /home/hduser/.ivy2/cache
The jars for the packages stored in: /home/hduser/.ivy2/jars
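
For an application-side fat jar, a minimal build.sbt sketch, assuming the
sbt-assembly plugin is enabled and Spark artifacts are marked provided (names
and versions are illustrative):

name := "cep-streaming-app"
scalaVersion := "2.11.8"

// Spark itself is supplied by the cluster, so keep it out of the assembly.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.0.0" % "provided"
)

// Collapse duplicate META-INF entries pulled in by transitive dependencies.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}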



HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 August 2016 at 20:35, Holden Karau  wrote:

> What are you looking to use the assembly jar for - maybe we can think of a
> workaround :)
>
>
> On Wednesday, August 10, 2016, Efe Selcuk  wrote:
>
>> Sorry, I should have specified that I'm specifically looking for that fat
>> assembly behavior. Is it no longer possible?
>>
>> On Wed, Aug 10, 2016 at 10:46 AM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> You're correct - Spark packaging has been shifted to not use the
>>> assembly jar.
>>>
>>> To build now use "build/sbt package"
>>>
>>>
>>>
>>> On Wed, 10 Aug 2016 at 19:40, Efe Selcuk  wrote:
>>>
 Hi Spark folks,

 With Spark 1.6 the 'assembly' target for sbt would build a fat jar with
 all of the main Spark dependencies for building an application. Against
 Spark 2, that target is no longer building a spark assembly, just ones for
 e.g. Flume and Kafka.

 I'm not well versed with maven and sbt, so I don't know how to go about
 figuring this out.

 Is this intended? Or am I missing something?

 Thanks.

>>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Re: Spark2 SBT Assembly

2016-08-10 Thread Marco Mistroni
How about all dependencies? Presumably they will all go in --jars?
What if I have 10 dependencies? Any best practices in packaging apps for
spark 2.0?
Kr

On 10 Aug 2016 6:46 pm, "Nick Pentreath"  wrote:

> You're correct - Spark packaging has been shifted to not use the assembly
> jar.
>
> To build now use "build/sbt package"
>
>
> On Wed, 10 Aug 2016 at 19:40, Efe Selcuk  wrote:
>
>> Hi Spark folks,
>>
>> With Spark 1.6 the 'assembly' target for sbt would build a fat jar with
>> all of the main Spark dependencies for building an application. Against
>> Spark 2, that target is no longer building a spark assembly, just ones for
>> e.g. Flume and Kafka.
>>
>> I'm not well versed with maven and sbt, so I don't know how to go about
>> figuring this out.
>>
>> Is this intended? Or am I missing something?
>>
>> Thanks.
>>
>


Re: Standardization with Sparse Vectors

2016-08-10 Thread Tobi Bosede
Sean,

I have created a jira; I hope you don't mind that I borrowed your
explanation of "offset". https://issues.apache.org/jira/browse/SPARK-17001

So what did you do to standardize your data, if you didn't use
standardScaler? Did you write a udf to subtract mean and divide by standard
deviation?

Although I know this is not the best approach for something I plan to put
in production, I have been trying to write a udf to turn the sparse vector
into a dense one and apply the udf in withColumn(). withColumn() complains
that the data is a tuple. I think the issue might be the datatype
parameter. The function returns a vector of doubles but there is no type
that would be adequate for this.


sparseToDense = udf(lambda data: float(DenseVector([data.toArray()])), DoubleType())
denseTrainingRdf = trainingRdfAssemb.withColumn("denseFeatures", sparseToDense("features"))

However the function works outside the udf, but I am unable to add an
arbitrary column to the data frame I started out working with. Thoughts?

denseFeatures = TrainingRdf.select("features").map(lambda data: DenseVector([data.features.toArray()]))
denseTrainingRdf = trainingRdfAssemb.withColumn("denseFeatures", denseFeatures)

Thanks,
Tobi

On Wed, Aug 10, 2016 at 1:01 PM, Nick Pentreath 
wrote:

> Ah right, got it. As you say for storage it helps significantly, but for
> operations I suspect it puts one back in a "dense-like" position. Still,
> for online / mini-batch algorithms it may still be feasible I guess.
> On Wed, 10 Aug 2016 at 19:50, Sean Owen  wrote:
>
>> All elements, I think. Imagine a sparse vector 1:3 3:7 which conceptually
>> represents 0 3 0 7. Imagine it also has an offset stored which applies to
>> all elements. If it is -2 then it now represents -2 1 -2 5, but this
>> requires just one extra value to store. It only helps with storage of a
>> shifted sparse vector; iterating still typically requires iterating all
>> elements.
>>
>> Probably, where this would help, the caller can track this offset and
>> even more efficiently apply this knowledge. I remember digging into this in
>> how sparse covariance matrices are computed. It almost but not quite
>> enabled an optimization.
>>
>>
>> On Wed, Aug 10, 2016, 18:10 Nick Pentreath 
>> wrote:
>>
>>> Sean by 'offset' do you mean basically subtracting the mean but only
>>> from the non-zero elements in each row?
>>> On Wed, 10 Aug 2016 at 19:02, Sean Owen  wrote:
>>>
 Yeah I had thought the same, that perhaps it's fine to let the
 StandardScaler proceed, if it's explicitly asked to center, rather
 than refuse to. It's not really much more rope to let a user hang
 herself with, and, blocks legitimate usages (we ran into this last
 week and couldn't use StandardScaler as a result).

 I'm personally supportive of the change and don't see a JIRA. I think
 you could at least make one.

 On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede 
 wrote:
 > Thanks Sean, I agree with 100% that the math is math and dense vs
 sparse is
 > just a matter of representation. I was trying to convince a co-worker
 of
 > this to no avail. Sending this email was mainly a sanity check.
 >
 > I think having an offset would be a great idea, although I am not
 sure how
 > to implement this. However, if anything should be done to rectify this
 > issue, it should be done in the standardScaler, not vectorAssembler.
 There
 > should not be any forcing of vectorAssembler to produce only dense
 vectors
 > so as to avoid performance problems with data that does not fit in
 memory.
 > Furthermore, not every machine learning algo requires standardization.
 > Instead, standardScaler should have withmean=True as default and
 should
 > apply an offset if the vector is sparse, whereas there would be normal
 > subtraction if the vector is dense. This way the default behavior of
 > standardScaler will always be what is generally understood to be
 > standardization, as opposed to people thinking they are standardizing
 when
 > they actually are not.
 >
 > Can anyone confirm whether there is a jira already?
 >
 > On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen 
 wrote:
 >>
 >> Dense vs sparse is just a question of representation, so doesn't make
 >> an operation on a vector more or less important as a result. You've
 >> identified the reason that subtracting the mean can be undesirable: a
 >> notionally billion-element sparse vector becomes too big to fit in
 >> memory at once.
 >>
 >> I know this came up as a problem recently (I think there's a JIRA?)
 >> because VectorAssembler will *sometimes* output a small dense vector
 >> and sometimes output a small sparse vector based on how many zeroes
 >> there 

Re: Spark2 SBT Assembly

2016-08-10 Thread Holden Karau
What are you looking to use the assembly jar for - maybe we can think of a
workaround :)

On Wednesday, August 10, 2016, Efe Selcuk  wrote:

> Sorry, I should have specified that I'm specifically looking for that fat
> assembly behavior. Is it no longer possible?
>
> On Wed, Aug 10, 2016 at 10:46 AM, Nick Pentreath  > wrote:
>
>> You're correct - Spark packaging has been shifted to not use the assembly
>> jar.
>>
>> To build now use "build/sbt package"
>>
>>
>>
>> On Wed, 10 Aug 2016 at 19:40, Efe Selcuk > > wrote:
>>
>>> Hi Spark folks,
>>>
>>> With Spark 1.6 the 'assembly' target for sbt would build a fat jar with
>>> all of the main Spark dependencies for building an application. Against
>>> Spark 2, that target is no longer building a spark assembly, just ones for
>>> e.g. Flume and Kafka.
>>>
>>> I'm not well versed with maven and sbt, so I don't know how to go about
>>> figuring this out.
>>>
>>> Is this intended? Or am I missing something?
>>>
>>> Thanks.
>>>
>>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

2016-08-10 Thread شجاع الرحمن بیگ
Hi,

I am getting following error while processing large input size.

...
[Stage 18:> (90 + 24) /
240]16/08/10 19:39:54 WARN TaskSetManager: Lost task 86.1 in stage 18.0
(TID 2517, bscpower8n2-data): FetchFailed(null, shuffleId=0, mapId=-1,
reduceId=86, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
at
org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
at
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at
org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

...

The specifications are as follows:

Spark version: 1.6.1
Cluster mode: Standalone
Storage level: Memory and Disk
Spark worker cores: 6
Spark worker memory: 200 GB
Spark executor memory: 199 GB
Spark driver memory: 5 GB
Number of input partitions: 240
Input data set: 34 GB

I investigated the issue further and monitored the free RAM using vmstat
during the execution of the workload. It shows that the job keeps running
successfully while free memory is available, but starts throwing the
exception once the free memory runs out.

Has anyone faced a similar problem? If so, please share a solution.

Thanks

Shuja



-- 
Regards
Shuja-ur-Rehman Baig



UNSUBSCRIBE

2016-08-10 Thread Sheth, Niraj




Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
I checked the executor logs and the UI. There is no error message or anything
like that; when there is any action, it just waits.
There is data in the partitions. I could use kafka-simple-consumer-shell and
print all the data in the console. Am I doing anything wrong in foreachRDD?
This just works fine with a single-partition topic.

On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger  wrote:

> zookeeper.connect is irrelevant.
>
> Did you look at your executor logs?
> Did you look at the UI for the (probably failed) stages?
> Are you actually producing data into all of the kafka partitions?
> If you use kafka-simple-consumer-shell.sh to read that partition, do
> you get any data?
>
> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
>  wrote:
> > Hi Cody,
> >
> > Just added zookeeper.connect to kafkaparams . It couldn't come out of
> batch
> > window. Other batches are queued. I could see foreach(println) of
> dataFrame
> > printing one of partition's data and not the other.
> > Couldn't see any  errors from log.
> >
> > val brokers = "localhost:9092,localhost:9093"
> > val sparkConf = new
> > SparkConf().setAppName("KafkaWeather").setMaster("
> local[5]")//spark://localhost:7077
> > val sc = new SparkContext(sparkConf)
> > val ssc = new StreamingContext(sc, Seconds(1))
> > val kafkaParams = Map[String,
> > String]("bootstrap.servers"->"localhost:9093,localhost:9092"
> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
> group.id"->"xyz")
> > val topics = "test"
> > val topicsSet = topics.split(",").toSet
> > val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topicsSet)
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> > import sqlContext.implicits._
> > messages.foreachRDD(rdd => {
> >   if (rdd.isEmpty()) {
> > println("Failed to get data from Kafka. Please check that the Kafka
> > producer is streaming data.")
> > System.exit(-1)
> >   }
> >
> >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
> >   dataframe.foreach(println)
> >  println( "$$$", dataframe.count())
> >   })
> > Logs:
> >
> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> > library for your platform... using builtin-java classes where applicable
> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
> resolves to
> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface
> > eth1)
> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> > another address
> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
> > users with modify permissions: Set(cloudera)
> > 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver'
> on
> > port 45031.
> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> > 16/08/10 18:16:26 INFO Utils: Successfully started service
> > 'sparkDriverActorSystem' on port 56638.
> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity
> 511.5
> > MB
> > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4040.
> > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> > http://192.168.126.131:4040
> > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-
> 2b2a4e68-2952-41b0-a11b-f07860682749
> > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
> > server' on port 59491.
> > 16/08/10 18:16:28 INFO SparkContext: Added JAR
> > file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> > http://192.168.126.131:59491/jars/spark-streaming-kafka-
> assembly_2.10-1.6.2.jar
> > with timestamp 1470833188094
> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
> > file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> > http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar
> with
> > timestamp 1470833189531
> > 16/08/10 18:16:29 INFO SparkContext: Added JAR
> > 

Re: Standardization with Sparse Vectors

2016-08-10 Thread Nick Pentreath
Ah right, got it. As you say for storage it helps significantly, but for
operations I suspect it puts one back in a "dense-like" position. Still,
for online / mini-batch algorithms it may still be feasible I guess.
On Wed, 10 Aug 2016 at 19:50, Sean Owen  wrote:

> All elements, I think. Imagine a sparse vector 1:3 3:7 which conceptually
> represents 0 3 0 7. Imagine it also has an offset stored which applies to
> all elements. If it is -2 then it now represents -2 1 -2 5, but this
> requires just one extra value to store. It only helps with storage of a
> shifted sparse vector; iterating still typically requires iterating all
> elements.
>
> Probably, where this would help, the caller can track this offset and even
> more efficiently apply this knowledge. I remember digging into this in how
> sparse covariance matrices are computed. It almost but not quite enabled an
> optimization.
>
>
> On Wed, Aug 10, 2016, 18:10 Nick Pentreath 
> wrote:
>
>> Sean by 'offset' do you mean basically subtracting the mean but only from
>> the non-zero elements in each row?
>> On Wed, 10 Aug 2016 at 19:02, Sean Owen  wrote:
>>
>>> Yeah I had thought the same, that perhaps it's fine to let the
>>> StandardScaler proceed, if it's explicitly asked to center, rather
>>> than refuse to. It's not really much more rope to let a user hang
>>> herself with, and, blocks legitimate usages (we ran into this last
>>> week and couldn't use StandardScaler as a result).
>>>
>>> I'm personally supportive of the change and don't see a JIRA. I think
>>> you could at least make one.
>>>
>>> On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede 
>>> wrote:
>>> > Thanks Sean, I agree with 100% that the math is math and dense vs
>>> sparse is
>>> > just a matter of representation. I was trying to convince a co-worker
>>> of
>>> > this to no avail. Sending this email was mainly a sanity check.
>>> >
>>> > I think having an offset would be a great idea, although I am not sure
>>> how
>>> > to implement this. However, if anything should be done to rectify this
>>> > issue, it should be done in the standardScaler, not vectorAssembler.
>>> There
>>> > should not be any forcing of vectorAssembler to produce only dense
>>> vectors
>>> > so as to avoid performance problems with data that does not fit in
>>> memory.
>>> > Furthermore, not every machine learning algo requires standardization.
>>> > Instead, standardScaler should have withmean=True as default and should
>>> > apply an offset if the vector is sparse, whereas there would be normal
>>> > subtraction if the vector is dense. This way the default behavior of
>>> > standardScaler will always be what is generally understood to be
>>> > standardization, as opposed to people thinking they are standardizing
>>> when
>>> > they actually are not.
>>> >
>>> > Can anyone confirm whether there is a jira already?
>>> >
>>> > On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen 
>>> wrote:
>>> >>
>>> >> Dense vs sparse is just a question of representation, so doesn't make
>>> >> an operation on a vector more or less important as a result. You've
>>> >> identified the reason that subtracting the mean can be undesirable: a
>>> >> notionally billion-element sparse vector becomes too big to fit in
>>> >> memory at once.
>>> >>
>>> >> I know this came up as a problem recently (I think there's a JIRA?)
>>> >> because VectorAssembler will *sometimes* output a small dense vector
>>> >> and sometimes output a small sparse vector based on how many zeroes
>>> >> there are. But that's bad because then the StandardScaler can't
>>> >> process the output at all. You can work on this if you're interested;
>>> >> I think the proposal was to be able to force a dense representation
>>> >> only in VectorAssembler. I don't know if that's the nature of the
>>> >> problem you're hitting.
>>> >>
>>> >> It can be meaningful to only scale the dimension without centering it,
>>> >> but it's not the same thing, no. The math is the math.
>>> >>
>>> >> This has come up a few times -- it's necessary to center a sparse
>>> >> vector but prohibitive to do so. One idea I'd toyed with in the past
>>> >> was to let a sparse vector have an 'offset' value applied to all
>>> >> elements. That would let you shift all values while preserving a
>>> >> sparse representation. I'm not sure if it's worth implementing but
>>> >> would help this case.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede 
>>> wrote:
>>> >> > Hi everyone,
>>> >> >
>>> >> > I am doing some standardization using standardScaler on data from
>>> >> > VectorAssembler which is represented as sparse vectors. I plan to
>>> fit a
>>> >> > regularized model.  However, standardScaler does not allow the mean
>>> to
>>> >> > be
>>> >> > subtracted from sparse vectors. It will only divide by the standard
>>> >> > deviation, which I understand is to 

Re: Spark2 SBT Assembly

2016-08-10 Thread Nick Pentreath
You're correct - Spark packaging has been shifted to not use the assembly
jar.

To build now use "build/sbt package"


On Wed, 10 Aug 2016 at 19:40, Efe Selcuk  wrote:

> Hi Spark folks,
>
> With Spark 1.6 the 'assembly' target for sbt would build a fat jar with
> all of the main Spark dependencies for building an application. Against
> Spark 2, that target is no longer building a spark assembly, just ones for
> e.g. Flume and Kafka.
>
> I'm not well versed with maven and sbt, so I don't know how to go about
> figuring this out.
>
> Is this intended? Or am I missing something?
>
> Thanks.
>


Re: Spark2 SBT Assembly

2016-08-10 Thread Efe Selcuk
Sorry, I should have specified that I'm specifically looking for that fat
assembly behavior. Is it no longer possible?

On Wed, Aug 10, 2016 at 10:46 AM, Nick Pentreath 
wrote:

> You're correct - Spark packaging has been shifted to not use the assembly
> jar.
>
> To build now use "build/sbt package"
>
>
>
> On Wed, 10 Aug 2016 at 19:40, Efe Selcuk  wrote:
>
>> Hi Spark folks,
>>
>> With Spark 1.6 the 'assembly' target for sbt would build a fat jar with
>> all of the main Spark dependencies for building an application. Against
>> Spark 2, that target is no longer building a spark assembly, just ones for
>> e.g. Flume and Kafka.
>>
>> I'm not well versed with maven and sbt, so I don't know how to go about
>> figuring this out.
>>
>> Is this intended? Or am I missing something?
>>
>> Thanks.
>>
>


Re: Standardization with Sparse Vectors

2016-08-10 Thread Sean Owen
All elements, I think. Imagine a sparse vector 1:3 3:7 which conceptually
represents 0 3 0 7. Imagine it also has an offset stored which applies to
all elements. If it is -2 then it now represents -2 1 -2 5, but this
requires just one extra value to store. It only helps with storage of a
shifted sparse vector; iterating still typically requires iterating all
elements.

Probably, where this would help, the caller can track this offset and even
more efficiently apply this knowledge. I remember digging into this in how
sparse covariance matrices are computed. It almost but not quite enabled an
optimization.
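
A toy sketch of that idea (not an existing Spark class, just an illustration of
a sparse vector carrying a shared offset):

// Hypothetical type: sparse storage plus one offset applied to every element.
case class OffsetSparseVector(size: Int,
                              indices: Array[Int],
                              values: Array[Double],
                              offset: Double) {
  // Logical value at position i: stored value (or 0.0) plus the shared offset.
  def apply(i: Int): Double = {
    val j = java.util.Arrays.binarySearch(indices, i)
    (if (j >= 0) values(j) else 0.0) + offset
  }
}

// The vector 1:3 3:7 of size 4, shifted by -2, represents [-2, 1, -2, 5]
// while storing only two values plus the offset.
val v = OffsetSparseVector(4, Array(1, 3), Array(3.0, 7.0), -2.0)
val asDense = (0 until v.size).map(v(_))   // Vector(-2.0, 1.0, -2.0, 5.0)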

On Wed, Aug 10, 2016, 18:10 Nick Pentreath  wrote:

> Sean by 'offset' do you mean basically subtracting the mean but only from
> the non-zero elements in each row?
> On Wed, 10 Aug 2016 at 19:02, Sean Owen  wrote:
>
>> Yeah I had thought the same, that perhaps it's fine to let the
>> StandardScaler proceed, if it's explicitly asked to center, rather
>> than refuse to. It's not really much more rope to let a user hang
>> herself with, and, blocks legitimate usages (we ran into this last
>> week and couldn't use StandardScaler as a result).
>>
>> I'm personally supportive of the change and don't see a JIRA. I think
>> you could at least make one.
>>
>> On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede  wrote:
>> > Thanks Sean, I agree with 100% that the math is math and dense vs
>> sparse is
>> > just a matter of representation. I was trying to convince a co-worker of
>> > this to no avail. Sending this email was mainly a sanity check.
>> >
>> > I think having an offset would be a great idea, although I am not sure
>> how
>> > to implement this. However, if anything should be done to rectify this
>> > issue, it should be done in the standardScaler, not vectorAssembler.
>> There
>> > should not be any forcing of vectorAssembler to produce only dense
>> vectors
>> > so as to avoid performance problems with data that does not fit in
>> memory.
>> > Furthermore, not every machine learning algo requires standardization.
>> > Instead, standardScaler should have withmean=True as default and should
>> > apply an offset if the vector is sparse, whereas there would be normal
>> > subtraction if the vector is dense. This way the default behavior of
>> > standardScaler will always be what is generally understood to be
>> > standardization, as opposed to people thinking they are standardizing
>> when
>> > they actually are not.
>> >
>> > Can anyone confirm whether there is a jira already?
>> >
>> > On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen  wrote:
>> >>
>> >> Dense vs sparse is just a question of representation, so doesn't make
>> >> an operation on a vector more or less important as a result. You've
>> >> identified the reason that subtracting the mean can be undesirable: a
>> >> notionally billion-element sparse vector becomes too big to fit in
>> >> memory at once.
>> >>
>> >> I know this came up as a problem recently (I think there's a JIRA?)
>> >> because VectorAssembler will *sometimes* output a small dense vector
>> >> and sometimes output a small sparse vector based on how many zeroes
>> >> there are. But that's bad because then the StandardScaler can't
>> >> process the output at all. You can work on this if you're interested;
>> >> I think the proposal was to be able to force a dense representation
>> >> only in VectorAssembler. I don't know if that's the nature of the
>> >> problem you're hitting.
>> >>
>> >> It can be meaningful to only scale the dimension without centering it,
>> >> but it's not the same thing, no. The math is the math.
>> >>
>> >> This has come up a few times -- it's necessary to center a sparse
>> >> vector but prohibitive to do so. One idea I'd toyed with in the past
>> >> was to let a sparse vector have an 'offset' value applied to all
>> >> elements. That would let you shift all values while preserving a
>> >> sparse representation. I'm not sure if it's worth implementing but
>> >> would help this case.
>> >>
>> >>
>> >>
>> >>
>> >> On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede 
>> wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I am doing some standardization using standardScaler on data from
>> >> > VectorAssembler which is represented as sparse vectors. I plan to
>> fit a
>> >> > regularized model.  However, standardScaler does not allow the mean
>> to
>> >> > be
>> >> > subtracted from sparse vectors. It will only divide by the standard
>> >> > deviation, which I understand is to keep the vector sparse. Thus I am
>> >> > trying
>> >> > to convert my sparse vectors into dense vectors, but this may not be
>> >> > worthwhile.
>> >> >
>> >> > So my questions are:
>> >> > Is subtracting the mean during standardization only important when
>> >> > working
>> >> > with dense vectors? Does it not matter for sparse vectors? Is just
>> >> > dividing
>> >> > by the standard deviation with 

Spark2 SBT Assembly

2016-08-10 Thread Efe Selcuk
Hi Spark folks,

With Spark 1.6 the 'assembly' target for sbt would build a fat jar with all
of the main Spark dependencies for building an application. Against Spark
2, that target is no longer building a spark assembly, just ones for e.g.
Flume and Kafka.

I'm not well versed with maven and sbt, so I don't know how to go about
figuring this out.

Is this intended? Or am I missing something?

Thanks.


Simulate serialization when running local

2016-08-10 Thread Ashic Mahtab
Hi,

Is there a way to simulate "networked" spark when running local (i.e.
master=local[4])? Ideally, some setting that'll ensure any "Task not
serializable" errors are caught during local testing? I seem to vaguely
remember something, but am having trouble pinpointing it.

Cheers,
Ashic.

Re: Standardization with Sparse Vectors

2016-08-10 Thread Nick Pentreath
Sean by 'offset' do you mean basically subtracting the mean but only from
the non-zero elements in each row?
On Wed, 10 Aug 2016 at 19:02, Sean Owen  wrote:

> Yeah I had thought the same, that perhaps it's fine to let the
> StandardScaler proceed, if it's explicitly asked to center, rather
> than refuse to. It's not really much more rope to let a user hang
> herself with, and, blocks legitimate usages (we ran into this last
> week and couldn't use StandardScaler as a result).
>
> I'm personally supportive of the change and don't see a JIRA. I think
> you could at least make one.
>
> On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede  wrote:
> > Thanks Sean, I agree with 100% that the math is math and dense vs sparse
> is
> > just a matter of representation. I was trying to convince a co-worker of
> > this to no avail. Sending this email was mainly a sanity check.
> >
> > I think having an offset would be a great idea, although I am not sure
> how
> > to implement this. However, if anything should be done to rectify this
> > issue, it should be done in the standardScaler, not vectorAssembler.
> There
> > should not be any forcing of vectorAssembler to produce only dense
> vectors
> > so as to avoid performance problems with data that does not fit in
> memory.
> > Furthermore, not every machine learning algo requires standardization.
> > Instead, standardScaler should have withmean=True as default and should
> > apply an offset if the vector is sparse, whereas there would be normal
> > subtraction if the vector is dense. This way the default behavior of
> > standardScaler will always be what is generally understood to be
> > standardization, as opposed to people thinking they are standardizing
> when
> > they actually are not.
> >
> > Can anyone confirm whether there is a jira already?
> >
> > On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen  wrote:
> >>
> >> Dense vs sparse is just a question of representation, so doesn't make
> >> an operation on a vector more or less important as a result. You've
> >> identified the reason that subtracting the mean can be undesirable: a
> >> notionally billion-element sparse vector becomes too big to fit in
> >> memory at once.
> >>
> >> I know this came up as a problem recently (I think there's a JIRA?)
> >> because VectorAssembler will *sometimes* output a small dense vector
> >> and sometimes output a small sparse vector based on how many zeroes
> >> there are. But that's bad because then the StandardScaler can't
> >> process the output at all. You can work on this if you're interested;
> >> I think the proposal was to be able to force a dense representation
> >> only in VectorAssembler. I don't know if that's the nature of the
> >> problem you're hitting.
> >>
> >> It can be meaningful to only scale the dimension without centering it,
> >> but it's not the same thing, no. The math is the math.
> >>
> >> This has come up a few times -- it's necessary to center a sparse
> >> vector but prohibitive to do so. One idea I'd toyed with in the past
> >> was to let a sparse vector have an 'offset' value applied to all
> >> elements. That would let you shift all values while preserving a
> >> sparse representation. I'm not sure if it's worth implementing but
> >> would help this case.
> >>
> >>
> >>
> >>
> >> On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede 
> wrote:
> >> > Hi everyone,
> >> >
> >> > I am doing some standardization using standardScaler on data from
> >> > VectorAssembler which is represented as sparse vectors. I plan to fit
> a
> >> > regularized model.  However, standardScaler does not allow the mean to
> >> > be
> >> > subtracted from sparse vectors. It will only divide by the standard
> >> > deviation, which I understand is to keep the vector sparse. Thus I am
> >> > trying
> >> > to convert my sparse vectors into dense vectors, but this may not be
> >> > worthwhile.
> >> >
> >> > So my questions are:
> >> > Is subtracting the mean during standardization only important when
> >> > working
> >> > with dense vectors? Does it not matter for sparse vectors? Is just
> >> > dividing
> >> > by the standard deviation with sparse vectors equivalent to also
> >> > dividing by
> >> > standard deviation w and subtracting mean with dense vectors?
> >> >
> >> > Thank you,
> >> > Tobi
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Running spark Java on yarn cluster

2016-08-10 Thread atulp
Thanks Mandar.

Our need is to receive SQL queries from a client and submit them over the Spark
cluster. We don't want an application to be submitted for each query. We want
executors to be shared across multiple queries, as we would cache RDDs that get
used across queries.

If I am correct, a SparkContext corresponds to an application, and an
application can be used in interactive mode too. So I was thinking of creating
a server that holds a SparkContext pointing to the YARN cluster and using this
context to run multiple queries over time.

Can you suggest a way to achieve the requirement of interactive queries and
reuse of executors across queries?

Thanks,
Atul



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-Java-on-yarn-cluster-tp27504p27507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Standardization with Sparse Vectors

2016-08-10 Thread Sean Owen
Yeah I had thought the same, that perhaps it's fine to let the
StandardScaler proceed, if it's explicitly asked to center, rather
than refuse to. It's not really much more rope to let a user hang
herself with, and, blocks legitimate usages (we ran into this last
week and couldn't use StandardScaler as a result).

I'm personally supportive of the change and don't see a JIRA. I think
you could at least make one.

On Wed, Aug 10, 2016 at 5:57 PM, Tobi Bosede  wrote:
> Thanks Sean, I agree with 100% that the math is math and dense vs sparse is
> just a matter of representation. I was trying to convince a co-worker of
> this to no avail. Sending this email was mainly a sanity check.
>
> I think having an offset would be a great idea, although I am not sure how
> to implement this. However, if anything should be done to rectify this
> issue, it should be done in the standardScaler, not vectorAssembler. There
> should not be any forcing of vectorAssembler to produce only dense vectors
> so as to avoid performance problems with data that does not fit in memory.
> Furthermore, not every machine learning algo requires standardization.
> Instead, standardScaler should have withmean=True as default and should
> apply an offset if the vector is sparse, whereas there would be normal
> subtraction if the vector is dense. This way the default behavior of
> standardScaler will always be what is generally understood to be
> standardization, as opposed to people thinking they are standardizing when
> they actually are not.
>
> Can anyone confirm whether there is a jira already?
>
> On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen  wrote:
>>
>> Dense vs sparse is just a question of representation, so doesn't make
>> an operation on a vector more or less important as a result. You've
>> identified the reason that subtracting the mean can be undesirable: a
>> notionally billion-element sparse vector becomes too big to fit in
>> memory at once.
>>
>> I know this came up as a problem recently (I think there's a JIRA?)
>> because VectorAssembler will *sometimes* output a small dense vector
>> and sometimes output a small sparse vector based on how many zeroes
>> there are. But that's bad because then the StandardScaler can't
>> process the output at all. You can work on this if you're interested;
>> I think the proposal was to be able to force a dense representation
>> only in VectorAssembler. I don't know if that's the nature of the
>> problem you're hitting.
>>
>> It can be meaningful to only scale the dimension without centering it,
>> but it's not the same thing, no. The math is the math.
>>
>> This has come up a few times -- it's necessary to center a sparse
>> vector but prohibitive to do so. One idea I'd toyed with in the past
>> was to let a sparse vector have an 'offset' value applied to all
>> elements. That would let you shift all values while preserving a
>> sparse representation. I'm not sure if it's worth implementing but
>> would help this case.
>>
>>
>>
>>
>> On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede  wrote:
>> > Hi everyone,
>> >
>> > I am doing some standardization using standardScaler on data from
>> > VectorAssembler which is represented as sparse vectors. I plan to fit a
>> > regularized model.  However, standardScaler does not allow the mean to
>> > be
>> > subtracted from sparse vectors. It will only divide by the standard
>> > deviation, which I understand is to keep the vector sparse. Thus I am
>> > trying
>> > to convert my sparse vectors into dense vectors, but this may not be
>> > worthwhile.
>> >
>> > So my questions are:
>> > Is subtracting the mean during standardization only important when
>> > working
>> > with dense vectors? Does it not matter for sparse vectors? Is just
>> > dividing
>> > by the standard deviation with sparse vectors equivalent to also
>> > dividing by
>> > the standard deviation and subtracting the mean with dense vectors?
>> >
>> > Thank you,
>> > Tobi
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Changing Spark configuration midway through application.

2016-08-10 Thread Andrew Ehrlich
If you're changing properties for the SparkContext, then I believe you will
have to start a new SparkContext with the new properties.
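
(A rough sketch of what that looks like in practice, assuming the intermediate result is written to disk so nothing is lost when the first context is stopped; only one SparkContext can be active per JVM, so the first must be stopped before the second is created.)

import org.apache.spark.{SparkConf, SparkContext}

// Phase 1: run with one set of properties and persist the intermediate result.
val conf1 = new SparkConf().setAppName("joins-phase-1").set("spark.shuffle.compress", "true")
val sc1 = new SparkContext(conf1)
// ... run join 1 here and write its output to durable storage ...
sc1.stop()

// Phase 2: a fresh context with different properties, reading the output of phase 1.
val conf2 = new SparkConf().setAppName("joins-phase-2").set("spark.shuffle.compress", "false")
val sc2 = new SparkContext(conf2)
// ... read the intermediate result back and run join 2 ...

Note that SQL-level settings such as spark.sql.shuffle.partitions can be changed at runtime through sqlContext.setConf without restarting anything; the restart is only needed for SparkConf properties that are read when the context starts.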

On Wed, Aug 10, 2016 at 8:47 AM, Jestin Ma 
wrote:

> If I run an application, for example with 3 joins:
>
> [join 1]
> [join 2]
> [join 3]
>
> [final join and save to disk]
>
> Could I change Spark properties in between each join?
>
> [join 1]
> [change properties]
> [join 2]
> [change properties]
> ...
>
> Or would I have to create a separate application with different properties
> for each of the three joins and also save each intermediate join result to
> disk?
>
> Jestin
>


Re: Standardization with Sparse Vectors

2016-08-10 Thread Tobi Bosede
Thanks Sean, I agree 100% that the math is math and dense vs sparse is
just a matter of representation. I was trying to convince a co-worker of
this to no avail. Sending this email was mainly a sanity check.

I think having an offset would be a great idea, although I am not sure how
to implement this. However, if anything should be done to rectify this
issue, it should be done in the standardScaler, not vectorAssembler. There
should not be any forcing of vectorAssembler to produce only dense vectors
so as to avoid performance problems with data that does not fit in memory.
Furthermore, not every machine learning algo requires standardization.
Instead, standardScaler should have withMean=true as default and should
apply an offset if the vector is sparse, whereas there would be normal
subtraction if the vector is dense. This way the default behavior of
standardScaler will always be what is generally understood to be
standardization, as opposed to people thinking they are standardizing when
they actually are not.

Can anyone confirm whether there is a jira already?

On Wed, Aug 10, 2016 at 10:58 AM, Sean Owen  wrote:

> Dense vs sparse is just a question of representation, so doesn't make
> an operation on a vector more or less important as a result. You've
> identified the reason that subtracting the mean can be undesirable: a
> notionally billion-element sparse vector becomes too big to fit in
> memory at once.
>
> I know this came up as a problem recently (I think there's a JIRA?)
> because VectorAssembler will *sometimes* output a small dense vector
> and sometimes output a small sparse vector based on how many zeroes
> there are. But that's bad because then the StandardScaler can't
> process the output at all. You can work on this if you're interested;
> I think the proposal was to be able to force a dense representation
> only in VectorAssembler. I don't know if that's the nature of the
> problem you're hitting.
>
> It can be meaningful to only scale the dimension without centering it,
> but it's not the same thing, no. The math is the math.
>
> This has come up a few times -- it's necessary to center a sparse
> vector but prohibitive to do so. One idea I'd toyed with in the past
> was to let a sparse vector have an 'offset' value applied to all
> elements. That would let you shift all values while preserving a
> sparse representation. I'm not sure if it's worth implementing but
> would help this case.
>
>
>
>
> On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede  wrote:
> > Hi everyone,
> >
> > I am doing some standardization using standardScaler on data from
> > VectorAssembler which is represented as sparse vectors. I plan to fit a
> > regularized model.  However, standardScaler does not allow the mean to be
> > subtracted from sparse vectors. It will only divide by the standard
> > deviation, which I understand is to keep the vector sparse. Thus I am
> trying
> > to convert my sparse vectors into dense vectors, but this may not be
> > worthwhile.
> >
> > So my questions are:
> > Is subtracting the mean during standardization only important when
> working
> > with dense vectors? Does it not matter for sparse vectors? Is just
> dividing
> > by the standard deviation with sparse vectors equivalent to also
> dividing by
> > the standard deviation and subtracting the mean with dense vectors?
> >
> > Thank you,
> > Tobi
>


Re: unsubscribe

2016-08-10 Thread Matei Zaharia
To unsubscribe, please send an email to user-unsubscr...@spark.apache.org from 
the address you're subscribed from.

Matei

> On Aug 10, 2016, at 12:48 PM, Sohil Jain  wrote:
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2016-08-10 Thread Sohil Jain



Re: Standardization with Sparse Vectors

2016-08-10 Thread Sean Owen
Dense vs sparse is just a question of representation, so doesn't make
an operation on a vector more or less important as a result. You've
identified the reason that subtracting the mean can be undesirable: a
notionally billion-element sparse vector becomes too big to fit in
memory at once.

I know this came up as a problem recently (I think there's a JIRA?)
because VectorAssembler will *sometimes* output a small dense vector
and sometimes output a small sparse vector based on how many zeroes
there are. But that's bad because then the StandardScaler can't
process the output at all. You can work on this if you're interested;
I think the proposal was to be able to force a dense representation
only in VectorAssembler. I don't know if that's the nature of the
problem you're hitting.

It can be meaningful to only scale the dimension without centering it,
but it's not the same thing, no. The math is the math.

This has come up a few times -- it's necessary to center a sparse
vector but prohibitive to do so. One idea I'd toyed with in the past
was to let a sparse vector have an 'offset' value applied to all
elements. That would let you shift all values while preserving a
sparse representation. I'm not sure if it's worth implementing but
would help this case.
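
(One way to picture that idea -- purely illustrative, not an existing Spark class -- is a sparse vector carrying an extra offset field, so shifting all elements touches only the offset and leaves the indices/values arrays untouched. A per-dimension offset vector would be the generalisation actually needed for mean-centering.)

// Illustration only: the logical value of element i is the stored sparse value plus `offset`.
case class OffsetSparseVector(size: Int,
                              indices: Array[Int],
                              values: Array[Double],
                              offset: Double) {
  def apply(i: Int): Double = {
    val j = java.util.Arrays.binarySearch(indices, i)
    (if (j >= 0) values(j) else 0.0) + offset
  }
  // Shifting every element (e.g. subtracting a mean m via shift(-m)) stays O(1) in storage.
  def shift(by: Double): OffsetSparseVector = copy(offset = offset + by)
}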




On Wed, Aug 10, 2016 at 4:41 PM, Tobi Bosede  wrote:
> Hi everyone,
>
> I am doing some standardization using standardScaler on data from
> VectorAssembler which is represented as sparse vectors. I plan to fit a
> regularized model.  However, standardScaler does not allow the mean to be
> subtracted from sparse vectors. It will only divide by the standard
> deviation, which I understand is to keep the vector sparse. Thus I am trying
> to convert my sparse vectors into dense vectors, but this may not be
> worthwhile.
>
> So my questions are:
> Is subtracting the mean during standardization only important when working
> with dense vectors? Does it not matter for sparse vectors? Is just dividing
> by the standard deviation with sparse vectors equivalent to also dividing by
> the standard deviation and subtracting the mean with dense vectors?
>
> Thank you,
> Tobi

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Changing Spark configuration midway through application.

2016-08-10 Thread Jestin Ma
If I run an application, for example with 3 joins:

[join 1]
[join 2]
[join 3]

[final join and save to disk]

Could I change Spark properties in between each join?

[join 1]
[change properties]
[join 2]
[change properties]
...

Or would I have to create a separate application with different properties
for each of the three joins and also save each intermediate join result to
disk?

Jestin


Re: Spark 2.0.0 - Apply schema on few columns of dataset

2016-08-10 Thread Aseem Bansal
To those interested: I changed the data frame to an RDD, then I created a data
frame from that, which has an option of giving a schema.

But probably someone should improve how to use the .as function.
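
For reference, a minimal Scala sketch of that round trip (the question was about the Java API, but the same createDataFrame(rdd, schema) overload exists there; the column names, types and path are placeholders, and `spark` is an existing SparkSession):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Read the whole CSV, keep only the needed columns, then re-apply an explicit
// schema by going through an RDD[Row] and createDataFrame(rdd, schema).
val raw = spark.read.option("header", "true").csv("/path/to/input.csv")
val selected = raw.select("category", "amount")

val schema = StructType(Seq(
  StructField("category", StringType, nullable = true),
  StructField("amount", DoubleType, nullable = true)
))

val rowRdd = selected.rdd.map(r => Row(r.getString(0), r.getString(1).toDouble))
val typed = spark.createDataFrame(rowRdd, schema)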

On Mon, Aug 8, 2016 at 1:05 PM, Ewan Leith 
wrote:

> Hmm I’m not sure, I don’t use the Java API sorry
>
>
>
> The simplest way to work around it would be to read the csv as a text file
> using sparkContext textFile, split each row based on a comma, then convert
> it to a dataset afterwards.
>
>
>
> *From:* Aseem Bansal [mailto:asmbans...@gmail.com]
> *Sent:* 08 August 2016 07:37
> *To:* Ewan Leith 
> *Cc:* user 
> *Subject:* Re: Spark 2.0.0 - Apply schema on few columns of dataset
>
>
>
> Hi Ewan
>
>
>
> The .as function takes a single encoder or a single string or a single
> Symbol. I have like more than 10 columns so I cannot use the tuple
> functions. Passing using bracket does not work.
>
>
>
> On Mon, Aug 8, 2016 at 11:26 AM, Ewan Leith 
> wrote:
>
> Looking at the encoders api documentation at
>
> http://spark.apache.org/docs/latest/api/java/
>
> == Java == Encoders are specified by calling static methods on Encoders
> 
> .
>
> List<String> data = Arrays.asList("abc", "abc", "xyz");
> Dataset<String> ds = context.createDataset(data, Encoders.STRING());
>
> I think you should be calling
>
> .as((Encoders.STRING(), Encoders.STRING()))
>
> or similar
>
> Ewan
>
>
>
> On 8 Aug 2016 06:10, Aseem Bansal  wrote:
>
> Hi All
>
>
>
> Has anyone done this with Java API?
>
>
>
> On Fri, Aug 5, 2016 at 5:36 PM, Aseem Bansal  wrote:
>
> I need to use few columns out of a csv. But as there is no option to read
> few columns out of csv so
>
>  1. I am reading the whole CSV using SparkSession.csv()
>
>  2.  selecting few of the columns using DataFrame.select()
>
>  3. applying schema using the .as() function of Dataset.  I tried to
> extent org.apache.spark.sql.Encoder as the input for as function
>
>
>
> But I am getting the following exception
>
>
>
> Exception in thread "main" java.lang.RuntimeException: Only expression
> encoders are supported today
>
>
>
> So my questions are -
>
> 1. Is it possible to read few columns instead of whole CSV? I cannot
> change the CSV as that is upstream data
>
> 2. How do I apply schema to few columns if I cannot write my encoder?
>
>
>
>
>
>
>


Standardization with Sparse Vectors

2016-08-10 Thread Tobi Bosede
Hi everyone,

I am doing some standardization using standardScaler on data from
VectorAssembler which is represented as sparse vectors. I plan to fit a
regularized model.  However, standardScaler does not allow the mean to be
subtracted from sparse vectors. It will only divide by the standard
deviation, which I understand is to keep the vector sparse. Thus I am
trying to convert my sparse vectors into dense vectors, but this may not be
worthwhile.

So my questions are:
Is subtracting the mean during standardization only important when working
with dense vectors? Does it not matter for sparse vectors? Is just dividing
by the standard deviation with sparse vectors equivalent to also dividing
by the standard deviation and subtracting the mean with dense vectors?

Thank you,
Tobi


Re: Spark 1.6.2 can read hive tables created with sqoop, but Spark 2.0.0 cannot

2016-08-10 Thread cdecleene
Using the scala api instead of the python api yields the same results.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-6-2-can-read-hive-tables-created-with-sqoop-but-Spark-2-0-0-cannot-tp27502p27506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL Parallelism - While reading from Oracle

2016-08-10 Thread @Sanjiv Singh
Use this:
You can set up all the properties (driver, partitionColumn, lowerBound,
upperBound, numPartitions); you should start with the driver first.

Once you have the maximum id you can use it for the upperBound parameter.
The numPartitions is then based on your table's dimensions and the actual
system you use. With this snippet you read a database table into a
dataframe with Spark.

df = sqlContext.read.format('jdbc').options(
    url="jdbc:mysql://ip-address:3306/sometable?user=username&password=password",
    dbtable="sometable",
    driver="com.mysql.jdbc.Driver",
    partitionColumn="id",
    lowerBound=1,
    upperBound=maxId,
    numPartitions=100
).load()



Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Wed, Aug 10, 2016 at 6:35 AM, Siva A  wrote:

> Hi Team,
>
> How do we increase the parallelism in Spark SQL?
> In Spark Core, we can re-partition or pass extra arguments as part of the
> transformation.
>
> I am trying the below example,
>
> val df1 = sqlContext.read.format("jdbc").options(Map(...)).load
> val df2 = df1.cache
> df2.count
>
> Here the count operation uses only one task. I couldn't increase the
> parallelism.
> Thanks in advance
>
> Thanks
> Siva
>


Use cases around image/video processing in spark

2016-08-10 Thread Deepak Sharma
Hi
If anyone is using or knows about a GitHub repo that can help me get started
with image and video processing using Spark, please share.
The images/videos will be stored in S3 and I am planning to use S3 with
Spark.
In this case, how will Spark achieve distributed processing?
Any code base or references are really appreciated.

-- 
Thanks
Deepak
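
For reference, one rough way to get started (a sketch only -- the bucket, paths and per-image logic are placeholders, and it assumes the s3a connector and credentials are configured):

// Each S3 object becomes one (path, bytes) record; partitions are processed in
// parallel on the executors, which is where the distributed processing comes from.
val images = sc.binaryFiles("s3a://my-bucket/images/*.jpg")

val dims = images.map { case (path, stream) =>
  val bytes = stream.toArray()   // raw image bytes, read on the executor
  val img = Option(javax.imageio.ImageIO.read(new java.io.ByteArrayInputStream(bytes)))
  // ... replace this with the actual per-image processing ...
  (path, img.map(_.getWidth).getOrElse(-1), img.map(_.getHeight).getOrElse(-1))
}

dims.take(5).foreach(println)

Video is trickier because a single file can be much larger than a partition; a common approach is to pre-split videos into chunks or frames before they land in S3.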


Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Cody Koeninger
zookeeper.connect is irrelevant.

Did you look at your executor logs?
Did you look at the UI for the (probably failed) stages?
Are you actually producing data into all of the kafka partitions?
If you use kafka-simple-consumer-shell.sh to read that partition, do
you get any data?

On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
 wrote:
> Hi Cody,
>
> Just added zookeeper.connect to kafkaparams . It couldn't come out of batch
> window. Other batches are queued. I could see foreach(println) of dataFrame
> printing one of partition's data and not the other.
> Couldn't see any  errors from log.
>
> val brokers = "localhost:9092,localhost:9093"
> val sparkConf = new
> SparkConf().setAppName("KafkaWeather").setMaster("local[5]")//spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(1))
> val kafkaParams = Map[String,
> String]("bootstrap.servers"->"localhost:9093,localhost:9092","auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","group.id"->"xyz")
> val topics = "test"
> val topicsSet = topics.split(",").toSet
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topicsSet)
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> import sqlContext.implicits._
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka
> producer is streaming data.")
> System.exit(-1)
>   }
>
>val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>   dataframe.foreach(println)
>  println( "$$$", dataframe.count())
>   })
> Logs:
>
> 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves to
> a loopback address: 127.0.0.1; using 192.168.126.131 instead (on interface
> eth1)
> 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> users with modify permissions: Set(cloudera)
> 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on
> port 45031.
> 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> 16/08/10 18:16:26 INFO Remoting: Starting remoting
> 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> 16/08/10 18:16:26 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 56638.
> 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5
> MB
> 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on port
> 4040.
> 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> http://192.168.126.131:4040
> 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
> 16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
> 16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
> server' on port 59491.
> 16/08/10 18:16:28 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
> http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
> with timestamp 1470833188094
> 16/08/10 18:16:29 INFO SparkContext: Added JAR
> file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
> http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
> timestamp 1470833189531
> 16/08/10 18:16:29 INFO SparkContext: Added JAR
> file:/home/cloudera/Downloads/boa/pain.jar at
> http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
> 16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
> localhost
> 16/08/10 18:16:29 INFO Utils: Successfully started service
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
> 16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
> 16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
> 16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block manager
> localhost:55361 with 511.5 MB RAM, 

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
Hi Cody,

Just added zookeeper.connect to kafkaParams. It couldn't come out of the batch
window. Other batches are queued. I could see foreach(println) of the dataFrame
printing one partition's data and not the other's.
I couldn't see any errors in the log.

val brokers = "localhost:9092,localhost:9093"
val sparkConf = new
SparkConf().setAppName("KafkaWeather").setMaster("local[5]")//spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(1))
val kafkaParams = Map[String,
String]("bootstrap.servers"->"localhost:9093,localhost:9092","auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
group.id"->"xyz")
val topics = "test"
val topicsSet = topics.split(",").toSet
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)
val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
import sqlContext.implicits._
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
println("Failed to get data from Kafka. Please check that the Kafka
producer is streaming data.")
System.exit(-1)
  }

   val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
  dataframe.foreach(println)
 println( "$$$", dataframe.count())
  })
Logs:

16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera resolves
to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
interface eth1)
16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(cloudera);
users with modify permissions: Set(cloudera)
16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' on
port 45031.
16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
16/08/10 18:16:26 INFO Remoting: Starting remoting
16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
16/08/10 18:16:26 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 56638.
16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity 511.5
MB
16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
http://192.168.126.131:4040
16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-2b2a4e68-2952-41b0-a11b-f07860682749
16/08/10 18:16:27 INFO HttpServer: Starting HTTP Server
16/08/10 18:16:27 INFO Utils: Successfully started service 'HTTP file
server' on port 59491.
16/08/10 18:16:28 INFO SparkContext: Added JAR
file:/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar at
http://192.168.126.131:59491/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
with timestamp 1470833188094
16/08/10 18:16:29 INFO SparkContext: Added JAR
file:/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar at
http://192.168.126.131:59491/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
timestamp 1470833189531
16/08/10 18:16:29 INFO SparkContext: Added JAR
file:/home/cloudera/Downloads/boa/pain.jar at
http://192.168.126.131:59491/jars/pain.jar with timestamp 1470833189533
16/08/10 18:16:29 INFO Executor: Starting executor ID driver on host
localhost
16/08/10 18:16:29 INFO Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 55361.
16/08/10 18:16:29 INFO NettyBlockTransferService: Server created on 55361
16/08/10 18:16:29 INFO BlockManagerMaster: Trying to register BlockManager
16/08/10 18:16:29 INFO BlockManagerMasterEndpoint: Registering block
manager localhost:55361 with 511.5 MB RAM, BlockManagerId(driver,
localhost, 55361)
16/08/10 18:16:29 INFO BlockManagerMaster: Registered BlockManager
16/08/10 18:16:30 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:30 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 18:16:30 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 18:16:30 INFO VerifiableProperties: Property zookeeper.connect is
overridden to localhost:2181
16/08/10 18:16:31 INFO ForEachDStream: metadataCleanupDelay = -1
16/08/10 18:16:31 INFO 

UNSUBSCRIBE

2016-08-10 Thread Sudhanshu Janghel



Re: UNSUBSCRIBE

2016-08-10 Thread Nicholas Chammas
Please follow the links here to unsubscribe:
http://spark.apache.org/community.html


On Tue, Aug 9, 2016 at 5:14 PM abhishek singh  wrote:

>
>


Re: UNSUBSCRIBE

2016-08-10 Thread Nicholas Chammas
Please follow the links here to unsubscribe:
http://spark.apache.org/community.html


On Tue, Aug 9, 2016 at 8:03 PM James Ding  wrote:

>
>


Re: UNSUBSCRIBE

2016-08-10 Thread Nicholas Chammas
Please follow the links here to unsubscribe:
http://spark.apache.org/community.html


On Wed, Aug 10, 2016 at 2:46 AM Martin Somers  wrote:

>
>
> --
> M
>


Re: Unsubscribe

2016-08-10 Thread Nicholas Chammas
Please follow the links here to unsubscribe:
http://spark.apache.org/community.html

On Tue, Aug 9, 2016 at 3:02 PM Hogancamp, Aaron <
aaron.t.hoganc...@leidos.com> wrote:

> Unsubscribe.
>
>
>
> Thanks,
>
>
>
> Aaron Hogancamp
>
> Data Scientist
>
>
>


Re: Unsubscribe.

2016-08-10 Thread Nicholas Chammas
Please follow the links here to unsubscribe:
http://spark.apache.org/community.html

On Tue, Aug 9, 2016 at 3:05 PM Martin Somers  wrote:

> Unsubscribe.
>
> Thanks
> M
>


suggestion needed on FileInput Path- Spark Streaming

2016-08-10 Thread mdkhajaasmath

What is the best practice while processing files from an S3 bucket in Spark file 
streaming? I keep on getting files in the S3 path and have to process those in a 
batch, but while processing, some other files might come up. In this streaming 
job, should I move files after the end of our streaming batch to another 
location, or is there any other way to do it?

Let's say the batch interval is 15 minutes, and the current batch takes more than 15 
minutes. Does the next batch get started irrespective of the other batch being processed? 
Is there a way that I can hold the current batch while the other batch is 
still under processing?

Thanks?
Asmath
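
(For what it's worth: with the default spark.streaming.concurrentJobs=1, a new batch is queued rather than run while the previous one is still processing. A rough sketch of the "move files after the batch" idea, assuming the Hadoop FileSystem API over s3a and placeholder paths, is below; note the caveat in the comments.)

import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: process each batch, then archive the input files so they are not
// re-listed and the input prefix stays small. Bucket and paths are placeholders.
val lines = ssc.textFileStream("s3a://my-bucket/input/")

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // ... process / write this batch's output here ...
  }
  val fs = FileSystem.get(new java.net.URI("s3a://my-bucket"),
                          rdd.sparkContext.hadoopConfiguration)
  // Caveat: files arriving between batch selection and this cleanup would be moved
  // unprocessed; a safer variant records the exact file names that were read.
  Option(fs.globStatus(new Path("s3a://my-bucket/input/*"))).getOrElse(Array.empty).foreach { st =>
    fs.rename(st.getPath, new Path(s"s3a://my-bucket/processed/${st.getPath.getName}"))
  }
}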
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-10 Thread Sean Owen
Scaling can mean scaling factors up or down so that they're all on a
comparable scale. It certainly changes the sum of squared errors, but
you can't compare this metric across scaled and unscaled data, exactly
because one is on a totally different scale and will have quite
different absolute values. If that's the motivation here, then no,
it's misleading.

You probably do want to scale factors because the underlying distance
metric (Euclidean) will treat all dimensions equally. If they're on
very different scales, dimensions that happen to have larger units
will dominate.
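
(To make that concrete, a small sketch -- column names are placeholders and `data` is assumed to be a DataFrame with a vector "features" column -- that scales to unit variance and then evaluates the k-means cost in the scaled space only:)

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.StandardScaler

// Scale each of the 112 quality factors to unit variance so no single factor
// dominates the Euclidean distance just because of its units.
val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)   // centering shifts all points equally and doesn't change distances

val scaled = scaler.fit(data).transform(data)

val model = new KMeans().setK(10).setFeaturesCol("scaledFeatures").fit(scaled)

// Only compare this against other models evaluated on the SAME representation;
// a cost computed on scaled features is not comparable with one on raw features.
val sseScaled = model.computeCost(scaled)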

On Wed, Aug 10, 2016 at 12:46 PM, Rohit Chaddha
 wrote:
> Hi Sean,
>
> So basically I am trying to cluster a number of elements (it's a domain
> object called PItem) based on the quality factors of these items.
> These elements have 112 quality factors each.
>
> Now the issue is that when I am scaling the factors using StandardScaler I
> get a Sum of Squared Errors = 13300
> When I don't use scaling the Sum of Squared Errors = 5
>
> I was always of the opinion that different factors being on different scales
> should always be normalized, but I am confused based on the results above
> and I am wondering what factors should be removed to get a meaningful result
> (may be with 5% less accuracy)
>
> Will appreciate any help here.
>
> -Rohit
>
> On Tue, Aug 9, 2016 at 12:55 PM, Sean Owen  wrote:
>>
>> Fewer features doesn't necessarily mean better predictions, because indeed
>> you are subtracting data. It might, because when done well you subtract more
>> noise than signal. It is usually done to make data sets smaller or more
>> tractable or to improve explainability.
>>
>> But you have an unsupervised clustering problem where talking about
 >> feature importance doesn't make as much sense. Important to what? There is no
>> target variable.
>>
>> PCA will not 'improve' clustering per se but can make it faster.
>> You may want to specify what you are actually trying to optimize.
>>
>>
>> On Tue, Aug 9, 2016, 03:23 Rohit Chaddha 
>> wrote:
>>>
>>> I would rather have less features to make better inferences on the data
>>> based on the smaller number of factors,
>>> Any suggestions Sean ?
>>>
>>> On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:

 Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
 really want to select features or just obtain a lower-dimensional
 representation of them, with less redundancy?

 On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane 
 wrote:
 > There must be an algorithmic way to figure out which of these factors
 > contribute the least and remove them in the analysis.
 > I am hoping someone can throw some insight on this.
 >
 > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S 
 > wrote:
 >>
 >> Not an expert here, but the first step would be devote some time and
 >> identify which of these 112 factors are actually causative. Some
 >> domain
 >> knowledge of the data may be required. Then, you can start off with
 >> PCA.
 >>
 >> HTH,
 >>
 >> Regards,
 >>
 >> Sivakumaran S
 >>
 >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
 >>
 >> Great question Rohit.  I am in my early days of ML as well and it
 >> would be
 >> great if we get some idea on this from other experts on this group.
 >>
 >> I know we can reduce dimensions by using PCA, but I think that does
 >> not
 >> allow us to understand which factors from the original are we using
 >> in the
 >> end.
 >>
 >> - Tony L.
 >>
 >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha
 >> 
 >> wrote:
 >>>
 >>>
 >>> I have a data-set where each data-point has 112 factors.
 >>>
 >>> I want to remove the factors which are not relevant, and say reduce
 >>> to 20
 >>> factors out of these 112 and then do clustering of data-points using
 >>> these
 >>> 20 factors.
 >>>
 >>> How do I do these and how do I figure out which of the 20 factors
 >>> are
 >>> useful for analysis.
 >>>
 >>> I see SVD and PCA implementations, but I am not sure if these give
 >>> which
 >>> elements are removed and which are remaining.
 >>>
 >>> Can someone please help me understand what to do here
 >>>
 >>> thanks,
 >>> -Rohit
 >>>
 >>
 >>
 >
>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Cody Koeninger
Those logs you're posting are from right after your failure; they don't
include what actually went wrong when attempting to read the JSON. Look at your
logs more carefully.
On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" 
wrote:

> Hi Siva,
>
> With below code, it is stuck up at
> * sqlContext.read.json(rdd.map(_._2)).toDF()*
> There are two partitions in  topic.
> I am running spark 1.6.2
>
> val topics = "topic.name"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new SparkConf().setAppName("KafkaWeather").setMaster("
> local[5]")//spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "
> group.id" -> "xyz","auto.offset.reset"->"smallest")
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka
> producer is streaming data.")
> System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.
> sparkContext)
>   *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
>   dataframe.foreach(println)
>
>   })
>
>
> Below are logs,
>
> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
> todelete.scala:34) failed in 110.776 s
> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event SparkListenerStageCompleted(
> org.apache.spark.scheduler.StageInfo@6d8ff688)
> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event 
> SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException:
> Job 0 cancelled because SparkContext was shut down))
> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
> MapOutputTrackerMasterEndpoint stopped!
> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
> 16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
> 16/08/10 12:27:51 INFO OutputCommitCoordinator$
> OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
> 16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator:
> Shutting down remote daemon.
> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
> daemon shut down; proceeding with flushing remote transports.
> 16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
> 16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-
> 07b9c1b6-01db-45b5-9302-d2f67f7c490e
> 16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator:
> Remoting shut down.
> 16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
> /tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
> [cloudera@quickstart ~]$ spark-submit --master local[3] --class
> com.boa.poc.todelete --jars /home/cloudera/lib/spark-
> streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/
> spark-assembly-1.6.2-hadoop2.6.0.jar /home/cloudera/Downloads/boa/pain.jar
> > log.txt
> Using Spark's default log4j profile: org/apache/spark/log4j-
> defaults.properties
> 16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
> 16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
> to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface eth1)
> 16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
> 16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
> 16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(cloudera);
> users with modify permissions: Set(cloudera)
> 16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver'
> on port 42140.
> 16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
> 16/08/10 12:28:01 INFO Remoting: Starting remoting
> 16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
> 16/08/10 12:28:01 INFO Utils: Successfully started service
> 'sparkDriverActorSystem' on port 53328.
> 16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
> 16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
> 16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
> /tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
> 16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity

Re: Machine learning question (suing spark)- removing redundant factors while doing clustering

2016-08-10 Thread Rohit Chaddha
Hi Sean,

So basically I am trying to cluster a number of elements (it's a domain
object called PItem) based on the quality factors of these items.
These elements have 112 quality factors each.

Now the issue is that when I am scaling the factors using StandardScaler I
get a Sum of Squared Errors = 13300
When I don't use scaling the Sum of Squared Errors = 5

I was always of the opinion that different factors being on different scales
should always be normalized, but I am confused based on the results above
and I am wondering what factors should be removed to get a meaningful
result (may be with 5% less accuracy)

Will appreciate any help here.

-Rohit

On Tue, Aug 9, 2016 at 12:55 PM, Sean Owen  wrote:

> Fewer features doesn't necessarily mean better predictions, because indeed
> you are subtracting data. It might, because when done well you subtract
> more noise than signal. It is usually done to make data sets smaller or
> more tractable or to improve explainability.
>
> But you have an unsupervised clustering problem where talking about
> feature importance doesn't make as much sense. Important to what? There is
> no target variable.
>
> PCA will not 'improve' clustering per se but can make it faster.
> You may want to specify what you are actually trying to optimize.
>
>
> On Tue, Aug 9, 2016, 03:23 Rohit Chaddha 
> wrote:
>
>> I would rather have less features to make better inferences on the data
>> based on the smaller number of factors,
>> Any suggestions Sean ?
>>
>> On Mon, Aug 8, 2016 at 11:37 PM, Sean Owen  wrote:
>>
>>> Yes, that's exactly what PCA is for as Sivakumaran noted. Do you
>>> really want to select features or just obtain a lower-dimensional
>>> representation of them, with less redundancy?
>>>
>>> On Mon, Aug 8, 2016 at 4:10 PM, Tony Lane 
>>> wrote:
>>> > There must be an algorithmic way to figure out which of these factors
>>> > contribute the least and remove them in the analysis.
>>> > I am hoping someone can throw some insight on this.
>>> >
>>> > On Mon, Aug 8, 2016 at 7:41 PM, Sivakumaran S 
>>> wrote:
>>> >>
>>> >> Not an expert here, but the first step would be devote some time and
>>> >> identify which of these 112 factors are actually causative. Some
>>> domain
>>> >> knowledge of the data may be required. Then, you can start off with
>>> PCA.
>>> >>
>>> >> HTH,
>>> >>
>>> >> Regards,
>>> >>
>>> >> Sivakumaran S
>>> >>
>>> >> On 08-Aug-2016, at 3:01 PM, Tony Lane  wrote:
>>> >>
>>> >> Great question Rohit.  I am in my early days of ML as well and it
>>> would be
>>> >> great if we get some idea on this from other experts on this group.
>>> >>
>>> >> I know we can reduce dimensions by using PCA, but I think that does
>>> not
>>> >> allow us to understand which factors from the original are we using
>>> in the
>>> >> end.
>>> >>
>>> >> - Tony L.
>>> >>
>>> >> On Mon, Aug 8, 2016 at 5:12 PM, Rohit Chaddha <
>>> rohitchaddha1...@gmail.com>
>>> >> wrote:
>>> >>>
>>> >>>
>>> >>> I have a data-set where each data-point has 112 factors.
>>> >>>
>>> >>> I want to remove the factors which are not relevant, and say reduce
>>> to 20
>>> >>> factors out of these 112 and then do clustering of data-points using
>>> these
>>> >>> 20 factors.
>>> >>>
>>> >>> How do I do these and how do I figure out which of the 20 factors are
>>> >>> useful for analysis.
>>> >>>
>>> >>> I see SVD and PCA implementations, but I am not sure if these give
>>> which
>>> >>> elements are removed and which are remaining.
>>> >>>
>>> >>> Can someone please help me understand what to do here
>>> >>>
>>> >>> thanks,
>>> >>> -Rohit
>>> >>>
>>> >>
>>> >>
>>> >
>>>
>>
>>


Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Sivakumaran S
I am testing with one partition now. I am using Kafka 0.9 and Spark 1.6.1 
(Scala 2.11). Just start with one topic first and then add more. I am not 
partitioning the topic.

HTH, 

Regards,

Sivakumaran

> On 10-Aug-2016, at 5:56 AM, Diwakar Dhanuskodi  
> wrote:
> 
> Hi Siva,
> 
> Does topic  has partitions? which version of Spark you are using?
> 
> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S  > wrote:
> Hi,
> 
> Here is a working example I did.
> 
> HTH
> 
> Regards,
> 
> Sivakumaran S
> 
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new 
> SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") 
> //spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka 
> producer is streaming data.")
> System.exit(-1)
>   }
>   val sqlContext = 
> org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   //Process your DF as required here on
> }
> 
> 
> 
>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi > > wrote:
>> 
>> Hi,
>> 
>> I am reading json messages from kafka . Topics has 2 partitions. When 
>> running streaming job using spark-submit, I could see that  val dataFrame = 
>> sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing 
>> something wrong here. Below is code .This environment is cloudera sandbox 
>> env. Same issue in hadoop production cluster mode except that it is 
>> restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka 
>> 0.10 and  Spark 1.4.
>> 
>> val kafkaParams = 
>> Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", 
>> "group.id " -> "xyz","auto.offset.reset"->"smallest")
>> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
>> val ssc = new StreamingContext(conf, Seconds(1))
>> 
>> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> 
>> val topics = Set("gpp.minf")
>> val kafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>> 
>> kafkaStream.foreachRDD(
>>   rdd => {
>> if (rdd.count > 0){
>> val dataFrame = sqlContext.read.json(rdd.map(_._2)) 
>>dataFrame.printSchema()
>> //dataFrame.foreach(println)
>> }
>> }
> 
> 



Spark SQL Parallelism - While reading from Oracle

2016-08-10 Thread Siva A
Hi Team,

How do we increase the parallelism in Spark SQL?
In Spark Core, we can re-partition or pass extra arguments as part of the
transformation.

I am trying the below example,

val df1 = sqlContext.read.format("jdbc").options(Map(...)).load
val df2 = df1.cache
df2.count

Here the count operation uses only one task. I couldn't increase the
parallelism.
Thanks in advance

Thanks
Siva
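
For anyone hitting the same thing, a rough sketch of the simplest workaround (connection details are placeholders; the cleaner fix is the partitionColumn/lowerBound/upperBound/numPartitions options shown elsewhere in this thread):

val df1 = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:oracle:thin:@//dbhost:1521/SERVICE",   // placeholder connection details
  "dbtable" -> "SOME_TABLE",
  "driver" -> "oracle.jdbc.OracleDriver"
)).load()

// Without partitioning options the JDBC read comes back as a single partition,
// so everything downstream runs as one task. Repartition before caching:
val df2 = df1.repartition(32).cache()
println(df2.count())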


Running spark Java on yarn cluster

2016-08-10 Thread atulp
Hi Team,

I am new to Spark and writing my first program. I have written a sample
program with the Spark master as local. To execute Spark over local YARN, what
should be the value of the spark.master property? Can I point to a remote YARN
cluster? I would like to execute this as a Java application and not submit it
using spark-submit.

The main objective is to create a service which can execute Spark SQL queries
over a YARN cluster.


Thanks in advance.

Regards,
Atul
 

code snippet as below

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TestSpark {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Java Spark Sql Example")
                .config("spark.master", "local").getOrCreate();

        Dataset<Row> df = spark.read().json("/spark/sparkSql/employee.json");
        System.out.println("Data");
        df.cache();
        df.show();
        // JavaSparkContext sc = new JavaSparkContext(new
        //     SparkConf().setAppName("SparkJoins").setMaster("yarn-client"));
    }
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-spark-Java-on-yarn-cluster-tp27504.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Running spark Java on yarn cluster

2016-08-10 Thread Atul Phalke
Hi Team,

I am new to Spark and writing my first program. I have written a sample
program with the Spark master as local. To execute Spark over local YARN, what
should be the value of the spark.master property? Can I point to a remote YARN
cluster? I would like to execute this as a Java application and not submit it
using spark-submit.

The main objective is to create a service which can execute Spark SQL queries
over a YARN cluster.


Thanks in advance.

Regards,
Atul


code snippet as below


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TestSpark {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Java Spark Sql Example")
                .config("spark.master", "local").getOrCreate();

        Dataset<Row> df = spark.read().json("/spark/sparkSql/employee.json");
        System.out.println("Data");
        df.cache();
        df.show();
    }
}


RE: Spark join and large temp files

2016-08-10 Thread Ashic Mahtab
Already tried that. The CPU hits 100% on the collectAsMap (even tried 
foreaching to a java ConcurrentHashmap), and eventually finishes, but while 
broadcasting, it takes a while, and at some point there's some timeout, and the 
worker is killed. The driver (and workers) have more than enough RAM (1.5GB of 
parquet expands to about 4.5GB, and the nodes have 64GB RAM). Filtering is also 
not an option, as every entry of the "smaller" dataset exists in the large one.
As mentioned in another reply, I managed to get it working by using embedded 
Redis on the driver, loading the smaller dataset into it, and then doing a 
straight map on the larger dataset via a foreachPartition, and doing lookups to 
the driver's Redis. Since there's no network shuffle, the temp folder is barely 
touched, and it seems to work quite well.
-Ashic.
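
In case it helps anyone else, a rough sketch of that pattern (assuming the Jedis client, that bigRdd is an RDD of (id, name) pairs, and that the smaller dataset has already been loaded into the driver-host Redis as plain key/value strings):

import redis.clients.jedis.Jedis

val driverHost = "10.0.0.1"   // wherever the embedded Redis is listening

// One Redis connection per partition; each task does plain GET lookups,
// so there is no shuffle and no large broadcast.
val joined = bigRdd.mapPartitions { iter =>
  val jedis = new Jedis(driverHost, 6379)
  // Materialise the partition so the connection can be closed before returning.
  val out = iter.map { case (id, name) =>
    (id, name, Option(jedis.get(id)))   // None when the id has no entry in Redis
  }.toVector
  jedis.close()
  out.iterator
}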

From: zouz...@gmail.com
Date: Wed, 10 Aug 2016 08:22:24 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,

I think this approach should solve your problem, i.e., broadcasting the 
small RDD. However, you should do it properly.

IMO, you should try:

val smallRDDBroadcasted: Broadcast[Map[Int, YourValueType]] =
  sc.broadcast(smallRDD.collectAsMap())

bigRDD.mapPartitions { elems =>
  // Here manually join using the broadcast map
  elems.flatMap { case (key, value) =>
    smallRDDBroadcasted.value.get(key).map(x => (key, (value, x)))
  }
}

Ensure that your driver has enough memory to store the above Map. If you get 
out of memory on the driver, increase your memory.

Speaking of which, a filtering step might also help on the above, i.e., filter 
the bigRDD with the keys of the Map before joining.

Hope this helps,
Anastasios

On Tue, Aug 9, 2016 at 4:46 PM, Ashic Mahtab  wrote:



Hi Sam,

Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's 
no progress. The Spark UI doesn't even show up.

-Ashic.

From: samkiller@gmail.com
Date: Tue, 9 Aug 2016 16:21:27 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: deepakmc...@gmail.com; user@spark.apache.org

Have you tried to broadcast your small table in order to perform your 
join?

joined = bigDF.join(broadcast(smallDF), ...)

On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab  wrote:



Hi Deepak,

No... not really. Upping the disk size is a solution, but more 
expensive as you can't attach EBS volumes to EMR clusters configured with data 
pipelines easily (which is what we're doing). I've tried collecting the 1.5G 
dataset in a hashmap and broadcasting. Timeouts seem to prevent that (even 
after upping the max driver result size). Increasing partition counts didn't 
help (the shuffle used up the temp space). I'm now looking at some form of 
clever broadcasting, or maybe falling back to chunking up the input, producing 
interim output, and unioning them for the final output. Might even try using 
Spark Streaming pointing to the parquet and seeing if that helps.

-Ashic.

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,

Did you find the resolution to this issue? Just curious to know what helped 
in this scenario.

Thanks,
Deepak


On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab  wrote:



Hi Deepak,

Thanks for the response. Registering the temp tables didn't help. Here's what I have:

val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id = y.id")

results.write.parquet(...)

Is there something I'm missing?

Cheers,
Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp 
table. This should resolve your issue.

Thanks,
Deepak
On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab  wrote:



Hello,

We have two parquet inputs of the following form:

a: id: String, Name: String  (1.5TB)
b: id: String, Number: Int   (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well... crashing?

Note, the ids are unique, and there's a one-to-one mapping between the two 
datasets.

Any help would be appreciated.

-Ashic.



  


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
  


-- 
Thanks
Deepak
www.bigdatabig.com

Re: using matrix as column datatype in SparkSQL Dataframe

2016-08-10 Thread Yanbo Liang
A good way is to implement your own data source to load data in a matrix
format. You can refer to the LibSVM data source (
https://github.com/apache/spark/tree/master/mllib/src/main/scala/org/apache/spark/ml/source/libsvm),
which contains one column of vector type, which is very similar to a matrix.

Thanks
Yanbo

2016-08-08 11:06 GMT-07:00 Vadla, Karthik :

> Hello all,
>
>
>
> I'm trying to load a set of medical images (DICOM) into a Spark SQL dataframe.
> Here each image is loaded into a matrix column of the dataframe. I see Spark
> recently added MatrixUDT to support this kind of case, but I don't find a
> sample for using a matrix as a column in a dataframe.
>
> https://github.com/apache/spark/blob/master/mllib/src/
> main/scala/org/apache/spark/ml/linalg/MatrixUDT.scala
>
> Can anyone help me with this.
>
> Really appreciate your help.
>
> Thanks
>
> Karthik Vadla
>
>
>


Re: Spark Thrift Server (Spark 2.0) show table has value with NULL in all fields

2016-08-10 Thread Mich Talebzadeh
Hi,

Have you raised a Jira for this?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 August 2016 at 08:54, Chanh Le  wrote:

> Hi Gene,
> It's a Spark 2.0 issue.
> I switched to Spark 1.6.1 and it's OK now.
>
> Thanks.
>
> On Thursday, July 28, 2016 at 4:25:48 PM UTC+7, Chanh Le wrote:
>>
>> Hi everyone,
>>
>> I have problem when I create a external table in Spark Thrift Server
>> (STS) and query the data.
>>
>> Scenario:
>> *Spark 2.0*
>> *Alluxio 1.2.0 *
>> *Zeppelin 0.7.0*
>> STS start script
>> */home/spark/spark-2.0.0-bin-hadoop2.6/sbin/start-thriftserver.sh
>> --master mesos://zk://master1:2181,master2:2181,master3:2181/mesos --conf
>> spark.driver.memory=5G --conf spark.scheduler.mode=FAIR --class
>> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --jars
>> /home/spark/spark-2.0.0-bin-hadoop2.6/jars/alluxio-core-client-spark-1.2.0-jar-with-dependencies.jar
>> --total-executor-cores 35 spark-internal --hiveconf
>> hive.server2.thrift.port=1 --hiveconf
>> hive.metastore.warehouse.dir=/user/hive/warehouse --hiveconf
>> hive.metastore.metadb.dir=/user/hive/metadb --conf
>> spark.sql.shuffle.partitions=20*
>>
>> I have a file store in Alluxio *alluxio://master2:19998/etl_info/TOPIC*
>>
>> then I create a table in STS by
>> CREATE EXTERNAL TABLE topic (topic_id int, topic_name_vn String,
>> topic_name_en String, parent_id int, full_parent String, level_id int)
>> STORED AS PARQUET LOCATION 'alluxio://master2:19998/etl_info/TOPIC';
>>
>> to compare STS with Spark I create a temp table with name topics
>> spark.sqlContext.read.parquet("alluxio://master2:19998/etl_info/TOPIC
>> ").registerTempTable("topics")
>>
>> Then I do query and compare.
>>
>>
>> As you can see the result is different.
>> Is that a bug? Or did I do something wrong?
>>
>> Regards,
>> Chanh
>>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: Random forest binary classification H20 difference Spark

2016-08-10 Thread Yanbo Liang
Hi Samir,

Did you use VectorAssembler to assemble some columns into the feature
column? If there are NULLs in your dataset, VectorAssembler will throw this
exception. You can use DataFrame.na.drop() or DataFrame.na.fill()/na.replace() to
drop/substitute NULL values.
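
Concretely, that clean-up step might look like this (a sketch; the DataFrame and column names are placeholders, and the null handling lives on df.na):

import org.apache.spark.ml.feature.VectorAssembler

// Drop rows with nulls in the feature columns (or substitute a default)
// before handing them to VectorAssembler.
val featureCols = Array("age", "income", "score")

val cleaned = raw.na.drop(featureCols)        // or: raw.na.fill(0.0, featureCols)

val assembler = new VectorAssembler()
  .setInputCols(featureCols)
  .setOutputCol("features")

val assembled = assembler.transform(cleaned)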

Thanks
Yanbo

2016-08-07 19:51 GMT-07:00 Javier Rey :

> Hi everybody.
>
> I have executed RF on H2O and I didn't have troubles with null values, but in
> contrast, in Spark using dataframes and the ML library I obtain this error. I
> know my dataframe contains nulls, but I understand that Random Forest
> supports null values:
>
> "Values to assemble cannot be null"
>
> Any advice, that framework can handle this issue?.
>
> Regards,
>
> Samir
>


Please help: Spark job hung/stop writing after exceeding the folder size

2016-08-10 Thread Bhupendra Mishra
Dear All,

I have been struggling with an issue where a Spark Streaming job gets hung after
exceeding the size of the output folder path.

Here are more details:

I have Flume sending data, with the following configuration:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel2

# Describe/configure source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /mapr/rawdata/input/syslog


agent1.sinks.sink1.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.sink1.hostname = hostname
agent1.sinks.sink1.port = 4
#agent1.sinks.sink1.batchSize=200

agent1.channels.channel2.type = FILE
agent1.channels.channel2.checkpointDir=/tmp/mapr/flume/java/checkpoint
agent1.channels.channel2.dataDirs=/tmp/mapr/flume/java/data
agent1.channels.channel2.capacity = 1
agent1.channels.channel2.transactionCapacity = 1000

agent1.sources.source1.channels = channel2
agent1.sinks.sink1.channel = channel2



And I have a batch interval of 180 seconds.

Here is how we are writing the final output:

finalTable.write.partitionBy("destzone").mode(SaveMode.Append).parquet("/rawdata/output/FirewalSyslog2")
 sqlContext.sql("MSCK REPAIR TABLE network.trafficlog1")

Please help me to fix the same.


Re: Logistic regression formula string

2016-08-10 Thread Yanbo Liang
I think you can output the schema of the DataFrame which will be fed into the
estimator, such as LogisticRegression. The output array will be the encoded
feature names corresponding to the coefficients of the model.
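
To make that concrete, a sketch along those lines (it assumes a fitted LogisticRegressionModel called lrModel and a DataFrame `assembled` whose "features" column was produced by VectorAssembler, so the ML attribute metadata carries the encoded names, including the one-hot slots of categorical features):

import org.apache.spark.ml.attribute.AttributeGroup

// Pull the per-slot feature names out of the metadata of the assembled column
// and line them up with the model's coefficients.
val attrs = AttributeGroup.fromStructField(assembled.schema("features"))
// attributes is empty if the column carries no ML attribute metadata.
val names = attrs.attributes.get.map(_.name.getOrElse("unnamed"))

names.zip(lrModel.coefficients.toArray).foreach { case (n, w) =>
  println(s"$n -> $w")
}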

Thanks
Yanbo

2016-08-08 15:53 GMT-07:00 Cesar :

>
> I have a data frame with four columns: label, feature_1, feature_2,
> feature_3. Is there a simple way in the ML library to give me the weights
> based on feature names? I can only get the weights, which makes this simple
> task complicated when one of my features is categorical.
>
> I am looking for something similar to what R output does (where it clearly
> indicates which weight corresponds to each feature name, including
> categorical ones).
>
>
>
> Thanks a lot !
> --
> Cesar Flores
>


Re: Spark Thrift Server (Spark 2.0) show table has value with NULL in all fields

2016-08-10 Thread Chanh Le
Hi Gene,
It's a Spark 2.0 issue.
I switched to Spark 1.6.1 and it's OK now.

Thanks.

On Thursday, July 28, 2016 at 4:25:48 PM UTC+7, Chanh Le wrote:
>
> Hi everyone,
>
> I have problem when I create a external table in Spark Thrift Server (STS) 
> and query the data.
>
> Scenario:
> *Spark 2.0*
> *Alluxio 1.2.0 *
> *Zeppelin 0.7.0*
> STS start script 
> */home/spark/spark-2.0.0-bin-hadoop2.6/sbin/start-thriftserver.sh --master 
> mesos://zk://master1:2181,master2:2181,master3:2181/mesos --conf 
> spark.driver.memory=5G --conf spark.scheduler.mode=FAIR --class 
> org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 --jars 
> /home/spark/spark-2.0.0-bin-hadoop2.6/jars/alluxio-core-client-spark-1.2.0-jar-with-dependencies.jar
>  
> --total-executor-cores 35 spark-internal --hiveconf 
> hive.server2.thrift.port=1 --hiveconf 
> hive.metastore.warehouse.dir=/user/hive/warehouse --hiveconf 
> hive.metastore.metadb.dir=/user/hive/metadb --conf 
> spark.sql.shuffle.partitions=20*
>
> I have a file stored in Alluxio: *alluxio://master2:19998/etl_info/TOPIC*
>
> Then I create a table in STS with:
> CREATE EXTERNAL TABLE topic (topic_id int, topic_name_vn String, 
> topic_name_en String, parent_id int, full_parent String, level_id int)
> STORED AS PARQUET LOCATION 'alluxio://master2:19998/etl_info/TOPIC';
>
> To compare STS with Spark, I create a temp table named topics:
> spark.sqlContext.read.parquet("alluxio://master2:19998/etl_info/TOPIC").registerTempTable("topics")
>
> Then I run the queries and compare.
>
>
> As you can see, the results are different.
> Is that a bug, or did I do something wrong?
>
> Regards,
> Chanh
>


Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
Hi Siva,

With the code below, it gets stuck at
*sqlContext.read.json(rdd.map(_._2)).toDF()*
There are two partitions in the topic.
I am running Spark 1.6.2.

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = "topic.name"
val brokers = "localhost:9092"
val topicsSet = topics.split(",").toSet
val sparkConf = new
SparkConf().setAppName("KafkaWeather").setMaster("local[5]")//spark://localhost:7077
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(60))
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
  "group.id" -> "xyz", "auto.offset.reset" -> "smallest")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topicsSet)
messages.foreachRDD(rdd => {
  if (rdd.isEmpty()) {
    println("Failed to get data from Kafka. Please check that the Kafka producer is streaming data.")
    System.exit(-1)
  }
  val sqlContext =
    org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
  *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
  dataframe.foreach(println)
})


Below are the logs:

16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
todelete.scala:34) failed in 110.776 s
16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@6d8ff688)
16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerJobEnd(0,1470812271971,JobFailed(org.apache.spark.SparkException:
Job 0 cancelled because SparkContext was shut down))
16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
MapOutputTrackerMasterEndpoint stopped!
16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
16/08/10 12:27:51 INFO BlockManagerMaster: BlockManagerMaster stopped
16/08/10 12:27:51 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
OutputCommitCoordinator stopped!
16/08/10 12:27:51 INFO RemoteActorRefProvider$RemotingTerminator: Shutting
down remote daemon.
16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remote
daemon shut down; proceeding with flushing remote transports.
16/08/10 12:27:52 INFO SparkContext: Successfully stopped SparkContext
16/08/10 12:27:52 INFO ShutdownHookManager: Shutdown hook called
16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
/tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2/httpd-07b9c1b6-01db-45b5-9302-d2f67f7c490e
16/08/10 12:27:52 INFO RemoteActorRefProvider$RemotingTerminator: Remoting
shut down.
16/08/10 12:27:52 INFO ShutdownHookManager: Deleting directory
/tmp/spark-6df1d6aa-896e-46e1-a2ed-199343dad0e2
[cloudera@quickstart ~]$ spark-submit --master local[3] --class
com.boa.poc.todelete --jars
/home/cloudera/lib/spark-streaming-kafka-assembly_2.10-1.6.2.jar,/home/cloudera/lib/spark-assembly-1.6.2-hadoop2.6.0.jar
/home/cloudera/Downloads/boa/pain.jar > log.txt
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
16/08/10 12:27:58 INFO SparkContext: Running Spark version 1.6.2
16/08/10 12:27:59 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
16/08/10 12:27:59 WARN Utils: Your hostname, quickstart.cloudera resolves
to a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
interface eth1)
16/08/10 12:27:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
16/08/10 12:27:59 INFO SecurityManager: Changing view acls to: cloudera
16/08/10 12:27:59 INFO SecurityManager: Changing modify acls to: cloudera
16/08/10 12:27:59 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(cloudera);
users with modify permissions: Set(cloudera)
16/08/10 12:28:00 INFO Utils: Successfully started service 'sparkDriver' on
port 42140.
16/08/10 12:28:01 INFO Slf4jLogger: Slf4jLogger started
16/08/10 12:28:01 INFO Remoting: Starting remoting
16/08/10 12:28:01 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriverActorSystem@192.168.126.131:53328]
16/08/10 12:28:01 INFO Utils: Successfully started service
'sparkDriverActorSystem' on port 53328.
16/08/10 12:28:01 INFO SparkEnv: Registering MapOutputTracker
16/08/10 12:28:01 INFO SparkEnv: Registering BlockManagerMaster
16/08/10 12:28:01 INFO DiskBlockManager: Created local directory at
/tmp/blockmgr-04c1ecec-8708-4f4b-b898-5fb953ab63e2
16/08/10 12:28:01 INFO MemoryStore: MemoryStore started with capacity 511.5
MB
16/08/10 12:28:01 INFO SparkEnv: Registering OutputCommitCoordinator
16/08/10 12:28:02 INFO Utils: Successfully started service 'SparkUI' on
port 4040.
16/08/10 12:28:02 INFO SparkUI: Started SparkUI at
http://192.168.126.131:4040
16/08/10 12:28:02 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/httpd-70563ad1-3d30-4a9c-ab11-82ecbb2e71b0
16/08/10 12:28:02 INFO HttpServer: Starting HTTP 

Re: Change nullable property in Dataset schema

2016-08-10 Thread Kazuaki Ishizaki
After some investigation, I was able to change the nullable property of a
Dataset[Array[Int]] in the following way. Is this the right way?

(1) Apply https://github.com/apache/spark/pull/13873
(2) Use two Encoders: one is a RowEncoder, the other is a predefined
ExpressionEncoder.

class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2),
      Array(3, 3)), 1).toDS
    val ds2 = ds1.map(e => e)
      .as(RowEncoder(new StructType()
        .add("value", ArrayType(IntegerType, false), nullable = false)))
      .as(newDoubleArrayEncoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = false)
 |    |-- element: integer (containsNull = false)


Kazuaki Ishizaki



From:   Kazuaki Ishizaki/Japan/IBM@IBMJP
To: user@spark.apache.org
Date:   2016/08/03 23:46
Subject:Change nullable property in Dataset schema



Dear all,
Would it be possible to let me know how to change the nullable property of a
Dataset?

When I looked for how to change the nullable property in a DataFrame schema, I
found the following approaches:
http://stackoverflow.com/questions/33193958/change-nullable-property-of-column-in-spark-dataframe

https://github.com/apache/spark/pull/13873 (not merged yet)

However, I cannot find how to change the nullable property in a Dataset schema.
Even when I wrote the following program, the nullable property for
"value: array" in ds2.schema was not changed.
If my understanding is correct, current Spark 2.0 uses an 
ExpressionEncoder that is generated based on Dataset[T] at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L46


class Test extends QueryTest with SharedSQLContext {
  import testImplicits._
  test("test") {
    val ds1 = sparkContext.parallelize(Seq(Array(1, 1), Array(2, 2),
      Array(3, 3)), 1).toDS
    val schema = new StructType().add("array", ArrayType(IntegerType,
      false), false)
    val inputObject = BoundReference(0,
      ScalaReflection.dataTypeFor[Array[Int]], false)
    val encoder = new ExpressionEncoder[Array[Int]](schema, true,
      ScalaReflection.serializerFor[Array[Int]](inputObject).flatten,
      ScalaReflection.deserializerFor[Array[Int]],
      ClassTag[Array[Int]](classOf[Array[Int]]))
    val ds2 = ds1.map(e => e)(encoder)
    ds1.printSchema
    ds2.printSchema
  }
}

root
 |-- value: array (nullable = true)
 |    |-- element: integer (containsNull = false)

root
 |-- value: array (nullable = true)          // Expected (nullable = false)
 |    |-- element: integer (containsNull = false)


Kazuaki Ishizaki




UNSUBSCRIBE

2016-08-10 Thread Martin Somers
-- 
M