Re: Integration tests for Spark Streaming

2016-06-28 Thread Luciano Resende
This thread might be useful for what you want:
https://www.mail-archive.com/user%40spark.apache.org/msg34673.html
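
As a starting point, here is a minimal sketch of one way to drive an
integration-style test without a real source, by pushing test batches through
a queueStream (assumes the Spark 1.x streaming APIs; names and the word-count
logic are just illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object StreamingIntegrationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("streaming-it")
    val ssc = new StreamingContext(conf, Seconds(1))

    // feed test batches through a queueStream instead of a real source
    val queue = mutable.Queue[RDD[String]]()
    val results = mutable.ArrayBuffer.empty[(String, Int)]

    val counts = ssc.queueStream(queue).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.foreachRDD(rdd => results ++= rdd.collect())

    ssc.start()
    queue += ssc.sparkContext.makeRDD(Seq("a b a"))
    Thread.sleep(3000)                                  // let a few batch intervals run
    ssc.stop(stopSparkContext = true, stopGracefully = false)

    assert(results.toMap == Map("a" -> 2, "b" -> 1))
  }
}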

On Tue, Jun 28, 2016 at 1:25 PM, SRK  wrote:

> Hi,
>
> I need to write some integration tests for my Spark Streaming app. Any
> example on how to do this would be of great help.
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Integration-tests-for-Spark-Streaming-tp27246.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Random Forest Classification

2016-06-28 Thread Bryan Cutler
Are you fitting the VectorIndexer to the entire data set and not just
training or test data?  If you are able to post your code and some data to
reproduce, that would help in troubleshooting.
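
For illustration, a minimal sketch of the fit-on-the-full-dataset pattern
being suggested (the `data` DataFrame and the column names are placeholders):

import org.apache.spark.ml.feature.VectorIndexer

// fit the indexer on the full dataset so that every feature value seen at
// prediction time is in its category map; fitting only on the training
// split can later fail with "key not found" on unseen values
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(10)
  .fit(data)                                   // all of `data`, not trainingData

val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))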

On Tue, Jun 28, 2016 at 4:40 PM, Rich Tarro  wrote:

> Thanks for the response, but in my case I reversed the meaning of
> "prediction" and "predictedLabel". It seemed to make more sense to me that
> way, but in retrospect, it probably only causes confusion to anyone else
> looking at this. I reran the code with all the pipeline stage inputs and
> outputs named exactly as in the Random Forest Classifier example to make
> sure I hadn't messed anything up when I renamed things. Same error.
>
> I'm still at the point where I can train the model and make predictions,
> but not able to get the MulticlassClassificationEvaluator to work on the
> DataFrame of predictions.
>
> Any other suggestions? Thanks.
>
>
>
> On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro  wrote:
>
>> I created a ML pipeline using the Random Forest Classifier - similar to
>> what is described here except in my case the source data is in csv format
>> rather than libsvm.
>>
>>
>> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>>
>> I am able to successfully train the model and make predictions (on test
>> data not used to train the model) as shown here.
>>
>> +------------+--------------+-----+----------+--------------------+
>> |indexedLabel|predictedLabel|label|prediction|            features|
>> +------------+--------------+-----+----------+--------------------+
>> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
>> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
>> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
>> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
>> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
>> +------------+--------------+-----+----------+--------------------+
>> only showing top 5 rows
>>
>> However, when I attempt to calculate the error between the indexedLabel and 
>> the predictedLabel using the MulticlassClassificationEvaluator, I get the
>> NoSuchElementException error attached below.
>>
>> val evaluator = new 
>> MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
>> val accuracy = evaluator.evaluate(predictions)
>> println("Test Error = " + (1.0 - accuracy))
>>
>> What could be the issue?
>>
>>
>>
>> Name: org.apache.spark.SparkException
>> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 
>> times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, 
>> yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 
>> 132.0
>>  at scala.collection.MapLike$class.default(MapLike.scala:228)
>>  at scala.collection.AbstractMap.default(Map.scala:58)
>>  at scala.collection.MapLike$class.apply(MapLike.scala:141)
>>  at scala.collection.AbstractMap.apply(Map.scala:58)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>  at 
>> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>>  at 
>> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>>  at 
>> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>>  at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>  at 
>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>>  at 
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at 

Re: Modify the functioning of zipWithIndex function for RDDs

2016-06-28 Thread Punit Naik
Actually, I was writing code for the Connected Components algorithm. In it I
have to keep track of a variable called the vertex number, which keeps
getting incremented based on the number of triples encountered in a line.
This variable should be available on all the nodes and all the partitions.
The way I want to keep track of it is by incorporating it into the index of
every line. By default there are two triples in a line, but in some cases
there may be three. So based on the number of triples a line has, I want to
increment its index by that number, and the next line should take the index
of the previous line and increment it by the number of triples it has.

For example:

  asdas asdas,0

   asdasd,1

In this case the final aggregated vertex number should be 5 as there are 2
triples in the first line and 3 triples in the second.

Considering the default case, the index numbers of the first and second
lines should be 2 and 4 respectively. But because there is an extra triple
in the third field of the second line, its index should be 5 and not 4.
There is no pattern to the occurrence of the extra triple in a line, which
makes it hard to keep track of the vertex number. So the modified
zipWithIndex function that I want to write should give me the following
output:

  asdas asdas,2

   asdasd,5

I hope I have explained myself clearly. I am not sure this is the proper
approach; maybe you could suggest a better one if there is any.
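
For reference, a minimal sketch of one possible implementation, assuming the
caller supplies a weight function (e.g. the number of triples per line); it
uses the same collect-the-partition-totals trick that zipWithIndex itself
uses (countTriples below is a hypothetical helper):

import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// zipWithIndex variant where each element advances the running index by a
// per-element weight (e.g. the number of triples in the line) instead of 1
def zipWithWeightedIndex[T: ClassTag](rdd: RDD[T])(weight: T => Long): RDD[(T, Long)] = {
  // total weight of each partition, collected to the driver
  val partitionTotals = rdd.mapPartitions(it => Iterator(it.map(weight).sum)).collect()
  // start offset of a partition = sum of the totals of all earlier partitions
  val startOffsets = partitionTotals.scanLeft(0L)(_ + _)

  rdd.mapPartitionsWithIndex { (pid, it) =>
    var running = startOffsets(pid)
    it.map { elem =>
      running += weight(elem)
      (elem, running)
    }
  }
}

// usage: zipWithWeightedIndex(lines)(line => countTriples(line))
// gives indices 2 and 5 for the two example lines above
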
On 29-Jun-2016 6:31 AM, "Ted Yu"  wrote:

> Since the data.length is variable, I am not sure whether mixing data.length
> and the index makes sense.
>
> Can you describe your use case in a bit more detail?
>
> Thanks
>
> On Tue, Jun 28, 2016 at 11:34 AM, Punit Naik 
> wrote:
>
>> Hi Ted
>>
>> So would the tuple look like: (x._1, split.startIndex + x._2 +
>> x._1.length) ?
>>
>> On Tue, Jun 28, 2016 at 11:09 PM, Ted Yu  wrote:
>>
>>> Please take a look at:
>>> core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
>>>
>>> In compute() method:
>>> val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
>>> firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
>>>   (x._1, split.startIndex + x._2)
>>>
>>> You can modify the second component of the tuple to take data.length
>>> into account.
>>>
>>> On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik 
>>> wrote:
>>>
 Hi

 I wanted to change the functioning of the "zipWithIndex" function for
 spark RDDs in which the output of the function is, just for an example,
  "(data, prev_index+data.length)" instead of "(data,prev_index+1)".

 How can I do this?

 --
 Thank You

 Regards

 Punit Naik

>>>
>>>
>>
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>


Re: Logging trait in Spark 2.0

2016-06-28 Thread Stephen Boesch
I also did not understand why the Logging class was made private in Spark
2.0.  In a couple of projects including CaffeOnSpark the Logging class was
simply copied to the new project to allow for backwards compatibility.

2016-06-28 18:10 GMT-07:00 Michael Armbrust :

> I'd suggest using the slf4j APIs directly.  They provide a nice stable API
> that works with a variety of logging backends.  This is what Spark does
> internally.
>
> On Sun, Jun 26, 2016 at 4:02 AM, Paolo Patierno 
> wrote:
>
>> Yes ... the same here ... I'd like to know the best way for adding
>> logging in a custom receiver for Spark Streaming 2.0
>>
>> *Paolo Patierno*
>>
>> *Senior Software Engineer (IoT) @ Red Hat**Microsoft MVP on **Windows
>> Embedded & IoT*
>> *Microsoft Azure Advisor*
>>
>> Twitter : @ppatierno 
>> Linkedin : paolopatierno 
>> Blog : DevExperience 
>>
>>
>> --
>> From: jonathaka...@gmail.com
>> Date: Fri, 24 Jun 2016 20:56:40 +
>> Subject: Re: Logging trait in Spark 2.0
>> To: yuzhih...@gmail.com; ppatie...@live.com
>> CC: user@spark.apache.org
>>
>>
>> Ted, how is that thread related to Paolo's question?
>>
>> On Fri, Jun 24, 2016 at 1:50 PM Ted Yu  wrote:
>>
>> See this related thread:
>>
>>
>> http://search-hadoop.com/m/q3RTtEor1vYWbsW=RE+Configuring+Log4J+Spark+1+5+on+EMR+4+1+
>>
>> On Fri, Jun 24, 2016 at 6:07 AM, Paolo Patierno 
>> wrote:
>>
>> Hi,
>>
>> developing a Spark Streaming custom receiver I noticed that the Logging
>> trait isn't accessible anymore in Spark 2.0.
>>
>> trait Logging in package internal cannot be accessed in package
>> org.apache.spark.internal
>>
>> For developing a custom receiver what is the preferred way for logging ?
>> Just using log4j dependency as any other Java/Scala library/application ?
>>
>> Thanks,
>> Paolo
>>
>> *Paolo Patierno*
>>
>> *Senior Software Engineer (IoT) @ Red Hat**Microsoft MVP on **Windows
>> Embedded & IoT*
>> *Microsoft Azure Advisor*
>>
>> Twitter : @ppatierno 
>> Linkedin : paolopatierno 
>> Blog : DevExperience 
>>
>>
>>
>


Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Timur Shenkao
Hi, guys!

As far as I remember, Spark does not use all the peculiarities and
optimizations of ORC. Moreover, the ability to read ORC files appeared in
Spark not so long ago.

So, despite the "victorious" results announced in
http://hortonworks.com/blog/bringing-orc-support-into-apache-spark/ ,
there are a number of "nuisances" like
https://issues.apache.org/jira/browse/SPARK-11087 or
https://issues.apache.org/jira/browse/SPARK-14286

This may also be useful:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization

On Wed, Jun 29, 2016 at 1:38 AM, Mich Talebzadeh 
wrote:

> This is what I am getting in the container log for mr
>
> 2016-06-28 23:25:53,808 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS
> hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_task_tmp.-mr-10004/_tmp.00_0
> 2016-06-28 23:25:53,808 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: New Final Path: FS
> hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_tmp.-mr-10004/00_0
> 2016-06-28 23:25:53,836 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1
> 2016-06-28 23:25:53,837 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 10
> 2016-06-28 23:25:53,837 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 100
> 2016-06-28 23:25:53,844 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 1000
> 2016-06-28 23:25:53,875 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 1
> 2016-06-28 23:25:53,954 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 10
> 2016-06-28 23:25:55,072 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 100
> 2016-06-28 23:26:56,236 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 1000
> 2016-06-28 23:27:58,499 WARN [ResponseProcessor for block
> BP-1648199869-50.140.197.217-1462266926537:blk_1074784072_1043287]
> org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took
> 35556ms (threshold=3ms); ack: seqno: 6815 status: SUCCESS status:
> SUCCESS downstreamAckTimeNanos: 35566795000, targets: [
> 50.140.197.217:50010, 50.140.197.216:50010]
> 2016-06-28 23:31:38,437 INFO [main]
> org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
> 1
> 2016-06-28 23:35:27,631 WARN [ResponseProcessor for block
> BP-1648199869-50.140.197.217-1462266926537:blk_1074784086_1043301]
> org.apache.hadoop.hdfs.DFSClient: *Slow ReadProcessor read fields took
> 31118ms (threshold=3ms);* ack: seqno: 36303 status: SUCCESS status:
> SUCCESS downstreamAckTimeNanos: 31128701000, targets: [
> 50.140.197.217:50010, 50.140.197.216:50010]
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 28 June 2016 at 23:27, Mich Talebzadeh 
> wrote:
>
>> That is a good point.
>>
>> The ORC table property is as follows
>>
>> TBLPROPERTIES ( "orc.compress"="SNAPPY",
>> "orc.stripe.size"="268435456",
>> "orc.row.index.stride"="1")
>>
>> which puts each stripe at 256MB
>>
>> Just to clarify, this is Spark running on Hive tables. I don't think the
>> choice of Tez, MR or Spark as the execution engine is going to make any
>> difference?
>>
>> This is the same query with Hive on MR
>>
>> select a.prod_id from sales2 a, sales_staging b where a.prod_id =
>> b.prod_id order by a.prod_id;
>>
>> 2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
>> 2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
>> 7.32 sec
>> 2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
>> 18.21 sec
>> 2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
>> 22.34 sec
>> 2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU
>> 30.33 sec
>> 2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU
>> 33.45 sec
>> 2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU
>> 37.5 sec
>> 2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU
>> 42.0 sec
>> 

Re: Logging trait in Spark 2.0

2016-06-28 Thread Michael Armbrust
I'd suggest using the slf4j APIs directly.  They provide a nice stable API
that works with a variety of logging backends.  This is what Spark does
internally.
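
For example, a minimal sketch of a custom receiver that logs through slf4j
directly (class name and messages are illustrative):

import org.slf4j.LoggerFactory
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
  // slf4j logger instead of the now-internal org.apache.spark.internal.Logging trait
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  override def onStart(): Unit = {
    log.info("Custom receiver starting")
    // start the thread(s) that call store(...) here
  }

  override def onStop(): Unit = log.info("Custom receiver stopping")
}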

On Sun, Jun 26, 2016 at 4:02 AM, Paolo Patierno  wrote:

> Yes ... the same here ... I'd like to know the best way for adding logging
> in a custom receiver for Spark Streaming 2.0
>
> *Paolo Patierno*
>
> *Senior Software Engineer (IoT) @ Red Hat**Microsoft MVP on **Windows
> Embedded & IoT*
> *Microsoft Azure Advisor*
>
> Twitter : @ppatierno 
> Linkedin : paolopatierno 
> Blog : DevExperience 
>
>
> --
> From: jonathaka...@gmail.com
> Date: Fri, 24 Jun 2016 20:56:40 +
> Subject: Re: Logging trait in Spark 2.0
> To: yuzhih...@gmail.com; ppatie...@live.com
> CC: user@spark.apache.org
>
>
> Ted, how is that thread related to Paolo's question?
>
> On Fri, Jun 24, 2016 at 1:50 PM Ted Yu  wrote:
>
> See this related thread:
>
>
> http://search-hadoop.com/m/q3RTtEor1vYWbsW=RE+Configuring+Log4J+Spark+1+5+on+EMR+4+1+
>
> On Fri, Jun 24, 2016 at 6:07 AM, Paolo Patierno 
> wrote:
>
> Hi,
>
> developing a Spark Streaming custom receiver I noticed that the Logging
> trait isn't accessible anymore in Spark 2.0.
>
> trait Logging in package internal cannot be accessed in package
> org.apache.spark.internal
>
> For developing a custom receiver what is the preferred way for logging ?
> Just using log4j dependency as any other Java/Scala library/application ?
>
> Thanks,
> Paolo
>
> *Paolo Patierno*
>
> *Senior Software Engineer (IoT) @ Red Hat**Microsoft MVP on **Windows
> Embedded & IoT*
> *Microsoft Azure Advisor*
>
> Twitter : @ppatierno 
> Linkedin : paolopatierno 
> Blog : DevExperience 
>
>
>


Re: Modify the functioning of zipWithIndex function for RDDs

2016-06-28 Thread Ted Yu
Since the data.length is variable, I am not sure whether mixing data.length
and the index makes sense.

Can you describe your use case in a bit more detail?

Thanks

On Tue, Jun 28, 2016 at 11:34 AM, Punit Naik  wrote:

> Hi Ted
>
> So would the tuple look like: (x._1, split.startIndex + x._2 +
> x._1.length) ?
>
> On Tue, Jun 28, 2016 at 11:09 PM, Ted Yu  wrote:
>
>> Please take a look at:
>> core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
>>
>> In compute() method:
>> val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
>> firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
>>   (x._1, split.startIndex + x._2)
>>
>> You can modify the second component of the tuple to take data.length
>> into account.
>>
>> On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik 
>> wrote:
>>
>>> Hi
>>>
>>> I wanted to change the functioning of the "zipWithIndex" function for
>>> spark RDDs in which the output of the function is, just for an example,
>>>  "(data, prev_index+data.length)" instead of "(data,prev_index+1)".
>>>
>>> How can I do this?
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: Random Forest Classification

2016-06-28 Thread Rich Tarro
Thanks for the response, but in my case I reversed the meaning of
"prediction" and "predictedLabel". It seemed to make more sense to me that
way, but in retrospect, it probably only causes confusion to anyone else
looking at this. I reran the code with all the pipeline stage inputs and
outputs named exactly as in the Random Forest Classifier example to make
sure I hadn't messed anything up when I renamed things. Same error.

I'm still at the point where I can train the model and make predictions,
but not able to get the MulticlassClassificationEvaluator to work on the
DataFrame of predictions.

Any other suggestions? Thanks.



On Tue, Jun 28, 2016 at 4:21 PM, Rich Tarro  wrote:

> I created a ML pipeline using the Random Forest Classifier - similar to
> what is described here except in my case the source data is in csv format
> rather than libsvm.
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>
> I am able to successfully train the model and make predictions (on test
> data not used to train the model) as shown here.
>
> +------------+--------------+-----+----------+--------------------+
> |indexedLabel|predictedLabel|label|prediction|            features|
> +------------+--------------+-----+----------+--------------------+
> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
> +------------+--------------+-----+----------+--------------------+
> only showing top 5 rows
>
> However, when I attempt to calculate the error between the indexedLabel and 
> the predictedLabel using the MulticlassClassificationEvaluator, I get the
> NoSuchElementException error attached below.
>
> val evaluator = new 
> MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
> val accuracy = evaluator.evaluate(predictions)
> println("Test Error = " + (1.0 - accuracy))
>
> What could be the issue?
>
>
>
> Name: org.apache.spark.SparkException
> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 
> times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, 
> yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 
> 132.0
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>   at scala.collection.AbstractMap.apply(Map.scala:58)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at 
> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>   at 
> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.lang.Thread.run(Thread.java:785)
>
> Driver stacktrace:
> StackTrace: 
> 

Re: Random Forest Classification

2016-06-28 Thread Bryan Cutler
The problem might be that you are evaluating with "predictedLabel" instead
of "prediction", where predictedLabel is the prediction index mapped back to
the original label strings - at least according to the
RandomForestClassifierExample; not sure if your code is exactly the same.
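
In other words, something along these lines (a sketch; the "precision"
metric name just matches the snippet in the original post):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// evaluate against the numeric "prediction" column produced by the classifier,
// not the string column written by IndexToString ("predictedLabel")
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))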

On Tue, Jun 28, 2016 at 1:21 PM, Rich Tarro  wrote:

> I created a ML pipeline using the Random Forest Classifier - similar to
> what is described here except in my case the source data is in csv format
> rather than libsvm.
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
>
> I am able to successfully train the model and make predictions (on test
> data not used to train the model) as shown here.
>
> +------------+--------------+-----+----------+--------------------+
> |indexedLabel|predictedLabel|label|prediction|            features|
> +------------+--------------+-----+----------+--------------------+
> |         4.0|           4.0|    0|         0|(784,[124,125,126...|
> |         2.0|           2.0|    3|         3|(784,[119,120,121...|
> |         8.0|           8.0|    8|         8|(784,[180,181,182...|
> |         0.0|           0.0|    1|         1|(784,[154,155,156...|
> |         3.0|           8.0|    2|         8|(784,[148,149,150...|
> +------------+--------------+-----+----------+--------------------+
> only showing top 5 rows
>
> However, when I attempt to calculate the error between the indexedLabel and 
> the predictedLabel using the MulticlassClassificationEvaluator, I get the
> NoSuchElementException error attached below.
>
> val evaluator = new 
> MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
> val accuracy = evaluator.evaluate(predictions)
> println("Test Error = " + (1.0 - accuracy))
>
> What could be the issue?
>
>
>
> Name: org.apache.spark.SparkException
> Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed 10 
> times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162, 
> yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not found: 
> 132.0
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.MapLike$class.apply(MapLike.scala:141)
>   at scala.collection.AbstractMap.apply(Map.scala:58)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at 
> org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
>   at 
> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
>   at 
> org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
>   at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>   at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   at java.lang.Thread.run(Thread.java:785)
>
> Driver stacktrace:
> StackTrace: 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> 

Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Mich Talebzadeh
This is what I am getting in the container log for mr

2016-06-28 23:25:53,808 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: Writing to temp file: FS
hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_task_tmp.-mr-10004/_tmp.00_0
2016-06-28 23:25:53,808 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: New Final Path: FS
hdfs://rhes564:9000/tmp/hive/hduser/71a6beee-ac0d-423a-a14e-6ce51667a441/hive_2016-06-28_23-23-42_929_6384631032208608956-1/_tmp.-mr-10004/00_0
2016-06-28 23:25:53,836 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written - 1
2016-06-28 23:25:53,837 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
10
2016-06-28 23:25:53,837 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
100
2016-06-28 23:25:53,844 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
1000
2016-06-28 23:25:53,875 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
1
2016-06-28 23:25:53,954 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
10
2016-06-28 23:25:55,072 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
100
2016-06-28 23:26:56,236 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
1000
2016-06-28 23:27:58,499 WARN [ResponseProcessor for block
BP-1648199869-50.140.197.217-1462266926537:blk_1074784072_1043287]
org.apache.hadoop.hdfs.DFSClient: Slow ReadProcessor read fields took
35556ms (threshold=3ms); ack: seqno: 6815 status: SUCCESS status:
SUCCESS downstreamAckTimeNanos: 35566795000, targets: [50.140.197.217:50010,
50.140.197.216:50010]
2016-06-28 23:31:38,437 INFO [main]
org.apache.hadoop.hive.ql.exec.FileSinkOperator: FS[15]: records written -
1
2016-06-28 23:35:27,631 WARN [ResponseProcessor for block
BP-1648199869-50.140.197.217-1462266926537:blk_1074784086_1043301]
org.apache.hadoop.hdfs.DFSClient: *Slow ReadProcessor read fields took
31118ms (threshold=3ms);* ack: seqno: 36303 status: SUCCESS status:
SUCCESS downstreamAckTimeNanos: 31128701000, targets: [50.140.197.217:50010,
50.140.197.216:50010]




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 28 June 2016 at 23:27, Mich Talebzadeh  wrote:

> That is a good point.
>
> The ORC table property is as follows
>
> TBLPROPERTIES ( "orc.compress"="SNAPPY",
> "orc.stripe.size"="268435456",
> "orc.row.index.stride"="1")
>
> which puts each stripe at 256MB
>
> Just to clarify, this is Spark running on Hive tables. I don't think the
> choice of Tez, MR or Spark as the execution engine is going to make any
> difference?
>
> This is the same query with Hive on MR
>
> select a.prod_id from sales2 a, sales_staging b where a.prod_id =
> b.prod_id order by a.prod_id;
>
> 2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
> 2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
> 7.32 sec
> 2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
> 18.21 sec
> 2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
> 22.34 sec
> 2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU
> 30.33 sec
> 2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU
> 33.45 sec
> 2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU
> 37.5 sec
> 2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU
> 42.0 sec
> 2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU
> 45.62 sec
> 2016-06-28 23:24:33,441 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU
> 49.69 sec
> 2016-06-28 23:24:36,521 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU
> 52.92 sec
> 2016-06-28 23:24:39,605 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU
> 56.78 sec
> 2016-06-28 23:24:42,686 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU
> 60.36 sec
> 2016-06-28 23:24:45,767 Stage-1 map = 81%,  reduce = 0%, Cumulative CPU
> 63.68 sec
> 2016-06-28 23:24:48,842 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU
> 66.92 sec
> 2016-06-28 23:24:51,918 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
> 70.18 sec
> 2016-06-28 23:25:52,354 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
> 127.99 sec
> 2016-06-28 23:25:57,494 Stage-1 

Spark SQL concurrent runs fails with java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]

2016-06-28 Thread Jesse F Chen

With the Spark 2.0 build from 0615, when running 4-user concurrent SQL
tests against Spark SQL on 1TB TPCDS, we are seeing
consistently the following exceptions:

10:35:33 AM: 16/06/27 23:40:37 INFO scheduler.TaskSetManager: Finished task
412.0 in stage 819.0 (TID 270396) in 8468 ms on 9.30.148.101 (417/581)
16/06/27 23:40:37 ERROR thriftserver.SparkExecuteStatementOperation: Error
executing query, currentState RUNNING,
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute,
tree:
Exchange SinglePartition
+- *HashAggregate(key=[], functions=[partial_sum
(cs_ext_discount_amt#100849)], output=[sum#101124])
   +- *Project [cs_ext_discount_amt#100849]


at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun
$doExecute$1.apply(ShuffleExchange.scala:113)
at org.apache.spark.sql.catalyst.errors.package$.attachTree
(package.scala:49)
... 40 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready
(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result
(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply
(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn
(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.util.ThreadUtils$.awaitResult
(ThreadUtils.scala:190)


The longest query completes in about 700 seconds, so I think we need to
increase the futures timeout value. However, I tried setting
'spark.network.timeout' to 700 via the '--conf' facility, but it does not
seem to control this particular timeout: it stays at "300 seconds" no
matter what value I give it. I also played with the spark.rpc.askTimeout
setting, which does not affect this 300-second value either.

Could someone tell me which parameter I need to change in order to control
it?

JESSE CHEN
Big Data Performance | IBM Analytics
Office: 408 463 2296
Mobile: 408 828 9068
Email: jfc...@us.ibm.com



Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Mich Talebzadeh
That is a good point.

The ORC table property is as follows

TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="1")

which puts each stripe at 256MB

Just to clarify, this is Spark running on Hive tables. I don't think the
choice of Tez, MR or Spark as the execution engine is going to make any
difference?

This is the same query with Hive on MR

select a.prod_id from sales2 a, sales_staging b where a.prod_id = b.prod_id
order by a.prod_id;

2016-06-28 23:23:51,203 Stage-1 map = 0%,  reduce = 0%
2016-06-28 23:23:59,480 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU
7.32 sec
2016-06-28 23:24:08,771 Stage-1 map = 55%,  reduce = 0%, Cumulative CPU
18.21 sec
2016-06-28 23:24:11,860 Stage-1 map = 58%,  reduce = 0%, Cumulative CPU
22.34 sec
2016-06-28 23:24:18,021 Stage-1 map = 62%,  reduce = 0%, Cumulative CPU
30.33 sec
2016-06-28 23:24:21,101 Stage-1 map = 64%,  reduce = 0%, Cumulative CPU
33.45 sec
2016-06-28 23:24:24,181 Stage-1 map = 66%,  reduce = 0%, Cumulative CPU
37.5 sec
2016-06-28 23:24:27,270 Stage-1 map = 69%,  reduce = 0%, Cumulative CPU
42.0 sec
2016-06-28 23:24:30,349 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU
45.62 sec
2016-06-28 23:24:33,441 Stage-1 map = 73%,  reduce = 0%, Cumulative CPU
49.69 sec
2016-06-28 23:24:36,521 Stage-1 map = 75%,  reduce = 0%, Cumulative CPU
52.92 sec
2016-06-28 23:24:39,605 Stage-1 map = 77%,  reduce = 0%, Cumulative CPU
56.78 sec
2016-06-28 23:24:42,686 Stage-1 map = 80%,  reduce = 0%, Cumulative CPU
60.36 sec
2016-06-28 23:24:45,767 Stage-1 map = 81%,  reduce = 0%, Cumulative CPU
63.68 sec
2016-06-28 23:24:48,842 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU
66.92 sec
2016-06-28 23:24:51,918 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
70.18 sec
2016-06-28 23:25:52,354 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU
127.99 sec
2016-06-28 23:25:57,494 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU
134.64 sec
2016-06-28 23:26:57,847 Stage-1 map = 100%,  reduce = 67%, Cumulative CPU
141.01 sec

which basically sits at 67% all day





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 28 June 2016 at 23:07, Jörn Franke  wrote:

>
>
> Bzip2 is splittable for text files.
>
> Btw in Orc the question of splittable does not matter because each stripe
> is compressed individually.
>
> Have you tried Tez? As far as I recall (at least it was the case in the
> first version of Hive), MR uses a single reducer for ORDER BY, which is a
> bottleneck.
>
> Do you see some errors in the log file?
>
> On 28 Jun 2016, at 23:53, Mich Talebzadeh 
> wrote:
>
> Hi,
>
>
> I have a simple join between table sales2 a compressed (snappy) ORC with
> 22 million rows and another simple table sales_staging under a million rows
> stored as a text file with no compression.
>
> The join is very simple
>
>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>   val s = HiveContext.table("sales_staging").select("PROD_ID")
>
>   val rs =
> s2.join(s,"prod_id").orderBy("prod_id").sort(desc("prod_id")).take(5).foreach(println)
>
>
> Now what is happening is it is sitting on SortMergeJoin operation
> on ZippedPartitionRDD as shown in the DAG diagram below
>
>
> 
>
>
> And at this rate  only 10% is done and will take for ever to finish :(
>
> Stage 3:==> (10 + 2) /
> 200]
>
> OK, I understand that zipped files cannot be broken into blocks and
> operations on them cannot be parallelized.
>
> Having said that, what are the alternatives? Never use compression and live
> with it? I emphasise that any operation on the compressed table itself is
> pretty fast, as it is a simple table scan. However, a join between two
> tables on a column, as shown above, seems to be problematic.
>
> Thanks
>
> P.S. the same is happening using Hive with MR
>
> select a.prod_id from sales2 a inner join sales_staging b on a.prod_id =
> b.prod_id order by a.prod_id;
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no 

Re: Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Jörn Franke


Bzip2 is splittable for text files.

Btw, in ORC the question of splittability does not matter because each
stripe is compressed individually.

Have you tried Tez? As far as I recall (at least it was the case in the first
version of Hive), MR uses a single reducer for ORDER BY, which is a bottleneck.

Do you see some errors in the log file?

> On 28 Jun 2016, at 23:53, Mich Talebzadeh  wrote:
> 
> Hi,
> 
> 
> I have a simple join between table sales2 a compressed (snappy) ORC with 22 
> million rows and another simple table sales_staging under a million rows 
> stored as a text file with no compression.
> 
> The join is very simple
> 
>   val s2 = HiveContext.table("sales2").select("PROD_ID")
>   val s = HiveContext.table("sales_staging").select("PROD_ID")
> 
>   val rs = 
> s2.join(s,"prod_id").orderBy("prod_id").sort(desc("prod_id")).take(5).foreach(println)
> 
> 
> Now what is happening is it is sitting on SortMergeJoin operation on 
> ZippedPartitionRDD as shown in the DAG diagram below
> 
> 
> 
> 
> 
> And at this rate  only 10% is done and will take for ever to finish :(
> 
> Stage 3:==> (10 + 2) / 
> 200]
> 
> OK, I understand that zipped files cannot be broken into blocks and operations
> on them cannot be parallelized.
> 
> Having said that, what are the alternatives? Never use compression and live
> with it? I emphasise that any operation on the compressed table itself is
> pretty fast, as it is a simple table scan. However, a join between two tables
> on a column, as shown above, seems to be problematic.
> 
> Thanks
> 
> P.S. the same is happening using Hive with MR
> 
> select a.prod_id from sales2 a inner join sales_staging b on a.prod_id = 
> b.prod_id order by a.prod_id;
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Joining a compressed ORC table with a non compressed text table

2016-06-28 Thread Mich Talebzadeh
Hi,


I have a simple join between table sales2, a compressed (snappy) ORC table
with 22 million rows, and another simple table, sales_staging, with under a
million rows stored as an uncompressed text file.

The join is very simple

  val s2 = HiveContext.table("sales2").select("PROD_ID")
  val s = HiveContext.table("sales_staging").select("PROD_ID")

  val rs =
s2.join(s,"prod_id").orderBy("prod_id").sort(desc("prod_id")).take(5).foreach(println)


What is happening now is that it sits on the SortMergeJoin operation
on ZippedPartitionRDD, as shown in the DAG diagram below.


[image: Inline images 1]


And at this rate only 10% is done; it will take forever to finish :(

Stage 3:==> (10 + 2) /
200]

OK, I understand that zipped files cannot be broken into blocks and
operations on them cannot be parallelized.

Having said that, what are the alternatives? Never use compression and live
with it? I emphasise that any operation on the compressed table itself is
pretty fast, as it is a simple table scan. However, a join between two
tables on a column, as shown above, seems to be problematic.
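
One alternative that may be worth trying (a hedged sketch, not something
already suggested in this thread): mark the small table for broadcast so the
join can avoid the full shuffle/sort-merge path.

import org.apache.spark.sql.functions.{broadcast, desc}

// sales_staging is under a million rows, so broadcasting it lets each task
// join locally instead of shuffling the 22-million-row ORC table
val rs = s2.join(broadcast(s), "prod_id")
  .orderBy(desc("prod_id"))
  .take(5)
  .foreach(println)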

Thanks

P.S. the same is happening using Hive with MR

select a.prod_id from sales2 a inner join sales_staging b on a.prod_id =
b.prod_id order by a.prod_id;

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Integration tests for Spark Streaming

2016-06-28 Thread SRK
Hi,

I need to write some integration tests for my Spark Streaming app. Any
example on how to do this would be of great help.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integration-tests-for-Spark-Streaming-tp27246.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Random Forest Classification

2016-06-28 Thread Rich Tarro
I created a ML pipeline using the Random Forest Classifier - similar to
what is described here except in my case the source data is in csv format
rather than libsvm.

https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier

I am able to successfully train the model and make predictions (on test
data not used to train the model) as shown here.

+------------+--------------+-----+----------+--------------------+
|indexedLabel|predictedLabel|label|prediction|            features|
+------------+--------------+-----+----------+--------------------+
|         4.0|           4.0|    0|         0|(784,[124,125,126...|
|         2.0|           2.0|    3|         3|(784,[119,120,121...|
|         8.0|           8.0|    8|         8|(784,[180,181,182...|
|         0.0|           0.0|    1|         1|(784,[154,155,156...|
|         3.0|           8.0|    2|         8|(784,[148,149,150...|
+------------+--------------+-----+----------+--------------------+
only showing top 5 rows

However, when I attempt to calculate the error between the
indexedLabel and the predictedLabel using the
MulticlassClassificationEvaluator, I get the NoSuchElementException
error attached below.

val evaluator = new
MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("predictedLabel").setMetricName("precision")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))

What could be the issue?



Name: org.apache.spark.SparkException
Message: Job aborted due to stage failure: Task 2 in stage 49.0 failed
10 times, most recent failure: Lost task 2.9 in stage 49.0 (TID 162,
yp-spark-dal09-env5-0024): java.util.NoSuchElementException: key not
found: 132.0
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:58)
at 
org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:331)
at 
org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$10.apply(VectorIndexer.scala:309)
at 
org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
at 
org.apache.spark.ml.feature.VectorIndexerModel$$anonfun$11.apply(VectorIndexer.scala:351)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
Source)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
at 
org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate$$anonfun$create$2.apply(GeneratePredicate.scala:67)
at 
org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:74)
at 
org.apache.spark.sql.execution.Filter$$anonfun$2$$anonfun$apply$2.apply(basicOperators.scala:72)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.lang.Thread.run(Thread.java:785)

Driver stacktrace:
StackTrace: 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
scala.Option.foreach(Option.scala:236)

Re: Best practice for handing tables between pipeline components

2016-06-28 Thread Everett Anderson
Thanks! Alluxio looks quite promising, but also quite new.

What did people do before?
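
For reference, Gene's Alluxio suggestion below boils down to something like
this sketch (host, port, paths and table name are hypothetical), once the
Alluxio client jar is on the Spark classpath:

import org.apache.spark.sql.SQLContext

def handOff(sqlContext: SQLContext): Unit = {
  val stage1 = sqlContext.table("stage1_results")      // hypothetical table
  // write to Alluxio's memory-backed storage ...
  stage1.write.parquet("alluxio://alluxio-master:19998/pipeline/stage1_results")
  // ... and a later job (Spark, or anything that speaks the same FS API)
  // reads it back without a round trip to S3/HDFS
  val again = sqlContext.read.parquet("alluxio://alluxio-master:19998/pipeline/stage1_results")
  again.show(5)
}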

On Mon, Jun 27, 2016 at 12:33 PM, Gene Pang  wrote:

> Yes, Alluxio (http://www.alluxio.org/) can be used to store data
> in-memory between stages in a pipeline.
>
> Here is more information about running Spark with Alluxio:
> http://www.alluxio.org/documentation/v1.1.0/en/Running-Spark-on-Alluxio.html
>
> Hope that helps,
> Gene
>
> On Mon, Jun 27, 2016 at 10:38 AM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Alluxio off heap memory would help to share cached objects
>>
>> On Mon, Jun 27, 2016 at 11:14 AM Everett Anderson
>>  wrote:
>>
>>> Hi,
>>>
>>> We have a pipeline of components strung together via Airflow running on
>>> AWS. Some of them are implemented in Spark, but some aren't. Generally they
>>> can all talk to a JDBC/ODBC end point or read/write files from S3.
>>>
>>> Ideally, we wouldn't suffer the I/O cost of writing all the data to HDFS
>>> or S3 and reading it back in, again, in every component, if it could stay
>>> cached in memory in a Spark cluster.
>>>
>>> Our current investigation seems to lead us towards exploring if the
>>> following things are possible:
>>>
>>>- Using a Hive metastore with S3 as its backing data store to try to
>>>keep a mapping from table name to files on S3 (not sure if one can cache 
>>> a
>>>Hive table in Spark across contexts, though)
>>>- Using something like the spark-jobserver to keep a
>>>Spark SQLContext open across Spark components so they could avoid file 
>>> I/O
>>>for cached tables
>>>
>>> What's the best practice for handing tables between Spark programs? What
>>> about between Spark and non-Spark programs?
>>>
>>> Thanks!
>>>
>>> - Everett
>>>
>>>
>


Re: Best way to tranform string label into long label for classification problem

2016-06-28 Thread Jaonary Rabarisoa
Thank you Xinh. That's what I need.

Le mar. 28 juin 2016 à 17:43, Xinh Huynh  a écrit :

> Hi Jao,
>
> Here's one option:
> http://spark.apache.org/docs/latest/ml-features.html#stringindexer
> "StringIndexer encodes a string column of labels to a column of label
> indices. The indices are in [0, numLabels), ordered by label frequencies."
>
> Xinh
>
> On Tue, Jun 28, 2016 at 12:29 AM, Jaonary Rabarisoa 
> wrote:
>
>> Dear all,
>>
>> I'm trying to find a way to transform a DataFrame into data that is more
>> suitable for a third-party classification algorithm. The DataFrame has two
>> columns: "feature", represented by a vector, and "label", represented by a
>> string. I want the "label" to be a number in [0, number of classes - 1].
>> Do you have any ideas to do it efficiently ?
>>
>>  Cheers,
>>
>> Jao
>>
>
>


Need help with spark GraphiteSink

2016-06-28 Thread Vijay Vangapandu
Hi,

I need help resolving an issue with the Spark GraphiteSink.

I am trying to use the Graphite sink, but I have had no luck.

Here are the details.

The Spark version is 1.4, and I am passing the 2 arguments below to the
spark-submit job in yarn cluster mode:

--files=/data/svc/metrics/metrics.properties --conf 
spark.metrics.conf=metrics.properties

where /data/svc/metrics is a local folder on the entry node from which I am
submitting the Spark job.

My metrics.properties file has the entries below:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=carbon-dn1.
*.sink.graphite.port=2003
*.sink.graphite.prefix=HadoopMetrics
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
*.sink.graphite.period=10
*.sink.graphite.protocol=tcp

Please let me know if I am missing anything.

Thanks for your help.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Modify the functioning of zipWithIndex function for RDDs

2016-06-28 Thread Punit Naik
Hi Ted

So would the tuple look like: (x._1, split.startIndex + x._2 + x._1.length)
?

On Tue, Jun 28, 2016 at 11:09 PM, Ted Yu  wrote:

> Please take a look at:
> core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
>
> In compute() method:
> val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
> firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
>   (x._1, split.startIndex + x._2)
>
> You can modify the second component of the tuple to take data.length into
> account.
>
> On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik 
> wrote:
>
>> Hi
>>
>> I wanted to change the functioning of the "zipWithIndex" function for
>> spark RDDs in which the output of the function is, just for an example,
>>  "(data, prev_index+data.length)" instead of "(data,prev_index+1)".
>>
>> How can I do this?
>>
>> --
>> Thank You
>>
>> Regards
>>
>> Punit Naik
>>
>
>


-- 
Thank You

Regards

Punit Naik


Re: Modify the functioning of zipWithIndex function for RDDs

2016-06-28 Thread Ted Yu
Please take a look at:
core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala

In compute() method:
val split = splitIn.asInstanceOf[ZippedWithIndexRDDPartition]
firstParent[T].iterator(split.prev, context).zipWithIndex.map { x =>
  (x._1, split.startIndex + x._2)

You can modify the second component of the tuple to take data.length into
account.

On Tue, Jun 28, 2016 at 10:31 AM, Punit Naik  wrote:

> Hi
>
> I wanted to change the functioning of the "zipWithIndex" function for
> spark RDDs in which the output of the function is, just for an example,
>  "(data, prev_index+data.length)" instead of "(data,prev_index+1)".
>
> How can I do this?
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Modify the functioning of zipWithIndex function for RDDs

2016-06-28 Thread Punit Naik
Hi

I wanted to change the functioning of the "zipWithIndex" function for spark
RDDs in which the output of the function is, just for an example,  "(data,
prev_index+data.length)" instead of "(data,prev_index+1)".

How can I do this?

-- 
Thank You

Regards

Punit Naik


Re: Set the node the spark driver will be started

2016-06-28 Thread Mich Talebzadeh
Hi Felix,

In yarn-cluster mode the YARN resource manager is expected to take care of
that.

Are you seeing a skewed distribution of the drivers created through
spark-submit across the different nodes?

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 28 June 2016 at 16:06, Felix Massem  wrote:

> Hey Mich,
>
> thx for the fast reply.
>
> We are using it in cluster mode and spark version 1.5.2
>
> Greets Felix
>
>
> Felix Massem | IT-Consultant | Karlsruhe
> mobil: +49 (0) 172.2919848
>
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Registered office: Düsseldorf | HRB 63043 | Düsseldorf Local Court
> Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Supervisory board: Patric Fedlmeier (Chairman) . Klaus Jäger . Jürgen Schütz
>
> This e-mail, including any attached files, contains confidential and/or
> legally protected information. If you are not the intended recipient or have
> received this e-mail in error, please inform the sender immediately and
> delete this e-mail and any attached files. Unauthorized copying, use or
> opening of any attached files, as well as unauthorized forwarding of this
> e-mail, is not permitted.
>
> On 28.06.2016 at 17:04, Mich Talebzadeh wrote:
>
> Hi Felix,
>
> what version of Spark?
>
> Are you using yarn client mode or cluster mode?
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
> http://talebzadehmich.wordpress.com
>
>
>
>
> On 28 June 2016 at 15:27, adaman79  wrote:
>
>> Hey guys,
>>
>> I have a problem with memory because over 90% of my spark driver will be
>> started on one of my nine spark nodes.
>> So now I am looking for the possibility to define the node the spark
>> driver
>> will be started when using spark-submit or setting it somewhere in the
>> code.
>>
>> Is this possible? Does anyone else have this kind of problem?
>>
>> thx and best regards
>> Felix
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Set-the-node-the-spark-driver-will-be-started-tp27244.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> .
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: Best way to transform string label into long label for classification problem

2016-06-28 Thread Xinh Huynh
Hi Jao,

Here's one option:
http://spark.apache.org/docs/latest/ml-features.html#stringindexer
"StringIndexer encodes a string column of labels to a column of label
indices. The indices are in [0, numLabels), ordered by label frequencies."
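
A minimal sketch of how that looks, assuming a DataFrame df with the "feature"
and "label" columns you describe (the output column name is illustrative):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("label")        // the string label column
  .setOutputCol("labelIndex")  // numeric index in [0, numLabels)

val indexed = indexer.fit(df).transform(df)
indexed.select("feature", "labelIndex").show(5)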

Xinh

On Tue, Jun 28, 2016 at 12:29 AM, Jaonary Rabarisoa 
wrote:

> Dear all,
>
> I'm trying to a find a way to transform a DataFrame into a data that is
> more suitable for third party classification algorithm. The DataFrame have
> two columns : "feature" represented by a vector and "label" represented by
> a string. I want the "label" to be a number between [0, number of classes -
> 1].
> Do you have any ideas to do it efficiently ?
>
>  Cheers,
>
> Jao
>


Re: Set the node the spark driver will be started

2016-06-28 Thread Felix Massem
Hey Mich,

thx for the fast reply.

We are using it in cluster mode and spark version 1.5.2

Greets Felix


Felix Massem | IT-Consultant | Karlsruhe
mobil: +49 (0) 172.2919848

www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de


> On 28.06.2016 at 17:04, Mich Talebzadeh wrote:
> 
> Hi Felix,
> 
> what version of Spark?
> 
> Are you using yarn client mode or cluster mode?
> 
> HTH
> 
> 
> Dr Mich Talebzadeh
> 
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
> 
> http://talebzadehmich.wordpress.com 
> 
> 
> 
> On 28 June 2016 at 15:27, adaman79  > wrote:
> Hey guys,
> 
> I have a problem with memory because over 90% of my spark driver will be
> started on one of my nine spark nodes.
> So now I am looking for the possibility to define the node the spark driver
> will be started when using spark-submit or setting it somewhere in the code.
> 
> Is this possible? Does anyone else have this kind of problem?
> 
> thx and best regards
> Felix
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Set-the-node-the-spark-driver-will-be-started-tp27244.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 





Re: Set the node the spark driver will be started

2016-06-28 Thread Mich Talebzadeh
Hi Felix,

what version of Spark?

Are you using yarn client mode or cluster mode?

HTH


Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com





On 28 June 2016 at 15:27, adaman79  wrote:

> Hey guys,
>
> I have a problem with memory because over 90% of my spark driver will be
> started on one of my nine spark nodes.
> So now I am looking for the possibility to define the node the spark driver
> will be started when using spark-submit or setting it somewhere in the
> code.
>
> Is this possible? Does anyone else have this kind of problem?
>
> thx and best regards
> Felix
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Set-the-node-the-spark-driver-will-be-started-tp27244.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Set the node the spark driver will be started

2016-06-28 Thread adaman79
Hey guys,

I have a memory problem because over 90% of my Spark drivers end up being
started on the same one of my nine Spark nodes.
So now I am looking for a way to define the node on which the Spark driver
will be started, either when using spark-submit or by setting it somewhere in the code.

Is this possible? Does anyone else have this kind of problem?

thx and best regards
Felix



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Set-the-node-the-spark-driver-will-be-started-tp27244.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark master shuts down when one of zookeeper dies

2016-06-28 Thread Ted Yu
Please see these posts on the number of nodes required for a quorum:

http://stackoverflow.com/questions/13022244/zookeeper-reliability-three-versus-five-nodes

http://www.ibm.com/developerworks/library/bd-zookeeper/
  the paragraph starting with 'A quorum is represented by a strict majority
of nodes'
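
In other words, with only two ZooKeeper nodes the ensemble cannot survive the
loss of either one (a strict majority of 2 is 2), so the master giving up
leadership is the expected behaviour. A sketch of the corresponding setting
for a three-node ensemble (hostnames zk1/zk2/zk3 are illustrative):

// spark-env.sh -- three ZooKeeper servers, so the ensemble tolerates one failure
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181"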

FYI

On Tue, Jun 28, 2016 at 5:52 AM, vimal dinakaran 
wrote:

> I am using zookeeper for providing HA for spark cluster.  We have two
> nodes zookeeper cluster.
>
> When one of the zookeeper dies then the entire spark cluster goes down .
>
> Is this expected behaviour ?
> Am I missing something in config ?
>
> Spark version - 1.6.1.
> Zookeeper version - 3.4.6
> // spark-env.sh
> SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
> -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
>
> Below is the log from spark master:
> ZooKeeperLeaderElectionAgent: We have lost leadership
> 16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
> shutting down.
>
> Thanks
> Vimal
>
>
>
>


Re: How to write the DataFrame results back to HDFS with other then \n as record separator

2016-06-28 Thread Dhaval Patel
Did you try implementing MultipleTextOutputFormat and using saveAsHadoopFile
with explicit keyClass, valueClass and OutputFormat arguments instead of the default parameters?

You need to implement toString for your key class and value class in order to
get a field separator other than the default.
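
A rough, untested sketch of that direction (shown in Scala for brevity; the
same classes are usable from Java). The class name, the NullWritable keys and
the hard-coded separator bytes are assumptions to adapt:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, RecordWriter, Reporter, TextOutputFormat}
import org.apache.hadoop.util.Progressable

// OutputFormat whose writer ends every record with RS (\u001e) instead of \n.
class RsTextOutputFormat extends TextOutputFormat[NullWritable, Text] {
  override def getRecordWriter(ignored: FileSystem, job: JobConf, name: String,
                               progress: Progressable): RecordWriter[NullWritable, Text] = {
    val file = FileOutputFormat.getTaskOutputPath(job, name)
    val out = file.getFileSystem(job).create(file, progress)
    new RecordWriter[NullWritable, Text] {
      def write(key: NullWritable, value: Text): Unit = {
        out.write(value.getBytes, 0, value.getLength)
        out.write("\u001e".getBytes("UTF-8"))  // RS as record separator
      }
      def close(reporter: Reporter): Unit = out.close()
    }
  }
}

// Usage sketch: join the fields of each Row with your FS byte, then save.
// df.rdd
//   .map(row => (NullWritable.get(), new Text(row.mkString("\u001c"))))  // adjust the FS byte
//   .saveAsHadoopFile("/output/path", classOf[NullWritable], classOf[Text], classOf[RsTextOutputFormat])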

Regards
Dhaval


On Tue, Jun 28, 2016 at 4:44 AM, Radha krishna  wrote:

> Hi,
> i have some files in the hdfs with FS as field separator and RS as record
> separator, i am able to read the files and able to process successfully.
> how can i write the spark DataFrame result into the HDFS file with same
> delimeters (FS as field separator and RS as record separator instead of \n)
> using java
> Can any one suggest..
>
>
> with the below lines i am able to read the content as line separated by RS
> instead of \n
> hadoopConf = new Configuration(jsc.hadoopConfiguration());
> hadoopConf.set("textinputformat.record.delimiter", "\u001e");
>
> i want to write the data back to hdfs with the same line separator
> (RS[\u001e])
>
>
> Thanks & Regards
>Radha krishna
>
>
>


Issue with Spark on 25 nodes cluster

2016-06-28 Thread ANDREA SPINA
Hello everyone,

I am running some experiments with Spark 1.4.0 on an ~80 GiB dataset located
on HDFS 2.7.1. The environment is a 25-node cluster with 16 cores per node. I
set the following params:

spark.master = "spark://"${runtime.hostname}":7077"

# 28 GiB of memory
spark.executor.memory = "28672m"
spark.worker.memory = "28672m"
spark.driver.memory = "2048m"

spark.driver.maxResultSize = "0"

I run some scaling experiments, varying the number of machines.
Runs with the full set of 25 nodes, and also with 20 nodes, complete
successfully. Runs in 5-node and 10-node environments fail consistently:
during the run the Spark executors begin to collect failed jobs from
different stages and end with the following trace:

16/06/28 03:11:09 INFO DAGScheduler: Job 14 failed: reduce at
sGradientDescent.scala:229, took 1778.508309 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 212 in stage 14.0 failed 4 times, most recent
failure: Lost task 212.3 in stage 14.0 (TID 12278, 130.149.21.19):
java.io.IOException: Connection from /130.149.21.16:35997 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:91)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:183)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:169)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:738)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:606)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:380)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The full Master log is linked here.
In addition, each Worker receives signal SIGTERM: 15.

I can't figure out a solution either.
Thank you, Regards,

Andrea


-- 
*Andrea Spina*
N.Tessera: *74598*
MAT: *89369*
*Ingegneria Informatica* *[LM] *(D.M. 270)


Re: Restart App and consume from checkpoint using direct kafka API

2016-06-28 Thread vimal dinakaran
I have implemented the above approach with a Cassandra DB.

Thank you all.
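
For anyone finding this thread later, a minimal sketch of that pattern
(Spark 1.6 / Kafka 0.8 direct API; loadOffsetsFromDb and saveOffsetsToDb are
hypothetical helpers you would implement against Cassandra or any other store):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val conf = new SparkConf().setAppName("direct-kafka-db-offsets")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

// Read the last committed offsets from your own store instead of a checkpoint.
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsetsFromDb()  // hypothetical helper

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,
  (String, String)](ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

stream.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process rdd, ideally in the same transaction as the offset update ...
  saveOffsetsToDb(offsetRanges)  // hypothetical helper
}

ssc.start()
ssc.awaitTermination()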

On Thu, Mar 31, 2016 at 8:26 PM, Cody Koeninger  wrote:

> Long story short, no.  Don't rely on checkpoints if you cant handle
> reprocessing some of your data.
>
> On Thu, Mar 31, 2016 at 3:02 AM, Imre Nagi 
> wrote:
> > I'm dont know how to read the data from the checkpoint. But AFAIK and
> based
> > on my experience, I think the best thing that you can do is storing the
> > offset to a particular storage such as database everytime you consume the
> > message. Then read the offset from the database everytime you want to
> start
> > reading the message.
> >
> > nb: This approach is also explained by Cody in his blog post.
> >
> > Thanks
> >
> > On Thu, Mar 31, 2016 at 2:14 PM, vimal dinakaran 
> > wrote:
> >>
> >> Hi,
> >>  In the blog
> >> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
> >>
> >> It is mentioned that enabling checkpoint works as long as the app jar is
> >> unchanged.
> >>
> >> If I want to upgrade the jar with the latest code and consume from kafka
> >> where it was stopped , how to do that ?
> >> Is there a way to read the binary object of the checkpoint during init
> and
> >> use that to start from offset ?
> >>
> >> Thanks
> >> Vimal
> >
> >
>


Spark master shuts down when one of zookeeper dies

2016-06-28 Thread vimal dinakaran
I am using ZooKeeper to provide HA for the Spark cluster. We have a two-node
ZooKeeper cluster.

When one of the ZooKeeper nodes dies, the entire Spark cluster goes down.

Is this expected behaviour ?
Am I missing something in config ?

Spark version - 1.6.1.
Zookeeper version - 3.4.6
// spark-env.sh
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"

Below is the log from spark master:
ZooKeeperLeaderElectionAgent: We have lost leadership
16/06/27 09:39:30 ERROR Master: Leadership has been revoked -- master
shutting down.

Thanks
Vimal


Re: DataFrame versus Dataset creation and usage

2016-06-28 Thread Martin Serrano
Xinh,

Thanks for the clarification.  I'm new to Spark and trying to navigate the 
different APIs.  I was just following some examples and retrofitting them, but 
I see now I should stick with plain RDDs until my schema is known (at the end 
of the data pipeline).

Thanks again!

On 06/24/2016 04:57 PM, Xinh Huynh wrote:
Hi Martin,

Since your schema is dynamic, how would you use Datasets? Would you know ahead 
of time the row type T in a Dataset[T]?

One option is to start with DataFrames in the beginning of your data pipeline, 
figure out the field types, and then switch completely over to RDDs or Dataset 
in the next stage of the pipeline.

Also, I'm not sure what the custom Java mappers are doing - could you use them 
as UDFs within a DataFrame?
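
For example, a minimal sketch of the UDF route in Spark 1.6 (the function body
and the column name "text" are placeholders for whatever the Java mapper does):

import org.apache.spark.sql.functions.udf

// Wrap the mapping logic as a UDF and apply it column-wise, staying in DataFrame land.
val normalize = udf { (s: String) => if (s == null) null else s.trim.toLowerCase }

val transformed = df.withColumn("text_normalized", normalize(df("text")))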

Xinh

On Fri, Jun 24, 2016 at 11:42 AM, Martin Serrano 
> wrote:
Indeed.  But I'm dealing with 1.6 for now unfortunately.


On 06/24/2016 02:30 PM, Ted Yu wrote:
In Spark 2.0, Dataset and DataFrame are unified.

Would this simplify your use case ?

On Fri, Jun 24, 2016 at 7:27 AM, Martin Serrano 
> wrote:
Hi,

I'm exposing a custom source to the Spark environment.  I have a question about 
the best way to approach this problem.

I created a custom relation for my source and it creates a DataFrame.  My 
custom source knows the data types which are dynamic so this seemed to be the 
appropriate return type.  This works fine.

The next step I want to take is to expose some custom mapping functions 
(written in Java).  But when I look at the APIs, the map method for DataFrame 
returns an RDD (not a DataFrame).  (Should I use SqlContext.createDataFrame on 
the result? -- does this result in additional processing overhead?)  The 
Dataset type seems to be more of what I'd be looking for, it's map method 
returns the Dataset type.  So chaining them together is a natural exercise.

But to create the Dataset from a DataFrame, it appears that I have to provide 
the types of each field in the Row in the DataFrame.as[...] method.  I would 
think that the DataFrame would be able to do this automatically since it has 
all the types already.

This leads me to wonder how I should be approaching this effort.  As all the 
fields and types are dynamic, I cannot use beans as my type when passing data 
around.  Any advice would be appreciated.

Thanks,
Martin









How to write the DataFrame results back to HDFS with other then \n as record separator

2016-06-28 Thread Radha krishna
Hi,
I have some files in HDFS with FS as the field separator and RS as the record
separator. I am able to read the files and process them successfully.
How can I write the Spark DataFrame result back to HDFS with the same
delimiters (FS as field separator and RS as record separator instead of \n)
using Java?
Can anyone suggest an approach?


With the lines below I am able to read the content as records separated by RS
instead of \n:
hadoopConf = new Configuration(jsc.hadoopConfiguration());
hadoopConf.set("textinputformat.record.delimiter", "\u001e");

I want to write the data back to HDFS with the same record separator
(RS, \u001e).


Thanks & Regards
   Radha krishna


Create JavaRDD from list in Spark 2.0

2016-06-28 Thread Rafael Caballero
Hi,

The usual way of creating a JavaRDD from a list is to use
JavaSparkContext.parallelize(List)

However, in Spark 2.0 SparkSession is used as the entry point and I don't know
how to create a JavaRDD from a List. Is this possible?
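
(For what it's worth, a minimal sketch of one way that appears to work:
SparkSession still exposes the underlying SparkContext, which can be wrapped
in a JavaSparkContext. Shown in Scala here; the equivalent calls exist from
the Java API.)

import java.util.Arrays

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("java-rdd-from-list").master("local[*]").getOrCreate()

// Wrap the session's SparkContext and parallelize the list as before.
val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
val javaRdd = jsc.parallelize(Arrays.asList(1, 2, 3))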

Thanks and best regards,

Rafael Caballero




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-JavaRDD-from-list-in-Spark-2-0-tp27241.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Substract two DStreams

2016-06-28 Thread Marius Soutier
Sure, no problem.
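
(For reference, a minimal sketch of the transformWith idea discussed below,
assuming two DStreams derived from the same input and batch interval; the
names and the pair type are illustrative.)

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Elements present in the original stream but dropped by the inner join.
def removedByJoin(original: DStream[(String, String)],
                  joined: DStream[(String, String)]): DStream[(String, String)] =
  original.transformWith(joined,
    (origRdd: RDD[(String, String)], joinedRdd: RDD[(String, String)]) => origRdd.subtract(joinedRdd))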

> On 28.06.2016, at 08:57, Matthias Niehoff  
> wrote:
> 
> ah, didn't know about this. That might actually work. I solved it by 
> implementing the leftJoinWithCassandraTable by myself which is nearly as fast 
> as the normal join. This should be faster than joining and subtracting then. 
> Anyway, thanks for the hint of the transformWith method!
> 
> Am 27. Juni 2016 um 14:32 schrieb Marius Soutier  >:
> `transformWith` accepts another stream, wouldn't that work?
> 
>> On 27.06.2016, at 14:04, Matthias Niehoff > > wrote:
>> 
>> in transform I have only access to one stream and not do both the original 
>> and the change stream. in foreachRDD i can change the stream and have both 
>> the original RDD and the changed RDD to do a substract.
>> 
>> 2016-06-27 13:13 GMT+02:00 Marius Soutier > >:
>> Can't you use `transform` instead of `foreachRDD`?
>> 
>> 
>> 
>>> On 15.06.2016, at 15:18, Matthias Niehoff >> > wrote:
>>> 
>>> Hi,
>>> 
>>> i want to subtract 2 DStreams (based on the same Input Stream) to get all 
>>> elements that exist in the original stream, but not in the modified stream 
>>> (the modified Stream is changed using joinWithCassandraTable which does an 
>>> inner join and because of this might remove entries).
>>> 
>>> Subtract is only possible on RDDs. So I could use a foreachRDD right in the 
>>> beginning of the Stream processing and work on rdds. I think its quite ugly 
>>> to use the output op at the beginning and then implement a lot of 
>>> transformations in the foreachRDD. So could you think of different ways to 
>>> do an efficient diff between to DStreams?
>>> 
>>> Thank you
>>> 
>>> -- 
>>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>>> 
>> 
>> 
>> 
>> 
>> -- 
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>> 
> 
> 
> 
> 
> -- 
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de

Best way to transform string label into long label for classification problem

2016-06-28 Thread Jaonary Rabarisoa
Dear all,

I'm trying to find a way to transform a DataFrame into data that is
more suitable for a third-party classification algorithm. The DataFrame has
two columns: "feature", represented by a vector, and "label", represented by
a string. I want the "label" to be a number in [0, number of classes - 1].
Do you have any ideas on how to do this efficiently?

 Cheers,

Jao