Re: Write DataFrame with Partition and choose Filename in PySpark

2023-05-05 Thread Marco Costantini
Hi Mich,

Thank you. Ah, I want to avoid bringing all data to the driver node. That
is my understanding of what will happen in that case. Perhaps, I'll trigger
a Lambda to rename/combine the files after PySpark writes them.
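A minimal sketch of that rename step (assuming boto3; bucket, prefix and target names are made up):

```
import boto3

s3 = boto3.client("s3")
bucket = "bucket_name"
prefix = "test_folder/year=2023/month=5/"

# S3 has no rename, so copy each part file to a chosen key and delete the original.
listing = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
parts = [o["Key"] for o in listing.get("Contents", []) if "part-" in o["Key"]]
for i, key in enumerate(parts):
    new_key = f"{prefix}orders_{i:05d}.json"
    s3.copy_object(Bucket=bucket, CopySource={"Bucket": bucket, "Key": key}, Key=new_key)
    s3.delete_object(Bucket=bucket, Key=key)
```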

Cheers,
Marco.

On Thu, May 4, 2023 at 5:25 PM Mich Talebzadeh 
wrote:

> you can try
>
> df2.coalesce(1).write.mode("overwrite").json("/tmp/pairs.json")
>
> hdfs dfs -ls /tmp/pairs.json
> Found 2 items
> -rw-r--r--   3 hduser supergroup  0 2023-05-04 22:21
> /tmp/pairs.json/_SUCCESS
> -rw-r--r--   3 hduser supergroup 96 2023-05-04 22:21
> /tmp/pairs.json/part-0-21f12540-c1c6-441d-a9b2-a82ce2113853-c000.json
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 May 2023 at 22:14, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> Hi Mich,
>> Thank you.
>> Are you saying this satisfies my requirement?
>>
>> On the other hand, I am smelling something going on. Perhaps the Spark
>> 'part' files should not be thought of as files, but rather pieces of a
>> conceptual file. If that is true, then your approach (of which I'm well
>> aware) makes sense. Question: what are some good methods, tools, for
>> combining the parts into a single, well-named file? I imagine that is
>> outside of the scope of PySpark, but any advice is welcome.
>>
>> Thank you,
>> Marco.
>>
>> On Thu, May 4, 2023 at 5:05 PM Mich Talebzadeh 
>> wrote:
>>
>>> AWS S3 and Google Cloud Storage (gs) are Hadoop-compatible file systems (HCFS),
>>> so writes to them are sharded to improve read performance.
>>>
>>> Let us take your code for a drive
>>>
>>> import findspark
>>> findspark.init()
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import struct
>>> from pyspark.sql.types import *
>>> spark = SparkSession.builder \
>>> .getOrCreate()
>>> pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
>>> Schema = StructType([ StructField("ID", IntegerType(), False),
>>>   StructField("datA" , StringType(), True)])
>>> df = spark.createDataFrame(data=pairs,schema=Schema)
>>> df.printSchema()
>>> df.show()
>>> df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
>>> df2.printSchema()
>>> df2.show()
>>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>>
>>> root
>>>  |-- ID: integer (nullable = false)
>>>  |-- datA: string (nullable = true)
>>>
>>> +---+----+
>>> | ID|datA|
>>> +---+----+
>>> |  1|  a1|
>>> |  2|  a2|
>>> |  3|  a3|
>>> +---+----+
>>>
>>> root
>>>  |-- ID: integer (nullable = false)
>>>  |-- Struct: struct (nullable = false)
>>>  ||-- datA: string (nullable = true)
>>>
>>> +---+------+
>>> | ID|Struct|
>>> +---+------+
>>> |  1|  {a1}|
>>> |  2|  {a2}|
>>> |  3|  {a3}|
>>> +---+------+
>>>
>>> Look at the last line where json format is written
>>> df2.write.mode("overwrite").json("/tmp/pairs.json")
>>> Under the bonnet this happens
>>>
>>> hdfs dfs -ls /tmp/pairs.json
>>> Found 5 items
>>> -rw-r--r--   3 hduser supergroup  0 2023-05-04 21:53
>>> /tmp/pairs.json/_SUCCESS
>>> -rw-r--r--   3 hduser supergroup  0 2023-05-04 21:53
>>> /tmp/pairs.json/part-0-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup 32 2023-05-04 21:53
>>> /tmp/pairs.json/part-1-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supergroup 32 2023-05-04 21:53
>>> /tmp/pairs.json/part-2-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>>> -rw-r--r--   3 hduser supe

Re: Write DataFrame with Partition and choose Filename in PySpark

2023-05-04 Thread Marco Costantini
Hi Mich,
Thank you.
Are you saying this satisfies my requirement?

On the other hand, I am smelling something going on. Perhaps the Spark
'part' files should not be thought of as files, but rather pieces of a
conceptual file. If that is true, then your approach (of which I'm well
aware) makes sense. Question: what are some good methods, tools, for
combining the parts into a single, well-named file? I imagine that is
outside of the scope of PySpark, but any advice is welcome.
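One option, sketched below, is to write with coalesce(1) and then rename the single part file through the Hadoop FileSystem API that Spark already carries (assuming the df2 and /tmp path from the example quoted below; names are illustrative):

```
out_dir = "/tmp/pairs.json"
df2.coalesce(1).write.mode("overwrite").json(out_dir)

# rename the lone part-* file to a friendlier name, staying on the same HCFS
jvm = spark.sparkContext._jvm
Path = jvm.org.apache.hadoop.fs.Path
fs = Path(out_dir).getFileSystem(spark.sparkContext._jsc.hadoopConfiguration())
part = [s.getPath() for s in fs.listStatus(Path(out_dir))
        if s.getPath().getName().startswith("part-")][0]
fs.rename(part, Path(out_dir + "/pairs.json"))
```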

Thank you,
Marco.

On Thu, May 4, 2023 at 5:05 PM Mich Talebzadeh 
wrote:

> AWS S3 and Google Cloud Storage (gs) are Hadoop-compatible file systems (HCFS),
> so writes to them are sharded to improve read performance.
>
> Let us take your code for a drive
>
> import findspark
> findspark.init()
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import struct
> from pyspark.sql.types import *
> spark = SparkSession.builder \
> .getOrCreate()
> pairs = [(1, "a1"), (2, "a2"), (3, "a3")]
> Schema = StructType([ StructField("ID", IntegerType(), False),
>   StructField("datA" , StringType(), True)])
> df = spark.createDataFrame(data=pairs,schema=Schema)
> df.printSchema()
> df.show()
> df2 = df.select(df.ID.alias("ID"), struct(df.datA).alias("Struct"))
> df2.printSchema()
> df2.show()
> df2.write.mode("overwrite").json("/tmp/pairs.json")
>
> root
>  |-- ID: integer (nullable = false)
>  |-- datA: string (nullable = true)
>
> +---+----+
> | ID|datA|
> +---+----+
> |  1|  a1|
> |  2|  a2|
> |  3|  a3|
> +---+----+
>
> root
>  |-- ID: integer (nullable = false)
>  |-- Struct: struct (nullable = false)
>  ||-- datA: string (nullable = true)
>
> +---+------+
> | ID|Struct|
> +---+------+
> |  1|  {a1}|
> |  2|  {a2}|
> |  3|  {a3}|
> +---+------+
>
> Look at the last line where json format is written
> df2.write.mode("overwrite").json("/tmp/pairs.json")
> Under the bonnet this happens
>
> hdfs dfs -ls /tmp/pairs.json
> Found 5 items
> -rw-r--r--   3 hduser supergroup  0 2023-05-04 21:53
> /tmp/pairs.json/_SUCCESS
> -rw-r--r--   3 hduser supergroup  0 2023-05-04 21:53
> /tmp/pairs.json/part-0-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
> -rw-r--r--   3 hduser supergroup 32 2023-05-04 21:53
> /tmp/pairs.json/part-1-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
> -rw-r--r--   3 hduser supergroup 32 2023-05-04 21:53
> /tmp/pairs.json/part-2-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
> -rw-r--r--   3 hduser supergroup 32 2023-05-04 21:53
> /tmp/pairs.json/part-3-0b5780ae-f5b6-47e7-b44b-757948f03c3c-c000.json
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 4 May 2023 at 21:38, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> Hello,
>>
>> I am testing writing my DataFrame to S3 using the DataFrame `write`
>> method. It mostly does a great job. However, it fails one of my
>> requirements. Here are my requirements.
>>
>> - Write to S3
>> - use `partitionBy` to automatically make folders based on my chosen
>> partition columns
>> - control the resultant filename (whole or in part)
>>
>> I can get the first two requirements met but not the third.
>>
>> Here's an example. When I use the commands...
>>
>> df.write.partitionBy("year","month").mode("append")\
>> .json('s3a://bucket_name/test_folder/')
>>
>> ... I get the partitions I need. However, the filenames are something
>> like: part-0-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json
>>
>>
>> Now, I understand Spark's need to include the partition number in the
>> filename. However, it sure would be nice to control the rest of the file
>> name.
>>
>>
>> Any advice? Please and thank you.
>>
>> Marco.
>>
>


Write DataFrame with Partition and choose Filename in PySpark

2023-05-04 Thread Marco Costantini
Hello,

I am testing writing my DataFrame to S3 using the DataFrame `write` method.
It mostly does a great job. However, it fails one of my requirements. Here
are my requirements.

- Write to S3
- use `partitionBy` to automatically make folders based on my chosen
partition columns
- control the resultant filename (whole or in part)

I can get the first two requirements met but not the third.

Here's an example. When I use the commands...

df.write.partitionBy("year","month").mode("append")\
.json('s3a://bucket_name/test_folder/')

... I get the partitions I need. However, the filenames are something
like: part-0-0e2e2096-6d32-458d-bcdf-dbf7d74d80fd.c000.json


Now, I understand Spark's need to include the partition number in the
filename. However, it sure would be nice to control the rest of the file
name.


Any advice? Please and thank you.

Marco.


Re: Write custom JSON from DataFrame in PySpark

2023-05-04 Thread Marco Costantini
Hi Enrico,
What a great answer. Thank you. Seems like I need to get comfortable with
the 'struct' and then I will be golden. Thank you again, friend.
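A small sketch of the same struct idea taken one step further (nesting a struct inside a struct), assuming the df from the example quoted below:

```
from pyspark.sql.functions import struct

# nest columns under one struct, and a struct inside a struct
df3 = df.select(
    df.id,
    struct(df.datA,
           struct(df.datA.alias("again")).alias("inner")).alias("stuff"))
df3.write.mode("overwrite").json("/tmp/nested.json")
# -> {"id":1,"stuff":{"datA":"a1","inner":{"again":"a1"}}}
```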

Marco.

On Thu, May 4, 2023 at 3:00 AM Enrico Minack  wrote:

> Hi,
>
> You could rearrange the DataFrame so that writing the DataFrame as-is
> produces your structure:
>
> df = spark.createDataFrame([(1, "a1"), (2, "a2"), (3, "a3")], "id int,
> datA string")
> +---+----+
> | id|datA|
> +---+----+
> |  1|  a1|
> |  2|  a2|
> |  3|  a3|
> +---+----+
>
> df2 = df.select(df.id, struct(df.datA).alias("stuff"))
> root
>   |-- id: integer (nullable = true)
>   |-- stuff: struct (nullable = false)
>   ||-- datA: string (nullable = true)
> +---+-----+
> | id|stuff|
> +---+-----+
> |  1| {a1}|
> |  2| {a2}|
> |  3| {a3}|
> +---+-----+
>
> df2.write.json("data.json")
> {"id":1,"stuff":{"datA":"a1"}}
> {"id":2,"stuff":{"datA":"a2"}}
> {"id":3,"stuff":{"datA":"a3"}}
>
> Looks pretty much like what you described.
>
> Enrico
>
>
> Am 04.05.23 um 06:37 schrieb Marco Costantini:
> > Hello,
> >
> > Let's say I have a very simple DataFrame, as below.
> >
> > +---+----+
> > | id|datA|
> > +---+----+
> > |  1|  a1|
> > |  2|  a2|
> > |  3|  a3|
> > +---+----+
> >
> > Let's say I have a requirement to write this to a bizarre JSON
> > structure. For example:
> >
> > {
> >   "id": 1,
> >   "stuff": {
> > "datA": "a1"
> >   }
> > }
> >
> > How can I achieve this with PySpark? I have only seen the following:
> > - writing the DataFrame as-is (doesn't meet requirement)
> > - using a UDF (seems frowned upon)
> >
> > What I have tried is to do this within a `foreach`. I have had some
> > success, but also some problems with other requirements (serializing
> > other things).
> >
> > Any advice? Please and thank you,
> > Marco.
>
>
>


Write custom JSON from DataFrame in PySpark

2023-05-03 Thread Marco Costantini
Hello,

Let's say I have a very simple DataFrame, as below.

+---+----+
| id|datA|
+---+----+
|  1|  a1|
|  2|  a2|
|  3|  a3|
+---+----+

Let's say I have a requirement to write this to a bizarre JSON structure.
For example:

{
  "id": 1,
  "stuff": {
"datA": "a1"
  }
}

How can I achieve this with PySpark? I have only seen the following:
- writing the DataFrame as-is (doesn't meet requirement)
- using a UDF (seems frowned upon)

What I have tried is to do this within a `foreach`. I have had some
success, but also some problems with other requirements (serializing other
things).

Any advice? Please and thank you,
Marco.


Re: What is the best way to organize a join within a foreach?

2023-04-26 Thread Marco Costantini
Thanks team,
Email was just an example. The point was to illustrate that some actions
could be chained using Spark's foreach. In reality, this is an S3 write and
a Kafka message production, which I think is quite reasonable for spark to
do.
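As a rough sketch of the Kafka side (assuming the kafka-python client is available on the workers; broker, topic and DataFrame names are made up), foreachPartition lets one producer be reused per partition:

```
import json
from kafka import KafkaProducer

def publish_partition(rows):
    producer = KafkaProducer(bootstrap_servers="broker:9092")  # one producer per partition
    for row in rows:
        payload = json.dumps(row.asDict()).encode("utf-8")
        producer.send("user-statements", value=payload)
    producer.flush()

users_with_orders_df.foreachPartition(publish_partition)
```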

To answer Ayan's first question: yes, all of a user's orders, prepared for each
and every user.

Other than the remarks that email transmission is unwise (which, as I've now
noted, is irrelevant), I am not seeing an alternative to using Spark's
foreach. Unless your proposal is for the Spark job to target 1 user, and
just run the job 1000's of times taking the user_id as input. That doesn't
sound attractive.

Also, while we say that foreach is not optimal, I cannot find any evidence
of it; neither here nor online. If there are any docs about the inner
workings of this functionality, please pass them to me. I continue to
search for them. Even late last night!

Thanks for your help team,
Marco.

On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh 
wrote:

> Indeed very valid points by Ayan. How is email going to handle 1000s of
> records? As a solution architect I tend to replace users by customers, and
> for each order there must be products, a sort of many-to-many relationship. If
> I were a customer I would also be interested in product details as
> well. Sending via email sounds like a Jurassic Park solution
>
> On Wed, 26 Apr 2023 at 10:24, ayan guha  wrote:
>
>> Adding to what Mitch said,
>>
>> 1. Are you trying to send statements of all orders to all users? Or the
>> latest order only?
>>
>> 2. Sending email is not a good use of spark. instead, I suggest to use a
>> notification service or function. Spark should write to a queue (kafka,
>> sqs...pick your choice here).
>>
>> Best regards
>> Ayan
>>
>> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Well OK, in a nutshell you want the result set for every user prepared
>>> and emailed to that user, right?
>>>
>>> This is a form of ETL where those result sets need to be posted
>>> somewhere. Say you create a table based on the result set prepared for each
>>> user. You may have many raw target tables at the end of the first ETL. How
>>> does this differ from using forEach? Performance wise forEach may not be
>>> optimal.
>>>
>>> Can you take the sample tables and try your method?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
>>> marco.costant...@rocketfncl.com> wrote:
>>>
>>>> Hi Mich,
>>>> First, thank you for that. Great effort put into helping.
>>>>
>>>> Second, I don't think this tackles the technical challenge here. I
>>>> understand the windowing as it serves those ranks you created, but I don't
>>>> see how the ranks contribute to the solution.
>>>> Third, the core of the challenge is about performing this kind of
>>>> 'statement' but for all users. In this example we target Mich, but that
>>>> reduces the complexity by a lot! In fact, a simple join and filter would
>>>> solve that one.
>>>>
>>>> Any thoughts on that? For me, the foreach is desirable because I can
>>>> have the workers chain other actions to each iteration (send email, send
>>>> HTTP request, etc).
>>>>
>>>> Thanks Mich,
>>>> Marco.
>>>>
>>>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> First thoughts.
>>>>>
>>>>> foreach() is an action operation that is to iterate/loop over each
>>>>> element in the dataset, meaning cursor based. Tha

Re: What is the best way to organize a join within a foreach?

2023-04-25 Thread Marco Costantini
Hi Mich,
First, thank you for that. Great effort put into helping.

Second, I don't think this tackles the technical challenge here. I
understand the windowing as it serves those ranks you created, but I don't
see how the ranks contribute to the solution.
Third, the core of the challenge is about performing this kind of
'statement' but for all users. In this example we target Mich, but that
reduces the complexity by a lot! In fact, a simple join and filter would
solve that one.

Any thoughts on that? For me, the foreach is desirable because I can have
the workers chain other actions to each iteration (send email, send HTTP
request, etc).

Thanks Mich,
Marco.

On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh 
wrote:

> Hi Marco,
>
> First thoughts.
>
> foreach() is an action operation that iterates/loops over each
> element in the dataset, meaning it is cursor-based. That is different from
> operating over the dataset as a set, which is far more efficient.
>
> So in your case, if I understand it correctly, you want to get the orders for
> each user (say Mich), convert the result set to JSON and send it to Mich
> via email.
>
> Let us try this based on sample data
>
> Put your csv files into HDFS directory
>
> hdfs dfs -put users.csv /data/stg/test
> hdfs dfs -put orders.csv /data/stg/test
>
> Then create dataframes from csv files, create temp views and do a join on
> result sets with some slicing and dicing on orders table
>
> #! /usr/bin/env python3
> from __future__ import print_function
> import sys
> import findspark
> findspark.init()
> from pyspark.sql import SparkSession
> from pyspark import SparkContext
> from pyspark.sql import SQLContext, HiveContext
> from pyspark.sql.window import Window
>
> def spark_session(appName):
>     return SparkSession.builder \
>         .appName(appName) \
>         .enableHiveSupport() \
>         .getOrCreate()
>
> def main():
>     appName = "ORDERS"
>     spark = spark_session(appName)
>     # get the sample
>     users_file = "hdfs://rhes75:9000/data/stg/test/users.csv"
>     orders_file = "hdfs://rhes75:9000/data/stg/test/orders.csv"
>     users_df = spark.read.format("com.databricks.spark.csv").option("inferSchema",
>         "true").option("header", "true").load(users_file)
>     users_df.printSchema()
>     """
>     root
>     |-- id: integer (nullable = true)
>     |-- name: string (nullable = true)
>     """
>     print(f"""\n Reading from  {users_file}\n""")
>     users_df.show(5, False)
>     orders_df = spark.read.format("com.databricks.spark.csv").option("inferSchema",
>         "true").option("header", "true").load(orders_file)
>     orders_df.printSchema()
>     """
>     root
>     |-- id: integer (nullable = true)
>     |-- description: string (nullable = true)
>     |-- amount: double (nullable = true)
>     |-- user_id: integer (nullable = true)
>     """
>     print(f"""\n Reading from  {orders_file}\n""")
>     orders_df.show(50, False)
>     users_df.createOrReplaceTempView("users")
>     orders_df.createOrReplaceTempView("orders")
>     # Create a list of orders for each user
>     print(f"""\n Doing a join on two temp views\n""")
>
>     sqltext = """
>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>     FROM
>     (
>         SELECT
>             user_id AS user_id
>         ,   id AS order_id
>         ,   description AS description
>         ,   amount AS amount
>         ,   DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS RANK
>         ,   MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
>         FROM orders
>     ) t
>     INNER JOIN users u ON t.user_id = u.id
>     AND u.name = 'Mich'
>     ORDER BY t.order_id
>     """
>     spark.sql(sqltext).show(50)
>
> if __name__ == '__main__':
>     main()
>
> Final outcome displaying orders for user Mich
>
> Doing a join on two temp views
>
>  Doing a join on two temp views
>
> +----+--------+-----------------+------+---------+
> |name|order_id|      description|amount|maxorders|
> +----+--------+-----------------+------+---------+
> |Mich|   50001| Mich's 1st order|101.11|   101.11|
> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
> |Mich|   50004| Mich's 4th order|104.11|   104.11|
> |Mich|   50005| Mich's 5th order|105.11|   105.11|
> |Mich|   50006| Mich's 6t

Re: What is the best way to organize a join within a foreach?

2023-04-25 Thread Marco Costantini
Thanks Mich,

Great idea. I have done it. Those files are attached. I'm interested to
know your thoughts. Let's imagine this same structure, but with huge
amounts of data as well.

Please and thank you,
Marco.

On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh 
wrote:

> Hi Marco,
>
> Let us start simple,
>
> Provide a csv file of 5 rows for the users table. Each row has a unique
> user_id and one or two other columns like fictitious email etc.
>
> Also for each user_id, provide 10 rows of orders table, meaning that
> orders table has 5 x 10 rows for each user_id.
>
> both as comma separated csv file
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> Thanks Mich,
>> I have not but I will certainly read up on this today.
>>
>> To your point that all of the essential data is in the 'orders' table; I
>> agree! That distills the problem nicely. Yet, I still have some questions
>> on which someone may be able to shed some light.
>>
>> 1) If my 'orders' table is very large, and will need to be aggregated by
>> 'user_id', how will Spark intelligently optimize on that constraint (only
>> read data for relevant 'user_id's). Is that something I have to instruct
>> Spark to do?
>>
>> 2) Without #1, even with windowing, am I asking each partition to search
>> too much?
>>
>> Please, if you have any links to documentation I can read on *how* Spark
>> works under the hood for these operations, I would appreciate it if you
>> give them. Spark has become a pillar on my team and knowing it in more
>> detail is warranted.
>>
>> Slightly pivoting the subject here; I have tried something. It was a
>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>> script I now have the line:
>>
>> ```
>> grouped_orders_df =
>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>> 'timestamp', 'total', 'description'))).alias('orders'))
>> ```
>> (json is ultimately needed)
>>
>> This actually achieves my goal by putting all of the 'orders' in a single
>> Array column. Now my worry is, will this column become too large if there
>> are a great many orders? Is there a limit? I have searched for documentation
>> on such a limit but could not find any.
>>
>> I truly appreciate your help Mich and team,
>> Marco.
>>
>>
>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Have you thought of using  windowing function
>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>> achieve this?
>>>
>>> Effectively all your information is in the orders table.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>> marco.costant...@rocketfncl.com> wrote:
>>>
>>>> I have two tables: {users, orders}. In this example, let's say that for
>>>> each 1 User in the users table, there are 10 Orders in the orders 
>>>> table.
>>>>
>>

Re: What is the best way to organize a join within a foreach?

2023-04-25 Thread Marco Costantini
Thanks Mich,
I have not but I will certainly read up on this today.

To your point that all of the essential data is in the 'orders' table; I
agree! That distills the problem nicely. Yet, I still have some questions
on which someone may be able to shed some light.

1) If my 'orders' table is very large, and will need to be aggregated by
'user_id', how will Spark intelligently optimize on that constraint (only
read data for relevant 'user_id's). Is that something I have to instruct
Spark to do?

2) Without #1, even with windowing, am I asking each partition to search
too much?

Please, if you have any links to documentation I can read on *how* Spark
works under the hood for these operations, I would appreciate it if you
give them. Spark has become a pillar on my team and knowing it in more
detail is warranted.

Slightly pivoting the subject here; I have tried something. It was a
suggestion by an AI chat bot and it seemed reasonable. In my main Spark
script I now have the line:

```
grouped_orders_df =
orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
'timestamp', 'total', 'description'))).alias('orders'))
```
(json is ultimately needed)
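For completeness, the functions used on that line come from pyspark.sql.functions:

```
from pyspark.sql.functions import collect_list, struct, to_json
```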

This actually achieves my goal by putting all of the 'orders' in a single
Array column. Now my worry is, will this column become too large if there
are a great many orders? Is there a limit? I have searched for documentation
on such a limit but could not find any.

I truly appreciate your help Mich and team,
Marco.


On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh 
wrote:

> Have you thought of using  windowing function
> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
> achieve this?
>
> Effectively all your information is in the orders table.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
> marco.costant...@rocketfncl.com> wrote:
>
>> I have two tables: {users, orders}. In this example, let's say that for
>> each 1 User in the users table, there are 10 Orders in the orders table.
>>
>> I have to use pyspark to generate a statement of Orders for each User.
>> So, a single user will need his/her own list of Orders. Additionally, I
>> need to send this statement to the real-world user via email (for example).
>>
>> My first intuition was to apply a DataFrame.foreach() on the users
>> DataFrame. This way, I can rely on the spark workers to handle the email
>> sending individually. However, I now do not know the best way to get each
>> User's Orders.
>>
>> I will soon try the following (pseudo-code):
>>
>> ```
>> users_df = 
>> orders_df = 
>>
>> #this is poorly named for max understandability in this context
>> def foreach_function(row):
>>   user_id = row.user_id
>>   user_orders_df = orders_df.filter(f'user_id = {user_id}')
>>
>>   #here, I'd get any User info from 'row'
>>   #then, I'd convert all 'user_orders' to JSON
>>   #then, I'd prepare the email and send it
>>
>> users_df.foreach(foreach_function)
>> ```
>>
>> It is my understanding that if I do my user-specific work in the foreach
>> function, I will capitalize on Spark's scalability when doing that work.
>> However, I am worried of two things:
>>
>> If I take all Orders up front...
>>
>> Will that work?
>> Will I be taking too much? Will I be taking Orders on partitions who
>> won't handle them (different User).
>>
>> If I create the orders_df (filtered) within the foreach function...
>>
>> Will it work?
>> Will that be too much IO to DB?
>>
>> The question ultimately is: How can I achieve this goal efficiently?
>>
>> I have not yet tried anything here. I am doing so as we speak, but am
>> suffering from choice-paralysis.
>>
>> Please and thank you.
>>
>


What is the best way to organize a join within a foreach?

2023-04-24 Thread Marco Costantini
I have two tables: {users, orders}. In this example, let's say that for
each 1 User in the users table, there are 10 Orders in the orders table.

I have to use pyspark to generate a statement of Orders for each User. So,
a single user will need his/her own list of Orders. Additionally, I need to
send this statement to the real-world user via email (for example).

My first intuition was to apply a DataFrame.foreach() on the users
DataFrame. This way, I can rely on the spark workers to handle the email
sending individually. However, I now do not know the best way to get each
User's Orders.

I will soon try the following (pseudo-code):

```
users_df = 
orders_df = 

#this is poorly named for max understandability in this context
def foreach_function(row):
  user_id = row.user_id
  user_orders_df = orders_df.filter(f'user_id = {user_id}')

  #here, I'd get any User info from 'row'
  #then, I'd convert all 'user_orders' to JSON
  #then, I'd prepare the email and send it

users_df.foreach(foreach_function)
```

It is my understanding that if I do my user-specific work in the foreach
function, I will capitalize on Spark's scalability when doing that work.
However, I am worried of two things:

If I take all Orders up front...

Will that work?
Will I be taking too much? Will I be taking Orders on partitions who won't
handle them (different User).

If I create the orders_df (filtered) within the foreach function...

Will it work?
Will that be too much IO to DB?

The question ultimately is: How can I achieve this goal efficiently?
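One set-based sketch of that goal (column names are illustrative, reusing users_df/orders_df from the pseudo-code above): join once, collect each user's orders into one JSON value, then do the per-user side effects from whole partitions.

```
from pyspark.sql.functions import collect_list, struct, to_json

statements_df = (orders_df.join(users_df, orders_df.user_id == users_df.user_id)
                 .groupBy(users_df.user_id)
                 .agg(to_json(collect_list(struct("timestamp", "total", "description")))
                      .alias("orders_json")))

def send_partition(rows):
    # open one connection per partition (SMTP, HTTP, queue, ...) and send each statement
    for row in rows:
        pass  # e.g. send row.orders_json for row.user_id

statements_df.foreachPartition(send_partition)
```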

I have not yet tried anything here. I am doing so as we speak, but am
suffering from choice-paralysis.

Please and thank you.


What is the best way to organize a join within a foreach?

2023-04-24 Thread Marco Costantini
I have two tables: {users, orders}. In this example, let's say that for
each 1 User in the users table, there are 10 Orders in the orders table.

I have to use pyspark to generate a statement of Orders for each User. So,
a single user will need his/her own list of Orders. Additionally, I need to
send this statement to the real-world user via email (for example).

My first intuition was to apply a DataFrame.foreach() on the users
DataFrame. This way, I can rely on the spark workers to handle the email
sending individually. However, I now do not know the best way to get each
User's Orders.

I will soon try the following (pseudo-code):

```
users_df = 
orders_df = 

#this is poorly named for max understandability in this context
def foreach_function(row):
  user_id = row.user_id
  user_orders_df = orders_df.filter(f'user_id = {user_id}')

  #here, I'd get any User info from 'row'
  #then, I'd convert all 'user_orders' to JSON
  #then, I'd prepare the email and send it

users_df.foreach(foreach_function)
```

It is my understanding that if I do my user-specific work in the foreach
function, I will capitalize on Spark's scalability when doing that work.
However, I am worried of two things:

If I take all Orders up front...

Will that work?
Will I be taking too much? Will I be taking Orders on partitions who won't
handle them (different User).

If I create the orders_df (filtered) within the foreach function...

Will it work?
Will that be too much IO to DB?

The question ultimately is: How can I achieve this goal efficiently?

I have not yet tried anything here. I am doing so as we speak, but am
suffering from choice-paralysis.

Please and thank you.


Re: RDD filter in for loop gave strange results

2021-01-20 Thread Marco Wong
Hmm, I think I got what Jingnan means. The lambda function is x != i and i
is not evaluated when the lambda function is defined. So the pipelined rdd
is rdd.filter(lambda x: x != i).filter(lambda x: x != i), rather than
having the values of i substituted. Does that make sense to you, Sean?
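For what it's worth, a small sketch of the usual fix: bind the current value of i at definition time with a default argument.

```
rdd = spark.sparkContext.parallelize([0, 1, 2])
for i in range(3):
    # a bare `lambda x: x != i` closes over the variable i, so all chained filters
    # see the final value of i when the RDD is evaluated; `i=i` freezes it per loop
    rdd = rdd.filter(lambda x, i=i: x != i)
print(rdd.collect())  # []
```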

On Wed, 20 Jan 2021 at 15:51, Sean Owen  wrote:

> No, because the final rdd is really the result of chaining 3 filter
> operations. They should all execute. It _should_ work like
> "rdd.filter(...).filter(..).filter(...)"
>
> On Wed, Jan 20, 2021 at 9:46 AM Zhu Jingnan 
> wrote:
>
>> I thought that was the right result.
>>
>> As the RDD runs on a lazy basis, every time rdd.collect() is executed, the i
>> will be updated to the latest i value, so only one will be filtered out.
>>
>> Regards
>>
>> Jingnan
>>
>>
>>


Spark RDD + HBase: adoption trend

2021-01-20 Thread Marco Firrincieli
Hi, my name is Marco and I'm one of the developers behind 
https://github.com/unicredit/hbase-rdd 
a project we are currently reviewing for various reasons.

We were basically wondering if RDD "is still a thing" nowadays (we see lots of 
usage for DataFrames or Datasets) and we're not sure how much of the community 
still works/uses RDDs.

Also, for lack of time, we always mainly worked using Cloudera-flavored 
Hadoop/HBase & Spark versions. We were thinking the community would then help 
us organize the project in a more "generic" way, but that didn't happen. 

So I figured I would ask here what is the gut feeling of the Spark community so 
to better define the future of our little library. 

Thanks

-Marco




RDD filter in for loop gave strange results

2021-01-20 Thread Marco Wong
Dear Spark users,

I ran the Python code below on a simple RDD, but it gave strange results.
The filtered RDD contains non-existent elements which were filtered away
earlier. Any idea why this happened?
```
rdd = spark.sparkContext.parallelize([0,1,2])
for i in range(3):
print("RDD is ", rdd.collect())
print("Filtered RDD is ", rdd.filter(lambda x:x!=i).collect())
rdd = rdd.filter(lambda x:x!=i)
print("Result is ", rdd.collect())
print()
```
which gave
```
RDD is  [0, 1, 2]
Filtered RDD is  [1, 2]
Result is  [1, 2]

RDD is  [1, 2]
Filtered RDD is  [0, 2]
Result is  [0, 2]

RDD is  [0, 2]
Filtered RDD is  [0, 1]
Result is  [0, 1]
```

Thanks,

Marco


Edge AI with Spark

2020-09-24 Thread Marco Sassarini
Hi,
I'd like to know if Spark supports edge AI: can Spark run on physical devices
such as mobile devices running Android/iOS?

Best regards,
Marco Sassarini



Marco Sassarini
Artificial Intelligence Department

office: +39 0434 562 978

www.overit.it<http://www.overit.it>


Re: Exposing JIRA issue types at GitHub PRs

2019-06-13 Thread Marco Gaido
Hi Dongjoon,
Thanks for the proposal! I like the idea. Maybe we can extend it to the
component too, and to some JIRA labels such as correctness, which may be
worth highlighting in PRs too. My only concern is that in many cases JIRAs
are not created very carefully, so they may be incorrect at the moment of
the PR creation and may be updated later: so keeping them in sync may be
an extra effort.

On Thu, 13 Jun 2019, 08:09 Reynold Xin,  wrote:

> Seems like a good idea. Can we test this with a component first?
>
> On Thu, Jun 13, 2019 at 6:17 AM Dongjoon Hyun 
> wrote:
>
>> Hi, All.
>>
>> Since we use both Apache JIRA and GitHub actively for Apache Spark
>> contributions, we have lots of JIRAs and PRs consequently. One specific
>> thing I've been longing to see is `Jira Issue Type` in GitHub.
>>
>> How about exposing JIRA issue types at GitHub PRs as GitHub `Labels`?
>> There are two main benefits:
>> 1. It helps the communication between the contributors and reviewers with
>> more information.
>> (In some cases, some people only visit GitHub to see the PR and
>> commits)
>> 2. `Labels` is searchable. We don't need to visit Apache Jira to search
>> PRs to see a specific type.
>> (For example, the reviewers can see and review 'BUG' PRs first by
>> using `is:open is:pr label:BUG`.)
>>
>> Of course, this can be done automatically without human intervention.
>> Since we already have GitHub Jenkins job to access JIRA/GitHub, that job
>> can add the labels from the beginning. If needed, I can volunteer to update
>> the script.
>>
>> To show the demo, I labeled several PRs manually. You can see the result
>> right now in Apache Spark PR page.
>>
>>   - https://github.com/apache/spark/pulls
>>
>> If you're surprised due to those manual activities, I want to apologize
>> for that. I hope we can take advantage of the existing GitHub features to
>> serve Apache Spark community in a way better than yesterday.
>>
>> How do you think about this specific suggestion?
>>
>> Bests,
>> Dongjoon
>>
>> PS. I saw that `Request Review` and `Assign` features are already used
>> for some purposes, but these feature are out of the scope in this email.
>>
>


Re: testing frameworks

2019-02-04 Thread Marco Mistroni
Thanks Hichame, will follow up on that.

Anyone on this list using the Python version of spark-testing-base? Seems
there's support for DataFrame.

thanks in advance and regards
 Marco

On Sun, Feb 3, 2019 at 9:58 PM Hichame El Khalfi 
wrote:

> Hi,
> You can use pysparkling => https://github.com/svenkreiss/pysparkling
> This lib is useful in case you have RDD.
>
> Hope this helps,
>
> Hichame
>
> *From:* mmistr...@gmail.com
> *Sent:* February 3, 2019 4:42 PM
> *To:* radams...@gmail.com
> *Cc:* la...@mapflat.com; bpru...@opentext.com; user@spark.apache.org
> *Subject:* Re: testing frameworks
>
> Hi
>  sorry to resurrect this thread
> Any Spark libraries for testing code in PySpark? The GitHub code above
> seems related to Scala.
> Following links in the original threads (and also LMGFY) I found
> pytest-spark · PyPI <https://pypi.org/project/pytest-spark/>
>
> w/kindest regards
>  Marco
>
>
>
>
> On Tue, Jun 12, 2018 at 6:44 PM Ryan Adams  wrote:
>
>> We use spark testing base for unit testing.  These tests execute on a
>> very small amount of data that covers all paths the code can take (or most
>> paths anyway).
>>
>> https://github.com/holdenk/spark-testing-base
>>
>> For integration testing we use automated routines to ensure that
>> aggregate values match an aggregate baseline.
>>
>> Ryan
>>
>> Ryan Adams
>> radams...@gmail.com
>>
>> On Tue, Jun 12, 2018 at 11:51 AM, Lars Albertsson 
>> wrote:
>>
>>> Hi,
>>>
>>> I wrote this answer to the same question a couple of years ago:
>>> https://www.mail-archive.com/user%40spark.apache.org/msg48032.html
>>>
>>> I have made a couple of presentations on the subject. Slides and video
>>> are linked on this page: http://www.mapflat.com/presentations/
>>>
>>> You can find more material in this list of resources:
>>> http://www.mapflat.com/lands/resources/reading-list
>>>
>>> Happy testing!
>>>
>>> Regards,
>>>
>>>
>>>
>>> Lars Albertsson
>>> Data engineering consultant
>>> www.mapflat.com
>>> https://twitter.com/lalleal
>>> +46 70 7687109
>>> Calendar: http://www.mapflat.com/calendar
>>>
>>>
>>> On Mon, May 21, 2018 at 2:24 PM, Steve Pruitt 
>>> wrote:
>>> > Hi,
>>> >
>>> >
>>> >
>>> > Can anyone recommend testing frameworks suitable for Spark jobs.
>>> Something
>>> > that can be integrated into a CI tool would be great.
>>> >
>>> >
>>> >
>>> > Thanks.
>>> >
>>> >
>>>
>>>
>>>
>>


Re: testing frameworks

2019-02-03 Thread Marco Mistroni
Hi
 sorry to resurrect this thread
Any Spark libraries for testing code in PySpark? The GitHub code above
seems related to Scala.
Following links in the original threads (and also LMGFY) I found
pytest-spark · PyPI <https://pypi.org/project/pytest-spark/>
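In case it helps, a minimal plain-pytest sketch (not tied to any particular testing library) with a local SparkSession fixture:

```
import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    session = SparkSession.builder.master("local[2]").appName("tests").getOrCreate()
    yield session
    session.stop()

def test_keeps_only_positive_amounts(spark):
    df = spark.createDataFrame([(1, 10.0), (2, -5.0)], "id int, amount double")
    result = df.filter("amount > 0").collect()
    assert [r.id for r in result] == [1]
```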

w/kindest regards
 Marco




On Tue, Jun 12, 2018 at 6:44 PM Ryan Adams  wrote:

> We use spark testing base for unit testing.  These tests execute on a very
> small amount of data that covers all paths the code can take (or most paths
> anyway).
>
> https://github.com/holdenk/spark-testing-base
>
> For integration testing we use automated routines to ensure that aggregate
> values match an aggregate baseline.
>
> Ryan
>
> Ryan Adams
> radams...@gmail.com
>
> On Tue, Jun 12, 2018 at 11:51 AM, Lars Albertsson 
> wrote:
>
>> Hi,
>>
>> I wrote this answer to the same question a couple of years ago:
>> https://www.mail-archive.com/user%40spark.apache.org/msg48032.html
>>
>> I have made a couple of presentations on the subject. Slides and video
>> are linked on this page: http://www.mapflat.com/presentations/
>>
>> You can find more material in this list of resources:
>> http://www.mapflat.com/lands/resources/reading-list
>>
>> Happy testing!
>>
>> Regards,
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> https://twitter.com/lalleal
>> +46 70 7687109
>> Calendar: http://www.mapflat.com/calendar
>>
>>
>> On Mon, May 21, 2018 at 2:24 PM, Steve Pruitt 
>> wrote:
>> > Hi,
>> >
>> >
>> >
>> > Can anyone recommend testing frameworks suitable for Spark jobs.
>> Something
>> > that can be integrated into a CI tool would be great.
>> >
>> >
>> >
>> > Thanks.
>> >
>> >
>>
>>
>>
>


Re: How to debug Spark job

2018-09-08 Thread Marco Mistroni
Hi
 Might sound like dumb advice, but try to break apart your process.
Sounds like you are doing ETL.
Start basic with just the E and T steps, and add the changes that result in issues.
If there is no problem, add the load step.
Enable Spark logging so that you can post the error message to the list.
I think you can have a look at the Spark console to see if your process has
memory issues.
Another thing you can do is to run with a subset of data and increase the
load until you find where it blows up.
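A rough sketch of that "subset first" idea, in PySpark for brevity (table, column and path names are made up; jdbc_url is assumed to be defined elsewhere):

```
# read only a small slice through the JDBC source, run extract + transform,
# force execution, and only then add the load (write) step
df = (spark.read.format("jdbc")
      .option("url", jdbc_url)
      .option("dbtable", "(select * from mytable limit 10000) t")
      .load())

step_et = df.selectExpr("field1", "field2")
print(step_et.count())                        # forces execution; watch the Spark UI

step_et.write.mode("overwrite").parquet("/tmp/debug_output")  # add the L step last
```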
Sorry hth

On Sep 7, 2018 10:48 AM, "James Starks" 
wrote:


I have a Spark job that reads from a postgresql (v9.5) table, and writes
the result to parquet. The code flow is not complicated, basically:

case class MyCaseClass(field1: String, field2: String)
val df = spark.read.format("jdbc")...load()
df.createOrReplaceTempView(...)
val newdf = spark.sql("seslect field1, field2 from
mytable").as[MyCaseClass].map { row =>
  val fieldX = ... // extract something from field2
  (field1, fileldX)
}.filter { ... /* filter out field 3 that's not valid */ }
newdf.write.mode(...).parquet(destPath)

This job worked correctly without a problem. But it doesn't look like it's
working OK (the job appears to hang) when adding more fields. The refactored
job looks as below:
...
val newdf = spark.sql("seslect field1, field2, ... fieldN from
mytable").as[MyCaseClassWithMoreFields].map { row =>
...
NewCaseClassWithMoreFields(...) // all fields plus fieldX
}.filter { ... }
newdf.write.mode(...).parquet(destPath)

Basically what the job does is extracting some info from one of a field in
db table, appends that newly extracted field to the original row, and then
dumps the whole new table to parquet.

new field + (original field1 + ... + original fieldN)
...
...

Records loaded by Spark SQL into the Spark job (before refactoring) are around
8MM; this remains the same, but when the refactored job runs, it looks like it
hangs there without progress. The only output on the console is (there is
no crash, no exceptions thrown):

WARN  HeartbeatReceiver:66 - Removing executor driver with no recent
heartbeats: 137128 ms exceeds timeout 12 ms

Memory in top command looks like

VIRT     RES     SHR    %CPU   %MEM
15.866g  8.001g  41.4m  740.3  25.6

The command used to  submit spark job is

spark-submit --class ... --master local[*] --driver-memory 10g
--executor-memory 10g ... --files ... --driver-class-path ... 
...

How can I debug or check which part of my code might cause the problem (so
I can improve it)?

Thanks


Reading multiple files in Spark / which pattern to use

2018-07-12 Thread Marco Mistroni
hi all
 i have mutliple files stored in S3 in the following pattern

-MM-DD-securities.txt

I want to read multiple files at the same time..
I am attempting to use this pattern, for example

2016-01*securities.txt,2016-02*securities.txt,2016-03*securities.txt

But it does not seem to work
Could anyone help out?
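Two approaches that usually work for this (bucket name is made up; patterns follow the ones above): pass an explicit list of paths, or use a single Hadoop-style glob with brace alternation.

```
# explicit list of patterns
df = spark.read.csv([
    "s3a://my-bucket/2016-01*securities.txt",
    "s3a://my-bucket/2016-02*securities.txt",
    "s3a://my-bucket/2016-03*securities.txt",
])

# or a single glob with {01,02,03} alternation
df = spark.read.csv("s3a://my-bucket/2016-{01,02,03}*securities.txt")
```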

kind regards
 marco


Re: spark-shell gets stuck in ACCEPTED state forever when ran in YARN client mode.

2018-07-08 Thread Marco Mistroni
Are you running on EMR? Have you checked the EMR logs?
I was in a similar situation where the job was stuck in ACCEPTED and then it
died. It turned out to be an issue with my code when running with huge data.
Perhaps try to gradually reduce the load until it works and then start
from there?
Not a huge help, but I followed the same approach when my job was stuck on ACCEPTED.
Hth

On Sun, Jul 8, 2018, 2:59 PM kant kodali  wrote:

> Hi All,
>
> I am trying to run a simple word count using YARN as a cluster manager.  I
> am currently using Spark 2.3.1 and Apache hadoop 2.7.3.  When I spawn
> spark-shell like below it gets stuck in ACCEPTED stated forever.
>
> ./bin/spark-shell --master yarn --deploy-mode client
>
>
> I set my log4j.properties in SPARK_HOME/conf to TRACE
>
>  queue: "default" name: "Spark shell" host: "N/A" rpc_port: -1
> yarn_application_state: ACCEPTED trackingUrl: "
> http://Kants-MacBook-Pro-2.local:8088/proxy/application_1531056583425_0001/;
> diagnostics: "" startTime: 1531056632496 finishTime: 0
> final_application_status: APP_UNDEFINED app_resource_Usage {
> num_used_containers: 0 num_reserved_containers: 0 used_resources { memory:
> 0 virtual_cores: 0 } reserved_resources { memory: 0 virtual_cores: 0 }
> needed_resources { memory: 0 virtual_cores: 0 } memory_seconds: 0
> vcore_seconds: 0 } originalTrackingUrl: "N/A" currentApplicationAttemptId {
> application_id { id: 1 cluster_timestamp: 1531056583425 } attemptId: 1 }
> progress: 0.0 applicationType: "SPARK" }}
>
> 18/07/08 06:32:22 INFO Client: Application report for
> application_1531056583425_0001 (state: ACCEPTED)
>
> 18/07/08 06:32:22 DEBUG Client:
>
> client token: N/A
>
> diagnostics: N/A
>
> ApplicationMaster host: N/A
>
> ApplicationMaster RPC port: -1
>
> queue: default
>
> start time: 1531056632496
>
> final status: UNDEFINED
>
> tracking URL:
> http://xxx-MacBook-Pro-2.local:8088/proxy/application_1531056583425_0001/
>
> user: xxx
>
>
>
> 18/07/08 06:32:20 DEBUG Client:
>
> client token: N/A
>
> diagnostics: N/A
>
> ApplicationMaster host: N/A
>
> ApplicationMaster RPC port: -1
>
> queue: default
>
> start time: 1531056632496
>
> final status: UNDEFINED
>
> tracking URL:
> http://Kants-MacBook-Pro-2.local:8088/proxy/application_1531056583425_0001/
>
> user: kantkodali
>
>
> 18/07/08 06:32:21 TRACE ProtobufRpcEngine: 1: Call -> /0.0.0.0:8032:
> getApplicationReport {application_id { id: 1 cluster_timestamp:
> 1531056583425 }}
>
> 18/07/08 06:32:21 DEBUG Client: IPC Client (1608805714) connection to /
> 0.0.0.0:8032 from kantkodali sending #136
>
> 18/07/08 06:32:21 DEBUG Client: IPC Client (1608805714) connection to /
> 0.0.0.0:8032 from kantkodali got value #136
>
> 18/07/08 06:32:21 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
>
> 18/07/08 06:32:21 TRACE ProtobufRpcEngine: 1: Response <- /0.0.0.0:8032:
> getApplicationReport {application_report { applicationId { id: 1
> cluster_timestamp: 1531056583425 } user: "xxx" queue: "default" name:
> "Spark shell" host: "N/A" rpc_port: -1 yarn_application_state: ACCEPTED
> trackingUrl: "
> http://xxx-MacBook-Pro-2.local:8088/proxy/application_1531056583425_0001/;
> diagnostics: "" startTime: 1531056632496 finishTime: 0
> final_application_status: APP_UNDEFINED app_resource_Usage {
> num_used_containers: 0 num_reserved_containers: 0 used_resources { memory:
> 0 virtual_cores: 0 } reserved_resources { memory: 0 virtual_cores: 0 }
> needed_resources { memory: 0 virtual_cores: 0 } memory_seconds: 0
> vcore_seconds: 0 } originalTrackingUrl: "N/A" currentApplicationAttemptId {
> application_id { id: 1 cluster_timestamp: 1531056583425 } attemptId: 1 }
> progress: 0.0 applicationType: "SPARK" }}
>
> 18/07/08 06:32:21 INFO Client: Application report for
> application_1531056583425_0001 (state: ACCEPTED)
>
>
> I have read this link
> 
>  and
> here are the conf files that are different from default settings
>
>
> *yarn-site.xml*
>
>
> <configuration>
>
>   <property>
>     <name>yarn.nodemanager.aux-services</name>
>     <value>mapreduce_shuffle</value>
>   </property>
>
>   <property>
>     <name>yarn.nodemanager.resource.memory-mb</name>
>     <value>16384</value>
>   </property>
>
>   <property>
>     <name>yarn.scheduler.minimum-allocation-mb</name>
>     <value>256</value>
>   </property>
>
>   <property>
>     <name>yarn.scheduler.maximum-allocation-mb</name>
>     <value>8192</value>
>   </property>
>
>   <property>
>     <name>yarn.nodemanager.resource.cpu-vcores</name>
>     <value>8</value>
>   </property>
>
> </configuration>
>
> *core-site.xml*
>
>
> <configuration>
>
>   <property>
>     <name>fs.defaultFS</name>
>     <value>hdfs://localhost:9000</value>
>   </property>
>
> </configuration>
>
> *hdfs-site.xml*
>
>
> <configuration>
>
>   <property>
>     <name>dfs.replication</name>
>     <value>1</value>
>   </property>
>
> </configuration>
>
>
> you can imagine every other config remains untouched(so everything else
> has default settings) Finally, I have also tried to see if there any clues
> in resource manager logs but they dont seem to be helpful in terms of
> fixing the issue however I am newbie to yarn so please let me know if I
> missed out on something.
>
>
>
> 2018-07-08 06:54:57,345 INFO
> 

Re: Error submitting Spark Job in yarn-cluster mode on EMR

2018-05-08 Thread Marco Mistroni
Did you by any chance leave a sparkSession.setMaster("local") lurking in
your code?

Last time I checked, to run on YARN you have to package a 'fat jar'. Could
you make sure the Spark dependencies in your jar match the version you are
running on YARN?

alternatively please share code including how you submit  your application
to spark
FYI this is the command i am using to submit  a program to spark

spark-submit --master yarn --deploy-mode cluster --class 
 

hth

On Tue, May 8, 2018 at 10:14 AM, SparkUser6 
wrote:

> I have a simple program that works fine in the local mode.  But I am having
> issues when I try to run the program in yarn-cluster mode.  I know usually a
> NoSuchMethodError happens when the compile and run versions mismatch, but I
> made sure I took the same version.
>
> 205  [main] INFO  org.spark_project.jetty.server.ServerConnector  -
> Started
> Spark@29539e36{HTTP/1.1}{0.0.0.0:4040}
> 205  [main] INFO  org.spark_project.jetty.server.Server  - Started @3265ms
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.spark.internal.config.package$.APP_CALLER_
> CONTEXT()Lorg/apache/spark/internal/config/OptionalConfigEntry;
> at org.apache.spark.deploy.yarn.Client.submitApplication(
> Client.scala:163)
> at
> org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(
> YarnClientSchedulerBackend.scala:56)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.start(
> TaskSchedulerImpl.scala:156)
> at org.apache.spark.SparkContext.(SparkContext.scala:509)
> at
> org.apache.spark.api.java.JavaSparkContext.(
> JavaSparkContext.scala:58)
> at
> com.voicebase.etl.PhoenixToElasticSearch.main(PhoenixToElasticSearch.java:
> 54)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:
> 62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
> SparkSubmit.scala:187)
> at org.apache.spark.deploy.SparkSubmit$.submit(
> SparkSubmit.scala:212)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.
> scala:126)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: RV: Unintelligible warning arose out of the blue.

2018-05-04 Thread Marco Mistroni
Hi
  I think it has to do with the Spark configuration; I don't think the standard
configuration is geared up for running in local mode on Windows.
Your dataframe is OK; you can check that you have read it successfully
by printing out df.count(), and you will see your code is reading the
dataframe successfully.

hth
m

On Fri, May 4, 2018 at 9:15 PM, Tomas Zubiri  wrote:

>
>
>
> --
> *De:* Tomas Zubiri
> *Enviado:* viernes, 04 de mayo de 2018 04:23 p.m.
> *Para:* user@spark.apache.org
> *Asunto:* Unintelligible warning arose out of the blue.
>
>
> My setup is as follows:
> Windows 10
> Python 3.6.5
> Spark 2.3.0
> The latest java jdk
> winutils/hadoop installed from this github page https://github.com/
> steveloughran/winutils
>
> I initialize spark from the pyspark shell as follows:
> df = spark.read.csv('mvce.csv')
>
>
> the mvce is a small file with 3 lines and 2 columns.
>
>
> The warning I receive is:
>
> 2018-05-04 16:17:44 WARN  ObjectStore:568 - Failed to get database
> global_temp, returning NoSuchObjectException
>
> What does this mean? I think it has something to do with YARN, but being
> an internal technology I have no clue about it. It doesn't seem to be
> causing any trouble, but I don't want to add the uncertainty that this
> might be causing an bug in future diagnosing of issues.
>
> Thank you for your help!
>
>
>


Re: Problem in persisting file in S3 using Spark: xxx file does not exist Exception

2018-05-02 Thread Marco Mistroni
Hi
 Sorted: I just replaced s3 with s3a. I think I recall similar issues in
the past with the AWS libraries.
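i.e. a sketch of the fix, the same write call with the s3a:// scheme (assuming the hadoop-aws/s3a connector is on the classpath and credentials come from the environment):

```
fileName = "s3a://ec2-bucket-mm-spark/form4-results-2404.results"  # s3a://, not s3://
dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
```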
Thx anyway for getting back
Kr

On Wed, May 2, 2018, 4:57 PM Paul Tremblay <paulhtremb...@gmail.com> wrote:

> I would like to see the full error. However, S3 can give misleading
> messages if you don't have the correct permissions.
>
> On Tue, Apr 24, 2018, 2:28 PM Marco Mistroni <mmistr...@gmail.com> wrote:
>
>> HI all
>>  i am using the following code for persisting data into S3 (aws keys are
>> already stored in the environment variables)
>>
>> dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)
>>
>>
>> However, i keep on receiving an exception that the file does not exist
>>
>> here's what comes from logs
>>
>> 18/04/24 22:15:32 INFO Persiste: Persisting data to text file:
>> s3://ec2-bucket-mm-spark/form4-results-2404.results
>> Exception in thread "main" java.io.IOException:
>> /form4-results-2404.results doesn't exist
>>
>> It seems that Spark expects the file to be there before writing, which
>> seems bizarre?
>>
>> I Have even tried to remove the coalesce ,but still got the same exception
>> Could anyone help pls?
>> kind regards
>>  marco
>>
>


Re: A naive ML question

2018-04-29 Thread Marco Mistroni
Maybe not necessarily what you want, but you could, based on transaction
attributes, find out the initial state and end state and give them to a
decision tree to figure out if, based on these attributes, you can predict
the final stage.
Again, not what you asked, but an idea to use ML for your data?
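A rough sketch of that decision-tree idea (purely illustrative; transactions_df and the column names amount/channel/final_state are made up):

```
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier

# label = the transaction's final state; features = attributes known up front
label_idx = StringIndexer(inputCol="final_state", outputCol="label")
channel_idx = StringIndexer(inputCol="channel", outputCol="channel_idx")
features = VectorAssembler(inputCols=["amount", "channel_idx"], outputCol="features")
tree = DecisionTreeClassifier(labelCol="label", featuresCol="features")

model = Pipeline(stages=[label_idx, channel_idx, features, tree]).fit(transactions_df)
```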
Kr

On Sun, Apr 29, 2018, 10:22 AM Nick Pentreath 
wrote:

> One potential approach could be to construct a transition matrix showing
> the probability of moving from each state to another state. This can be
> visualized with a “heat map” encoding (I think matshow in numpy/matplotlib
> does this).
>
> On Sat, 28 Apr 2018 at 21:34, kant kodali  wrote:
>
>> Hi,
>>
>> I mean a transaction typically goes through different states like
>> STARTED, PENDING, CANCELLED, COMPLETED, SETTLED etc...
>>
>> Thanks,
>> kant
>>
>> On Sat, Apr 28, 2018 at 4:11 AM, Jörn Franke 
>> wrote:
>>
>>> What do you mean by “how it evolved over time” ? A transaction describes
>>> basically an action at a certain point of time. Do you mean how a financial
>>> product evolved over time given a set of a transactions?
>>>
>>> > On 28. Apr 2018, at 12:46, kant kodali  wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I have a bunch of financial transactional data and I was wondering if
>>> there is any ML model that can give me a graph structure for this data?
>>> other words, show how a transaction had evolved over time?
>>> >
>>> > Any suggestions or references would help.
>>> >
>>> > Thanks!
>>> >
>>>
>>
>>


Re: Dataframe vs dataset

2018-04-28 Thread Marco Mistroni
IMHO, neither. I see Datasets as typed DataFrames and therefore Datasets are
enhanced DataFrames.
Feel free to disagree.
Kr

On Sat, Apr 28, 2018, 2:24 PM Michael Artz  wrote:

> Hi,
>
> I use Spark everyday and I have a good grip on the basics of Spark, so
> this question isnt for myself.  But this came up and I wanted to see what
> other Spark users would say, and I dont want to influence your answer.  And
> SO is weird about polls. The question is
>
>  "Which one do you feel is accurate... Dataset is a subset of DataFrame,
> or DataFrame a subset of Dataset?"
>


Problem in persisting file in S3 using Spark: xxx file does not exist Exception

2018-04-24 Thread Marco Mistroni
HI all
 i am using the following code for persisting data into S3 (aws keys are
already stored in the environment variables)

dataFrame.coalesce(1).write.format("com.databricks.spark.csv").save(fileName)


However, i keep on receiving an exception that the file does not exist

here's what comes from logs

18/04/24 22:15:32 INFO Persiste: Persisting data to text file:
s3://ec2-bucket-mm-spark/form4-results-2404.results
Exception in thread "main" java.io.IOException: /form4-results-2404.results
doesn't exist

It seems that Spark expects the file to be there before writing, which
seems bizarre.

I have even tried to remove the coalesce, but still got the same exception.
Could anyone help pls?
kind regards
 marco


Re: Live Stream Code Reviews :)

2018-04-12 Thread Marco Mistroni
PST, I believe, like last time.
Works out to 9pm BST / 10pm CET if I'm correct.

On Thu, Apr 12, 2018, 8:47 PM Matteo Olivi  wrote:

> Hi,
> 11 am in which timezone?
>
> On Thu, 12 Apr 2018 at 21:23, Holden Karau  wrote:
>
>> Hi Y'all,
>>
>> If your interested in learning more about how the development process in
>> Apache Spark works I've been doing a weekly live streamed code review most
>> Fridays at 11am. This weeks will be on twitch/youtube (
>> https://www.twitch.tv/holdenkarau /
>> https://www.youtube.com/watch?v=vGVSa9KnD80 ). If you have a PR into
>> Spark (or a related project) you'd like me to review live let me know and
>> I'll add it to my queue.
>>
>> Cheers,
>>
>> Holden :)
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>>
>


Re: Best active groups, forums or contacts for Spark ?

2018-01-26 Thread Marco Mistroni
Hi
 From personal experienceand I might be asking u obvious question
1. Does it work in standalone (no cluster)
2. Can u break down app in pieces and try to see at which step the code
gets killed?
3. Have u had a look at spark gui to see if we executors go oom?

I might be oversimplifying what spark does. But if ur logic works
standalone and does not work in clusterthe cluster might b ur
problem..(apart from modules not being serializable)
If it breaks in no cluster mode then it's easier to debug
I am no way an expert, just talking from my little personal experience.
I m sure someone here can give more hints on how to debug properly a spark
app
Hth


On Jan 26, 2018 1:18 PM, "Chandu"  wrote:

> @Esa Thanks for posting this as I was thinking the same way when trying to
> get some help about Spark (I am just a beginner)
>
> @Jack
> I posted a question @ here (
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-
> Standalone-Mode-application-runs-but-executor-is-killed-tc30739.html
>  Spark-Standalone-Mode-application-runs-but-executor-is-killed-tc30739.html
> >
> ) and stackoverflow (
> https://stackoverflow.com/questions/48445145/spark-
> standalone-mode-application-runs-but-executor-is-killed-with-exitstatus
>  standalone-mode-application-runs-but-executor-is-killed-with-exitstatus>
> ) and haven't received much of views or even a comment.
>
> I am new to Spark and may be my question is framed badly.
> Would you be able to take a look?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: good materiala to learn apache spark

2018-01-18 Thread Marco Mistroni
Jacek Laskowski on this mailing list wrote a book which is available
online.
Hth

On Jan 18, 2018 6:16 AM, "Manuel Sopena Ballesteros" <
manuel...@garvan.org.au> wrote:

> Dear Spark community,
>
>
>
> I would like to learn more about Apache Spark. I have a Hortonworks HDP
> platform and have run a few Spark jobs in a cluster, but now I need to know
> more in depth how Spark works.
>
>
>
> My main interest is the sysadmin and operational side of Spark and its
> ecosystem.
>
>
>
> Is there any material?
>
>
>
> Thank you very much
>
>
>
> *Manuel Sopena Ballesteros *| Big data Engineer
> *Garvan Institute of Medical Research *
> The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010
> 
> *T:* + 61 (0)2 9355 5760 <+61%202%209355%205760> | *F:* +61 (0)2 9295 8507
> <+61%202%209295%208507> | *E:* manuel...@garvan.org.au
>
>
> NOTICE
> Please consider the environment before printing this email. This message
> and any attachments are intended for the addressee named and may contain
> legally privileged/confidential/copyright information. If you are not the
> intended recipient, you should not read, use, disclose, copy or distribute
> this communication. If you have received this message in error please
> notify us at once by return email and then delete both messages. We accept
> no liability for the distribution of viruses or similar in electronic
> communications. This notice should not be removed.
>


Re: Please Help with DecisionTree/FeatureIndexer

2017-12-16 Thread Marco Mistroni
Hello Wei
 Thanks, I should have checked the data.
My data has this format:
|col1|col2|col3|label|

so it looks like I cannot use VectorIndexer directly (it accepts a Vector
column).
I am guessing what I should do is something like this (given I have a few
categorical features):

val assembler = new VectorAssembler().
  setInputCols(inputData.columns.filter(_ != "Label")).
  setOutputCol("features")

val transformedData = assembler.transform(inputData)


val featureIndexer =
  new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(5) // features with > 5 distinct values are treated
as continuous.
  .fit(transformedData)

?
Apologies for the basic question, but the last time I worked on an ML project I
was using Spark 1.x.

kr
 marco
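
For completeness, a hedged sketch of the assembler-plus-indexer route sketched above,
wired into a Pipeline with a DecisionTreeClassifier. It assumes inputData is the
DataFrame with columns col1, col2, col3, Label mentioned earlier and that the feature
columns are already numeric; the column names come from the snippet above, everything
else is illustrative.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer}

val labelIndexer = new StringIndexer()
  .setInputCol("Label")
  .setOutputCol("indexedLabel")

val assembler = new VectorAssembler()
  .setInputCols(inputData.columns.filter(_ != "Label"))
  .setOutputCol("features")

val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(5) // columns with <= 5 distinct values are flagged as categorical

val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

val pipeline = new Pipeline().setStages(Array(labelIndexer, assembler, featureIndexer, dt))
val model = pipeline.fit(inputData)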

On Dec 16, 2017 1:24 PM, "Weichen Xu" <weichen...@databricks.com> wrote:

> Hi, Marco,
>
> val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_
> data.txt")
>
> The data now include a feature column with name "features",
>
> val featureIndexer = new VectorIndexer()
>   .setInputCol("features")   <-- Here specify the "features" column to 
> index.
>   .setOutputCol("indexedFeatures")
>
>
> Thanks.
>
>
> On Sat, Dec 16, 2017 at 6:26 AM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> HI all
>>  i am trying to run a sample decision tree, following examples here (for
>> Mllib)
>>
>> https://spark.apache.org/docs/latest/ml-classification-regre
>> ssion.html#decision-tree-classifier
>>
>> the example seems to use a VectorIndexer; however, I am missing something.
>> How does the featureIndexer know which columns are features?
>> Isn't there something missing? Or is the featureIndexer able to figure
>> out by itself
>> which columns of the DataFrame are features?
>>
>> val labelIndexer = new StringIndexer()
>>   .setInputCol("label")
>>   .setOutputCol("indexedLabel")
>>   .fit(data)// Automatically identify categorical features, and index 
>> them.val featureIndexer = new VectorIndexer()
>>   .setInputCol("features")
>>   .setOutputCol("indexedFeatures")
>>   .setMaxCategories(4) // features with > 4 distinct values are treated as 
>> continuous.
>>   .fit(data)
>>
>> Using this code i am getting back this exception
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Field 
>> "features" does not exist.
>> at 
>> org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
>> at 
>> org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
>> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
>> at scala.collection.AbstractMap.getOrElse(Map.scala:59)
>> at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
>> at 
>> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
>> at 
>> org.apache.spark.ml.feature.VectorIndexer.transformSchema(VectorIndexer.scala:141)
>> at 
>> org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
>> at 
>> org.apache.spark.ml.feature.VectorIndexer.fit(VectorIndexer.scala:118)
>>
>> what am i missing?
>>
>> w/kindest regards
>>
>>  marco
>>
>>
>


Please Help with DecisionTree/FeatureIndexer

2017-12-15 Thread Marco Mistroni
HI all
 i am trying to run a sample decision tree, following examples here (for
Mllib)

https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier

the example seems to use a VectorIndexer; however, I am missing something.
How does the featureIndexer know which columns are features?
Isn't there something missing? Or is the featureIndexer able to figure
out by itself
which columns of the DataFrame are features?

val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)// Automatically identify categorical features, and index
them.val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are
treated as continuous.
  .fit(data)

Using this code i am getting back this exception

Exception in thread "main" java.lang.IllegalArgumentException: Field
"features" does not exist.
at 
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
at 
org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
at 
org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
at 
org.apache.spark.ml.feature.VectorIndexer.transformSchema(VectorIndexer.scala:141)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
at 
org.apache.spark.ml.feature.VectorIndexer.fit(VectorIndexer.scala:118)

what am i missing?

w/kindest regards

 marco


How to control logging in testing package com.holdenkarau.spark.testing.

2017-12-13 Thread Marco Mistroni
HI all
 could anyone advise on how to control logging in
com.holdenkarau.spark.testing?

There are loads of Spark logging statements every time I run a test.
I tried to disable Spark logging using the statements below, but with no success:

import org.apache.log4j.{Level, Logger}
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)

thanks and kr
 marco
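
One approach that is commonly suggested for test suites (an assumption here, not
something specific to spark-testing-base) is to drop a log4j.properties file under
src/test/resources, so the test JVM picks it up before Spark configures logging.
A minimal sketch, assuming the log4j 1.x that Spark 1.x/2.x ships with:

# src/test/resources/log4j.properties
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# quieten the noisiest packages explicitly
log4j.logger.org.apache.spark=ERROR
log4j.logger.akka=ERROR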


Re: pyspark configuration with Juyter

2017-11-04 Thread Marco Mistroni
Hi, probably not what you are looking for, but if you get stuck with conda, Jupyter
and Spark: if you get an account at community.cloudera you will enjoy Jupyter
and Spark out of the box.
Good luck and hth
Kr

On Nov 4, 2017 4:59 PM, "makoto"  wrote:

> I setup environment variables in my ~/.bashrc as follows:
>
> export PYSPARK_PYTHON=/usr/local/oss/anaconda3/bin/python3.6
> export PYTHONPATH=$(ls -a ${SPARK_HOME}/python/lib/py4j-
> *-src.zip):${SPARK_HOME}/python:$PYTHONPATH
> export PYSPARK_DRIVER_PYTHON=jupyter
> export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
>
>
> 2017-11-03 20:56 GMT+09:00 Jeff Zhang :
>
>>
>> You are setting PYSPARK_DRIVER to jupyter, please set it to python exec
>> file
>>
>>
>> On Fri, 3 Nov 2017 at 7:31 PM, anudeep wrote:
>>
>>> Hello experts,
>>>
>>> I install jupyter notebook thorugh anacoda, set the pyspark driver to
>>> use jupyter notebook.
>>>
>>> I see the below issue when i try to open pyspark.
>>>
>>> anudeepg@datanode2 spark-2.1.0]$ ./bin/pyspark
>>> [I 07:29:53.184 NotebookApp] The port  is already in use, trying
>>> another port.
>>> [I 07:29:53.211 NotebookApp] JupyterLab alpha preview extension loaded
>>> from /home/anudeepg/anaconda2/lib/python2.7/site-packages/jupyterlab
>>> JupyterLab v0.27.0
>>> Known labextensions:
>>> [I 07:29:53.212 NotebookApp] Running the core application with no
>>> additional extensions or settings
>>> [I 07:29:53.214 NotebookApp] Serving notebooks from local directory:
>>> /opt/mapr/spark/spark-2.1.0
>>> [I 07:29:53.214 NotebookApp] 0 active kernels
>>> [I 07:29:53.214 NotebookApp] The Jupyter Notebook is running at:
>>> http://localhost:8889/?token=9aa5dc87cb5a6d987237f68e2f0b7e9
>>> c70a7f2e8c9a7cf2e
>>> [I 07:29:53.214 NotebookApp] Use Control-C to stop this server and shut
>>> down all kernels (twice to skip confirmation).
>>> [W 07:29:53.214 NotebookApp] No web browser found: could not locate
>>> runnable browser.
>>> [C 07:29:53.214 NotebookApp]
>>>
>>> Copy/paste this URL into your browser when you connect for the first
>>> time,
>>> to login with a token:
>>> http://localhost:8889/?token=9aa5dc87cb5a6d987237f68e2f0b7e9
>>> c70a7f2e8c9a7cf2e
>>>
>>>
>>> Can someone please help me here.
>>>
>>> Thanks!
>>> Anudeep
>>>
>>>
>


Re: PySpark 2.1 Not instantiating properly

2017-10-20 Thread Marco Mistroni
Hello
  I believe I followed the instructions here to get Spark to work on Windows.
The article refers to Win7, but it will work for Win10 as well:

http://nishutayaltech.blogspot.co.uk/2015/04/how-to-run-apache-spark-on-windows7-in.html

Jagat posted a similar link on winutils... I believe it would probably
say the same as it says here:
1 - download winutils and place it somewhere in your file system
2 - in your environment settings, set HADOOP_HOME=


This should get you sorted.
Btw, I got the impression, from what I have seen, that Spark and Windows
aren't best friends. You'd better get a Docker container and run Spark
off that container...

hth
 marco

On Fri, Oct 20, 2017 at 5:57 PM, Aakash Basu <aakash.spark@gmail.com>
wrote:

> Hey Marco/Jagat,
>
> As I earlier informed you, that I've already done those basic checks and
> permission changes.
>
> eg: D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive, but to no avail.
> It still throws the same error. At the very first place, I do not
> understand, without any manual change, how did the permissions change
> automatically?
>
> To Jagat's question - "Do you have winutils in your system relevant for
> your system." - How to understand that? I did not find winutils specific to
> OS/bits.
>
> Any other solutions? Should I download the fresh zip of Spark and redo all
> the steps of configuring? The chmod is just not working (without any errors
> while submitting the above command).
>
>
> Thanks,
> Aakash.
>
> On Fri, Oct 20, 2017 at 9:53 PM, Jagat Singh <jagatsi...@gmail.com> wrote:
>
>> Do you have winutils in your system relevant for your system.
>>
>> This SO post has infomation related https://stackoverflow.
>> com/questions/34196302/the-root-scratch-dir-tmp-hive-on-hdfs
>> -should-be-writable-current-permissions
>>
>>
>>
>> On 21 October 2017 at 03:16, Marco Mistroni <mmistr...@gmail.com> wrote:
>>
>>> Did u build spark or download the zip?
>>> I remember having similar issue...either you have to give write perm to
>>> your /tmp directory or there's a spark config you need to override
>>> This error is not 2.1 specific...let me get home and check my configs
>>> I think I amended my /tmp permissions via xterm instead of control panel
>>>
>>> Hth
>>>  Marco
>>>
>>>
>>> On Oct 20, 2017 8:31 AM, "Aakash Basu" <aakash.spark@gmail.com>
>>> wrote:
>>>
>>> Hi all,
>>>
>>> I have Spark 2.1 installed in my laptop where I used to run all my
>>> programs. PySpark wasn't used for around 1 month, and after starting it
>>> now, I'm getting this exception (I've tried the solutions I could find on
>>> Google, but to no avail).
>>>
>>> Specs: Spark 2.1.1, Python 3.6, HADOOP 2.7, Windows 10 Pro, 64 Bits.
>>>
>>>
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o27.sessionState.
>>> : java.lang.IllegalArgumentException: Error while instantiating
>>> 'org.apache.spark.sql.hive.HiveSessionState':
>>> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$Spar
>>> kSession$$reflect(SparkSession.scala:981)
>>> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(Sp
>>> arkSession.scala:110)
>>> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.
>>> scala:109)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.jav
>>> a:357)
>>> at py4j.Gateway.invoke(Gateway.java:280)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.j
>>> ava:132)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:214)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(Native
>>&

Re: Write to HDFS

2017-10-20 Thread Marco Mistroni
Use counts.repartition(1).saveAsTextFile(...)
Hth
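
Expanded into a minimal sketch, reusing the word-count example and paths from the
quoted message below:

import org.apache.spark.{SparkConf, SparkContext}

object WordCountSingleFile {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCountSingleFile"))

    val counts = sc.textFile("Sample.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // A single partition means a single part-00000 file under the output directory.
    counts.repartition(1).saveAsTextFile("hdfs://master:8020/user/abc")

    sc.stop()
  }
}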

On Oct 20, 2017 3:01 PM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:

Actually, when I run the following code,

  val textFile = sc.textFile("Sample.txt")
  val counts = textFile.flatMap(line => line.split(" "))
 .map(word => (word, 1))
 .reduceByKey(_ + _)


It saves the results into more than one partition, like part-0 and
part-1. I want to collect all of them into one file.


2017-10-20 16:43 GMT+03:00 Marco Mistroni <mmistr...@gmail.com>:

> Hi
>  Could you just create an rdd/df out of what you want to save and store it
> in hdfs?
> Hth
>
> On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu" <usopao...@gmail.com> wrote:
>
>> Hi all,
>>
>> In word count example,
>>
>>   val textFile = sc.textFile("Sample.txt")
>>   val counts = textFile.flatMap(line => line.split(" "))
>>  .map(word => (word, 1))
>>  .reduceByKey(_ + _)
>>  counts.saveAsTextFile("hdfs://master:8020/user/abc")
>>
>> I want to write collection of "*counts" *which is used in code above to
>> HDFS, so
>>
>> val x = counts.collect()
>>
>> Actually I want to write *x* to HDFS. But Spark wants an RDD to write
>> something to HDFS
>>
>> How can I write Array[(String,Int)] to HDFS
>>
>>
>> --
>> Uğur
>>
>


-- 
Uğur Sopaoğlu


Re: PySpark 2.1 Not instantiating properly

2017-10-20 Thread Marco Mistroni
Did u build spark or download the zip?
I remember having similar issue...either you have to give write perm to
your /tmp directory or there's a spark config you need to override
This error is not 2.1 specific...let me get home and check my configs
I think I amended my /tmp permissions via xterm instead of control panel

Hth
 Marco


On Oct 20, 2017 8:31 AM, "Aakash Basu" <aakash.spark@gmail.com> wrote:

Hi all,

I have Spark 2.1 installed in my laptop where I used to run all my
programs. PySpark wasn't used for around 1 month, and after starting it
now, I'm getting this exception (I've tried the solutions I could find on
Google, but to no avail).

Specs: Spark 2.1.1, Python 3.6, HADOOP 2.7, Windows 10 Pro, 64 Bits.


py4j.protocol.Py4JJavaError: An error occurred while calling
o27.sessionState.
: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.hive.HiveSessionState':
at org.apache.spark.sql.SparkSession$.org$apache$
spark$sql$SparkSession$$reflect(SparkSession.scala:981)
at org.apache.spark.sql.SparkSession.sessionState$
lzycompute(SparkSession.scala:110)
at org.apache.spark.sql.SparkSession.sessionState(
SparkSession.scala:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(
ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.
java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.SparkSession$.org$apache$
spark$sql$SparkSession$$reflect(SparkSession.scala:978)
... 13 more
Caused by: java.lang.IllegalArgumentException: Error while instantiating
'org.apache.spark.sql.hive.HiveExternalCatalog':
at org.apache.spark.sql.internal.SharedState$.org$apache$spark$
sql$internal$SharedState$$reflect(SharedState.scala:169)
at org.apache.spark.sql.internal.SharedState.(
SharedState.scala:86)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(
SparkSession.scala:101)
at org.apache.spark.sql.SparkSession$$anonfun$sharedState$1.apply(
SparkSession.scala:101)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession.sharedState$
lzycompute(SparkSession.scala:101)
at org.apache.spark.sql.SparkSession.sharedState(
SparkSession.scala:100)
at org.apache.spark.sql.internal.SessionState.(
SessionState.scala:157)
at org.apache.spark.sql.hive.HiveSessionState.(
HiveSessionState.scala:32)
... 18 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.internal.SharedState$.org$apache$spark$
sql$internal$SharedState$$reflect(SharedState.scala:166)
... 26 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(
NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(
DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.spark.sql.hive.client.IsolatedClientLoader.
createClient(IsolatedClientLoader.scala:264)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(
HiveUtils.scala:358)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(
HiveUtils.scala:262)
at org.apache.spark.sql.hive.HiveExternalCatalog.(
HiveExternalCatalog.scala:66)
... 31 more
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: The root
s

Re: Write to HDFS

2017-10-20 Thread Marco Mistroni
Hi
 Could you just create an rdd/df out of what you want to save and store it
in hdfs?
Hth

On Oct 20, 2017 9:44 AM, "Uğur Sopaoğlu"  wrote:

> Hi all,
>
> In word count example,
>
>   val textFile = sc.textFile("Sample.txt")
>   val counts = textFile.flatMap(line => line.split(" "))
>  .map(word => (word, 1))
>  .reduceByKey(_ + _)
>  counts.saveAsTextFile("hdfs://master:8020/user/abc")
>
> I want to write collection of "*counts" *which is used in code above to
> HDFS, so
>
> val x = counts.collect()
>
> Actually I want to write *x* to HDFS. But Spark wants an RDD to write
> something to HDFS
>
> How can I write Array[(String,Int)] to HDFS
>
>
> --
> Uğur
>


Re: Database insert happening two times

2017-10-17 Thread Marco Mistroni
Hi
 Uh, if the problem is really with parallel execution you can try to call
repartition(1) before you save.
Alternatively, try to store the data in a CSV file and see if you have the same
behaviour, to exclude DynamoDB issues.
Also... are the multiple rows being written dupes (do they all have the same
fields/values)?
Hth
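
A rough sketch of the two checks suggested above, reusing the data_df name from the
quoted code below (the debug output path is an invented placeholder):

import org.apache.spark.sql.functions.col

// 1) Are the extra rows exact duplicates? Group on every column and look for count > 1.
val dupes = data_df.groupBy(data_df.columns.map(col): _*).count().filter("count > 1")
dupes.show(false)

// 2) Write the same data to a plain CSV from a single partition, to rule DynamoDB in or out.
data_df.repartition(1).write.mode("overwrite").csv("/tmp/indexer_debug")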


On Oct 17, 2017 1:08 PM, "Harsh Choudhary"  wrote:

> This is the code -
> hdfs_path=
> if(hdfs_path.contains(".avro")){
>   data_df = spark.read.format("com.databricks.spark.avro").load(hdfs_
> path)
> }else if(hdfs_path.contains(".tsv")){
>   data_df = spark.read.option("delimiter",
> "\t").option("header","true").csv(hdfs_path)
> }else if(hdfs_path.contains(".scsv")){
>   data_df = spark.read.option("delimiter",
> ";").option("header","true").csv(hdfs_path)
> }else{
>   System.exit(1)
> }
> data_df = data_df.withColumn("edl_created_by",
> lit("IndexerSpark")).withColumn("edl_created_at",lit(currentTime))
> data_df.write.mode("append").parquet(dest_file)
> val status1 = AddLogToDynamo(Json.toJson(fil
> eLineageEntity)(fileLineAgeFormat), conf.getString("lambda.filelin
> eage.dynamodb.update.function.name"), GetAuth.getLambdaClient)
>
> def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName:
> String,lambdaClient: AWSLambdaClient):String = {
>   System.out.println("new metadata to be updated: "+updatedLambdaJson);
>   val updatelambdaReq:InvokeRequest = new InvokeRequest();
>   updatelambdaReq.setFunctionName(updateFunctionName);
>   updatelambdaReq.setPayload(updatedLambdaJson.toString());
>   System.out.println("Calling lambda to add log");
>   val updateLambdaResult = byteBufferToString(lambdaClien
> t.invoke(updatelambdaReq).getPayload(),Charset.forName("UTF-8"));
>   return updateLambdaResult;
>   }
>
>
> Harsh Choudhary
>
> On Tue, Oct 17, 2017 at 5:32 PM, ayan guha  wrote:
>
>> Can you share your code?
>>
>> On Tue, 17 Oct 2017 at 10:22 pm, Harsh Choudhary 
>> wrote:
>>
>>> Hi
>>>
>>> I'm running a Spark job in which I am appending new data into Parquet
>>> file. At last, I make a log entry in my Dynamodb table stating the number
>>> of records appended, time etc. Instead of one single entry in the database,
>>> multiple entries are being made to it. Is it because of parallel execution
>>> of code in workers? If it is so then how can I solve it so that it only
>>> writes once.
>>>
>>> *Thanks!*
>>>
>>> *Cheers!*
>>>
>>> Harsh Choudhary
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Quick one... AWS SDK version?

2017-10-07 Thread Marco Mistroni
Hi JG
 out of curiosity, what's your use case? Are you writing to S3? You could use
Spark to do that, e.g. using the hadoop package
org.apache.hadoop:hadoop-aws:2.7.1... that will pull in the AWS client
which is in line with Hadoop 2.7.1.

hth
 marco
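
As a sketch of the "keep them in line" point: hadoop-aws 2.7.x was built against the
1.7.4 line of the AWS Java SDK, so the two are usually declared together. A hypothetical
build.sbt fragment (versions are illustrative and need to match the Hadoop build you
actually run against):

// build.sbt fragment (illustrative versions only)
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"   % "2.2.0" % "provided",
  "org.apache.hadoop" %  "hadoop-aws"   % "2.7.3",
  "com.amazonaws"     %  "aws-java-sdk" % "1.7.4" // the SDK line hadoop-aws 2.7.x was compiled against
)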

On Fri, Oct 6, 2017 at 10:58 PM, Jonathan Kelly <jonathaka...@gmail.com>
wrote:

> Note: EMR builds Hadoop, Spark, et al, from source against specific
> versions of certain packages like the AWS Java SDK, httpclient/core,
> Jackson, etc., sometimes requiring some patches in these applications in
> order to work with versions of these dependencies that differ from what the
> applications may support upstream.
>
> For emr-5.8.0, we have built Hadoop and Spark (the Spark Kinesis
> connector, that is, since that's the only part of Spark that actually
> depends upon the AWS Java SDK directly) against AWS Java SDK 1.11.160
> instead of the much older version that vanilla Hadoop 2.7.3 would otherwise
> depend upon.
>
> ~ Jonathan
>
> On Wed, Oct 4, 2017 at 7:17 AM Steve Loughran <ste...@hortonworks.com>
> wrote:
>
>> On 3 Oct 2017, at 21:37, JG Perrin <jper...@lumeris.com> wrote:
>>
>> Sorry Steve – I may not have been very clear: thinking about
>> aws-java-sdk-z.yy.xxx.jar. To the best of my knowledge, none is bundled
>> with Spark.
>>
>>
>>
>> I know, but if you are talking to s3 via the s3a client, you will need
>> the SDK version to match the hadoop-aws JAR of the same version of Hadoop
>> your JARs have. Similarly, if you were using spark-kinesis, it needs to be
>> in sync there.
>>
>>
>> *From:* Steve Loughran [mailto:ste...@hortonworks.com
>> <ste...@hortonworks.com>]
>> *Sent:* Tuesday, October 03, 2017 2:20 PM
>> *To:* JG Perrin <jper...@lumeris.com>
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Quick one... AWS SDK version?
>>
>>
>>
>> On 3 Oct 2017, at 02:28, JG Perrin <jper...@lumeris.com> wrote:
>>
>> Hey Sparkians,
>>
>> What version of AWS Java SDK do you use with Spark 2.2? Do you stick with
>> the Hadoop 2.7.3 libs?
>>
>>
>> You generally to have to stick with the version which hadoop was built
>> with I'm afraid...very brittle dependency.
>>
>>


RE: Spark 2.2.0 Win 7 64 bits Exception while deleting Spark temp dir

2017-10-04 Thread Marco Mistroni
Hi
 Got similar issues on Win 10. It has to do, imho, with the way permissions
are set up in Windows.
That should not prevent your program from getting back a result.
Kr

On Oct 3, 2017 9:42 PM, "JG Perrin"  wrote:

> do you have a little more to share with us?
>
>
>
> maybe you can set another TEMP directory. are you getting a result?
>
>
>
> *From:* usa usa [mailto:usact2...@gmail.com]
> *Sent:* Tuesday, October 03, 2017 10:50 AM
> *To:* user@spark.apache.org
> *Subject:* Spark 2.2.0 Win 7 64 bits Exception while deleting Spark temp
> dir
>
>
>
> Hi,
>
> I have installed Spark 2.2.0 in win 7 64 bits.
>
> When I did a test:
>
>   c:>run-example SparkPI 10
>
> I got error:
>Exception while deleting Spark temp dir
>  C:\Users\jding01\AppData\Local\Temp\spark-xxx
>
>
> The solution at
>
> https://stackoverflow.com/questions/31274170/spark-
> error-error-utils-exception-while-deleting-spark-temp-dir
>
> cannot help me.
>
> Could anyone point out how to fix it ?
>
> Thanks,
>
> David
>


Re: NullPointerException error while saving Scala Dataframe to HBase

2017-10-01 Thread Marco Mistroni
Hi
 The question is getting to the list.
I have no experience with HBase... though, having seen similar stuff when
saving a df somewhere else, it might have to do with the properties you
need to set to let Spark know it is dealing with HBase. Don't you need to set
some properties on the Spark context you are using?
Hth
 Marco
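
Not verified against the exact connector version in this thread, but one commonly
reported cause of that NPE is that the hbase-spark DefaultSource expects an
HBaseContext (carrying the HBase configuration) to have been created before the save.
A hedged sketch, assuming hbase-site.xml is on the classpath and reusing the catalog,
data and sc definitions from the quoted code below:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

val hbaseConf = HBaseConfiguration.create() // picks up hbase-site.xml from the classpath

// Creating the HBaseContext registers the configuration with the connector,
// so the subsequent save() has something other than null to work with.
new HBaseContext(sc, hbaseConf)

// toDF works here because the spark-shell implicits are in scope, as in the quoted session.
sc.parallelize(data).toDF.write
  .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.hadoop.hbase.spark")
  .save()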


On Oct 1, 2017 4:33 AM, <mailford...@gmail.com> wrote:

Hi Guys- am not sure whether the email is reaching to the community
members. Please can somebody acknowledge

Sent from my iPhone

> On 30-Sep-2017, at 5:02 PM, Debabrata Ghosh <mailford...@gmail.com> wrote:
>
> Dear All,
>Greetings ! I am repeatedly hitting a NullPointerException
error while saving a Scala Dataframe to HBase. Please can you help
resolving this for me. Here is the code snippet:
>
> scala> def catalog = s"""{
>  ||"table":{"namespace":"default", "name":"table1"},
>  ||"rowkey":"key",
>  ||"columns":{
>  |  |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
>  |  |"col1":{"cf":"cf1", "col":"col1", "type":"string"}
>  ||}
>  |  |}""".stripMargin
> catalog: String
>
> scala> case class HBaseRecord(
>  |col0: String,
>  |col1: String)
> defined class HBaseRecord
>
> scala> val data = (0 to 255).map { i =>  HBaseRecord(i.toString, "extra")}
> data: scala.collection.immutable.IndexedSeq[HBaseRecord] =
Vector(HBaseRecord(0,extra), HBaseRecord(1,extra), HBaseRecord
>
> (2,extra), HBaseRecord(3,extra), HBaseRecord(4,extra),
HBaseRecord(5,extra), HBaseRecord(6,extra), HBaseRecord(7,extra),
>
> HBaseRecord(8,extra), HBaseRecord(9,extra), HBaseRecord(10,extra),
HBaseRecord(11,extra), HBaseRecord(12,extra),
>
> HBaseRecord(13,extra), HBaseRecord(14,extra), HBaseRecord(15,extra),
HBaseRecord(16,extra), HBaseRecord(17,extra),
>
> HBaseRecord(18,extra), HBaseRecord(19,extra), HBaseRecord(20,extra),
HBaseRecord(21,extra), HBaseRecord(22,extra),
>
> HBaseRecord(23,extra), HBaseRecord(24,extra), HBaseRecord(25,extra),
HBaseRecord(26,extra), HBaseRecord(27,extra),
>
> HBaseRecord(28,extra), HBaseRecord(29,extra), HBaseRecord(30,extra),
HBaseRecord(31,extra), HBase...
>
> scala> import org.apache.spark.sql.datasources.hbase
> import org.apache.spark.sql.datasources.hbase
>
>
> scala> import org.apache.spark.sql.datasources.hbase.{HBaseTableCatalog}
> import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog
>
> scala> 
> sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog
-> catalog, HBaseTableCatalog.newTable ->
>
> "5")).format("org.apache.hadoop.hbase.spark").save()
>
> java.lang.NullPointerException
>   at org.apache.hadoop.hbase.spark.HBaseRelation.(
DefaultSource.scala:134)
>   at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(
DefaultSource.scala:75)
>   at org.apache.spark.sql.execution.datasources.
DataSource.write(DataSource.scala:426)
>   at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>   ... 56 elided
>
>
> Thanks in advance !
>
> Debu
>



Re: PLs assist: trying to FlatMap a DataSet / partially OT

2017-09-16 Thread Marco Mistroni
Not exactly... I was not going to flatMap the RDD.
In the end I amended my approach to the problem and managed to get the
flatMap working on the Dataset.
Thx for answering
Kr
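
For reference, a minimal sketch of a flatMap done directly on the Dataset (rather than
on the underlying RDD): no explicit Encoder argument is needed when spark.implicits._
is in scope, and each Char is mapped to a String since a String encoder is certainly
available. The sample data is invented.

import org.apache.spark.sql.{Dataset, SparkSession}

object DatasetFlatMapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetFlatMapSketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val tplDataSet: Dataset[(String, String)] = Seq(("k1", "abc"), ("k2", "de")).toDS()

    // flatMap straight on the Dataset; the String encoder comes from spark.implicits._
    val expanded: Dataset[String] =
      tplDataSet.flatMap { case (_, value) => value.toList.map(_.toString) }

    expanded.show() // one row per character: a, b, c, d, e
    spark.stop()
  }
}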

On Sep 16, 2017 4:53 PM, "Akhil Das" <ak...@hacked.work> wrote:

> scala> case class Fruit(price: Double, name: String)
> defined class Fruit
>
> scala> val ds = Seq(Fruit(10.0,"Apple")).toDS()
> ds: org.apache.spark.sql.Dataset[Fruit] = [price: double, name: string]
>
> scala> ds.rdd.flatMap(f => f.name.toList).collect
> res8: Array[Char] = Array(A, p, p, l, e)
>
>
> This is what you want to do?
>
> On Fri, Sep 15, 2017 at 4:21 AM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> HI all
>>  could anyone assist pls?
>> i am trying to flatMap a DataSet[(String, String)] and i am getting
>> errors in Eclipse
>> the errors are more Scala related than spark -related, but i was
>> wondering if someone came across
>> a similar situation
>>
>> here's what i got. A DS of (String, String) , out of which i am using
>> flatMap to get a List[Char] of for the second element in the tuple.
>>
>> val tplDataSet = < DataSet[(String, String)] >
>>
>> val expanded = tplDataSet.flatMap(tpl  => tpl._2.toList,
>> Encoders.product[(String, String)])
>>
>>
>> Eclipse complains that  'tpl' in the above function is missing parameter
>> type
>>
>> what am i missing? or perhaps i am using the wrong approach?
>>
>> w/kindest regards
>>  Marco
>>
>
>
>
> --
> Cheers!
>
>


PLs assist: trying to FlatMap a DataSet / partially OT

2017-09-14 Thread Marco Mistroni
HI all
 could anyone assist pls?
I am trying to flatMap a DataSet[(String, String)] and I am getting errors
in Eclipse.
The errors are more Scala-related than Spark-related, but I was wondering
if someone came across
a similar situation.

Here's what I've got: a DS of (String, String), out of which I am using
flatMap to get a List[Char] for the second element in the tuple.

val tplDataSet = < DataSet[(String, String)] >

val expanded = tplDataSet.flatMap(tpl  => tpl._2.toList,
Encoders.product[(String, String)])


Eclipse complains that 'tpl' in the above function is missing a parameter
type.

What am I missing? Or perhaps I am using the wrong approach?

w/kindest regards
 Marco


Re: [Meetup] Apache Spark and Ignite for IoT scenarious

2017-09-07 Thread Marco Mistroni
Hi
 Will there be a podcast to view afterwards for remote EMEA users?
Kr

On Sep 7, 2017 12:15 AM, "Denis Magda"  wrote:

> Folks,
>
> Those who are craving for mind food this weekend come over the meetup  -
> Santa Clara, Sept 9, 9.30 AM:
> https://www.meetup.com/datariders/events/242523245/?a=socialmedia
>
> —
> Denis
>


Re: SPARK Issue in Standalone cluster

2017-08-06 Thread Marco Mistroni
Sengupta
 further to this, if you try the following notebook in Databricks cloud, it
will read a .csv file, write to a parquet file and read it again (just to
count the number of rows stored).
Please note that the path to the csv file might differ for you.
So, what you will need to do is:
1 - create an account at community.cloud.databricks.com
2 - upload the .csv file into the Data section of your Databricks private cluster
3 - run the script; that will store the data on the distributed filesystem
of the Databricks cloud (DBFS)

It's worth investing in this free Databricks cloud as it can create a
cluster for you with minimal effort, and it's a very easy way to test your
Spark scripts on a real cluster.

hope this helps
kr

##
from pyspark.sql import SQLContext

from random import randint
from time import sleep
from pyspark.sql.session import SparkSession
import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)


import sys

def read_parquet_file(parquetFileName):
    logger.info('Reading now the parquet files we just created...:%s', parquetFileName)
    parquet_data = sqlContext.read.parquet(parquetFileName)
    logger.info('Parquet file has %s', parquet_data.count())

def dataprocessing(filePath, count, sqlContext):
    logger.info('Iter count is:%s', count)
    if count == 0:
        print 'exiting'
    else:
        df_traffic_tmp = sqlContext.read.format("csv").option("header", 'true').load(filePath)
        logger.info('#DataSet has:%s', df_traffic_tmp.count())
        logger.info('Writing to a parquet file')
        parquetFileName = "dbfs:/myParquetDf2.parquet"
        df_traffic_tmp.write.parquet(parquetFileName)
        sleepInterval = randint(10, 100)
        logger.info('#Sleeping for %s', sleepInterval)
        sleep(sleepInterval)
        read_parquet_file(parquetFileName)
        dataprocessing(filePath, count-1, sqlContext)

filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'  # This path might differ for you
iterations = 1
logger.info('--')
logger.info('Filename:%s', filename)
logger.info('Iterations:%s', iterations)
logger.info('--')

logger.info('Initializing sqlContext')
logger.info('Starting spark.. Loading from %s for %s iterations', filename, iterations)
logger.info('Starting up')
sc = SparkSession.builder.appName("Data Processing").getOrCreate()
logger.info('Initializing sqlContext')
sqlContext = SQLContext(sc)
dataprocessing(filename, iterations, sqlContext)
logger.info('Out of here..')
##


On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Uh believe me there are lots of ppl on this list who will send u code
> snippets if u ask... 
>
> Yes that is what Steve pointed out, suggesting also that for that simple
> exercise you should perform all operations on a spark standalone instead
> (or alt. Use an nfs on the cluster)
> I'd agree with his suggestion
> I suggest u another alternative:
> https://community.cloud.databricks.com/
>
> That's a ready made cluster and you can run your spark app as well store
> data on the cluster (well I haven't tried myself but I assume it's
> possible).   Try that out... I will try ur script there as I have an
> account there (though I guess I'll get there before me.)
>
> Try that out and let me know if u get stuck
> Kr
>
> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi Marco,
>>
>> For the first time in several years FOR THE VERY FIRST TIME. I am seeing
>> someone actually executing code and providing response. It feel wonderful
>> that at least someone considered to respond back by executing code and just
>> did not filter out each and every technical details to brood only on my
>> superb social skills, while claiming the reason for ignoring technical
>> details is that it elementary. I think that Steve also is the first person
>> who could answer the WHY of an elementary question instead of saying that
>> is how it is and pointed out to the correct documentation.
>>
>> That code works fantastically. But the problem which I have tried to find
>> out is while writing out the data and not reading it.
>>
>>
>> So if you see try to read the data from the same folder which has the
>> same file across all the nodes then it will work fine. In fact that is what
>> should work.
>>
>> What does not work is that if you try to write back the file and then
>> read it once again from the location you have written that i

Re: SPARK Issue in Standalone cluster

2017-08-05 Thread Marco Mistroni
Uh believe me there are lots of ppl on this list who will send u code
snippets if u ask... 

Yes that is what Steve pointed out, suggesting also that for that simple
exercise you should perform all operations on a spark standalone instead
(or alt. Use an nfs on the cluster)
I'd agree with his suggestion
I suggest u another alternative:
https://community.cloud.databricks.com/

That's a ready-made cluster and you can run your Spark app as well as store
data on the cluster (well, I haven't tried myself but I assume it's
possible). Try that out... I will try your script there as I have an
account there (though I guess you'll get there before me).

Try that out and let me know if u get stuck
Kr

On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com> wrote:

> Hi Marco,
>
> For the first time in several years FOR THE VERY FIRST TIME. I am seeing
> someone actually executing code and providing response. It feel wonderful
> that at least someone considered to respond back by executing code and just
> did not filter out each and every technical details to brood only on my
> superb social skills, while claiming the reason for ignoring technical
> details is that it elementary. I think that Steve also is the first person
> who could answer the WHY of an elementary question instead of saying that
> is how it is and pointed out to the correct documentation.
>
> That code works fantastically. But the problem which I have tried to find
> out is while writing out the data and not reading it.
>
>
> So if you see try to read the data from the same folder which has the same
> file across all the nodes then it will work fine. In fact that is what
> should work.
>
> What does not work is that if you try to write back the file and then read
> it once again from the location you have written that is when the issue
> starts happening.
>
> Therefore if in my code you were to save the pandas dataframe as a CSV
> file and then read it then you will find the following observations:
>
> FOLLOWING WILL FAIL SINCE THE FILE IS NOT IN ALL THE NODES
> 
> 
> 
> ---
> pandasdf = pandas.DataFrame(numpy.random.randn(1, 4),
> columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
> header=True, sep=",", index=0)
> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/
> sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> 
> 
> 
> ---
>
>
> FOLLOWING WILL WORK BUT THE PROCESS WILL NOT AT ALL USE THE NODE IN WHICH
> THE DATA DOES NOT EXISTS
> 
> 
> 
> ---
> pandasdf = pandas.DataFrame(numpy.random.randn(1, 4),
> columns=list('ABCD'))
> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv",
> header=True, sep=",", index=0)
> testdf = spark.read.load("file:///Users/gouravsengupta/
> Development/spark/sparkdata/testdir/")
> testdf.cache()
> testdf.count()
> 
> 
> 
> ---
>
>
> if you execute my code then also you will surprisingly see that the writes
> in the nodes which is not the master node does not complete moving the
> files from the _temporary folder to the main one.
>
>
> Regards,
> Gourav Sengupta
>
>
>
> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Hello
>>  please have a look at this. it'sa simple script that just read a
>> dataframe for n time, sleeping at random interval. i used it to test memory
>> issues that another user was experiencing on a spark cluster
>>
>> you should run it like this e.g
>> spark-submit dataprocessing_Sample.-2py  > of iterations>
>>
>> i ran it on the cluster like this
>>
>> ./spark-submit --master spark://ec2-54-218-113-119.us-
>> west-2.compute.amazonaws.com:7077   
>> /root/pyscripts/dataprocessing_Sample-2.py
>> file:///root/pyscript

Re: SPARK Issue in Standalone cluster

2017-08-03 Thread Marco Mistroni
Hello
 my 2 cents here, hope it helps.
If you just want to play around with Spark, I'd leave Hadoop out; it's
an unnecessary dependency that you don't need for just running a Python
script.
Instead do the following:
- go to the root of your master / slave node and create a directory
/root/pyscripts
- place your csv file there as well as the python script
- run the script to replicate the whole directory across the cluster (I
believe it's called copy-script.sh)
- then run your spark-submit; it will be something like
./spark-submit /root/pyscripts/mysparkscripts.py
file:///root/pyscripts/tree_addhealth.csv 10 --master
spark://ip-172-31-44-155.us-west-2.compute.internal:7077
- in your python script, as part of your processing, write the parquet file
in the directory /root/pyscripts

If you have an AWS account and you are versatile with that - you need to
set up bucket permissions etc. - you can just:
- store your file in one of your S3 buckets
- create an EMR cluster
- connect to the master or a slave
- run your script that reads from the s3 bucket and writes to the same s3
bucket


Feel free to mail me privately; I have a working script I have used to test
some code on a Spark standalone cluster.
hth

On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta 
wrote:

> Hi Steve,
>
> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>
> I am now going through the documentation (https://github.com/
> steveloughran/hadoop/blob/s3guard/HADOOP-13786-
> committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/
> hadoop-aws/s3a_committer_architecture.md) and it makes much much more
> sense now.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran 
> wrote:
>
>>
>> On 2 Aug 2017, at 20:05, Gourav Sengupta 
>> wrote:
>>
>> Hi Steve,
>>
>> I have written a sincere note of apology to everyone in a separate email.
>> I sincerely request your kind forgiveness before hand if anything does
>> sound impolite in my emails, in advance.
>>
>> Let me first start by thanking you.
>>
>> I know it looks like I formed all my opinion based on that document, but
>> that is not the case at all. If you or anyone tries to execute the code
>> that I have given then they will see what I mean. Code speaks louder and
>> better than words for me.
>>
>> So I am not saying you are wrong. I am asking verify and expecting
>> someone will be able to correct  a set of understanding that a moron like
>> me has gained after long hours of not having anything better to do.
>>
>>
>> SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with
>> replication 2 and there is a HADOOP cluster of three nodes. All these nodes
>> have SPARK workers (executors) running in them.  Both are stored in the
>> following way:
>> -
>> | SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
>> | (worker1)   |  (worker2)|  (worker3)   |
>> | (master) | ||
>> -
>> | file1.csv  | | file1.csv |
>> -
>> ||  file2.csv  | file2.csv |
>> -
>> | file3.csv  |  file3.csv  |   |
>> -
>>
>>
>>
>>
>>
>> CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
>> HDFS replication does not store the same file in all the nodes in the
>> cluster. So if I have three nodes and the replication is two then the same
>> file will be stored physically in two nodes in the cluster. Does that sound
>> right?
>>
>>
>> HDFS breaks files up into blocks (default = 128MB). If a .csv file is >
>> 128 then it will be broken up into blocks
>>
>> file1.cvs -> [block0001, block002, block0003]
>>
>> and each block will be replicated. With replication = 2 there will be two
>> copies of each block, but the file itself can span > 2 hosts.
>>
>>
>> ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
>> If SPARK is trying to process to the records then I am expecting that
>> WORKER2 should not be processing file1.csv, and similary WORKER 1 should
>> not be processing file2.csv and WORKER3 should not be processing file3.csv.
>> Because in case WORKER2 was trying to process file1.csv then it will
>> actually causing network transmission of the file unnecessarily.
>>
>>
>> Spark prefers to schedule work locally, so as to save on network traffic,
>> but it schedules for execution time over waiting for workers free on the
>> node with the data. IF a block is on nodes 2 and 3 but there is only a free
>> thread on node 1, then node 1 gets the work
>>
>> There's details on whether/how work across blocks takes place which I'm
>> avoiding. For now know those formats which are "splittable" will have work
>> scheduled by block. If you use 

Re: problem initiating spark context with pyspark

2017-06-10 Thread Marco Mistroni
Ha... it's a one-off. I run Spark on Ubuntu and Docker on Windows... I
don't think Spark and Windows are best friends.  

On Jun 10, 2017 6:36 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com>
wrote:

> seeing for the very first time someone try SPARK on Windows :)
>
> On Thu, Jun 8, 2017 at 8:38 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> try this link
>>
>> http://letstalkspark.blogspot.co.uk/2016/02/getting-started-
>> with-spark-on-window-64.html
>>
>> it helped me when i had similar problems with windows...
>>
>> hth
>>
>> On Wed, Jun 7, 2017 at 3:46 PM, Curtis Burkhalter <
>> curtisburkhal...@gmail.com> wrote:
>>
>>> Thanks Doc I saw this on another board yesterday so I've tried this by
>>> first going to the directory where I've stored the wintutils.exe and then
>>> as an admin running the command  that you suggested and I get this
>>> exception when checking the permissions:
>>>
>>> C:\winutils\bin>winutils.exe ls -F C:\tmp\hive
>>> FindFileOwnerAndPermission error (1789): The trust relationship between
>>> this workstation and the primary domain failed.
>>>
>>> I'm fairly new to the command line and determining what the different
>>> exceptions mean. Do you have any advice what this error means and how I
>>> might go about fixing this?
>>>
>>> Thanks again
>>>
>>>
>>> On Wed, Jun 7, 2017 at 9:51 AM, Doc Dwarf <doc.dwar...@gmail.com> wrote:
>>>
>>>> Hi Curtis,
>>>>
>>>> I believe in windows, the following command needs to be executed: (will
>>>> need winutils installed)
>>>>
>>>> D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive
>>>>
>>>>
>>>>
>>>> On 6 June 2017 at 09:45, Curtis Burkhalter <curtisburkhal...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I'm new to Spark and I'm trying to interact with it using Pyspark. I'm
>>>>> using the prebuilt version of spark v. 2.1.1 and when I go to the command
>>>>> line and use the command 'bin\pyspark' I have initialization problems and
>>>>> get the following message:
>>>>>
>>>>> C:\spark\spark-2.1.1-bin-hadoop2.7> bin\pyspark
>>>>> Python 3.6.0 |Anaconda 4.3.1 (64-bit)| (default, Dec 23 2016,
>>>>> 11:57:41) [MSC v.1900 64 bit (AMD64)] on win32
>>>>> Type "help", "copyright", "credits" or "license" for more information.
>>>>> Using Spark's default log4j profile: org/apache/spark/log4j-default
>>>>> s.properties
>>>>> Setting default log level to "WARN".
>>>>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>>>>> setLogLevel(newLevel).
>>>>> 17/06/06 10:30:14 WARN NativeCodeLoader: Unable to load native-hadoop
>>>>> library for your platform... using builtin-java classes where applicable
>>>>> 17/06/06 10:30:21 WARN ObjectStore: Version information not found in
>>>>> metastore. hive.metastore.schema.verification is not enabled so
>>>>> recording the schema version 1.2.0
>>>>> 17/06/06 10:30:21 WARN ObjectStore: Failed to get database default,
>>>>> returning NoSuchObjectException
>>>>> Traceback (most recent call last):
>>>>>   File "C:\spark\spark-2.1.1-bin-hadoop2.7\python\pyspark\sql\utils.py",
>>>>> line 63, in deco
>>>>> return f(*a, **kw)
>>>>>   File 
>>>>> "C:\spark\spark-2.1.1-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py",
>>>>> line 319, in get_return_value
>>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>>> o22.sessionState.
>>>>> : java.lang.IllegalArgumentException: Error while instantiating
>>>>> 'org.apache.spark.sql.hive.HiveSessionState':
>>>>> at org.apache.spark.sql.SparkSess
>>>>> ion$.org$apache$spark$sql$SparkSession$$reflect(SparkSession
>>>>> .scala:981)
>>>>> at org.apache.spark.sql.SparkSess
>>>>> ion.sessionState$lzycompute(SparkSession.scala:110)
>>>>> at org.apache.spark.sql.SparkSess
>>>>> ion.sessionState(SparkSession.scala:109)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invo

Re: problem initiating spark context with pyspark

2017-06-08 Thread Marco Mistroni
try this link

http://letstalkspark.blogspot.co.uk/2016/02/getting-started-with-spark-on-window-64.html

it helped me when I had similar problems with Windows...

hth

On Wed, Jun 7, 2017 at 3:46 PM, Curtis Burkhalter <
curtisburkhal...@gmail.com> wrote:

> Thanks Doc I saw this on another board yesterday so I've tried this by
> first going to the directory where I've stored the wintutils.exe and then
> as an admin running the command  that you suggested and I get this
> exception when checking the permissions:
>
> C:\winutils\bin>winutils.exe ls -F C:\tmp\hive
> FindFileOwnerAndPermission error (1789): The trust relationship between
> this workstation and the primary domain failed.
>
> I'm fairly new to the command line and determining what the different
> exceptions mean. Do you have any advice what this error means and how I
> might go about fixing this?
>
> Thanks again
>
>
> On Wed, Jun 7, 2017 at 9:51 AM, Doc Dwarf  wrote:
>
>> Hi Curtis,
>>
>> I believe in windows, the following command needs to be executed: (will
>> need winutils installed)
>>
>> D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive
>>
>>
>>
>> On 6 June 2017 at 09:45, Curtis Burkhalter 
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm new to Spark and I'm trying to interact with it using Pyspark. I'm
>>> using the prebuilt version of spark v. 2.1.1 and when I go to the command
>>> line and use the command 'bin\pyspark' I have initialization problems and
>>> get the following message:
>>>
>>> C:\spark\spark-2.1.1-bin-hadoop2.7> bin\pyspark
>>> Python 3.6.0 |Anaconda 4.3.1 (64-bit)| (default, Dec 23 2016, 11:57:41)
>>> [MSC v.1900 64 bit (AMD64)] on win32
>>> Type "help", "copyright", "credits" or "license" for more information.
>>> Using Spark's default log4j profile: org/apache/spark/log4j-default
>>> s.properties
>>> Setting default log level to "WARN".
>>> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
>>> setLogLevel(newLevel).
>>> 17/06/06 10:30:14 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where applicable
>>> 17/06/06 10:30:21 WARN ObjectStore: Version information not found in
>>> metastore. hive.metastore.schema.verification is not enabled so
>>> recording the schema version 1.2.0
>>> 17/06/06 10:30:21 WARN ObjectStore: Failed to get database default,
>>> returning NoSuchObjectException
>>> Traceback (most recent call last):
>>>   File "C:\spark\spark-2.1.1-bin-hadoop2.7\python\pyspark\sql\utils.py",
>>> line 63, in deco
>>> return f(*a, **kw)
>>>   File 
>>> "C:\spark\spark-2.1.1-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\protocol.py",
>>> line 319, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o22.sessionState.
>>> : java.lang.IllegalArgumentException: Error while instantiating
>>> 'org.apache.spark.sql.hive.HiveSessionState':
>>> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$Spar
>>> kSession$$reflect(SparkSession.scala:981)
>>> at org.apache.spark.sql.SparkSession.sessionState$lzycompute(Sp
>>> arkSession.scala:110)
>>> at org.apache.spark.sql.SparkSession.sessionState(SparkSession.
>>> scala:109)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>> ssorImpl.java:62)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>> thodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.jav
>>> a:357)
>>> at py4j.Gateway.invoke(Gateway.java:280)
>>> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.j
>>> ava:132)
>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>> at py4j.GatewayConnection.run(GatewayConnection.java:214)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.reflect.InvocationTargetException
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>> Method)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(Native
>>> ConstructorAccessorImpl.java:62)
>>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(De
>>> legatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:4
>>> 23)
>>> at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$Spar
>>> kSession$$reflect(SparkSession.scala:978)
>>> ... 13 more
>>> Caused by: java.lang.IllegalArgumentException: Error while
>>> instantiating 'org.apache.spark.sql.hive.HiveExternalCatalog':
>>> at org.apache.spark.sql.internal.SharedState$.org$apache$spark$
>>> sql$internal$SharedState$$reflect(SharedState.scala:169)
>>>   

Sampling data on RDD vs sampling data on Dataframes

2017-05-21 Thread Marco Didonna
Hello,

My team and I have developed a fairly large big data application using
only the DataFrame API (Spark 1.6.3). Since our application uses machine
learning to do prediction, we need to sample the training dataset in order not
to have skewed data.

To achieve this objective we use stratified sampling: now, you all probably
know that DataFrameStatFunctions provides a useful sampleBy method that
supposedly carries out stratified sampling based on the fraction map passed
as input. A few questions have arisen:

- the sampleBy method seems to return variable results with the same
input data, and therefore looks more like an *approximate* stratified sampling.
Inspection of the Spark source code seems to confirm this hypothesis. There
is no mention in the documentation of such an approximation, nor of a confidence
interval that guarantees how good the approximation is supposed to be.

- in the RDD world there is a sampleByKeyExact method which clearly states
that it will produce a sampled dataset with tight guarantees ... is there
anything like that in the DataFrame world?

Has anybody in the community worked around such shortcomings of the
DataFrame API? I'm very much aware that I can get an RDD from a DataFrame,
perform sampleByKeyExact and then convert the RDD back to a DataFrame. I'd
really like to avoid such a conversion, if possible.
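
For completeness, a minimal sketch of that RDD round-trip workaround (assumptions: Spark 1.6 with a sqlContext in scope, a DataFrame df whose stratum key is a string column named "label", and an illustrative fractions map -- the column name and fractions are hypothetical):

// every distinct key must appear in the fractions map, otherwise sampleByKeyExact fails
val fractions = Map("a" -> 0.1, "b" -> 0.2)

// key the rows by the stratum column, sample exactly, then rebuild the DataFrame
val keyed = df.rdd.keyBy(row => row.getAs[String]("label"))
val sampled = keyed.sampleByKeyExact(withReplacement = false, fractions = fractions, seed = 42L)
val sampledDf = sqlContext.createDataFrame(sampled.values, df.schema)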

Thank you for any help you people can give :)

Best,

Marco


Re: Spark Testing Library Discussion

2017-04-26 Thread Marco Mistroni
Uh i stayed online in the other link but nobody joinedWill follow
transcript
Kr

On 26 Apr 2017 9:35 am, "Holden Karau"  wrote:

> And the recording of our discussion is at https://www.youtube.com/
> watch?v=2q0uAldCQ8M
> A few of us have follow up things and we will try and do another meeting
> in about a month or two :)
>
> On Tue, Apr 25, 2017 at 1:04 PM, Holden Karau 
> wrote:
>
>> Urgh hangouts did something frustrating, updated link
>> https://hangouts.google.com/hangouts/_/ha6kusycp5fvzei2trhay4uhhqe
>>
>> On Mon, Apr 24, 2017 at 12:13 AM, Holden Karau 
>> wrote:
>>
>>> The (tentative) link for those interested is https://hangouts.google.com
>>> /hangouts/_/oyjvcnffejcjhi6qazf3lysypue .
>>>
>>> On Mon, Apr 24, 2017 at 12:02 AM, Holden Karau 
>>> wrote:
>>>
 So 14 people have said they are available on Tuesday the 25th at 1PM
 pacific so we will do this meeting then ( https://doodle.com/poll/69y6
 yab4pyf7u8bn ).

 Since hangouts tends to work ok on the Linux distro I'm running my
 default is to host this as a "hangouts-on-air" unless there are alternative
 ideas.

 I'll record the hangout and if it isn't terrible I'll post it for those
 who weren't able to make it (and for next time I'll include more European
 friendly time options - Doodle wouldn't let me update it once posted).

 On Fri, Apr 14, 2017 at 11:17 AM, Holden Karau 
 wrote:

> Hi Spark Users (+ Some Spark Testing Devs on BCC),
>
> Awhile back on one of the many threads about testing in Spark there
> was some interest in having a chat about the state of Spark testing and
> what people want/need.
>
> So if you are interested in joining an online (with maybe an IRL
> component if enough people are SF based) chat about Spark testing please
> fill out this doodle - https://doodle.com/poll/69y6yab4pyf7u8bn
>
> I think reasonable topics of discussion could be:
>
> 1) What is the state of the different Spark testing libraries in the
> different core (Scala, Python, R, Java) and extended languages (C#,
> Javascript, etc.)?
> 2) How do we make these more easily discovered by users?
> 3) What are people looking for in their testing libraries that we are
> missing? (can be functionality, documentation, etc.)
> 4) Are there any examples of well tested open source Spark projects
> and where are they?
>
> If you have other topics that's awesome.
>
> To clarify this about libraries and best practices for people testing
> their Spark applications, and less about testing Spark's internals
> (although as illustrated by some of the libraries there is some strong
> overlap in what is required to make that work).
>
> Cheers,
>
> Holden :)
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>



 --
 Cell : 425-233-8271 <(425)%20233-8271>
 Twitter: https://twitter.com/holdenkarau

>>>
>>>
>>>
>>> --
>>> Cell : 425-233-8271 <(425)%20233-8271>
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>


Re: Upgrade the scala code using the most updated Spark version

2017-03-28 Thread Marco Mistroni
1.7.5

On 28 Mar 2017 10:10 pm, "Anahita Talebi" <anahita.t.am...@gmail.com> wrote:

> Hi,
>
> Thanks for your answer.
> What is the version of "org.slf4j" % "slf4j-api" in your sbt file?
> I think the problem might come from this part.
>
> On Tue, Mar 28, 2017 at 11:02 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Hello
>>  uhm ihave a project whose build,sbt is closest to yours, where i am
>> using spark 2.1, scala 2.11 and scalatest (i upgraded to 3.0.0) and it
>> works fine
>> in my projects though i don thave any of the following libraries that you
>> mention
>> - breeze
>> - netlib,all
>> -  scoopt
>>
>> hth
>>
>> On Tue, Mar 28, 2017 at 9:10 PM, Anahita Talebi <
>> anahita.t.am...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thanks for your answer.
>>>
>>> I first changed the scala version to 2.11.8 and kept the spark version
>>> 1.5.2 (old version). Then I changed the scalatest version into "3.0.1".
>>> With this configuration, I could run the code and compile it and generate
>>> the .jar file.
>>>
>>> When I changed the spark version into 2.1.0, I get the same error as
>>> before. So I imagine the problem should be somehow related to the version
>>> of spark.
>>>
>>> Cheers,
>>> Anahita
>>>
>>> 
>>> 
>>> 
>>> import AssemblyKeys._
>>>
>>> assemblySettings
>>>
>>> name := "proxcocoa"
>>>
>>> version := "0.1"
>>>
>>> organization := "edu.berkeley.cs.amplab"
>>>
>>> scalaVersion := "2.11.8"
>>>
>>> parallelExecution in Test := false
>>>
>>> {
>>>   val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
>>>   libraryDependencies ++= Seq(
>>> "org.slf4j" % "slf4j-api" % "1.7.2",
>>> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
>>> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
>>> "org.apache.spark" %% "spark-core" % "2.1.0"
>>> excludeAll(excludeHadoop),
>>> "org.apache.spark" %% "spark-mllib" % "2.1.0"
>>> excludeAll(excludeHadoop),
>>> "org.apache.spark" %% "spark-sql" % "2.1.0"
>>> excludeAll(excludeHadoop),
>>> "org.apache.commons" % "commons-compress" % "1.7",
>>> "commons-io" % "commons-io" % "2.4",
>>> "org.scalanlp" % "breeze_2.11" % "0.11.2",
>>> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
>>> "com.github.scopt" %% "scopt" % "3.3.0"
>>>   )
>>> }
>>>
>>> {
>>>   val defaultHadoopVersion = "1.0.4"
>>>   val hadoopVersion =
>>> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
>>> defaultHadoopVersion)
>>>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
>>> hadoopVersion
>>> }
>>>
>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
>>>
>>> resolvers ++= Seq(
>>>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
>>> ".m2/repository",
>>>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases;,
>>>   "Spray" at "http://repo.spray.cc;
>>> )
>>>
>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>   {
>>> case PathList("javax", "servlet", xs @ _*)   =>
>>> MergeStrategy.first
>>> case PathList(ps @ _*) if ps.last endsWith ".html"   =>
>>> MergeStrategy.first
>>> case "application.conf"  =>
>>> MergeStrategy.concat
>>> case "reference.conf"=>
>>> MergeStrategy.concat
>>> case "log4j.properties"  

Re: Upgrade the scala code using the most updated Spark version

2017-03-28 Thread Marco Mistroni
Hello
 uhm I have a project whose build.sbt is closest to yours, where I am using
spark 2.1, scala 2.11 and scalatest (I upgraded to 3.0.0) and it works fine
in my projects, though I don't have any of the following libraries that you
mention:
- breeze
- netlib:all
- scopt

hth

On Tue, Mar 28, 2017 at 9:10 PM, Anahita Talebi <anahita.t.am...@gmail.com>
wrote:

> Hi,
>
> Thanks for your answer.
>
> I first changed the scala version to 2.11.8 and kept the spark version
> 1.5.2 (old version). Then I changed the scalatest version into "3.0.1".
> With this configuration, I could run the code and compile it and generate
> the .jar file.
>
> When I changed the spark version into 2.1.0, I get the same error as
> before. So I imagine the problem should be somehow related to the version
> of spark.
>
> Cheers,
> Anahita
>
> 
> 
> 
> import AssemblyKeys._
>
> assemblySettings
>
> name := "proxcocoa"
>
> version := "0.1"
>
> organization := "edu.berkeley.cs.amplab"
>
> scalaVersion := "2.11.8"
>
> parallelExecution in Test := false
>
> {
>   val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
>   libraryDependencies ++= Seq(
> "org.slf4j" % "slf4j-api" % "1.7.2",
> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
> "org.scalatest" %% "scalatest" % "3.0.1" % "test",
> "org.apache.spark" %% "spark-core" % "2.1.0" excludeAll(excludeHadoop),
> "org.apache.spark" %% "spark-mllib" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.spark" %% "spark-sql" % "2.1.0" excludeAll(excludeHadoop),
> "org.apache.commons" % "commons-compress" % "1.7",
> "commons-io" % "commons-io" % "2.4",
> "org.scalanlp" % "breeze_2.11" % "0.11.2",
> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
> "com.github.scopt" %% "scopt" % "3.3.0"
>   )
> }
>
> {
>   val defaultHadoopVersion = "1.0.4"
>   val hadoopVersion =
> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
> defaultHadoopVersion)
>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
> hadoopVersion
> }
>
> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.1.0"
>
> resolvers ++= Seq(
>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
> ".m2/repository",
>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases;,
>   "Spray" at "http://repo.spray.cc;
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case PathList("javax", "servlet", xs @ _*)   =>
> MergeStrategy.first
>     case PathList(ps @ _*) if ps.last endsWith ".html"   =>
> MergeStrategy.first
> case "application.conf"  =>
> MergeStrategy.concat
> case "reference.conf"=>
> MergeStrategy.concat
> case "log4j.properties"      =>
> MergeStrategy.discard
> case m if m.toLowerCase.endsWith("manifest.mf")  =>
> MergeStrategy.discard
> case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
> MergeStrategy.discard
> case _ => MergeStrategy.first
>   }
> }
>
> test in assembly := {}
> 
> 
> 
>
> On Tue, Mar 28, 2017 at 9:33 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
>
>> Hello
>>  that looks to me like there's something dodgy withyour Scala installation
>> Though Spark 2.0 is built on Scala 2.11, it still support 2.10... i
>> suggest you change one thing at a time in your sbt
>> First Spark version. run it and see if it works
>> Then amend the scala version
>>
>> hth
>>  marco
>>
>> On Tue, Mar 28, 2017 at 5:20 PM, Anahita Talebi <
>> anahita.t.am...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Thanks you all for your inf

Re: Upgrade the scala code using the most updated Spark version

2017-03-28 Thread Marco Mistroni
Hello
 that looks to me like there's something dodgy with your Scala installation.
Though Spark 2.0 is built on Scala 2.11, it still supports 2.10... I suggest
you change one thing at a time in your sbt:
first the Spark version - run it and see if it works -
then amend the Scala version.

hth
 marco

On Tue, Mar 28, 2017 at 5:20 PM, Anahita Talebi <anahita.t.am...@gmail.com>
wrote:

> Hello,
>
> Thanks you all for your informative answers.
> I actually changed the scala version to the 2.11.8 and spark version into
> 2.1.0 in the build.sbt
>
> Except for these two guys (scala and spark version), I kept the same
> values for the rest in the build.sbt file.
> 
> ---
> import AssemblyKeys._
>
> assemblySettings
>
> name := "proxcocoa"
>
> version := "0.1"
>
> scalaVersion := "2.11.8"
>
> parallelExecution in Test := false
>
> {
>   val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
>   libraryDependencies ++= Seq(
> "org.slf4j" % "slf4j-api" % "1.7.2",
> "org.slf4j" % "slf4j-log4j12" % "1.7.2",
> "org.scalatest" %% "scalatest" % "1.9.1" % "test",
> "org.apache.spark" % "spark-core_2.11" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.spark" % "spark-mllib_2.11" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.spark" % "spark-sql_2.11" % "2.1.0"
> excludeAll(excludeHadoop),
> "org.apache.commons" % "commons-compress" % "1.7",
> "commons-io" % "commons-io" % "2.4",
> "org.scalanlp" % "breeze_2.11" % "0.11.2",
> "com.github.fommil.netlib" % "all" % "1.1.2" pomOnly(),
> "com.github.scopt" %% "scopt" % "3.3.0"
>   )
> }
>
> {
>   val defaultHadoopVersion = "1.0.4"
>   val hadoopVersion =
> scala.util.Properties.envOrElse("SPARK_HADOOP_VERSION",
> defaultHadoopVersion)
>   libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
> hadoopVersion
> }
>
> libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" %
> "2.1.0"
>
> resolvers ++= Seq(
>   "Local Maven Repository" at Path.userHome.asFile.toURI.toURL +
> ".m2/repository",
>   "Typesafe" at "http://repo.typesafe.com/typesafe/releases;,
>   "Spray" at "http://repo.spray.cc;
> )
>
> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>   {
> case PathList("javax", "servlet", xs @ _*)   =>
> MergeStrategy.first
> case PathList(ps @ _*) if ps.last endsWith ".html"   =>
> MergeStrategy.first
> case "application.conf"  =>
> MergeStrategy.concat
> case "reference.conf"=>
> MergeStrategy.concat
> case "log4j.properties"  =>
> MergeStrategy.discard
> case m if m.toLowerCase.endsWith("manifest.mf")  =>
> MergeStrategy.discard
> case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  =>
> MergeStrategy.discard
> case _ => MergeStrategy.first
>   }
> }
>
> test in assembly := {}
> 
>
> When I compile the code, I get the following error:
>
> [info] Compiling 4 Scala sources to /Users/atalebi/Desktop/new_
> version_proxcocoa-master/target/scala-2.11/classes...
> [error] /Users/atalebi/Desktop/new_version_proxcocoa-master/src/
> main/scala/utils/OptUtils.scala:40: value mapPartitionsWithSplit is not a
> member of org.apache.spark.rdd.RDD[String]
> [error] val sizes = data.mapPartitionsWithSplit{ case(i,lines) =>
> [error]  ^
> [error] /Users/atalebi/Desktop/new_version_proxcocoa-master/src/
> main/scala/utils/OptUtils.scala:41: value length is not a member of Any
> [error]   Iterator(i -> lines.length)
> [error]   ^
> 
> It gets the error in the code. Does it mean that for the different version
> of the spark and scala, I need to change the main code?
>
> Thanks,
> Anahita
>
>
>
>
>
>
> On Tue, Mar 28, 2017 at 10:28 AM, Dinko Srkoč <dinko.sr...@gmail.com>
> 

Re:

2017-03-09 Thread Marco Mistroni
Try to remove the Kafka code, as it seems Kafka is not the issue here.
Create a DS and save it to Cassandra and see what happens, even in the
console. That should give you a starting point.
Hth

On 9 Mar 2017 3:07 am, "sathyanarayanan mudhaliyar" <
sathyanarayananmudhali...@gmail.com> wrote:

code:
directKafkaStream.foreachRDD(rdd ->
{
rdd.foreach(record ->
{
messages1.add(record._2);
});
JavaRDD lines = sc.parallelize(messages1);
JavaPairRDD data =
lines.mapToPair(new PairFunction()
{
@Override
public Tuple2 call(String a)
{
String[] tokens = StringUtil.split(a, '%');
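// note: Integer.getInteger(tokens[3]) looks up a JVM system property named by
// tokens[3] and returns null when no such property is set -- it does not parse
// the string. Integer.valueOf(tokens[3]) would parse it; the null produced here
// is consistent with the "Invalid null value for key column year" error below.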
return new Tuple2(Integer.getInteger(tokens[3]),tokens[2]);
}
});
Function2 reduceSumFunc =
(accum, n) -> (accum.concat(n));
JavaPairRDD yearCount =
data.reduceByKey(reduceSumFunc);

javaFunctions(yearCount).writerBuilder("movie_keyspace",
"movie_count", mapTupleToRow(Integer.class,
String.class)).withColumnSelector(someColumns("year","list_of_
movies")).saveToCassandra();
// this is the error line
});




--


error:
com.datastax.spark.connector.writer.NullKeyColumnException:
Invalid null value for key column year
at com.datastax.spark.connector.writer.RoutingKeyGenerator$$
anonfun$fillRoutingKey$1.apply$mcVI$sp(RoutingKeyGenerator.scala:49)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.
scala:160)
at com.datastax.spark.connector.writer.RoutingKeyGenerator.
fillRoutingKey(RoutingKeyGenerator.scala:47)
at com.datastax.spark.connector.writer.RoutingKeyGenerator.
apply(RoutingKeyGenerator.scala:56)
at com.datastax.spark.connector.writer.TableWriter.
batchRoutingKey(TableWriter.scala:126)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$
write$1$$anonfun$19.apply(TableWriter.scala:151)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$
write$1$$anonfun$19.apply(TableWriter.scala:151)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.
next(GroupingBatchBuilder.scala:107)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.
next(GroupingBatchBuilder.scala:31)
at scala.collection.Iterator$class.foreach(Iterator.scala:
893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.
foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$
write$1.apply(TableWriter.scala:158)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$
write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$
anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$
anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.
closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.
withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(
TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$
saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$
saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(
Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(
ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(
ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


--

Trying to connect Kafka and cassandra using spark
Able to store a JavaRDD but not able to store a JavaPairRDD into
cassandra
I have given comment in the line where the error is
Thank you

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: question on transforms for spark 2.0 dataset

2017-03-01 Thread Marco Mistroni
Hi, I think you need a UDF if you want to transform a column.
Hth
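
For what it's worth, a minimal sketch of the column-wise alternative using the built-in trim from org.apache.spark.sql.functions (field names and the input path are taken from the question; treat this as a sketch, not the only way):

import org.apache.spark.sql.functions.{col, trim}

val sample_data = spark.read.parquet("my_data_input")

// apply trim to each target column in turn, keeping the original column names
val fieldsToTrim = Seq("field1", "field2", "field3", "field4")
val transformed_data = fieldsToTrim.foldLeft(sample_data) { (df, f) =>
  df.withColumn(f, trim(col(f)))
}

transformed_data.show()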

On 1 Mar 2017 4:22 pm, "Bill Schwanitz"  wrote:

> Hi all,
>
> I'm fairly new to spark and scala so bear with me.
>
> I'm working with a dataset containing a set of column / fields. The data
> is stored in hdfs as parquet and is sourced from a postgres box so fields
> and values are reasonably well formed. We are in the process of trying out
> a switch from pentaho and various sql databases to pulling data into hdfs
> and applying transforms / new datasets with processing being done in spark
> ( and other tools - evaluation )
>
> A rough version of the code I'm running so far:
>
> val sample_data = spark.read.parquet("my_data_input")
>
> val example_row = spark.sql("select * from parquet.my_data_input where id
> = 123").head
>
> I want to apply a trim operation on a set of fields - lets call them
> field1, field2, field3 and field4.
>
> What is the best way to go about applying those trims and creating a new
> dataset? Can I apply the trip to all fields in a single map? or do I need
> to apply multiple map functions?
>
> When I try the map ( even with a single )
>
> scala> val transformed_data = sample_data.map(
>  |   _.trim(col("field1"))
>  |   .trim(col("field2"))
>  |   .trim(col("field3"))
>  |   .trim(col("field4"))
>  | )
>
> I end up with the following error:
>
> :26: error: value trim is not a member of org.apache.spark.sql.Row
>  _.trim(col("field1"))
>^
>
> Any ideas / guidance would be appreciated!
>


Re: error in kafka producer

2017-02-28 Thread Marco Mistroni
This exception coming from a Spark program?
could you share few lines of code ?

kr
 marco

On Tue, Feb 28, 2017 at 10:23 PM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> producer send callback exception: 
> org.apache.kafka.common.errors.TimeoutException:
> Expiring 1 record(s) for positionevent-6 due to 30003 ms has passed since
> batch creation plus linger time
>


Re: Run spark machine learning example on Yarn failed

2017-02-28 Thread Marco Mistroni
Or place the file in s3 and provide the s3 path
Kr
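
For the HDFS route, a sketch of the usual fix (assumption: the sample file exists locally under the Spark directory; Spark resolved the relative path against the submitting user's HDFS home directory, hence /user/root):

hdfs dfs -mkdir -p /user/root/data/mllib
hdfs dfs -put data/mllib/sample_libsvm_data.txt /user/root/data/mllib/
MASTER=yarn ./bin/run-example ml.LogisticRegressionExample data/mllib/sample_libsvm_data.txt

Alternatively, pass a full URI (hdfs://, s3://, ...) as the argument instead of a relative path.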

On 28 Feb 2017 1:18 am, "Yunjie Ji"  wrote:

>  After start the dfs, yarn and spark, I run these code under the root
> directory of spark on my master host:
> `MASTER=yarn ./bin/run-example ml.LogisticRegressionExample
> data/mllib/sample_libsvm_data.txt`
>
> Actually I get these code from spark's README. And here is the source code
> about LogisticRegressionExample on GitHub:
> https://github.com/apache/spark/blob/master/examples/
> src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.
> scala
>  src/main/scala/org/apache/spark/examples/ml/LogisticRegressionExample.
> scala>
>
> Then, error occurs:
> `Exception in thread "main" org.apache.spark.sql.AnalysisException: Path
> does notexist:
> hdfs://master:9000/user/root/data/mllib/sample_libsvm_data.txt;`
>
> Firstly, I don't know why it's `hdfs://master:9000/user/root`, I do set
> namenode's IP address to `hdfs://master:9000`, but why spark chose the
> directory `/user/root`?
>
> Then, I make a directory `/user/root/data/mllib/sample_libsvm_data.txt` on
> every host of the cluster, so I hope spark can find this file. But the same
> error occurs again.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Run-spark-machine-learning-example-on-Yarn-failed-
> tp28435.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: No main class set in JAR; please specify one with --class and java.lang.ClassNotFoundException

2017-02-26 Thread Marco Mistroni
Hi Raymond
 run this command and it should work, provided you have Kafka set up as
well on localhost at port 2181

spark-submit --packages
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1  kafka_wordcount.py
localhost:2181 test

But I suggest, if you are a beginner, using Spark's wordcount example
instead, as I believe it reads from a local directory rather than requiring a
Kafka setup, which is an additional overhead you don't really need.
If you want to go ahead with Kafka, the two links below can give you a start:

https://dzone.com/articles/running-apache-kafka-on-windows-os   (i believe
similar setup can be used on Linux)
https://spark.apache.org/docs/latest/streaming-kafka-integration.html

kr




On Sat, Feb 25, 2017 at 11:12 PM, Marco Mistroni <mmistr...@gmail.com>
wrote:

> Hi I have a look. At GitHub project tomorrow and let u know. U have a py
> scripts to run and dependencies to specify.. pls check spark docs in
> meantime...I do all my coding in Scala and specify dependencies using
> --packages. ::.
> Kr
>
> On 25 Feb 2017 11:06 pm, "Raymond Xie" <xie3208...@gmail.com> wrote:
>
>> Thank you very much Marco,
>>
>> I am a beginner in this area, is it possible for you to show me what you
>> think the right script should be to get it executed in terminal?
>>
>>
>> *----*
>> *Sincerely yours,*
>>
>>
>> *Raymond*
>>
>> On Sat, Feb 25, 2017 at 6:00 PM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> Try to use --packages to include the jars. From error it seems it's
>>> looking for main class in jars but u r running a python script...
>>>
>>> On 25 Feb 2017 10:36 pm, "Raymond Xie" <xie3208...@gmail.com> wrote:
>>>
>>> That's right Anahita, however, the class name is not indicated in the
>>> original github project so I don't know what class should be used here. The
>>> github only says:
>>> and then run the example
>>> `$ bin/spark-submit --jars \
>>> external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar
>>> \
>>> examples/src/main/python/streaming/kafka_wordcount.py \
>>> localhost:2181 test`
>>> """ Can anyone give any thought on how to find out? Thank you very much
>>> in advance.
>>>
>>>
>>> **
>>> *Sincerely yours,*
>>>
>>>
>>> *Raymond*
>>>
>>> On Sat, Feb 25, 2017 at 5:27 PM, Anahita Talebi <
>>> anahita.t.am...@gmail.com> wrote:
>>>
>>>> You're welcome.
>>>> You need to specify the class. I meant like that:
>>>>
>>>> spark-submit  /usr/hdp/2.5.0.0-1245/spark/l
>>>> ib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>>> --class "give the name of the class"
>>>>
>>>>
>>>>
>>>> On Saturday, February 25, 2017, Raymond Xie <xie3208...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank you, it is still not working:
>>>>>
>>>>> [image: Inline image 1]
>>>>>
>>>>> By the way, here is the original source:
>>>>>
>>>>> https://github.com/apache/spark/blob/master/examples/src/mai
>>>>> n/python/streaming/kafka_wordcount.py
>>>>>
>>>>>
>>>>> **
>>>>> *Sincerely yours,*
>>>>>
>>>>>
>>>>> *Raymond*
>>>>>
>>>>> On Sat, Feb 25, 2017 at 4:48 PM, Anahita Talebi <
>>>>> anahita.t.am...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I think if you remove --jars, it will work. Like:
>>>>>>
>>>>>> spark-submit  /usr/hdp/2.5.0.0-1245/spark/l
>>>>>> ib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>>>>>
>>>>>>  I had the same problem before and solved it by removing --jars.
>>>>>>
>>>>>> Cheers,
>>>>>> Anahita
>>>>>>
>>>>>> On Saturday, February 25, 2017, Raymond Xie <xie3208...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I am doing a spark streaming on a hortonworks sandbox and am stuck
>>>>>>> here now, can anyone tell me what's wrong with the following code

Re: No main class set in JAR; please specify one with --class and java.lang.ClassNotFoundException

2017-02-25 Thread Marco Mistroni
Try to use --packages to include the jars. From the error it seems it's looking
for the main class in the jars, but you are running a Python script...

On 25 Feb 2017 10:36 pm, "Raymond Xie"  wrote:

That's right Anahita, however, the class name is not indicated in the
original github project so I don't know what class should be used here. The
github only says:
and then run the example
`$ bin/spark-submit --jars \
external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar
\
examples/src/main/python/streaming/kafka_wordcount.py \
localhost:2181 test`
""" Can anyone give any thought on how to find out? Thank you very much in
advance.


**
*Sincerely yours,*


*Raymond*

On Sat, Feb 25, 2017 at 5:27 PM, Anahita Talebi 
wrote:

> You're welcome.
> You need to specify the class. I meant like that:
>
> spark-submit  /usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.
> 0-1245-hadoop2.7.3.2.5.0.0-1245.jar --class "give the name of the class"
>
>
>
> On Saturday, February 25, 2017, Raymond Xie  wrote:
>
>> Thank you, it is still not working:
>>
>> [image: Inline image 1]
>>
>> By the way, here is the original source:
>>
>> https://github.com/apache/spark/blob/master/examples/src/mai
>> n/python/streaming/kafka_wordcount.py
>>
>>
>> **
>> *Sincerely yours,*
>>
>>
>> *Raymond*
>>
>> On Sat, Feb 25, 2017 at 4:48 PM, Anahita Talebi <
>> anahita.t.am...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think if you remove --jars, it will work. Like:
>>>
>>> spark-submit  /usr/hdp/2.5.0.0-1245/spark/l
>>> ib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>>
>>>  I had the same problem before and solved it by removing --jars.
>>>
>>> Cheers,
>>> Anahita
>>>
>>> On Saturday, February 25, 2017, Raymond Xie 
>>> wrote:
>>>
 I am doing a spark streaming on a hortonworks sandbox and am stuck here
 now, can anyone tell me what's wrong with the following code and the
 exception it causes and how do I fix it? Thank you very much in advance.

 spark-submit --jars /usr/hdp/2.5.0.0-1245/spark/li
 b/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
  /usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar
 /root/hdp/kafka_wordcount.py 192.168.128.119:2181 test

 Error:
 No main class set in JAR; please specify one with --class


 spark-submit --class /usr/hdp/2.5.0.0-1245/spark/li
 b/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
  /usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar
 /root/hdp/kafka_wordcount.py 192.168.128.119:2181 test

 Error:
 java.lang.ClassNotFoundException: /usr/hdp/2.5.0.0-1245/spark/li
 b/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar

 spark-submit --class  /usr/hdp/2.5.0.0-1245/kafka/l
 ibs/kafka-streams-0.10.0.2.5.0.0-1245.jar
 /usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0
 -1245-hadoop2.7.3.2.5.0.0-1245.jar  /root/hdp/kafka_wordcount.py
 192.168.128.119:2181 test

 Error:
 java.lang.ClassNotFoundException: /usr/hdp/2.5.0.0-1245/kafka/li
 bs/kafka-streams-0.10.0.2.5.0.0-1245.jar

 **
 *Sincerely yours,*


 *Raymond*

>>>
>>


Re: care to share latest pom forspark scala applications eclipse?

2017-02-24 Thread Marco Mistroni
Hi
 I am using sbt to generate the Eclipse project file.
These are my dependencies; they'll probably translate to something like this
in mvn dependencies:

groupId (same for all packages listed below): org.apache.spark
version: 2.1.0

artifactIds:
spark-core_2.11
spark-streaming_2.11
spark-mllib_2.11
spark-sql_2.11
spark-streaming-flume-sink_2.11
spark-streaming-kafka-0-10_2.11

hth




On Fri, Feb 24, 2017 at 8:16 AM, nancy henry 
wrote:

> Hi Guys,
>
> Please one of you who is successfully able to bbuild maven packages in
> eclipse scala IDE please share your pom.xml
>
>
>
>


Error when trying to filter

2017-02-21 Thread Marco Mans
Hi!

I'm trying to execute this code:

StructField datetime = new StructField("DateTime", DataTypes.DateType,
true, Metadata.empty());
StructField tagname = new StructField("Tagname", DataTypes.StringType,
true, Metadata.empty());
StructField value = new StructField("Value", DataTypes.DoubleType,
false, Metadata.empty());
StructField quality = new StructField("Quality",
DataTypes.IntegerType, true, Metadata.empty());

StructType schema = new StructType(new StructField[]{datetime,
tagname, value, quality});

Dataset allData = spark.read().option("header",
"true").option("dateFormat", "-MM-dd
HH:mm:ss.SSS").option("comment",
"-").schema(schema).csv("/ingest/davis/landing");

allData = allData.filter((Row value1) -> {
// SOME COOL FILTER-CODE.
return true;
});

allData.show();

I get this error on the executors:

java.lang.NoSuchMethodError:
org.apache.commons.lang3.time.FastDateFormat.parse(Ljava/lang/String;)Ljava/util/Date;


I'm running spark 2.0.0.cloudera1


Does anyone know why this error occurs?


Regards,

Marco


Basic Grouping Question

2017-02-20 Thread Marco Mans
Hi!

I'm new to Spark and trying to write my first spark job on some data I have.
The data is in this (parquet) format:

Code,timestamp, value
A, 2017-01-01, 123
A, 2017-01-02, 124
A, 2017-01-03, 126
B, 2017-01-01, 127
B, 2017-01-02, 126
B, 2017-01-03, 123

I want to write a little map-reduce application that must be run on each
'code'.
So I would need to group the data on the 'code' column and then execute the
map and the reduce steps on each code; 2 times in this example, A and B.

But when I group the data (the groupBy function), it returns a
RelationalGroupedDataset. On this I cannot apply the map and reduce functions.

I have the feeling that I am heading in the wrong direction. Does anyone
know how to approach this? (I hope I explained it right, so it can be
understood :))
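
A minimal sketch of one way to do this with the typed Dataset API (assumptions: Spark 2.x, the timestamp column is stored as a string -- adjust the case class to the real schema -- and the parquet path is hypothetical):

import org.apache.spark.sql.SparkSession

case class Reading(code: String, timestamp: String, value: Long)

val spark = SparkSession.builder().appName("per-code-grouping").getOrCreate()
import spark.implicits._

val readings = spark.read.parquet("/path/to/readings").as[Reading]

// groupByKey returns a KeyValueGroupedDataset; mapGroups runs arbitrary logic per key,
// i.e. the per-'code' map/reduce step -- here an average of the values, as an example
val perCode = readings
  .groupByKey(_.code)
  .mapGroups { (code, rows) =>
    val values = rows.map(_.value).toSeq
    (code, values.sum.toDouble / values.size)
  }
  .toDF("code", "avgValue")

perCode.show()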

Regards,
Marco


Re: SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

2017-02-06 Thread Marco Mistroni
My bad! I confused myself with the different build.sbt files I tried in different
projects.
Thx Cody for pointing that out (again).
Spark streaming Kafka was all I needed.
Kr

On 6 Feb 2017 9:02 pm, "Cody Koeninger" <c...@koeninger.org> wrote:

> You should not need to include jars for Kafka, the spark connectors
> have the appropriate transitive dependency on the correct version.
>
> On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni <mmistr...@gmail.com>
> wrote:
> > Hi
> >  not sure if this will help at all, and pls take it with a pinch of salt
> as
> > i dont have your setup and i am not running on a cluster
> >
> >  I have tried to run a kafka example which was originally workkign on
> spark
> > 1.6.1 on spark 2.
> > These are the jars i am using
> >
> > spark-streaming-kafka-0-10_2.11_2.0.1.jar
> >
> > kafka_2.11-0.10.1.1
> >
> >
> > And here's the code up to the creation of the Direct Stream. apparently
> with
> > the new version of kafka libs some properties have to be specified
> >
> >
> > import org.apache.spark.SparkConf
> > import org.apache.spark.streaming.{Seconds, StreamingContext}
> > import org.apache.spark.storage.StorageLevel
> >
> > import java.util.regex.Pattern
> > import java.util.regex.Matcher
> >
> > import Utilities._
> >
> > import org.apache.spark.streaming.kafka010.KafkaUtils
> > import kafka.serializer.StringDecoder
> > import
> > org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> > import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
> >
> > /** Working example of listening for log data from Kafka's testLogs
> topic on
> > port 9092. */
> > object KafkaExample {
> >
> >   def main(args: Array[String]) {
> >
> > // Create the context with a 1 second batch size
> > val ssc = new StreamingContext("local[*]", "KafkaExample",
> Seconds(1))
> >
> > setupLogging()
> >
> > // Construct a regular expression (regex) to extract fields from raw
> > Apache log lines
> > val pattern = apacheLogPattern()
> >
> > val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
> > "bootstrap.servers" -> "localhost:9092",
> > "key.deserializer"
> > ->"org.apache.kafka.common.serialization.StringDeserializer",
> > "value.deserializer"
> > ->"org.apache.kafka.common.serialization.StringDeserializer",
> > "group.id" -> "group1")
> > val topics = List("testLogs").toSet
> > val lines = KafkaUtils.createDirectStream[String, String](
> > ssc,
> > PreferConsistent,
> > Subscribe[String,
> > String](topics, kafkaParams)
> >   ).map(cr => cr.value())
> >
> > hth
> >
> >  marco
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com>
> > wrote:
> >>
> >> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
> >> 1.5).
> >>
> >> Admittedly I am messing around with Spark-shell. However, I am surprised
> >> why this does not work with Spark 2 and is ok with CDH 5.1
> >>
> >> scala> val dstream = KafkaUtils.createDirectStream[String, String,
> >> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
> >>
> >> java.lang.NoClassDefFoundError: Could not initialize class
> >> kafka.consumer.FetchRequestAndResponseStatsRegistry$
> >>   at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster.connect(
> KafkaCluster.scala:52)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:345)
> >>   at
> >> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:342)
> >>   at
> >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.
> scala:33)
> >>   at scala.collection.mutable.WrappedArray.f

Re: SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

2017-02-04 Thread Marco Mistroni
Hi
 not sure if this will help at all, and please take it with a pinch of salt as
I don't have your setup and I am not running on a cluster.

 I have tried to run a Kafka example, which was originally working on Spark
1.6.1, on Spark 2.
These are the jars i am using

spark-streaming-kafka-0-10_2.11_2.0.1.jar
kafka_2.11-0.10.1.1


And here's the code up to the creation of the Direct Stream. Apparently,
with the new version of the Kafka libs, some properties have to be specified.


import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher

import Utilities._

import org.apache.spark.streaming.kafka010.KafkaUtils
import kafka.serializer.StringDecoder
import
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/** Working example of listening for log data from Kafka's testLogs topic
on port 9092. */
object KafkaExample {

  def main(args: Array[String]) {

// Create the context with a 1 second batch size
val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

setupLogging()

// Construct a regular expression (regex) to extract fields from raw
Apache log lines
val pattern = apacheLogPattern()

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
"bootstrap.servers" -> "localhost:9092",
"key.deserializer"
->"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer"
->"org.apache.kafka.common.serialization.StringDeserializer",
"group.id" -> "group1")
val topics = List("testLogs").toSet
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
    Subscribe[String,
String](topics, kafkaParams)
  ).map(cr => cr.value())

hth

 marco












On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
> 1.5).
>
> Admittedly I am messing around with Spark-shell. However, I am surprised
> why this does not work with Spark 2 and is ok with CDH 5.1
>
> scala> val dstream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>
> java.lang.NoClassDefFoundError: Could not initialize class kafka.consumer.
> FetchRequestAndResponseStatsRegistry$
>   at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
>   at org.apache.spark.streaming.kafka.KafkaCluster.connect(
> KafkaCluster.scala:52)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:345)
>   at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(
> KafkaCluster.scala:342)
>   at scala.collection.IndexedSeqOptimized$class.
> foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>   at org.apache.spark.streaming.kafka.KafkaCluster.org$apache$
> spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>   at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(
> KafkaCluster.scala:125)
>   at org.apache.spark.streaming.kafka.KafkaCluster.
> getPartitions(KafkaCluster.scala:112)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.
> getFromOffsets(KafkaUtils.scala:211)
>   at org.apache.spark.streaming.kafka.KafkaUtils$.
> createDirectStream(KafkaUtils.scala:484)
>   ... 74 elided
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Re: Running a spark code on multiple machines using google cloud platform

2017-02-02 Thread Marco Mistroni
You can use EMR if you want to run on a cluster.
Kr

On 2 Feb 2017 12:30 pm, "Anahita Talebi"  wrote:

> Dear all,
>
> I am trying to run a spark code on multiple machines using submit job in
> google cloud platform.
> As the inputs of my code, I have a training and testing datasets.
>
> When I use small training data set like (10kb), the code can be
> successfully ran on the google cloud while when I have a large data set
> like 50Gb, I received the following error:
>
> 17/02/01 19:08:06 ERROR org.apache.spark.scheduler.LiveListenerBus: 
> SparkListenerBus has already stopped! Dropping event 
> SparkListenerTaskEnd(2,0,ResultTask,TaskKilled,org.apache.spark.scheduler.TaskInfo@3101f3b3,null)
>
> Does anyone can give me a hint how I can solve my problem?
>
> PS: I cannot use small training data set because I have an optimization code 
> which needs to use all the data.
>
> I have to use google could platform because I need to run the code on 
> multiple machines.
>
> Thanks a lot,
>
> Anahita
>
>


Re: Suprised!!!!!Spark-shell showing inconsistent results

2017-02-02 Thread Marco Mistroni
Hi
 Have u tried to sort the results before comparing?
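
For what it's worth: when several rows tie on col1, "order by col1 limit 1" is free to return any one of them, so back-to-back runs can legitimately differ. A sketch of a deterministic version (col2 and col3 are hypothetical tie-breaker columns -- use whatever uniquely orders the rows):

hc.sql("select * from testtable1 order by col1, col2, col3 limit 1").collect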


On 2 Feb 2017 10:03 am, "Alex"  wrote:

> Hi As shown below same query when ran back to back showing inconsistent
> results..
>
> testtable1 is Avro Serde table...
>
> [image: Inline image 1]
>
>
>
>  hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res14: Array[org.apache.spark.sql.Row] = Array([1570,3364,201607,Y,APJ,
> PHILIPPINES,8518944,null,null,null,null,-15.992583,0.0,-15.
> 992583,null,null,MONTH_ITEM_GROUP])
>
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res15: Array[org.apache.spark.sql.Row] = Array([1570,485888,20163,N,
> AMERICAS,BRAZIL,null,null,null,null,null,6019.2999,17198.0,6019.
> 2999,null,null,QUARTER_GROUP])
>
> scala> hc.sql("select * from testtable1 order by col1 limit 1").collect;
> res16: Array[org.apache.spark.sql.Row] = Array([1570,3930,201607,Y,APJ,INDIA
> SUB-CONTINENT,8741220,null,null,null,null,-208.485216,0.
> 0,-208.485216,null,null,MONTH_ITEM_GROUP])
>
>


Re: Hive Java UDF running on spark-sql issue

2017-02-01 Thread Marco Mistroni
Hi
 What is the UDF supposed to do? Are you trying to write a generic function
to convert values to another type depending on the type of the
original value?
Kr



On 1 Feb 2017 5:56 am, "Alex"  wrote:

Hi ,


we have Java Hive UDFS which are working perfectly fine in Hive

SO for Better performance we are migrating the same To Spark-sql

SO these jar files we are giving --jars argument to spark-sql
and defining temporary functions to make it to run on spark-sql

there is this particular Java UDF which is working fine on hive But when
ran on spark-sql it is giving the error

Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.apache.hadoop.io.LongWritable
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.hadoop.io.Text
Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.Double cannot be cast to
org.apache.hadoop.hive.serde2.io.DoubleWritable

The piece of code where it is throwing the error is in teh switch case below

public String getName(int pos) {
if (pos < 0 && pos >= colnames.size())
return null;
return ((StructField) colnames.get(pos)).getFieldName();
}

public int getPos(String name) {
// System.out.println(name+transactionObject.toString());
Integer pos = (Integer) transactionObject.get(name.toLowerCase());
if (pos == null)
return -1;
return pos;
}

public Object get(Object name) {
int pos = getPos((String) name);
if (pos < 0)
return null;
String f = "string";
Object obj = list.get(pos);
if (obj == null)
return null;
ObjectInspector ins = ((StructField) colnames.get(pos)).
getFieldObjectInspector();
if (ins != null)
f = ins.getTypeName();
switch (f) {
case "double":
return ((DoubleWritable) obj).get();
case "bigint":
return ((Long) obj).get();
case "string":
return ((Text) obj).toString();
default:
return obj;
}
}

So I made the code change to below

public int getPos(String name) {
// System.out.println(name+transactionObject.toString());
Integer pos = (Integer) transactionObject.get(name.toLowerCase());
if (pos == null)
return -1;
return pos;
}

public Object get(Object name) {
int pos = getPos((String) name);
if (pos < 0)
return null;
String f = "string";
Object obj = list.get(pos);
Object result = null;
if (obj == null)
return null;
ObjectInspector ins = ((StructField) colnames.get(pos)).
getFieldObjectInspector();
if (ins != null)
f = ins.getTypeName();

PrimitiveObjectInspector ins2 = (PrimitiveObjectInspector) ins;
switch (ins2.getPrimitiveCategory()) {
case DOUBLE:

Double res = (Double)(((DoubleObjectInspector) ins2).get(obj));

result = (double) res;
System.out.println("printlog when double"+result);
return result;


case LONG:

Long res1 = (Long)(((LongObjectInspector) ins2).get(obj));
result = (long) res1;
System.out.println("printlog when long"+result);
return result;


case STRING:
result = (((StringObjectInspector) ins2).getPrimitiveJavaObject(
obj)).toString();
System.out.println("printlog when String"+result);
return result;

default:
result = obj;
return result;
}

}
After making This Changes .. The java hive udf started working fine on
Spark-sql

But it is giving different results when the UDF is used in the query..

If you think You can give it a shot solving this issue please reach me out
on hangouts or reply here


Kafka dependencies in Eclipse project /Pls assist

2017-01-31 Thread Marco Mistroni
HI all
  I am trying to run a sample Spark code which reads streaming data from
Kafka.
I have followed the instructions here:

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


Here's my setup
Spark: 2.0.1
Kafka:0.10.1.1
Scala Version: 2.11



Libraries used
- spark-streaming-kafka-0.10_2.11-2.0.1
- kafka-_2.11-0.10.0.1.jar

These are my imports

import org.apache.spark.streaming.kafka010.KafkaUtils
import
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

But Eclipse is giving me the following errors:
Missing or invalid dependency detected while loading class file
KafkaUtils.class. Could not access term clients in value org.apache.kafka
because it (or its dependencies) are missing.
Missing or invalid dependency detected while loading class file
KafkaUtils.class Could not access term kafka in package org.apache because
it (or its dependencies are missing)
missing or invalid dependencies detected while loading class file
KafkaUtils.class: could not access type ConsumerRecord in value
org.apache.consumer because it(or its dependencies are missing)

So it seems I have some dependencies clashing. Has anyone encountered a
similar error?
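
For reference, a sketch of the sbt line that is normally sufficient here (assumption: the connector pulls in a matching kafka-clients transitively, so the standalone kafka_2.11 jar can be dropped from the build path):

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"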

kr
 marco


converting timestamp column to a java.util.Date

2017-01-23 Thread Marco Mistroni
HI all
  I am trying to convert a string column, in a DataFrame, to a
java.util.Date, but I am getting this exception:

[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo -
Removed broadcast_0_piece0 on 169.254.2.140:53468 in memory (size: 14.3 KB,
free: 767.4 MB)
Exception in thread "main" java.lang.UnsupportedOperationException: Schema
for type java.util.Date is not supported

here's my code

val tstampToDateFunc:(java.sql.Timestamp => java.util.Date) = ts => new
java.util.Date(ts.getTime)
val tsampConversionFunc = udf(tstampToDateFunc)

sharesDf.withColumn("price", col("_c2").cast("double"))
.withColumn("creationTime",
tsampConversionFunc(col("_c1")))

Are there any workarounds?
I'm trying to import data into MongoDB via Spark. The source is a csv file
where I have 1 timestamp column and a bunch of strings. I will need to
convert that to something compatible with Mongo's ISODate.
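
A sketch of one workaround (assumption: keeping the column as Spark's TimestampType, i.e. java.sql.Timestamp, is enough here, since the Mongo connector is expected to map timestamps to ISODate -- this avoids the unsupported java.util.Date return type entirely):

import org.apache.spark.sql.functions.col

val converted = sharesDf
  .withColumn("price", col("_c2").cast("double"))
  .withColumn("creationTime", col("_c1").cast("timestamp"))

converted.printSchema()   // creationTime: timestamp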

kr
 marco


Re: Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-18 Thread Marco Mistroni
Thanks Palash, your suggestion put me on the right track.
Reading works fine; however, it seems that when writing, as the SparkSession is
not involved, the connector does not know where to write.
I had to replace my writing code with this:

MongoSpark.save(df.write.option("spark.mongodb.output.uri",
"mongodb://localhost:27017/test.tree"))

kr
 marco



On Tue, Jan 17, 2017 at 7:53 AM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Uh. Many thanksWill try it out
>
> On 17 Jan 2017 6:47 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:
>
>> Hi Marco,
>>
>> What is the user and password you are using for mongodb connection? Did
>> you enable authorization?
>>
>> Better to include user & pass in mongo url.
>>
>> I remember I tested with python successfully.
>>
>> Best Regards,
>> Palash
>>
>>
>> Sent from Yahoo Mail on Android
>> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>>
>> On Tue, 17 Jan, 2017 at 5:37 am, Marco Mistroni
>> <mmistr...@gmail.com> wrote:
>> hi all
>>  i have the folllowign snippet which loads a dataframe from  a csv file
>> and tries to save
>> it to mongodb.
>> For some reason, the MongoSpark.save method raises the following exception
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Missing
>> database name. Set via the 'spark.mongodb.output.uri' or
>> 'spark.mongodb.output.database' property
>> at com.mongodb.spark.config.MongoCompanionConfig$class.database
>> Name(MongoCompanionConfig.scala:260)
>> at com.mongodb.spark.config.WriteConfig$.databaseName(WriteConf
>> ig.scala:36)
>>
>> Which is bizzarre as i m pretty sure i am setting all the necessary
>> properties in the SparkConf
>>
>> could you kindly assist?
>>
>> I am running Spark 2.0.1 locally with a local mongodb instance running at
>> 127.0.0.1:27017
>> I am using version 2.0.0 of mongo-spark-connector
>> I am running on Scala 2.11
>>
>> kr
>>
>> val spark = SparkSession
>>  .builder()
>>  .master("local")
>>  .appName("Spark Mongo Example")
>>  .getOrCreate()
>> spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/
>> ")
>> spark.conf.set("spark.mongodb.output.uri", "mongodb://
>> 127.0.0.1:27017/")
>> spark.conf.set("spark.mongodb.output.database", "test")
>>
>> println(s"SparkPRoperties:${spark.conf.getAll}")
>>
>>
>> val df = getDataFrame(spark) // Loading any dataframe from a file
>>
>> df.printSchema()
>>
>> println(s"Head:${df.head()}")
>> println(s"Count:${df.count()}")
>> println("##  SAVING TO MONGODB #")
>> import com.mongodb.spark.config._
>>
>> import com.mongodb.spark.config._
>>
>> val writeConfig = WriteConfig(Map("collection" -> "spark",
>> "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
>> MongoSpark.save(df, writeConfig)
>>
>>
>>
>>


Re: Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-16 Thread Marco Mistroni
Uh. Many thanks. Will try it out.

On 17 Jan 2017 6:47 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:

> Hi Marco,
>
> What is the user and password you are using for mongodb connection? Did
> you enable authorization?
>
> Better to include user & pass in mongo url.
>
> I remember I tested with python successfully.
>
> Best Regards,
> Palash
>
>
> Sent from Yahoo Mail on Android
> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>
> On Tue, 17 Jan, 2017 at 5:37 am, Marco Mistroni
> <mmistr...@gmail.com> wrote:
> hi all
>  i have the folllowign snippet which loads a dataframe from  a csv file
> and tries to save
> it to mongodb.
> For some reason, the MongoSpark.save method raises the following exception
>
> Exception in thread "main" java.lang.IllegalArgumentException: Missing
> database name. Set via the 'spark.mongodb.output.uri' or
> 'spark.mongodb.output.database' property
> at com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(
> MongoCompanionConfig.scala:260)
> at com.mongodb.spark.config.WriteConfig$.databaseName(
> WriteConfig.scala:36)
>
> Which is bizzarre as i m pretty sure i am setting all the necessary
> properties in the SparkConf
>
> could you kindly assist?
>
> I am running Spark 2.0.1 locally with a local mongodb instance running at
> 127.0.0.1:27017
> I am using version 2.0.0 of mongo-spark-connector
> I am running on Scala 2.11
>
> kr
>
> val spark = SparkSession
>  .builder()
>  .master("local")
>  .appName("Spark Mongo Example")
>  .getOrCreate()
> spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/
> ")
> spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/
> ")
> spark.conf.set("spark.mongodb.output.database", "test")
>
> println(s"SparkPRoperties:${spark.conf.getAll}")
>
>
> val df = getDataFrame(spark) // Loading any dataframe from a file
>
> df.printSchema()
>
> println(s"Head:${df.head()}")
> println(s"Count:${df.count()}")
> println("##  SAVING TO MONGODB #")
> import com.mongodb.spark.config._
>
> import com.mongodb.spark.config._
>
> val writeConfig = WriteConfig(Map("collection" -> "spark",
> "writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
> MongoSpark.save(df, writeConfig)
>
>
>
>


Spark vs MongoDB: saving DataFrame to db raises missing database name exception

2017-01-16 Thread Marco Mistroni
hi all
 I have the following snippet, which loads a dataframe from a csv file and
tries to save it to mongodb.
For some reason, the MongoSpark.save method raises the following exception

Exception in thread "main" java.lang.IllegalArgumentException: Missing
database name. Set via the 'spark.mongodb.output.uri' or
'spark.mongodb.output.database' property
at
com.mongodb.spark.config.MongoCompanionConfig$class.databaseName(MongoCompanionConfig.scala:260)
at
com.mongodb.spark.config.WriteConfig$.databaseName(WriteConfig.scala:36)

Which is bizarre, as I'm pretty sure I am setting all the necessary
properties in the SparkConf.

could you kindly assist?

I am running Spark 2.0.1 locally with a local mongodb instance running at
127.0.0.1:27017
I am using version 2.0.0 of mongo-spark-connector
I am running on Scala 2.11

kr

val spark = SparkSession
 .builder()
 .master("local")
 .appName("Spark Mongo Example")
 .getOrCreate()
spark.conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/")
spark.conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/")
spark.conf.set("spark.mongodb.output.database", "test")

println(s"SparkPRoperties:${spark.conf.getAll}")


val df = getDataFrame(spark) // Loading any dataframe from a file

df.printSchema()

println(s"Head:${df.head()}")
println(s"Count:${df.count()}")
println("##  SAVING TO MONGODB #")
import com.mongodb.spark.config._

import com.mongodb.spark.config._

val writeConfig = WriteConfig(Map("collection" -> "spark",
"writeConcern.w" -> "majority"), Some(WriteConfig(spark.sparkContext)))
MongoSpark.save(df, writeConfig)


Re: Spark 2.0 vs MongoDb /Cannot find dependency using sbt

2017-01-16 Thread Marco Mistroni
Sorry, I should have done more research before jumping to the list.
The version of the connector is 2.0.0, available from the Maven repos.

sorry
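
For reference, a sketch of the corrected dependency line (assumptions: use the released 2.0.0 artifact and the %% form so the Scala suffix matches scalaVersion := "2.11.8", instead of the hard-coded _2.10 snapshot):

libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"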

On Mon, Jan 16, 2017 at 9:32 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> HI all
> in searching on how to use Spark 2.0 with mongo i came across this link
>
> https://jira.mongodb.org/browse/SPARK-20
>
> i amended my build.sbt (content below), however the mongodb dependency was
> not found
> Could anyone assist?
>
> kr
>  marco
>
> name := "SparkExamples"
> version := "1.0"
> scalaVersion := "2.11.8"
> val sparkVersion = "2.0.1"
>
> // Add a single dependency
> libraryDependencies += "junit" % "junit" % "4.8" % "test"
> libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
> "org.slf4j" % "slf4j-simple" % "1.7.5",
> "org.clapper" %% "grizzled-slf4j" % "1.0.2")
> libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
> sparkVersion
> libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
> libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.10"
> % "2.0.0-SNAPSHOT"
>
> resolvers += "MavenRepository" at "https://mvnrepository.com/"
>
>


Spark 2.0 vs MongoDb /Cannot find dependency using sbt

2017-01-16 Thread Marco Mistroni
Hi all,
in searching for how to use Spark 2.0 with mongo I came across this link:

https://jira.mongodb.org/browse/SPARK-20

I amended my build.sbt (content below), however the mongodb dependency was
not found. Could anyone assist?

kr
 marco

name := "SparkExamples"
version := "1.0"
scalaVersion := "2.11.8"
val sparkVersion = "2.0.1"

// Add a single dependency
libraryDependencies += "junit" % "junit" % "4.8" % "test"
libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.5",
"org.slf4j" % "slf4j-simple" % "1.7.5",
"org.clapper" %% "grizzled-slf4j" % "1.0.2")
libraryDependencies += "org.apache.spark"%%"spark-core"   % sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-streaming"   %
sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-mllib"   % sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-streaming-flume-sink" %
sparkVersion
libraryDependencies += "org.apache.spark"%%"spark-sql"   % sparkVersion
libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector_2.10" %
"2.0.0-SNAPSHOT"

resolvers += "MavenRepository" at "https://mvnrepository.com/;


Re: Importing a github project on sbt

2017-01-16 Thread Marco Mistroni
Uhm... not a Spark issue... anyway, I had similar issues with sbt.
The quick solution to get you going is to place your dependency jar in your
lib folder.
The not-so-quick one is to build the sbt dependency yourself and do an sbt
publish-local, or deploy it locally.
But I consider both approaches hacks.
Hth
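
PS: a sketch of the not-so-quick route, assuming the github project builds
with sbt (the coordinates below are placeholders -- use whatever that
project's build.sbt declares after you clone it):

// 1) publish the dependency to the local ivy repository:
//      git clone https://github.com/oleber/aws-stepfunctions
//      cd aws-stepfunctions && sbt publishLocal
// 2) then depend on it by coordinates instead of a RootProject:
libraryDependencies += "io.github.oleber" %% "aws-stepfunctions" % "0.1-SNAPSHOT"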

On 16 Jan 2017 2:00 pm, "marcos rebelo"  wrote:

Hi all,

I have this project:
https://github.com/oleber/aws-stepfunctions


I have a second project that should import the first one. On the second
project I did something like:

lazy val awsStepFunctions = RootProject(uri(
  "git://github.com/oleber/aws-stepfunctions.git#31990fce907cbda3814954c390dcbc1b7807b2d5"))

lazy val importerWithStepFunction = project.in(file("modules/importerWithStepFunction"))
  .settings(global: _*)
  .dependsOn(
awsStepFunctions % allScopes,
commonCommons % allScopes,
home24AWS % allScopes,
importerBing % allScopes
  )


and I get an error like:

[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: default#aws-stepfunctions_2.11;1.0: not found
[warn] ::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] default:aws-stepfunctions_2.11:1.0
[warn]   +- de.home24:importerwithstepfunction_2.11:0.1-SNAPSHOT


Clearly I'm missing something. Can you direct me to the solution or to the
documentation? I will work something out from there.

Best Regards
Marcos Rebelo


Re: Running Spark on EMR

2017-01-15 Thread Marco Mistroni
Thanks Neil. I followed the original suggestion from Andrew and everything
is working fine now.
kr
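
PS: for reference, a minimal sketch along those lines (assumptions: on EMR
Spark runs on YARN, so the master comes from spark-submit rather than the
code; shown in Scala, the PySpark builder call has the same shape):

// submit from the EMR master node, letting YARN manage the job:
//   spark-submit --master yarn --deploy-mode client myscript.py
// and build the session without hard-coding a master:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataProcess")
  .getOrCreate()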

On Sun, Jan 15, 2017 at 4:27 PM, Neil Jonkers <neilod...@gmail.com> wrote:

> Hello,
>
> Can you drop the url:
>
>  spark://master:7077
>
> The url is used when running Spark in standalone mode.
>
> Regards
>
>
> ---- Original message 
> From: Marco Mistroni
> Date:15/01/2017 16:34 (GMT+02:00)
> To: User
> Subject: Running Spark on EMR
>
> hi all
>  could anyone assist here?
> i am trying to run spark 2.0.0 on an EMR cluster,but i am having issues
> connecting to the master node
> So, below is a snippet of what i am doing
>
>
> sc = SparkSession.builder.master(sparkHost).appName("
> DataProcess").getOrCreate()
>
> sparkHost is passed as input parameter. that was thought so that i can run
> the script locally
> on my spark local instance as well as submitting scripts on any cluster i
> want
>
>
> Now i have
> 1 - setup a cluster on EMR.
> 2 - connected to masternode
> 3  - launch the command spark-submit myscripts.py spark://master:7077
>
> But that results in an connection refused exception
> Then i have tried to remove the .master call above and launch the script
> with the following command
>
> spark-submit --master spark://master:7077   myscript.py  but still i am
> getting
> connectionREfused exception
>
>
> I am using Spark 2.0.0 , could anyone advise on how shall i build the
> spark session and how can i submit a pythjon script to the cluster?
>
> kr
>  marco
>


Running Spark on EMR

2017-01-15 Thread Marco Mistroni
Hi all,
could anyone assist here?
I am trying to run spark 2.0.0 on an EMR cluster, but I am having issues
connecting to the master node.
So, below is a snippet of what i am doing


sc =
SparkSession.builder.master(sparkHost).appName("DataProcess").getOrCreate()

sparkHost is passed as an input parameter, the idea being that I can run
the script locally on my local Spark instance as well as submit scripts to
any cluster I want.


Now I have:
1 - set up a cluster on EMR
2 - connected to the master node
3 - launched the command spark-submit myscripts.py spark://master:7077

But that results in a connection refused exception.
Then I tried to remove the .master call above and launch the script
with the following command:

spark-submit --master spark://master:7077 myscript.py

but I still get a connection refused exception.


I am using Spark 2.0.0. Could anyone advise on how I should build the Spark
session and how I can submit a Python script to the cluster?

kr
 marco


Re: Debugging a PythonException with no details

2017-01-14 Thread Marco Mistroni
It seems it has to do with a UDF... Could you share a snippet of the code
you are running?
Kr

On 14 Jan 2017 1:40 am, "Nicholas Chammas" 
wrote:

> I’m looking for tips on how to debug a PythonException that’s very sparse
> on details. The full exception is below, but the only interesting bits
> appear to be the following lines:
>
> org.apache.spark.api.python.PythonException:
> ...
> py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext
>
> Otherwise, the only other clue from the traceback I can see is that the
> problem may involve a UDF somehow.
>
> I’ve tested this code against many datasets (stored as ORC) and it works
> fine. The same code only seems to throw this error on a few datasets that
> happen to be sourced via JDBC. I can’t seem to get a lead on what might be
> going wrong here.
>
> Does anyone have tips on how to debug a problem like this? How do I find
> more specifically what is going wrong?
>
> Nick
>
> Here’s the full exception:
>
> 17/01/13 17:12:14 WARN TaskSetManager: Lost task 7.0 in stage 9.0 (TID 15, 
> devlx023.private.massmutual.com, executor 4): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 
> 161, in main
> func, profiler, deserializer, serializer = read_udfs(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 97, 
> in read_udfs
> arg_offsets, udf = read_single_udf(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 78, 
> in read_single_udf
> f, return_type = read_command(pickleSer, infile)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/worker.py", line 54, 
> in read_command
> command = serializer._read_with_length(file)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 169, in _read_with_length
> return self.loads(obj)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/serializers.py", 
> line 431, in loads
> return pickle.loads(obj, encoding=encoding)
>   File 
> "/hadoop/yarn/nm/usercache/jenkins/appcache/application_1483203887152_1207/container_1483203887152_1207_01_05/splinkr/person.py",
>  line 111, in 
> py_normalize_udf = udf(py_normalize, StringType())
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1868, in udf
> return UserDefinedFunction(f, returnType)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1826, in __init__
> self._judf = self._create_judf(name)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/sql/functions.py", 
> line 1830, in _create_judf
> sc = SparkContext.getOrCreate()
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 307, in getOrCreate
> SparkContext(conf=conf or SparkConf())
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 118, in __init__
> conf, jsc, profiler_cls)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 179, in _do_init
> self._jsc = jsc or self._initialize_context(self._conf._jconf)
>   File "/hadoop/spark/2.1/python/lib/pyspark.zip/pyspark/context.py", line 
> 246, in _initialize_context
> return self._jvm.JavaSparkContext(jconf)
>   File 
> "/hadoop/spark/2.1/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 
> 1401, in __call__
> answer, self._gateway_client, None, self._fqn)
>   File "/hadoop/spark/2.1/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", 
> line 327, in get_return_value
> format(target_id, ".", name))
> py4j.protocol.Py4JError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext
>
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
> at 
> org.apache.spark.api.python.PythonRunner$$anon$1.(PythonRDD.scala:234)
> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
> at 
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:144)
> at 
> org.apache.spark.sql.execution.python.BatchEvalPythonExec$$anonfun$doExecute$1.apply(BatchEvalPythonExec.scala:87)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:796)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at 
> 

Re: backward compatibility

2017-01-10 Thread Marco Mistroni
I think the old APIs are still supported, but you are advised to migrate.
I migrated a few apps from 1.6 to 2.0 with minimal changes.
Hth
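
PS: to make "minimal changes" concrete, a sketch of the typical 1.6 -> 2.x
edit (assuming a Scala app; SQLContext and HiveContext still work, they are
just superseded by SparkSession):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MigratedApp")
  .enableHiveSupport()          // only if you previously used HiveContext
  .getOrCreate()

// 1.6 style:  sqlContext.read.parquet(path)
// 2.x style:  spark.read.parquet(path)
val df = spark.read.parquet("/path/to/data")   // path is a placeholder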

On 10 Jan 2017 4:14 pm, "pradeepbill"  wrote:

> hi there, I am using spark 1.4 code and now we plan to move to spark 2.0,
> and
> when I check the documentation below, there are only a few features
> backward
> compatible, does that mean I have change most of my code , please advice.
>
> One of the largest changes in Spark 2.0 is the new updated APIs:
>
> Unifying DataFrame and Dataset: In Scala and Java, DataFrame and Dataset
> have been unified, i.e. DataFrame is just a type alias for Dataset of Row.
> In Python and R, given the lack of type safety, DataFrame is the main
> programming interface.
> *SparkSession: new entry point that replaces the old SQLContext and
> HiveContext for DataFrame and Dataset APIs. SQLContext and HiveContext are
> kept for backward compatibility.*
> A new, streamlined configuration API for SparkSession
> Simpler, more performant accumulator API
> A new, improved Aggregator API for typed aggregation in Datasets
>
>
> thanks
> Pradeep
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/backward-compatibility-tp28296.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark ML's RandomForestClassifier OOM

2017-01-10 Thread Marco Mistroni
Are you running locally? I found exactly the same issue.
2 solutions:
- reduce the data size
- run on EMR
Hth

On 10 Jan 2017 10:07 am, "Julio Antonio Soto"  wrote:

> Hi,
>
> I am running into OOM problems while training a Spark ML
> RandomForestClassifier (maxDepth of 30, 32 maxBins, 100 trees).
>
> My dataset is arguably pretty big given the executor count and size
> (8x5G), with approximately 20M rows and 130 features.
>
> The "fun fact" is that a single DecisionTreeClassifier with the same specs
> (same maxDepth and maxBins) is able to train without problems in a couple
> of minutes.
>
> AFAIK the current random forest implementation grows each tree
> sequentially, which means that DecisionTreeClassifiers are fit one by one,
> and therefore the training process should be similar in terms of memory
> consumption. Am I missing something here?
>
> Thanks
> Julio
>


Re: Spark Python in Jupyter Notebook

2017-01-05 Thread Marco Mistroni
Hi,
this might be off topic, but Databricks has a web application in which you
can use Spark with Jupyter. Have a look at
https://community.cloud.databricks.com

kr

On Thu, Jan 5, 2017 at 7:53 PM, Jon G  wrote:

> I don't use MapR but I use pyspark with jupyter, and this MapR blogpost
> looks similar to what I do to setup:
>
> https://community.mapr.com/docs/DOC-1874-how-to-use-
> jupyter-pyspark-on-mapr
>
>
> On Thu, Jan 5, 2017 at 3:05 AM, neil90  wrote:
>
>> Assuming you don't have your environment variables setup in your
>> .bash_profile you would do it like this -
>>
>> import os
>> import sys
>>
>> spark_home = '/usr/local/spark'
>> sys.path.insert(0, spark_home + "/python")
>> sys.path.insert(0, os.path.join(spark_home,
>> 'python/lib/py4j-0.10.1-src.zip'))
>> #os.environ['PYSPARK_SUBMIT_ARGS'] = """--master spark://
>> 54.68.147.137:7077
>> pyspark-shell""" < where you can pass commands you would pass in
>> launching pyspark directly from command line
>>
>> from pyspark import SparkContext, SparkConf
>> from pyspark.sql import SparkSession
>>
>> conf = SparkConf()\
>> .setMaster("local[8]")\
>> .setAppName("Test")
>>
>> sc = SparkContext(conf=conf)
>>
>> spark = SparkSession.builder\
>> .config(conf=sc.getConf())\
>> .enableHiveSupport()\
>> .getOrCreate()
>>
>> Mind you this is for spark 2.0 and above
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Spark-Python-in-Jupyter-Notebook-tp28268p28274.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2017-01-05 Thread Marco Mistroni
Hi,
if it only happens when you run 2 apps at the same time, could it be that
these 2 apps somehow run on the same host?
Kr

On 5 Jan 2017 9:00 am, "Palash Gupta" <spline_pal...@yahoo.com> wrote:

> Hi Marco and respected member,
>
> I have done all the possible things suggested by Forum but still I'm
> having same issue:
>
> 1. I will migrate my applications to production environment where I will
> have more resources
> Palash>> I migrated my application in production where I have more CPU
> Cores, Memory & total 7 host in spark cluster.
> 2. Use Spark 2.0.0 function to load CSV rather using databrics api
> Palash>> Earlier I'm using databricks csv api with Spark 2.0.0. As
> suggested by one of the mate, Now I'm using spark 2.0.0 built in csv loader.
> 3. In production I will run multiple spark application at a time and try
> to reproduce this error for both file system and HDFS loading cas
> Palash>> yes I reproduced and it only happen when two spark application
> run at a time. Please see the logs:
>
> 17/01/05 01:50:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 0.0 (TID 0, 10.15.187.79): java.io.IOException: org.apache.spa
> rk.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1260)
> at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(
> TorrentBroadcast.scala:174)
> at org.apache.spark.broadcast.TorrentBroadcast._value$
> lzycompute(TorrentBroadcast.scala:65)
> at org.apache.spark.broadcast.TorrentBroadcast._value(
> TorrentBroadcast.scala:65)
> at org.apache.spark.broadcast.TorrentBroadcast.getValue(
> TorrentBroadcast.scala:89)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.
> scala:67)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_1_piece0 of broadcast_1
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$
> apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$s
> p(TorrentBroadcast.scala:146)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$
> apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(Torren
> tBroadcast.scala:125)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$
> apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(Torren
> tBroadcast.scala:125)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.apache.spark.broadcast.TorrentBroadcast.org$apache$
> spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:
> 125)
> at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$
> readBroadcastBlock$1.apply(TorrentBroadcast.scala:186)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1253)
> ... 11 more
>
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.1 in
> stage 0.0 (TID 1, 10.15.187.78, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Launching task 1 on executor id: 1 hostname: 10.15.187.78
> .
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.1 in stage
> 0.0 (TID 1) on executor 10.15.187.78: java.io.IOException (org
> .apache.spark.SparkException: Failed to get broadcast_1_piece0 of
> broadcast_1) [duplicate 1]
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.2 in
> stage 0.0 (TID 2, 10.15.187.78, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Launching task 2 on executor id: 1 hostname: 10.15.187.78
> .
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Lost task 0.2 in stage
> 0.0 (TID 2) on executor 10.15.187.78: java.io.IOException (org
> .apache.spark.SparkException: Failed to get broadcast_1_piece0 of
> broadcast_1) [duplicate 2]
> 17/01/05 01:50:15 INFO scheduler.TaskSetManager: Starting task 0.3 in
> stage 0.0 (TID 3, 10.15.187.76, partition 0, ANY, 7305 bytes)
> 17/01/05 01:50:15 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Launching task 3 on executor id: 6 hostname: 10.15.187.76
> .
> 17/01/05 01:50:16 INFO scheduler.TaskSetManager: Lost task 0.3 in stage
> 0.0 (TID 3) on executor 10.15.1

Re: Re: Re: Spark Streaming prediction

2017-01-03 Thread Marco Mistroni
Hi,
ok, then my suggestion stays. Check out ML:
you can train your ML model on past data (let's say, either yesterday or
the past x days) to have Spark find out what the relation is between the
value you have at T-zero and the value you have at T+n hours. You can also
try ML outside your streaming app by gathering data for x days, feeding it
to your model and seeing the results.
Hth
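
PS: a sketch of that train-offline / score-in-streaming split (assumptions:
a linear regression model and an agreed model path; substitute whatever
regressor fits your data):

import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}

// offline job: fit on the gathered historical data and persist the model
val lr = new LinearRegression().setMaxIter(50)
val model = lr.fit(historicalDF)               // historicalDF is a placeholder
model.write.overwrite().save("/models/minute-values")

// streaming app: load once at startup and score incoming micro-batches
val loaded = LinearRegressionModel.load("/models/minute-values")
// val scored = loaded.transform(batchDF)      // batchDF is a placeholder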

On Mon, Jan 2, 2017 at 9:51 PM, Daniela S <daniela_4...@gmx.at> wrote:

> Dear Marco
>
> No problem, thank you very much for your help!
> Yes, that is correct. I always know the minute values for the next e.g.
> 180 minutes (may vary between the different devices) and I want to predict
> the values for the next 24 hours (one value per minute). So as long as
> I know the values (e.g. 180 minutes) I would of course like to use these
> values and the missing ones to get values for the next 24 hours (one value
> per minute) should be predicted.
>
> Thank you in advance.
>
> Regards,
> Daniela
>
> *Gesendet:* Montag, 02. Januar 2017 um 22:30 Uhr
> *Von:* "Marco Mistroni" <mmistr...@gmail.com>
> *An:* "Daniela S" <daniela_4...@gmx.at>
> *Cc:* User <user@spark.apache.org>
> *Betreff:* Re: Re: Spark Streaming prediction
> Apologies, perhaps i misunderstood your usecase.
> My assumption was that you have 2-3 hours worth fo data and you want to
> know the values for the next 24 based on the values you already have, that
> is why i suggested  the ML path.
> If that is not the case please ignore everything i said..
>
> so, let's take the simple case where you have only 1 device
> So every event contains the minute value of that device for the next 180
> mins. So at any point in time you only  have visibility of the next 180
> minutes, correct?
> Now do you want to predict what the value will be for the next 24 hrs, or
> do you  just want to accumulate data worth of 24 hrs and display it in the
> dashboard?
> or is it something else?
>
> for dashboard update, i guess you either
> - poll 'a  database' (where you store the compuation of your spark logic )
> periodically
> - propagate events from your spark streaming application to your dashboard
> somewhere (via actors/ JMS or whatever mechanism)
>
> kr
>  marco
>
>
>
>
>
>
>
>
>
> On Mon, Jan 2, 2017 at 8:26 PM, Daniela S <daniela_4...@gmx.at> wrote:
>>
>> Hi
>>
>> Thank you very much for your answer!
>>
>> My problem is that I know the values for the next 2-3 hours in advance
>> but i do not know the values from hour 2 or 3 to hour 24. How is it
>> possible to combine the known values with the predicted values as both are
>> values in the future? And how can i ensure that there are always 1440
>> values?
>> And I do not know how to map the values for 1440 minutes to a specific
>> time on the dashboard (e.g. how does the dashboard know that the value for
>> minute 300 maps to time 15:05?
>>
>> Thank you in advance.
>>
>> Best regards,
>> Daniela
>>
>>
>>
>> *Gesendet:* Montag, 02. Januar 2017 um 21:07 Uhr
>> *Von:* "Marco Mistroni" <mmistr...@gmail.com>
>> *An:* "Daniela S" <daniela_4...@gmx.at>
>> *Cc:* User <user@spark.apache.org>
>> *Betreff:* Re: Spark Streaming prediction
>> Hi
>>  you  might want to have a look at the Regression ML  algorithm and
>> integrate it in your SparkStreaming application, i m sure someone on the
>> list has  a similar use case
>> shortly, you'd want to process all your events and feed it through a ML
>> model which,based on your inputs will predict output
>> You say that your events predict minutes values for next 2-3 hrs...
>> gather data for a day and train ur model based on that. Then save it
>> somewhere and have your streaming app load the module and have the module
>> do the predictions based on incoming events from your streaming app.
>> Save the results somewhere and have your dashboard poll periodically your
>> data store to read the predictions
>> I have seen ppl on the list doing ML over a Spark streaming app, i m sure
>> someone can reply back
>> Hpefully i gave u a starting point
>>
>> hth
>>  marco
>>
>> On 2 Jan 2017 4:03 pm, "Daniela S" <daniela_4...@gmx.at> wrote:
>>>
>>> Hi
>>>
>>> I am trying to solve the following problem with Spark Streaming.
>>> I receive timestamped events from Kafka. Each event refers to a device
>>> and contains values for every minute of the next 2 to 3 hours. What I would
>>> like to do is to predict the minute values for the next 2

Re: Re: Spark Streaming prediction

2017-01-02 Thread Marco Mistroni
Apologies, perhaps I misunderstood your use case.
My assumption was that you have 2-3 hours' worth of data and you want to
know the values for the next 24 based on the values you already have; that
is why I suggested the ML path.
If that is not the case please ignore everything I said.

So, let's take the simple case where you have only 1 device.
Every event contains the minute values of that device for the next 180
mins. So at any point in time you only have visibility of the next 180
minutes, correct?
Now, do you want to predict what the values will be for the next 24 hrs, or
do you just want to accumulate 24 hrs' worth of data and display it in the
dashboard? Or is it something else?

For the dashboard update, I guess you either
- poll 'a database' (where you store the computation of your spark logic)
periodically
- propagate events from your spark streaming application to your dashboard
somewhere (via actors / JMS or whatever mechanism)

kr
 marco









On Mon, Jan 2, 2017 at 8:26 PM, Daniela S <daniela_4...@gmx.at> wrote:

> Hi
>
> Thank you very much for your answer!
>
> My problem is that I know the values for the next 2-3 hours in advance but
> i do not know the values from hour 2 or 3 to hour 24. How is it possible to
> combine the known values with the predicted values as both are values in
> the future? And how can i ensure that there are always 1440 values?
> And I do not know how to map the values for 1440 minutes to a specific
> time on the dashboard (e.g. how does the dashboard know that the value for
> minute 300 maps to time 15:05?
>
> Thank you in advance.
>
> Best regards,
> Daniela
>
>
>
> *Gesendet:* Montag, 02. Januar 2017 um 21:07 Uhr
> *Von:* "Marco Mistroni" <mmistr...@gmail.com>
> *An:* "Daniela S" <daniela_4...@gmx.at>
> *Cc:* User <user@spark.apache.org>
> *Betreff:* Re: Spark Streaming prediction
> Hi
>  you  might want to have a look at the Regression ML  algorithm and
> integrate it in your SparkStreaming application, i m sure someone on the
> list has  a similar use case
> shortly, you'd want to process all your events and feed it through a ML
> model which,based on your inputs will predict output
> You say that your events predict minutes values for next 2-3 hrs... gather
> data for a day and train ur model based on that. Then save it somewhere and
> have your streaming app load the module and have the module do the
> predictions based on incoming events from your streaming app.
> Save the results somewhere and have your dashboard poll periodically your
> data store to read the predictions
> I have seen ppl on the list doing ML over a Spark streaming app, i m sure
> someone can reply back
> Hpefully i gave u a starting point
>
> hth
>  marco
>
> On 2 Jan 2017 4:03 pm, "Daniela S" <daniela_4...@gmx.at> wrote:
>>
>> Hi
>>
>> I am trying to solve the following problem with Spark Streaming.
>> I receive timestamped events from Kafka. Each event refers to a device
>> and contains values for every minute of the next 2 to 3 hours. What I would
>> like to do is to predict the minute values for the next 24 hours. So I
>> would like to use the known values and to predict the other values to
>> achieve the 24 hours prediction. My thought was to use arrays with a length
>> of 1440 (1440 minutes = 24 hours). One for the known values and one for the
>> predicted values for each device. Then I would like to show the next 24
>> hours on a dashboard. The dashboard should be updated automatically in
>> realtime.
>>
>> My questions:
>> is this a possible solution?
>> how is it possible to combine known future values and predicted values?
>> how should I treat the timestamp as the length of 1440 does not
>> correspond to a timestamp?
>> how is it possible to update the dashboard automatically in realtime?
>>
>> Thank you in advance!
>>
>> Best regards,
>> Daniela
>> - To
>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Spark Streaming prediction

2017-01-02 Thread Marco Mistroni
Hi,
you might want to have a look at the regression ML algorithms and
integrate one in your Spark Streaming application; I'm sure someone on the
list has a similar use case.
In short, you'd want to process all your events and feed them through an ML
model which, based on your inputs, will predict the output.
You say that your events predict minute values for the next 2-3 hrs...
gather data for a day and train your model based on that. Then save it
somewhere and have your streaming app load the model and have the model do
the predictions based on incoming events from your streaming app.
Save the results somewhere and have your dashboard periodically poll your
data store to read the predictions.
I have seen people on the list doing ML over a Spark Streaming app, I'm
sure someone can reply back.
Hopefully I gave you a starting point.

hth
 marco

On 2 Jan 2017 4:03 pm, "Daniela S" <daniela_4...@gmx.at> wrote:

> Hi
>
> I am trying to solve the following problem with Spark Streaming.
> I receive timestamped events from Kafka. Each event refers to a device and
> contains values for every minute of the next 2 to 3 hours. What I would
> like to do is to predict the minute values for the next 24 hours. So I
> would like to use the known values and to predict the other values to
> achieve the 24 hours prediction. My thought was to use arrays with a length
> of 1440 (1440 minutes = 24 hours). One for the known values and one for the
> predicted values for each device. Then I would like to show the next 24
> hours on a dashboard. The dashboard should be updated automatically in
> realtime.
>
> My questions:
> is this a possible solution?
> how is it possible to combine known future values and predicted values?
> how should I treat the timestamp as the length of 1440 does not correspond
> to a timestamp?
> how is it possible to update the dashboard automatically in realtime?
>
> Thank you in advance!
>
> Best regards,
> Daniela
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Error when loading json to spark

2017-01-01 Thread Marco Mistroni
Hi,
you will need to pass the schema, like in the snippet below (even though
the code might have been superseded in Spark 2.0).

import org.apache.spark.sql.types._   // StructType, StringType
import sqlContext.implicits._

val jsonRdd = sc.textFile("file:///c:/tmp/1973-01-11.json")
// build the schema explicitly instead of letting Spark guess it
val schema = (new StructType).add("hour", StringType).add("month", StringType)
  .add("second", StringType).add("year", StringType)
  .add("timezone", StringType).add("day", StringType)
  .add("minute", StringType)
val jsonContentWithSchema = sqlContext.jsonRDD(jsonRdd, schema)

But somehow I seem to remember that there was a way, in Spark 2.0, for
Spark to infer the schema for you...
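
In Spark 2.x that looks roughly like this (a sketch, reusing the /json/ path
from your message; read.json samples the data to infer the schema, and
read.schema(...) skips that extra pass):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().getOrCreate()

val inferred = spark.read.json("/json/")                 // schema inferred
val schema = (new StructType)
  .add("hour", StringType).add("month", StringType)
  .add("second", StringType).add("year", StringType)
  .add("timezone", StringType).add("day", StringType)
  .add("minute", StringType)
val explicit = spark.read.schema(schema).json("/json/")  // no inference pass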

hth
marco





On Sun, Jan 1, 2017 at 12:40 PM, Raymond Xie <xie3208...@gmail.com> wrote:

> I found the cause:
>
> I need to "put" the json file onto hdfs first before it can be used, here
> is what I did:
>
> hdfs dfs -put  /root/Downloads/data/json/world_bank.json
> hdfs://localhost:9000/json
> df = sqlContext.read.json("/json/")
> df.show(10)
>
> .
>
> However, there is a new problem here, the json data needs to be sort of
> treaked before it can be really used, simply using df =
> sqlContext.read.json("/json/") just makes the df messy, I need the df know
> the fields in the json file.
>
> How?
>
> Thank you.
>
>
>
>
> **
> *Sincerely yours,*
>
>
> *Raymond*
>
> On Sat, Dec 31, 2016 at 11:52 PM, Miguel Morales <therevolti...@gmail.com>
> wrote:
>
>> Looks like it's trying to treat that path as a folder, try omitting
>> the file name and just use the folder path.
>>
>> On Sat, Dec 31, 2016 at 7:58 PM, Raymond Xie <xie3208...@gmail.com>
>> wrote:
>> > Happy new year!!!
>> >
>> > I am trying to load a json file into spark, the json file is attached
>> here.
>> >
>> > I received the following error, can anyone help me to fix it? Thank you
>> very
>> > much. I am using Spark 1.6.2 and python 2.7.5
>> >
>> >>>> from pyspark.sql import SQLContext
>> >>>> sqlContext = SQLContext(sc)
>> >>>> df = sqlContext.read.json("/root/Downloads/data/json/world_bank.
>> json")
>> > 16/12/31 22:54:53 INFO json.JSONRelation: Listing
>> > hdfs://localhost:9000/root/Downloads/data/json/world_bank.json on
>> driver
>> > 16/12/31 22:54:54 INFO storage.MemoryStore: Block broadcast_0 stored as
>> > values in memory (estimated size 212.4 KB, free 212.4 KB)
>> > 16/12/31 22:54:54 INFO storage.MemoryStore: Block broadcast_0_piece0
>> stored
>> > as bytes in memory (estimated size 19.6 KB, free 232.0 KB)
>> > 16/12/31 22:54:54 INFO storage.BlockManagerInfo: Added
>> broadcast_0_piece0 in
>> > memory on localhost:39844 (size: 19.6 KB, free: 511.1 MB)
>> > 16/12/31 22:54:54 INFO spark.SparkContext: Created broadcast 0 from
>> json at
>> > NativeMethodAccessorImpl.java:-2
>> > Traceback (most recent call last):
>> >   File "", line 1, in 
>> >   File "/opt/spark/python/pyspark/sql/readwriter.py", line 176, in json
>> > return self._df(self._jreader.json(path))
>> >   File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py",
>> line
>> > 813, in __call__
>> >   File "/opt/spark/python/pyspark/sql/utils.py", line 45, in deco
>> > return f(*a, **kw)
>> >   File "/opt/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line
>> 308,
>> > in get_return_value
>> > py4j.protocol.Py4JJavaError: An error occurred while calling o19.json.
>> > : java.io.IOException: No input paths specified in job
>> > at
>> > org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInpu
>> tFormat.java:201)
>> > at
>> > org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInput
>> Format.java:313)
>> > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
>> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> > at scala.Option.getOrElse(Option.scala:120)
>> > at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>> > at
>> > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapParti
>> tionsRDD.scala:35)
>> > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)

Re: [ML] Converting ml.DenseVector to mllib.Vector

2016-12-31 Thread Marco Mistroni
Hi,
you have a DataFrame... there should be either a way to
- convert a DF to a Vector without doing a cast
- use an ML library which relies on DataFrames only

I can see that your code is still importing libraries from two different
'machine learning' packages:

import org.apache.spark.ml.feature.{MinMaxScaler, Normalizer,
StandardScaler, VectorAssembler}
import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}

You should be able to find exactly the same data structures that you had
in mllib under the ml package... I'd advise sticking to the ml libraries
only, that will avoid confusion.

I concur with you, this line looks dodgy to me:

val rddVec = dfScaled
.select("scaled_features")
.rdd
.map(_(0)
.asInstanceOf[org.apache.spark.mllib.linalg.Vector])

converting a DF to a Vector is not as simple as doing a cast (like you
would do in Java)

I did a random search and found this, maybe it'll help:

https://community.hortonworks.com/questions/33375/how-to-convert-a-dataframe-to-a-vectordense-in-sca.html
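
As a sketch of what that conversion can look like on Spark 2.0 (assuming the
dfScaled frame from your gist; mllib.linalg.Vectors.fromML converts an ml
vector to an mllib one without a cast):

import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.stat.Statistics

val rddVec = dfScaled
  .select("scaled_features")
  .rdd
  .map(row => OldVectors.fromML(row.getAs[MLVector](0)))   // no asInstanceOf

val corrMatrix = Statistics.corr(rddVec)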




hth
 marco



On Sat, Dec 31, 2016 at 4:24 AM, Jason Wolosonovich <jmwol...@asu.edu>
wrote:

> Hello All,
>
> I'm working through the Data Science with Scala course on Big Data
> University and it is not updated to work with Spark 2.0, so I'm adapting
> the code as I work through it, however I've finally run into something that
> is over my head. I'm new to Scala as well.
>
> When I run this code (https://gist.github.com/jmwol
> oso/a715cc4d7f1e7cc7951fab4edf6218b1) I get the following error:
>
> `java.lang.ClassCastException: org.apache.spark.ml.linalg.DenseVector
> cannot be cast to org.apache.spark.mllib.linalg.Vector`
>
> I believe this is occurring at line 107 of the gist above. The code
> starting at this line (and continuing to the end of the gist) is the
> current code in the course.
>
> If I try to map to any other class type, then I have problems with the
> `Statistics.corr(rddVec)`.
>
> How can I convert `rddVec` from an `ml.linalg.DenseVector` into an
> `mllib.linalg.Vector` for use with `Statistics`?
>
> Thanks!
>
> -Jason
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Marco Mistroni
Hi Palash

so you have a pyspark application running on spark 2.0
You have python scripts dropping files on HDFS
then you have two spark jobs:
- 1 loads the expected hour data (pls explain: how many files on average?)
- 1 loads the delayed data (pls explain: how many files on average?)

Do these scripts run continuously (do they have a while loop), or do you
kick them off via a job scheduler on an hourly basis?
Do these scripts run on a cluster?


So, at T1 in HDFS there are 3 csv files. Your job starts up and loads all 3
of them, does aggregation etc., then populates mongo.
At T+1 hour, in HDFS there are now 5 files (the previous 3 plus 2
additional; presumably these files are not deleted). So does your job now
load 5 files, do aggregation and store the data in mongodb? Or does your
job at T+1 only load the deltas (the two new csv files which appeared at
T+1)?

You said before that simply parsing csv files via spark in a standalone app
works fine. Then what you can try is to do exactly the same processing you
are doing, but instead of loading the csv files from HDFS you load them
from a local directory and see if the problem persists... (this is just to
exclude any issues with loading HDFS data).
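
A sketch of that check (shown in Scala; the same reader options exist in
pyspark, and the paths are placeholders -- the only thing that changes is
the scheme):

// Spark 2.x built-in csv reader; same options, different source
val fromHdfs  = spark.read.option("header", "true").csv("hdfs:///tmp/input/*.csv")
val fromLocal = spark.read.option("header", "true").csv("file:///tmp/input/*.csv")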

hth
   Marco












On Fri, Dec 30, 2016 at 2:02 PM, Palash Gupta <spline_pal...@yahoo.com>
wrote:

> Hi Marco & Ayan,
>
> I have now clearer idea about what Marco means by Reduce. I will do it to
> dig down.
>
> Let me answer to your queries:
>
> hen you see the broadcast errors, does your job terminate?
> Palash>> Yes it terminated the app.
>
> Or are you assuming that something is wrong just because you see the
> message in the logs?
>
> Palash>> No it terminated for the very first step of Spark processing (in
> my case loading csv from hdfs)
>
> Plus...Wrt logicWho writes the CSV? With what frequency?
> Palash>> We parsed xml files using python (not in spark scope) & make csv
> and put in hdfs
>
> Does it app run all the time loading CSV from hadoop?
>
> Palash>> Every hour two separate pyspark app are running
> 1. Loading current expected hour data, prepare kpi, do aggregation, load
> in mongodb
> 2. Same operation will run for delayed hour data
>
>
> Are you using spark streaming?
> Palash>> No
>
> Does it app run fine with an older version of spark (1.6 )
> Palash>> I didn't test with Spark 1.6. My app is running now good as I
> stopped second app (delayed data loading) since last two days. Even most of
> the case both are running well except few times...
>
>
> Sent from Yahoo Mail on Android
> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>
> On Fri, 30 Dec, 2016 at 4:57 pm, Marco Mistroni
> <mmistr...@gmail.com> wrote:
> Correct. I mean reduce the functionality.
> Uhm I realised I didn't ask u a fundamental question. When you see the
> broadcast errors, does your job terminate? Or are you assuming that
> something is wrong just because you see the message in the logs?
> Plus...Wrt logicWho writes the CSV? With what frequency?
> Does it app run all the time loading CSV from hadoop?
> Are you using spark streaming?
> Does it app run fine with an older version of spark (1.6 )
> Hth
>
> On 30 Dec 2016 12:44 pm, "ayan guha" <guha.a...@gmail.com> wrote:
>
>> @Palash: I think what Macro meant by "reduce functionality" is to reduce
>> scope of your application's functionality so that you can isolate the issue
>> in certain part(s) of the app...I do not think he meant "reduce" operation
>> :)
>>
>> On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_pal...@yahoo.com.
>> invalid> wrote:
>>
>>> Hi Marco,
>>>
>>> All of your suggestions are highly appreciated, whatever you said so
>>> far. I would apply to implement in my code and let you know.
>>>
>>> Let me answer your query:
>>>
>>> What does your program do?
>>> Palash>> In each hour I am loading many CSV files and then I'm making
>>> some KPI(s) out of them. Finally I am doing some aggregation and inserting
>>> into mongodb from spark.
>>>
>>> you say it runs for 2-3 hours, what is the logic? just processing a huge
>>> amount of data? doing ML ?
>>> Palash>> Yes you are right whatever I'm processing it should not take
>>> much time. Initially my processing was taking only 5 minutes as I was using
>>> all cores running only one application. When I created more separate spark
>>> applications for handling delayed data loading and implementing more use
>>> cases with parallel run, I started facing the error randomly. And due to
>>> separate resource distribution among four parallel spark

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-30 Thread Marco Mistroni
Correct, I mean reduce the functionality.
Uhm, I realised I didn't ask you a fundamental question. When you see the
broadcast errors, does your job terminate? Or are you assuming that
something is wrong just because you see the message in the logs?
Plus... wrt the logic... Who writes the CSV? With what frequency?
Does the app run all the time loading CSV from hadoop?
Are you using spark streaming?
Does the app run fine with an older version of spark (1.6)?
Hth

On 30 Dec 2016 12:44 pm, "ayan guha" <guha.a...@gmail.com> wrote:

> @Palash: I think what Macro meant by "reduce functionality" is to reduce
> scope of your application's functionality so that you can isolate the issue
> in certain part(s) of the app...I do not think he meant "reduce" operation
> :)
>
> On Fri, Dec 30, 2016 at 9:26 PM, Palash Gupta <spline_pal...@yahoo.com.
> invalid> wrote:
>
>> Hi Marco,
>>
>> All of your suggestions are highly appreciated, whatever you said so far.
>> I would apply to implement in my code and let you know.
>>
>> Let me answer your query:
>>
>> What does your program do?
>> Palash>> In each hour I am loading many CSV files and then I'm making
>> some KPI(s) out of them. Finally I am doing some aggregation and inserting
>> into mongodb from spark.
>>
>> you say it runs for 2-3 hours, what is the logic? just processing a huge
>> amount of data? doing ML ?
>> Palash>> Yes you are right whatever I'm processing it should not take
>> much time. Initially my processing was taking only 5 minutes as I was using
>> all cores running only one application. When I created more separate spark
>> applications for handling delayed data loading and implementing more use
>> cases with parallel run, I started facing the error randomly. And due to
>> separate resource distribution among four parallel spark application to run
>> in parallel now some task is taking longer time than usual. But still it
>> should not take 2-3 hours time...
>>
>> Currently whole applications are running in a development environment
>> where we have only two VM cluster and I will migrate to production platform
>> by next week. I will let you know if there is any improvement over there.
>>
>> I'd say break down your application..  reduce functionality , run and see
>> outcome. then add more functionality, run and see again.
>>
>> Palash>> Macro as I'm not very good in Spark. It would be helpful for me
>> if you provide some example of reduce functionality. Cause I'm using Spark
>> data frame, join data frames, use SQL statement to manipulate KPI(s). Here
>> How could I apply reduce functionality?
>>
>>
>>
>> Thanks & Best Regards,
>> Palash Gupta
>>
>>
>> --
>> *From:* Marco Mistroni <mmistr...@gmail.com>
>> *To:* "spline_pal...@yahoo.com" <spline_pal...@yahoo.com>
>> *Cc:* User <user@spark.apache.org>
>> *Sent:* Thursday, December 29, 2016 11:28 PM
>>
>> *Subject:* Re: [TorrentBroadcast] Pyspark Application terminated saying
>> "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"
>>
>> Hello
>>  no sorry i dont have any further insight into that i have seen
>> similar errors but for completely different issues, and in most of hte
>> cases it had to do with my data or my processing rather than Spark itself.
>> What does your program do? you say it runs for 2-3 hours, what is the
>> logic? just processing a huge amount of data?
>> doing ML ?
>> i'd say break down your application..  reduce functionality , run and see
>> outcome. then add more functionality, run and see again.
>> I found myself doing htese kinds of things when i got errors in my spark
>> apps.
>>
>> To get a concrete help you will have to trim down the code to a few lines
>> that can reproduces the error  That will be a great start
>>
>> Sorry for not being of much help
>>
>> hth
>>  marco
>>
>>
>>
>>
>>
>> On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_pal...@yahoo.com>
>> wrote:
>>
>> Hi Marco,
>>
>> Thanks for your response.
>>
>> Yes I tested it before & am able to load from linux filesystem and it
>> also sometimes have similar issue.
>>
>> However in both cases (either from hadoop or linux file system), this
>> error comes in some specific scenario as per my observations:
>>
>> 1. When two parallel spark separate application is initiated from one
>> driver (not all the ti

Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-29 Thread Marco Mistroni
Hello,
no, sorry, I don't have any further insight into that... I have seen similar
errors but for completely different issues, and in most of the cases it had
to do with my data or my processing rather than Spark itself.
What does your program do? You say it runs for 2-3 hours, what is the
logic? Just processing a huge amount of data? Doing ML?
I'd say break down your application: reduce functionality, run and see the
outcome, then add more functionality, run and see again.
I found myself doing these kinds of things when I got errors in my spark
apps.

To get concrete help you will have to trim the code down to a few lines
that can reproduce the error. That will be a great start.

Sorry for not being of much help

hth
 marco





On Thu, Dec 29, 2016 at 12:00 PM, Palash Gupta <spline_pal...@yahoo.com>
wrote:

> Hi Marco,
>
> Thanks for your response.
>
> Yes I tested it before & am able to load from linux filesystem and it also
> sometimes have similar issue.
>
> However in both cases (either from hadoop or linux file system), this
> error comes in some specific scenario as per my observations:
>
> 1. When two parallel spark separate application is initiated from one
> driver (not all the time, sometime)
> 2. If one spark jobs are running for more than expected hour let say 2-3
> hours, the second application terminated giving the error.
>
> To debug the problem for me it will be good if you can share some possible
> reasons why failed to broadcast error may come.
>
> Or if you need more logs I can share.
>
> Thanks again Spark User Group.
>
> Best Regards
> Palash Gupta
>
>
>
> Sent from Yahoo Mail on Android
> <https://overview.mail.yahoo.com/mobile/?.src=Android>
>
> On Thu, 29 Dec, 2016 at 2:57 pm, Marco Mistroni
> <mmistr...@gmail.com> wrote:
> Hi
>  Pls try to read a CSV from filesystem instead of hadoop. If you can read
> it successfully then your hadoop file is the issue and you can start
> debugging from there.
> Hth
>
> On 29 Dec 2016 6:26 am, "Palash Gupta" <spline_pal...@yahoo.com. invalid>
> wrote:
>
>> Hi Apache Spark User team,
>>
>>
>>
>> Greetings!
>>
>> I started developing an application using Apache Hadoop and Spark using
>> python. My pyspark application randomly terminated saying "Failed to get
>> broadcast_1*" and I have been searching for suggestion and support in
>> Stakeoverflow at Failed to get broadcast_1_piece0 of broadcast_1 in
>> pyspark application
>> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
>>
>>
>> Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
>> I was building an application on Apache Spark 2.00 with Python 3.4 and
>> trying to load some CSV files from HDFS (...
>>
>> <http://stackoverflow.com/questions/41236661/failed-to-get-broadcast-1-piece0-of-broadcast-1-in-pyspark-application>
>>
>>
>> Could you please provide suggestion registering myself in Apache User
>> list or how can I get suggestion or support to debug the problem I am
>> facing?
>>
>> Your response will be highly appreciated.
>>
>>
>>
>> Thanks & Best Regards,
>> Engr. Palash Gupta
>> WhatsApp/Viber: +8801817181502
>> Skype: palash2494
>>
>>
>>


Re: [TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_ piece0 of broadcast_1 in Spark 2.0.0"

2016-12-29 Thread Marco Mistroni
Hi
 Pls try to read a CSV from filesystem instead of hadoop. If you can read
it successfully then your hadoop file is the issue and you can start
debugging from there.
Hth

On 29 Dec 2016 6:26 am, "Palash Gupta" 
wrote:

> Hi Apache Spark User team,
>
>
>
> Greetings!
>
> I started developing an application using Apache Hadoop and Spark using
> python. My pyspark application randomly terminated saying "Failed to get
> broadcast_1*" and I have been searching for suggestion and support in
> Stakeoverflow at Failed to get broadcast_1_piece0 of broadcast_1 in
> pyspark application
> 
>
>
> Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
> I was building an application on Apache Spark 2.00 with Python 3.4 and
> trying to load some CSV files from HDFS (...
>
> 
>
>
> Could you please provide suggestion registering myself in Apache User list
> or how can I get suggestion or support to debug the problem I am facing?
>
> Your response will be highly appreciated.
>
>
>
> Thanks & Best Regards,
> Engr. Palash Gupta
> WhatsApp/Viber: +8801817181502
> Skype: palash2494
>
>
>


Re: [Spark Core] - Spark dynamoDB integration

2016-12-12 Thread Marco Mistroni
Hi,
if it can help:
1. Check the Javadocs for when that method was introduced
2. Are you building a fat jar? Check which libraries have been included...
some other dependency might have forced an old copy to be included
3. If you take the code outside spark... does it work successfully?
4. Send a short sample
Hth
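
PS: a sketch for point 2, assuming an sbt build (the artifact below is the
standard AWS SDK core module and 1.11.33 is the version you mention; the
assembly jar name is a placeholder):

// quick check of what actually ended up in the fat jar:
//   jar tf target/scala-2.10/myapp-assembly.jar | grep amazonaws
// and, if an old SDK sneaked in, pin the version in build.sbt:
dependencyOverrides += "com.amazonaws" % "aws-java-sdk-core" % "1.11.33"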

On 12 Dec 2016 11:03 am, "Pratyaksh Sharma" <
pratyaksh.sharma.ec...@itbhu.ac.in> wrote:

Hey I am using Apache Spark for one streaming application. I am trying to
store the processed data into dynamodb using java sdk. Getting the
following exception -
16/12/08 23:23:43 WARN TaskSetManager: Lost task 0.0 in stage 1.0:
java.lang.NoSuchMethodError:
com.amazonaws.SDKGlobalConfiguration.isInRegionOptimizedModeEnabled()Z
at com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:35)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.(AmazonDynamoDBClient.java:374)

Spark version - 1.6.1
Scala version - 2.10.5
aws sdk version - 1.11.33

Has anyone faced this issues? Any help will be highly appreciated.

-- 
Regards

Pratyaksh Sharma
12105EN013
Department of Electronics Engineering
IIT Varanasi
Contact No +91-8127030223


Re: Random Forest hangs without trace of error

2016-12-11 Thread Marco Mistroni
OK. Did you change the spark version? The Java/scala/python version?
Have you tried different versions of any of the above?
Hope this helps
Kr

On 10 Dec 2016 10:37 pm, "Morten Hornbech" <mor...@datasolvr.com> wrote:

> I haven’t actually experienced any non-determinism. We have nightly
> integration tests comparing output from random forests with no variations.
>
> The workaround we will probably try is to split the dataset, either
> randomly or on one of the variables, and then train a forest on each
> partition, which should then be sufficiently small.
>
> I hope to be able to provide a good repro case in some weeks. If the
> problem was in our own code I will also post it in this thread.
>
> Morten
>
> Den 10. dec. 2016 kl. 23.25 skrev Marco Mistroni <mmistr...@gmail.com>:
>
> Hello Morten
> ok.
> afaik there is a tiny bit of randomness in these ML algorithms (pls anyone
> correct me if i m wrong).
> In fact if you run your RDF code multiple times, it will not give you
> EXACTLY the same results (though accuracy and errors should me more or less
> similar)..at least this is what i found when playing around with
> RDF and decision trees and other ML algorithms
>
> If RDF is not a must for your usecase, could you try 'scale back' to
> Decision Trees and see if you still get intermittent failures?
> this at least to exclude issues with the data
>
> hth
>  marco
>
> On Sat, Dec 10, 2016 at 5:20 PM, Morten Hornbech <mor...@datasolvr.com>
> wrote:
>
>> Already did. There are no issues with smaller samples. I am running this
>> in a cluster of three t2.large instances on aws.
>>
>> I have tried to find the threshold where the error occurs, but it is not
>> a single factor causing it. Input size and subsampling rate seems to be
>> most significant, and number of trees the least.
>>
>> I have also tried running on a test frame of randomized numbers with the
>> same number of rows, and could not reproduce the problem here.
>>
>> By the way maxDepth is 5 and maxBins is 32.
>>
>> I will probably need to leave this for a few weeks to focus on more
>> short-term stuff, but I will write here if I solve it or reproduce it more
>> consistently.
>>
>> Morten
>>
>> Den 10. dec. 2016 kl. 17.29 skrev Marco Mistroni <mmistr...@gmail.com>:
>>
>> Hi
>>  Bring back samples to 1k range to debugor as suggested reduce tree
>> and bins had rdd running on same size data with no issues.or send
>> me some sample code and data and I try it out on my ec2 instance ...
>> Kr
>>
>> On 10 Dec 2016 3:16 am, "Md. Rezaul Karim" <rezaul.karim@insight-centre.o
>> rg> wrote:
>>
>>> I had similar experience last week. Even I could not find any error
>>> trace.
>>>
>>> Later on, I did the following to get rid of the problem:
>>> i) I downgraded to Spark 2.0.0
>>> ii) Decreased the value of maxBins and maxDepth
>>>
>>> Additionally, make sure that you set the featureSubsetStrategy as "auto" to
>>> let the algorithm choose the best feature subset strategy for your
>>> data. Finally, set the impurity as "gini" for the information gain.
>>>
>>> However, setting the value of no. of trees to just 1 does not give you
>>> either real advantage of the forest neither better predictive performance.
>>>
>>>
>>>
>>> Best,
>>> Karim
>>>
>>>
>>> On Dec 9, 2016 11:29 PM, "mhornbech" <mor...@datasolvr.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I have spent quite some time trying to debug an issue with the Random
>>>> Forest
>>>> algorithm on Spark 2.0.2. The input dataset is relatively large at
>>>> around
>>>> 600k rows and 200MB, but I use subsampling to make each tree manageable.
>>>> However even with only 1 tree and a low sample rate of 0.05 the job
>>>> hangs at
>>>> one of the final stages (see attached). I have checked the logs on all
>>>> executors and the driver and find no traces of error. Could it be a
>>>> memory
>>>> issue even though no error appears? The error does seem sporadic to some
>>>> extent so I also wondered whether it could be a data issue, that only
>>>> occurs
>>>> if the subsample includes the bad data rows.
>>>>
>>>> Please comment if you have a clue.
>>>>
>>>> Morten
>>>>
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n2
>>>> 8192/Sk%C3%A6rmbillede_2016-12-10_kl.png>
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context: http://apache-spark-user-list.
>>>> 1001560.n3.nabble.com/Random-Forest-hangs-without-trace-of-e
>>>> rror-tp28192.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>>> <http://nabble.com/>.
>>>>
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>
>
>


Re: Random Forest hangs without trace of error

2016-12-10 Thread Marco Mistroni
Hello Morten,
ok.
Afaik there is a tiny bit of randomness in these ML algorithms (pls anyone
correct me if I'm wrong).
In fact, if you run your RDF code multiple times, it will not give you
EXACTLY the same results (though accuracy and errors should be more or less
similar)... at least this is what I found when playing around with
RDF and decision trees and other ML algorithms.

If RDF is not a must for your use case, could you try to 'scale back' to
Decision Trees and see if you still get intermittent failures?
This is at least to exclude issues with the data.

hth
 marco
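
PS: to make the 'scale back' concrete, a sketch (assuming the ml API and a
DataFrame with the usual label/features columns):

import org.apache.spark.ml.classification.{DecisionTreeClassifier, RandomForestClassifier}

// single, shallow tree first - if this trains cleanly the data is probably fine
val singleTree = new DecisionTreeClassifier()
  .setMaxDepth(5)
  .setMaxBins(32)

// then grow the forest back, starting small
val smallForest = new RandomForestClassifier()
  .setNumTrees(1)
  .setSubsamplingRate(0.05)
  .setMaxDepth(5)
  .setMaxBins(32)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")

// val model = singleTree.fit(trainingDF)      // trainingDF is a placeholder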

On Sat, Dec 10, 2016 at 5:20 PM, Morten Hornbech <mor...@datasolvr.com>
wrote:

> Already did. There are no issues with smaller samples. I am running this
> in a cluster of three t2.large instances on aws.
>
> I have tried to find the threshold where the error occurs, but there is not a
> single factor causing it. Input size and subsampling rate seem to be the most
> significant, and the number of trees the least.
>
> I have also tried running on a test frame of randomized numbers with the
> same number of rows, and could not reproduce the problem here.
>
> By the way maxDepth is 5 and maxBins is 32.
>
> I will probably need to leave this for a few weeks to focus on more
> short-term stuff, but I will write here if I solve it or reproduce it more
> consistently.
>
> Morten
>
> On 10 Dec 2016, at 17:29, Marco Mistroni <mmistr...@gmail.com> wrote:
>
> Hi
>  Bring the samples back to the 1k range to debug, or, as suggested, reduce the
> number of trees and bins. I had an RDD running on the same size of data with no
> issues. Or send me some sample code and data and I will try it out on my EC2 instance.
> Kr
>
> On 10 Dec 2016 3:16 am, "Md. Rezaul Karim" <rezaul.karim@insight-centre.
> org> wrote:
>
>> I had similar experience last week. Even I could not find any error
>> trace.
>>
>> Later on, I did the following to get rid of the problem:
>> i) I downgraded to Spark 2.0.0
>> ii) Decreased the value of maxBins and maxDepth
>>
>> Additionally, make sure that you set the featureSubsetStrategy as "auto" to
>> let the algorithm choose the best feature subset strategy for your data.
>> Finally, set the impurity as "gini" for the information gain.
>>
>> However, setting the number of trees to just 1 gives you neither the real
>> advantage of a forest nor better predictive performance.
>>
>>
>>
>> Best,
>> Karim
>>
>>
>> On Dec 9, 2016 11:29 PM, "mhornbech" <mor...@datasolvr.com> wrote:
>>
>>> Hi
>>>
>>> I have spent quite some time trying to debug an issue with the Random
>>> Forest
>>> algorithm on Spark 2.0.2. The input dataset is relatively large at around
>>> 600k rows and 200MB, but I use subsampling to make each tree manageable.
>>> However even with only 1 tree and a low sample rate of 0.05 the job
>>> hangs at
>>> one of the final stages (see attached). I have checked the logs on all
>>> executors and the driver and find no traces of error. Could it be a
>>> memory
>>> issue even though no error appears? The error does seem sporadic to some
>>> extent so I also wondered whether it could be a data issue, that only
>>> occurs
>>> if the subsample includes the bad data rows.
>>>
>>> Please comment if you have a clue.
>>>
>>> Morten
>>>
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n2
>>> 8192/Sk%C3%A6rmbillede_2016-12-10_kl.png>
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Random-Forest-hangs-without-trace-of-e
>>> rror-tp28192.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> <http://nabble.com>.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>


Re: Random Forest hangs without trace of error

2016-12-10 Thread Marco Mistroni
Hi
 Bring the samples back to the 1k range to debug, or, as suggested, reduce the
number of trees and bins. I had an RDD running on the same size of data with no
issues. Or send me some sample code and data and I will try it out on my EC2
instance.
Kr

On 10 Dec 2016 3:16 am, "Md. Rezaul Karim" 
wrote:

> I had similar experience last week. Even I could not find any error trace.
>
> Later on, I did the following to get rid of the problem:
> i) I downgraded to Spark 2.0.0
> ii) Decreased the value of maxBins and maxDepth
>
> Additionally, make sure that you set the featureSubsetStrategy as "auto" to
> let the algorithm choose the best feature subset strategy for your data.
> Finally, set the impurity as "gini" for the information gain.
>
> However, setting the number of trees to just 1 gives you neither the real
> advantage of a forest nor better predictive performance.
>
>
>
> Best,
> Karim
>
>
> On Dec 9, 2016 11:29 PM, "mhornbech"  wrote:
>
>> Hi
>>
>> I have spent quite some time trying to debug an issue with the Random
>> Forest
>> algorithm on Spark 2.0.2. The input dataset is relatively large at around
>> 600k rows and 200MB, but I use subsampling to make each tree manageable.
>> However even with only 1 tree and a low sample rate of 0.05 the job hangs
>> at
>> one of the final stages (see attached). I have checked the logs on all
>> executors and the driver and find no traces of error. Could it be a memory
>> issue even though no error appears? The error does seem sporadic to some
>> extent so I also wondered whether it could be a data issue, that only
>> occurs
>> if the subsample includes the bad data rows.
>>
>> Please comment if you have a clue.
>>
>> Morten
>>
>> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n28192/Sk%C3%A6rmbillede_2016-12-10_kl.png>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Random-Forest-hangs-without-trace-of-e
>> rror-tp28192.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: unit testing in spark

2016-12-09 Thread Marco Mistroni
Me too, as I spend most of my time writing unit/integration tests. Please advise
on where I can start.
Kr

On 9 Dec 2016 12:15 am, "Miguel Morales"  wrote:

> I would be interested in contributing. I've created my own library for
> this as well. In my blog post I talk about testing with Spark in RSpec
> style:
> https://medium.com/@therevoltingx/test-driven-development-w-apache-spark-
> 746082b44941
>
> Sent from my iPhone
>
> On Dec 8, 2016, at 4:09 PM, Holden Karau  wrote:
>
> There are also libraries designed to simplify testing Spark in the various
> platforms, spark-testing-base
>  for Scala/Java/Python (&
> video https://www.youtube.com/watch?v=f69gSGSLGrY), sscheck
>  (scala focused property based),
> pyspark.test (python focused with py.test instead of unittest2) (& blog
> post from nextdoor https://engblog.nextdoor.com/unit-testing-
> apache-spark-with-py-test-3b8970dc013b#.jw3bdcej9 )
>
> Good luck on your Spark Adventures :)
>
> P.S.
>
> If anyone is interested in helping improve spark testing libraries I'm
> always looking for more people to be involved with spark-testing-base
> because I'm lazy :p
>
> On Thu, Dec 8, 2016 at 2:05 PM, Lars Albertsson  wrote:
>
>> I wrote some advice in a previous post on the list:
>> http://markmail.org/message/bbs5acrnksjxsrrs
>>
>> It does not mention python, but the strategy advice is the same. Just
>> replace JUnit/Scalatest with pytest, unittest, or your favourite
>> python test framework.
>>
>>
>> I recently held a presentation on the subject. There is a video
>> recording at https://vimeo.com/192429554 and slides at
>> http://www.slideshare.net/lallea/test-strategies-for-data-
>> processing-pipelines-67244458
>>
>> You can find more material on test strategies at
>> http://www.mapflat.com/lands/resources/reading-list/index.html
>>
>>
>>
>>
>> Lars Albertsson
>> Data engineering consultant
>> www.mapflat.com
>> https://twitter.com/lalleal
>> +46 70 7687109
>> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
>>
>>
>> On Thu, Dec 8, 2016 at 4:14 PM, pseudo oduesp 
>> wrote:
>> > Can someone tell me how I can write unit tests on PySpark?
>> > (book, tutorial ...)
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>
>
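
As a concrete starting point for the pytest route mentioned above, here is a
minimal sketch of a shared SparkSession fixture plus one test. It assumes pytest
and pyspark are installed; the fixture and test names are only illustrative.

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # One local SparkSession shared by the whole test session
    session = (SparkSession.builder
               .master("local[2]")
               .appName("unit-tests")
               .getOrCreate())
    yield session
    session.stop()

def test_word_count(spark):
    df = spark.createDataFrame([("a",), ("b",), ("a",)], ["word"])
    counts = {row["word"]: row["count"]
              for row in df.groupBy("word").count().collect()}
    assert counts == {"a": 2, "b": 1}

Libraries such as spark-testing-base wrap this kind of boilerplate and add
comparison helpers, but a plain fixture like the above is often enough to get
started.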


Re: How to convert a unix timestamp column into date format(yyyy-MM-dd) ?

2016-12-04 Thread Marco Mistroni
Hi
 In Python you can use datetime.fromtimestamp(..).strftime('%Y%m%d')
Which Spark API are you using?
Kr

On 5 Dec 2016 7:38 am, "Devi P.V"  wrote:

> Hi all,
>
> I have a dataframe like following,
>
> +--------------------------+-------------+
> |client_id                 |timestamp    |
> +--------------------------+-------------+
> |cd646551-fceb-4166-acbc-b9|1477989416803|
> |3bc61951-0f49-43bf-9848-b2|1477983725292|
> |688acc61-753f-4a33-a034-bc|1479899459947|
> |5ff1eb6c-14ec-4716-9798-00|1479901374026|
> +--------------------------+-------------+
>
>  I want to convert the timestamp column into yyyy-MM-dd format.
> How can I do this?
>
>
> Thanks
>
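
Since the values in the quoted question look like epoch milliseconds, the
conversion can also be done at DataFrame level without a Python UDF. A minimal
sketch, assuming the DataFrame and column names shown in the question:

from pyspark.sql.functions import col, from_unixtime

# from_unixtime expects seconds, and the column holds milliseconds
df2 = df.withColumn(
    "date",
    from_unixtime((col("timestamp") / 1000).cast("long"), "yyyy-MM-dd"))
df2.show(truncate=False)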


Re: java.lang.Exception: Could not compute split, block input-0-1480539568000 not found

2016-12-01 Thread Marco Mistroni
Kant,
We need to narrow this down to reproducible code. You are using streaming:
what is the content of your streamed data? If you provide that, I can run a
streaming program that reads from a local dir and narrow down the problem.
I have seen a similar error while doing something completely different. As you
say, there might be a problem with your transformation coming from the
structure of the data. Send me a sample of the data you are streaming and I
will write a small test case.
Kr

On 1 Dec 2016 9:44 am, "kant kodali" <kanth...@gmail.com> wrote:

> sorry for multiple emails. I just think more info is needed every time to
> address this problem
>
> My Spark client program runs in client mode on a node that has 2 vCPUs and
> 8GB RAM (m4.large).
> I have 2 Spark worker nodes and each has 4 vCPUs and 16GB RAM
> (m3.xlarge for each Spark worker instance).
>
>
>
> On Thu, Dec 1, 2016 at 12:55 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> My batch interval is 1s
>> slide interval is 1s
>> window interval is 1 minute
>>
>> I am using a standalone cluster. I don't have any storage layer like HDFS,
>> so I don't know what the connection between RDDs and blocks is (I know that
>> one RDD is produced for every batch). What is a block in this context? Is it
>> a disk block? If so, what is its default size? And finally, why does the
>> following error happen so often?
>>
>> java.lang.Exception: Could not compute split, block input-0-1480539568000
>> not found
>>
>>
>>
>> On Thu, Dec 1, 2016 at 12:42 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> I also use this super(StorageLevel.MEMORY_AND_DISK_2());
>>>
>>> inside my receiver
>>>
>>> On Wed, Nov 30, 2016 at 10:44 PM, kant kodali <kanth...@gmail.com>
>>> wrote:
>>>
>>>> Here is another transformation that might cause the error but it has to
>>>> be one of these two since I only have two transformations
>>>>
>>>> jsonMessagesDStream
>>>> .window(new Duration(6), new Duration(1000))
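>>>> // Per window: parse each JSON message, emit ("var3", value) pairs,
>>>> // sum them per key, then publish the totals as JSON via NSQReceiver.send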
>>>> .mapToPair(new PairFunction<String, String, Long>() {
>>>> @Override
>>>> public Tuple2<String, Long> call(String s) throws Exception {
>>>> //System.out.println(s + " *");
>>>> JsonParser parser = new JsonParser();
>>>> JsonObject jsonObj = parser.parse(s).getAsJsonObject();
>>>>
>>>> if (jsonObj != null && jsonObj.has("var1")) {
>>>> JsonObject jsonObject = 
>>>> jsonObj.get("var1").getAsJsonObject();
>>>> if (jsonObject != null && jsonObject.has("var2") && 
>>>> jsonObject.get("var2").getAsBoolean() && jsonObject.has("var3") ) {
>>>> long num = jsonObject.get("var3").getAsLong();
>>>>
>>>> return new Tuple2<String, Long>("var3", num);
>>>> }
>>>> }
>>>>
>>>> return new Tuple2<String, Long>("var3", 0L);
>>>> }
>>>> }).reduceByKey(new Function2<Long, Long, Long>() {
>>>> @Override
>>>> public Long call(Long v1, Long v2) throws Exception {
>>>> return v1+v2;
>>>>  }
>>>> }).foreachRDD(new VoidFunction<JavaPairRDD<String, Long>>() {
>>>> @Override
>>>> public void call(JavaPairRDD<String, Long> 
>>>> stringIntegerJavaPairRDD) throws Exception {
>>>> Map<String, Long> map = new HashMap<>();
>>>> Gson gson = new Gson();
>>>> stringIntegerJavaPairRDD
>>>> .collect()
>>>> .forEach((Tuple2<String, Long> KV) -> {
>>>> String status = KV._1();
>>>> Long count = KV._2();
>>>> map.put(status, count);
>>>> }
>>>> );
>>>> NSQReceiver.send(producer, "dashboard",
>>>> gson.toJson(map).getBytes());
>>>> }
>>>> });
