Re: How to get db related metrics when use spark jdbc to read db table?

2024-04-08 Thread Femi Anthony
If you're using just Spark, you could try turning on the history server and
glean statistics from there. But there is no single location or log file that
stores them all.
Databricks, which is a managed Spark solution, provides such features in an
Enterprise setting.
I am unsure whether AWS EMR or Google Dataproc does the same.
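
For reference, a minimal sketch of enabling event logging so the history server
has something to record (the log directory is a placeholder):

  from pyspark.sql import SparkSession

  spark = (SparkSession.builder
      .appName("jdbc-job")
      .config("spark.eventLog.enabled", "true")
      .config("spark.eventLog.dir", "hdfs:///tmp/spark-events")  # placeholder path
      .getOrCreate())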

Femi



On Mon, Apr 8, 2024 at 5:34 AM casel.chen  wrote:

> Hello, I have a Spark application with a JDBC source that does some
> calculation.
> To monitor application health, I need DB-related metrics per database,
> like number of connections, SQL execution time and SQL firing-time
> distribution, etc.
> Does anybody know how to get them? Thanks!
>
>

-- 
http://dataphantik.com

"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Kube estimate for Spark

2021-06-03 Thread Femi Anthony
I think he’s running Spark on Kubernetes, not YARN, as the cluster manager.

Sent from my iPhone

> On Jun 3, 2021, at 6:05 AM, Mich Talebzadeh  wrote:
> 
> 
> Please provide the spark version, the environment you are running (on-prem, 
> cloud etc), state if you are running in YARN etc and your spark-submit 
> parameters.
> 
> Have you checked the Spark UI (default on port 4040) under the Stages and Executors tabs?
> 
> HTH
> 
> 
>view my Linkedin profile
> 
>  
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
> 
>> On Thu, 3 Jun 2021 at 10:51, Subash Prabanantham  
>> wrote:
>> Hi Team,
>> 
>> I am trying to understand how to estimate Kube cpu with respect to Spark 
>> executor cores. 
>> 
>> For example,
>> Job configuration: (given to start)
>> cores/executor = 4
>> # of executors = 240
>> 
>> 
>> But the allocated resources when we ran job are as follows,
>> cores/executor = 4
>> # of executors = 47
>> 
>> So the question: at the time of taking the screenshot, 60 tasks were running
>> in parallel.
>> 
>> (Apologies, the screenshot was taken from the terminal, in top.)
>> 
>> 188 cores are allocated, with 60 tasks currently running.
>> 
>> Now when I took the quota for the namespace, I got the below:
>> 
>> 
>> 
>> 
>> How do I read 5290m == 5.29 CPU and limits == 97 against the 60 tasks
>> running in parallel?
>> 
>> Say, for acquiring 512 cores (Spark executors total), what would be the
>> configuration for Kube requests.cpu and limits.cpu?
>> 
>> 
>> Thanks,
>> Subash


Re: [External Sender] Memory issues in 3.0.2 but works well on 2.4.4

2021-05-21 Thread Femi Anthony
Post the stack trace and provide some more details about your configuration

On Fri, May 21, 2021 at 7:52 AM Praneeth Shishtla 
wrote:

> Hi,
> I have a simple DecisionForest model and was able to train the model on
> pyspark==2.4.4 without any issues.
> However, when I upgraded to pyspark==3.0.2, the fit takes a lot of time and
> eventually errors out saying out of memory. Even tried reducing the number
> of samples for training but no luck.
> Can anyone help with this?
>
> Best,
> Praneeth
>
>
>
>
> --
> Sent from:
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
Card Machine Learning (ML) Team, Capital One






Re: error , saving dataframe , LEGACY_PASS_PARTITION_BY_AS_OPTIONS

2019-11-13 Thread Femi Anthony
Can you post the line of code that’s resulting in that error along with the 
stack trace ?

Sent from my iPhone

> On Nov 13, 2019, at 9:53 AM, asma zgolli  wrote:
> 
> 
> Hello , 
> 
> I'm using Spark 2.4.4 and I keep receiving this error message. Can you please 
> help me identify the problem?
> 
> thank you , 
> yours sincerely
> Asma ZGOLLI
> 
> PhD student in data engineering - computer science
> Attachment:
> 
> 
> "main" java.lang.NoSuchMethodError: 
> org.apache.spark.sql.internal.SQLConf$.LEGACY_PASS_PARTITION_BY_AS_OPTIONS()Lorg/apache/spark/internal/config/ConfigEntry;
>   at 
> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:277)
> 
> 
> 
> 
> 


PySpark with custom transformer project organization

2019-09-23 Thread Femi Anthony


I have a Pyspark project that requires a custom ML Pipeline Transformer written 
in Scala. What is the best practice regarding project organization ? Should I 
include the scala files in the general Python project or should they be in a 
separate repo ? 

Opinions and suggestions welcome.

Sent from my iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [External Sender] Execute Spark model without Spark

2019-08-22 Thread Femi Anthony
Hi, you can check out MLeap -

https://github.com/combust/mleap

But I must warn you - their support is minimal at best.

Femi

Sent from my iPhone

On Aug 22, 2019, at 1:13 PM, Yeikel  wrote:

Hi ,

I have a GBTClassificationModel
<https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/ml/classification/GBTClassificationModel.html>
that I generated using Spark.

How can I export this model and use it without a Spark cluster? I would like to
serve it outside of Spark.



--
Sent from:
http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org






Pass row to UDF and select column based on pattern match

2019-07-09 Thread Femi Anthony
How can I achieve the following by passing a row to a udf ?

val df1 = df.withColumn("col_Z", 
  when($"col_x" === "a", $"col_A")
  .when($"col_x" === "b", $"col_B")
  .when($"col_x" === "c", $"col_C")
  .when($"col_x" === "d", $"col_D")
  .when($"col_x" === "e", $"col_E")
  .when($"col_x" === "f", $"col_F")
  .when($"col_x" === "g", $"col_G")
  )

As I understand it, only columns can be passed as arguments to a UDF in Scala 
Spark.

I have taken a look at this question:

https://stackoverflow.com/questions/31816975/how-to-pass-whole-row-to-udf-spark-dataframe-filter

and tried to implement this udf:

def myUDF(r: Row) = udf {
  val z: Float = r.getAs("col_x") match {
    case "a" => r.getAs("col_A")
    case "b" => r.getAs("col_B")
    case other => lit(0.0)
  }
  z
}

but I'm getting a type mismatch error:


 error: type mismatch;
 found   : String("a")
 required: Nothing
 case "a" => r.getAs("col_A")
  ^

What am I doing wrong ?


Sent from my iPhone

AWS EMR slow write to HDFS

2019-06-11 Thread Femi Anthony


I'm writing a large dataset in Parquet format to HDFS using Spark and it runs 
rather slowly in EMR vs say Databricks. I realize that if I was able to use 
Hadoop 3.1, it would be much more performant because it has a high performance 
output committer. Is this the case, and if so - when will there be a version of 
EMR that uses Hadoop 3.1 ? The current version I'm using is 5.21.
Sent from my iPhone
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Writing to multiple Kafka partitions from Spark

2019-05-28 Thread Femi Anthony
Ok that worked thanks for the suggestion.

Sent from my iPhone
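
For reference, a rough sketch of the kind of fix described in the suggestion
below, salting the key before the write (topic and bootstrap servers are the
hypothetical names from the original post):

  from pyspark.sql import functions as F

  (df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
     .withColumn("key", F.concat(F.col("key"), F.lit("_"),
                                 (F.rand() * 100).cast("int").cast("string")))
     .write
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
     .option("topic", "topic1")
     .save())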

> On May 24, 2019, at 11:53 AM, SNEHASISH DUTTA  
> wrote:
> 
> Hi,
> All the keys are similar, so they are going to the same partition.
> Key->partition distribution depends on a hash calculation; add some
> random number to your key to distribute it across partitions.
> If your key is null/empty, don't add a key; just push the value to the topic,
> and Kafka will use round-robin partitioning to distribute the data across
> partitions:
> 
>  selectExpr("CAST(value AS STRING)")
> 
> Regards,
> Snehasish
> 
> 
>> On Fri, May 24, 2019 at 9:05 PM Femi Anthony  wrote:
>> 
>> 
>> I have Spark code that writes a batch to Kafka as specified here:
>> 
>> https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html
>> 
>> The code looks like the following:
>> 
>>   df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
>>\
>>.write \
>>.format("kafka") \
>>.option("kafka.bootstrap.servers", 
>>"host1:port1,host2:port2") \
>>.option("topic", "topic1") \
>>.save()
>> However the data only gets written to Kafka partition 0. How can I get it 
>> written uniformly to all partitions in the same topic ?
>> 
>> Thanks in advance,
>> -- Femi
>> http://dataphantik.com
>> 
>> "Great spirits have always encountered violent opposition from mediocre 
>> minds." - Albert Einstein.


Writing to multiple Kafka partitions from Spark

2019-05-24 Thread Femi Anthony
I have Spark code that writes a batch to Kafka as specified here:

https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html

The code looks like the following:

  df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   \
   .write \
   .format("kafka") \
   .option("kafka.bootstrap.servers",
   "host1:port1,host2:port2") \
   .option("topic", "topic1") \
   .save()

However the data only gets written to Kafka partition 0. How can I get it
written uniformly to all partitions in the same topic ?
Thanks in advance,
-- Femi
http://dataphantik.com

"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: [External Sender] How to use same SparkSession in another app?

2019-04-16 Thread Femi Anthony
Why not save the data frame to persistent storage (S3/HDFS) in the first
application and read it back in the second?
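
A minimal sketch of that hand-off, assuming a shared location both applications
can reach (the path is a placeholder):

  # App 1: persist the dataframe
  df.write.mode("overwrite").parquet("hdfs:///shared/df1")   # placeholder path

  # App 2: read it back in its own SparkSession
  df1 = spark.read.parquet("hdfs:///shared/df1")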

On Tue, Apr 16, 2019 at 8:58 PM Rishikesh Gawade 
wrote:

> Hi.
> I wish to use a SparkSession created by one app in another app so that I
> can use the dataframes belonging to that session. Is it possible to use the
> same sparkSession in another app?
> Thanks,
> Rishikesh
>
-- 
Card Machine Learning (ML) Team, Capital One




Spark Stateful Streaming - add counter column

2019-01-23 Thread Femi Anthony


I have a Spark Streaming process that consumes records off a Kafka topic, 
processes them and sends them to a producer to publish on another topic. I 
would like to add a sequence number column that can be used to identify records 
that have the same key and be incremented for each duplicate reoccurrence of 
that key. For example if the output sent to the producer is

Key, col1, col2, seqnum 
A, 67, dog, 1 
B, 56, cat, 1 
C, 89, fish, 1
then if A reoccurs within a reasonable time interval Spark would produce the 
following:

A, 67, dog, 2 
B, 56, cat, 2
etc. How would I do that ? I suspect that this is a pattern that occurs 
frequently, but I haven't found any examples.



Sent from my iPhone

Re: [External Sender] Having access to spark results

2018-10-25 Thread Femi Anthony
What sort of environment are you running Spark on - in the cloud, on
premise? Is it a real-time or batch-oriented application?
Please provide more details.
Femi

On Thu, Oct 25, 2018 at 3:29 AM Affan Syed  wrote:

> Spark users,
> We really would want to get an input here about how the results from a
> Spark query will be accessible to a web application. Given Spark is widely
> used in the industry, I would have thought that this part would have lots of
> answers/tutorials about it, but I didn't find anything.
>
> Here are a few options that come to mind
>
> 1) Spark results are saved in another DB ( perhaps a traditional one) and
> a request for query returns the new table name for access through a
> paginated query. That seems doable, although a bit convoluted as we need to
> handle the completion of the query.
>
> 2) Spark results are pumped into a messaging queue from which a socket
> server like connection is made.
>
> What confuses me is that other connectors to spark, like those for
> Tableau, using something like JDBC should have all the data (not the top
> 500 that we typically can get via Livy or other REST interfaces to Spark).
> How do those connectors get all the data through a single connection?
>
>
> Can someone with expertise help in bringing clarity.
>
> Thank you.
>
> Affan
>




Re: [External Sender] Writing dataframe to vertica

2018-10-16 Thread Femi Anthony
How are you trying to write to Vertica ? Can you provide some snippets of
code ?


Femi

On Tue, Oct 16, 2018 at 7:24 PM Nikhil Goyal  wrote:

> Hi guys,
>
> I am trying to write a dataframe to Vertica using Spark. It seems like Spark
> is creating a temp table under the public schema. I don't have access to the
> public schema, hence the job is failing. Is there a way to specify another schema?
>
> Error
> ERROR s2v.S2VUtils: createJobStatusTable: FAILED to create job status
> table public.S2V_JOB_STATUS_USER_NGOYAL
> java.lang.Exception: S2V: FATAL ERROR for job S2V_job8087339107009511230.
> Unable to create status table for tracking this
> job:public.S2V_JOB_STATUS_USER_NGOYAL
>
> Thanks
> Nikhil
>




Re: [External Sender] Pyspark Window orderBy

2018-10-16 Thread Femi Anthony
I think that’s how it should behave. Did you try it out and see ?
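
One way to remove the ambiguity is to add an explicit tie-breaker to the
ordering, so the numbering is deterministic regardless of how tied rows happen
to arrive; a sketch using the columns from the example below (pick whichever
column encodes the intended order):

  from pyspark.sql import Window
  from pyspark.sql.functions import row_number

  window_spec = Window.partitionBy(df.group_id).orderBy(df.id, df.text)
  df = df.withColumn("row_number", row_number().over(window_spec))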

On Tue, Oct 16, 2018 at 5:11 AM mhussain  wrote:

> Hi,
>
> I have a dataframe which looks like
>
> ++---+--++
> |group_id| id|  text|type|
> ++---+--++
> |   1|  1|   one|   a|
> |   1|  1|   two|   t|
> |   1|  2| three|   a|
> |   1|  2|  four|   t|
> |   1|  5|  five|   a|
> |   1|  6|   six|   t|
> |   1|  7| seven|   a|
> |   1|  9| eight|   t|
> |   1|  9|  nine|   a|
> |   1| 10|   ten|   t|
> |   1| 11|eleven|   a|
> ++---+--++
> If I do Window operation by partitioning it on group_id and ordering it by
> id then will orderby make sure that already ordered(sorted) rows retain the
> same order?
>
> e.g.
>
> window_spec = Window.partitionBy(df.group_id).orderBy(df.id)
> df = df.withColumn("row_number", row_number().over(window_spec))
> Will the result always be as bellow?
>
> ++---+--++--+
> |group_id| id|  text|type|row_number|
> ++---+--++--+
> |   1|  1|   one|   a| 1|
> |   1|  1|   two|   t| 2|
> |   1|  2| three|   a| 3|
> |   1|  2|  four|   t| 4|
> |   1|  5|  five|   a| 5|
> |   1|  6|   six|   t| 6|
> |   1|  7| seven|   a| 7|
> |   1|  9| eight|   t| 8|
> |   1|  9|  nine|   a| 9|
> |   1| 10|   ten|   t|10|
> |   1| 11|eleven|   a|11|
> ++---+--++--+
> In a nutshell, my question is: how does Spark Window's orderBy handle already
> ordered (sorted) rows? My assumption is that it is stable, i.e. it doesn't change
> the order of already ordered rows, but I couldn't find anything related to
> this in the documentation. How can I make sure that my assumption is
> correct?
>
> I am using python 3.5 and pyspark 2.3.1.
>
> Thanks.
> Muddasser
>
>
>
> --
> Sent from:
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>




Re: [External Sender] How to debug Spark job

2018-09-07 Thread Femi Anthony
One way I would go about this would be to try running a newdf.show(numrows,
truncate=False) on a few rows before you try writing to parquet, to force
computation of newdf and see whether the hanging is occurring at that point
or during the write. You may also try a newdf.count().
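
Roughly, in PySpark syntax (a sketch; names follow the original post and the
write mode is illustrative):

  newdf.cache()                    # keep the result around after the first action
  newdf.show(20, truncate=False)   # forces computation of a handful of rows
  print(newdf.count())             # forces computation over the full dataset
  newdf.write.mode("overwrite").parquet(destPath)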

Femi

On Fri, Sep 7, 2018 at 5:48 AM James Starks 
wrote:

>
> I have a Spark job that reads from a postgresql (v9.5) table, and write
> result to parquet. The code flow is not complicated, basically
>
> case class MyCaseClass(field1: String, field2: String)
> val df = spark.read.format("jdbc")...load()
> df.createOrReplaceTempView(...)
> val newdf = spark.sql("seslect field1, field2 from
> mytable").as[MyCaseClass].map { row =>
>   val fieldX = ... // extract something from field2
>   (field1, fileldX)
> }.filter { ... /* filter out field 3 that's not valid */ }
> newdf.write.mode(...).parquet(destPath)
>
> This job worked correctly without a problem. But it doesn't look to be working
> ok (the job looks like it hanged) when adding more fields. The refactored job
> looks as below
> ...
> val newdf = spark.sql("seslect field1, field2, ... fieldN from
> mytable").as[MyCaseClassWithMoreFields].map { row =>
> ...
> NewCaseClassWithMoreFields(...) // all fields plus fieldX
> }.filter { ... }
> newdf.write.mode(...).parquet(destPath)
>
> Basically what the job does is extracting some info from one of a field in
> db table, appends that newly extracted field to the original row, and then
> dumps the whole new table to parquet.
>
> new filed + (original field1 + ... + original fieldN)
> ...
> ...
>
> Records loaded by spark sql to spark job (before refactored) are around
> 8MM, this remains the same, but when the refactored spark runs, it looks
> hanging there without progress. The only output on the console is (there is
> no crash, no exceptions thrown)
>
> WARN  HeartbeatReceiver:66 - Removing executor driver with no recent
> heartbeats: 137128 ms exceeds timeout 12 ms
>
> Memory in top command looks like
>
> VIRT RES SHR%CPU %MEM
> 15.866g 8.001g  41.4m 740.3   25.6
>
> The command used to  submit spark job is
>
> spark-submit --class ... --master local[*] --driver-memory 10g
> --executor-memory 10g ... --files ... --driver-class-path ... 
> ...
>
> How can I debug or check which part of my code might cause the problem (so
> I can improve it)?
>
> Thanks
>
>
>




Re: [External Sender] Re: How to make pyspark use custom python?

2018-09-06 Thread Femi Anthony
Are you sure that pyarrow is deployed on your slave hosts ? If not, you
will either have to get it installed or ship it along when you call
spark-submit by zipping it up and specifying the zipfile to be shipped
using the
--py-files zipfile.zip option

A quick check would be to ssh to a slave host, run pyspark and try to
import pyarrow.

Femi

On Thu, Sep 6, 2018 at 9:25 PM mithril  wrote:

>
> The whole content in `spark-env.sh` is
>
> ```
> SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
> -Dspark.deploy.zookeeper.url=10.104.85.78:2181,10.104.114.131:2181,
> 10.135.2.132:2181
> -Dspark.deploy.zookeeper.dir=/spark"
> PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"
> ```
>
> I ran `/usr/local/spark/sbin/stop-all.sh`  and
> `/usr/local/spark/sbin/start-all.sh` to restart spark cluster.
>
> Anything wrong ??
>
>
>
> --
> Sent from:
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>




Re: Sparklyr and idle executors

2018-03-16 Thread Femi Anthony
I assume you're setting these values in spark-defaults.conf. What happens
if you specify them directly to spark-submit  as in --conf
spark.dynamicAllocation.enabled=true
?

On Thu, Mar 15, 2018 at 1:47 PM, Florian Dewes  wrote:

> Hi all,
>
> I am currently trying to enable dynamic resource allocation for a little
> yarn managed spark cluster.
> We are using sparklyr to access spark from R and have multiple jobs which
> should run in parallel, because some of them take several days to complete
> or are in development.
>
> Everything works out so far, the only problem we have is that executors
> are not removed from idle jobs.
>
> Lets say job A is the only running job that loads a file that is several
> hundred GB in size and then goes idle without disconnecting from spark. It
> gets 80% of the cluster because I set a maximum value via
> spark.dynamicAllocation.maxExecutors.
>
> When we start another job (B) with the remaining 20% of the cluster
> resources, no idle executors of the other job are freed and the idle job
> will keep 80% of the cluster's resources, although 
> spark.dynamicAllocation.executorIdleTimeout
> is set.
>
> Only if we disconnect job A, B will allocate the freed executors.
>
> Configuration settings used:
>
> spark.shuffle.service.enabled = "true"
> spark.dynamicAllocation.enabled = “true"
> spark.dynamicAllocation.executorIdleTimeout = 120
> spark.dynamicAllocation.maxExecutors = 100
>
> with
>
> Spark 2.1.0
> R 3.4.3
> sparklyr 0.6.3
>
>
> Any ideas?
>
>
> Thanks,
>
> Florian
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Insufficient memory for Java Runtime

2018-03-14 Thread Femi Anthony
Try specifying executor memory.

On Tue, Mar 13, 2018 at 5:15 PM, Shiyuan  wrote:

> Hi Spark-Users,
>   I encountered the problem of "insufficient memory". The error is logged
> in a file named "hs_err_pid86252.log" (attached at the end of this
> email).
>
> I launched the spark job by " spark-submit --driver-memory 40g --master
> yarn --deploy-mode client".  The spark session was created with 10
> executors each with 60g memory. The data access pattern is pretty simple, I
> keep reading some spark dataframe from hdfs one by one, filter, join with
> another dataframe,  and then append the results to an dataframe:
> for i= 1,2,3
> df1 = spark.read.parquet(file_i)
> df_r = df1.filter(...). join(df2)
> df_all = df_all.union(df_r)
>
> each file_i is quite small, only a few GB, but there are a lot of such
> files. after filtering and join, each df_r is also quite small. When the
> program failed, df_all had only 10k rows which should be around 10GB.  Each
> machine in the cluster has round 80GB memory and 1TB disk space and  only
> one user was using the cluster when it failed due to insufficient memory.
> My questions are:
> i).  The log file showed that it failed to allocate 8G committing memory.
> But how could that happen when the driver and executors have more than 40g
> free memory. In fact, only transformations but no actions had run when the
> program failed.  As I understand, only DAG and book-keeping work is done
> during dataframe transformation, no data is brought into the memory.  Why
> spark still tries to allocate such large memory?
> ii). Could manually running garbage collection help?
> iii). Did I mis-specify some runtime parameter for jvm, yarn, or spark?
>
>
> Any help or references are appreciated!
>
> The content of hs_err_pid86252.log:
>
> # There is insufficient memory for the Java Runtime Environment to
> continue.
> # Native memory allocation (mmap) failed to map 8663334912 bytes (~8G)
> # for committing reserved memory.
> # Possible reasons:
> #   The system is out of physical RAM or swap space
> #   In 32 bit mode, the process size limit was hit
> # Possible solutions:
> #   Reduce memory load on the system
> #   Increase physical memory or swap space
> #   Check if swap backing store is full
> #   Use 64 bit Java on a 64 bit OS
> #   Decrease Java heap size (-Xmx/-Xms)
> #   Decrease number of Java threads
> #   Decrease Java thread stack sizes (-Xss)
> #   Set larger code cache with -XX:ReservedCodeCacheSize=
> # This output file may be truncated or incomplete.
> #
> #  Out of Memory Error (os_linux.cpp:2643), pid=86252,
> tid=0x7fd69e683700
> #
> # JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build
> 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
> # Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 )
> # Failed to write core dump. Core dumps have been disabled. To enable core
> dumping, try "ulimit -c unlimited" before starting Java again
> #
>
> ---  T H R E A D  ---
>
> Current thread (0x7fe0bc08c000):  VMThread [stack: 
> 0x7fd69e583000,0x7fd69e684000]
> [id=86295]
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: How to run spark shell using YARN

2018-03-14 Thread Femi Anthony
What's the hardware configuration of the box you're running on i.e. how
much memory does it have ?

Femi

On Wed, Mar 14, 2018 at 5:32 AM, kant kodali  wrote:

> Tried this
>
>  ./spark-shell --master yarn --deploy-mode client --executor-memory 4g
>
>
> Same issue. Keeps going forever..
>
>
> 18/03/14 09:31:25 INFO Client:
>
> client token: N/A
>
> diagnostics: N/A
>
> ApplicationMaster host: N/A
>
> ApplicationMaster RPC port: -1
>
> queue: default
>
> start time: 1521019884656
>
> final status: UNDEFINED
>
> tracking URL: http://ip-172-31-0-54:8088/proxy/application_
> 1521014458020_0004/
>
> user: centos
>
>
> 18/03/14 09:30:08 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:09 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:10 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:11 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:12 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:13 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:14 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> 18/03/14 09:30:15 INFO Client: Application report for
> application_1521014458020_0003 (state: ACCEPTED)
>
> On Wed, Mar 14, 2018 at 2:03 AM, Femi Anthony  wrote:
>
>> Make sure you have enough memory allocated for Spark workers, try
>> specifying executor memory as follows:
>>
>> --executor-memory 
>>
>> to spark-submit.
>>
>> On Wed, Mar 14, 2018 at 3:25 AM, kant kodali  wrote:
>>
>>> I am using spark 2.3.0 and hadoop 2.7.3.
>>>
>>> Also I have done the following and restarted all. But I still
>>> see ACCEPTED: waiting for AM container to be allocated, launched and
> register with RM. And I am unable to spawn spark-shell.
>>>
>>> editing $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml and change the
>>> following property value from 0.1 to something higher. I changed to 0.5
>>> (50%)
>>>
>>> <property>
>>>   <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
>>>   <value>0.5</value>
>>>   <description>
>>>     Maximum percent of resources in the cluster which can be used to
>>>     run application masters i.e. controls number of concurrent running
>>>     applications.
>>>   </description>
>>> </property>
>>>
>>> You may have to allocate more memory to YARN by editing yarn-site.xml by
>>> updating the following property:
>>>
>>> <property>
>>>   <name>yarn.nodemanager.resource.memory-mb</name>
>>>   <value>8192</value>
>>> </property>
>>>
>>> https://stackoverflow.com/questions/45687607/waiting-for-am-container-to-be-allocated-launched-and-register-with-rm
>>>
>>>
>>>
>>> On Wed, Mar 14, 2018 at 12:12 AM, kant kodali 
>>> wrote:
>>>
>>>> any idea?
>>>>
>>>> On Wed, Mar 14, 2018 at 12:12 AM, kant kodali 
>>>> wrote:
>>>>
>>>>> I set core-site.xml, hdfs-site.xml, yarn-site.xml  as per this website
>>>>> <https://dwbi.org/etl/bigdata/183-setup-hadoop-cluster> and these are
>>>>> the only three files I changed Do I need to set or change anything in
>>>>> mapred-site.xml (As of now I have not touched mapred-site.xml)?
>>>>>
>>>>> when I do yarn -node -list -all I can see both node manager and
>>>>> resource managers are running fine.
>>>>>
>>>>> But when I run spark-shell --master yarn --deploy-mode client
>>>>>
>>>>>
>>>>> it just keeps looping forever and never stops with the following
>>>>> messages
>>>>>
>>>>> 18/03/14 07:07:47 INFO Client: Application report for
>>>>> application_1521011212550_0001 (state: ACCEPTED)
>>>>> 18/03/14 07:07:48 INFO Client: Application report for
>>>>> application_1521011212550_0001 (state: ACCEPTED)
>>>>> 18/03/14 07:07:49 INFO Client: Application report for
>>>>> application_1521011212550_0001 (state: ACCEPTED)
>>>>> 18/03/14 07:07:50 INFO Client: Application report for
>>>>> application_1521011212550_0001 (state: ACCEPTED)
>>>>> 18/03/14

Re: How to run spark shell using YARN

2018-03-14 Thread Femi Anthony
Make sure you have enough memory allocated for Spark workers, try
specifying executor memory as follows:

--executor-memory 

to spark-submit.

On Wed, Mar 14, 2018 at 3:25 AM, kant kodali  wrote:

> I am using spark 2.3.0 and hadoop 2.7.3.
>
> Also I have done the following and restarted all. But I still
> see ACCEPTED: waiting for AM container to be allocated, launched and
> register with RM. And I am unable to spawn spark-shell.
>
> editing $HADOOP_HOME/etc/hadoop/capacity-scheduler.xml and change the
> following property value from 0.1 to something higher. I changed to 0.5
> (50%)
>
> <property>
>   <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
>   <value>0.5</value>
>   <description>
>     Maximum percent of resources in the cluster which can be used to run
>     application masters i.e. controls number of concurrent running applications.
>   </description>
> </property>
>
> You may have to allocate more memory to YARN by editing yarn-site.xml by
> updating the following property:
>
> <property>
>   <name>yarn.nodemanager.resource.memory-mb</name>
>   <value>8192</value>
> </property>
>
> https://stackoverflow.com/questions/45687607/waiting-for-am-container-to-be-allocated-launched-and-register-with-rm
>
>
>
> On Wed, Mar 14, 2018 at 12:12 AM, kant kodali  wrote:
>
>> any idea?
>>
>> On Wed, Mar 14, 2018 at 12:12 AM, kant kodali  wrote:
>>
>>> I set core-site.xml, hdfs-site.xml, yarn-site.xml  as per this website
>>> <https://dwbi.org/etl/bigdata/183-setup-hadoop-cluster> and these are
>>> the only three files I changed Do I need to set or change anything in
>>> mapred-site.xml (As of now I have not touched mapred-site.xml)?
>>>
>>> when I do yarn -node -list -all I can see both node manager and resource
>>> managers are running fine.
>>>
>>> But when I run spark-shell --master yarn --deploy-mode client
>>>
>>>
>>> it just keeps looping forever and never stops with the following messages
>>>
>>> 18/03/14 07:07:47 INFO Client: Application report for
>>> application_1521011212550_0001 (state: ACCEPTED)
>>> 18/03/14 07:07:48 INFO Client: Application report for
>>> application_1521011212550_0001 (state: ACCEPTED)
>>> 18/03/14 07:07:49 INFO Client: Application report for
>>> application_1521011212550_0001 (state: ACCEPTED)
>>> 18/03/14 07:07:50 INFO Client: Application report for
>>> application_1521011212550_0001 (state: ACCEPTED)
>>> 18/03/14 07:07:51 INFO Client: Application report for
>>> application_1521011212550_0001 (state: ACCEPTED)
>>> 18/03/14 07:07:52 INFO Client: Application report for
>>> application_1521011212550_0001 (state: ACCEPTED)
>>>
>>> when I go to RM UI I see this
>>>
>>> ACCEPTED: waiting for AM container to be allocated, launched and
>>> register with RM.
>>>
>>>
>>>
>>>
>>> On Mon, Mar 12, 2018 at 7:16 PM, vermanurag <
>>> anurag.ve...@fnmathlogic.com> wrote:
>>>
 This does not look like Spark error. Looks like yarn has not been able
 to
 allocate resources for spark driver. If you check resource manager UI
 you
 are likely to see this as spark application waiting for resources. Try
 reducing the driver node memory and/ or other bottlenecks based on what
 you
 see in the resource manager UI.



 --
 Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>>
>>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Spark Application stuck

2018-03-14 Thread Femi Anthony
Have you taken a look at the EMR UI ? What does your Spark setup look like
? I assume you're on EMR on AWS.
The various UI urls and ports are listed here:
https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-web-interfaces.html


On Wed, Mar 14, 2018 at 4:23 AM, Mukund Big Data 
wrote:

> Hi
>
> I am executing the following recommendation engine using Spark ML
>
> https://aws.amazon.com/blogs/big-data/building-a-recommendation-engine-with-spark-ml-on-amazon-emr-using-zeppelin/
>
> When I am trying to save the model, the application hangs and doesn't
> respond.
>
> Any pointers to find where the problem is?
> Is there any blog/doc which guides us to efficiently debug Spark
> applications?
>
> Thanks & Regards
> Mukund
>



-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Job never finishing

2018-02-20 Thread Femi Anthony
You can use spark speculation as a way to get around the problem.

Here is a useful link:

http://asyncified.io/2016/08/13/leveraging-spark-speculation-to-identify-and-re-schedule-slow-running-tasks/
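
A sketch of the relevant settings (interval, multiplier and quantile are shown
at their defaults; tune them for your job):

  from pyspark.sql import SparkSession

  spark = (SparkSession.builder
      .config("spark.speculation", "true")
      .config("spark.speculation.interval", "100ms")   # how often to check for slow tasks
      .config("spark.speculation.multiplier", "1.5")   # how much slower than the median counts as slow
      .config("spark.speculation.quantile", "0.75")    # fraction of tasks that must finish first
      .getOrCreate())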

Sent from my iPhone

> On Feb 20, 2018, at 5:52 PM, Nikhil Goyal  wrote:
> 
> Hi guys,
> 
> I have a job which gets stuck if a couple of tasks get killed due to OOM 
> exception. Spark doesn't kill the job and it keeps on running for hours. 
> Ideally I would expect Spark to kill the job or restart the killed executors 
> but nothing seems to be happening. Anybody got idea about this?
> 
> Thanks
> Nikhil


Re: Multiple filters vs multiple conditions

2017-10-03 Thread Femi Anthony
I would assume that the optimizer would end up transforming both to the same 
expression. 
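
An easy way to check is to compare the physical plans; a sketch in PySpark with
hypothetical column names:

  from pyspark.sql.functions import col

  df.filter((col("x") == 1) & (col("y") == 2)).explain()
  df.filter(col("x") == 1).filter(col("y") == 2).explain()

  # Catalyst's CombineFilters rule merges adjacent filters, so both plans should
  # end up with a single Filter node containing the combined predicate.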

Femi

Sent from my iPhone

> On Oct 3, 2017, at 8:14 AM, Ahmed Mahmoud  wrote:
> 
> Hi All,
> 
> Just a quick question from an optimisation point of view:
> 
> Approach 1:
> .filter (t-> t.x=1 && t.y=2)
> 
> Approach 2:
> .filter (t-> t.x=1)
> .filter (t-> t.y=2)
> 
> Is there a difference, or is one better than the other, or are both the same?
> 
> Thanks!
> Ahmed Mahmoud
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: for loops in pyspark

2017-09-21 Thread Femi Anthony
How much memory do the "do some stuff" portions occupy? You should try 
caching the RDD and taking a look at the Spark UI under the Storage tab to see 
how much memory is being used. Also, what portion of the overall memory of each 
worker are you allocating when you call spark-submit?
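
One way to both see the memory usage and release it explicitly between
iterations is to cache and then unpersist inside the loop; a sketch along the
lines of the original code:

  for chunk in chunks:
      my_rdd = sc.parallelize(chunk).flatMap(somefunc)
      my_rdd.cache()                 # visible under the Storage tab while cached
      my_df = make_df(my_rdd)
      my_df.write.parquet('./some/path')
      my_rdd.unpersist()             # drop the cached blocks before the next chunk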

Sent from my iPhone

> On Sep 21, 2017, at 6:58 AM, Jeff Zhang  wrote:
> 
> 
> I suspect the OOM happens on the executor side; you have to check the stacktrace
> yourself if you cannot attach more info. Most likely it is due to your user 
> code.
> 
> 
> Alexander Czech wrote on Thu, Sep 21, 2017 at 5:54 PM:
>> That is not really possible; the whole project is rather large and I would 
>> not like to release it before I have published the results.
>> 
>> But if there is no know issues with doing spark in a for loop I will look 
>> into other possibilities for memory leaks.
>> 
>> Thanks
>> 
>> 
>> On 20 Sep 2017 15:22, "Weichen Xu"  wrote:
>> Spark manage memory allocation and release automatically. Can you post the 
>> complete program which help checking where is wrong ?
>> 
>> On Wed, Sep 20, 2017 at 8:12 PM, Alexander Czech 
>>  wrote:
>>> Hello all,
>>> 
>>> I'm running a pyspark script that makes use of for loop to create smaller 
>>> chunks of my main dataset.
>>> 
>>> some example code:
>>> 
>>> for chunk in chunks:
>>> my_rdd = sc.parallelize(chunk).flatMap(somefunc)
>>> # do some stuff with my_rdd
>>> 
>>> my_df = make_df(my_rdd)
>>> # do some stuff with my_df
>>> my_df.write.parquet('./some/path')
>>> 
>>> After a couple of loops I always start to lose executors because of
>>> out-of-memory errors. Is there a way to free up memory after a loop? Do I
>>> have to do it in Python or with Spark?
>>> 
>>> Thanks
>> 
>> 


Re: Configuration for unit testing and sql.shuffle.partitions

2017-09-16 Thread Femi Anthony
How are you specifying it, as an option to spark-submit ?
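
For reference, a sketch of setting it directly on a small local test session:

  from pyspark.sql import SparkSession

  spark = (SparkSession.builder
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "4")   # instead of the default 200
      .getOrCreate())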

On Sat, Sep 16, 2017 at 12:26 PM, Akhil Das  wrote:

> spark.sql.shuffle.partitions is still used I believe. I can see it in the
> code and in the documentation page.
>
> On Wed, Sep 13, 2017 at 4:46 AM, peay  wrote:
>
>> Hello,
>>
>> I am running unit tests with Spark DataFrames, and I am looking for
>> configuration tweaks that would make tests faster. Usually, I use a
>> local[2] or local[4] master.
>>
>> Something that has been bothering me is that most of my stages end up
>> using 200 partitions, independently of whether I repartition the input.
>> This seems a bit overkill for small unit tests that barely have 200 rows
>> per DataFrame.
>>
>> spark.sql.shuffle.partitions used to control this I believe, but it seems
>> to be gone and I could not find any information on what mechanism/setting
>> replaces it or the corresponding JIRA.
>>
>> Has anyone experience to share on how to tune Spark best for very small
>> local runs like that?
>>
>> Thanks!
>>
>>
>
>
> --
> Cheers!
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Femi Anthony
Also, why are you trying to write results locally if you're not using a 
distributed file system ? Spark is geared towards writing to a distributed file 
system. I would suggest trying to collect() so the data is sent to the master 
and then do a write if the result set isn't too big, or repartition before 
trying to write (though I suspect this won't really help). You really should 
install HDFS if that is possible.
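
A sketch of the collect-to-the-driver option for result sets that comfortably
fit in driver memory (the output path is a placeholder):

  # pandas must be available on the driver
  myDataFrame.toPandas().to_csv("/tmp/result.csv", index=False)   # placeholder path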

Sent from my iPhone

> On Aug 10, 2017, at 3:58 AM, Hemanth Gudela  
> wrote:
> 
> Thanks for reply Femi!
>  
> I’m writing the file like this ->
> myDataFrame.write.mode("overwrite").csv("myFilePath")
> There absolutely are no errors/warnings after the write.
>  
> _SUCCESS file is created on master node, but the problem of _temporary is 
> noticed only on worker nodes.
>  
> I know spark.write.csv works best with HDFS, but with the current setup I 
> have in my environment, I have to deal with spark write to node’s local file 
> system and not to HDFS.
>  
> Regards,
> Hemanth
>  
> From: Femi Anthony 
> Date: Thursday, 10 August 2017 at 10.38
> To: Hemanth Gudela 
> Cc: "user@spark.apache.org" 
> Subject: Re: spark.write.csv is not able write files to specified path, but 
> is writing to unintended subfolder _temporary/0/task_xxx folder on worker 
> nodes
>  
> Normally the _temporary directory gets deleted as part of the cleanup when 
> the write is complete and a SUCCESS file is created. I suspect that the 
> writes are not properly completed. How are you specifying the write ? Any 
> error messages in the logs ?
>  
> On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela  
> wrote:
> Hi,
>  
> I’m running spark on cluster mode containing 4 nodes, and trying to write CSV 
> files to node’s local path (not HDFS).
> I’m using spark.write.csv to write CSV files.
>  
> On master node:
> spark.write.csv creates a folder with csv file name and writes many files 
> with part-r-000n suffix. This is okay for me, I can merge them later.
> But on worker nodes:
> spark.write.csv creates a folder with csv file name and 
> writes many folders and files under _temporary/0/. This is not okay for me.
> Could someone please suggest me what could have been going wrong in my 
> settings/how to be able to write csv files to the specified folder, and not 
> to subfolders (_temporary/0/task_xxx) in worker machines.
>  
> Thank you,
> Hemanth
>  
> 
> 
>  
> --
> http://www.femibyte.com/twiki5/bin/view/Tech/
> http://www.nextmatrix.com
> "Great spirits have always encountered violent opposition from mediocre 
> minds." - Albert Einstein.


Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Femi Anthony
Is your filePath prefaced with file:/// and the full path or is it relative ?

You might also try calling close() on the Spark context or session at the end of 
the program execution to try and ensure that cleanup is completed.

Sent from my iPhone

> On Aug 10, 2017, at 3:58 AM, Hemanth Gudela  
> wrote:
> 
> Thanks for reply Femi!
>  
> I’m writing the file like this ->
> myDataFrame.write.mode("overwrite").csv("myFilePath")
> There absolutely are no errors/warnings after the write.
>  
> _SUCCESS file is created on master node, but the problem of _temporary is 
> noticed only on worker nodes.
>  
> I know spark.write.csv works best with HDFS, but with the current setup I 
> have in my environment, I have to deal with spark write to node’s local file 
> system and not to HDFS.
>  
> Regards,
> Hemanth
>  
> From: Femi Anthony 
> Date: Thursday, 10 August 2017 at 10.38
> To: Hemanth Gudela 
> Cc: "user@spark.apache.org" 
> Subject: Re: spark.write.csv is not able write files to specified path, but 
> is writing to unintended subfolder _temporary/0/task_xxx folder on worker 
> nodes
>  
> Normally the _temporary directory gets deleted as part of the cleanup when 
> the write is complete and a SUCCESS file is created. I suspect that the 
> writes are not properly completed. How are you specifying the write ? Any 
> error messages in the logs ?
>  
> On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela  
> wrote:
> Hi,
>  
> I’m running spark on cluster mode containing 4 nodes, and trying to write CSV 
> files to node’s local path (not HDFS).
> I’m using spark.write.csv to write CSV files.
>  
> On master node:
> spark.write.csv creates a folder with csv file name and writes many files 
> with part-r-000n suffix. This is okay for me, I can merge them later.
> But on worker nodes:
> spark.write.csv creates a folder with csv file name and 
> writes many folders and files under _temporary/0/. This is not okay for me.
> Could someone please suggest me what could have been going wrong in my 
> settings/how to be able to write csv files to the specified folder, and not 
> to subfolders (_temporary/0/task_xxx) in worker machines.
>  
> Thank you,
> Hemanth
>  
> 
> 
>  
> --
> http://www.femibyte.com/twiki5/bin/view/Tech/
> http://www.nextmatrix.com
> "Great spirits have always encountered violent opposition from mediocre 
> minds." - Albert Einstein.


Re: spark.write.csv is not able write files to specified path, but is writing to unintended subfolder _temporary/0/task_xxx folder on worker nodes

2017-08-10 Thread Femi Anthony
Normally the _temporary directory gets deleted as part of the cleanup
when the write is complete and a SUCCESS file is created. I suspect that
the writes are not properly completed. How are you specifying the write ?
Any error messages in the logs ?

On Thu, Aug 10, 2017 at 3:17 AM, Hemanth Gudela 
wrote:

> Hi,
>
>
>
> I’m running spark on cluster mode containing 4 nodes, and trying to write
> CSV files to node’s local path (not HDFS).
>
> I’m using spark.write.csv to write CSV files.
>
>
>
> *On master node*:
>
> spark.write.csv creates a folder with csv file name and writes many files
> with part-r-000n suffix. This is okay for me, I can merge them later.
>
> *But on worker nodes*:
>
> spark.write.csv creates a folder with csv file name and
> writes many folders and files under _temporary/0/. This is not okay for me.
>
> Could someone please suggest me what could have been going wrong in my
> settings/how to be able to write csv files to the specified folder, and not
> to subfolders (_temporary/0/task_xxx) in worker machines.
>
>
>
> Thank you,
>
> Hemanth
>
>
>



-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: using spark to load a data warehouse in real time

2017-02-28 Thread Femi Anthony
Have you checked to see if there are any drivers to enable you to write to 
Greenplum directly from Spark ?

You can also take a look at this link:

https://groups.google.com/a/greenplum.org/forum/m/#!topic/gpdb-users/lnm0Z7WBW6Q

Apparently GPDB is based on Postgres, so that approach may work.
Another approach may be for Spark Streaming to write to Kafka, and then have 
another process read from Kafka and write to Greenplum.

Kafka Connect may be useful in this case -

https://www.confluent.io/blog/announcing-kafka-connect-building-large-scale-low-latency-data-pipelines/
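
For the direct-write option, a minimal sketch of a JDBC batch write, assuming
Greenplum accepts the standard PostgreSQL JDBC driver (host, database, and table
names are placeholders):

  # requires the PostgreSQL JDBC jar on the Spark classpath
  (df.write
     .format("jdbc")
     .option("url", "jdbc:postgresql://gp-master:5432/warehouse")   # placeholder
     .option("dbtable", "public.fact_table")                        # placeholder
     .option("user", "loader")
     .option("password", "...")
     .mode("append")
     .save())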

Femi Anthony



> On Feb 27, 2017, at 7:18 PM, Adaryl Wakefield  
> wrote:
> 
> Is anybody using Spark streaming/SQL to load a relational data warehouse in 
> real time? There isn’t a lot of information on this use case out there. When 
> I google real time data warehouse load, nothing I find is up to date. It’s 
> all turn of the century stuff and doesn’t take into account advancements in 
> database technology. Additionally, whenever I try to learn Spark, it’s always 
> the same thing: play with Twitter data, never structured data. All the CEP 
> use cases are about data science.
>  
> I’d like to use Spark to load Greenplum in real time. Intuitively, this 
> should be possible. I was thinking Spark Streaming with Spark SQL along with 
> an ORM should do it. Am I off base with this? Is the reason there are no 
> examples that there is a better way to do what I want?
>  
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
> www.massstreet.net
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData
>  


Re: Run spark machine learning example on Yarn failed

2017-02-28 Thread Femi Anthony
Have you tried specifying an absolute instead of a relative path ?
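
Relative paths on YARN resolve against the submitting user's HDFS home directory
(here /user/root), so copying the file into HDFS and passing an absolute URI
should avoid the ambiguity; for example, along the lines of the command quoted
below:

  hdfs dfs -mkdir -p /user/root/data/mllib
  hdfs dfs -put data/mllib/sample_libsvm_data.txt /user/root/data/mllib/
  MASTER=yarn ./bin/run-example ml.LogisticRegressionExample hdfs://master:9000/user/root/data/mllib/sample_libsvm_data.txt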

Femi



> On Feb 27, 2017, at 8:18 PM, Yunjie Ji  wrote:
> 
> After starting DFS, YARN and Spark, I ran this code under the root
> directory of Spark on my master host:
> `MASTER=yarn ./bin/run-example ml.LogisticRegressionExample
> data/mllib/sample_libsvm_data.txt`
> 
> Actually I get these code from spark's README. And here is the source code
> about LogisticRegressionExample on GitHub: 
> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.scala
> 
>   
> 
> Then, error occurs: 
> `Exception in thread "main" org.apache.spark.sql.AnalysisException: Path
> does notexist:
> hdfs://master:9000/user/root/data/mllib/sample_libsvm_data.txt;`
> 
> Firstly, I don't know why it's `hdfs://master:9000/user/root`, I do set
> namenode's IP address to `hdfs://master:9000`, but why did Spark choose the
> directory `/user/root`?
> 
> Then, I make a directory `/user/root/data/mllib/sample_libsvm_data.txt` on
> every host of the cluster, so I hope spark can find this file. But the same
> error occurs again. 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Run-spark-machine-learning-example-on-Yarn-failed-tp28435.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Get S3 Parquet File

2017-02-27 Thread Femi Anthony
Ok, thanks a lot for the heads up.

Sent from my iPhone

> On Feb 25, 2017, at 10:58 AM, Steve Loughran  wrote:
> 
> 
>> On 24 Feb 2017, at 07:47, Femi Anthony  wrote:
>> 
>> Have you tried reading using s3n which is a slightly older protocol ? I'm 
>> not sure how compatible s3a is with older versions of Spark.
> 
> I would absolutely not use s3n with a 1.2 GB file.
> 
> There is a WONTFIX JIRA on how it will read to the end of a file when you 
> close a stream, and as seek() closes a stream every seek will read to the end 
> of a file. And as readFully(position, bytes) does a seek either end, every 
> time the Parquet code tries to read a bit of data, 1.3 GB of download: 
> https://issues.apache.org/jira/browse/HADOOP-12376
> 
> That is not going to be fixed, ever. Because it can only be done by upgrading 
> the libraries, and that will simply move new bugs in, lead to different 
> bugreports, etc, etc. All for a piece of code which has be supplanted in the 
> hadoop-2.7.x JARs with s3a ready for use, and in the forthcoming hadoop-2.8+ 
> code, significantly faster for IO (especially ORC/Parquet), multi-GB upload, 
> and even the basic metadata operations used when setting up queries.
> 
> For Hadoop 2.7+, use S3a. Any issues with s3n will be closed as  "use s3a"
> 
> 
>> 
>> 
>> Femi
>> 
>>> On Fri, Feb 24, 2017 at 2:18 AM, Benjamin Kim  wrote:
>>> Hi Gourav,
>>> 
>>> My answers are below.
>>> 
>>> Cheers,
>>> Ben
>>> 
>>> 
>>>> On Feb 23, 2017, at 10:57 PM, Gourav Sengupta  
>>>> wrote:
>>>> 
>>>> Can I ask where are you running your CDH? Is it on premise or have you 
>>>> created a cluster for yourself in AWS? Our cluster is on premise in our 
>>>> data center.
>>>> 
> 
> you need to set  up your s3a credentials in core-site, spark-defaults, or 
> rely on spark-submit picking up the submitters AWS env vars a propagating 
> them.
> 
> 
>>>> Also I have really never seen use s3a before, that was used way long 
>>>> before when writing s3 files took a long time, but I think that you are 
>>>> reading it. 
>>>> 
>>>> Any ideas why you are not migrating to Spark 2.1, besides speed, there are 
>>>> lots of apis which are new and the existing ones are being deprecated. 
>>>> Therefore there is a very high chance that you are already working on code 
>>>> which is being deprecated by the SPARK community right now. We use CDH and 
>>>> upgrade with whatever Spark version they include, which is 1.6.0. We are 
>>>> waiting for the move to Spark 2.0/2.1.
> 
> this is in the hadoop codebase, not the spark release. it will be the same 
> irrespectively.
> 
>>>> 
>>>> And besides that would you not want to work on a platform which is at 
>>>> least 10 times faster What would that be?
>>>> 
>>>> Regards,
>>>> Gourav Sengupta
>>>> 
>>>>> On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim  wrote:
>>>>> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB 
>>>>> Parquet file from AWS S3. We can read the schema and show some data when 
>>>>> the file is loaded into a DataFrame, but when we try to do some 
>>>>> operations, such as count, we get this error below.
>>>>> 
>>>>> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS 
>>>>> credentials from any provider in the chain
>>>>> at 
>>>>> com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>>>>> at 
>>>>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3779)
>>>>> at 
>>>>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
>>>>> at 
>>>>> com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
>>>>> at 
>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:239)
>>>>> at 
>>>>> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2711)
>>>>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
>>>>> at 
>>>>> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2748)
>>>>>

Re: Get S3 Parquet File

2017-02-23 Thread Femi Anthony
Have you tried reading using s3n which is a slightly older protocol ? I'm
not sure how compatible s3a is with older versions of Spark.


Femi
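
P.S. For the original s3a error ("Unable to load AWS credentials from any
provider in the chain"), a minimal PySpark sketch of passing the keys
explicitly through the Hadoop configuration may be worth trying. The bucket
path and key values below are placeholders, not from this thread:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("s3a-parquet-read")
sc = SparkContext(conf=conf)

# Supply the s3a credentials directly instead of relying on the provider chain.
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

sqlContext = SQLContext(sc)
df = sqlContext.read.parquet("s3a://your-bucket/path/to/file.parquet")
print(df.count())

The same keys can also go into core-site.xml or spark-defaults.conf instead
of being set programmatically.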

On Fri, Feb 24, 2017 at 2:18 AM, Benjamin Kim  wrote:

> Hi Gourav,
>
> My answers are below.
>
> Cheers,
> Ben
>
>
> On Feb 23, 2017, at 10:57 PM, Gourav Sengupta 
> wrote:
>
> Can I ask where are you running your CDH? Is it on premise or have you
> created a cluster for yourself in AWS? Our cluster is on premise in our
> data center.
>
> Also I have really never seen use s3a before, that was used way long
> before when writing s3 files took a long time, but I think that you are
> reading it.
>
> Any ideas why you are not migrating to Spark 2.1, besides speed, there are
> lots of apis which are new and the existing ones are being deprecated.
> Therefore there is a very high chance that you are already working on code
> which is being deprecated by the SPARK community right now. We use CDH
> and upgrade with whatever Spark version they include, which is 1.6.0. We
> are waiting for the move to Spark 2.0/2.1.
>
> And besides that would you not want to work on a platform which is at
> least 10 times faster? What would that be?
>
> Regards,
> Gourav Sengupta
>
> On Thu, Feb 23, 2017 at 6:23 PM, Benjamin Kim  wrote:
>
>> We are trying to use Spark 1.6 within CDH 5.7.1 to retrieve a 1.3GB
>> Parquet file from AWS S3. We can read the schema and show some data when
>> the file is loaded into a DataFrame, but when we try to do some operations,
>> such as count, we get this error below.
>>
>> com.cloudera.com.amazonaws.AmazonClientException: Unable to load AWS
>> credentials from any provider in the chain
>> at com.cloudera.com.amazonaws.auth.AWSCredentialsProviderChain.
>> getCredentials(AWSCredentialsProviderChain.java:117)
>> at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.invoke
>> (AmazonS3Client.java:3779)
>> at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.headBu
>> cket(AmazonS3Client.java:1107)
>> at com.cloudera.com.amazonaws.services.s3.AmazonS3Client.doesBu
>> cketExist(AmazonS3Client.java:1070)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSys
>> tem.java:239)
>> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.
>> java:2711)
>> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:97)
>> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem
>> .java:2748)
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:
>> 2730)
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:385)
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>> at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReade
>> r.java:385)
>> at parquet.hadoop.ParquetRecordReader.initializeInternalReader(
>> ParquetRecordReader.java:162)
>> at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordR
>> eader.java:145)
>> at org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(
>> SqlNewHadoopRDD.scala:180)
>> at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD
>> .scala:126)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:
>> 306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:
>> 306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsR
>> DD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:
>> 306)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>> scala:229)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Can anyone help?
>>
>> Cheers,
>> Ben
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Reading csv files with quoted fields containing embedded commas

2016-11-06 Thread Femi Anthony
The quote options seem to be related to escaping quotes and the dataset
isn't escaping quotes. As I said, quoted strings with embedded commas are
something that pandas handles easily, and even Excel does that as well.


Femi
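
P.S. One thing that may be worth a try: the sample row doubles the quotes
inside a quoted field (""W""), which is the RFC 4180 convention, while
Spark's CSV reader defaults to backslash as the escape character. A rough
sketch of pointing the escape character at the quote character (csv_path as
in the original post):

# Sketch: treat a doubled quote as an escaped quote by making the escape
# character the same as the quote character.
df_raw = (spark.read
          .option("header", "true")
          .option("quote", '"')
          .option("escape", '"')
          .csv(csv_path))

df_raw.select("col12", "col13", "col14").show(truncate=False)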

On Sun, Nov 6, 2016 at 6:59 AM, Hyukjin Kwon  wrote:

> Hi Femi,
>
> Have you maybe tried the quote related options specified in the
> documentation?
>
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.
> html#pyspark.sql.DataFrameReader.csv
>
> Thanks.
>
> 2016-11-06 6:58 GMT+09:00 Femi Anthony :
>
>> Hi, I am trying to process a very large comma delimited csv file and I am
>> running into problems.
>> The main problem is that some fields contain quoted strings with embedded
>> commas.
>> It seems as if PySpark is unable to properly parse lines containing such
>> fields like say Pandas does.
>>
>> Here is the code I am using to read the file in Pyspark
>>
>> df_raw=spark.read.option("header","true").csv(csv_path)
>>
>> Here is an example of a good and 'bad' line in such a file:
>>
>>
>> col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
>> 80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY ""W""   JK, RE LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
>> 6167229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE
>>
>> Line 0 is the header
>> Line 1 is the 'problematic' line
>> Line 2 is a good line.
>>
>> Pandas can handle this easily:
>>
>>
>> [1]: import pandas as pd
>>
>> In [2]: pdf = pd.read_csv('malformed_data.csv')
>>
>> In [4]: pdf[['col12','col13','col14']]
>> Out[4]:
>>                     col12                                             col13  \
>> 0  32 XIY "W"   JK, RE LK  SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
>> 1                     NaN                     OUTKAST#THROOTS~WUTANG#RUNDMC
>>
>>    col14
>> 0   23.0
>> 1    0.0
>>
>>
>> while Pyspark seems to parse this erroneously:
>>
>> [5]: sdf=spark.read.format("org.apache.spark.csv").csv('malformed
>> _data.csv',header=True)
>>
>> [6]: sdf.select("col12","col13",'col14').show()
>> +--+++
>> | col12|   col13|   col14|
>> +--+++
>> |"32 XIY ""W""   JK|  RE LK"|SOMETHINGLIKEAPHE...|
>> |  null|OUTKAST#THROOTS~W...| 0.0|
>> +--+++
>>
>>  Is this a bug or am I doing something wrong ?
>>  I am working with Spark 2.0
>>  Any help is appreciated
>>
>> Thanks,
>> -- Femi
>>
>> http://www.nextmatrix.com
>> "Great spirits have always encountered violent opposition from mediocre
>> minds." - Albert Einstein.
>>
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Reading csv files with quoted fields containing embedded commas

2016-11-05 Thread Femi Anthony
Hi, I am trying to process a very large comma delimited csv file and I am
running into problems.
The main problem is that some fields contain quoted strings with embedded
commas.
It seems as if PySpark is unable to properly parse lines containing such
fields like say Pandas does.

Here is the code I am using to read the file in Pyspark

df_raw=spark.read.option("header","true").csv(csv_path)

Here is an example of a good and 'bad' line in such a file:



col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY ""W""   JK, RE LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
6167229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE

Line 0 is the header
Line 1 is the 'problematic' line
Line 2 is a good line.

Pandas can handle this easily:


[1]: import pandas as pd

In [2]: pdf = pd.read_csv('malformed_data.csv')

In [4]: pdf[['col12','col13','col14']]
Out[4]:
                    col12                                             col13  \
0  32 XIY "W"   JK, RE LK  SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
1                     NaN                     OUTKAST#THROOTS~WUTANG#RUNDMC

   col14
0   23.0
1    0.0


while Pyspark seems to parse this erroneously:

[5]:
sdf=spark.read.format("org.apache.spark.csv").csv('malformed_data.csv',header=True)

[6]: sdf.select("col12","col13",'col14').show()
+--+++
| col12|   col13|   col14|
+--+++
|"32 XIY ""W""   JK|  RE LK"|SOMETHINGLIKEAPHE...|
|  null|OUTKAST#THROOTS~W...| 0.0|
+--+++

 Is this a bug or am I doing something wrong ?
 I am working with Spark 2.0
 Any help is appreciated

Thanks,
-- Femi

http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: My notes on Spark Performance & Tuning Guide

2016-05-17 Thread Femi Anthony
Please send it to me as well.

Thanks

Sent from my iPhone

> On May 17, 2016, at 12:09 PM, Raghavendra Pandey 
>  wrote:
> 
> Can you please send me as well.
> 
> Thanks 
> Raghav
> 
>> On 12 May 2016 20:02, "Tom Ellis"  wrote:
>> I would like to also Mich, please send it through, thanks!
>> 
>>> On Thu, 12 May 2016 at 15:14 Alonso Isidoro  wrote:
>>> Me too, send me the guide.
>>> 
>>> Sent from my iPhone
>>> 
 On 12 May 2016, at 12:11, Ashok Kumar  wrote:
 
 Hi Dr Mich,
 
 I will be very keen to have a look at it and review if possible.
 
 Please forward me a copy
 
 Thanking you warmly
 
 
 On Thursday, 12 May 2016, 11:08, Mich Talebzadeh 
  wrote:
 
 
 Hi Al,,
 
 
 Following the threads in spark forum, I decided to write up on 
 configuration of Spark including allocation of resources and configuration 
 of driver, executors, threads, execution of Spark apps and general 
 troubleshooting taking into account the allocation of resources for Spark 
 applications and OS tools at the disposal.
 
 Since the most widespread configuration as I notice is with "Spark 
 Standalone Mode", I have decided to write these notes starting with 
 Standalone and later on moving to Yarn
 
 Standalone – a simple cluster manager included with Spark that makes it 
 easy to set up a cluster.
 YARN – the resource manager in Hadoop 2.
 
 I would appreciate if anyone interested in reading and commenting to get 
 in touch with me directly on mich.talebza...@gmail.com so I can send the 
 write-up for their review and comments.
 
 Just to be clear this is not meant to be any commercial proposition or 
 anything like that. As I seem to get involved with members troubleshooting 
 issues and threads on this topic, I thought it is worthwhile writing a 
 note about it to summarise the findings for the benefit of the community.
 
 Regards.
 
 Dr Mich Talebzadeh
  
 LinkedIn  
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
  
 http://talebzadehmich.wordpress.com


Re: Timeout when submitting an application to a remote Spark Standalone master

2016-04-29 Thread Femi Anthony
Have you tried connecting to port 7077 on the cluster from your local 
machine to see if it works ok ?

Sent from my iPhone
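
P.S. A quick way to check reachability from the local machine is a plain
socket test, e.g. in Python (the hostname below is a placeholder for the
master's public address):

import socket

try:
    s = socket.create_connection(("ec2-master-hostname", 7077), timeout=5)
    s.close()
    print("port 7077 is reachable")
except socket.error as e:
    print("cannot reach port 7077: %s" % e)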

> On Apr 29, 2016, at 5:58 PM, Richard Han  wrote:
> 
> I have an EC2 installation of Spark Standalone Master/Worker set up. The two 
> can talk to one another, and all ports are open in the security group (just 
> to make sure it isn't the port). When I run spark-shell on the master node 
> (setting it to --master spark://ip:7077) it runs everything correctly. When I 
> try to submit a job from my local machine however I get RPC timeout errors. 
> Does anyone know why this is or how to resolve it?
> 
> (cross posted at 
> http://stackoverflow.com/questions/36947811/application-submitted-to-remote-spark-from-local-pyspark-never-completes)
> 
> Thanks!


Re: transformation - spark vs cassandra

2016-03-31 Thread Femi Anthony
Try it out on a smaller subset of data and see which gives the better
performance.
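
You can also check what the connector actually pushes down to Cassandra
before benchmarking. A rough sketch against the DataFrame from your example
(Python API here; the keyspace and table names are the ones from your snippet):

import time

df = (sqlContext.read.format("org.apache.spark.sql.cassandra")
      .options(keyspace="test", table="test")
      .load())

filtered = df.filter("cdate = '2016-06-07'").filter("country = 'USA'")
# The physical plan should show which filters are pushed to the source.
filtered.explain()

start = time.time()
print(filtered.count())
print("DataFrame path took %.2f s" % (time.time() - start))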

On Thu, Mar 31, 2016 at 12:11 PM, Arun Sethia  wrote:

> Thanks Imre.
>
> But I thought spark-cassandra driver is going to do same internally.
>
> On Thu, Mar 31, 2016 at 10:32 AM, Imre Nagi 
> wrote:
>
>> I think querying by cassandra query language will be better in terms of
>> performance if you want to pull and filter the data from your db, rather
>> than pulling all of the data and do some filtering and transformation by
>> using spark data frame.
>>
>>
>> On 31 Mar 2016 22:19, "asethia"  wrote:
>>
>>> Hi,
>>>
>>> I am working with Cassandra and Spark, would like to know what is best
>>> performance using Cassandra filter based on primary key and cluster key
>>> vs
>>> using spark data frame transformation/filters.
>>>
>>> for example in spark:
>>>
>>>  val rdd = sqlContext.read.format("org.apache.spark.sql.cassandra")
>>>   .options(Map("keyspace" -> "test", "table" -> "test"))
>>>   .load()
>>>
>>> and then rdd.filter("cdate
>>> ='2016-06-07'").filter("country='USA'").count()
>>>
>>> vs
>>>
>>> using Cassandra (where cdate is part of primary key and country as
>>> cluster
>>> key).
>>>
>>> SELECT count(*) FROM test WHERE cdate ='2016-06-07' AND country='USA'
>>>
>>> I would like to know when should we use Cassandra simple query vs
>>> dataframe
>>> in terms of performance with billion of rows.
>>>
>>> Thanks
>>> arun
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/transformation-spark-vs-cassandra-tp26647.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: confusing about Spark SQL json format

2016-03-31 Thread Femi Anthony
I encountered a similar problem reading multi-line JSON files into Spark a
while back, and here's an article I wrote about how to solve it:

http://searchdatascience.com/spark-adventures-1-processing-multi-line-json-files/

You may find it useful.

Femi
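
P.S. The usual workaround for a pretty-printed (multi-line) JSON file is
along these lines: read each file whole, flatten any top-level array, and
hand single-line JSON strings back to the normal reader. A minimal sketch
(the path is a placeholder):

import json

# wholeTextFiles gives (path, content) pairs, one per file.
raw = sc.wholeTextFiles("hdfs:///data/people_multiline.json")

def to_objects(content):
    doc = json.loads(content)
    # A top-level JSON array becomes individual objects.
    return doc if isinstance(doc, list) else [doc]

json_lines = raw.flatMap(lambda kv: to_objects(kv[1])).map(json.dumps)
df = sqlContext.read.json(json_lines)
df.show()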

On Thu, Mar 31, 2016 at 12:32 PM,  wrote:

> You are correct that it does not take the standard JSON file format. From
> the Spark Docs:
> "Note that the file that is offered as *a json file* is not a typical
> JSON file. Each line must contain a separate, self-contained valid JSON
> object. As a consequence, a regular multi-line JSON file will most often
> fail.”
>
>
> http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
>
> On Mar 31, 2016, at 5:30 AM, charles li  wrote:
>
> hi, UMESH, have you tried to load that json file on your machine? I did
> try it before, and here is the screenshot:
>
> <Screenshot 2016-03-31 5.27.30 PM.png>
> <Screenshot 2016-03-31 5.27.39 PM.png>
>
>
>
>
> On Thu, Mar 31, 2016 at 5:19 PM, UMESH CHAUDHARY 
> wrote:
>
>> Hi Charles,
>> The definition of object from www.json.org:
>>
>> An *object* is an unordered set of name/value pairs. An object begins
>> with { (left brace) and ends with } (right brace). Each name is followed
>> by : (colon) and the name/value pairs are separated by , (comma).
>>
>> Its a pretty much OOPS paradigm , isn't it?
>>
>> Regards,
>> Umesh
>>
>> On Thu, Mar 31, 2016 at 2:34 PM, charles li 
>> wrote:
>>
>>> hi, UMESH, I think you've misunderstood the json definition.
>>>
>>> there is only one object in a json file:
>>>
>>>
>>> for the file, people.json, as below:
>>>
>>>
>>> 
>>>
>>> {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
>>> {"name":"Michael", "address":{"city":null, "state":"California"}}
>>>
>>>
>>> ---
>>>
>>> it does have two valid format:
>>>
>>> 1.
>>>
>>>
>>> 
>>>
>>> [ {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
>>> {"name":"Michael", "address":{"city":null, "state":"California"}}
>>> ]
>>>
>>>
>>> ---
>>>
>>> 2.
>>>
>>>
>>> 
>>>
>>> {"name": ["Yin", "Michael"],
>>> "address":[ {"city":"Columbus","state":"Ohio"},
>>> {"city":null, "state":"California"} ]
>>> }
>>>
>>> ---
>>>
>>>
>>>
>>> On Thu, Mar 31, 2016 at 4:53 PM, UMESH CHAUDHARY 
>>> wrote:
>>>
 Hi,
 Look at below image which is from json.org:

 

 The above image describes the object formulation of below JSON:

 Object 1=> {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
 Object 2=> {"name":"Michael", "address":{"city":null,
 "state":"California"}}


 Note that "address" is also an object.



 On Thu, Mar 31, 2016 at 1:53 PM, charles li 
 wrote:

> as this post  says, that in spark, we can load a json file in this way
> bellow:
>
> *post* :
> https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html
> 
>
>
>
> ---
> sqlContext.jsonFile(file_path)
> or
> sqlContext.read.json(file_path)
>
> ---
>
>
> and the *json file format* looks like below, say *people.json*
>
>
> {"name":"Yin",
> "address":{"city":"Columbus","state":"Ohio"}}
> {"name":"Michael", "address":{"city":null, "state":"California"}

Re: How to design the input source of spark stream

2016-03-31 Thread Femi Anthony
Also,  ssc.textFileStream(dataDir) will read all the files from a directory
so as far as I can see there's no need to merge the files. Just write them
to the same HDFS directory.
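
A minimal PySpark sketch of that approach (paths and the batch interval are
just placeholders):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="csv-dir-stream")
ssc = StreamingContext(sc, 300)  # 5-minute batches

# Any file that newly appears under /data is picked up on the next batch,
# so the five per-node files never need to be merged.
lines = ssc.textFileStream("hdfs:///data")
lines.count().pprint()

ssc.start()
ssc.awaitTermination()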

On Thu, Mar 31, 2016 at 8:04 AM, Femi Anthony  wrote:

> I don't think you need to do it this way.
>
> Take a look here :
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
> in this section:
> Level of Parallelism in Data Receiving
>  Receiving multiple data streams can therefore be achieved by creating
> multiple input DStreams and configuring them to receive different
> partitions of the data stream from the source(s)
> These multiple DStreams can be unioned together to create a single
> DStream. Then the transformations that were being applied on a single input
> DStream can be applied on the unified stream.
>
>
> On Wed, Mar 30, 2016 at 11:08 PM, kramer2...@126.com 
> wrote:
>
>> Hi
>>
>> My environment is described like below:
>>
>> 5 nodes, each nodes generate a big csv file every 5 minutes. I need spark
>> stream to analyze these 5 files in every five minutes to generate some
>> report.
>>
>> I am planning to do it in this way:
>>
>> 1. Put those 5 files into HDFS directory called /data
>> 2. Merge them into one big file in that directory
>> 3. Use spark stream constructor textFileStream('/data') to generate my
>> inputDStream
>>
>> The problem of this way is I do not know how to merge the 5 files in HDFS.
>> It seems very difficult to do it in python.
>>
>> So question is
>>
>> 1. Can you tell me how to merge files in hdfs by python?
>> 2. Do you know some other way to input those files into spark?
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-design-the-input-source-of-spark-stream-tp26641.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> http://www.femibyte.com/twiki5/bin/view/Tech/
> http://www.nextmatrix.com
> "Great spirits have always encountered violent opposition from mediocre
> minds." - Albert Einstein.
>



-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: How to design the input source of spark stream

2016-03-31 Thread Femi Anthony
I don't think you need to do it this way.

Take a look here :
http://spark.apache.org/docs/latest/streaming-programming-guide.html
in this section:
Level of Parallelism in Data Receiving
 Receiving multiple data streams can therefore be achieved by creating
multiple input DStreams and configuring them to receive different
partitions of the data stream from the source(s)
These multiple DStreams can be unioned together to create a single DStream.
Then the transformations that were being applied on a single input DStream
can be applied on the unified stream.
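
A rough PySpark sketch of that union pattern (the receivers here are socket
receivers just for illustration, and the hosts/ports are placeholders; the
same union applies to any set of input DStreams):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="multi-receiver")
ssc = StreamingContext(sc, 300)

sources = [("node1", 9999), ("node2", 9999), ("node3", 9999)]
streams = [ssc.socketTextStream(host, port) for host, port in sources]

# One unified DStream; transformations are applied once to the union.
unified = ssc.union(*streams)
unified.count().pprint()

ssc.start()
ssc.awaitTermination()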


On Wed, Mar 30, 2016 at 11:08 PM, kramer2...@126.com 
wrote:

> Hi
>
> My environment is described like below:
>
> 5 nodes, each nodes generate a big csv file every 5 minutes. I need spark
> stream to analyze these 5 files in every five minutes to generate some
> report.
>
> I am planning to do it in this way:
>
> 1. Put those 5 files into HDFS directory called /data
> 2. Merge them into one big file in that directory
> 3. Use spark stream constructor textFileStream('/data') to generate my
> inputDStream
>
> The problem of this way is I do not know how to merge the 5 files in HDFS.
> It seems very difficult to do it in python.
>
> So question is
>
> 1. Can you tell me how to merge files in hdfs by python?
> 2. Do you know some other way to input those files into spark?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-design-the-input-source-of-spark-stream-tp26641.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Femi Anthony
I am working on Spark Streaming API and I wish to stream a set of
pre-downloaded web log files continuously to simulate a real-time stream. I
wrote a script that gunzips the compressed logs and pipes the output to nc
on port .

The script looks like this:

BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
zipped_files=`find $BASEDIR -name "*.gz"`

for zfile in $zipped_files
 do
  echo "Unzipping $zfile..."
  gunzip -c $zfile  | nc -l -p  -q 20

 done

I have streaming code written in Scala that processes the streams. It works
well for the most part, but when its run out of files to stream I get the
following error in Spark:

16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
Restarting receiver with delay 2000 ms: Socket data stream had no more data
16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
Restarting receiver with delay 2000ms: Socket data stream had no more data
16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600
replicated to only 0 peer(s) instead of 1 peers

16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at 
com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
at 
com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

How do I implement a graceful shutdown so that the program exits gracefully
when it no longer detects any data in the stream ?

My Spark Streaming code looks like this:

object StreamingLogEnhanced {
 def main(args: Array[String]) {
  val master = args(0)
  val conf = new
 SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
 // Create a StreamingContext with a n second batch size
  val ssc = new StreamingContext(conf, Seconds(10))
 // Create a DStream from all the input on port 
  val log = Logger.getLogger(getClass.getName)

  sys.ShutdownHookThread {
  log.info("Gracefully stopping Spark Streaming Application")
  ssc.stop(true, true)
  log.info("Application stopped")
  }
  val lines = ssc.socketTextStream("localhost", )
  // Create a count of log hits by ip
  var ipCounts=countByIp(lines)
  ipCounts.print()

  // start our streaming context and wait for it to "finish"
  ssc.start()
  // Wait for 600 seconds then exit
  ssc.awaitTermination(1*600)
  ssc.stop()
  }

 def countByIp(lines: DStream[String]) = {
   val parser = new AccessLogParser
   val accessLogDStream = lines.map(line => parser.parseRecord(line))
   val ipDStream = accessLogDStream.map(entry =>
(entry.get.clientIpAddress, 1))
   ipDStream.reduceByKey((x, y) => x + y)
 }

}

Thanks for any suggestions in advance.


Re: Appending filename information to RDD initialized by sc.textFile

2016-01-20 Thread Femi Anthony
Thanks, I'll take a look.
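
P.S. A simpler alternative to the custom InputFormat route, in case it helps:
since sc.textFile() does decompress .gz files, one RDD per file can be built,
the date parsed out of the name and prepended, and the results unioned. A
rough sketch (the directory and file listing are placeholders):

import os
import re
from functools import reduce

base_dir = "hdfs:///data/pagecounts"
file_names = ["pagecounts-20090501-00.gz", "pagecounts-20090501-01.gz"]

def with_date(fname):
    # Pull the YYYYMMDD stamp out of the file name and prepend it to each row.
    date = re.search(r"pagecounts-(\d{8})-", fname).group(1)
    return sc.textFile(os.path.join(base_dir, fname)).map(lambda row: date + " " + row)

rdd = reduce(lambda a, b: a.union(b), [with_date(f) for f in file_names])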

On Wed, Jan 20, 2016 at 1:38 AM, Akhil Das 
wrote:

> You can use the sc.newAPIHadoopFile and pass your own InputFormat and
> RecordReader which will read the compressed .gz files to your usecase. For
> a start, you can look at the:
>
> - wholeTextFile implementation
> <https://github.com/apache/spark/blob/ad1503f92e1f6e960a24f9f5d36b1735d1f5073a/core/src/main/scala/org/apache/spark/SparkContext.scala#L839>
> - WholeTextFileInputFormat
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala>
> - WholeTextFileRecordReader
> <https://github.com/apache/spark/blob/7a375bb87a8df56d9dde0c484e725e5c497a9876/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala>
>
>
>
>
>
> Thanks
> Best Regards
>
> On Tue, Jan 19, 2016 at 11:48 PM, Femi Anthony  wrote:
>
>>
>>
>>  I  have a set of log files I would like to read into an RDD. These
>> files are all compressed .gz and the filenames are date stamped. The
>> source of these files is the page view statistics data for wikipedia
>>
>> http://dumps.wikimedia.org/other/pagecounts-raw/
>>
>> The file names look like this:
>>
>> pagecounts-20090501-00.gz
>> pagecounts-20090501-01.gz
>> pagecounts-20090501-02.gz
>>
>> What I would like to do is read in all such files in a directory and
>> prepend the date from the filename (e.g. 20090501) to each row of the
>> resulting RDD. I first thought of using *sc.wholeTextFiles(..)* instead
>> of *sc.textFile(..)*, which creates a PairRDD with the key being the
>> file name with a path, but *sc.wholeTextFiles()* doesn't handle
>> compressed .gz files.
>>
>> Any suggestions would be welcome.
>>
>> --
>> http://www.femibyte.com/twiki5/bin/view/Tech/
>> http://www.nextmatrix.com
>> "Great spirits have always encountered violent opposition from mediocre
>> minds." - Albert Einstein.
>>
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: Spark Cassandra Java Connector: records missing despite consistency=ALL

2016-01-19 Thread Femi Anthony
So is the logging to Cassandra being done via Spark ?
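
For reference, a rough sketch of how the read-side consistency setting can be
applied when the read goes through the DataFrame API instead of the Java
connector directly (host, keyspace and table names below are placeholders):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = (SparkConf()
        .setAppName("cassandra-read")
        .set("spark.cassandra.connection.host", "cassandra-node-1")
        .set("spark.cassandra.input.consistency.level", "ALL"))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = (sqlContext.read.format("org.apache.spark.sql.cassandra")
      .options(keyspace="events", table="event_log")
      .load())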

On Wed, Jan 13, 2016 at 7:17 AM, Dennis Birkholz 
wrote:

> Hi together,
>
> we use Cassandra to log event data and process it every 15 minutes with Spark.
> We are using the Cassandra Java Connector for Spark.
>
> Randomly our Spark runs produce too few output records because no data is
> returned from Cassandra for a several minutes window of input data. When
> querying the data (with cqlsh), after multiple tries, the data eventually
> becomes available.
>
> To solve the problem, we tried to use consistency=ALL when reading the
> data in Spark. We use the
> CassandraJavaUtil.javafunctions().cassandraTable() method and have set
> "spark.cassandra.input.consistency.level"="ALL" on the config when creating
> the Spark context. The problem persists but according to
> http://stackoverflow.com/a/25043599 using a consistency level of ONE on
> the write side (which we use) and ALL on the READ side should be sufficient
> for data consistency.
>
> I would really appreciate if someone could give me a hint how to fix this
> problem, thanks!
>
> Greets,
> Dennis
>
> P.s.:
> some information about our setup:
> Cassandra 2.1.12 in a two Node configuration with replication factor=2
> Spark 1.5.1
> Cassandra Java Driver 2.2.0-rc3
> Spark Cassandra Java Connector 2.10-1.5.0-M2
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Appending filename information to RDD initialized by sc.textFile

2016-01-19 Thread Femi Anthony
 I  have a set of log files I would like to read into an RDD. These files
are all compressed .gz and the filenames are date stamped. The source
of these files is the page view statistics data for wikipedia

http://dumps.wikimedia.org/other/pagecounts-raw/

The file names look like this:

pagecounts-20090501-00.gz
pagecounts-20090501-01.gz
pagecounts-20090501-02.gz

What I would like to do is read in all such files in a directory and
prepend the date from the filename (e.g. 20090501) to each row of the
resulting RDD. I first thought of using *sc.wholeTextFiles(..)* instead of
*sc.textFile(..)*, which creates a PairRDD with the key being the file name
with a path, but *sc.wholeTextFiles()* doesn't handle compressed .gz files.

Any suggestions would be welcome.

-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.


Re: pyspark: calculating row deltas

2016-01-10 Thread Femi Anthony
Can you clarify what you mean with an actual example ?

For example, if your data frame looks like this:

ID  Year   Value
1   2012   100
2   2013   101
3   2014   102

What's your desired output ?

Femi
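
P.S. If the intent is each year's Value minus the previous year's, a window
function with lag() may avoid the rename-and-join approach. A rough sketch,
with the assumptions noted in the comments:

# Assumes the delta wanted is Value(Year) - Value(Year - 1); add
# Window.partitionBy(...) if there are several independent series.
# On Spark 1.x this needs a HiveContext for window functions.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy("Year")
df_delta = df.withColumn("delta", F.col("Value") - F.lag("Value", 1).over(w))
df_delta.show()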


On Sat, Jan 9, 2016 at 4:55 PM, Franc Carter  wrote:

>
> Hi,
>
> I have a DataFrame with the columns
>
>  ID,Year,Value
>
> I'd like to create a new Column that is Value2-Value1 where the
> corresponding Year2=Year-1
>
> At the moment I am creating  a new DataFrame with renamed columns and doing
>
>DF.join(DF2, . . . .)
>
>  This looks cumbersome to me, is there a better way ?
>
> thanks
>
>
> --
> Franc
>



-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.