Need help :- org.apache.spark.SparkException :- No such file or directory

2016-09-28 Thread Madabhattula Rajesh Kumar
Hi Team,

I am getting the below exception in my Spark jobs. Please let me know how to fix this
issue.

Below is my cluster configuration:

I am using SparkJobServer to trigger the jobs. Below is my configuration in
SparkJobServer.

   - num-cpu-cores = 4
   - memory-per-node = 4G

I have 4 workers in my cluster.


"result": {
"errorClass": "org.apache.spark.SparkException",
"cause":
"/tmp/spark-31a538f3-9451-4a2d-9123-00feb56c9e91/executor-73be6ffd-cd03-452a-bc99-a44290953d4f/spark-d0630f1f-e3df-4714-af30-4c839f6e3e8a/9400069401471754061530_lock
(No such file or directory)",
"stack": ["java.io.RandomAccessFile.open0(Native Method)",
"java.io.RandomAccessFile.open(RandomAccessFile.java:316)",
"java.io.RandomAccessFile.(RandomAccessFile.java:243)",
"org.apache.spark.util.Utils$.fetchFile(Utils.scala:373)",
"org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:405)",
"org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:397)",
"scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)",
"scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)",
"scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)",
"scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)",
"scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)",
"scala.collection.mutable.HashMap.foreach(HashMap.scala:98)",
"scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)",
"org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:397)",
"org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)",
"java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)",
"java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)",
"java.lang.Thread.run(Thread.java:745)"],
"causingClass": "java.io.FileNotFoundException",
"message": "Job aborted due to stage failure: Task 0 in stage 1286.0
failed 4 times, most recent failure: Lost task 0.3 in stage 1286.0 (TID
39149, svcjo-prd911.cisco.com): java.io.FileNotFoundException:
/tmp/spark-31a538f3-9451-4a2d-9123-00feb56c9e91/executor-73be6ffd-cd03-452a-bc99-a44290953d4f/spark-d0630f1f-e3df-4714-af30-4c839f6e3e8a/9400069401471754061530_lock
(No such file or directory)\n\tat java.io.RandomAccessFile.open0(Native
Method)\n\tat
java.io.RandomAccessFile.open(RandomAccessFile.java:316)\n\tat
java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)\n\tat
org.apache.spark.util.Utils$.fetchFile(Utils.scala:373)\n\tat
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:405)\n\tat
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:397)\n\tat
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)\n\tat
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)\n\tat
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)\n\tat
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)\n\tat
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)\n\tat
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)\n\tat
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)\n\tat
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:397)\n\tat
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat
java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:"
  },

Regards,
Rajesh


Spark Hive Rejection

2016-09-28 Thread Mostafa Alaa Mohamed
Dears,
I want to ask:

* What will happen if there are rejected rows when inserting a
dataframe into Hive?

o   For example, a rejection because the table requires an integer in a column
but the dataframe contains a string.

o   Or a duplicate-row rejection coming from a constraint on the table itself?

* How can we specify the rejection directory?
If this is not available, do you recommend opening a JIRA issue?
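
For what it's worth, a rough sketch of the kind of workaround I have in mind
(to my knowledge there is no built-in rejection directory; the table name,
column names and paths below are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

val df = spark.read.option("header", "true").csv("/data/input.csv")

// The Hive table expects an integer in "amount"; values that do not cast become null.
val casted = df.withColumn("amount", $"amount".cast("int"))
val good = casted.filter($"amount".isNotNull)

// Keep the original (uncast) values for rejected rows; rows that were already
// null in "amount" also land here, which is acceptable for this sketch.
val rejected = df.filter($"amount".cast("int").isNull)

good.write.mode("append").insertInto("target_table")   // clean rows into Hive
rejected.write.mode("overwrite").csv("/data/rejected") // our own "rejection directory"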

Best Regards,
Mostafa Alaa Mohamed,
Technical Expert Big Data,
M: +971506450787
Email: mohamedamost...@etisalat.ae


The content of this email together with any attachments, statements and 
opinions expressed herein contains information that is private and confidential 
are intended for the named addressee(s) only. If you are not the addressee of 
this email you may not copy, forward, disclose or otherwise use it or any part 
of it in any form whatsoever. If you have received this message in error please 
notify postmas...@etisalat.ae by email immediately and delete the message 
without making any copies.


spark persistence doubt

2016-09-28 Thread Shushant Arora
Hi

I have a flow like below

1. rdd1 = someSource.transform()
2. transformedRdd1 = rdd1.transform(..)
3. transformedRdd2 = rdd1.transform(..)

4. transformedRdd1.action()

Do I need to persist rdd1 to optimise steps 2 and 3? Or, since there is no
lineage breakage, will it work without persist?
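
For reference, this is roughly what I mean with an explicit persist added
(a sketch; the source path and the transformations are made up, and sc is an
existing SparkContext):

import org.apache.spark.storage.StorageLevel

val rdd1 = sc.textFile("/data/input").map(_.toUpperCase)   // step 1
rdd1.persist(StorageLevel.MEMORY_AND_DISK)                 // cache before branching

val transformedRdd1 = rdd1.filter(_.nonEmpty)              // step 2
val transformedRdd2 = rdd1.map(_.length)                   // step 3

transformedRdd1.count()   // step 4: the first action computes rdd1 and caches it
transformedRdd2.count()   // a later action reuses the cached rdd1 instead of re-reading the source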

Thanks


Re: MLlib Documentation Update Needed

2016-09-28 Thread Tobi Bosede
OK, I've opened a jira. https://issues.apache.org/jira/browse/SPARK-17718

And ok, I forgot the loss is summed in the objective function provided. My
mistake.

On a tangentially related topic, why is there a half in front of the
squared loss? Similarly, the L2 regularizer has a half. It's just a
constant, so the objective's minimum is not affected, but I'm still curious
to know why the half wasn't left out.
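
To make the question concrete, these are the conventional forms I have in mind
(written in LaTeX; this is the textbook convention, not quoted from the Spark docs):

  L(w; x, y) = \tfrac{1}{2} (w^{T} x - y)^{2}, \qquad R(w) = \tfrac{\lambda}{2} \lVert w \rVert_2^{2}

  \nabla_w L = (w^{T} x - y)\, x, \qquad \nabla_w R = \lambda w

so the 1/2 only cancels the factor of 2 produced by differentiation and leaves
the gradients (and the minimizer) unchanged.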

On Mon, Sep 26, 2016 at 4:40 AM, Sean Owen  wrote:

> Yes I think that footnote could be a lot more prominent, or pulled up
> right under the table.
>
> I also think it would be fine to present the {0,1} formulation. It's
> actually more recognizable, I think, for log-loss in that form. It's
> probably less recognizable for hinge loss, but, consistency is more
> important. There's just an extra (2y-1) term, at worst.
>
> The loss here is per instance, and implicitly summed over all
> instances. I think that is probably not confusing for the reader; if
> they're reading this at all to double-check just what formulation is
> being used, I think they'd know that. But, it's worth a note.
>
> The loss is summed in the case of log-loss, not multiplied (if that's
> what you're saying).
>
> Those are decent improvements, feel free to open a pull request / JIRA.
>
>
> On Mon, Sep 26, 2016 at 6:22 AM, Tobi Bosede  wrote:
> > The loss function here for logistic regression is confusing. It seems to
> > imply that spark uses only -1 and 1 class labels. However it uses 0,1 as
> the
> > very inconspicuous note quoted below (under Classification) says. We
> need to
> > make this point more visible to avoid confusion.
> >
> > Better yet, we should replace the loss function listed with that for 0,
> 1 no
> > matter how mathematically inconvenient, since that is what is actually
> > implemented in Spark.
> >
> > More problematic, the loss function (even in this "convenient" form) is
> > actually incorrect. This is because it is missing either a summation
> (sigma)
> > in the log or product (pi) outside the log, as the loss for logistic is
> the
> > log likelihood. So there are multiple problems with the documentation.
> > Please advise on steps to fix for all version documentation or if there
> are
> > already some in place.
> >
> > "Note that, in the mathematical formulation in this guide, a binary label
> > y is denoted as either +1 (positive) or −1 (negative), which is
> convenient
> > for the formulation. However, the negative label is represented by 0 in
> > spark.mllib instead of −1, to be consistent with multiclass labeling."
>


Re: Problems with new experimental Kafka Consumer for 0.10

2016-09-28 Thread Cody Koeninger
Well, I'd start at the first thing suggested by the error, namely that
the group has rebalanced.

Is that stream using a unique group id?
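
For comparison, this is roughly what a per-stream, unique group id looks like
with the 0.10 consumer (a sketch; broker, topic and group id are placeholders,
and ssc is an existing StreamingContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-app-stream-1",   // unique per stream; sharing one group id across streams causes rebalances
  "auto.offset.reset" -> "latest",
  "session.timeout.ms" -> "30000",   // or raise this ...
  "max.poll.records" -> "500"        // ... and/or shrink the batch returned by poll()
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))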

On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
 wrote:
> Hi,
>
> the stacktrace:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the partitions
> to another member. This means that the time between subsequent calls to
> poll() was longer than the configured session.timeout.ms, which typically
> implies that the poll loop is spending too much time message processing. You
> can address this either by increasing the session timeout or by reducing the
> maximum size of batches returned in poll() with max.poll.records.
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> at scala.Option.orElse(Option.scala:289)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at
> scala.collection.TraversableLike$$anonfun$

Re: spark / mesos and GPU resources

2016-09-28 Thread Timothy Chen
Hi Jackie,

That doesn't work because GPU is a first-class resource in Mesos
starting with 1.0, and my patch to enable it in Spark is still in a PR.
I gave a demo on Spark/Mesos/GPU at the last Spark Summit SF, and you
can look at the video to see how it works.

Feel free to try out the PR
(https://github.com/apache/spark/pull/14644) and let me know if you
have any problems or questions.

Tim


On Wed, Sep 28, 2016 at 4:48 PM, Jackie Tung  wrote:
> Hi,
>
> Does Spark support GPU resources reservation (much like it does for CPU and 
> memory) on a Mesos cluster manager?
>
> Mesos added GPU recently as a first class resource type.  I tried out the 
> spark.mesos.constraints variable optimistically “gpu:1” which did not work 
> for me.
>
> This is really one of the final hurdles for us to adopt Spark for our 
> workloads.  Spark looks really great otherwise for us.
>
> Many Thanks,
> Jackie
> --
>
>
> The information in this email is confidential and may be legally
> privileged. It is intended solely for the addressee. Access to this email
> by anyone else is unauthorized. If you are not the intended recipient, any
> disclosure, copying, distribution or any action taken or omitted to be
> taken in reliance on it, is prohibited and may be unlawful.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark / mesos and GPU resources

2016-09-28 Thread Jackie Tung
Hi,

Does Spark support GPU resources reservation (much like it does for CPU and 
memory) on a Mesos cluster manager?

Mesos added GPU recently as a first class resource type.  I tried out the 
spark.mesos.constraints variable optimistically “gpu:1” which did not work for 
me.

This is really one of the final hurdles for us to adopt Spark for our 
workloads.  Spark looks really great otherwise for us.

Many Thanks,
Jackie
-- 
 

The information in this email is confidential and may be legally 
privileged. It is intended solely for the addressee. Access to this email 
by anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be 
taken in reliance on it, is prohibited and may be unlawful.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Submit and Monitor standalone cluster application

2016-09-28 Thread Mariano Semelman
Hello everybody,

I'm developing an application to submit batch and streaming apps in a
fault-tolerant fashion. For that I need a programmatic way to submit and
monitor my apps and relaunch them in case of failure. Right now I'm using
Spark standalone (1.6.x) and submitting in cluster mode. The options I have
explored so far are:

SparkLauncher.java [1]: It has two modes:
   - 1) launch(): Doesn't give me the application-id needed for monitoring
(with the Spark master REST API). I would have to infer it from the application
name and startTime in api/v1/applications using the Spark master API [9].
   - 2) startApplication(...): Only works if submitted locally or in client
mode (BTW, the fact that it only works in client or local mode is not
documented in the package summary page [1], which led me to many, many wasted
hours).

Spark-Jobserver [2]:
Doesn't support standalone cluster mode

Livy [3]:
Doesn't support standalone cluster mode

Spark Submission REST API [4,5,6]:
It seems the sensible way, but it is black magic for the user. It's not
documented and there's no official client. There's only one unofficial
client [7]. It also occurred to me to copy the RestSubmissionClient [8]
into my own project.


I'm torn between using launch() and inferring the appId, or using the Spark
Submission REST API, but neither seems a proper way to solve this. If someone
could give me advice on how to approach this, I would appreciate it.
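
For context, the launch() variant I am referring to looks roughly like this
(a sketch; Spark home, master URL, jar and main class are placeholders):

import org.apache.spark.launcher.SparkLauncher

val process = new SparkLauncher()
  .setSparkHome("/opt/spark")
  .setMaster("spark://master-host:7077")
  .setDeployMode("cluster")
  .setAppResource("/path/to/my-app.jar")
  .setMainClass("com.example.MyApp")
  .setAppName("my-app")        // the name I would later match against api/v1/applications
  .launch()                    // returns a plain java.lang.Process, hence no application id

val exitCode = process.waitFor()   // only tells me whether spark-submit itself succeeded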

Thanks in advance,

Mariano


[1]
https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/launcher/package-summary.html
[2] https://github.com/spark-jobserver/spark-jobserver
[3] http://livy.io/
[4]
http://stackoverflow.com/questions/28992802/triggering-spark-jobs-with-rest
(most voted answer)
[5] http://arturmkrtchyan.com/apache-spark-hidden-rest-api
[6] https://issues.apache.org/jira/browse/SPARK-5388
[7] https://github.com/ywilkof/spark-jobs-rest-client
[8]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
[9] http://spark.apache.org/docs/latest/monitoring.html


Re: Issue with rogue data in csv file used in Spark application

2016-09-28 Thread Mich Talebzadeh
Thanks guys.

This seemed to be working after declaring all columns as Strings to start
with and using the filters below to avoid rogue characters. The second filter
ensures that there were trade volumes on that date.

val rs = df2.filter($"Open" !== "-")
            .filter($"Volume".cast("Integer") > 0)
            .filter(changeToDate("TradeDate") >= monthsago)
            .select(changeToDate("TradeDate").as("TradeDate"),
                    round(($"Close".cast("Float") + $"Open".cast("Float")) / 2, 2).as("AverageDailyPrice"),
                    $"Low".cast("Float").as("Day's Low"),
                    $"High".cast("Float").as("Day's High"))
            .orderBy(changeToDate("TradeDate"))
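
A more generic variant of the same idea that I am also considering (a sketch;
the list of numeric columns is taken from the schema shown further down in
this thread): cast every numeric column and drop the rows where any cast comes
back null, so the filter no longer depends on knowing that the rogue marker is "-".

val numericCols = Seq("Open", "High", "Low", "Close", "Volume")

// Cast each column to Float; anything that is not a valid number becomes null.
val casted = numericCols.foldLeft(df2) { (df, c) => df.withColumn(c, df(c).cast("Float")) }

// Keep only rows where every numeric column survived the cast.
val clean = casted.na.drop(numericCols)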

Cheers


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 28 September 2016 at 14:45, Bedrytski Aliaksandr 
wrote:

> Hi Mich,
>
> if I understood you well, you may cast the value to float, it will yield
> null if the value is not a correct float:
>
> val df = Seq(("-", 5), ("1", 6), (",", 7), ("8.6", 7)).toDF("value",
> "id").createOrReplaceTempView("lines")
>
> spark.sql("SELECT cast(value as FLOAT) from lines").show()
>
> +-----+
> |value|
> +-----+
> | null|
> |  1.0|
> | null|
> |  8.6|
> +-----+
>
> After it you may filter the DataFrame for values containing null.
>
> Regards,
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Wed, Sep 28, 2016, at 10:11, Mich Talebzadeh wrote:
>
> Thanks all.
>
> This is the csv schema all columns mapped to String
>
> scala> df2.printSchema
> root
>  |-- Stock: string (nullable = true)
>  |-- Ticker: string (nullable = true)
>  |-- TradeDate: string (nullable = true)
>  |-- Open: string (nullable = true)
>  |-- High: string (nullable = true)
>  |-- Low: string (nullable = true)
>  |-- Close: string (nullable = true)
>  |-- Volume: string (nullable = true)
>
> The issue I have can be shown as below
>
> df2.filter( $"OPen" === 
> "-").select((changeToDate("TradeDate").as("TradeDate")),
> 'Open, 'High, 'Low, 'Close, 'Volume).show
>
> +--+++---+-+--+
> | TradeDate|Open|High|Low|Close|Volume|
> +--+++---+-+--+
> |2011-12-23|   -|   -|  -|40.56| 0|
> |2011-04-21|   -|   -|  -|45.85| 0|
> |2010-12-30|   -|   -|  -|38.10| 0|
> |2010-12-23|   -|   -|  -|38.36| 0|
> |2008-04-30|   -|   -|  -|32.39| 0|
> |2008-04-29|   -|   -|  -|33.05| 0|
> |2008-04-28|   -|   -|  -|32.60| 0|
> +--+++---+-+--+
> Now there are ways of dealing with this. However, the solution has to be
> generic! Checking for a column == "-" is not generic. How about if that
> column was "," etc.
>
> This is an issue in most databases. Specifically if a field is NaN.. --> (
> *NaN*, standing for not a number, is a numeric data type value
> representing an undefined or unrepresentable value, especially in
> floating-point calculations)
>
> Spark handles this. I am on Spark 2.0.1, in class DataFrameNaFunctions. The
> simplest one is to drop these rogue rows
>
> df2.filter( $"Open" === "-").drop()
>
> However, a better approach would be to use REPLACE method or testing any
> column for NaN
>
>
>
>
> There is a method called isnan(). However, it does not return correct
> values!
>
>  df2.filter(isnan($"Open")).show
> +-+--+-+++---+-+--+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-+--+-+++---+-+--+
> +-+--+-+++---+-+--+
>
>
> Any suggestions?
>
> Thanks
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On 28 September 2016 at 04:07, Mike Metzger 
> wrote:
>
> Hi Mich -
>
>Can you run a filter command on df1 prior to your map for any rows
> where p(3).toString != '-' then run your map command?
>
> Thanks
>
>
> Mike
>
>
> On Tue, Sep 27, 2016 at 5:06 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
> Thanks guys
>
> Actually these are the 7 

Re: Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-28 Thread Igor Berman
Michael, can you please explain why bucketBy is supported when using
saveAsTable() with parquet but not with parquet()?
Is this the only difference between the table API and the dataframe/dataset
API, or are there others?

org.apache.spark.sql.AnalysisException: 'save' does not support bucketing
right now;
at
org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:310)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:203)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:478)


thanks in advance


On 28 September 2016 at 21:26, Michael Armbrust 
wrote:

> Hi Darin,
>
> In SQL we have finer grained information about partitioning, so we don't
> use the RDD Partitioner.  Here's a notebook
> that
> walks through what we do expose and how it is used by the query planner.
>
> Michael
>
> On Tue, Sep 20, 2016 at 11:22 AM, McBeath, Darin W (ELS-STL) <
> d.mcbe...@elsevier.com> wrote:
>
>> I’m using Spark 2.0.
>>
>> I’ve created a dataset from a parquet file and repartition on one of the
>> columns (docId) and persist the repartitioned dataset.
>>
>> val om = ds.repartition($"docId").persist(StorageLevel.MEMORY_AND_DISK)
>>
>> When I try to confirm the partitioner, with
>>
>> om.rdd.partitioner
>>
>> I get
>>
>> Option[org.apache.spark.Partitioner] = None
>>
>> I would have thought it would be HashPartitioner.
>>
>> Does anyone know why this would be None and not HashPartitioner?
>>
>> Thanks.
>>
>> Darin.
>>
>>
>>
>


Re: Treating NaN fields in Spark

2016-09-28 Thread Marco Mistroni
Hi Dr Mich,
  how about reading all the csv columns as string and then applying a UDF,
sort of like this?

  import scala.util.control.Exception.allCatch

  def getDouble(doubleStr: String): Double =
    (allCatch opt doubleStr.toDouble) match {
      case Some(doubleNum) => doubleNum
      case _               => Double.NaN
    }
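
and then something like this to plug it into the dataframe (a sketch, assuming
the string column is called "Open" and spark.implicits._ is in scope):

  import org.apache.spark.sql.functions.{udf, isnan}

  val toDouble = udf(getDouble _)

  // "-" (or any other non-numeric string) becomes Double.NaN, which isnan() then catches.
  val df3 = df2.withColumn("Open", toDouble($"Open"))
  df3.filter(isnan($"Open")).show()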


Out of curiosity, are you reading data from Yahoo Finance? If so, are you
downloading a whole .csv file?
I'm doing a similar thing, but I'm using a library,
com.github.tototoshi.csv._, to read csv files as a list of strings, so I
have control over how to render each row. But presumably, if you have over
1k rows' worth of data, this solution will not help much.

hth
 marco




On Wed, Sep 28, 2016 at 3:44 PM, Peter Figliozzi 
wrote:

> In Scala, x.isNaN returns true for Double.NaN, but false for any
> character.  I guess the `isnan` function you are using works by ultimately
> looking at x.isNan.
>
> On Wed, Sep 28, 2016 at 5:56 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>
>> This is an issue in most databases. Specifically if a field is NaN.. --> (
>> *NaN*, standing for not a number, is a numeric data type value
>> representing an undefined or unrepresentable value, especially in
>> floating-point calculations)
>>
>> There is a method called isnan() in Spark that is supposed to handle this
>> scenario . However, it does not return correct values! For example I
>> defined column "Open" as String  (it should be Float) and it has the
>> following 7 rogue entries out of 1272 rows in a csv
>>
>> df2.filter( $"OPen" === 
>> "-").select((changeToDate("TradeDate").as("TradeDate")),
>> 'Open, 'High, 'Low, 'Close, 'Volume).show
>>
>> +--+++---+-+--+
>> | TradeDate|Open|High|Low|Close|Volume|
>> +--+++---+-+--+
>> |2011-12-23|   -|   -|  -|40.56| 0|
>> |2011-04-21|   -|   -|  -|45.85| 0|
>> |2010-12-30|   -|   -|  -|38.10| 0|
>> |2010-12-23|   -|   -|  -|38.36| 0|
>> |2008-04-30|   -|   -|  -|32.39| 0|
>> |2008-04-29|   -|   -|  -|33.05| 0|
>> |2008-04-28|   -|   -|  -|32.60| 0|
>> +--+++---+-+--+
>>
>> However, the following does not work!
>>
>>  df2.filter(isnan($"Open")).show
>> +-+--+-+++---+-+--+
>> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
>> +-+--+-+++---+-+--+
>> +-+--+-+++---+-+--+
>>
>> Any suggestions?
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>


Re: Spark ML Decision Trees Algorithm

2016-09-28 Thread janardhan shetty
Is there a reference to the research paper for the algorithm implemented in
Spark 2.0?

On Wed, Sep 28, 2016 at 9:52 AM, janardhan shetty 
wrote:

> Which algorithm is used under the covers while doing decision trees FOR
> SPARK ?
> for example: scikit-learn (python) uses an optimised version of the CART
> algorithm.
>


Re: Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-28 Thread Michael Armbrust
Hi Darin,

In SQL we have finer-grained information about partitioning, so we don't
use the RDD Partitioner. Here's a notebook that walks through what we do
expose and how it is used by the query planner.
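
One quick way to see that information from the Dataset side (a sketch reusing
the ds from Darin's example; these are developer-facing APIs and the exact
string printed may vary by version):

// queryExecution/executedPlan expose the planner's view; outputPartitioning is
// populated by repartition($"docId") even though om.rdd.partitioner is None.
val om = ds.repartition($"docId")
println(om.queryExecution.executedPlan.outputPartitioning)
// e.g. hashpartitioning(docId#12, 200)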

Michael

On Tue, Sep 20, 2016 at 11:22 AM, McBeath, Darin W (ELS-STL) <
d.mcbe...@elsevier.com> wrote:

> I’m using Spark 2.0.
>
> I’ve created a dataset from a parquet file and repartition on one of the
> columns (docId) and persist the repartitioned dataset.
>
> val om = ds.repartition($"docId").persist(StorageLevel.MEMORY_AND_DISK)
>
> When I try to confirm the partitioner, with
>
> om.rdd.partitioner
>
> I get
>
> Option[org.apache.spark.Partitioner] = None
>
> I would have thought it would be HashPartitioner.
>
> Does anyone know why this would be None and not HashPartitioner?
>
> Thanks.
>
> Darin.
>
>
>


Re: Spark Executor Lost issue

2016-09-28 Thread Sushrut Ikhar
Can you add more details, like: are you using RDDs/Datasets/SQL? Are you
doing group by / joins? Is your input splittable?
BTW, you can pass the config the same way you are passing memoryOverhead,
e.g. --conf spark.default.parallelism=1000, or set it through the SparkContext
in code.
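
A minimal sketch of the in-code variant (the values are just starting points to tune):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-job")
  .set("spark.default.parallelism", "1000")   // more, smaller tasks
  .set("spark.executor.cores", "2")           // fewer cores per executor
val sc = new SparkContext(conf)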

Regards,

Sushrut Ikhar
https://about.me/sushrutikhar



On Wed, Sep 28, 2016 at 7:30 PM, Aditya 
wrote:

> Hi All,
>
> Any updates on this?
>
> On Wednesday 28 September 2016 12:22 PM, Sushrut Ikhar wrote:
>
> Try with increasing the parallelism by repartitioning and also you may
> increase - spark.default.parallelism
> You can also try with decreasing num-executor cores.
> Basically, this happens when the executor is using quite large memory than
> it asked; and yarn kills the executor.
>
> Regards,
>
> Sushrut Ikhar
> [image: https://]about.me/sushrutikhar
> 
>
>
> On Wed, Sep 28, 2016 at 12:17 PM, Aditya  co.in> wrote:
>
>> I have a spark job which runs fine for small data. But when data
>> increases it gives executor lost error.My executor and driver memory are
>> set at its highest point. I have also tried increasing --conf
>> spark.yarn.executor.memoryOverhead=600 but still not able to fix the
>> problem. Is there any other solution to fix the problem?
>>
>>
>
>
>


Re: Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-28 Thread Sean Owen
I guess I'm claiming the artifacts wouldn't even be different in the first
place, because the Hadoop APIs that are used are all the same across these
versions. That would be the thing that makes you need multiple versions of
the artifact under multiple classifiers.

On Wed, Sep 28, 2016 at 1:16 PM, Olivier Girardot <
o.girar...@lateral-thoughts.com> wrote:

> ok, don't you think it could be published with just different classifiers
> hadoop-2.6
> hadoop-2.4
> hadoop-2.2 being the current default.
>
> So for now, I should just override spark 2.0.0's dependencies with the
> ones defined in the pom profile
>
>
>
> On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com wrote:
>
>> There can be just one published version of the Spark artifacts and they
>> have to depend on something, though in truth they'd be binary-compatible
>> with anything 2.2+. So you merely manage the dependency versions up to the
>> desired version in your <dependencyManagement> section.
>>
>> On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot <
>> o.girar...@lateral-thoughts.com> wrote:
>>
>> Hi,
>> when we fetch Spark 2.0.0 as maven dependency then we automatically end
>> up with hadoop 2.2 as a transitive dependency, I know multiple profiles are
>> used to generate the different tar.gz bundles that we can download, Is
>> there by any chance publications of Spark 2.0.0 with different classifier
>> according to different versions of Hadoop available ?
>>
>> Thanks for your time !
>>
>> *Olivier Girardot*
>>
>>
>>
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: Using Spark as a Maven dependency but with Hadoop 2.6

2016-09-28 Thread Olivier Girardot
ok, don't you think it could be published with just different classifiers:
hadoop-2.6
hadoop-2.4
hadoop-2.2 being the current default.
So for now, I should just override spark 2.0.0's dependencies with the ones
defined in the pom profile.
 





On Thu, Sep 22, 2016 11:17 AM, Sean Owen so...@cloudera.com
wrote:
There can be just one published version of the Spark artifacts and they have to
depend on something, though in truth they'd be binary-compatible with anything
2.2+. So you merely manage the dependency versions up to the desired version in
your <dependencyManagement> section.
On Thu, Sep 22, 2016 at 7:05 AM, Olivier Girardot <
o.girar...@lateral-thoughts.com>  wrote:
Hi,
when we fetch Spark 2.0.0 as a maven dependency we automatically end up
with hadoop 2.2 as a transitive dependency. I know multiple profiles are used to
generate the different tar.gz bundles that we can download. Is there by any
chance a publication of Spark 2.0.0 with a different classifier for each
available version of Hadoop?
Thanks for your time!
Olivier Girardot

 


Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

Re: New to spark.

2016-09-28 Thread Bryan Cutler
Hi Anirudh,

All types of contributions are welcome, from code to documentation.  Please
check out the page at
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark for
some info, specifically keep a watch out for starter JIRAs here
https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20labels%20%3D%20Starter%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)
.

On Wed, Sep 28, 2016 at 9:11 AM, Anirudh Muhnot  wrote:

> Hello everyone, I'm Anirudh. I'm fairly new to spark as I've done an
> online specialisation from UC Berkeley. I know how to code in Python but
> have little to no idea about Scala. I want to contribute to Spark, Where do
> I start and how? I'm reading the pull requests at Git Hub but I'm barley
> able to understand them. Can anyone help? Thank you.
> Sent from my iPhone
>


Spark Summit CfP Closes Sunday

2016-09-28 Thread Jules Damji
Fellow Sparkers,

The Spark Summit East 2017 CfP closes Sunday. If you have an abstract, don’t 
miss the deadline

https://spark-summit.org/east-2017/ 

Thank you & see you in Boston!

cheers
Jules

--
Simplicity precludes neither profundity nor power. 
Jules Damji
dmat...@comcast.net






Spark ML Decision Trees Algorithm

2016-09-28 Thread janardhan shetty
Which algorithm is used under the covers for decision trees in Spark?
For example: scikit-learn (python) uses an optimised version of the CART
algorithm.


Re: Access S3 buckets in multiple accounts

2016-09-28 Thread Daniel Siegmann
Thanks for the help everyone. I was able to get permissions configured for
my cluster so it now has access to the bucket on the other account.


--
Daniel Siegmann
Senior Software Engineer
*SecurityScorecard Inc.*
214 W 29th Street, 5th Floor
New York, NY 10001


On Wed, Sep 28, 2016 at 10:03 AM, Steve Loughran 
wrote:

>
> On 27 Sep 2016, at 15:53, Daniel Siegmann 
> wrote:
>
> I am running Spark on Amazon EMR and writing data to an S3 bucket.
> However, the data is read from an S3 bucket in a separate AWS account.
> Setting the fs.s3a.access.key and fs.s3a.secret.key values is sufficient to
> get access to the other account (using the s3a protocol), however I then
> won't have access to the S3 bucket in the EMR cluster's AWS account.
>
> Is there any way for Spark to access S3 buckets in multiple accounts? If
> not, is there any best practice for how to work around this?
>
>
>
> There are 2 ways to do this without changing permissions
>
> 1. different implementations: use s3a for one, s3n for the other, give
> them the different secrets
>
> 2. insecure: use the secrets in the URI. s3a://AWSID:escaped-secret@
> bucket/path
> -leaks your secrets thoughout the logs, has problems with "/" in the
> password..if there is one, you'll probably need to regenerate the password.
>
> This is going to have to be fixed in the s3a implementation at some point,
> as it's not only needed for cross user auth, once you switch to v4 AWS auth
> you need to specify the appropriate s3 endpoint for your region; you can't
> just use s3 central, but need to choose s3 frankfurt, s3 seoul, etc: so
> won't be able to work with data across regions.
>


New to spark.

2016-09-28 Thread Anirudh Muhnot
Hello everyone, I'm Anirudh. I'm fairly new to Spark; I've done an online
specialisation from UC Berkeley. I know how to code in Python but have little
to no idea about Scala. I want to contribute to Spark. Where do I start and
how? I'm reading the pull requests on GitHub but I'm barely able to understand
them. Can anyone help? Thank you.
Sent from my iPhone

Re: Broadcast big dataset

2016-09-28 Thread Takeshi Yamamuro
Hi,

# I dropped dev and added user because this is more suitable for the
user mailing list.

I think you need to describe your environment in more detail,
e.g. Spark version, executor memory, and so on.

// maropu


On Wed, Sep 28, 2016 at 11:03 PM, WangJianfei <
wangjianfe...@otcaix.iscas.ac.cn> wrote:

> Hi Devs,
>  In my application, I just broadcast a dataset (about 500M) to the
> executors (100+), and I got a Java heap error:
> Jmartad-7219.hadoop.jd.local:53591 (size: 4.0 MB, free: 3.3 GB)
> 16/09/28 15:56:48 INFO BlockManagerInfo: Added broadcast_9_piece19 in
> memory
> on BJHC-Jmartad-9012.hadoop.jd.local:53197 (size: 4.0 MB, free: 3.3 GB)
> 16/09/28 15:56:49 INFO BlockManagerInfo: Added broadcast_9_piece8 in memory
> on BJHC-Jmartad-84101.hadoop.jd.local:52044 (size: 4.0 MB, free: 3.3 GB)
> 16/09/28 15:56:58 INFO BlockManagerInfo: Removed broadcast_8_piece0 on
> 172.22.176.114:37438 in memory (size: 2.7 KB, free: 3.1 GB)
> 16/09/28 15:56:58 WARN TaskSetManager: Lost task 125.0 in stage 7.0 (TID
> 130, BJHC-Jmartad-9376.hadoop.jd.local): java.lang.OutOfMemoryError: Java
> heap space
> at java.io.ObjectInputStream$HandleTable.grow(
> ObjectInputStream.java:3465)
> at
> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3271)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1350)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1350)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.
> java:1706)
>
> My configuration is 4G memory in driver.  Any advice is appreciated.
> Thank you!
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Broadcast-big-dataset-tp19127.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Treating NaN fields in Spark

2016-09-28 Thread Peter Figliozzi
In Scala, x.isNaN returns true for Double.NaN, but false for any other
numeric value (and it isn't defined for strings at all). I guess the `isnan`
function you are using works by ultimately looking at x.isNaN.
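
A small sketch of why the filter comes back empty (assuming the df2 from this
thread, with "Open" read as a string, and spark.implicits._ in scope): a
non-numeric string never turns into NaN, so isnan finds nothing, while
checking for a failed cast does.

// Double.NaN.isNaN is true, but "-" never becomes NaN: "-".toDouble throws in
// Scala, and cast("-" as double) yields null in Spark SQL. So filter on the null:
df2.filter($"Open".cast("double").isNull && $"Open".isNotNull).show()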

On Wed, Sep 28, 2016 at 5:56 AM, Mich Talebzadeh 
wrote:

>
> This is an issue in most databases. Specifically if a field is NaN.. --> (
> *NaN*, standing for not a number, is a numeric data type value
> representing an undefined or unrepresentable value, especially in
> floating-point calculations)
>
> There is a method called isnan() in Spark that is supposed to handle this
> scenario . However, it does not return correct values! For example I
> defined column "Open" as String  (it should be Float) and it has the
> following 7 rogue entries out of 1272 rows in a csv
>
> df2.filter( $"OPen" === 
> "-").select((changeToDate("TradeDate").as("TradeDate")),
> 'Open, 'High, 'Low, 'Close, 'Volume).show
>
> +--+++---+-+--+
> | TradeDate|Open|High|Low|Close|Volume|
> +--+++---+-+--+
> |2011-12-23|   -|   -|  -|40.56| 0|
> |2011-04-21|   -|   -|  -|45.85| 0|
> |2010-12-30|   -|   -|  -|38.10| 0|
> |2010-12-23|   -|   -|  -|38.36| 0|
> |2008-04-30|   -|   -|  -|32.39| 0|
> |2008-04-29|   -|   -|  -|33.05| 0|
> |2008-04-28|   -|   -|  -|32.60| 0|
> +--+++---+-+--+
>
> However, the following does not work!
>
>  df2.filter(isnan($"Open")).show
> +-+--+-+++---+-+--+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-+--+-+++---+-+--+
> +-+--+-+++---+-+--+
>
> Any suggestions?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>


Re: Access S3 buckets in multiple accounts

2016-09-28 Thread Steve Loughran

On 27 Sep 2016, at 15:53, Daniel Siegmann <dsiegm...@securityscorecard.io> wrote:

I am running Spark on Amazon EMR and writing data to an S3 bucket. However, the 
data is read from an S3 bucket in a separate AWS account. Setting the 
fs.s3a.access.key and fs.s3a.secret.key values is sufficient to get access to 
the other account (using the s3a protocol), however I then won't have access to 
the S3 bucket in the EMR cluster's AWS account.

Is there any way for Spark to access S3 buckets in multiple accounts? If not, 
is there any best practice for how to work around this?



There are 2 ways to do this without changing permissions

1. different implementations: use s3a for one, s3n for the other, give them the 
different secrets

2. insecure: use the secrets in the URI, s3a://AWSID:escaped-secret@bucket/path.
This leaks your secrets throughout the logs and has problems with "/" in the
password; if there is one, you'll probably need to regenerate the password.

This is going to have to be fixed in the s3a implementation at some point, as 
it's not only needed for cross user auth, once you switch to v4 AWS auth you 
need to specify the appropriate s3 endpoint for your region; you can't just use 
s3 central, but need to choose s3 frankfurt, s3 seoul, etc: so won't be able to 
work with data across regions.
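
For option 1, the wiring looks something like this (a sketch; the property
names are the standard Hadoop ones, the key values and bucket names are
placeholders, and sc is an existing SparkContext):

// s3a carries the credentials for the foreign account, s3n those for the EMR
// account (or the other way round); hadoopConfiguration is shared by the whole context.
val hc = sc.hadoopConfiguration
hc.set("fs.s3a.access.key", "FOREIGN_ACCOUNT_KEY")
hc.set("fs.s3a.secret.key", "FOREIGN_ACCOUNT_SECRET")
hc.set("fs.s3n.awsAccessKeyId", "EMR_ACCOUNT_KEY")
hc.set("fs.s3n.awsSecretAccessKey", "EMR_ACCOUNT_SECRET")

val input = sc.textFile("s3a://foreign-bucket/path/")   // read through s3a
input.saveAsTextFile("s3n://own-bucket/output/")        // write through s3n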


Re: Spark Executor Lost issue

2016-09-28 Thread Aditya

Hi All,

Any updates on this?

On Wednesday 28 September 2016 12:22 PM, Sushrut Ikhar wrote:
Try increasing the parallelism by repartitioning, and you may also
increase spark.default.parallelism.

You can also try decreasing num-executor cores.
Basically, this happens when the executor uses quite a lot more memory
than it asked for, and YARN kills the executor.


Regards,

Sushrut Ikhar
https://about.me/sushrutikhar




On Wed, Sep 28, 2016 at 12:17 PM, Aditya 
> wrote:


I have a spark job which runs fine for small data. But when data
increases it gives an executor lost error. My executor and driver
memory are set at their highest point. I have also tried
increasing --conf spark.yarn.executor.memoryOverhead=600 but am still
not able to fix the problem. Is there any other solution to fix
the problem?








Re: Trying to fetch S3 data

2016-09-28 Thread Steve Loughran

On 28 Sep 2016, at 06:28, Hitesh Goyal <hitesh.go...@nlpcaptcha.com> wrote:

Hi team,

I want to fetch data from Amazon S3 bucket. For this, I am trying to access it 
using scala.
I have tried the basic wordcount application in scala.
Now I want to retrieve s3 data using it.
I have gone through the tutorials and I found solutions for uploading files to 
S3.
Please tell me how can I retrieve the data buckets stored in S3.


This is actually something I'm trying to document as part of some work to add a
spark-cloud module which adds the appropriate JARs to the classpath so that
things work out of the box.

Could you have a look at

https://github.com/steveloughran/spark/blob/b04c037f2925d9b698e541493fc936627ddcf9ba/docs/cloud-integration.md

And tell me where it could be improved?
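
In the meantime, the short answer to the original question is roughly this
(a sketch; it assumes the hadoop-aws JAR is on the classpath and s3a
credentials are configured, and the bucket/paths are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("s3-read").getOrCreate()

// Plain text objects under a prefix:
val lines = spark.sparkContext.textFile("s3a://my-bucket/path/to/data/*.txt")
println(lines.count())

// Structured files go through the DataFrame reader the same way:
val df = spark.read.option("header", "true").csv("s3a://my-bucket/path/to/data.csv")
df.show(5)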



Re: Issue with rogue data in csv file used in Spark application

2016-09-28 Thread Bedrytski Aliaksandr
Hi Mich,

if I understood you well, you may cast the value to float, it will yield
null if the value is not a correct float:

val df = Seq(("-", 5), ("1", 6), (",", 7), ("8.6", 7)).toDF("value",
"id").createOrReplaceTempView("lines")

spark.sql("SELECT cast(value as FLOAT) from lines").show()

+-----+
|value|
+-----+
| null|
|  1.0|
| null|
|  8.6|
+-----+

After it you may filter the DataFrame for values containing null.

Regards,
--
  Bedrytski Aliaksandr
  sp...@bedryt.ski



On Wed, Sep 28, 2016, at 10:11, Mich Talebzadeh wrote:
> Thanks all.
>
> This is the csv schema all columns mapped to String
>
> scala> df2.printSchema
> root
>  |-- Stock: string (nullable = true)
>  |-- Ticker: string (nullable = true)
>  |-- TradeDate: string (nullable = true)
>  |-- Open: string (nullable = true)
>  |-- High: string (nullable = true)
>  |-- Low: string (nullable = true)
>  |-- Close: string (nullable = true)
>  |-- Volume: string (nullable = true)
>
> The issue I have can be shown as below
>
> df2.filter( $"OPen" === "-").select((changeToDate("TradeDate").as("TradeDate")),
> 'Open, 'High, 'Low, 'Close, 'Volume).show
>
> +----------+----+----+---+-----+------+
> | TradeDate|Open|High|Low|Close|Volume|
> +----------+----+----+---+-----+------+
> |2011-12-23|   -|   -|  -|40.56|     0|
> |2011-04-21|   -|   -|  -|45.85|     0|
> |2010-12-30|   -|   -|  -|38.10|     0|
> |2010-12-23|   -|   -|  -|38.36|     0|
> |2008-04-30|   -|   -|  -|32.39|     0|
> |2008-04-29|   -|   -|  -|33.05|     0|
> |2008-04-28|   -|   -|  -|32.60|     0|
> +----------+----+----+---+-----+------+
> Now there are ways of dealing with this. However, the solution has to
> be generic! Checking for a column == "-" is not generic. How about if
> that column was "," etc.
>
> This is an issue in most databases. Specifically if a field is NaN..
> --> (*NaN*, standing for not a number, is a numeric data type value
> representing an undefined or unrepresentable value, especially in floating-
> point calculations)
>
> Spark handles this[1]. I am on  Spark 2.0.1  in Class
> DataFrameNaFunctions. The simplest one is to drop these rogue rows
> df2.filter( $"Open" === "-").drop()
> However, a better approach would be to use REPLACE method or testing
> any column for NaN
>
>
>
>
> There is a method called isnan(). However, it does not return
> correct values!
>
>  df2.filter(isnan($"Open")).show
> +-----+------+---------+----+----+---+-----+------+
> |Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
> +-----+------+---------+----+----+---+-----+------+
> +-----+------+---------+----+----+---+-----+------+
>
>
> Any suggestions?
>
> Thanks
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for
> any loss, damage or destruction of data or any other property which
> may arise from relying on this email's technical content is explicitly
> disclaimed. The author will in no case be liable for any monetary
> damages arising from such loss, damage or destruction.
>
>
>
>
> On 28 September 2016 at 04:07, Mike Metzger
>  wrote:
>> Hi Mich -
>>
>>Can you run a filter command on df1 prior to your map for any rows
>>where p(3).toString != '-' then run your map command?
>>
>> Thanks
>>
>>
>> Mike
>>
>>
>> On Tue, Sep 27, 2016 at 5:06 PM, Mich Talebzadeh
>>  wrote:
>>> Thanks guys
>>>
>>> Actually these are the 7 rogue rows. The column 0 is the Volume
>>> column  which means there was no trades on those days
>>>
>>> cat stock.csv|grep ",0"
>>> SAP SE,SAP, 23-Dec-11,-,-,-,40.56,0
>>> SAP SE,SAP, 21-Apr-11,-,-,-,45.85,0
>>> SAP SE,SAP, 30-Dec-10,-,-,-,38.10,0
>>> SAP SE,SAP, 23-Dec-10,-,-,-,38.36,0
>>> SAP SE,SAP, 30-Apr-08,-,-,-,32.39,0
>>> SAP SE,SAP, 29-Apr-08,-,-,-,33.05,0
>>> SAP SE,SAP, 28-Apr-08,-,-,-,32.60,0
>>>
>>> So one way would be to exclude the rows that there was no volume of
>>> trade that day when cleaning up the csv file
>>>
>>> cat stock.csv|grep -v ",0"
>>>
>>> and that works. Bearing in mind that putting 0s in place of "-" will
>>> skew the price plot.
>>>
>>> BTW I am using Spark csv as well
>>>
>>> val df1 = spark.read.option("header", true).csv(location)
>>>
>>> This is the class and the mapping
>>>
>>> case class columns(Stock: String, Ticker: String, TradeDate: String,
>>> Open: Float, High: Float, Low: Float, Close: Float, Volume: Integer)
>>> val df2 = df1.map(p => columns(p(0).toString, p(1).toString,
>>> p(2).toString, p(3).toString.toFloat, p(4).toString.toFloat,
>>> p(5).toString.toFloat, p(6).toString.toFloat, p(7).toString.toInt))
>>>
>>>
>>>
>>> In here I have
>>>
>>> p(3).toString.toFloat
>>>
>>>
>>> How can one check for rogue data in p(3)?
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn *
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>>>
>>>
>>>
>>> http:/

Treating NaN fields in Spark

2016-09-28 Thread Mich Talebzadeh
This is an issue in most databases. Specifically if a field is NaN.. --> (
*NaN*, standing for not a number, is a numeric data type value representing
an undefined or unrepresentable value, especially in floating-point
calculations)

There is a method called isnan() in Spark that is supposed to handle this
scenario . However, it does not return correct values! For example I
defined column "Open" as String  (it should be Float) and it has the
following 7 rogue entries out of 1272 rows in a csv

df2.filter( $"OPen" ===
"-").select((changeToDate("TradeDate").as("TradeDate")),
'Open, 'High, 'Low, 'Close, 'Volume).show

+----------+----+----+---+-----+------+
| TradeDate|Open|High|Low|Close|Volume|
+----------+----+----+---+-----+------+
|2011-12-23|   -|   -|  -|40.56|     0|
|2011-04-21|   -|   -|  -|45.85|     0|
|2010-12-30|   -|   -|  -|38.10|     0|
|2010-12-23|   -|   -|  -|38.36|     0|
|2008-04-30|   -|   -|  -|32.39|     0|
|2008-04-29|   -|   -|  -|33.05|     0|
|2008-04-28|   -|   -|  -|32.60|     0|
+----------+----+----+---+-----+------+

However, the following does not work!

 df2.filter(isnan($"Open")).show
+-----+------+---------+----+----+---+-----+------+
|Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
+-----+------+---------+----+----+---+-----+------+
+-----+------+---------+----+----+---+-----+------+

Any suggestions?

Thanks


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Problems with new experimental Kafka Consumer for 0.10

2016-09-28 Thread Matthias Niehoff
Hi,

the stacktrace:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
completed since the group has already rebalanced and assigned the
partitions to another member. This means that the time between subsequent
calls to poll() was longer than the configured session.timeout.ms, which
typically implies that the poll loop is spending too much time message
processing. You can address this either by increasing the session timeout
or by reducing the maximum size of batches returned in poll() with
max.poll.records.
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
at
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DS

Re: Access S3 buckets in multiple accounts

2016-09-28 Thread Eike von Seggern
Hi Teng,

2016-09-28 10:42 GMT+02:00 Teng Qiu :

> hmm, i do not believe security group can control s3 bucket access... is
> this something new? or you mean IAM role?
>

You're right, it's not security groups but you can configure a VPC endpoint
for the EMR-Cluster and grant access rights for this VPCe in the foreign S3
bucket like:

[{
  "Sid": "Allow bucket list access from vpc endpoint",
  "Effect": "Allow",
  "Principal": "*",
  "Action": "s3:ListBucket",
  "Resource": "arn:aws:s3:::YourBucketName",
  "Condition": {
    "StringEquals": {
      "aws:sourceVpce": "vpce-YourId"
    }
  }
},
{
  "Sid": "Allow bucket object read access from vpc endpoint",
  "Effect": "Allow",
  "Principal": "*",
  "Action": "s3:GetObject",
  "Resource": "arn:aws:s3:::YourBucketName/*",
  "Condition": {
    "StringEquals": {
      "aws:sourceVpce": "vpce-YourId"
    }
  }
}]

Best

Eike


Re: Access S3 buckets in multiple accounts

2016-09-28 Thread Teng Qiu
hmm, i do not believe security group can control s3 bucket access... is
this something new? or you mean IAM role?

@Daniel, using Spark on EMR, you should be able to use an IAM role to access
AWS resources; you do not need to specify fs.s3a.access.key or
fs.s3a.secret.key at all. S3A is able to use the IAM role of the EMR
cluster's EC2 instances.

Then, for accessing "S3 buckets in multiple accounts", you need the following
two steps:

1) Define the IAM role's policies with Get/Put permissions for all of your
S3 buckets' ARN URIs, for example:
https://github.com/zalando-incubator/ro2key/blob/master/policy_bucket_readonly.json

2) Add this IAM role's ARN with Get/Put permissions to the S3 bucket
policies in all of your other accounts; refer to "Granting cross-account
bucket access to a specific IAM role" at
https://blogs.aws.amazon.com/security/post/TxK5WUJK3DG9G8/How-to-Restrict-Amazon-S3-Bucket-Access-to-a-Specific-IAM-Role

Then your cross-account S3 access should work.
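
With the role and the bucket policies in place, the Spark job itself does not
need any credentials; a minimal sketch (run in spark-shell, with hypothetical
bucket names):

// Sketch only: assumes an existing SparkSession named `spark` and an EMR
// instance-profile IAM role that both bucket policies trust; no
// fs.s3a.access.key / fs.s3a.secret.key settings are required.
val input = spark.read.text("s3a://other-account-bucket/path/to/input/")
input.write.text("s3a://own-account-bucket/path/to/output/")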

It is also worth reading the section "When to use IAM policies vs. S3
policies" in
https://blogs.aws.amazon.com/security/post/TxPOJBY6FE360K/IAM-policies-and-Bucket-Policies-and-ACLs-Oh-My-Controlling-Access-to-S3-Resourc


2016-09-28 10:33 GMT+02:00 Eike von Seggern :

> Hi Daniel,
>
> you can start your EMR Cluster in a dedicated security group and configure
> the foreign bucket's policy to allow read-write access from that SG.
>
> Best
>
> Eike
>
> 2016-09-27 16:53 GMT+02:00 Daniel Siegmann  >:
>
>> I am running Spark on Amazon EMR and writing data to an S3 bucket.
>> However, the data is read from an S3 bucket in a separate AWS account.
>> Setting the fs.s3a.access.key and fs.s3a.secret.key values is sufficient to
>> get access to the other account (using the s3a protocol), however I then
>> won't have access to the S3 bucket in the EMR cluster's AWS account.
>>
>> Is there any way for Spark to access S3 buckets in multiple accounts? If
>> not, is there any best practice for how to work around this?
>>
>> --
>> Daniel Siegmann
>> Senior Software Engineer
>> *SecurityScorecard Inc.*
>> 214 W 29th Street, 5th Floor
>> New York, NY 10001
>>
>>
>
>
> --
> 
> *Jan Eike von Seggern*
> Data Scientist
> 
> *Sevenval Technologies GmbH *
>
> FRONT-END-EXPERTS SINCE 1999
>
> Köpenicker Straße 154 | 10997 Berlin
>
> office   +49 30 707 190 - 229
> mail eike.segg...@sevenval.com
>
> www.sevenval.com
>
> Sitz: Köln, HRB 79823
> Geschäftsführung: Jan Webering (CEO), Thorsten May, Sascha Langfus,
> Joern-Carlos Kuntze
>


Re: Access S3 buckets in multiple accounts

2016-09-28 Thread Eike von Seggern
Hi Daniel,

you can start your EMR Cluster in a dedicated security group and configure
the foreign bucket's policy to allow read-write access from that SG.

Best

Eike

2016-09-27 16:53 GMT+02:00 Daniel Siegmann :

> I am running Spark on Amazon EMR and writing data to an S3 bucket.
> However, the data is read from an S3 bucket in a separate AWS account.
> Setting the fs.s3a.access.key and fs.s3a.secret.key values is sufficient to
> get access to the other account (using the s3a protocol), however I then
> won't have access to the S3 bucket in the EMR cluster's AWS account.
>
> Is there any way for Spark to access S3 buckets in multiple accounts? If
> not, is there any best practice for how to work around this?
>
> --
> Daniel Siegmann
> Senior Software Engineer
> *SecurityScorecard Inc.*
> 214 W 29th Street, 5th Floor
> New York, NY 10001
>
>


-- 

*Jan Eike von Seggern*
Data Scientist

*Sevenval Technologies GmbH *

FRONT-END-EXPERTS SINCE 1999

Köpenicker Straße 154 | 10997 Berlin

office   +49 30 707 190 - 229
mail eike.segg...@sevenval.com

www.sevenval.com

Sitz: Köln, HRB 79823
Geschäftsführung: Jan Webering (CEO), Thorsten May, Sascha Langfus,
Joern-Carlos Kuntze



Re: Issue with rogue data in csv file used in Spark application

2016-09-28 Thread Mich Talebzadeh
Thanks all.

This is the CSV schema, with all columns mapped to String:

scala> df2.printSchema
root
 |-- Stock: string (nullable = true)
 |-- Ticker: string (nullable = true)
 |-- TradeDate: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)

The issue I have can be shown as below:

df2.filter($"Open" === "-").select(changeToDate("TradeDate").as("TradeDate"),
  'Open, 'High, 'Low, 'Close, 'Volume).show

+--+++---+-+--+
| TradeDate|Open|High|Low|Close|Volume|
+--+++---+-+--+
|2011-12-23|   -|   -|  -|40.56| 0|
|2011-04-21|   -|   -|  -|45.85| 0|
|2010-12-30|   -|   -|  -|38.10| 0|
|2010-12-23|   -|   -|  -|38.36| 0|
|2008-04-30|   -|   -|  -|32.39| 0|
|2008-04-29|   -|   -|  -|33.05| 0|
|2008-04-28|   -|   -|  -|32.60| 0|
+--+++---+-+--+

Now there are ways of dealing with this. However, the solution has to be
generic: checking for a column == "-" is not generic. What if that column
contained "," or some other marker instead?

This is an issue in most databases, specifically when a field is NaN (NaN,
standing for "not a number", is a numeric data type value representing an
undefined or unrepresentable value, especially in floating-point
calculations).

Spark handles this through the DataFrameNaFunctions class (I am on Spark
2.0.1). The simplest option is to drop these rogue rows, i.e. keep only the
rows where the column is not "-":

df2.filter($"Open" =!= "-")

However, a better approach would be to use the replace method, or to test
any column for NaN.
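
For example, a minimal sketch of the replace route via DataFrameNaFunctions
(assuming "0" is an acceptable stand-in, bearing in mind that it will skew
the price plot):

// Sketch: swap the literal "-" marker for "0" in the still-string price
// columns before any cast to a numeric type.
val df3 = df2.na.replace(Seq("Open", "High", "Low"), Map("-" -> "0"))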



There is a method called isnan(). However, it does not return the rows I
expect here:

 df2.filter(isnan($"Open")).show
+-+--+-+++---+-+--+
|Stock|Ticker|TradeDate|Open|High|Low|Close|Volume|
+-+--+-+++---+-+--+
+-+--+-+++---+-+--+
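
One generic check, which does not hard-code the "-" marker, is to cast the
string columns to a numeric type and flag the rows where the cast fails (a
sketch, reusing the column names from the schema above):

// Sketch: a failed cast yields null, so any non-numeric marker ("-", ",",
// "N/A", ...) is caught without listing the markers explicitly.
import org.apache.spark.sql.functions.col

val priceCols = Seq("Open", "High", "Low", "Close", "Volume")
val rogue = priceCols
  .map(c => col(c).isNotNull && col(c).cast("double").isNull)
  .reduce(_ || _)

df2.filter(rogue).show   // inspect the rogue rows
df2.filter(!rogue)       // or keep only the clean ones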

Any suggestions?

Thanks



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 28 September 2016 at 04:07, Mike Metzger 
wrote:

> Hi Mich -
>
>Can you run a filter command on df1 prior to your map for any rows
> where p(3).toString != '-' then run your map command?
>
> Thanks
>
> Mike
>
> On Tue, Sep 27, 2016 at 5:06 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks guys
>>
>> Actually these are the 7 rogue rows. The column with 0 is the Volume
>> column, which means there were no trades on those days.
>>
>>
>> cat stock.csv | grep ",0"
>> SAP SE,SAP, 23-Dec-11,-,-,-,40.56,0
>> SAP SE,SAP, 21-Apr-11,-,-,-,45.85,0
>> SAP SE,SAP, 30-Dec-10,-,-,-,38.10,0
>> SAP SE,SAP, 23-Dec-10,-,-,-,38.36,0
>> SAP SE,SAP, 30-Apr-08,-,-,-,32.39,0
>> SAP SE,SAP, 29-Apr-08,-,-,-,33.05,0
>> SAP SE,SAP, 28-Apr-08,-,-,-,32.60,0
>>
>> So one way would be to exclude the rows where there was no trade volume
>> that day when cleaning up the CSV file:
>>
>> cat stock.csv | grep -v ",0"
>>
>> and that works. Bearing in mind that putting 0s in place of "-" will skew
>> the price plot.
>>
>> BTW I am using Spark csv as well
>>
>> val df1 = spark.read.option("header", true).csv(location)
>>
>> This is the class and the mapping
>>
>>
>> case class columns(Stock: String, Ticker: String, TradeDate: String,
>> Open: Float, High: Float, Low: Float, Close: Float, Volume: Integer)
>> val df2 = df1.map(p => columns(p(0).toString, p(1).toString,
>> p(2).toString, p(3).toString.toFloat, p(4).toString.toFloat,
>> p(5).toString.toFloat, p(6).toString.toFloat, p(7).toString.toInt))
>>
>>
>> In here I have
>>
>> p(3).toString.toFloat
>>
>> How can one check for rogue data in p(3)?
>>
>>
>> Thanks
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 27 September 2016 at 21:49, Mich Talebzadeh > > wrote:
>>
>>>
>>> I have 

Re: Spark Executor Lost issue

2016-09-28 Thread Aditya


Thanks, Sushrut, for the reply.

Currently I have not defined the spark.default.parallelism property.
Can you let me know how much I should set it to?


Regards,
Aditya Calangutkar

On Wednesday 28 September 2016 12:22 PM, Sushrut Ikhar wrote:
Try with increasing the parallelism by repartitioning and also you 
may increase - spark.default.parallelism

You can also try with decreasing num-executor cores.
Basically, this happens when the executor is using quite large memory 
than it asked; and yarn kills the executor.


Regards,

Sushrut Ikhar
https://about.me/sushrutikhar




On Wed, Sep 28, 2016 at 12:17 PM, Aditya 
> wrote:


I have a Spark job which runs fine for small data, but when the data
increases it gives an executor lost error. My executor and driver
memory are set at their highest point. I have also tried increasing
--conf spark.yarn.executor.memoryOverhead=600, but I am still not
able to fix the problem. Is there any other solution to fix
the problem?
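
For reference, a minimal sketch of the knobs mentioned in this thread, set
when building the SparkConf (the values below are placeholders for
illustration, not tuned recommendations):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: the numbers are illustrative starting points and need to be
// tuned against the actual cluster and data volume.
val conf = new SparkConf()
  .setAppName("executor-lost-tuning-sketch")
  // a common rule of thumb is 2-3 tasks per available executor core
  .set("spark.default.parallelism", "48")
  // raise the off-heap headroom if YARN kills executors for exceeding memory
  .set("spark.yarn.executor.memoryOverhead", "1024")   // in MB
  // fewer cores per executor reduces per-executor memory pressure
  .set("spark.executor.cores", "2")
val sc = new SparkContext(conf)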