Re: Looping through a series of telephone numbers

2023-04-02 Thread Anastasios Zouzias
Hi Philippe,

I would like to draw your attention to this great library that saved my day
in the past when parsing phone numbers in Spark:

https://github.com/google/libphonenumber

If you combine it with Bjørn's suggestions you will have a good start on
your linkage task.
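For illustration, a minimal sketch (not from the thread; the column names, the
"FR" default region, and the CSV DataFrame name are assumptions) of normalizing
both sides to E.164 with libphonenumber, so the linkage becomes a plain
equality join instead of 40,000 LIKE scans:

import com.google.i18n.phonenumbers.PhoneNumberUtil
import com.google.i18n.phonenumbers.PhoneNumberUtil.PhoneNumberFormat
import org.apache.spark.sql.functions.udf
import scala.util.Try

// Normalize raw telephone strings to E.164; PhoneNumberUtil is a singleton,
// so fetching it inside the UDF avoids serialization concerns.
val normalizeTel = udf { raw: String =>
  Try {
    val util = PhoneNumberUtil.getInstance()
    util.format(util.parse(raw, "FR"), PhoneNumberFormat.E164)
  }.toOption
}

// dataset.withColumn("telNorm", normalizeTel($"tel"))
//   .join(telsFromCsv.withColumn("telNorm", normalizeTel($"tel")), "telNorm")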

Best regards,
Anastasios Zouzias


On Sat, Apr 1, 2023 at 8:31 PM Philippe de Rochambeau 
wrote:

> Hello,
> I’m looking for an efficient way in Spark to search for a series of
> telephone numbers, contained in a CSV file, in a data set column.
>
> In pseudo code,
>
> for tel in [tel1, tel2, …. tel40,000]
> search for tel in dataset using .like(« %tel% »)
> end for
>
> I’m using the like function because the telephone numbers in the data set
> may contain prefixes, such as « + »; e.g., « +331222 ».
>
> Any suggestions would be welcome.
>
> Many thanks.
>
> Philippe
>
>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
-- Anastasios Zouzias



[Structured Streaming] Robust watermarking calculation with future timestamps

2019-11-13 Thread Anastasios Zouzias
Hi all,

We currently have the following issue with a Spark Structured Streaming
(SS) application. The application reads messages from thousands of source
systems, stores them in Kafka, and Spark aggregates them using SS with
watermarking (15 minutes).

The root problem is that a few of the source systems have a wrong timezone
setup that makes them emit messages from the future, i.e., +1 hour ahead of
the current time (misconfiguration or a winter/summer timezone change (yeah!)).
Since the watermark is calculated as

(maximum event timestamp seen over all messages) - (watermark threshold
value, 15 mins),

most of the messages are dropped because they are delayed by more than
45 minutes. In an even more extreme scenario, a single "future" /
adversarial message can make the structured streaming application report
zero messages (per mini-batch).

Is there any user-exposed SS API that allows a more robust calculation of
the watermark, e.g., the 95th percentile of timestamps instead of the max
timestamp? I understand that such a calculation will be more expensive, but
it will make the application more robust.

Any suggestions/ideas?

PS. Of course, the best approach would be to fix the issue on all source
systems, but this might take time (or perhaps we could drop future messages
programmatically (yikes); see the sketch below).
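For instance (the column name eventTime and the 5-minute tolerance are
assumptions):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, expr}

// Drop events that are more than 5 minutes ahead of the wall clock before the
// watermark is applied, so a single "future" message cannot drag it forward.
def dropFutureEvents(df: DataFrame): DataFrame =
  df.filter(col("eventTime") <= expr("current_timestamp() + INTERVAL 5 MINUTES"))

// val cleaned = dropFutureEvents(kafkaStream).withWatermark("eventTime", "15 minutes")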

Best regards,
Anastasios


Re: Handling of watermark in structured streaming

2019-05-14 Thread Anastasios Zouzias
Hi Joe,

How often do you trigger your mini-batches? Maybe you can specify the trigger
time explicitly to a low value, or even better leave the trigger unset so that
each micro-batch fires as soon as the previous one completes.

See:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
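For reference, a self-contained sketch (using the built-in rate source, not
your Kafka query) of setting an explicit processing-time trigger:

import org.apache.spark.sql.streaming.Trigger

val stream = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

val query = stream.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds")) // fire a micro-batch every 5 seconds
  .start()
// query.awaitTermination()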

Best,
Anastasios



On Tue, May 14, 2019 at 3:49 PM Joe Ammann  wrote:

> Hi all
>
> I'm fairly new to Spark structured streaming and I'm only starting to
> develop an understanding for the watermark handling.
>
> Our application reads data from a Kafka input topic and as one of the
> first steps, it has to group incoming messages. Those messages come in
> bulks, e.g. 5 messages which belong to the same "business event" (share a
> common key), with event timestamps differing in only a few millisecs. And
> then no messages for say 3 minutes. And after that another bulk of 3
> messages with very close event timestamps.
>
> I have set a watermark of 20 seconds on my streaming query, and a groupBy
> on the shared common key, and a window of 20 seconds (10 seconds sliding).
> So something like
>
> df = inputStream.withWatermark("eventtime", "20 seconds")
>   .groupBy("sharedId", window("eventtime", "20 seconds", "10 seconds"))
>
> The output mode is set to append, since I intend to join this stream with
> other streams later in the application.
>
> Naively, I would have expected to see any incoming bulk of messages as an
> aggregated message ~20 seconds after it's eventtime on the output stream.
> But my observations indicate that the "latest bulk of events" always stays
> queued inside the query, until a new bulk of events arrive and bump up the
> watermark. In my example above, this means that I see the first bulk of
> events only after 3 minutes, when the second bulk comes in.
>
> This does indeed make some sense, and if I understand the documentation
> correctly the watermark is only ever updated upon arrival of new inputs.
> The "real time" does not play a role in the setting of watermarks.
>
> But to me this means that any bulk of events is prohibited from being sent
> downstream until a new bulk comes in. This is not what I intended.
>
> Is my understanding more or less correct? And is there any way of bringing
> "the real time" into the calculation of the watermark (short of producing
> regular dummy messages which are then again filtered out).
>
> --
> CU, Joe
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
-- Anastasios Zouzias



Re: Spark Structured Streaming | Highly reliable de-duplication strategy

2019-05-01 Thread Anastasios Zouzias
Hi,

Have you checked the docs, i.e.,
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#streaming-deduplication

You can carry a unique id (e.g., a UUID assigned by the producer) column in
your streaming DataFrame and drop duplicate messages with a single line of
code (dropDuplicates); a rough sketch follows.
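For instance (the column names uniqueId and eventTime are assumptions about
your payload):

import org.apache.spark.sql.DataFrame

// The watermark bounds the state Spark keeps for deduplication; without it the
// set of seen ids grows without limit.
def dedup(stream: DataFrame): DataFrame =
  stream
    .withWatermark("eventTime", "10 minutes")
    .dropDuplicates("uniqueId", "eventTime")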

Best,
Anastasios

On Wed, May 1, 2019 at 11:15 AM Akshay Bhardwaj <
akshay.bhardwaj1...@gmail.com> wrote:

> Hi All,
>
> Floating this again. Any suggestions?
>
>
> Akshay Bhardwaj
> +91-97111-33849
>
>
> On Tue, Apr 30, 2019 at 7:30 PM Akshay Bhardwaj <
> akshay.bhardwaj1...@gmail.com> wrote:
>
>> Hi Experts,
>>
>> I am using spark structured streaming to read message from Kafka, with a
>> producer that works with at-least once guarantee. This streaming job is
>> running on Yarn cluster with hadoop 2.7 and spark 2.3
>>
>> What is the most reliable strategy for avoiding duplicate data within
>> stream in the scenarios of fail-over or job restarts/re-submits, and
>> guarantee exactly once non-duplicate stream?
>>
>>
>>1. One of the strategies I have read other people using is to
>>maintain an external KV store for unique-key/checksum of the incoming
>>message, and write to a 2nd kafka topic only if the checksum is not 
>> present
>>in KV store.
>>- My doubts with this approach is how to ensure safe write to both
>>   the 2nd topic and to KV store for storing checksum, in the case of 
>> unwanted
>>   failures. How does that guarantee exactly-once with restarts?
>>
>> Any suggestions are highly appreciated.
>>
>>
>> Akshay Bhardwaj
>> +91-97111-33849
>>
>

-- 
-- Anastasios Zouzias



Re: Packaging kafka certificates in uber jar

2018-12-25 Thread Anastasios Zouzias
Hi Colin,

You can place your certificates under src/main/resources and include them
in the uber JAR, see e.g. :
https://stackoverflow.com/questions/40252652/access-files-in-resources-directory-in-jar-from-apache-spark-streaming-context
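A rough sketch of the idea (the resource name is illustrative): the Kafka
client expects a filesystem path, so copy the bundled JKS out of the uber JAR
into a temp file before building the consumer.

import java.io.File
import java.nio.file.{Files, StandardCopyOption}

def materializeResource(name: String): String = {
  val in = getClass.getResourceAsStream(s"/$name") // e.g. "/client.truststore.jks"
  val tmp = File.createTempFile("kafka-", ".jks")
  tmp.deleteOnExit()
  Files.copy(in, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
  in.close()
  tmp.getAbsolutePath
}

// kafkaParams += "ssl.truststore.location" -> materializeResource("client.truststore.jks")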

Best,
Anastasios

On Mon, Dec 24, 2018 at 10:29 PM Colin Williams <
colin.williams.seat...@gmail.com> wrote:

> I've been trying to read from kafka via a spark streaming client. I
> found out spark cluster doesn't have certificates deployed. Then I
> tried using the same local certificates I've been testing with by
> packing them in an uber jar and getting a File handle from the
> Classloader resource. But I'm getting a File Not Found exception.
> These are jks certificates. Is anybody aware how to package
> certificates in a jar with a kafka client preferably the spark one?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
-- Anastasios Zouzias



Re: conflicting version question

2018-10-26 Thread Anastasios Zouzias
Hi Nathan,

You can try to shade the dependency version that you want to use. That
said, shading is a tricky technique. Good luck.

https://softwareengineering.stackexchange.com/questions/297276/what-is-a-shaded-java-dependency


See also elasticsearch's discussion on shading

https://www.elastic.co/de/blog/to-shade-or-not-to-shade
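If you happen to build with sbt-assembly, a minimal sketch of a shade rule
(the target package name is illustrative) looks like:

// build.sbt: relocate our Gson 2.8.5 under a private package so it cannot
// clash with the Gson 2.2.4 pulled in via the Hadoop/Spark classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.gson.**" -> "myshaded.gson.@1").inAll
)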

Best,
Anastasios


On Fri, 26 Oct 2018, 15:45 Nathan Kronenfeld,
 wrote:

> Our code is currently using Gson 2.8.5.  Spark, through Hadoop-API, pulls
> in Gson 2.2.4.
>
> At the moment, we just get "method X not found" exceptions because of this
> - because when we run in Spark, 2.2.4 is what gets loaded.
>
> Is there any way to have both versions exist simultaneously? To load 2.8.5
> so that our code uses it, without messing up spark?
>
> Thanks,
>   -Nathan Kronenfeld
>


Re: Dataframe from 1.5G json (non JSONL)

2018-06-05 Thread Anastasios Zouzias
Are you sure that your JSON file has the right format?

spark.read.json(...) expects a file where *each line is a json object*.

My wild guess is that

val hdf=spark.read.json("/user/tmp/hugedatafile")
hdf.show(2) or hdf.take(1) gives OOM

tries to fetch all the data into the driver. Can you reformat your input
file and try again?
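For illustration (spark-shell style; the JSONL path is hypothetical), the two
shapes behave quite differently:

// JSON Lines (the default expectation): one self-contained object per line, e.g.
//   {"id":"1","type":"11"}
//   {"id":"2","type":"11"}
val jsonl = spark.read.json("/user/tmp/hugedatafile_as_jsonl")

// A single huge JSON array needs multiLine and is parsed as one big document,
// which is much more memory hungry.
val whole = spark.read.option("multiLine", true).json("/user/tmp/hugedatafile")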

Best,
Anastasios



On Tue, Jun 5, 2018 at 8:39 PM, raksja  wrote:

> I have a json file which is a continuous array of objects of similar type
> [{},{}...] for about 1.5GB uncompressed and 33MB gzip compressed.
>
> This is uploaded hugedatafile to hdfs and this is not a JSONL file, its a
> whole regular json file.
>
>
> [{"id":"1","entityMetadata":{"lastChange":"2018-05-11
> 01:09:18.0","createdDateTime":"2018-05-11
> 01:09:18.0","modifiedDateTime":"2018-05-11
> 01:09:18.0"},"type":"11"},{"id":"2","entityMetadata":{"
> lastChange":"2018-05-11
> 01:09:18.0","createdDateTime":"2018-05-11
> 01:09:18.0","modifiedDateTime":"2018-05-11
> 01:09:18.0"},"type":"11"},{"id":"3","entityMetadata":{"
> lastChange":"2018-05-11
> 01:09:18.0","createdDateTime":"2018-05-11
> 01:09:18.0","modifiedDateTime":"2018-05-11
> 01:09:18.0"},"type":"11"}..]
>
>
> I get OOM on executors whenever i try to load this into spark.
>
> Try 1
> val hdf=spark.read.json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) gives OOM
>
> Try 2
> Took a small sampledatafile and got schema to avoid schema infering
> val sampleSchema=spark.read.json("/user/tmp/sampledatafile").schema
> val hdf=spark.read.schema(sampleSchema).json("/user/tmp/hugedatafile")
> hdf.show(2) or hdf.take(1) stuck for 1.5 hrs and gives OOM
>
> Try 3
> Repartition it after before performing action
> gives OOM
>
> Try 4
> Read about the https://issues.apache.org/jira/browse/SPARK-20980
> completely
> val hdf = spark.read.option("multiLine",
> true)..schema(sampleSchema).json("/user/tmp/hugedatafile")
> hdf.show(1) or hdf.take(1) gives OOM
>
>
> Can any one help me here?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias



Re: Fastest way to drop useless columns

2018-05-31 Thread Anastasios Zouzias
Hi Julien,

One quick and easy-to-implement idea is to use sampling on your dataset,
i.e., sample a large enough subset of your data and test whether any column
shows more than one distinct value there. Repeat the process a few times and
then run the full test only on the surviving columns (those that still look
constant); see the sketch below.

This will allow you to load only a subset of your dataset if it is stored
in Parquet.
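A rough sketch of the sampling pass (approx_count_distinct and the 1% fraction
are just one way to do it):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.approx_count_distinct

// Columns with more than one distinct value in the sample are certainly not
// constant; only the remaining candidates need the full scan.
def candidateConstantCols(df: DataFrame, fraction: Double = 0.01): Seq[String] = {
  val sample = df.sample(withReplacement = false, fraction)
  val counts = sample
    .select(df.columns.map(c => approx_count_distinct(c).alias(c)): _*)
    .head()
  df.columns.filter(c => counts.getAs[Long](c) <= 1L)
}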

Best,
Anastasios

On Thu, May 31, 2018 at 10:34 AM,  wrote:

> Hi there !
>
> I have a potentially large dataset ( regarding number of rows and cols )
>
> And I want to find the fastest way to drop some useless cols for me, i.e.
> cols containing only an unique value !
>
> I want to know what do you think that I could do to do this as fast as
> possible using spark.
>
>
> I already have a solution using distinct().count() or approxCountDistinct()
> But, they may not be the best choice as this requires to go through all
> the data, even if the 2 first tested values for a col are already different
> ( and in this case I know that I can keep the col )
>
>
> Thx for your ideas !
>
> Julien
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias



Re: Can spark handle this scenario?

2018-02-17 Thread Anastasios Zouzias
>>>>>>>
>>>>>>> I have a use case:
>>>>>>>
>>>>>>> I want to download S stock data from Yahoo API in parallel
>>>>>>> using Spark. I have got all stock symbols as a Dataset. Then I used 
>>>>>>> below
>>>>>>> code to call Yahoo API for each symbol:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> case class Symbol(symbol: String, sector: String)
>>>>>>>
>>>>>>> case class Tick(symbol: String, sector: String, open: Double, close:
>>>>>>> Double)
>>>>>>>
>>>>>>>
>>>>>>> // symbolDS is Dataset[Symbol], pullSymbolFromYahoo returns
>>>>>>> Dataset[Tick]
>>>>>>>
>>>>>>>
>>>>>>> symbolDs.map { k =>
>>>>>>>
>>>>>>>   pullSymbolFromYahoo(k.symbol, k.sector)
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> This statement cannot compile:
>>>>>>>
>>>>>>>
>>>>>>> Unable to find encoder for type stored in a Dataset.  Primitive
>>>>>>> types (Int, String, etc) and Product types (case classes) are supported 
>>>>>>> by
>>>>>>> importing spark.implicits._  Support for serializing other types
>>>>>>> will be added in future releases.
>>>>>>>
>>>>>>>
>>>>>>> My questions are:
>>>>>>>
>>>>>>>
>>>>>>> 1. As you can see, this scenario is not traditional dataset handling
>>>>>>> such as count, sql query... Instead, it is more like a UDF which apply
>>>>>>> random operation on each record. Is Spark good at handling such 
>>>>>>> scenario?
>>>>>>>
>>>>>>>
>>>>>>> 2. Regarding the compilation error, any fix? I did not find a
>>>>>>> satisfactory solution online.
>>>>>>>
>>>>>>>
>>>>>>> Thanks for help!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>
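For reference, a minimal sketch of one way the quoted mapping can compile
(fetchTicks is a hypothetical stand-in for the Yahoo call): return plain
case-class data rather than a Dataset, since a Dataset cannot be nested inside
another Dataset and has no encoder, and bring the Product encoders into scope.

case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// Hypothetical stand-in for the Yahoo API call: returns Seq[Tick], not Dataset[Tick].
def fetchTicks(symbol: String, sector: String): Seq[Tick] =
  Seq(Tick(symbol, sector, open = 0.0, close = 0.0))

import spark.implicits._ // Product encoders for the case classes above

// val ticks: Dataset[Tick] = symbolDs.flatMap(k => fetchTicks(k.symbol, k.sector))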


Re: Several Aggregations on a window function

2017-12-18 Thread Anastasios Zouzias
Hi,

You can use https://twitter.github.io/algebird/ which provides an
implementation of interesting Monoids and ways to combine them into tuples
(or products) of Monoids. Of course, you are not bound to use the algebird
library, but it might be helpful to bootstrap; see the sketch below.
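As a small local sketch of the tuple-of-monoids idea (max, min and a
(sum, count) pair combined in a single pass; plain Scala collections here, but
the same reduce works on an RDD):

import com.twitter.algebird.{Max, Min, Semigroup}

val values = Seq(3.0, 1.0, 4.0, 1.5)

// Tuples of semigroups are themselves semigroups, so all three aggregates are
// combined element-wise in one reduce over the data.
val (maxV, minV, (sum, count)) = values
  .map(v => (Max(v), Min(v), (v, 1L)))
  .reduce((a, b) => Semigroup.plus(a, b))

println(s"max=${maxV.get} min=${minV.get} avg=${sum / count}")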



On Mon, Dec 18, 2017 at 7:18 PM, Julien CHAMP <jch...@tellmeplus.com> wrote:

> It seems interesting, however scalding seems to require be used outside of
> spark ?
>
>
> Le lun. 18 déc. 2017 à 17:15, Anastasios Zouzias <zouz...@gmail.com> a
> écrit :
>
>> Hi Julien,
>>
>> I am not sure if my answer applies on the streaming part of your
>> question. However, in batch processing, if you want to perform multiple
>> aggregations over an RDD with a single pass, a common approach is to use
>> multiple aggregators (a.k.a. tuple monoids), see below an example from
>> algebird:
>>
>> https://github.com/twitter/scalding/wiki/Aggregation-using-Algebird-Aggregators#composing-aggregators.
>>
>> Best,
>> Anastasios
>>
>> On Mon, Dec 18, 2017 at 10:38 AM, Julien CHAMP <jch...@tellmeplus.com>
>> wrote:
>>
>>> I've been looking for several solutions but I can't find something
>>> efficient to compute many window function efficiently ( optimized
>>> computation or efficient parallelism )
>>> Am I the only one interested by this ?
>>>
>>>
>>> Regards,
>>>
>>> Julien
>>>
>>
>>> Le ven. 15 déc. 2017 à 21:34, Julien CHAMP <jch...@tellmeplus.com> a
>>> écrit :
>>>
>>>> May be I should consider something like impala ?
>>>>
>>>> Le ven. 15 déc. 2017 à 11:32, Julien CHAMP <jch...@tellmeplus.com> a
>>>> écrit :
>>>>
>>>>> Hi Spark Community members !
>>>>>
>>>>> I want to do several ( from 1 to 10) aggregate functions using window
>>>>> functions on something like 100 columns.
>>>>>
>>>>> Instead of doing several pass on the data to compute each aggregate
>>>>> function, is there a way to do this efficiently ?
>>>>>
>>>>>
>>>>>
>>>>> Currently it seems that doing
>>>>>
>>>>>
>>>>> val tw =
>>>>>   Window
>>>>> .orderBy("date")
>>>>> .partitionBy("id")
>>>>> .rangeBetween(-803520L, 0)
>>>>>
>>>>> and then
>>>>>
>>>>> x
>>>>>.withColumn("agg1", max("col").over(tw))
>>>>>.withColumn("agg2", min("col").over(tw))
>>>>>.withColumn("aggX", avg("col").over(tw))
>>>>>
>>>>>
>>>>> Is not really efficient :/
>>>>> It seems that it iterates on the whole column for each aggregation ?
>>>>> Am I right ?
>>>>>
>>>>> Is there a way to compute all the required operations on a columns
>>>>> with a single pass ?
>>>>> Event better, to compute all the required operations on ALL columns
>>>>> with a single pass ?
>>>>>
>>>>> Thx for your Future[Answers]
>>>>>
>>>>> Julien
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>> Julien CHAMP — Data Scientist
>>>>>
>>>>>
>>>>> *Web : **www.tellmeplus.com* <http://tellmeplus.com/> — *Email : 
>>>>> **jch...@tellmeplus.com
>>>>> <jch...@tellmeplus.com>*
>>>>>
>>>>> *Phone ** : **06 89 35 01 89 <0689350189> * — *LinkedIn* :  *here*
>>>>> <https://www.linkedin.com/in/julienchamp>
>>>>>
>>>>> TellMePlus S.A — Predictive Objects
>>>>>
>>>>> *Paris* : 7 rue des Pommerots, 78400 Chatou
>>>>> <https://maps.google.com/?q=7+rue+des+Pommerots,+78400+Chatou=gmail=g>
>>>>> *Montpellier* : 51 impasse des églantiers, 34980 St Clément de Rivière
>>>>> <https://maps.google.com/?q=51+impasse+des+%C3%A9glantiers,+34980+St+Cl%C3%A9ment+de+Rivi%C3%A8re=gmail=g>
>>>>>
>>>> --
>>>>
>>>>
>>>> Julien CHAMP — Data Scientist
>>>>
>>>>
>>>> *Web : **www.tell

Re: Several Aggregations on a window function

2017-12-18 Thread Anastasios Zouzias
Hi Julien,

I am not sure if my answer applies on the streaming part of your question.
However, in batch processing, if you want to perform multiple aggregations
over an RDD with a single pass, a common approach is to use multiple
aggregators (a.k.a. tuple monoids), see below an example from algebird:

https://github.com/twitter/scalding/wiki/Aggregation-using-Algebird-Aggregators#composing-aggregators
.

Best,
Anastasios

On Mon, Dec 18, 2017 at 10:38 AM, Julien CHAMP <jch...@tellmeplus.com>
wrote:

> I've been looking for several solutions but I can't find something
> efficient to compute many window function efficiently ( optimized
> computation or efficient parallelism )
> Am I the only one interested by this ?
>
>
> Regards,
>
> Julien
>
> Le ven. 15 déc. 2017 à 21:34, Julien CHAMP <jch...@tellmeplus.com> a
> écrit :
>
>> May be I should consider something like impala ?
>>
>> Le ven. 15 déc. 2017 à 11:32, Julien CHAMP <jch...@tellmeplus.com> a
>> écrit :
>>
>>> Hi Spark Community members !
>>>
>>> I want to do several ( from 1 to 10) aggregate functions using window
>>> functions on something like 100 columns.
>>>
>>> Instead of doing several pass on the data to compute each aggregate
>>> function, is there a way to do this efficiently ?
>>>
>>>
>>>
>>> Currently it seems that doing
>>>
>>>
>>> val tw =
>>>   Window
>>> .orderBy("date")
>>> .partitionBy("id")
>>> .rangeBetween(-803520L, 0)
>>>
>>> and then
>>>
>>> x
>>>.withColumn("agg1", max("col").over(tw))
>>>.withColumn("agg2", min("col").over(tw))
>>>.withColumn("aggX", avg("col").over(tw))
>>>
>>>
>>> Is not really efficient :/
>>> It seems that it iterates on the whole column for each aggregation ? Am
>>> I right ?
>>>
>>> Is there a way to compute all the required operations on a columns with
>>> a single pass ?
>>> Event better, to compute all the required operations on ALL columns with
>>> a single pass ?
>>>
>>> Thx for your Future[Answers]
>>>
>>> Julien
>>>
>>>
>>>
>>>
>>>
>>> --
>>>
>>>
>>> Julien CHAMP — Data Scientist
>>>
>>>
>>> *Web : **www.tellmeplus.com* <http://tellmeplus.com/> — *Email : 
>>> **jch...@tellmeplus.com
>>> <jch...@tellmeplus.com>*
>>>
>>> *Phone ** : **06 89 35 01 89 <0689350189> * — *LinkedIn* :  *here*
>>> <https://www.linkedin.com/in/julienchamp>
>>>
>>> TellMePlus S.A — Predictive Objects
>>>
>>> *Paris* : 7 rue des Pommerots, 78400 Chatou
>>> <https://maps.google.com/?q=7+rue+des+Pommerots,+78400+Chatou=gmail=g>
>>> *Montpellier* : 51 impasse des églantiers, 34980 St Clément de Rivière
>>> <https://maps.google.com/?q=51+impasse+des+%C3%A9glantiers,+34980+St+Cl%C3%A9ment+de+Rivi%C3%A8re=gmail=g>
>>>
>> --
>>
>>
>> Julien CHAMP — Data Scientist
>>
>>
>> *Web : **www.tellmeplus.com* <http://tellmeplus.com/> — *Email : 
>> **jch...@tellmeplus.com
>> <jch...@tellmeplus.com>*
>>
>> *Phone ** : **06 89 35 01 89 <0689350189> * — *LinkedIn* :  *here*
>> <https://www.linkedin.com/in/julienchamp>
>>
>> TellMePlus S.A — Predictive Objects
>>
>> *Paris* : 7 rue des Pommerots, 78400 Chatou
>> <https://maps.google.com/?q=7+rue+des+Pommerots,+78400+Chatou=gmail=g>
>> *Montpellier* : 51 impasse des églantiers, 34980 St Clément de Rivière
>> <https://maps.google.com/?q=51+impasse+des+%C3%A9glantiers,+34980+St+Cl%C3%A9ment+de+Rivi%C3%A8re=gmail=g>
>>
> --
>
>
> Julien CHAMP — Data Scientist
>
>
> *Web : **www.tellmeplus.com* <http://tellmeplus.com/> — *Email : 
> **jch...@tellmeplus.com
> <jch...@tellmeplus.com>*
>
> *Phone ** : **06 89 35 01 89 <0689350189> * — *LinkedIn* :  *here*
> <https://www.linkedin.com/in/julienchamp>
>
> TellMePlus S.A — Predictive Objects
>
> *Paris* : 7 rue des Pommerots, 78400 Chatou
> <https://maps.google.com/?q=7+rue+des+Pommerots,+78400+Chatou=gmail=g>
> *Montpellier* : 51 impasse des églantiers, 34980 St Clément de Rivière
> <https://maps.google.com/?q=51+impasse+des+%C3%A9glantiers,+34980+St+Cl%C3%A9ment+de+Rivi%C3%A8re=gmail=g>
>
>
> Ce message peut contenir des informations confidentielles ou couvertes par
> le secret professionnel, à l’intention de son destinataire. Si vous n’en
> êtes pas le destinataire, merci de contacter l’expéditeur et d’en supprimer
> toute copie.
> This email may contain confidential and/or privileged information for the
> intended recipient. If you are not the intended recipient, please contact
> the sender and delete all copies.
>
>
> <http://www.tellmeplus.com/assets/emailing/banner.html>
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: best spark spatial lib?

2017-10-10 Thread Anastasios Zouzias
Hi,

Which spatial operations do you require exactly? Also, I don't follow what
you mean by combining spatial and logical operators.

I have created a library that wraps Lucene's spatial functionality here:
https://github.com/zouzias/spark-lucenerdd/wiki/Spatial-search

You could give the library a try; it supports intersections / within /
etc. Ideally, I try to push all of Lucene's spatial features into the library.

See also, https://github.com/zouzias/spark-lucenerdd/wiki/Related-Work for
related libraries.

Best,
Anastasios


On Tue, Oct 10, 2017 at 11:21 AM, Imran Rajjad <raj...@gmail.com> wrote:

> I need to have a location column inside my Dataframe so that I can do
> spatial queries and geometry operations. Are there any third-party packages
> that perform this kind of operations. I have seen a few like Geospark and
> megalan but they don't support operations where spatial and logical
> operators can be combined.
>
> regards,
> Imran
>
> --
> I.R
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Error - Spark reading from HDFS via dataframes - Java

2017-10-01 Thread Anastasios Zouzias
Hi,

Set the inferSchema option to true in spark-csv. You may also want to set
the mode option. See the README below, and the sketch after the link.

https://github.com/databricks/spark-csv/blob/master/README.md
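For illustration, a spark-shell sketch (the option names are the same from the
Java API; the schema mirrors the one in your mail):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Let the reader infer column types instead of defaulting everything to string.
val inferred = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .option("mode", "DROPMALFORMED") // drop rows that do not fit the inferred schema
  .csv("hdfs:/inputpath/*")

// Or pass the target schema directly, so no second conversion pass is needed.
val schema = StructType(Seq(
  StructField("COL1", StringType), StructField("COL2", StringType),
  StructField("COL3", LongType),   StructField("COL4", StringType),
  StructField("COL5", StringType), StructField("COL6", LongType)))

val typed = spark.read.option("header", "false").schema(schema).csv("hdfs:/inputpath/*")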

Best,
Anastasios

On 01.10.2017 07:58, "Kanagha Kumar" wrote:

Hi,

I'm trying to read data from HDFS in spark as dataframes. Printing the
schema, I see all columns are being read as strings. I'm converting it to
RDDs and creating another dataframe by passing in the correct schema ( how
the rows should be interpreted finally).

I'm getting the following error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid
external type for schema of bigint



Spark read API:

Dataset<Row> hdfs_dataset = new SQLContext(spark).read().option("header",
"false").csv("hdfs:/inputpath/*");

Dataset<Row> ds = new SQLContext(spark).createDataFrame(hdfs_dataset.toJavaRDD(),
conversionSchema);
This is the schema to be converted to:
StructType(StructField(COL1,StringType,true),
StructField(COL2,StringType,true),
StructField(COL3,LongType,true),
StructField(COL4,StringType,true),
StructField(COL5,StringType,true),
StructField(COL6,LongType,true))

This is the original schema obtained once read API was invoked
StructType(StructField(_c1,StringType,true),
StructField(_c2,StringType,true),
StructField(_c3,StringType,true),
StructField(_c4,StringType,true),
StructField(_c5,StringType,true),
StructField(_c6,StringType,true))

My interpretation is even when a JavaRDD is cast to dataframe by passing in
the new schema, values are not getting type casted.
This is occurring because the above read API reads data as string types
from HDFS.

How can I  convert an RDD to dataframe by passing in the correct schema
once it is read?
How can the values by type cast correctly during this RDD to dataframe
conversion?

Or how can I read data from HDFS with an input schema in java?
Any suggestions are helpful. Thanks!


Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Anastasios Zouzias
Hi,

I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
solved my issue. Can you try with 2.1.1 as well and report back?

Best,
Anastasios

On 17.09.2017 16:48, "HARSH TAKKAR" wrote:


Hi

I am using spark 2.1.0 with scala  2.11.8, and while iterating over the
partitions of each rdd in a dStream formed using KafkaUtils, i am getting
the below exception, please suggest a fix.

I have following config

kafka :
enable.auto.commit:"true",
auto.commit.interval.ms:"1000",
session.timeout.ms:"3",

Spark:

spark.streaming.backpressure.enabled=true

spark.streaming.kafka.maxRatePerPartition=200


Exception in task 0.2 in stage 3236.0 (TID 77795)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for
multi-threaded access

--
Kind Regards
Harsh


Re: compile error: No classtag available while calling RDD.zip()

2017-09-13 Thread Anastasios Zouzias
Hi there,

If it is OK with you to work with DataFrames, you can do

https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, LongType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append "rowid" column of type Long
val schema = df.schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(
  rddWithId.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) },
  newSchema)

// Show results
dfZippedWithId.show
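For completeness, a sketch that answers the last question directly: a type
parameter can carry several context bounds, so Y can require both an Encoder
(for createDataset) and a ClassTag (for RDD.zip).

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.reflect.ClassTag

def zipDatasets[X: Encoder, Y: Encoder : ClassTag](
    spark: SparkSession, m: Dataset[X], n: Dataset[Y]): Dataset[(X, Y)] = {
  // The ClassTag satisfies RDD.zip; the pair encoder is built from the two encoders.
  implicit val pairEnc: Encoder[(X, Y)] =
    Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
  spark.createDataset(m.rdd.zip(n.rdd))
}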

Best,
Anastasios



On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏 <blue...@cnic.cn> wrote:

> Hello. Since Dataset has no zip(..) method, I wrote the following code to
> zip two datasets:
>
> 1   def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m:
> Dataset[X], n: Dataset[Y]) = {
> 2   val rdd = m.rdd.zip(n.rdd);
> 3   import spark.implicits._
> 4   spark.createDataset(rdd);
> 5   }
>
> However, in the m.rdd.zip(…) call, compile error is reported:   No
> ClassTag available for Y
>
> I know this error can be corrected when I declare Y as a ClassTag like
> this:
>
> 1   def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …
>
> But this will make line 5 report a new error:
> Unable to find encoder for type stored in a Dataset.
>
> Now, I have no idea to solve this problem. How to declared Y as both an
> Encoder and a ClassTag?
>
> Many thanks!
>
> Best regards,
> bluejoe
> ---------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Spark SVD benchmark for dense matrices

2017-08-10 Thread Anastasios Zouzias
Hi Jose,

Just to note that in the databricks blog they state that they compute the
top-5 singular vectors, not all singular values/vectors. Computing all of them
is much more computationally intensive.
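For reference, a tiny local sketch (spark-shell style, sc in scope) of asking
RowMatrix for only the top-k factors, as in the benchmark:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 10.0)))

val mat = new RowMatrix(rows)
val svd = mat.computeSVD(2, computeU = true) // top-2 singular triplets only
println(svd.s)                               // the singular values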

Cheers,
Anastasios





On 09.08.2017 15:19, "Jose Francisco Saray Villamizar" <jsa...@gmail.com> wrote:

Hi everyone,

I am trying to invert a 5000 x 5000 dense matrix (99% non-zeros) by using
SVD with an approach similar to:

https://stackoverflow.com/questions/29969521/how-to-compute-the-inverse-of-a-rowmatrix-in-apache-spark

The time I'm getting with SVD is close to 10 minutes, which is very long for
me.

A benchmark for SVD is already given here:

https://databricks.com/blog/2014/07/21/distributing-the-singular-value-decomposition-with-spark.html

However, it seems they are using sparse matrices, that's why they get short
times. Has anyone of you tried to perform an SVD on a very dense, big matrix?

Is this time normal ?

Thank you.

-- 
-- 
Buen dia, alegria !!
José Francisco Saray Villamizar
cel +33 6 13710693 <+33%206%2013%2071%2006%2093>
Lyon, France


Re: Slow response on Solr Cloud with Spark

2017-07-19 Thread Anastasios Zouzias
Hi Imran,

It seems that you do not cache your underlying DataFrame. I would suggest
forcing a cache with tweets.cache() followed by tweets.count(); a rough
sketch follows. Let us know if the problem persists.
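For instance (the table name depends on how you registered the Solr
collection; "tweets" follows the Lucidworks example):

val tweets = spark.table("tweets")
tweets.cache()
tweets.count() // materializes the cache so later SQL reuses it
// equivalently: spark.sql("CACHE TABLE tweets")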

Best,
Anastasios

On Wed, Jul 19, 2017 at 2:49 PM, Imran Rajjad <raj...@gmail.com> wrote:

> Greetings,
>
> We are trying out Spark 2 + ThriftServer to join multiple
> collections from a Solr Cloud (6.4.x). I have followed this blog
> https://lucidworks.com/2015/08/20/solr-spark-sql-datasource/
>
> I understand that initially spark populates the temporary table with 18633014
> records and takes its due time, however any following SQLs on the
> temporary table take the same amount of time . It seems the temporary
> tables is not being re-used or cached. The fields in the solr collection do
> not have the docValue enabled, could that be the reason? Apparently I have
> missed a trick
>
> regards,
> Imran
>
> --
> I.R
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Can we access files on Cluster mode

2017-06-25 Thread Anastasios Zouzias
Hi Mich,

If the driver starts on the edge node with cluster mode, then I don't see
the difference between client and cluster deploy mode.

In cluster mode, it is the responsibility of the resource manager (yarn,
etc) to decide where to run the driver (at least for spark 1.6 this is what
I have experienced).

Best,
Anastasios

On Sun, Jun 25, 2017 at 11:14 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Hi Anastasios.
>
> Are you implying that in Yarn cluster mode even if you submit your Spark
> application on an Edge node the driver can start on any node. I was under
> the impression that the driver starts from the Edge node? and the executors
> can be on any node in the cluster (where Spark agents are running)?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 25 June 2017 at 09:39, Anastasios Zouzias <zouz...@gmail.com> wrote:
>
>> Just to note that in cluster mode the spark driver might run on any node
>> of the cluster, hence you need to make sure that the file exists on *all*
>> nodes. Push the file on all nodes or use client deploy-mode.
>>
>> Best,
>> Anastasios
>>
>>
>> On 24.06.2017 23:24, "Holden Karau" <hol...@pigscanfly.ca> wrote:
>>
>>> addFile is supposed to not depend on a shared FS unless the semantics
>>> have changed recently.
>>>
>>> On Sat, Jun 24, 2017 at 11:55 AM varma dantuluri <dvsnva...@gmail.com>
>>> wrote:
>>>
>>>> Hi Sudhir,
>>>>
>>>> I believe you have to use a shared file system that is accused by all
>>>> nodes.
>>>>
>>>>
>>>> On Jun 24, 2017, at 1:30 PM, sudhir k <k.sudhi...@gmail.com> wrote:
>>>>
>>>>
>>>> I am new to Spark and i need some guidance on how to fetch files from
>>>> --files option on Spark-Submit.
>>>>
>>>> I read on some forums that we can fetch the files from
>>>> Spark.getFiles(fileName) and can use it in our code and all nodes should
>>>> read it.
>>>>
>>>> But i am facing some issue
>>>>
>>>> Below is the command i am using
>>>>
>>>> spark-submit --deploy-mode cluster --class com.check.Driver --files
>>>> /home/sql/first.sql test.jar 20170619
>>>>
>>>> so when i use SparkFiles.get(first.sql) , i should be able to read the
>>>> file Path but it is throwing File not Found exception.
>>>>
>>>> I tried SpackContext.addFile(/home/sql/first.sql) and then
>>>> SparkFiles.get(first.sql) but still the same error.
>>>>
>>>> Its working on the stand alone mode but not on cluster mode. Any help
>>>> is appreciated.. Using Spark 2.1.0 and Scala 2.11
>>>>
>>>> Thanks.
>>>>
>>>>
>>>> Regards,
>>>> Sudhir K
>>>>
>>>>
>>>>
>>>> --
>>>> Regards,
>>>> Sudhir K
>>>>
>>>>
>>>> --
>>> Cell : 425-233-8271 <(425)%20233-8271>
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Can we access files on Cluster mode

2017-06-25 Thread Anastasios Zouzias
Just to note that in cluster mode the spark driver might run on any node of
the cluster, hence you need to make sure that the file exists on *all*
nodes. Push the file on all nodes or use client deploy-mode.
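For reference, a rough sketch of how a file shipped with --files (or
sc.addFile) is then resolved on whichever node the code runs:

import org.apache.spark.SparkFiles

// SparkFiles.get takes only the file name, not the original local path.
val sqlPath = SparkFiles.get("first.sql")
val sqlText = scala.io.Source.fromFile(sqlPath).mkString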

Best,
Anastasios

On 24.06.2017 23:24, "Holden Karau" wrote:

> addFile is supposed to not depend on a shared FS unless the semantics have
> changed recently.
>
> On Sat, Jun 24, 2017 at 11:55 AM varma dantuluri 
> wrote:
>
>> Hi Sudhir,
>>
>> I believe you have to use a shared file system that is accused by all
>> nodes.
>>
>>
>> On Jun 24, 2017, at 1:30 PM, sudhir k  wrote:
>>
>>
>> I am new to Spark and i need some guidance on how to fetch files from
>> --files option on Spark-Submit.
>>
>> I read on some forums that we can fetch the files from
>> Spark.getFiles(fileName) and can use it in our code and all nodes should
>> read it.
>>
>> But i am facing some issue
>>
>> Below is the command i am using
>>
>> spark-submit --deploy-mode cluster --class com.check.Driver --files
>> /home/sql/first.sql test.jar 20170619
>>
>> so when i use SparkFiles.get(first.sql) , i should be able to read the
>> file Path but it is throwing File not Found exception.
>>
>> I tried SpackContext.addFile(/home/sql/first.sql) and then
>> SparkFiles.get(first.sql) but still the same error.
>>
>> Its working on the stand alone mode but not on cluster mode. Any help is
>> appreciated.. Using Spark 2.1.0 and Scala 2.11
>>
>> Thanks.
>>
>>
>> Regards,
>> Sudhir K
>>
>>
>>
>> --
>> Regards,
>> Sudhir K
>>
>>
>> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: KMeans Clustering is not Reproducible

2017-05-22 Thread Anastasios Zouzias
Hi Christoph,

Take a look at this, you might end up having a similar case:

http://www.spark.tc/using-sparks-cache-for-correctness-not-just-performance/

If this is not the case, then I agree with you that the kmeans should be
partitioning-agnostic (although I haven't checked the code yet).

Best,
Anastasios


On Mon, May 22, 2017 at 3:42 PM, Christoph Bruecke <carabo...@gmail.com>
wrote:

> Hi,
>
> I’m trying to figure out how to use KMeans in order to achieve
> reproducible results. I have found that running the same kmeans instance on
> the same data, with different partitioning will produce different
> clusterings.
>
> A simple KMeans run with a fixed seed returns different results on the same
> training data if the training data is partitioned differently.
>
> Consider the following example. The same KMeans clustering set up is run on
> identical data. The only difference is the partitioning of the training
> data
> (one partition vs. four partitions).
>
> ```
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.ml.clustering.KMeans
> import org.apache.spark.ml.feature.VectorAssembler
>
> // generate random data for clustering
> val randomData = spark.range(1, 1000).withColumn("a", rand(123)).withColumn("b", rand(321))
>
> val vecAssembler = new VectorAssembler().setInputCols(Array("a", "b")).setOutputCol("features")
>
> val data = vecAssembler.transform(randomData)
>
> // instantiate KMeans with fixed seed
> val kmeans = new KMeans().setK(10).setSeed(9876L)
>
> // train the model with different partitioning
> val dataWith1Partition = data.repartition(1)
> println("1 Partition: " + kmeans.fit(dataWith1Partition).computeCost(dataWith1Partition))
>
> val dataWith4Partition = data.repartition(4)
> println("4 Partition: " + kmeans.fit(dataWith4Partition).computeCost(dataWith4Partition))
> ```
>
> I get the following related cost
>
> ```
> 1 Partition: 16.028212597888057
> 4 Partition: 16.14758460544976
> ```
>
> What I want to achieve is that repeated computations of the KMeans
> Clustering should yield identical result on identical training data,
> regardless of the partitioning.
>
> Looking through the Spark source code, I guess the cause is the
> initialization method of KMeans which in turn uses the `takeSample` method,
> which does not seem to be partition agnostic.
>
> Is this behaviour expected? Is there anything I could do to achieve
> reproducible results?
>
> Best,
> Christoph
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Best Practice for Enum in Spark SQL

2017-05-12 Thread Anastasios Zouzias
Hi Mike,

FYI: If you are using Spark 2.x, you might have issues with encoders if you
use a case class with an Enumeration-typed field, see
https://issues.apache.org/jira/browse/SPARK-17248

For (1), (2), I would guess Int would be better (space-wise), but I am not
familiar with Parquet's internals; a rough sketch of the numeric option follows.
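For instance (names are illustrative): store an Int code in the case class /
Parquet column and keep a small lookup for the labels, which also sidesteps the
Enumeration issue above.

object VehicleType {
  val Car = 1; val Suv = 2; val Wagon = 3
  val label = Map(Car -> "Car", Suv -> "SUV", Wagon -> "Wagon")
}

case class Vehicle(id: Long, vehicleType: Int) // Int instead of an Enumeration field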

Best,
Anastasios

On Fri, May 12, 2017 at 5:07 AM, Mike Wheeler <rotationsymmetr...@gmail.com>
wrote:

> Hi Spark Users,
>
> I want to store Enum type (such as Vehicle Type: Car, SUV, Wagon)  in my
> data. My storage format will be parquet and I need to access the data from
> Spark-shell, Spark SQL CLI, and hive. My questions:
>
> 1) Should I store my Enum type as String or store it as numeric encoding
> (aka 1=Car, 2=SUV, 3=Wagon)?
>
> 2) If I choose String, any penalty in hard drive space or memory?
>
> Thank you!
>
> Mike
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: I am not sure why I am getting java.lang.NoClassDefFoundError

2017-02-17 Thread Anastasios Zouzias
ask.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> 17/02/17 17:35:33 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1,
> localhost): java.lang.NoClassDefFoundError: scala/runtime/
> AbstractPartialFunction$mcJL$sp
>
> at java.lang.ClassLoader.defineClass1(Native Method)
>
> at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
>
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
>
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(
> CassandraLimit.scala:21)
>
> at com.datastax.spark.connector.rdd.CassandraTableScanRDD.
> compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
>
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.lang.ClassNotFoundException: scala.runtime.
> AbstractPartialFunction$mcJL$sp
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> ... 35 more
>
>
> 17/02/17 17:35:33 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1
> times; aborting job
>
> 17/02/17 17:35:33 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> localhost): java.lang.NoClassDefFoundError: com/datastax/spark/connector/
> rdd/CassandraLimit$$anonfun$limitForIterator$1
>
> at com.datastax.spark.connector.rdd.CassandraLimit$.limitForIterator(
> CassandraLimit.scala:21)
>
> at com.datastax.spark.connector.rdd.CassandraTableScanRDD.
> compute(CassandraTableScanRDD.scala:367)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
>
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:79)
>
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:47)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job

2017-02-03 Thread Anastasios Zouzias
> 1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
>
> at org.apache.spark.streaming.dstream.DStream.createRDDWithLoca
> lProperties(DStream.scala:415)
>
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$
> 1.apply$mcV$sp(ForEachDStream.scala:50)
>
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$
> 1.apply(ForEachDStream.scala:50)
>
> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$
> 1.apply(ForEachDStream.scala:50)
>
> at scala.util.Try$.apply(Try.scala:192)
>
> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler
> $$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
>
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler
> $$anonfun$run$1.apply(JobScheduler.scala:247)
>
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler
> $$anonfun$run$1.apply(JobScheduler.scala:247)
>
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>
> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler
> .run(JobScheduler.scala:246)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
> Executor.java:1142)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
> lExecutor.java:617)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: NoNodeAvailableException[None of the configured nodes are
> available: [{#transport#-1}{XX.XXX.XXX.XX}{XX.XXX.XXX.XX:9300}]]
>
> at org.elasticsearch.client.transport.TransportClientNodesServi
> ce.ensureNodesAreAvailable(TransportClientNodesService.java:290)
>
> at org.elasticsearch.client.transport.TransportClientNodesServi
> ce.execute(TransportClientNodesService.java:207)
>
> at org.elasticsearch.client.transport.support.TransportProxyCli
> ent.execute(TransportProxyClient.java:55)
>
> at org.elasticsearch.client.transport.TransportClient.doExecute
> (TransportClient.java:288)
>
> at org.elasticsearch.client.support.AbstractClient.execute(
> AbstractClient.java:359)
>
> at org.elasticsearch.client.support.AbstractClient$ClusterAdmin
> .execute(AbstractClient.java:853)
>
> at org.elasticsearch.action.ActionRequestBuilder.execute(Action
> RequestBuilder.java:86)
>
> at org.elasticsearch.action.ActionRequestBuilder.execute(Action
> RequestBuilder.java:56)
>
> at org.elasticsearch.action.ActionRequestBuilder.get(ActionRequ
> estBuilder.java:64)
>
> at com.myco.MyDriver.work()
>
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartit
> ion$1.apply(JavaRDDLike.scala:218)
>
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartit
> ion$1.apply(JavaRDDLike.scala:218)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfu
> n$apply$28.apply(RDD.scala:902)
>
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfu
> n$apply$28.apply(RDD.scala:902)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkC
> ontext.scala:1916)
>
> at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkC
> ontext.scala:1916)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.sca
> la:70)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
> scala:274)
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Equally split an RDD partition into two partitions at the same node

2017-01-15 Thread Anastasios Zouzias
Hi Fei,

I looked at the code of CoalescedRDD and probably what I suggested will not
work.

Speaking of which, CoalescedRDD is private[spark]. If this was not the
case, you could set balanceSlack to 1, and get what you requested, see

https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala#L75

Maybe you could try to use the CoalescedRDD code to implement your
requirement.

Good luck!
Cheers,
Anastasios


On Sun, Jan 15, 2017 at 5:39 PM, Fei Hu <hufe...@gmail.com> wrote:

> Hi Anastasios,
>
> Thanks for your reply. If I just increase the numPartitions to be twice
> larger, how coalesce(numPartitions: Int, shuffle: Boolean = false) keeps
> the data locality? Do I need to define my own Partitioner?
>
> Thanks,
> Fei
>
> On Sun, Jan 15, 2017 at 3:58 AM, Anastasios Zouzias <zouz...@gmail.com>
> wrote:
>
>> Hi Fei,
>>
>> How you tried coalesce(numPartitions: Int, shuffle: Boolean = false) ?
>>
>> https://github.com/apache/spark/blob/branch-1.6/core/src/
>> main/scala/org/apache/spark/rdd/RDD.scala#L395
>>
>> coalesce is mostly used for reducing the number of partitions before
>> writing to HDFS, but it might still be a narrow dependency (satisfying your
>> requirements) if you increase the # of partitions.
>>
>> Best,
>> Anastasios
>>
>> On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:
>>
>>> Dear all,
>>>
>>> I want to equally divide a RDD partition into two partitions. That
>>> means, the first half of elements in the partition will create a new
>>> partition, and the second half of elements in the partition will generate
>>> another new partition. But the two new partitions are required to be at the
>>> same node with their parent partition, which can help get high data
>>> locality.
>>>
>>> Is there anyone who knows how to implement it or any hints for it?
>>>
>>> Thanks in advance,
>>> Fei
>>>
>>>
>>
>>
>> --
>> -- Anastasios Zouzias
>> <a...@zurich.ibm.com>
>>
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Equally split an RDD partition into two partitions at the same node

2017-01-15 Thread Anastasios Zouzias
Hi Fei,

Have you tried coalesce(numPartitions: Int, shuffle: Boolean = false)?

https://github.com/apache/spark/blob/branch-1.6/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L395

coalesce is mostly used for reducing the number of partitions before
writing to HDFS, but it might still be a narrow dependency (satisfying your
requirements) if you increase the # of partitions.

Best,
Anastasios

On Sun, Jan 15, 2017 at 12:58 AM, Fei Hu <hufe...@gmail.com> wrote:

> Dear all,
>
> I want to equally divide a RDD partition into two partitions. That means,
> the first half of elements in the partition will create a new partition,
> and the second half of elements in the partition will generate another new
> partition. But the two new partitions are required to be at the same node
> with their parent partition, which can help get high data locality.
>
> Is there anyone who knows how to implement it or any hints for it?
>
> Thanks in advance,
> Fei
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Broadcast destroy

2017-01-02 Thread Anastasios Zouzias
Hi Bryan,

I think the ContextCleaner will take care of the broadcasted variables, see,
e.g.,

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-service-contextcleaner.html

If it is easy to spot when to clean up the broadcast variables in your case,
a "xBroadcasted.destroy()" call will make things more explicit; a rough
sketch follows.

Best,
Anastasios




On Mon, Jan 2, 2017 at 4:26 PM, <bryan.jeff...@gmail.com> wrote:

> All,
>
>
>
> Anyone have a thought?
>
>
>
> Thank you,
>
>
>
> Bryan Jeffrey
>
>
>
> *From: *bryan.jeff...@gmail.com
> *Sent: *Friday, December 30, 2016 1:20 PM
> *To: *user <user@spark.apache.org>
> *Subject: *Broadcast destroy
>
>
>
> All,
>
>
>
> If we are updating broadcast variables do we need to manually destroy the
> replaced broadcast, or will they be automatically pruned?
>
>
>
> Thank you,
>
>
>
> Bryan Jeffrey
>
>
>
> Sent from my Windows 10 phone
>
>
>
>
>



-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Ingesting data in elasticsearch from hdfs using spark , cluster setup and usage

2016-12-23 Thread Anastasios Zouzias
Hi Rohit,

Since your instances are 16G dual-core only, I would suggest using dedicated
nodes for Elasticsearch with 8GB of heap memory. This way you won't have any
interference between the Spark executors and Elasticsearch.

Also, if possible, you could try to use SSD disks on these 3 machines for
storing the Elasticsearch indices; this will boost your Elasticsearch cluster
performance.

Best,
Anastasios

On Thu, Dec 22, 2016 at 6:35 PM, Rohit Verma <rohit.ve...@rokittech.com>
wrote:

> I am setting up a spark cluster. I have hdfs data nodes and spark master
> nodes on same instances. To add elasticsearch to this cluster, should I
> spawn ES on separate machines or on the same machines? I have only 12 machines,
> 1-master (spark and hdfs)
> 8-spark workers and hdfs data nodes
> I can use 3 nodes for es dedicatedly or can use 11 nodes running all three.
>
> All instances are same, 16gig dual core (unfortunately).
>
> Also I am trying with es hadoop, es-spark project but I felt ingestion is
> very slow if I do 3 dedicated nodes, its like 0.6 million records/minute.
> If any one had experience using that project can you please share your
> thoughts about tuning.
>
> Regards
> Rohit
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading multiple partitions in parallel.

2016-11-26 Thread Anastasios Zouzias
Hi there,

spark.read.json usually takes a filesystem path (usually on HDFS) pointing to
a file with one JSON object per line. See also

http://spark.apache.org/docs/latest/sql-programming-guide.html

Hence, in your case

val df4 = spark.read.json(rdd) // This line takes forever

seems wrong. I guess you might want to first store the rdd as a text file on
HDFS and then read it back using spark.read.json; see the sketch below.
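For instance (the intermediate path is illustrative):

// Materialize the JSON strings to HDFS once, then read them back as a
// JSON-lines data source.
rdd.saveAsTextFile("/tmp/hello_as_jsonl")
val df4 = spark.read.json("/tmp/hello_as_jsonl")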

Cheers,
Anastasios



On Sat, Nov 26, 2016 at 9:34 AM, kant kodali <kanth...@gmail.com> wrote:

>
> Apache Spark or Spark-Cassandra-Connector doesnt look like it is reading
> multiple partitions in parallel.
>
> Here is my code using spark-shell
>
> import org.apache.spark.sql._
> import org.apache.spark.sql.types.StringType
> spark.sql("""CREATE TEMPORARY VIEW hello USING org.apache.spark.sql.cassandra 
> OPTIONS (table "hello", keyspace "db", cluster "Test Cluster", pushdown 
> "true")""")
> val df = spark.sql("SELECT test from hello")
> val df2 = df.select(df("test").cast(StringType).as("test"))
> val rdd = df2.rdd.map { case Row(j: String) => j }
> val df4 = spark.read.json(rdd) // This line takes forever
>
> I have about 700 million rows each row is about 1KB and this line
>
> val df4 = spark.read.json(rdd) takes forever as I get the following
> output after 1hr 30 mins
>
> Stage 1:==> (4866 + 2) / 25256]
>
> so at this rate it will probably take days.
>
> I measured the network throughput rate of spark worker nodes using iftop
> and it is about 2.2KB/s (kilobytes per second) which is too low so that
> tells me it not reading partitions in parallel or at very least it is not
> reading good chunk of data else it would be in MB/s. Any ideas on how to
> fix it?
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Apache Spark SQL is taking forever to count billion rows from Cassandra?

2016-11-24 Thread Anastasios Zouzias
How fast is Cassandra without Spark on the count operation?

cqlsh> SELECT COUNT(*) FROM hello

(this is not equivalent to what you are doing, but it might help you find the
root cause)

On Thu, Nov 24, 2016 at 9:03 AM, kant kodali <kanth...@gmail.com> wrote:

> I have the following code
>
> I invoke spark-shell as follows
>
> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
> --executor-memory 15G --executor-cores 12 --conf
> spark.cassandra.input.split.size_in_mb=67108864
>
> code
>
> scala> val df = spark.sql("SELECT test from hello") // Billion rows in
> hello and test column is 1KB
>
> df: org.apache.spark.sql.DataFrame = [test: binary]
>
> scala> df.count
>
> [Stage 0:>   (0 + 2) / 13] // I dont know what these numbers mean
> precisely.
>
> If I invoke spark-shell as follows
>
> ./spark-shell --conf spark.cassandra.connection.host=170.99.99.134
>
> code
>
>
> val df = spark.sql("SELECT test from hello") // This has about billion
> rows
>
> scala> df.count
>
>
> [Stage 0:=>  (686 + 2) / 24686] // What are these numbers precisely?
>
>
> Both of these versions didn't work Spark keeps running forever and I have
> been waiting for more than 15 mins and no response. Any ideas on what could
> be wrong and how to fix this?
>
> I am using Spark 2.0.2
> and spark-cassandra-connector_2.11-2.0.0-M3.jar
>
>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: Broadcast big dataset

2016-10-01 Thread Anastasios Zouzias
Hey,

Is the driver running out of memory (OOM)? Try 8g for the driver memory.
Speaking of which, how do you estimate that your broadcasted dataset is 500M?
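For estimating the in-memory size, a rough sketch:

import org.apache.spark.util.SizeEstimator

// SizeEstimator gives a rough in-memory footprint of the object you are about
// to broadcast, usually noticeably larger than its on-disk size.
val lookup = (1 to 100000).map(i => i -> i.toString).toMap // placeholder dataset
val approxBytes = SizeEstimator.estimate(lookup)
println(s"~${approxBytes / (1024 * 1024)} MB in memory")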

Best,
Anastasios

On 29.09.2016 5:32 AM, "WangJianfei" wrote:

> First thank you very much!
>   My executor memeory is also 4G, but my spark version is 1.5. Does spark
> version make a trouble?
>
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Broadcast-big-
> dataset-tp19127p19143.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Large-scale matrix inverse in Spark

2016-09-27 Thread Anastasios Zouzias
Hi there,

As Edward noted, if you ask a numerical analyst about matrix inversion,
they will respond "you never invert a matrix, you solve the linear
system associated with it". Linear system solving is usually done
with iterative methods or matrix decompositions (as noted above). The
reason why people avoid matrix inversion is its inherent poor
numerical stability.
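As a tiny local illustration of "solve, don't invert" (Breeze, which MLlib
already depends on):

import breeze.linalg.{DenseMatrix, DenseVector}

val A = DenseMatrix((4.0, 1.0), (1.0, 3.0))
val b = DenseVector(1.0, 2.0)
val x = A \ b // solves Ax = b via a decomposition, without ever forming the inverse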

Best,
Anastasios

On Tue, Sep 27, 2016 at 8:42 AM, Edward Fine <edward.f...@gmail.com> wrote:

> I have not found matrix inversion algorithms in Spark and I would be
> surprised to see them.  Except for matrices with very special structure
> (like those nearly the identity), inverting and n*n matrix is slower than
> O(n^2), which does not scale.  Whenever a matrix is inverted, usually a
> decomposition or a low rank approximation is used, just as Sean pointed
> out.  See further https://en.wikipedia.org/wiki/Computational_
> complexity_of_mathematical_operations#Matrix_algebra
> or if you really want to dig into it
> Stoer and Bulirsch http://www.springer.com/us/book/9780387954523
>
> On Mon, Sep 26, 2016 at 11:00 PM Sean Owen <so...@cloudera.com> wrote:
>
>> I don't recall any code in Spark that computes a matrix inverse. There is
>> code that solves linear systems Ax = b with a decomposition. For example
>> from looking at the code recently, I think the regression implementation
>> actually solves AtAx = Atb using a Cholesky decomposition. But, A = n x k,
>> where n is large but k is smallish (number of features), so AtA is k x k
>> and can be solved in-memory with a library.
>>
>> On Tue, Sep 27, 2016 at 3:05 AM, Cooper <ahmad.raban...@gmail.com> wrote:
>> > How is the problem of large-scale matrix inversion approached in Apache
>> Spark
>> > ?
>> >
>> > This linear algebra operation is obviously the very base of a lot of
>> other
>> > algorithms (regression, classification, etc). However, I have not been
>> able
>> > to find a Spark API on parallel implementation of matrix inversion. Can
>> you
>> > please clarify approaching this operation on the Spark internals ?
>> >
>> > Here <http://ieeexplore.ieee.org/abstract/document/7562171/>   is a
>> paper on
>> > the parallelized matrix inversion in Spark, however I am trying to use
>> an
>> > existing code instead of implementing one from scratch, if available.
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Large-scale-matrix-inverse-in-Spark-tp27796.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>>


-- 
-- Anastasios Zouzias
<a...@zurich.ibm.com>


Re: databricks spark-csv: linking coordinates are what?

2016-09-24 Thread Anastasios Zouzias
Hi Dan,

If you use Spark <= 1.6, you can also do

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

to quickly link the spark-csv jars into the Spark shell. Otherwise, as Holden
suggested, you link it in your Maven/sbt dependencies. The Spark folks assume
that their users have a good working knowledge of Maven/sbt; you might need
to read up on these before jumping into Spark.
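Concretely, assuming an sbt build, the coordinates from the README become a
single dependency line:

libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"
// or, pinning the Scala binary version explicitly:
// libraryDependencies += "com.databricks" % "spark-csv_2.11" % "1.5.0"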

Best,
Anastasios

On Fri, Sep 23, 2016 at 10:26 PM, Dan Bikle <bikle...@gmail.com> wrote:

>

> >

> hello world-of-spark,
>
> I am learning spark today.
>
> I want to understand the spark code in this repo:
>
> https://github.com/databricks/spark-csv
<https://github.com/databricks/spark-csv>
>
> In the README.md I see this info:
>
> Linking
>
> You can link against this library in your program at the following
coordinates:
> Scala 2.10
>
> groupId: com.databricks
> artifactId: spark-csv_2.10
> version: 1.5.0
>
> Scala 2.11
>
> groupId: com.databricks
> artifactId: spark-csv_2.11
> version: 1.5.0
>
> I want to know how I can use the above info.
>
> The people who wrote spark-csv should give some kind of example, demo, or
context.
>
> My understanding of Linking is limited.
>
> I have some experience operating sbt which I learned from this URL:
>
>
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
<http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications>
>
> The above URL does not give me enough information so that I can link
spark-csv with spark.
>
> Question:
> How do I learn how to use the info in the Linking section of the
README.md of
> https://github.com/databricks/spark-csv
<https://github.com/databricks/spark-csv>
> ??
>

-- 
-- Anastasios Zouzias