RE: Watch "Airbus makes more of the sky with Spark - Jesse Anderson & Hassene Ben Salem" on YouTube

2020-04-25 Thread email
Zahid, 

 

Starting with Spark 2.3.0, the Spark team introduced an experimental feature 
called “Continuous Processing”[1][2] to enter that space, but in general, Spark 
streaming operates using micro-batches while Flink operates using a 
continuous-flow operator model. 
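For illustration, here is a minimal sketch of switching a Structured Streaming query from the default micro-batch trigger to the experimental continuous trigger (the rate source, console sink and checkpoint path are placeholders chosen only for the example):

import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("rate")                                        // toy source, only for illustration
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/continuous-ckpt")  // placeholder path
  .trigger(Trigger.Continuous("1 second"))               // continuous mode; omitting this keeps micro-batches
  .start()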

 

There are many resources online comparing the two, but I am leaving you one[3] 
(old, but still relevant) so you can start looking into it.

 

Note that while I am not a subject expert, that’s the basic explanation. Until 
recently we were not competing with Flink in that space, which explains why 
Flink was preferred at the time and why it would still be preferred today. We 
will catch up eventually.

 

[1] 
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-processing
 

[2] 
https://databricks.com/blog/2018/03/20/low-latency-continuous-processing-mode-in-structured-streaming-in-apache-spark-2-3-0.html
 

[3] https://www.youtube.com/watch?v=Dzx-iE6RN4w&feature=emb_title

 

From: Zahid Rahman  
Sent: Saturday, April 25, 2020 7:36 AM
To: joerg.stre...@posteo.de
Cc: user 
Subject: Re: Watch "Airbus makes more of the sky with Spark - Jesse Anderson & 
Hassene Ben Salem" on YouTube

 

My motive is simple. I want you (Spark product experts and users) to challenge 
the reason given by Jesse Anderson for choosing Flink over Spark. 

 

You know the saying keep your friends close, keep your enemies even closer.

The video only has 328 views.

It is a great educational tool to see a recent use case, and it should be of 
compelling interest to anyone in this field. Commercial companies do not often 
share or discuss their projects openly.

 

Incidentally Heathrow is the busiest airport in the world.

1. Because the emailing facility completed my sentence. 

 

2. I think at Heathrow the gap is less than two minutes.

 

 

On Sat, 25 Apr 2020, 09:42 Jörg Strebel <joerg.stre...@gmail.com> wrote:

Hallo!

Well, the title of the video is actually "Airbus makes more of the sky with 
Flink - Jesse Anderson & Hassene Ben Salem" and it talks about Apache Flink and 
specifically not about Apache Spark. They excluded Spark Streaming for high 
latency reasons. 

Why are you posting this video on a Spark mailing list?

Regards

J. Strebel

On 25.04.20 at 05:07, Zahid Rahman wrote:

 

https://youtu.be/sYlbD_OoHhs




Backbutton.co.uk  

¯\_(ツ)_/¯ 
♡۶Java♡۶RMI ♡۶

Make Use Method {MUM}

makeuse.org  

-- 
Jörg Strebel
Aachener Straße 2
80804 München



Serialization or internal functions?

2020-04-04 Thread email
Dear Community, 

 

Recently, I had to solve the following problem: "for every entry of a
Dataset[String], concat a constant value". To solve it, I used built-in
functions: 

 

val data = Seq("A","b","c").toDS

 

scala> data.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat"))).select("valueconcat").explain()

== Physical Plan ==

LocalTableScan [valueconcat#161]

 

As an alternative, a much simpler version of the program is to use map, but
it adds a serialization step that does not seem to be present for the
version above: 

 

scala> data.map(e=> s"$e concat").explain

== Physical Plan ==

*(1) SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0,
java.lang.String, true], true, false) AS value#92]

+- *(1) MapElements , obj#91: java.lang.String

   +- *(1) DeserializeToObject value#12.toString, obj#90: java.lang.String

  +- LocalTableScan [value#12]

 

Is this over-optimization or is this the right way to go?  

 

As a follow-up, is there any better API to get the one and only column
available in a Dataset[String] when using built-in functions?
"col(data.columns.head)" works but it is not ideal.

 

Thanks!



RE: Caching tables in spark

2019-08-28 Thread email
Take a look at this article 

 

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-caching.html
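For context, here is a minimal sketch of the caching APIs that article covers (the table name and storage level are placeholders; note that a cache lives inside a single Spark application, so two separate applications cannot share it directly):

import org.apache.spark.storage.StorageLevel

val raw = spark.table("raw_data")            // placeholder table name
raw.persist(StorageLevel.MEMORY_AND_DISK)    // spill to disk if it does not fit in memory
raw.count()                                  // an action materializes the cache

// alternatively, cache by name through the catalog
spark.catalog.cacheTable("raw_data")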

 

From: Tzahi File  
Sent: Wednesday, August 28, 2019 5:18 AM
To: user 
Subject: Caching tables in spark

 

Hi, 

 

I am looking for your input on a question. 

I have 2 different processes that read from the same raw data table (around 1.5 
TB). 

Is there a way to read this data once, cache it somehow, and use it in both 
processes? 

 

 

Thanks

-- 


Tzahi File
Data Engineer




email: tzahi.f...@ironsrc.com

mobile   +972-546864835

fax +972-77-5448273

ironSource HQ - 121 Derech Menachem Begin st. Tel Aviv


ironsrc.com





 



What is the compatibility between releases?

2019-06-11 Thread email
Dear Community , 

 

From what I understand, Spark uses a variation of Semantic Versioning[1],
but this information is not enough for me to clarify whether it is compatible
or not across versions. 

 

For example, if my cluster is running Spark 2.3.1, can I develop using API
additions in Spark 2.4 (higher-order functions, to give an example)? What
about the other way around? 

 

Typically, I assume that a job created in Spark 1.x will fail in Spark 2.x,
but that's also something I would like to get confirmation on. 

 

Thank you for your help!

 

[1] https://spark.apache.org/versioning-policy.html 



RE: Turning off Jetty Http Options Method

2019-04-30 Thread email
If this is correct (“This method exposes what all methods are supported by the 
endpoint”), I really don’t understand how that is a security vulnerability, 
considering the OSS nature of this project. Are you adding new endpoints to 
this web server? 

 

More info about OPTIONS and other methods: 
https://security.stackexchange.com/questions/21413/how-to-exploit-http-methods

 

 

From: Ankit Jain  
Sent: Tuesday, April 30, 2019 7:25 PM
To: user@spark.apache.org; d...@spark.apache.org
Subject: Re: Turning off Jetty Http Options Method

 

+ d...@spark.apache.org  

 

On Tue, Apr 30, 2019 at 4:23 PM Ankit Jain <ankitjain@gmail.com> wrote:

Aah - actually found https://issues.apache.org/jira/browse/SPARK-18664 - "Don't 
respond to HTTP OPTIONS in HTTP-based UIs"

 

Does anyone know if this can be prioritized?

 

Thanks

Ankit

 

On Tue, Apr 30, 2019 at 1:31 PM Ankit Jain <ankitjain@gmail.com> wrote:

Hi Fellow Spark users,

We are using Spark 2.3.0, and our security team is reporting a violation because 
Spark allows the HTTP OPTIONS method to work (this method exposes which methods 
are supported by the endpoint, which could be exploited by a hacker).

 

This method is served by the Jetty web server; I see Spark uses Jetty for the web UI 
and some internal communication as well. 

 

For the Spark UI, we are planning to write a javax servlet filter, package it in a jar 
and add it to the Spark libs so that the UI does not respond to the OPTIONS method. We 
don't have a clean solution for the internal Jetty server that is used as a file server, though.
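As a rough illustration only (this is a hand-written sketch, not code from Spark, and the class name and 405 response are assumptions), such a filter could look like this:

import javax.servlet._
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

// Hypothetical filter that rejects HTTP OPTIONS requests before they reach the UI handlers.
class DenyOptionsFilter extends Filter {
  override def init(config: FilterConfig): Unit = {}
  override def destroy(): Unit = {}

  override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
    val request  = req.asInstanceOf[HttpServletRequest]
    val response = res.asInstanceOf[HttpServletResponse]
    if ("OPTIONS".equalsIgnoreCase(request.getMethod)) {
      response.sendError(HttpServletResponse.SC_METHOD_NOT_ALLOWED)   // 405 Method Not Allowed
    } else {
      chain.doFilter(req, res)                                        // let every other method through
    }
  }
}

A filter like this can typically be attached to the UI through the spark.ui.filters configuration.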

 

It will be nice if Spark itself didn't allow Options method to work, similar to 
what was done for TRACE - https://issues.apache.org/jira/browse/SPARK-5983

 

What do you guys think? Does the community feel this should be something added 
directly to the Spark code?

 

Also, if there is a later version of Spark where this has been addressed, 
please let us know too.

 

-- 

Thanks & Regards,

Ankit.




 

-- 

Thanks & Regards,

Ankit.




 

-- 

Thanks & Regards,

Ankit.



RE: [EXT] handling skewness issues

2019-04-30 Thread email
Please share the links if they are publicly available. Otherwise, please share 
the names of the talks. Thank you.

 

From: Jules Damji  
Sent: Monday, April 29, 2019 8:04 PM
To: Michael Mansour 
Cc: rajat kumar ; user@spark.apache.org
Subject: Re: [EXT] handling skewness issues

 

Yes, indeed! A few talks in the developer and deep-dive sessions address the data skew 
issue and how to handle it. 

 

I shall let the group know when the talk sessions are available.

 

Cheers 

Jules

 

Sent from my iPhone

Pardon the dumb thumb typos :)


On Apr 29, 2019, at 2:13 PM, Michael Mansour <michael_mans...@symantec.com> wrote:

There were recently some fantastic talks about this at the SparkSummit 
conference in San Francisco.  I suggest you check out the SparkSummit YouTube 
channel after May 9th for a deep dive into this topic. 

 

From: rajat kumar <kumar.rajat20...@gmail.com>
Date: Monday, April 29, 2019 at 9:34 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: [EXT] handling skewness issues

 

Hi All, 

 

How to overcome skewness issues in spark ?

 

I read that we can add some randomness to the key column before the join and remove 
that random part after the join (see the sketch below).
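For reference, a minimal sketch of that salting idea (the DataFrames left and right, the join column "key" and the salt factor are made-up assumptions):

import org.apache.spark.sql.functions._

val saltBuckets = 8   // assumed salt factor; tune it to the observed skew

// Skewed side: append a random salt to the join key.
val leftSalted = left.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Other side: replicate each row once per salt value so every salted key can still match.
val rightSalted = right.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

val joined = leftSalted
  .join(rightSalted, Seq("key", "salt"))   // join on the original key plus the salt
  .drop("salt")                            // remove the random part after the join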

 

Is there any better way? The above method seems to be a workaround.

 

thanks 

rajat



RE: How to print DataFrame.show(100) to text file at HDFS

2019-04-14 Thread email
Please note that limit drops the partitions to 1. 

 

If it is only 100 records, you might be able to fit them in one executor, so a 
limit followed by a write is okay (see the sketch below). 
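For example, a minimal sketch of that approach (the output path is a placeholder, and this writes the rows as CSV rather than the exact rendering that .show() prints):

df.limit(100)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("hdfs:///tmp/df_sample")   // placeholder HDFS path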

 

From: Brandon Geise  
Sent: Sunday, April 14, 2019 9:54 AM
To: Chetan Khatri 
Cc: Nuthan Reddy ; user 
Subject: Re: How to print DataFrame.show(100) to text file at HDFS

 

Use .limit on the dataframe followed by .write

On Apr 14, 2019, at 5:10 AM, Chetan Khatri <chetan.opensou...@gmail.com> wrote:

Nuthan, 

 

Thank you for the reply. The solution proposed will give everything; in my case it is 
like one DataFrame show(100) in 3000 lines of Scala Spark code. 

However, yarn logs --applicationId  > 1.log also gives all 
stdout and stderr. 

 

Thanks 

 

On Sun, Apr 14, 2019 at 10:30 AM Nuthan Reddy <nut...@sigmoidanalytics.com> wrote: 

Hi Chetan, 

 

You can use  

 

spark-submit showDF.py | hadoop fs -put - showDF.txt

 

showDF.py: 

from pyspark.sql import SparkSession

 

spark = SparkSession.builder.appName("Write stdout").getOrCreate()

spark.sparkContext.setLogLevel("OFF")

 

spark.table("").show(100, truncate=False)

 

But is there any specific reason you want to write it to hdfs? Is this for 
human consumption? 

 

Regards, 

Nuthan 

 

On Sat, Apr 13, 2019 at 6:41 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote: 

Hello Users, 

 

In Spark, when I have a DataFrame and do .show(100), I want to save the printed 
output as-is to a text file in HDFS.  

 

How can I do this? 

 

Thanks 




 

-- 

Nuthan Reddy 

Sigmoid Analytics 

 

 




RE: Question about relationship between number of files and initial tasks(partitions)

2019-04-13 Thread email
Before we conclude that the issue is skewed data, let's confirm it: 

 

import org.apache.spark.sql.functions.spark_partition_id

 

df.groupBy(spark_partition_id()).count()

 

This should give the number of records you have in each partition. 

 

 

From: Sagar Grover  
Sent: Thursday, April 11, 2019 8:23 AM
To: yeikel valdes 
Cc: jasonnerot...@gmail.com; arthur...@flipp.com; user @spark/'user 
@spark'/spark users/user@spark 
Subject: Re: Question about relationship between number of files and initial 
tasks(partitions)

 

Extending Arthur's question,

I am facing the same problem (the number of partitions was huge: 960 cores, 16,000 
partitions). I tried to decrease the number of partitions with coalesce, but the 
problem is unbalanced data. After using coalesce, I get a Java out-of-heap-space 
error, while there was no such error without coalesce. I am guessing the 
error is due to uneven data and some heavy partitions getting merged together.

Let me know if you have any pointers on how to handle this.

 

On Wed, Apr 10, 2019 at 11:21 PM yeikel valdes <em...@yeikel.com> wrote:

If you need to reduce the number of partitions you could also try 

df.coalesce


On Thu, 04 Apr 2019 06:52:26 -0700, jasonnerot...@gmail.com wrote: 

Have you tried something like this?

 

spark.conf.set("spark.sql.shuffle.partitions", "5" ) 

 

 

 

On Wed, Apr 3, 2019 at 8:37 PM Arthur Li <arthur...@flipp.com> wrote:

Hi Sparkers,

 

I noticed that in my Spark application, the number of tasks in the first stage 
is equal to the number of files read by the application (at least for Avro) if 
the number of CPU cores is less than the number of files, though if there are 
more cores than files, it is usually equal to the default parallelism number. 
Why does it behave like this? Would this require a lot of resources from 
the driver? Is there anything we can do to decrease the number of 
tasks (partitions) in the first stage without merging files before loading? 

 

Thanks,

Arthur 

 



 

 

-- 

Thanks,

Jason

 



RE: How can I parse an "unnamed" json array present in a column?

2019-02-24 Thread email
Individual columns are small, but the table contains millions of rows with this 
problem. I am probably overthinking it, and I will implement the workaround for 
now. 

 

Thank you for your help, @Magnus Nilsson. I will try that for now and wait for 
the upgrade.

 

From: Magnus Nilsson  
Sent: Sunday, February 24, 2019 4:47 PM
To: Yeikel 
Cc: user@spark.apache.org
Subject: Re: How can I parse an "unnamed" json array present in a column?

 

Well, I'm guessing the file is small enough so you don't have any memory issues.

 

If you're using Spark to read the file, use the spark.sql.functions.concat 
function. If you use plain Scala, use the String method concat.

 

val prepend = """{"next":"""

val append = "}"

 

df.select(concat(lit(prepend), $"rawStringFromFileColumn", lit(append)) as "values")

or

val df = Seq(prepend.concat(rawStringFromFile).concat(append)).toDF("values")

 

val df2 = df.select(from_json($"values", schema))

 

Haven't tried it, might be a comma wrong somewhere.
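Putting the pieces together, here is a minimal sketch of that wrapping approach for Spark 2.1, where from_json only accepts a StructType root (the column and field names follow this thread; the wrapper key "wrapper" is an arbitrary choice):

import org.apache.spark.sql.functions.{col, concat, from_json, lit}
import org.apache.spark.sql.types._

// Schema of one element of the array, as described in the thread.
val itemSchema = new StructType().add("name", StringType).add("source", StringType)

// Wrap the array under a root attribute so the schema root can be a StructType.
val wrappedSchema = new StructType().add("wrapper", ArrayType(itemSchema))

val wrapped = ds.select(concat(lit("""{"wrapper":"""), col("news"), lit("}")) as "json")

val parsed = wrapped
  .select(from_json(col("json"), wrappedSchema) as "tmp")
  .select(col("tmp.wrapper") as "news_parsed")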

 

Extracting the function, compiling it and using in your own library you mean? I 
wouldn't even bother looking into it to be honest. Might work, might not. Might 
have to pull in half of spark to compile it, might not. Is it really worth your 
time investigating when the workaround is so simple and you know it will be 
fixed once you upgrade to a newer Spark version?

 

 

On Sun, Feb 24, 2019 at 10:17 PM <em...@yeikel.com> wrote:

Unfortunately , I can’t change the source system , so changing the JSON at 
runtime is the best I can do right now.

 

Is there any preferred way to modify the String other than an UDF or map on the 
string? 

 

At the moment I am modifying it returning a generic type “t” so I can use the 
same UDF  for many different JSONs that have the same issue. 

 

Also , is there any advantage(if possible) to extract the function from the 
original source code and run it on an older version of Spark? 

 

 

From: Magnus Nilsson <ma...@kth.se>
Sent: Sunday, February 24, 2019 5:34 AM
To: Yeikel <em...@yeikel.com>
Cc: user@spark.apache.org
Subject: Re: How can I parse an "unnamed" json array present in a column?

 

That's a bummer, if you're unable to upgrade to Spark 2.3+ your best bet is 
probably to prepend/append the jsonarray-string and locate the json array as 
the value of a root attribute in a json-document (as in your first work 
around). I mean, it's such an easy and safe fix, you can still do it even if 
you stream the file.

 

Even better, make the source system create a JSON-lines file instead of an json 
array if possible.

 

When I use Datasets (Tungsten) I basically try to stay there and use the 
available column functions unless I have no choice but to serialize and run 
custom advanced calculations/parsings. In your case just modifying the string 
and use the tested from_json function beats the available alternatives if you 
ask me.

 

 

On Sun, Feb 24, 2019 at 1:13 AM <em...@yeikel.com> wrote:

What you suggested works in Spark 2.3 , but in the version that I am using 
(2.1) it produces the following exception : 

 

found   : org.apache.spark.sql.types.ArrayType

required: org.apache.spark.sql.types.StructType

   ds.select(from_json($"news", schema) as "news_parsed").show(false)

 

Is it viable/possible to export a function from 2.3 to 2.1?  What other options 
do I have? 

 

Thank you.

 

 

From: Magnus Nilsson <ma...@kth.se>
Sent: Saturday, February 23, 2019 3:43 PM
Cc: user@spark.apache.org
Subject: Re: How can I parse an "unnamed" json array present in a column?

 

Use spark.sql.types.ArrayType instead of a Scala Array as the root type when 
you define the schema and it will work.

 

Regards,

 

Magnus

 

On Fri, Feb 22, 2019 at 11:15 PM Yeikel <em...@yeikel.com> wrote:

I have an "unnamed" json array stored in a *column*.  

The format is the following : 

column name : news

Data : 

[
  {
"source": "source1",
"name": "News site1"
  },
   {
"source": "source2",
"name": "News site2"
  }
]


Ideally , I'd like to parse it as : 

news ARRAY<STRUCT<source: STRING, name: STRING>>

I've tried the following : 

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._;

val entry = scala.io.Source.fromFile("1.txt").mkString

val ds = Seq(entry).toDF("news")

val schema = Array(new StructType().add("name", StringType).add("source",
StringType))

ds.select(from_json($"news", schema) as "news_parsed").show(false)

But this is not allowed : 

found   : Array[org.apache.spark.sql.types.StructType]
required: org.apache.spark.sql.types.StructType


I also tried passing the following schema : 

val schema = StructType(new StructType().add("name",
StringType).add("source", StringType))

But this only parsed the first record : 

++
|news_parsed |
++
|[News site1,source1]|
++


I am aware that if I fix the JSON by wrapping the array under a root attribute 
(e.g. {"news": [...]}) the parsing works as expected, but I would like to avoid 
doing that if possible.

RE: How can I parse an "unnamed" json array present in a column?

2019-02-24 Thread email
Unfortunately , I can’t change the source system , so changing the JSON at 
runtime is the best I can do right now.

 

Is there any preferred way to modify the String other than an UDF or map on the 
string? 

 

At the moment I am modifying it returning a generic type “t” so I can use the 
same UDF  for many different JSONs that have the same issue. 

 

Also , is there any advantage(if possible) to extract the function from the 
original source code and run it on an older version of Spark? 

 

 

From: Magnus Nilsson  
Sent: Sunday, February 24, 2019 5:34 AM
To: Yeikel 
Cc: user@spark.apache.org
Subject: Re: How can I parse an "unnamed" json array present in a column?

 

That's a bummer, if you're unable to upgrade to Spark 2.3+ your best bet is 
probably to prepend/append the jsonarray-string and locate the json array as 
the value of a root attribute in a json-document (as in your first work 
around). I mean, it's such an easy and safe fix, you can still do it even if 
you stream the file.

 

Even better, make the source system create a JSON-lines file instead of an json 
array if possible.

 

When I use Datasets (Tungsten) I basically try to stay there and use the 
available column functions unless I have no choice but to serialize and run 
custom advanced calculations/parsings. In your case just modifying the string 
and use the tested from_json function beats the available alternatives if you 
ask me.

 

 

On Sun, Feb 24, 2019 at 1:13 AM <em...@yeikel.com> wrote:

What you suggested works in Spark 2.3 , but in the version that I am using 
(2.1) it produces the following exception : 

 

found   : org.apache.spark.sql.types.ArrayType

required: org.apache.spark.sql.types.StructType

   ds.select(from_json($"news", schema) as "news_parsed").show(false)

 

Is it viable/possible to export a function from 2.3 to 2.1?  What other options 
do I have? 

 

Thank you.

 

 

From: Magnus Nilsson <ma...@kth.se>
Sent: Saturday, February 23, 2019 3:43 PM
Cc: user@spark.apache.org
Subject: Re: How can I parse an "unnamed" json array present in a column?

 

Use spark.sql.types.ArrayType instead of a Scala Array as the root type when 
you define the schema and it will work.

 

Regards,

 

Magnus

 

On Fri, Feb 22, 2019 at 11:15 PM Yeikel <em...@yeikel.com> wrote:

I have an "unnamed" json array stored in a *column*.  

The format is the following : 

column name : news

Data : 

[
  {
"source": "source1",
"name": "News site1"
  },
   {
"source": "source2",
"name": "News site2"
  }
]


Ideally , I'd like to parse it as : 

news ARRAY<STRUCT<source: STRING, name: STRING>>

I've tried the following : 

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._;

val entry = scala.io.Source.fromFile("1.txt").mkString

val ds = Seq(entry).toDF("news")

val schema = Array(new StructType().add("name", StringType).add("source",
StringType))

ds.select(from_json($"news", schema) as "news_parsed").show(false)

But this is not allowed : 

found   : Array[org.apache.spark.sql.types.StructType]
required: org.apache.spark.sql.types.StructType


I also tried passing the following schema : 

val schema = StructType(new StructType().add("name",
StringType).add("source", StringType))

But this only parsed the first record : 

++
|news_parsed |
++
|[News site1,source1]|
++


I am aware that if I fix the JSON like this : 

{
  "news": [
{
  "source": "source1",
  "name": "News site1"
},
{
  "source": "source2",
  "name": "News site2"
}
  ]
}

The parsing works as expected , but I would like to avoid doing that if
possible. 

Another approach that I can think of is to map on it and parse it using
third party libraries like Gson , but  I am not sure if this is any better
than fixing the json beforehand. 

I am running Spark 2.1



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




RE: Difference between Typed and untyped transformation in dataset API

2019-02-23 Thread email
From what I understand, if the transformation is untyped it will return a 
Dataframe, otherwise it will return a Dataset. In the source code you will 
see that the return type is a Dataframe instead of a Dataset, and such methods 
should also be annotated with @group untypedrel. Thus, you could check the 
signature of the method to determine whether it is untyped or not. 

 

In general, anything that changes the type of a column or adds a new column in 
a Dataset will be untyped. The idea of a Dataset is to stay constant when it 
comes to the schema. The moment you try to modify the schema, we need to 
fall back to a Dataframe. 

 

For example, withColumn is untyped because it transforms the Dataset (typed) into 
an untyped structure (Dataframe), as in the sketch below. 
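A minimal sketch of the distinction (assuming a spark-shell session with spark.implicits._ in scope; the Person case class is made up for the example):

import org.apache.spark.sql.{DataFrame, Dataset}

case class Person(name: String, age: Int)

val ds: Dataset[Person] = Seq(Person("Ana", 30), Person("Bob", 15)).toDS()

// Typed transformation: the element type Person is preserved.
val adults: Dataset[Person] = ds.filter(_.age >= 18)

// Untyped transformation: the schema changes, so the result falls back to DataFrame (= Dataset[Row]).
val withFlag: DataFrame = ds.withColumn("adult", $"age" >= 18)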

 

From: Akhilanand  
Sent: Thursday, February 21, 2019 7:35 PM
To: user 
Subject: Difference between Typed and untyped transformation in dataset API

 

What is the key difference between Typed and untyped transformation in dataset 
API?

How do I determine if it's typed or untyped?

Any gotchas when to use what apart from the reason that it does the job for me?

 

 



RE: How can I parse an "unnamed" json array present in a column?

2019-02-23 Thread email
What you suggested works in Spark 2.3 , but in the version that I am using 
(2.1) it produces the following exception : 

 

found   : org.apache.spark.sql.types.ArrayType

required: org.apache.spark.sql.types.StructType

   ds.select(from_json($"news", schema) as "news_parsed").show(false)

 

Is it viable/possible to export a function from 2.3 to 2.1?  What other options 
do I have? 

 

Thank you.

 

 

From: Magnus Nilsson  
Sent: Saturday, February 23, 2019 3:43 PM
Cc: user@spark.apache.org
Subject: Re: How can I parse an "unnamed" json array present in a column?

 

Use spark.sql.types.ArrayType instead of a Scala Array as the root type when 
you define the schema and it will work.

 

Regards,

 

Magnus

 

On Fri, Feb 22, 2019 at 11:15 PM Yeikel <em...@yeikel.com> wrote:

I have an "unnamed" json array stored in a *column*.  

The format is the following : 

column name : news

Data : 

[
  {
"source": "source1",
"name": "News site1"
  },
   {
"source": "source2",
"name": "News site2"
  }
]


Ideally , I'd like to parse it as : 

news ARRAY<STRUCT<source: STRING, name: STRING>>

I've tried the following : 

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._;

val entry = scala.io.Source.fromFile("1.txt").mkString

val ds = Seq(entry).toDF("news")

val schema = Array(new StructType().add("name", StringType).add("source",
StringType))

ds.select(from_json($"news", schema) as "news_parsed").show(false)

But this is not allowed : 

found   : Array[org.apache.spark.sql.types.StructType]
required: org.apache.spark.sql.types.StructType


I also tried passing the following schema : 

val schema = StructType(new StructType().add("name",
StringType).add("source", StringType))

But this only parsed the first record : 

++
|news_parsed |
++
|[News site1,source1]|
++


I am aware that if I fix the JSON like this : 

{
  "news": [
{
  "source": "source1",
  "name": "News site1"
},
{
  "source": "source2",
  "name": "News site2"
}
  ]
}

The parsing works as expected , but I would like to avoid doing that if
possible. 

Another approach that I can think of is to map on it and parse it using
third party libraries like Gson , but  I am not sure if this is any better
than fixing the json beforehand. 

I am running Spark 2.1



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




RE: Spark streaming filling the disk with logs

2019-02-14 Thread email
I have a quick question about this configuration. Particularly this line : 

 

log4j.appender.rolling.file=/var/log/spark/

 

Where does that path apply: at the driver level, or for each executor individually? 

 

Thank you

 

From: Jain, Abhishek 3. (Nokia - IN/Bangalore)  
Sent: Thursday, February 14, 2019 7:48 AM
To: Deepak Sharma 
Cc: spark users 
Subject: RE: Spark streaming filling the disk with logs

 

++

If you can afford losing a few old logs, then you can make use of the rolling file 
appender as well.

 

log4j.rootLogger=INFO, rolling

log4j.appender.rolling=org.apache.log4j.RollingFileAppender

log4j.appender.rolling.layout=org.apache.log4j.PatternLayout

log4j.appender.rolling.maxFileSize=50MB

log4j.appender.rolling.maxBackupIndex=5

log4j.appender.rolling.file=/var/log/spark/

log4j.logger.org.apache.spark=

 

This means log4j will roll the log file at 50 MB and keep only the 5 most recent files. 
These files are saved in the /var/log/spark directory, under the filename configured above.
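On the earlier question of where the path applies: each JVM (the driver and every executor) resolves its own log4j configuration, so the properties file is usually shipped to the executors explicitly. A rough sketch of the common spark-submit pattern (the paths are placeholders, not taken from this thread):

spark-submit \
  --files /path/to/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/path/to/log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  ...

With --files, the properties file lands in each executor's working directory, so the executor side can refer to it by its bare name.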

 

Regards,
Abhishek Jain

 

From: Jain, Abhishek 3. (Nokia - IN/Bangalore) 
Sent: Thursday, February 14, 2019 5:58 PM
To: Deepak Sharma <deepakmc...@gmail.com>
Cc: spark users <user@spark.apache.org>
Subject: RE: Spark streaming filling the disk with logs

 

Hi Deepak,

 

The spark logging can be set for different purposes. Say for example if you 
want to control the spark-submit log, 
“log4j.logger.org.apache.spark.repl.Main=WARN/INFO/ERROR” can be set.

 

Similarly, to control third party logs:

log4j.logger.org.spark_project.jetty=, 
log4j.logger.org.apache.parquet= etc..

 

These properties can be set in the conf/log4j .properties file.

 

Hope this helps! 😊

 

Regards,

Abhishek Jain

 

From: Deepak Sharma <deepakmc...@gmail.com>
Sent: Thursday, February 14, 2019 12:10 PM
To: spark users <user@spark.apache.org>
Subject: Spark streaming filling the disk with logs

 

Hi All

I am running a spark streaming job with below configuration :

 

--conf "spark.executor.extraJavaOptions=-Droot.logger=WARN,console"

 

But it’s still filling the disk with info logs.

If the logging level is set to WARN at the cluster level, then only the WARN logs 
are written, but that affects all the jobs.

 

Is there any way to get rid of INFO-level logging at the Spark streaming job 
level?

 

Thanks

Deepak 

 

-- 

Thanks
Deepak
www.bigdatabig.com  
www.keosha.net  



What is the recommended way to store records that don't meet a filter?

2019-01-28 Thread email
Community , 

 

Given a dataset ds , what is the recommended way to store the records that
don't meet a filter?

 

For example : 

 

val ds = Seq(1,2,3,4).toDS

 

val f = (i : Integer) => i < 2 

 

val filtered = ds.filter(f(_)) 

 

I understand I can do this : 

 

val filterNotMet = ds.filter(!f(_)) 

 

But unless I am missing something, I believe this means that Spark will
iterate and apply the filter twice, which sounds like an overhead to me.
Please clarify if this is not the case.

 

Another option I can think of is to do something like this : 

 

val fudf = udf(f)

 

val applyFilterUDF = ds.withColumn("filtered",fudf($"value"))

 

val filteredUDF = applyFilter.where(applyFilter("filtered") === true)

 

val filterNotMetUDF = applyFilter.where(applyFilter("filtered") === false)

 

But from the plan I can't really tell if it is any better : 

 

scala> filtered.explain

== Physical Plan ==

*(1) Filter .apply$mcZI$sp

+- LocalTableScan [value#149]

 

scala> applyFilterUDF.explain

== Physical Plan ==

LocalTableScan [value#149, filtered#153]

 

scala> filterNotMet.explain

== Physical Plan ==

*(1) Filter .apply$mcZI$sp

+- LocalTableScan [value#149]

 

scala> filterNotMetUDF.explain

== Physical Plan ==

*(1) Project [value#62, UDF(value#62) AS filtered#97]

+- *(1) Filter (UDF(value#62) = false)

   +- LocalTableScan [value#62]
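As a further note on the UDF/flag idea above, one hedged option is to persist the flagged Dataset so both branches reuse the same computed result instead of recomputing it (the storage level below is an assumption):

import org.apache.spark.storage.StorageLevel

val flagged = ds.withColumn("filtered", fudf($"value"))
  .persist(StorageLevel.MEMORY_AND_DISK)   // compute the flag once, reuse it for both branches

val metUDF    = flagged.where($"filtered" === true).drop("filtered")
val notMetUDF = flagged.where($"filtered" === false).drop("filtered")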

 

 

Thank you.

 

 



RE: Is it possible to rate limit an UDP?

2019-01-12 Thread email
Thank you for your suggestion, Ramandeep, but the code is not clear to me. 
Could you please explain it? Particularly this part: 

 

Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new 
BiFunction[java.lang.Long, Row, Row]() {

 

Also , is it possible to achieve this without third party libraries? 

 

Thank you

 

From: Ramandeep Singh  
Sent: Thursday, January 10, 2019 1:48 AM
To: Sonal Goyal 
Cc: em...@yeikel.com; user 
Subject: Re: Is it possible to rate limit an UDP?

 

Backpressure is the suggested way out here and is the correct approach; it rate-limits 
at the source itself for safety. Imagine a service with throttling 
enabled: it can outright reject your calls. 

 

Even if you split your df, that alone won't achieve your purpose. You can 
combine that with a backpressure-enabled API or with restricting by time.

 

Here's an example using RxJava, if you don't want to use any streaming API. 

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import io.reactivex.Flowable
import io.reactivex.functions.BiFunction
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def main(args: Array[String]): Unit = {
  val ss = SparkSession.builder().master("local[*]").enableHiveSupport().getOrCreate()

  import ss.sqlContext.implicits._

  val df = ss.read.json("src/main/resources/person.json")
  implicit val encoder = RowEncoder(df.schema)

  val limited = df.repartition(2).mapPartitions(it => {
    val itF = Flowable.fromIterable[Row](it.toIterable.asJava)
    // Ticker that emits one value per second.
    val delSt = Flowable.interval(1, TimeUnit.SECONDS)
    // zip pairs each row with one tick, so each partition processes at most one row per second.
    Flowable.zip[java.lang.Long, Row, Row](delSt, itF, new BiFunction[java.lang.Long, Row, Row]() {
      override def apply(t1: java.lang.Long, t2: Row): Row = {
        // call api here
        t2
      }
    }).toList.blockingGet().iterator().asScala
  })
  limited.show()   // show the rate-limited result; the original df.show() would bypass the mapPartitions above
}

 

On Wed, Jan 9, 2019 at 6:12 AM Sonal Goyal <sonalgoy...@gmail.com> wrote:

Have you tried controlling the number of partitions of the dataframe? Say you 
have 5 partitions; it means you are making 5 concurrent calls to the web 
service. The throughput of the web service would be your bottleneck and Spark 
workers would be waiting for tasks, but if you can't control the REST service, 
maybe it's worth a shot.  




Thanks,
Sonal
Nube Technologies 

  

 





 

 

On Wed, Jan 9, 2019 at 4:51 AM <em...@yeikel.com> wrote:

I have a data frame to which I apply a UDF that calls a REST web service. 
This web service is deployed on only a few nodes, and it won't be able to 
handle a massive load from Spark. 

 

Is it possible to rate limit this UDF? For example, something like 100 ops/s. 

 

If not , what are the options? Is splitting the df an option? 

 

I’ve read a similar question in Stack overflow [1] and the solution suggests 
Spark Streaming , but my application does not involve streaming. Do I need to 
turn the operations into a streaming workflow to achieve something like that? 

 

Current Workflow : Hive -> Spark ->  Service

 

Thank you

 

[1] 
https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation
 

 




 

-- 

Regards,

Ramandeep Singh

Blog:http://ramannanda.blogspot.com



Is it possible to rate limit an UDP?

2019-01-08 Thread email
I have a data frame to which I apply a UDF that calls a REST web service.
This web service is deployed on only a few nodes, and it won't be able to
handle a massive load from Spark. 

 

Is it possible to rate limit this UDF? For example, something like 100
ops/s. 

 

If not , what are the options? Is splitting the df an option? 

 

I've read a similar question in Stack overflow [1] and the solution suggests
Spark Streaming , but my application does not involve streaming. Do I need
to turn the operations into a streaming workflow to achieve something like
that? 

 

Current Workflow : Hive -> Spark ->  Service

 

Thank you

 

[1]
https://stackoverflow.com/questions/43953882/how-to-rate-limit-a-spark-map-operation



Can an UDF return a custom class other than case class?

2019-01-06 Thread email
Hi , 

 

Is it possible to return a custom class from a UDF other than a case class?

 

If so, how can we avoid this exception?
java.lang.UnsupportedOperationException: Schema for type {custom type} is
not supported

 

Full Example : 

 

import spark.implicits._

import org.apache.spark.sql.functions.udf

 

class Person (val name : String)

 

val toPerson = (s1 : String) => new Person(s1)

 

val dataset = Seq("John Smith").toDF("name")

 

val personUDF = udf(toPerson)

 

java.lang.UnsupportedOperationException: Schema for type Person is not supported

  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:780)

  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)

  at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)

  at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)

  at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)

  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)

  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)

  at org.apache.spark.sql.functions$.udf(functions.scala:3340)

 

dataset.withColumn("person", personUDF($"name"))

 

 

Thank you. 



Do spark-submit overwrite the Spark session created manually?

2018-12-31 Thread email
Hi Community , 

 

When we submit a job using 'spark-submit', passing options like the master
URL, what should the content of the main class be? 

 

For example , if I create the session myself : 

 

val spark = SparkSession.builder.

  master("local[*]")

  .appName("Console")

  .config("spark.app.id", "spark-es")

  .getOrCreate()

 

Will spark-submit overwrite parameters like the master URL, app name, etc.?  

 

Thank you



RE: What are the alternatives to nested DataFrames?

2018-12-29 Thread email
1 - I am not sure how I can do what you suggest in #1, because I use the 
entries in the initial df to build the query, and then from it I get the second 
df. Could you explain more?

 

2 - I also thought about doing what you suggest in #2, but if I am not 
mistaken, if I use regular Scala data structures it won't be distributed and it 
might run out of memory?

 

 

I also tried collecting the second dataframe to a Seq , but it also produced 
the null pointer.  
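For what it's worth, here is a rough sketch of how suggestion #1 (quoted below) could look, assuming the list of cities can be collected to the driver; calling esDF only on the driver also avoids the null pointer, since inside foreach the call runs on executors where the SQLContext is not available. The names mirror the code quoted further down:

// Bring the query keys to the driver; esDF is called on the driver only.
val cityNames = cities.select("value").as[String].collect()

val perCity = cityNames.map { city =>
  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
  sqlContext.esDF("cities/docs", qb.toString)   // each call returns a distributed DataFrame
}

// Combine them into a single, still distributed DataFrame.
val combined = perCity.reduce(_ union _)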

 

From: Shahab Yunus  
Sent: Friday, December 28, 2018 11:21 PM
To: em...@yeikel.com
Cc: Andrew Melo ; user 
Subject: Re: What are the alternatives to nested DataFrames?

 

2 options I can think of:

 

1- Can you perform a union of the dfs returned by the Elasticsearch queries? It 
would still be distributed, but I don't know whether there is a limit on how many 
union operations you can perform at a time.

 

2- Can you use some other API method of Elasticsearch, one that does not return 
a dataframe?

 

On Fri, Dec 28, 2018 at 10:30 PM <em...@yeikel.com> wrote:

I could , but only if I had it beforehand.  I do not know what the dataframe is 
until I pass the query parameter and receive the resultant dataframe inside the 
iteration.  

 

The steps are : 

 

Original DF -> Iterate -> Pass every element to a function that takes the 
element of the original DF and returns a new dataframe including all the 
matching terms

 

 

From: Andrew Melo <andrew.m...@gmail.com>
Sent: Friday, December 28, 2018 8:48 PM
To: em...@yeikel.com
Cc: Shahab Yunus <shahab.yu...@gmail.com>; user <user@spark.apache.org>
Subject: Re: What are the alternatives to nested DataFrames?

 

Could you join() the DFs on a common key?

 

On Fri, Dec 28, 2018 at 18:35 <em...@yeikel.com> wrote:

Shahab, I am not sure what you are trying to say. Could you please give me an 
example? The result of the query is a Dataframe that is created after 
iterating, so I am not sure how I could map that to a column without iterating 
and getting the values. 

 

I have a Dataframe that contains a list of cities for which I would like to 
iterate over and search in Elasticsearch.  This list is stored in Dataframe 
because it contains hundreds of thousands of elements with multiple properties 
that would not fit in a single machine. 

 

The issue is that the elastic-spark connector returns a Dataframe as well which 
leads to a dataframe creation within a Dataframe

 

The only solution I found is to store the list of cities in a regular Scala 
Seq and iterate over that, but as far as I know this would make the Seq centralized 
instead of distributed (run at the executor only?)

 

Full example : 

 

val cities= Seq("New York","Michigan")

cities.foreach(r => {

  val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
  print(qb.toString)

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // Returns a dataframe 
for each city

  dfs.show() // Works as expected. It prints the individual dataframe with the 
result of the query

})

 

 

val cities = Seq("New York","Michigan").toDF()

 

cities.foreach(r => {

 

  val city  = r.getString(0)

 

  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)

  print(qb.toString)

 

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // null pointer

 

  dfs.show()

 

})

 

 

From: Shahab Yunus mailto:shahab.yu...@gmail.com> > 
Sent: Friday, December 28, 2018 12:34 PM
To: em...@yeikel.com  
Cc: user mailto:user@spark.apache.org> >
Subject: Re: What are the alternatives to nested DataFrames?

 

Can you have a dataframe with a column which stores json (type string)? Or you 
can also have a column of array type in which you store all cities matching 
your query.

 

 

 

On Fri, Dec 28, 2018 at 2:48 AM mailto:em...@yeikel.com> > 
wrote:

Hi community ,  

 

As shown in other answers online , Spark does not support the nesting of 
DataFrames , but what are the options?

 

I have the following scenario :

 

dataFrame1 = List of Cities

 

dataFrame2 = Created after searching in ElasticSearch for each city in 
dataFrame1

 

I've tried :

 

 val cities= sc.parallelize(Seq("New York")).toDF()

   cities.foreach(r => {

val companyName = r.getString(0)

println(companyName)

val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)  //returns a 
DataFrame consisting of all the cities matching the entry in cities

})

 

Which triggers the expected null pointer exception

 

java.lang.NullPointerException

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)

at 
org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)

at Main$$anonfun$main$1.apply(Main.scala:43)

at Main$$anonfun$main$1.apply(Main.scala:39)

at scala.collection.Itera

RE: What are the alternatives to nested DataFrames?

2018-12-28 Thread email
I could , but only if I had it beforehand.  I do not know what the dataframe is 
until I pass the query parameter and receive the resultant dataframe inside the 
iteration.  

 

The steps are : 

 

Original DF -> Iterate -> Pass every element to a function that takes the 
element of the original DF and returns a new dataframe including all the 
matching terms

 

 

From: Andrew Melo  
Sent: Friday, December 28, 2018 8:48 PM
To: em...@yeikel.com
Cc: Shahab Yunus ; user 
Subject: Re: What are the alternatives to nested DataFrames?

 

Could you join() the DFs on a common key?

 

On Fri, Dec 28, 2018 at 18:35 mailto:em...@yeikel.com> > 
wrote:

Shahab, I am not sure what you are trying to say. Could you please give me an 
example? The result of the query is a Dataframe that is created after 
iterating, so I am not sure how I could map that to a column without iterating 
and getting the values. 

 

I have a Dataframe that contains a list of cities for which I would like to 
iterate over and search in Elasticsearch.  This list is stored in Dataframe 
because it contains hundreds of thousands of elements with multiple properties 
that would not fit in a single machine. 

 

The issue is that the elastic-spark connector returns a Dataframe as well which 
leads to a dataframe creation within a Dataframe

 

The only solution I found is to store the list of cities in a regular Scala 
Seq and iterate over that, but as far as I know this would make the Seq centralized 
instead of distributed (run at the executor only?)

 

Full example : 

 

val cities= Seq("New York","Michigan")

cities.foreach(r => {

  val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
  print(qb.toString)

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // Returns a dataframe 
for each city

  dfs.show() // Works as expected. It prints the individual dataframe with the 
result of the query

})

 

 

val cities = Seq("New York","Michigan").toDF()

 

cities.foreach(r => {

 

  val city  = r.getString(0)

 

  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)

  print(qb.toString)

 

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // null pointer

 

  dfs.show()

 

})

 

 

From: Shahab Yunus mailto:shahab.yu...@gmail.com> > 
Sent: Friday, December 28, 2018 12:34 PM
To: em...@yeikel.com  
Cc: user mailto:user@spark.apache.org> >
Subject: Re: What are the alternatives to nested DataFrames?

 

Can you have a dataframe with a column which stores json (type string)? Or you 
can also have a column of array type in which you store all cities matching 
your query.

 

 

 

On Fri, Dec 28, 2018 at 2:48 AM mailto:em...@yeikel.com> > 
wrote:

Hi community ,  

 

As shown in other answers online , Spark does not support the nesting of 
DataFrames , but what are the options?

 

I have the following scenario :

 

dataFrame1 = List of Cities

 

dataFrame2 = Created after searching in ElasticSearch for each city in 
dataFrame1

 

I've tried :

 

 val cities= sc.parallelize(Seq("New York")).toDF()

   cities.foreach(r => {

val companyName = r.getString(0)

println(companyName)

val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)  //returns a 
DataFrame consisting of all the cities matching the entry in cities

})

 

Which triggers the expected null pointer exception

 

java.lang.NullPointerException

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)

at 
org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)

at Main$$anonfun$main$1.apply(Main.scala:43)

at Main$$anonfun$main$1.apply(Main.scala:39)

at scala.collection.Iterator$class.foreach(Iterator.scala:742)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)

at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)

at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)

at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)

at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1 
times; aborting job

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost 
task 7

RE: What are the alternatives to nested DataFrames?

2018-12-28 Thread email
Shahab, I am not sure what you are trying to say. Could you please give me an 
example? The result of the query is a Dataframe that is created after 
iterating, so I am not sure how I could map that to a column without iterating 
and getting the values. 

 

I have a Dataframe that contains a list of cities for which I would like to 
iterate over and search in Elasticsearch.  This list is stored in Dataframe 
because it contains hundreds of thousands of elements with multiple properties 
that would not fit in a single machine. 

 

The issue is that the elastic-spark connector returns a Dataframe as well which 
leads to a dataframe creation within a Dataframe

 

The only solution I found is to store the list of cities in a regular Scala 
Seq and iterate over that, but as far as I know this would make the Seq centralized 
instead of distributed (run at the executor only?)

 

Full example : 

 

val cities= Seq("New York","Michigan")

cities.foreach(r => {

  val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
  print(qb.toString)

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // Returns a dataframe 
for each city

  dfs.show() // Works as expected. It prints the individual dataframe with the 
result of the query

})

 

 

val cities = Seq("New York","Michigan").toDF()

 

cities.foreach(r => {

 

  val city  = r.getString(0)

 

  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)

  print(qb.toString)

 

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // null pointer

 

  dfs.show()

 

})

 

 

From: Shahab Yunus  
Sent: Friday, December 28, 2018 12:34 PM
To: em...@yeikel.com
Cc: user 
Subject: Re: What are the alternatives to nested DataFrames?

 

Can you have a dataframe with a column which stores json (type string)? Or you 
can also have a column of array type in which you store all cities matching 
your query.

 

 

 

On Fri, Dec 28, 2018 at 2:48 AM mailto:em...@yeikel.com> > 
wrote:

Hi community ,  

 

As shown in other answers online , Spark does not support the nesting of 
DataFrames , but what are the options?

 

I have the following scenario :

 

dataFrame1 = List of Cities

 

dataFrame2 = Created after searching in ElasticSearch for each city in 
dataFrame1

 

I've tried :

 

 val cities= sc.parallelize(Seq("New York")).toDF()

   cities.foreach(r => {

val companyName = r.getString(0)

println(companyName)

val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)  //returns a 
DataFrame consisting of all the cities matching the entry in cities

})

 

Which triggers the expected null pointer exception

 

java.lang.NullPointerException

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)

at 
org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)

at Main$$anonfun$main$1.apply(Main.scala:43)

at Main$$anonfun$main$1.apply(Main.scala:39)

at scala.collection.Iterator$class.foreach(Iterator.scala:742)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)

at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)

at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)

at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)

at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1 
times; aborting job

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost 
task 7.0 in stage 0.0 (TID 7, localhost, executor driver): 
java.lang.NullPointerException

 

What options do I have?

 

Thank you.



What are the alternatives to nested DataFrames?

2018-12-27 Thread email
Hi community ,  

 

As shown in other answers online , Spark does not support the nesting of
DataFrames , but what are the options?

 

I have the following scenario :

 

dataFrame1 = List of Cities

 

dataFrame2 = Created after searching in ElasticSearch for each city in
dataFrame1

 

I've tried :

 

 val cities= sc.parallelize(Seq("New York")).toDF()

   cities.foreach(r => {

val companyName = r.getString(0)

println(companyName)

val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)  //returns
a DataFrame consisting of all the cities matching the entry in cities

})

 

Which triggers the expected null pointer exception

 

java.lang.NullPointerException

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)

at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)

at org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)

at Main$$anonfun$main$1.apply(Main.scala:43)

at Main$$anonfun$main$1.apply(Main.scala:39)

at scala.collection.Iterator$class.foreach(Iterator.scala:742)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)

at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)

at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)

at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

at org.apache.spark.scheduler.Task.run(Task.scala:109)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1
times; aborting job

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure:
Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver):
java.lang.NullPointerException

 

What options do I have?

 

Thank you.