Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Georg Heiler
https://stackoverflow.com/questions/46832394/spark-access-first-n-rows-take-vs-limit
might be related

Best,
Georg

Am Fr., 12. Nov. 2021 um 07:48 Uhr schrieb Sergey Ivanychev <
sergeyivanyc...@gmail.com>:

> Hi Gourav,
>
> Please, read my question thoroughly. My problem is with the plan of the
> execution and with the fact that toPandas collects all the data not on the
> driver but on an executor, not with the fact that there’s some memory
> overhead.
>
> I don’t understand how your excerpts answer my question. The chapters
> you’ve shared describe that serialization is costly, that workers can fail
> due to the memory constraints and inter-language serialization.
>
> This is irrelevant to my question — building pandas DataFrame using
> Spark’s collect() works fine and this operation itself involves much
> deserialization of Row objects.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> 12 нояб. 2021 г., в 05:05, Gourav Sengupta 
> написал(а):
>
> 
> Hi Sergey,
>
> Please read the excerpts from the book of Dr. Zaharia that I had sent,
> they explain these fundamentals clearly.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Nov 11, 2021 at 9:40 PM Sergey Ivanychev <
> sergeyivanyc...@gmail.com> wrote:
>
>> Yes, in fact those are the settings that cause this behaviour. If set to
>> false, everything goes fine since the implementation in spark sources in
>> this case is
>>
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>>
>> Best regards,
>>
>>
>> Sergey Ivanychev
>>
>> 11 нояб. 2021 г., в 13:58, Mich Talebzadeh 
>> написал(а):
>>
>> 
>> Have you tried the following settings:
>>
>> spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
>>
>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh 
>> wrote:
>>
>>> Ok, so it boils down to how Spark creates the toPandas() DF under the
>>> bonnet. How many executors are involved in the k8s cluster? In this model
>>> Spark will create executors = number of nodes - 1.
>>>
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev 
>>> wrote:
>>>
 > Just to confirm with Collect() alone, this is all on the driver?

 I shared the screenshot with the plan in the first email. In the
 collect() case the data gets fetched to the driver without problems.

 Best regards,


 Sergey Ivanychev

 4 нояб. 2021 г., в 20:37, Mich Talebzadeh 
 написал(а):

 Just to confirm with Collect() alone, this is all on the driver?

 --
>>>
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>


arbitrary state handling in python API

2020-09-08 Thread Georg Heiler (TU Vienna)
Hi,

how can I apply arbitrary state handling as provided by the method:
mapGroupsWithState in the java API from the python side?

Currently, it looks like this method is not available on spark 3.x in the
structured streaming python API.

Best,
Georg


Re: Parallelising JDBC reads in spark

2020-05-25 Thread Georg Heiler
Well, you seem to have performance and consistency problems. Using a CDC
tool that fits your database, you might be able to fix both.
However, streaming the change events of the database log might be a bit
more complicated. Tools like https://debezium.io/ could be useful -
depending on your source database.

Best,
Georg

Am Mo., 25. Mai 2020 um 08:16 Uhr schrieb Manjunath Shetty H <
manjunathshe...@live.com>:

> Hi Georg,
>
> Thanks for the response, can please elaborate what do mean by change data
> capture ?
>
> Thanks
> Manjunath
> ----------
> *From:* Georg Heiler 
> *Sent:* Monday, May 25, 2020 11:14 AM
> *To:* Manjunath Shetty H 
> *Cc:* Mike Artz ; user 
> *Subject:* Re: Parallelising JDBC reads in spark
>
> Why don't you apply proper change data capture?
> This will be more complex though.
>
> Am Mo., 25. Mai 2020 um 07:38 Uhr schrieb Manjunath Shetty H <
> manjunathshe...@live.com>:
>
> Hi Mike,
>
> Thanks for the response.
>
> Even with that flag set data miss can happen right ?. As the fetch is
> based on the last watermark (maximum timestamp of the row that last batch
> job fetched ), Take a scenario like this with table
>
> a :  1
> b :  2
> c :  3
> d :  4
> *f  :  6*
> g :  7
> h :  8
> e :  5
>
>
>- a,b,c,d,e get picked by 1 task
>- by the time second task starts, e has been updated, so the row order
>changes
>- As f moves up, it will completely get missed in the fetch
>
>
> Thanks
> Manjunath
>
> --
> *From:* Mike Artz 
> *Sent:* Monday, May 25, 2020 10:50 AM
> *To:* Manjunath Shetty H 
> *Cc:* user 
> *Subject:* Re: Parallelising JDBC reads in spark
>
> Does anything different happened when you set the isolationLevel to do
> Dirty Reads i.e. "READ_UNCOMMITTED"
>
> On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
> Hi,
>
> We are writing a ETL pipeline using Spark, that fetch the data from SQL
> server in batch mode (every 15mins). Problem we are facing when we try to
> parallelising single table reads into multiple tasks without missing any
> data.
>
> We have tried this,
>
>
>- Use `ROW_NUMBER` window function in the SQL query
>- Then do
>-
>
>DataFrame df =
>hiveContext
>.read()
>.jdbc(
>**,
>query,
>"row_num",
>1,
>,
>noOfPartitions,
>jdbcOptions);
>
>
>
> The problem with this approach is if our tables get updated in between in SQL 
> Server while tasks are still running then the `ROW_NUMBER` will change and we 
> may miss some records.
>
>
> Any approach to how to fix this issue ? . Any pointers will be helpful
>
>
> *Note*: I am on spark 1.6
>
>
> Thanks
>
> Manjiunath Shetty
>
>


Re: Parallelising JDBC reads in spark

2020-05-24 Thread Georg Heiler
Why don't you apply proper change data capture?
This will be more complex though.

Am Mo., 25. Mai 2020 um 07:38 Uhr schrieb Manjunath Shetty H <
manjunathshe...@live.com>:

> Hi Mike,
>
> Thanks for the response.
>
> Even with that flag set data miss can happen right ?. As the fetch is
> based on the last watermark (maximum timestamp of the row that last batch
> job fetched ), Take a scenario like this with table
>
> a :  1
> b :  2
> c :  3
> d :  4
> *f  :  6*
> g :  7
> h :  8
> e :  5
>
>
>- a,b,c,d,e get picked by 1 task
>- by the time second task starts, e has been updated, so the row order
>changes
>- As f moves up, it will completely get missed in the fetch
>
>
> Thanks
> Manjunath
>
> --
> *From:* Mike Artz 
> *Sent:* Monday, May 25, 2020 10:50 AM
> *To:* Manjunath Shetty H 
> *Cc:* user 
> *Subject:* Re: Parallelising JDBC reads in spark
>
> Does anything different happened when you set the isolationLevel to do
> Dirty Reads i.e. "READ_UNCOMMITTED"
>
> On Sun, May 24, 2020 at 7:50 PM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
> Hi,
>
> We are writing a ETL pipeline using Spark, that fetch the data from SQL
> server in batch mode (every 15mins). Problem we are facing when we try to
> parallelising single table reads into multiple tasks without missing any
> data.
>
> We have tried this,
>
>
>- Use `ROW_NUMBER` window function in the SQL query
>- Then do
>-
>
>DataFrame df =
>hiveContext
>.read()
>.jdbc(
>**,
>query,
>"row_num",
>1,
>,
>noOfPartitions,
>jdbcOptions);
>
>
>
> The problem with this approach is if our tables get updated in between in SQL 
> Server while tasks are still running then the `ROW_NUMBER` will change and we 
> may miss some records.
>
>
> Any approach to how to fix this issue ? . Any pointers will be helpful
>
>
> *Note*: I am on spark 1.6
>
>
> Thanks
>
> Manjiunath Shetty
>
>


Re: Optimising multiple hive table join and query in spark

2020-03-15 Thread Georg Heiler
Did you only partition or also bucket by the join column? Are ORC indexes
active, i.e. are the JOIN keys sorted when writing the files?
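For reference, a rough PySpark sketch of writing both sides bucketed and
sorted by the join key, so a later join on that key can avoid the shuffle
(table names, bucket count and the column are placeholders; df_a, df_b and
the spark session are assumed to exist):

# Both sides must use the same number of buckets on the same column.
(df_a.write
    .bucketBy(64, "country")
    .sortBy("country")
    .mode("overwrite")
    .saveAsTable("tbl_a_bucketed"))

(df_b.write
    .bucketBy(64, "country")
    .sortBy("country")
    .mode("overwrite")
    .saveAsTable("tbl_b_bucketed"))

joined = spark.table("tbl_a_bucketed").join(spark.table("tbl_b_bucketed"), "country")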

Best,
Georg

Am So., 15. März 2020 um 15:52 Uhr schrieb Manjunath Shetty H <
manjunathshe...@live.com>:

> Mostly the concern is the reshuffle. Even though all the DF's are
> partitioned by same column. During join it does reshuffle, that is the
> bottleneck as of now in our POC implementation.
>
> Is there any way tell spark that keep all partitions with same partition
> key at the same place so that during the join it wont do shuffle again.
>
>
> -
> Manjunath
> --
> *From:* ayan guha 
> *Sent:* Sunday, March 15, 2020 5:46 PM
> *To:* Magnus Nilsson 
> *Cc:* user 
> *Subject:* Re: Optimising multiple hive table join and query in spark
>
> Hi
>
> I would first and foremost try to identify where is the most time spend
> during the query. One possibility is it just takes ramp up time for
> executors to be available, if thats the case then maybe a dedicated yarn
> queue may help, or using Spark thriftserver may help.
>
> On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson  wrote:
>
> Been a while but I remember reading on Stack Overflow you can use a UDF as
> a join condition to trick catalyst into not reshuffling the partitions, ie
> use regular equality on the column you partitioned or bucketed by and your
> custom comparer for the other columns. Never got around to try it out
> though. I really would like a native way to tell catalyst not to reshuffle
> just because you use more columns in the join condition.
>
> On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <
> manjunathshe...@live.com> wrote:
>
> Hi All,
>
> We have 10 tables in data warehouse (hdfs/hive) written using ORC format.
> We are serving a usecase on top of that by joining 4-5 tables using Hive as
> of now. But it is not fast as we wanted it to be, so we are thinking of
> using spark for this use case.
>
> Any suggestion on this ? Is it good idea to use the Spark for this use
> case ? Can we get better performance by using spark ?
>
> Any pointers would be helpful.
>
> *Notes*:
>
>- Data is partitioned by date (MMdd) as integer.
>- Query will fetch data for last 7 days from some tables while joining
>with other tables.
>
>
> *Approach we thought of as now :*
>
>- Create dataframe for each table and partition by same column for all
>tables ( Lets say Country as partition column )
>- Register all tables as temporary tables
>- Run the sql query with joins
>
> But the problem we are seeing with this approach is , even though we
> already partitioned using country it still does hashParittioning +
> shuffle during join. All the table join contain `Country` column with some
> extra column based on the table.
>
> Is there any way to avoid these shuffles ? and improve performance ?
>
>
> Thanks and regards
> Manjunath
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: [pyspark 2.4.3] nested windows function performance

2019-10-21 Thread Georg Heiler
No, as you shuffle each time again (you always partition by different
windows).
Instead: could you choose a single window (w3, with more columns = finer
granularity) and then filter out records to achieve the same result?

Or instead:
df.groupBy("a", "b", "c").agg(sort_array(collect_list(struct("foo", "bar", "baz"))))
and then a UDF which performs your desired aggregation.
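A rough sketch of that second idea (the ordering column and the aggregation
inside the UDF are placeholders for whatever you actually need; a pandas UDF
would be the faster variant):

from pyspark.sql import functions as F, types as T

# One shuffle on the finest grouping; all derived metrics are then computed
# in a single pass over the collected, sorted rows.
@F.udf(returnType=T.DoubleType())
def my_agg(rows):
    return float(sum(r["val"] for r in rows))

result = (df
    .groupBy("col1", "col2", "col3")
    .agg(F.sort_array(F.collect_list(F.struct("order_col", "val"))).alias("rows"))
    .withColumn("sum3", my_agg("rows")))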

Best,
Georg

Am Mo., 21. Okt. 2019 um 13:59 Uhr schrieb Rishi Shah <
rishishah.s...@gmail.com>:

> Hi All,
>
> Any suggestions?
>
> Thanks,
> -Rishi
>
> On Sun, Oct 20, 2019 at 12:56 AM Rishi Shah 
> wrote:
>
>> Hi All,
>>
>> I have a use case where I need to perform nested windowing functions on a
>> data frame to get final set of columns. Example:
>>
>> w1 = Window.partitionBy('col1')
>> df = df.withColumn('sum1', F.sum('val').over(w1))
>>
>> w2 = Window.partitionBy('col1', 'col2')
>> df = df.withColumn('sum2', F.sum('val').over(w2))
>>
>> w3 = Window.partitionBy('col1', 'col2', 'col3')
>> df = df.withColumn('sum3', F.sum('val').over(w3))
>>
>> These 3 partitions are not huge at all, however the data size is 2T
>> parquet snappy compressed. This throws a lot of outofmemory errors.
>>
>> I would like to get some advice around whether nested window functions is
>> a good idea in pyspark? I wanted to avoid using multiple filter + joins to
>> get to the final state, as join can create crazy shuffle.
>>
>> Any suggestions would be appreciated!
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


[no subject]

2019-09-19 Thread Georg Heiler
Hi,

How can I create an initial state by hand so that the Structured Streaming
file source only reads data which is semantically (i.e. comparing file paths
lexicographically) greater than the minimum committed initial state?

Details here:
https://stackoverflow.com/questions/58004832/spark-structured-streaming-file-source-read-from-a-certain-partition-onwards

Best,
Georg


Re: Creating custom Spark-Native catalyst/codegen functions

2019-08-22 Thread Georg Heiler
https://github.com/DataSystemsLab/GeoSpark should be public.

Am Do., 22. Aug. 2019 um 09:50 Uhr schrieb Arwin Tio :

> Hey,
>
> It seems like the GeoSpark repo is not publicly accessible?
>
> But from the filepath it seems like the Spark codebase itself was forked
> or modified.
>
> The examples that I've seen seem to suggest that you need to register
> custom Spark-Native functions inside Spark's private namespace like you
> said (FunctionRegistry.scala I believe).
>
> I was wondering if it was possible to add the more efficient Spark-Native
> functions in my user application without having to fork or modify Spark
> itself.
>
> Thanks,
>
> Arwin
>
> From: Georg Heiler
> Sent: Wednesday, August 21, 11:18 PM
> Subject: Re: Creating custom Spark-Native catalyst/codegen functions
> To: Arwin Tio
> Cc: user@spark.apache.org
>
>
> Look at
> https://github.com/DataSystemsLab/GeoSpark/tree/master/sql/src/main/scala/org/apache/spark/sql/geospark
> sql for an example.
>
>
> Using custom function registration and functions residing inside sparks
> private namespace should work.
>
> But I am not aware of a public user facing API.
> Is there any I am missing?
>
>
> Arwin Tio < arwin@hotmail.com> schrieb am Do. 22. Aug. 2019 04:28:
>
> Hi friends,
>
> I am looking into converting some UDFs/UDAFs to Spark-Native functions to
> leverage Catalyst and codegen.
>
> Looking through some examples (for example:
> https://github.com/apache/spark/pull/7214/files for Levenshtein) it seems
> like we need to add these functions to the Spark framework itself.
>
> Is there a way to add custom Spark-Native functions in "userspace"?
>
> Thank you!
>
> Arwin
>
>
>
>


Re: Creating custom Spark-Native catalyst/codegen functions

2019-08-22 Thread Georg Heiler
Look at
https://github.com/DataSystemsLab/GeoSpark/tree/master/sql/src/main/scala/org/apache/spark/sql/geospark
sql for an example.

Using custom function registration and functions residing inside Spark's
private namespace should work.

But I am not aware of a public user facing API.
Is there any I am missing?


Arwin Tio  schrieb am Do. 22. Aug. 2019 04:28:

> Hi friends,
>
> I am looking into converting some UDFs/UDAFs to Spark-Native functions to
> leverage Catalyst and codegen.
>
> Looking through some examples (for example:
> https://github.com/apache/spark/pull/7214/files for Levenshtein) it seems
> like we need to add these functions to the Spark framework itself.
>
> Is there a way to add custom Spark-Native functions in "userspace"?
>
> Thank you!
>
> Arwin
>


Re: [Pyspark 2.4] Best way to define activity within different time window

2019-06-11 Thread Georg Heiler
For grouping by each of them: look into grouping sets:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-multi-dimensional-aggregation.html
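For example (a sketch only; the column names and a SparkSession called spark
are assumptions based on your description):

# One pass produces the active-user counts at every requested granularity.
df.createOrReplaceTempView("activity")
agg = spark.sql("""
    SELECT date, week, month, quarter, year,
           COUNT(DISTINCT user_id) AS active_users
    FROM activity
    GROUP BY date, week, month, quarter, year
    GROUPING SETS ((date), (week), (month), (quarter), (year))
""")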

Am Di., 11. Juni 2019 um 06:09 Uhr schrieb Rishi Shah <
rishishah.s...@gmail.com>:

> Thank you both for your input!
>
> To calculate moving average of active users, could you comment on whether
> to go for RDD based implementation or dataframe? If dataframe, will window
> function work here?
>
> In general, how would spark behave when working with dataframe with date,
> week, month, quarter, year columns and groupie against each one one by one?
>
>
>
> On Sun, Jun 9, 2019 at 1:17 PM Jörn Franke  wrote:
>
>> Depending on what accuracy is needed, hyperloglogs can be an interesting
>> alternative
>> https://en.m.wikipedia.org/wiki/HyperLogLog
>>
>> Am 09.06.2019 um 15:59 schrieb big data :
>>
>> From m opinion, Bitmap is the best solution for active users calculation.
>> Other solution almost bases on count(distinct) calculation process, which
>> is more slower.
>>
>> If you 've implemented Bitmap solution including how to build Bitmap, how
>> to load Bitmap, then Bitmap is the best choice.
>> 在 2019/6/5 下午6:49, Rishi Shah 写道:
>>
>> Hi All,
>>
>> Is there a best practice around calculating daily, weekly, monthly,
>> quarterly, yearly active users?
>>
>> One approach is to create a window of daily bitmap and aggregate it based
>> on period later. However I was wondering if anyone has a better approach to
>> tackling this problem..
>>
>> --
>> Regards,
>>
>> Rishi Shah
>>
>>
>
> --
> Regards,
>
> Rishi Shah
>


Re: [pyspark 2.3+] Bucketing with sort - incremental data load?

2019-05-31 Thread Georg Heiler
Bucketing will only help you with joins, and these usually happen on a key.
You mentioned that there is no such key in your data. If you just want to
search through large quantities of data, sorting and partitioning by time is
what is left.
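A minimal sketch of that layout (column names and the output path are
placeholders):

# Daily append, partitioned by day and sorted within each output file, so
# Parquet min/max statistics can be used to skip data when you filter.
(df
    .repartition("event_date")
    .sortWithinPartitions("event_time")
    .write
    .partitionBy("event_date")
    .mode("append")
    .parquet("/data/events"))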

Rishi Shah  schrieb am Sa. 1. Juni 2019 um 05:57:

> Thanks much for your input Gourav, Silvio.
>
> I have about 10TB of data, which gets stored daily. There's no qualifying
> column for partitioning, which makes querying this table super slow. So I
> wanted to sort the results before storing them daily. This is why I was
> thinking to use bucketing and sorting ... Do you think sorting data based
> on a column or two before saving would help query performance on this
> table?
>
> My concern is, data will be sorted on daily basis and not globally. Would
> that help with performance? I can compact files every month as well and
> sort before saving. Just not sure if this is going to help with performance
> issues on this table.
>
> Would be great to get your advice on this.
>
>
>
>
>
>
>
>
>
> On Fri, May 31, 2019 at 10:42 AM Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Spark does allow appending new files to bucketed tables. When the data is
>> read in, Spark will combine the multiple files belonging to the same
>> buckets into the same partitions.
>>
>>
>>
>> Having said that, you need to be very careful with bucketing especially
>> as you’re appending to avoid generating lots of small files. So, you may
>> need to consider periodically running a compaction job.
>>
>>
>>
>> If you’re simply appending daily snapshots, then you could just consider
>> using date partitions, instead?
>>
>>
>>
>> *From: *Rishi Shah 
>> *Date: *Thursday, May 30, 2019 at 10:43 PM
>> *To: *"user @spark" 
>> *Subject: *[pyspark 2.3+] Bucketing with sort - incremental data load?
>>
>>
>>
>> Hi All,
>>
>>
>>
>> Can we use bucketing with sorting functionality to save data
>> incrementally (say daily) ? I understand bucketing is supported in Spark
>> only with saveAsTable, however can this be used with mode "append" instead
>> of "overwrite"?
>>
>>
>>
>> My understanding around bucketing was, you need to rewrite entire table
>> every time, can someone help advice?
>>
>>
>>
>> --
>>
>> Regards,
>>
>>
>>
>> Rishi Shah
>>
>
>
> --
> Regards,
>
> Rishi Shah
>


Re: [pyspark] Use output of one aggregated function for another aggregated function within the same groupby

2019-04-24 Thread Georg Heiler
Use analytical window functions to rank the result and then filter to the
desired rank.
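A minimal sketch with a window (column names taken from your example):

from pyspark.sql import Window, functions as F

w = Window.partitionBy("grp_col")
result = (df
    .withColumn("max_date", F.max("file_date").over(w))   # per-group max
    .filter(F.col("file_date") == F.col("max_date"))       # keep only rows at the max
    .groupBy("grp_col", "max_date")
    .count())                                              # how many rows share the max

The same pattern works with rank()/dense_rank() ordered by the date descending
and filtering on rank == 1.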

Rishi Shah  schrieb am Do. 25. Apr. 2019 um 05:07:

> Hi All,
>
> [PySpark 2.3, python 2.7]
>
> I would like to achieve something like this, could you please suggest best
> way to implement (perhaps highlight pros & cons of the approach in terms of
> performance)?
>
> df = df.groupby('grp_col').agg(max(date).alias('max_date'), count(when
> col('file_date') == col('max_date')))
>
> Please note 'max_date' is a result of aggregate function max inside the
> group by agg. I can definitely use multiple groupbys to achieve this but is
> there a better way? better performance wise may be?
>
> Appreciate your help!
>
>
> --
> Regards,
>
> Rishi Shah
>


Re: use rocksdb for spark structured streaming (SSS)

2019-03-10 Thread Georg Heiler
Use https://github.com/chermenin/spark-states instead

Am So., 10. März 2019 um 20:51 Uhr schrieb Arun Mahadevan :

>
> Read the link carefully,
>
> This solution is available (*only*) in Databricks Runtime.
>
> You can enable RockDB-based state management by setting the following
> configuration in the SparkSession before starting the streaming query.
>
> spark.conf.set(
>   "spark.sql.streaming.stateStore.providerClass",
>   "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
>
>
> On Sun, 10 Mar 2019 at 11:54, Lian Jiang  wrote:
>
>> Hi,
>>
>> I have a very simple SSS pipeline which does:
>>
>> val query = df
>>   .dropDuplicates(Array("Id", "receivedAt"))
>>   .withColumn(timePartitionCol, timestamp_udfnc(col("receivedAt")))
>>   .writeStream
>>   .format("parquet")
>>   .partitionBy("availabilityDomain", timePartitionCol)
>>   .trigger(Trigger.ProcessingTime(5, TimeUnit.MINUTES))
>>   .option("path", "/data")
>>   .option("checkpointLocation", "/data_checkpoint")
>>   .start()
>>
>> After ingesting 2T records, the state under checkpoint folder on HDFS 
>> (replication factor 2) grows to 2T bytes.
>> My cluster has only 2T bytes which means the cluster can barely handle 
>> further data growth.
>>
>> Online spark documents 
>> (https://docs.databricks.com/spark/latest/structured-streaming/production.html)
>> says using rocksdb help SSS job reduce JVM memory overhead. But I cannot 
>> find any document how
>>
>> to setup rocksdb for SSS. Spark class CheckpointReader seems to only handle 
>> HDFS.
>>
>> Any suggestions? Thanks!
>>
>>
>>
>>


Re: Spark Sql group by less performant

2018-12-10 Thread Georg Heiler
See
https://databricks.com/blog/2016/05/19/approximate-algorithms-in-apache-spark-hyperloglog-and-quantiles.html
you most probably do not require exact counts.
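If a distinct count is what eventually becomes the bottleneck, the built-in
sketch-based variant is much cheaper than an exact one. A hedged example, not
your exact query:

from pyspark.sql import functions as F

approx = df.groupBy("column1", "column2").agg(
    F.approx_count_distinct("column8", rsd=0.05).alias("distinct_col8_approx"))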

Am Di., 11. Dez. 2018 um 02:09 Uhr schrieb 15313776907 <15313776...@163.com
>:

> i think you can add executer memory
>
> 15313776907
> 邮箱:15313776...@163.com
>
> 
>
> 签名由 网易邮箱大师  定制
>
> On 12/11/2018 08:28, lsn24  wrote:
> Hello,
>
> I have a requirement where I need to get total count of rows and total
> count of failedRows based on a grouping.
>
> The code looks like below:
>
> myDataset.createOrReplaceTempView("temp_view");
>
> Dataset  countDataset = sparkSession.sql("Select
> column1,column2,column3,column4,column5,column6,column7,column8, count(*)
> as
> totalRows, sum(CASE WHEN (column8 is NULL) THEN 1 ELSE 0 END) as
> failedRows
> from temp_view group by
> column1,column2,column3,column4,column5,column6,column7,column8");
>
>
> Up till around 50 Million records,  the query performance was ok. After
> that
> it gave it up. Mostly resulting in out of Memory exception.
>
> I read documentation and blogs, most of them gives me examples of
> RDD.reduceByKey. But here I got dataset and spark Sql.
>
> What  am I missing here ? .
>
> Any help will be appreciated.
>
> Thanks!
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: What is BDV in Spark Source

2018-11-10 Thread Georg Heiler
It is just a renamed import: breeze.linalg.DenseVector is aliased as BDV.
Breeze allows you to perform efficient linear algebra if a fitting BLAS
backend is installed.

Soheil Pourbafrani  schrieb am Fr. 9. Nov. 2018 um
20:07:

> Hi,
>
> Checking the Spark Sources, I faced with a type BDV:
>
> breeze.linalg.{DenseVector => BDV}
>
> and they used it in calculating IDF from Term Frequencies. What is it
> exactly?
>


Re: Best way to process this dataset

2018-06-18 Thread Georg Heiler
Use pandas or dask.

If you do want to use Spark, store the dataset as Parquet/ORC and then
continue to perform analytical queries on that dataset.
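For example, a one-off conversion with pandas stays within 2GB RAM by reading
the CSV in chunks (the file name and column names are assumptions):

import pandas as pd

cols = ["user_id", "item_id", "category_id", "behavior", "ts"]
# Convert the 3.6GB CSV to Parquet chunk by chunk (requires pyarrow or
# fastparquet); later queries then only read the columns they need.
for i, chunk in enumerate(pd.read_csv("behavior.csv", names=cols,
                                      chunksize=2_000_000)):
    chunk.to_parquet("behavior_%04d.parquet" % i, index=False)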

Raymond Xie  schrieb am Di., 19. Juni 2018 um
04:29 Uhr:

> I have a 3.6GB csv dataset (4 columns, 100,150,807 rows), my environment
> is 20GB ssd harddisk and 2GB RAM.
>
> The dataset comes with
> User ID: 987,994
> Item ID: 4,162,024
> Category ID: 9,439
> Behavior type ('pv', 'buy', 'cart', 'fav')
> Unix Timestamp: span between November 25 to December 03, 2017
>
> I would like to hear any suggestion from you on how should I process the
> dataset with my current environment.
>
> Thank you.
>
> **
> *Sincerely yours,*
>
>
> *Raymond*
>


Re: best practices to implement library of custom transformations of Dataframe/Dataset

2018-06-18 Thread Georg Heiler
I believe explicit is better than implicit; however, as you mentioned, the
notation is very nice.

Therefore, I suggest
https://medium.com/@mrpowers/chaining-custom-dataframe-transformations-in-spark-a39e315f903c
i.e. use df.transform(myFunction).
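A tiny sketch of that pattern (written in PySpark here; DataFrame.transform
exists in PySpark 3+, and the Scala Dataset API has an equivalent transform
method; the function and column names are made up):

def my_advanced_filter(params):
    # A transformation is just a function DataFrame -> DataFrame,
    # curried over its parameters so it can be chained.
    def inner(df):
        return df.filter(df["value"] > params["threshold"])
    return inner

result = input_df.transform(my_advanced_filter({"threshold": 0}))
# Further steps chain the same way: .transform(my_column_injection(params)) ...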

Valery Khamenya  schrieb am Mo., 18. Juni 2018 um
21:34 Uhr:

> Dear Spark gurus,
>
> *Question*: what way would you recommend to shape a library of custom
> transformations for Dataframes/Datasets?
>
> *Details*: e.g., consider we need several custom transformations over the
> Dataset/Dataframe instances. For example injecting columns, apply specially
> tuned row filtering, lookup-table based replacements, etc.
>
> I'd consider basically 2 options:
>
> 1) implicits! create a class that looks like derived from
> Dataset/Dataframe and then and implement the transformations as its methods
>
> or
>
> 2) implement the transformations as stand-alone functions
>
> The use of first approach leads to such beautiful code:
>
> val result = inputDataframe
>   .myAdvancedFiter(params)
>   .myAdvancedReplacement(params)
>   .myColumnInjection(params)
>   .mySomethingElseTransformation(params)
>   .andTheFinalGoodies(params)
>
> nice!
>
> whereas the second option will lead to this:
>
> val result = andTheFinalGoodies(
>   mySomethingElseTransformation(
> myColumnInjection(
>   myAdvancedReplacement(
> inputDataframe.myAdvancedFiter(params),
> params),
>   params),
> params),
>   params)
>
> terrible! ;)
>
> So, ideally it would be nice to learn how to implement Option 1. Luckily
> there are different approaches for this:
> https://stackoverflow.com/questions/32585670/what-is-the-best-way-to-define-custom-methods-on-a-dataframe
>
> However in reality such transformations rely on
>
>   import spark.implicits._
>
> and I never seen solution on how to pass SparkContext to such library
> classes and safely use it in there. This article shows, that it is not that
> straight-forward thing:
>
>
> https://docs.azuredatabricks.net/spark/latest/rdd-streaming/tips-for-running-streaming-apps-in-databricks.html
>
> Said that, I still need a wisdom of Spark community to get over this.
>
> P.S. and a good Spark application "boilerplate" with a separately
> implemented library of Dataframe/Dataset transformations relying on "import
> spark.implicits._" is still wanted badly!
>
> best regards
> --
> Valery
>


Re: GroupBy in Spark / Scala without Agg functions

2018-05-29 Thread Georg Heiler
Why do you group if you do not want to aggregate?
Isn't this the same as select distinct?
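i.e. something along these lines should give you back a DataFrame (a sketch
with the column list shortened):

result = (student.join(general_register, "student_id")
    .select("student_id", "student_name", "student_std",
            "student_group", "student_dob")
    .distinct())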

Chetan Khatri  schrieb am Di., 29. Mai 2018 um
20:21 Uhr:

> All,
>
> I have scenario like this in MSSQL Server SQL where i need to do groupBy
> without Agg function:
>
> Pseudocode:
>
>
> select m.student_id, m.student_name, m.student_std, m.student_group,
> m.student_dob
> from student as m
> inner join general_register g on m.student_id = g.student_id
> group by m.student_id, m.student_name, m.student_std, m.student_group,
> m.student_dob
>
> I tried to doing in spark but i am not able to get Dataframe as return
> value, how this kind of things could be done in Spark.
>
> Thanks
>


Re: Spark parse fixed length file [Java]

2018-04-13 Thread Georg Heiler
I am not 100% sure if spark is smart enough to achieve this using a single
pass over the data. If not you could create a java udf for this which
correctly parses all the columns at once.


Otherwise you could enable Tungsten off heap memory which might speed
things up.
lsn24  schrieb am Fr. 13. Apr. 2018 um 19:02:

> Hello,
>
>  We are running into issues while trying to process fixed length files
> using
> spark.
>
> The approach we took is as follows:
>
> 1. Read the .bz2 file  into a dataset from hdfs using
> spark.read().textFile() API.Create a temporary view.
>
>  Dataset rawDataset = sparkSession.read().textFile(filePath);
>  rawDataset.createOrReplaceTempView(tempView);
>
> 2. Run a sql query on the view, to slice and dice the data the way we need
> it (using substring).
>
>  (SELECT
>  TRIM(SUBSTRING(value,1 ,16)) AS record1 ,
>  TRIM(SUBSTRING(value,17 ,8)) AS record2 ,
>  TRIM(SUBSTRING(value,25 ,5)) AS record3 ,
>  TRIM(SUBSTRING(value,30 ,16)) AS record4 ,
>  CAST(SUBSTRING(value,46 ,8) AS BIGINT) AS record5 ,
>  CAST(SUBSTRING(value,54 ,6) AS BIGINT) AS record6 ,
>  CAST(SUBSTRING(value,60 ,3) AS BIGINT) AS record7 ,
>  CAST(SUBSTRING(value,63 ,6) AS BIGINT) AS record8 ,
>  TRIM(SUBSTRING(value,69 ,20)) AS record9 ,
>  TRIM(SUBSTRING(value,89 ,40)) AS record10 ,
>  TRIM(SUBSTRING(value,129 ,32)) AS record11 ,
>  TRIM(SUBSTRING(value,161 ,19)) AS record12,
>  TRIM(SUBSTRING(value,180 ,1)) AS record13 ,
>  TRIM(SUBSTRING(value,181 ,9)) AS record14 ,
>  TRIM(SUBSTRING(value,190 ,3)) AS record15 ,
>  CAST(SUBSTRING(value,193 ,8) AS BIGINT) AS record16 ,
>  CAST(SUBSTRING(value,201 ,8) AS BIGINT) AS record17
>  FROM tempView)
>
> 3.Write the output of sql query to a parquet file.
>  loadDataset.write().mode(SaveMode.Append).parquet(outputDirectory);
>
> Problem :
>
>   The step #2 takes a longer time , if the length of line is ~2000
> characters. If each line in the file is only 1000 characters then it takes
> only 4 minutes to process 20 million lines. If we increase the line length
> to 2000 characters it takes 20 minutes to process 20 million lines.
>
>
> Is there a better way in spark to parse fixed length lines?
>
>
> *Note: *Spark version we use is 2.2.0 and we are using  Spark with Java.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: run huge number of queries in Spark

2018-04-04 Thread Georg Heiler
See https://gist.github.com/geoHeil/e0799860262ceebf830859716bbf in
particular:

You will probably want to use Spark's imperative (non-SQL) API:

  .rdd
  .reduceByKey {
    (count1, count2) => count1 + count2
  }.map {
    case ((word, path), n) => (word, (path, n))
  }.toDF

i.e. build an inverted index, which easily lets you then calculate TF/IDF.
But Spark also comes with
https://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf which
might help you to easily achieve the desired result.
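For the concrete document-frequency case, one pass over the corpus also works
without MLlib. A rough sketch, assuming a DataFrame df with columns ID and
text (note it tokenizes on whitespace, which is not exactly the same as your
substring LIKE match):

from pyspark.sql import functions as F

# Explode each document into (ID, term) pairs, de-duplicate them, and one
# groupBy yields the document frequency of every term at once, instead of
# running one query per term from the driver.
doc_freq = (df
    .select("ID", F.explode(F.split(F.lower(F.col("text")), "\\s+")).alias("term"))
    .distinct()
    .groupBy("term")
    .count())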

Donni Khan  schrieb am Mi., 4. Apr. 2018 um
10:56 Uhr:

> Hi all,
>
> I want to run huge number of queries on Dataframe in Spark. I have a big
> data of text documents, I loaded all documents into a Spark DataFrame and
> create a temp table.
>
> dataFrame.registerTempTable("table1");
>
> I have more than 50,000 terms, I want to get the document frequency for
> each by using the "table1".
>
> I use the follwing:
>
> DataFrame df=sqlContext.sql("select count(ID) from table1 where text like
> '%"+term+"%'");
>
> but this scenario needs much time to finish because I have t run it from
> Spark Driver for each term.
>
>
> Does anyone has idea how I can run all queries in distributed way?
>
> Thank you && Best Regards,
>
> Donni
>
>
>
>


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-14 Thread Georg Heiler
Did you try Spark 2.3 with Structured Streaming? Its watermarking and plain
SQL support might be really interesting for you.
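A minimal sketch of a stream-stream join in 2.3 (topic names, servers and the
30-minute bounds are placeholders; the watermark plus the time-range join
condition tells Spark how long to buffer one side while waiting for the other):

from pyspark.sql import functions as F

def topic(name, suffix):
    return (spark.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "host:9092")
            .option("subscribe", name)
            .load()
            .selectExpr("CAST(key AS STRING) AS k" + suffix,
                        "CAST(value AS STRING) AS v" + suffix,
                        "timestamp AS ts" + suffix))

t1 = topic("topic1", "1").withWatermark("ts1", "30 minutes")
t2 = topic("topic2", "2").withWatermark("ts2", "30 minutes")

joined = t1.join(
    t2,
    F.expr("k1 = k2 AND ts2 BETWEEN ts1 - interval 30 minutes AND ts1 + interval 30 minutes"))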
Aakash Basu  schrieb am Mi. 14. März 2018 um
14:57:

> Hi,
>
>
>
> *Info (Using):Spark Streaming Kafka 0.8 package*
>
> *Spark 2.2.1*
> *Kafka 1.0.1*
>
> As of now, I am feeding paragraphs in Kafka console producer and my Spark,
> which is acting as a receiver is printing the flattened words, which is a
> complete RDD operation.
>
> *My motive is to read two tables continuously (being updated) as two
> distinct Kafka topics being read as two Spark Dataframes and join them
> based on a key and produce the output. *(I am from Spark-SQL background,
> pardon my Spark-SQL-ish writing)
>
> *It may happen, the first topic is receiving new data 15 mins prior to the
> second topic, in that scenario, how to proceed? I should not lose any data.*
>
> As of now, I want to simply pass paragraphs, read them as RDD, convert to
> DF and then join to get the common keys as the output. (Just for R).
>
> Started using Spark Streaming and Kafka today itself.
>
> Please help!
>
> Thanks,
> Aakash.
>


Upgrades of streaming jobs

2018-03-08 Thread Georg Heiler
Hi

What is the state of spark structured streaming jobs and upgrades?

Can checkpoints of version 1 be read by version 2 of a job? Is downtime
required to upgrade the job?

Thanks


Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread Georg Heiler
See
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
and
https://stackoverflow.com/questions/42448564/spark-sql-window-function-with-complex-condition
for a more involved example
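A minimal lag() sketch for the row-to-row comparison (key and ordering column
names are assumptions, since the attached snapshot is not visible here):

from pyspark.sql import Window, functions as F

w = Window.partitionBy("account_id").orderBy("event_time")
compared = (df
    .withColumn("prev_value", F.lag("value").over(w))
    .withColumn("changed", F.col("value") != F.col("prev_value")))
# To compare all attributes at once, lag a struct of the columns instead:
# F.lag(F.struct(*df.columns)).over(w)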


KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mo. 12. Feb. 2018
um 15:16:

> I am also looking for the same answer. Will this work in streaming
> application too ??
>
> Sent from my iPhone
>
> On Feb 12, 2018, at 8:12 AM, Debabrata Ghosh <mailford...@gmail.com>
> wrote:
>
> Georg - Thanks ! Will you be able to help me with a few examples please.
>
> Thanks in advance again !
>
> Cheers,
> D
>
> On Mon, Feb 12, 2018 at 6:03 PM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> You should look into window functions for spark sql.
>> Debabrata Ghosh <mailford...@gmail.com> schrieb am Mo. 12. Feb. 2018 um
>> 13:10:
>>
>>> Hi,
>>>  Greetings !
>>>
>>>  I needed some efficient way in pyspark to execute a
>>> comparison (on all the attributes) between the current row and the previous
>>> row. My intent here is to leverage the distributed framework of Spark to
>>> the best extent so that can achieve a good speed. Please can anyone suggest
>>> me a suitable algorithm / command. Here is a snapshot of the underlying
>>> data which I need to compare:
>>>
>>> [image: Inline image 1]
>>>
>>> Thanks in advance !
>>>
>>> D
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Efficient way to compare the current row with previous row contents

2018-02-12 Thread Georg Heiler
You should look into window functions for spark sql.
Debabrata Ghosh  schrieb am Mo. 12. Feb. 2018 um
13:10:

> Hi,
>  Greetings !
>
>  I needed some efficient way in pyspark to execute a
> comparison (on all the attributes) between the current row and the previous
> row. My intent here is to leverage the distributed framework of Spark to
> the best extent so that can achieve a good speed. Please can anyone suggest
> me a suitable algorithm / command. Here is a snapshot of the underlying
> data which I need to compare:
>
> [image: Inline image 1]
>
> Thanks in advance !
>
> D
>


Re: Spark cannot find tables in Oracle database

2018-02-11 Thread Georg Heiler
I had the same problem. You need to uppercase all table names prior to
storing them in Oracle (unquoted Oracle identifiers are stored in uppercase).
Gourav Sengupta  schrieb am So. 11. Feb. 2018 um
10:44:

> Hi,
>
> since you are using the same user as the schema, I do not think that there
> is an access issue. Perhaps you might want to see whether there is anything
> case sensitive about the the table names. I remember once that the table
> names had to be in small letters, but that was in MYSQL.
>
>
> Regards,
> Gourav
>
> On Sun, Feb 11, 2018 at 2:26 AM, Lian Jiang  wrote:
>
>> Hi,
>>
>> I am following
>> https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
>> to query oracle database 12.1 from spark shell 2.11.8.
>>
>> val jdbcDF = spark.read
>>   .format("jdbc")
>>   .option("url", "jdbc:oracle:thin:@(DESCRIPTION = (ADDRESS = (PROTOCOL = 
>> TCP)(HOST = 129.106.123.73)(PORT = 1521))(CONNECT_DATA =(SERVER = 
>> DEDICATED)(SERVICE_NAME = pdb1.subnet1.hadoop.oraclevcn.com)))")
>>   .option("dbtable", "HADOOP_DEV.SYMBOLINFO")
>>   .option("user", "hadoop_dev")
>>   .option("password", "mypassword")
>>   .load()
>>
>> This statement failed due to "ORA-00942: table or view does not exist"
>> even SymbolInfo table does exist in hadoop_dev schema.
>>
>> Any clue? Thanks!
>>
>
>


Re: [Spark ML] Positive-Only Training Classification in Scala

2018-01-15 Thread Georg Heiler
I do not know that module, but in the literature PUL (positive-unlabeled
learning) is the exact term you should look for.

Matt Hicks <m...@outr.com> schrieb am Mo., 15. Jan. 2018 um 20:56 Uhr:

> Is it fair to assume this is what I need?
> https://github.com/ispras/pu4spark
>
>
>
> On Mon, Jan 15, 2018 1:55 PM, Georg Heiler georg.kf.hei...@gmail.com
> wrote:
>
>> As far as I know spark does not implement such algorithms. In case the
>> dataset is small
>> http://scikit-learn.org/stable/modules/generated/sklearn.svm.OneClassSVM.html
>>  might
>> be of interest to you.
>>
>> Jörn Franke <jornfra...@gmail.com> schrieb am Mo., 15. Jan. 2018 um
>> 20:04 Uhr:
>>
>> I think you look more for algorithms for unsupervised learning, eg
>> clustering.
>>
>> Depending on the characteristics different clusters might be created , eg
>> donor or non-donor. Most likely you may find also more clusters (eg would
>> donate but has a disease preventing it or too old). You can verify which
>> clusters make sense for your approach so I recommend not only try two
>> clusters but multiple and see which number is more statistically
>> significant .
>>
>> On 15. Jan 2018, at 19:21, Matt Hicks <m...@outr.com> wrote:
>>
>> I'm attempting to create a training classification, but only have
>> positive information.  Specifically in this case it is a donor list of
>> users, but I want to use it as training in order to determine
>> classification for new contacts to give probabilities that they will donate.
>>
>> Any insights or links are appreciated. I've gone through the
>> documentation but have been unable to find any references to how I might do
>> this.
>>
>> Thanks
>>
>> ---*Matt Hicks*
>>
>> *Chief Technology Officer*
>>
>> 405.283.6887 <(405)%20283-6887> | http://outr.com
>>
>> 
>>
>>


Re: [Spark ML] Positive-Only Training Classification in Scala

2018-01-15 Thread Georg Heiler
As far as I know spark does not implement such algorithms. In case the
dataset is small
http://scikit-learn.org/stable/modules/generated/sklearn.svm.OneClassSVM.html
might
be of interest to you.
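A short sketch of that (the feature matrices X_donors / X_new are assumed to
be prepared elsewhere):

from sklearn.svm import OneClassSVM

# Fit only on the known positives (donors); new contacts are then scored by
# how similar they look to that class.
clf = OneClassSVM(kernel="rbf", gamma="auto", nu=0.1).fit(X_donors)
scores = clf.decision_function(X_new)   # higher = more donor-like
labels = clf.predict(X_new)             # +1 inlier, -1 outlier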

Jörn Franke  schrieb am Mo., 15. Jan. 2018 um
20:04 Uhr:

> I think you look more for algorithms for unsupervised learning, eg
> clustering.
>
> Depending on the characteristics different clusters might be created , eg
> donor or non-donor. Most likely you may find also more clusters (eg would
> donate but has a disease preventing it or too old). You can verify which
> clusters make sense for your approach so I recommend not only try two
> clusters but multiple and see which number is more statistically
> significant .
>
> On 15. Jan 2018, at 19:21, Matt Hicks  wrote:
>
> I'm attempting to create a training classification, but only have positive
> information.  Specifically in this case it is a donor list of users, but I
> want to use it as training in order to determine classification for new
> contacts to give probabilities that they will donate.
>
> Any insights or links are appreciated. I've gone through the documentation
> but have been unable to find any references to how I might do this.
>
> Thanks
>
> ---*Matt Hicks*
>
> *Chief Technology Officer*
>
> 405.283.6887 <(405)%20283-6887> | http://outr.com
>
> 
>
>


Re: Can spark shuffle leverage Alluxio to abtain higher stability?

2017-12-21 Thread Georg Heiler
Did you try to use the YARN shuffle service?
chopinxb  schrieb am Do. 21. Dez. 2017 um 04:43:

> In my practice of spark application(almost Spark-SQL) , when there is a
> complete node failure in my cluster, jobs which have shuffle blocks on the
> node will completely fail after 4 task retries.  It seems that data lineage
> didn't work. What' more, our applications use multiple SQL statements for
> data analysis. After a lengthy calculation, entire application failed
> because of one job failure is unacceptable.  So we consider more stability
> rather than speed in some way.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Feature generation / aggregate functions / timeseries

2017-12-14 Thread Georg Heiler
Also, the RDD StatCounter will already compute most of your desired metrics,
as will df.describe:
https://databricks.com/blog/2015/06/02/statistical-and-mathematical-functions-with-dataframes-in-spark.html
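For the fixed "instant + duration" aggregates in your example further below, a
range-based window frame is one more thing to look at. A sketch only, using
the id / timestamp / value columns from the example; here every row's own
timestamp plays the role of the instant t, so you would still filter down to
the instants you actually care about:

from pyspark.sql import Window, functions as F

# Per id, aggregate everything whose timestamp lies in [t - 1000ms, t].
w = Window.partitionBy("id").orderBy("timestamp").rangeBetween(-1000, 0)
feats = (df
    .withColumn("min_value",  F.min("value").over(w))
    .withColumn("max_value",  F.max("value").over(w))
    .withColumn("avg_value",  F.avg("value").over(w))
    .withColumn("last_value", F.last("value").over(w)))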
Georg Heiler <georg.kf.hei...@gmail.com> schrieb am Do. 14. Dez. 2017 um
19:40:

> Look at custom UDAF functions
> <julio.ces...@free.fr> schrieb am Do. 14. Dez. 2017 um 09:31:
>
>> Hi dear spark community !
>>
>> I want to create a lib which generates features for potentially very
>> large datasets, so I believe spark could be a nice tool for that.
>> Let me explain what I need to do :
>>
>> Each file 'F' of my dataset is composed of at least :
>> - an id ( string or int )
>> - a timestamp ( or a long value )
>> - a value ( generaly a double )
>>
>> I want my tool to :
>> - compute aggregate function for many pairs 'instants + duration'
>> ===> FOR EXAMPLE :
>> = compute for the instant 't = 2001-01-01' aggregate functions for
>> data between 't-1month and t' and 't-12months and t-9months' and this,
>> FOR EACH ID !
>> ( aggregate functions such as
>> min/max/count/distinct/last/mode/kurtosis... or even user defined ! )
>>
>> My constraints :
>> - I don't want to compute aggregate for each tuple of 'F'
>> ---> I want to provide a list of couples 'instants + duration' (
>> potentially large )
>> - My 'window' defined by the duration may be really large ( but may
>> contain only a few values... )
>> - I may have many id...
>> - I may have many timestamps...
>>
>> 
>> 
>> 
>>
>> Let me describe this with some kind of example to see if SPARK ( SPARK
>> STREAMING ? ) may help me to do that :
>>
>> Let's imagine that I have all my data in a DB or a file with the
>> following columns :
>> id | timestamp(ms) | value
>> A | 100 |  100
>> A | 1000500 |  66
>> B | 100 |  100
>> B | 110 |  50
>> B | 120 |  200
>> B | 250 |  500
>>
>> ( The timestamp is a long value, so as to be able to express date in ms
>> from -01-01. to today )
>>
>> I want to compute operations such as min, max, average, last on the
>> value column, for a these couples :
>> -> instant = 1000500 / [-1000ms, 0 ] ( i.e. : aggregate data between [
>> t-1000ms and t ]
>> -> instant = 133 / [-5000ms, -2500 ] ( i.e. : aggregate data between
>> [ t-5000ms and t-2500ms ]
>>
>>
>> And this will produce this kind of output :
>>
>> id | timestamp(ms) | min_value | max_value | avg_value | last_value
>> ---
>> A | 1000500| min...| max   | avg   | last
>> B | 1000500| min...| max   | avg   | last
>> A | 133| min...| max   | avg   | last
>> B | 133| min...| max   | avg   | last
>>
>>
>>
>> Do you think we can do this efficiently with spark and/or spark
>> streaming, and do you have an idea on "how" ?
>> ( I have tested some solutions but I'm not really satisfied ATM... )
>>
>>
>> Thanks a lot Community :)
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Feature generation / aggregate functions / timeseries

2017-12-14 Thread Georg Heiler
Look at custom UDAF functions.
 schrieb am Do. 14. Dez. 2017 um 09:31:

> Hi dear spark community !
>
> I want to create a lib which generates features for potentially very
> large datasets, so I believe spark could be a nice tool for that.
> Let me explain what I need to do :
>
> Each file 'F' of my dataset is composed of at least :
> - an id ( string or int )
> - a timestamp ( or a long value )
> - a value ( generaly a double )
>
> I want my tool to :
> - compute aggregate function for many pairs 'instants + duration'
> ===> FOR EXAMPLE :
> = compute for the instant 't = 2001-01-01' aggregate functions for
> data between 't-1month and t' and 't-12months and t-9months' and this,
> FOR EACH ID !
> ( aggregate functions such as
> min/max/count/distinct/last/mode/kurtosis... or even user defined ! )
>
> My constraints :
> - I don't want to compute aggregate for each tuple of 'F'
> ---> I want to provide a list of couples 'instants + duration' (
> potentially large )
> - My 'window' defined by the duration may be really large ( but may
> contain only a few values... )
> - I may have many id...
> - I may have many timestamps...
>
> 
> 
> 
>
> Let me describe this with some kind of example to see if SPARK ( SPARK
> STREAMING ? ) may help me to do that :
>
> Let's imagine that I have all my data in a DB or a file with the
> following columns :
> id | timestamp(ms) | value
> A | 100 |  100
> A | 1000500 |  66
> B | 100 |  100
> B | 110 |  50
> B | 120 |  200
> B | 250 |  500
>
> ( The timestamp is a long value, so as to be able to express date in ms
> from -01-01. to today )
>
> I want to compute operations such as min, max, average, last on the
> value column, for a these couples :
> -> instant = 1000500 / [-1000ms, 0 ] ( i.e. : aggregate data between [
> t-1000ms and t ]
> -> instant = 133 / [-5000ms, -2500 ] ( i.e. : aggregate data between
> [ t-5000ms and t-2500ms ]
>
>
> And this will produce this kind of output :
>
> id | timestamp(ms) | min_value | max_value | avg_value | last_value
> ---
> A | 1000500| min...| max   | avg   | last
> B | 1000500| min...| max   | avg   | last
> A | 133| min...| max   | avg   | last
> B | 133| min...| max   | avg   | last
>
>
>
> Do you think we can do this efficiently with spark and/or spark
> streaming, and do you have an idea on "how" ?
> ( I have tested some solutions but I'm not really satisfied ATM... )
>
>
> Thanks a lot Community :)
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Row Encoder For DataSet

2017-12-07 Thread Georg Heiler
You are looking for a UDAF (user-defined aggregate function).
Sandip Mehta  schrieb am Fr. 8. Dez. 2017 um
06:20:

> Hi,
>
> I want to group on certain columns and then for every group wants to apply
> custom UDF function to it. Currently groupBy only allows to add aggregation
> function to GroupData.
>
> For this was thinking to use groupByKey which will return KeyValueDataSet
> and then apply UDF for every group but really not been able solve this.
>
> SM
>
> On Fri, Dec 8, 2017 at 10:29 AM Weichen Xu 
> wrote:
>
>> You can groupBy multiple columns on dataframe, so why you need so
>> complicated schema ?
>>
>> suppose df schema: (x, y, u, v, z)
>>
>> df.groupBy($"x", $"y").agg(...)
>>
>> Is this you want ?
>>
>> On Fri, Dec 8, 2017 at 11:51 AM, Sandip Mehta > > wrote:
>>
>>> Hi,
>>>
>>> During my aggregation I end up having following schema.
>>>
>>> Row(Row(val1,val2), Row(val1,val2,val3...))
>>>
>>> val values = Seq(
>>> (Row(10, 11), Row(10, 2, 11)),
>>> (Row(10, 11), Row(10, 2, 11)),
>>> (Row(20, 11), Row(10, 2, 11))
>>>   )
>>>
>>>
>>> 1st tuple is used to group the relevant records for aggregation. I have
>>> used following to create dataset.
>>>
>>> val s = StructType(Seq(
>>>   StructField("x", IntegerType, true),
>>>   StructField("y", IntegerType, true)
>>> ))
>>> val s1 = StructType(Seq(
>>>   StructField("u", IntegerType, true),
>>>   StructField("v", IntegerType, true),
>>>   StructField("z", IntegerType, true)
>>> ))
>>>
>>> val ds = 
>>> sparkSession.sqlContext.createDataset(sparkSession.sparkContext.parallelize(values))(Encoders.tuple(RowEncoder(s),
>>>  RowEncoder(s1)))
>>>
>>> Is this correct way of representing this?
>>>
>>> How do I create dataset and row encoder for such use case for doing
>>> groupByKey on this?
>>>
>>>
>>>
>>> Regards
>>> Sandeep
>>>
>>
>>


Re: Loading a large parquet file how much memory do I need

2017-11-27 Thread Georg Heiler
How many columns do you need from the big file?

Also how CPU / memory intensive are the computations you want to perform?
Alexander Czech  schrieb am Mo. 27. Nov.
2017 um 10:57:

> I want to load a 10TB parquet File from S3 and I'm trying to decide what
> EC2 instances to use.
>
> Should I go for instances that in total have a larger memory size than
> 10TB? Or is it enough that they have in total enough SSD storage so that
> everything can be spilled to disk?
>
> thanks
>


Re: Spark Streaming Kerberos Issue

2017-11-22 Thread Georg Heiler
Did you check that the security extensions are installed (JCE)?

KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi., 22. Nov.
2017 um 19:36 Uhr:

> [image: Inline image 1]
>
> This is what we are on.
>
> On Wed, Nov 22, 2017 at 12:33 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> We use oracle JDK. we are on unix.
>>
>> On Wed, Nov 22, 2017 at 12:31 PM, Georg Heiler <georg.kf.hei...@gmail.com
>> > wrote:
>>
>>> Do you use oracle or open jdk? We recently had an issue with open jdk:
>>> formerly, java Security extensions were installed by default - no longer so
>>> on centos 7.3
>>>
>>> Are these installed?
>>>
>>> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
>>> 2017 um 19:29:
>>>
>>>> I passed keytab, renewal is enabled by running the script every eight
>>>> hours. User gets renewed by the script every eight hours.
>>>>
>>>> On Wed, Nov 22, 2017 at 12:27 PM, Georg Heiler <
>>>> georg.kf.hei...@gmail.com> wrote:
>>>>
>>>>> Did you pass a keytab? Is renewal enabled in your kdc?
>>>>> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22.
>>>>> Nov. 2017 um 19:25:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have written spark stream job and job is running successfully for
>>>>>> more than 36 hours. After around 36 hours job gets failed with kerberos
>>>>>> issue. Any solution on how to resolve it.
>>>>>>
>>>>>> org.apache.spark.SparkException: Task failed while wri\
>>>>>>
>>>>>> ting rows.
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>>>>
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
>>>>>> java.io.IOException: org.apache.hadoop.security.authentication.client.\
>>>>>>
>>>>>> AuthenticationException:
>>>>>> org.apache.hadoop.security.token.SecretManager$InvalidToken: token 
>>>>>> (kms-dt
>>>>>> owner=va_dflt, renewer=yarn, re\
>>>>>>
>>>>>> alUser=, issueDate=1511262017635, maxDate=1511866817635,
>>>>>> sequenceNumber=1854601, masterKeyId=3392) is expired
>>>>>>
>>>>>> at org.apache.hadoop.hive.ql.io
>>>>>> .HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.newOutputWriter$1(hiveWriterContainers.scala:346)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:304)
>>>>>>
>>>>>> ... 8 more
>>>>>>
>>>>>> Caused by: java.io.IOException:
>>>>>> org.apache.hadoop.security.authent

Re: Spark Streaming Kerberos Issue

2017-11-22 Thread Georg Heiler
Do you use the Oracle JDK or OpenJDK? We recently had an issue with OpenJDK:
formerly, the Java security extensions (JCE) were installed by default, but no
longer so on CentOS 7.3.

Are these installed?
KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov. 2017
um 19:29:

> I passed keytab, renewal is enabled by running the script every eight
> hours. User gets renewed by the script every eight hours.
>
> On Wed, Nov 22, 2017 at 12:27 PM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> Did you pass a keytab? Is renewal enabled in your kdc?
>> KhajaAsmath Mohammed <mdkhajaasm...@gmail.com> schrieb am Mi. 22. Nov.
>> 2017 um 19:25:
>>
>>> Hi,
>>>
>>> I have written spark stream job and job is running successfully for more
>>> than 36 hours. After around 36 hours job gets failed with kerberos issue.
>>> Any solution on how to resolve it.
>>>
>>> org.apache.spark.SparkException: Task failed while wri\
>>>
>>> ting rows.
>>>
>>> at
>>> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
>>>
>>> at
>>> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>>>
>>> at
>>> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>>>
>>> at
>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>>>
>>> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>>>
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
>>> java.io.IOException: org.apache.hadoop.security.authentication.client.\
>>>
>>> AuthenticationException:
>>> org.apache.hadoop.security.token.SecretManager$InvalidToken: token (kms-dt
>>> owner=va_dflt, renewer=yarn, re\
>>>
>>> alUser=, issueDate=1511262017635, maxDate=1511866817635,
>>> sequenceNumber=1854601, masterKeyId=3392) is expired
>>>
>>> at org.apache.hadoop.hive.ql.io
>>> .HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)
>>>
>>> at
>>> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.newOutputWriter$1(hiveWriterContainers.scala:346)
>>>
>>> at
>>> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:304)
>>>
>>> ... 8 more
>>>
>>> Caused by: java.io.IOException:
>>> org.apache.hadoop.security.authentication.client.AuthenticationException:
>>> org.apache.hadoop.securit\
>>>
>>> y.token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
>>> renewer=yarn, realUser=, issueDate=1511262017635, maxDate=15118668\
>>>
>>> 17635, sequenceNumber=1854601, masterKeyId=3392) is expired
>>>
>>> at
>>> org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider.decryptEncryptedKey(LoadBalancingKMSClientProvider.java:216)
>>>
>>> at
>>> org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.decryptEncryptedKey(KeyProviderCryptoExtension.java:388)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1440)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.createWrappedOutputStream(DFSClient.java:1542)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DFSClient.createWrappedOutputStream(DFSClient.java:1527)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:428)
>>>
>>> at
>>> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:421)
>>>
>>> at
>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystem

Re: Spark Streaming Kerberos Issue

2017-11-22 Thread Georg Heiler
Did you pass a keytab? Is renewal enabled in your kdc?
KhajaAsmath Mohammed  schrieb am Mi. 22. Nov. 2017
um 19:25:

> Hi,
>
> I have written spark stream job and job is running successfully for more
> than 36 hours. After around 36 hours job gets failed with kerberos issue.
> Any solution on how to resolve it.
>
> org.apache.spark.SparkException: Task failed while wri\
>
> ting rows.
>
> at
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:328)
>
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:210)
>
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:99)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.hadoop.hive.ql.metadata.HiveException:
> java.io.IOException: org.apache.hadoop.security.authentication.client.\
>
> AuthenticationException:
> org.apache.hadoop.security.token.SecretManager$InvalidToken: token (kms-dt
> owner=va_dflt, renewer=yarn, re\
>
> alUser=, issueDate=1511262017635, maxDate=1511866817635,
> sequenceNumber=1854601, masterKeyId=3392) is expired
>
> at
> org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:248)
>
> at
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.newOutputWriter$1(hiveWriterContainers.scala:346)
>
> at
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.writeToFile(hiveWriterContainers.scala:304)
>
> ... 8 more
>
> Caused by: java.io.IOException:
> org.apache.hadoop.security.authentication.client.AuthenticationException:
> org.apache.hadoop.securit\
>
> y.token.SecretManager$InvalidToken: token (kms-dt owner=va_dflt,
> renewer=yarn, realUser=, issueDate=1511262017635, maxDate=15118668\
>
> 17635, sequenceNumber=1854601, masterKeyId=3392) is expired
>
> at
> org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider.decryptEncryptedKey(LoadBalancingKMSClientProvider.java:216)
>
> at
> org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.decryptEncryptedKey(KeyProviderCryptoExtension.java:388)
>
> at
> org.apache.hadoop.hdfs.DFSClient.decryptEncryptedDataEncryptionKey(DFSClient.java:1440)
>
> at
> org.apache.hadoop.hdfs.DFSClient.createWrappedOutputStream(DFSClient.java:1542)
>
> at
> org.apache.hadoop.hdfs.DFSClient.createWrappedOutputStream(DFSClient.java:1527)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:428)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$7.doCall(DistributedFileSystem.java:421)
>
> at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:421)
>
> at
> org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:362)
>
> at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:925)
>
> at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>
> at
> parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:220)
>
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:311)
>
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:287)
>
> at
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.(ParquetRecordWriterWrapper.java:65)
>
> at
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
>
> at
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
>
> at
> org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:260)
>
> at
> org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:245)
>
> ... 10 more
>
> Caused by:
> org.apache.hadoop.security.authentication.client.AuthenticationException:
> 

Re: best spark spatial lib?

2017-10-10 Thread Georg Heiler
What about something like GeoMesa?
Anastasios Zouzias  schrieb am Di. 10. Okt. 2017 um
15:29:

> Hi,
>
> Which spatial operations do you require exactly? Also, I don't follow what
> you mean by combining logical operators?
>
> I have created a library that wraps Lucene's spatial functionality here:
> https://github.com/zouzias/spark-lucenerdd/wiki/Spatial-search
>
> You could give a try to the library, it supports intersections / within /
> etc. Ideally, I try to push all spatial Lucene features in the library.
>
> See also, https://github.com/zouzias/spark-lucenerdd/wiki/Related-Work
> for related libraries.
>
> Best,
> Anastasios
>
>
> On Tue, Oct 10, 2017 at 11:21 AM, Imran Rajjad  wrote:
>
>> I need to have a location column inside my Dataframe so that I can do
>> spatial queries and geometry operations. Are there any third-party packages
>> that perform this kind of operation. I have seen a few like GeoSpark and
>> Magellan but they don't support operations where spatial and logical
>> operators can be combined.
>>
>> regards,
>> Imran
>>
>> --
>> I.R
>>
>
>
>
> --
> -- Anastasios Zouzias
> 
>


Re: Offline environment

2017-09-25 Thread Georg Heiler
Just build a fat jar and do not apply --packages
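
For illustration, a minimal sketch of the sbt-assembly route (the plugin and Spark versions below are only examples, adjust to your build):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

// build.sbt -- mark Spark itself as provided so it is not bundled into the fat jar
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0" % "provided"

// then run `sbt assembly` and hand the resulting jar to spark-submit without --packages
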
serkan ta?  schrieb am Mo. 25. Sep. 2017 um 09:24:

> Hi,
>
> Every time I submit a Spark job, it checks the dependent jars against the
> remote Maven repo.
>
> Is it possible to have Spark load the cached jars first rather than
> looking for an internet connection?
>


Re: using R with Spark

2017-09-24 Thread Georg Heiler
No. It is free to use; you might need RStudio Server depending on which Spark
master you choose.
Felix Cheung  schrieb am So. 24. Sep. 2017 um
22:24:

> Both are free to use; you can use sparklyr from the R shell without
> RStudio (but you probably want an IDE)
>
> --
> *From:* Adaryl Wakefield 
> *Sent:* Sunday, September 24, 2017 11:19:24 AM
> *To:* user@spark.apache.org
> *Subject:* using R with Spark
>
>
> There are two packages SparkR and sparklyr. Sparklyr seems to be the more
> useful. However, do you have to pay to use it? Unless I’m not reading this
> right, it seems you have to have the paid version of RStudio to use it.
>
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
>
> www.massstreet.net
>
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData 
>
>
>
>
>


Re: RDD order preservation through transformations

2017-09-14 Thread Georg Heiler
Usually Spark ML models specify the columns they use for training, i.e. you
would only select your feature columns (X) for model training, but metadata
such as target labels or your date column (y) would still be present for each row.
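
For illustration, a rough sketch of that idea with the DataFrame-based API (assuming Spark 2.x, a SparkSession named spark, and a DataFrame df with a timestamp column plus numeric feature columns x1 and x2):

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler

// only the feature columns go into the vector; the timestamp column is carried along untouched
val assembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2"))
  .setOutputCol("features")
val assembled = assembler.transform(df)

val model = new KMeans().setK(3).setFeaturesCol("features").fit(assembled)
// transform() appends a prediction column, so every row keeps its timestamp next to its cluster
val clustered = model.transform(assembled)

No zip and no reliance on row order is needed in this variant.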

 schrieb am Do., 14. Sep. 2017 um 10:42 Uhr:

> In several situations I would like to zip RDDs knowing that their order
> matches. In particular I’m using an MLLib KMeansModel on an RDD of Vectors
> so I would like to do:
>
>
>
> myData.zip(myModel.predict(myData))
>
>
>
> Also the first column in my RDD is a timestamp which I don’t want to be a
> part of the model, so in fact I would like to split the first column out of
> my RDD, then do:
>
>
>
> myData.zip(myModel.predict(myData.map(dropTimestamp)))
>
>
>
> Moreover I’d like my data to be scaled and go through a principal
> component analysis first, so the main steps would be like:
>
>
>
> val noTs = myData.map(dropTimestamp)
>
> val scaled = scaler.transform(noTs)
>
> val projected = (new RowMatrix(scaled)).multiply(principalComponents).rows
>
> val clusters = myModel.predict(projected)
>
> val result = myData.zip(clusters)
>
>
>
> Do you think there’s a chance that the 4 transformations above would
> preserve order so the zip at the end would be correct?
>
>
>
>
>
> On 2017-09-13 19:51 CEST, lucas.g...@gmail.com wrote :
>
>
>
> I'm wondering why you need order preserved, we've had situations where
> keeping the source as an artificial field in the dataset was important and
> I had to run contortions to inject that (In this case the datasource had no
> unique key).
>
>
>
> Is this similar?
>
>
>
> On 13 September 2017 at 10:46, Suzen, Mehmet  wrote:
>
> But what happens if one of the partitions fail, how fault tolarence
> recover elements in other partitions.
>
>
>
> On 13 Sep 2017 18:39, "Ankit Maloo"  wrote:
>
> AFAIK, the order of a rdd is maintained across a partition for Map
> operations. There is no way a map operation  can change sequence across a
> partition as partition is local and computation happens one record at a
> time.
>
>
>
> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:
>
> I think the order has no meaning in RDDs see this post, specially zip
> methods:
> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
> _
>
> Ce message et ses pieces jointes peuvent contenir des informations 
> confidentielles ou privilegiees et ne doivent donc
> pas etre diffuses, exploites ou copies sans autorisation. Si vous avez recu 
> ce message par erreur, veuillez le signaler
> a l'expediteur et le detruire ainsi que les pieces jointes. Les messages 
> electroniques etant susceptibles d'alteration,
> Orange decline toute responsabilite si ce message a ete altere, deforme ou 
> falsifie. Merci.
>
> This message and its attachments may contain confidential or privileged 
> information that may be protected by law;
> they should not be distributed, used or copied without authorisation.
> If you have received this email in error, please notify the sender and delete 
> this message and its attachments.
> As emails may be altered, Orange is not liable for messages that have been 
> modified, changed or falsified.
> Thank you.
>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-29 Thread Georg Heiler
What about a custom UDAF?
Patrick <titlibat...@gmail.com> schrieb am Mo. 28. Aug. 2017 um 20:54:

> ok . i see there is a describe() function which does the stat calculation
> on dataset similar to StatCounter but however i dont want to restrict my
> aggregations to standard mean, stddev etc and generate some custom stats ,
> or also may not run all the predefined stats but only subset of them on the
> particular column.
> I was thinking if we need to write some custom code which does this in one
> action(job) that would work for me
>
>
>
> On Tue, Aug 29, 2017 at 12:02 AM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> Rdd only
>> Patrick <titlibat...@gmail.com> schrieb am Mo. 28. Aug. 2017 um 20:13:
>>
>>> Ah, does it work with Dataset API or i need to convert it to RDD first ?
>>>
>>> On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler <
>>> georg.kf.hei...@gmail.com> wrote:
>>>
>>>> What about the rdd stat counter?
>>>> https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html
>>>>
>>>> Patrick <titlibat...@gmail.com> schrieb am Mo. 28. Aug. 2017 um 16:47:
>>>>
>>>>> Hi
>>>>>
>>>>> I have two lists:
>>>>>
>>>>>
>>>>>- List one: contains names of columns on which I want to do
>>>>>aggregate operations.
>>>>>- List two: contains the aggregate operations on which I want to
>>>>>perform on each column eg ( min, max, mean)
>>>>>
>>>>> I am trying to use spark 2.0 dataset to achieve this. Spark provides
>>>>> an agg() where you can pass a Map <String,String> (of column name and
>>>>> respective aggregate operation ) as input, however I want to perform
>>>>> different aggregation operations on the same column of the data and want 
>>>>> to
>>>>> collect the result in a Map<String,String> where key is the aggregate
>>>>> operation and Value is the result on the particular column.  If i add
>>>>> different agg() to same column, the key gets updated with latest value.
>>>>>
>>>>> Also I dont find any collectAsMap() operation that returns map of
>>>>> aggregated column name as key and result as value. I get collectAsList()
>>>>> but i dont know the order in which those agg() operations are run so how 
>>>>> do
>>>>> i match which list values corresponds to which agg operation.  I am able 
>>>>> to
>>>>> see the result using .show() but How can i collect the result in this 
>>>>> case ?
>>>>>
>>>>> Is it possible to do different aggregation on the same column in one
>>>>> Job(i.e only one collect operation) using agg() operation?
>>>>>
>>>>>
>>>>> Thanks in advance.
>>>>>
>>>>>
>>>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Georg Heiler
Rdd only
Patrick <titlibat...@gmail.com> schrieb am Mo. 28. Aug. 2017 um 20:13:

> Ah, does it work with Dataset API or i need to convert it to RDD first ?
>
> On Mon, Aug 28, 2017 at 10:40 PM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> What about the rdd stat counter?
>> https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html
>>
>> Patrick <titlibat...@gmail.com> schrieb am Mo. 28. Aug. 2017 um 16:47:
>>
>>> Hi
>>>
>>> I have two lists:
>>>
>>>
>>>- List one: contains names of columns on which I want to do
>>>aggregate operations.
>>>- List two: contains the aggregate operations on which I want to
>>>perform on each column eg ( min, max, mean)
>>>
>>> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
>>> agg() where you can pass a Map <String,String> (of column name and
>>> respective aggregate operation ) as input, however I want to perform
>>> different aggregation operations on the same column of the data and want to
>>> collect the result in a Map<String,String> where key is the aggregate
>>> operation and Value is the result on the particular column.  If i add
>>> different agg() to same column, the key gets updated with latest value.
>>>
>>> Also I dont find any collectAsMap() operation that returns map of
>>> aggregated column name as key and result as value. I get collectAsList()
>>> but i dont know the order in which those agg() operations are run so how do
>>> i match which list values corresponds to which agg operation.  I am able to
>>> see the result using .show() but How can i collect the result in this case ?
>>>
>>> Is it possible to do different aggregation on the same column in one
>>> Job(i.e only one collect operation) using agg() operation?
>>>
>>>
>>> Thanks in advance.
>>>
>>>
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Georg Heiler
What about the RDD StatCounter?
https://spark.apache.org/docs/0.6.2/api/core/spark/util/StatCounter.html
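
For illustration, a minimal sketch (assuming Spark 2.x and a DataFrame df with a numeric column named value; null handling omitted for brevity):

import org.apache.spark.sql.Row

// a single pass yields count, mean, stdev, min and max
val stats = df.select("value").rdd.map { case Row(v: Double) => v }.stats()
val asMap = Map("min" -> stats.min, "max" -> stats.max, "mean" -> stats.mean, "stdev" -> stats.stdev)

For several columns you would repeat this per column, or stay on the Dataset side with agg() and several aliased functions on the same column.
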
Patrick  schrieb am Mo. 28. Aug. 2017 um 16:47:

> Hi
>
> I have two lists:
>
>
>- List one: contains names of columns on which I want to do aggregate
>operations.
>- List two: contains the aggregate operations on which I want to
>perform on each column eg ( min, max, mean)
>
> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
> agg() where you can pass a Map  (of column name and
> respective aggregate operation ) as input, however I want to perform
> different aggregation operations on the same column of the data and want to
> collect the result in a Map where key is the aggregate
> operation and Value is the result on the particular column.  If i add
> different agg() to same column, the key gets updated with latest value.
>
> Also I dont find any collectAsMap() operation that returns map of
> aggregated column name as key and result as value. I get collectAsList()
> but i dont know the order in which those agg() operations are run so how do
> i match which list values corresponds to which agg operation.  I am able to
> see the result using .show() but How can i collect the result in this case ?
>
> Is it possible to do different aggregation on the same column in one
> Job(i.e only one collect operation) using agg() operation?
>
>
> Thanks in advance.
>
>


Re: different behaviour linux/Unix vs windows when load spark context in scala method called from R function using rscala package

2017-08-27 Thread Georg Heiler
Why don't you simply use sparklyr for a more R-native integration of Spark?
Simone Pallotta  schrieb am So. 27. Aug.
2017 um 09:47:

> In my R code, I am using rscala package to bridge to a scala method. in
> scala method I have initialized a spark context to be used later.
>
>
> R code:
>
> s <- scala(classpath = "", heap.maximum = "4g")
>
> assign("WrappeR",s$.it.wrapper.r.Wrapper)
>
> WrappeR$init()
>
> where init is a scala function and Wrapper is scala class name (see below).
>
> I created a scala object (Wrapper) with init function
>
>
> scala code:
>
> package it.wrapper.r
>
> object Wrapper {
>
>   var Spark_context: SparkContext = _
>
> def init(): Unit = {
>
> val spark_conf = new SparkConf()
>
>   .setMaster("local[*]")
>
>   .setAppName("GMQL-R")
>
>   .set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>   .set("spark.executor.memory", "6g")
>
>   .set("spark.driver.memory", "2g")
>
> Spark_context = SparkContext.getOrCreate(spark_conf)
>
>   }
>
> }
>
> I removed the following code after Spark_context because is not useful.
>
> this code works without flaws under OS X and Linux. Of course window is
> not gentle as other OSes. I checked out my project under windows (I have
> installed scala 2.11.8, java 1.8 and create environment variable JAVA_HOME
> and SCALA_HOME that are mandatory for running rscala package in R, I did
> the same in OS X/Linux)
>
> when I run the jar file as standalone application it works fine under
> windows, but when I invoke the same function from R it fails giving this
> error:
>
> Caused by: java.lang.RuntimeException: java.lang.RuntimeException: class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
> org.apache.hadoop.security.GroupMappingServiceProvider
>
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1752)
>
> at org.apache.hadoop.security.Groups.(Groups.java:55)
>
> at
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:182)
>
> at
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:235)
>
> at
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:214)
>
> at
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:669)
>
> at `enter code
> here`org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:571)
>
> at
> org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2391)
>
> at
> org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2391)
>
> at scala.Option.getOrElse(Option.scala:121)
>
> at
> org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2391)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:295)
>
> at org.apache.spark.SparkContext.(SparkContext.scala:145)
>
> at it.polimi.genomics.r.Wrapper$.initGMQL(Wrapper.scala:98)
>
> at $line4.$read$$iw$$iw$$anonfun$1.apply$mcV$sp(:19)
>
> at $line4.$read$$iw$$iw$$anonfun$1.apply(:16)
>
> at $line4.$read$$iw$$iw$$anonfun$1.apply(:16)
>
> ... 46 more
>
> Caused by: java.lang.RuntimeException: class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
> org.apache.hadoop.security.GroupMappingServiceProvider
>
> at
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1746)
>
> ... 62 more
>
> java.lang.reflect.InvocationTargetException
>
> java.lang.RuntimeException: java.lang.RuntimeException: class
> org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not
> org.apache.hadoop.security.GroupMappingServiceProvider
>
>
> Does anyone has already seen this kind of behavior?
>


Re: some Ideas on expressing Spark SQL using JSON

2017-07-26 Thread Georg Heiler
Because Spark's DSL partially supports compile-time type safety. E.g. the
compiler will notify you that a SQL function was misspelled when using the
DSL, as opposed to a plain SQL string, which is only parsed at runtime.
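
For illustration, a minimal contrast (assuming Spark 2.x, a SparkSession named spark and a DataFrame ds with a name column):

import org.apache.spark.sql.functions._

ds.filter(lower(col("name")) === "john")   // misspelling lower as lowr here fails at compile time

ds.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE lowr(name) = 'john'")   // only fails at runtime, when the string is parsed and analysed
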
Sathish Kumaran Vairavelu  schrieb am Di. 25.
Juli 2017 um 23:42:

> Just a thought. SQL itself is a DSL. Why DSL on top of another DSL?
> On Tue, Jul 25, 2017 at 4:47 AM kant kodali  wrote:
>
>> Hi All,
>>
>> I am thinking to express Spark SQL using JSON in the following the way.
>>
>> For Example:
>>
>> *Query using Spark DSL*
>>
>> DS.filter(col("name").equalTo("john"))
>> .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 
>> hours"), df1.col("hourlyPay"))
>> .agg(sum("hourlyPay").as("total"));
>>
>>
>> *Query using JSON*
>>
>>
>>
>> ​
>> ​
>> The Goal is to design a DSL in JSON such that users can and express SPARK
>> SQL queries in JSON so users can send Spark SQL queries over rest and get
>> the results out. Now, I am sure there are BI tools and notebooks like
>> Zeppelin that can accomplish the desired behavior however I believe there
>> maybe group of users who don't want to use those BI tools or notebooks
>> instead they want all the communication from front end to back end using
>> API's.
>>
>> Also another goal would be the DSL design in JSON should closely mimic
>> the underlying Spark SQL DSL.
>>
>> Please feel free to provide some feedback or criticize to whatever extent
>> you like!
>>
>> Thanks!
>>
>>
>>


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread Georg Heiler
You need to have the Spark implicits in scope, i.e. import spark.implicits._
Richard Xin  schrieb am Di. 18. Juli 2017
um 08:45:

> I believe you could use JOLT (bazaarvoice/jolt
> ) to flatten it to a json string and
> then to dataframe or dataset.
>
> bazaarvoice/jolt
>
> jolt - JSON to JSON transformation library written in Java.
> 
>
>
>
>
> On Monday, July 17, 2017, 11:18:24 PM PDT, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>
> Explode is not working in this scenario with error - string cannot be used
> in explore either array or map in spark
> On Tue, Jul 18, 2017 at 11:39 AM, 刘虓  wrote:
>
> Hi,
> have you tried to use explode?
>
> Chetan Khatri  于2017年7月18日 周二下午2:06写道:
>
> Hello Spark Dev's,
>
> Can you please guide me, how to flatten JSON to multiple columns in Spark.
>
> *Example:*
>
> Sr No Title ISBN Info
> 1 Calculus Theory 1234567890
>
> [{"cert":[{
> "authSbmtr":"009415da-c8cd- 418d-869e-0a19601d79fa",
> 009415da-c8cd-418d-869e- 0a19601d79fa
> "certUUID":"03ea5a1a-5530- 4fa3-8871-9d1ebac627c4",
>
> "effDt":"2016-05-06T15:04:56. 279Z",
>
>
> "fileFmt":"rjrCsv","status":" live"}],
>
> "expdCnt":"15",
> "mfgAcctNum":"531093",
>
> "oUUID":"23d07397-4fbe-4897- 8a18-b79c9f64726c",
>
>
> "pgmRole":["RETAILER"],
> "pgmUUID":"1cb5dd63-817a-45bc- a15c-5660e4accd63",
> "regUUID":"cc1bd898-657d-40dc- af5d-4bf1569a1cc4",
> "rtlrsSbmtd":["009415da-c8cd- 418d-869e-0a19601d79fa"]}]
>
> I want to get single row with 11 columns.
>
> Thanks.
>
>


Re: Flatten JSON to multiple columns in Spark

2017-07-18 Thread Georg Heiler
df.select($"Info.*") should help
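
For illustration, a rough sketch combining that with from_json (assuming Spark 2.1+, spark.implicits._ imported, the column names from the example table, and a hand-written schema of which only a few fields are shown):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val infoSchema = new StructType()
  .add("expdCnt", StringType)
  .add("mfgAcctNum", StringType)
  .add("oUUID", StringType)          // ...add the remaining fields the same way

val parsed = df.withColumn("Info", from_json($"Info", infoSchema))
parsed.select($"Title", $"ISBN", $"Info.*")   // Info.* turns every struct field into its own column

If Info is really an array of such objects (as the sample suggests), wrap the schema in ArrayType and explode it first.
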
Chetan Khatri  schrieb am Di. 18. Juli 2017 um
08:06:

> Hello Spark Dev's,
>
> Can you please guide me, how to flatten JSON to multiple columns in Spark.
>
> *Example:*
>
> Sr No Title ISBN Info
> 1 Calculus Theory 1234567890 [{"cert":[{
> "authSbmtr":"009415da-c8cd-418d-869e-0a19601d79fa",
> 009415da-c8cd-418d-869e-0a19601d79fa
> "certUUID":"03ea5a1a-5530-4fa3-8871-9d1ebac627c4",
> "effDt":"2016-05-06T15:04:56.279Z",
> "fileFmt":"rjrCsv","status":"live"}],
>
> "expdCnt":"15",
> "mfgAcctNum":"531093",
> "oUUID":"23d07397-4fbe-4897-8a18-b79c9f64726c",
> "pgmRole":["RETAILER"],
> "pgmUUID":"1cb5dd63-817a-45bc-a15c-5660e4accd63",
> "regUUID":"cc1bd898-657d-40dc-af5d-4bf1569a1cc4",
> "rtlrsSbmtd":["009415da-c8cd-418d-869e-0a19601d79fa"]}]
>
> I want to get single row with 11 columns.
>
> Thanks.
>


Re: custom column types for JDBC datasource writer

2017-07-05 Thread Georg Heiler
Great, thanks!
But for the current release, is there any possibility to catch the exception
and handle it, i.e. not have Spark only log it to the console?
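
For reference, a minimal sketch of the createTableColumnTypes option introduced by the commit referenced below (Spark 2.2+); jdbcUrl, connectionProperties and the names are placeholders, and the types must be valid Spark SQL data types:

df.write
  .option("createTableColumnTypes", "name VARCHAR(1024), comments VARCHAR(10240)")
  .jdbc(jdbcUrl, "my_table", connectionProperties)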

Takeshi Yamamuro <linguin@gmail.com> schrieb am Do., 6. Juli 2017 um
06:44 Uhr:

> -dev +user
>
> You can in master and see
> https://github.com/apache/spark/commit/c7911807050227fcd13161ce090330d9d8daa533
> .
> This option will be available in the next release.
>
> // maropu
>
> On Thu, Jul 6, 2017 at 1:25 PM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> Hi,
>> is it possible to somehow make spark not use VARCHAR(255) but something
>> bigger i.e. CLOB for Strings?
>>
>> If not, is it at least possible to catch the exception which is thrown.
>> To me, it seems that spark is catching and logging it - so I can no longer
>> intervene and handle it:
>>
>>
>> https://stackoverflow.com/questions/44927764/spark-jdbc-oracle-long-string-fields
>>
>> Regards,
>> Georg
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: [Spark Sql/ UDFs] Spark and Hive UDFs parity

2017-06-16 Thread Georg Heiler
I assume you want to have this lifecycle in order to create big/heavy/complex
objects only once (per partition). mapPartitions should fit this use case
pretty well.
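
For illustration, a rough sketch of that pattern (assuming a Dataset[String] named ds with spark.implicits._ in scope, and a hypothetical, expensive-to-construct Parser whose parse method returns a String):

val parsed = ds.mapPartitions { rows =>
  val parser = new Parser()    // built once per partition, similar to initialize()/configure() in a Hive GenericUDF
  rows.map(parser.parse)       // reused for every record of the partition
}
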
RD <rdsr...@gmail.com> schrieb am Fr. 16. Juni 2017 um 17:37:

> Thanks Georg. But I'm not sure how mapPartitions is relevant here.  Can
> you elaborate?
>
>
>
> On Thu, Jun 15, 2017 at 4:18 AM, Georg Heiler <georg.kf.hei...@gmail.com>
> wrote:
>
>> What about using map partitions instead?
>>
>> RD <rdsr...@gmail.com> schrieb am Do. 15. Juni 2017 um 06:52:
>>
>>> Hi Spark folks,
>>>
>>> Is there any plan to support the richer UDF API that Hive supports
>>> for Spark UDFs ? Hive supports the GenericUDF API which has, among others
>>> methods like initialize(), configure() (called once on the cluster) etc,
>>> which a lot of our users use. We have now a lot of UDFs in Hive which make
>>> use of these methods. We plan to move to UDFs to Spark UDFs but are being
>>> limited by not having similar lifecycle methods.
>>>Are there plans to address these? Or do people usually adopt some
>>> sort of workaround?
>>>
>>>If we  directly use  the Hive UDFs  in Spark we pay a performance
>>> penalty. I think Spark anyways does a conversion from InternalRow to Row
>>> back to InternalRow for native spark udfs and for Hive it does InternalRow
>>> to Hive Object back to InternalRow but somehow the conversion in native
>>> udfs is more performant.
>>>
>>> -Best,
>>> R.
>>>
>>
>


Re: [Spark Sql/ UDFs] Spark and Hive UDFs parity

2017-06-15 Thread Georg Heiler
What about using mapPartitions instead?
RD  schrieb am Do. 15. Juni 2017 um 06:52:

> Hi Spark folks,
>
> Is there any plan to support the richer UDF API that Hive supports for
> Spark UDFs ? Hive supports the GenericUDF API which has, among others
> methods like initialize(), configure() (called once on the cluster) etc,
> which a lot of our users use. We have now a lot of UDFs in Hive which make
> use of these methods. We plan to move to UDFs to Spark UDFs but are being
> limited by not having similar lifecycle methods.
>Are there plans to address these? Or do people usually adopt some sort
> of workaround?
>
>If we  directly use  the Hive UDFs  in Spark we pay a performance
> penalty. I think Spark anyways does a conversion from InternalRow to Row
> back to InternalRow for native spark udfs and for Hive it does InternalRow
> to Hive Object back to InternalRow but somehow the conversion in native
> udfs is more performant.
>
> -Best,
> R.
>


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-21 Thread Georg Heiler
You could write your views to Hive or maybe Tachyon.

Is the periodically updated data big?
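
Regarding the background-refresh idea discussed below, a rough sketch using a scheduled thread inside the same Spark session (the JDBC url, table name and interval are made up):

import java.util.concurrent.{Executors, TimeUnit}

val refresher = Executors.newSingleThreadScheduledExecutor()
refresher.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // re-read the slowly changing table and replace the view under the same name
    val snapshot = spark.read.jdbc(jdbcUrl, "lookup_table", connectionProperties)
    snapshot.createOrReplaceTempView("lookup")
  }
}, 0, 15, TimeUnit.MINUTES)

Streaming queries that join against the lookup view should then pick up the refreshed data on later triggers, as suggested in the reply quoted below.
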
Hemanth Gudela <hemanth.gud...@qvantel.com> schrieb am Fr. 21. Apr. 2017 um
16:55:

> Being new to spark, I think I need your suggestion again.
>
>
>
> #2 you can always define a batch Dataframe and register it as view, and
> then run a background then periodically creates a new Dataframe with
> updated data and re-registers it as a view with the same name
>
>
>
> I seem to have misunderstood your statement and tried registering static
> dataframe as a temp view (“myTempView”) using createOrReplaceView in one
> spark session, and tried re-registering another refreshed dataframe as temp
> view with same name (“myTempView”) in another session. However, with this
> approach, I have failed to achieve what I’m aiming for, because views are
> local to one spark session.
>
> From spark 2.1.0 onwards, Global view is a nice feature, but still would
> not solve my problem, because global view cannot be updated.
>
>
>
> So after much thinking, I understood that you would have meant to use a
> background running process in the same spark job that would periodically
> create a new dataframe and re-register temp view with same name, within the
> same spark session.
>
> Could you please give me some pointers to documentation on how to create
> such asynchronous background process in spark streaming? Is Scala’s
> “Futures” the way to achieve this?
>
>
>
> Thanks,
>
> Hemanth
>
>
>
>
>
> *From: *Tathagata Das <tathagata.das1...@gmail.com>
>
>
> *Date: *Friday, 21 April 2017 at 0.03
> *To: *Hemanth Gudela <hemanth.gud...@qvantel.com>
>
> *Cc: *Georg Heiler <georg.kf.hei...@gmail.com>, "user@spark.apache.org" <
> user@spark.apache.org>
>
>
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> Here are couple of ideas.
>
> 1. You can set up a Structured Streaming query to update in-memory table.
>
> Look at the memory sink in the programming guide -
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
>
> So you can query the latest table using a specified table name, and also
> join that table with another stream. However, note that this in-memory
> table is maintained in the driver, and so you have be careful about the
> size of the table.
>
>
>
> 2. If you cannot define a streaming query in the slow moving due to
> unavailability of connector for your streaming data source, then you can
> always define a batch Dataframe and register it as view, and then run a
> background then periodically creates a new Dataframe with updated data and
> re-registers it as a view with the same name. Any streaming query that
> joins a streaming dataframe with the view will automatically start using
> the most updated data as soon as the view is updated.
>
>
>
> Hope this helps.
>
>
>
>
>
> On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela <
> hemanth.gud...@qvantel.com> wrote:
>
> Thanks Georg for your reply.
>
> But I’m not sure if I fully understood your answer.
>
>
>
> If you meant to join two streams (one reading Kafka, and another reading
> database table), then I think it’s not possible, because
>
> 1.   According to documentation
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#data-sources>,
> Structured streaming does not support database as a streaming source
>
> 2.   Joining between two streams is not possible yet.
>
>
>
> Regards,
>
> Hemanth
>
>
>
> *From: *Georg Heiler <georg.kf.hei...@gmail.com>
> *Date: *Thursday, 20 April 2017 at 23.11
> *To: *Hemanth Gudela <hemanth.gud...@qvantel.com>, "user@spark.apache.org"
> <user@spark.apache.org>
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> What about treating the static data as a (slow) stream as well?
>
>
>
> Hemanth Gudela <hemanth.gud...@qvantel.com> schrieb am Do., 20. Apr. 2017
> um 22:09 Uhr:
>
> Hello,
>
>
>
> I am working on a use case where there is a need to join streaming data
> frame with a static data frame.
>
> The streaming data frame continuously gets data from Kafka topics, whereas
> static data frame fetches data from a database table.
>
>
>
> However, as the underlying database table is getting updated often, I must
> somehow manage to refresh my static data frame periodically to get the
> latest information from underlying database table.
>
>
>
> My questions:
>
> 1.   Is it possible to periodically refresh static data frame?
>
> 2.   If refreshing static data frame is not possible, is there a
> mechanism to automatically stop & restarting spark structured streaming
> job, so that every time the job restarts, the static data frame gets
> updated with latest information from underlying database table.
>
> 3.   If 1) and 2) are not possible, please suggest alternatives to
> achieve my requirement described above.
>
>
>
> Thanks,
>
> Hemanth
>
>
>


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Georg Heiler
Unfortunately I think this currently might require the old api.
Hemanth Gudela <hemanth.gud...@qvantel.com> schrieb am Fr. 21. Apr. 2017 um
05:58:

> Idea #2 probably suits my needs better, because
>
> -  Streaming query does not have a source database connector yet
>
> -  My source database table is big, so in-memory table could be
> huge for driver to handle.
>
>
>
> Thanks for cool ideas, TD!
>
>
>
> Regards,
>
> Hemanth
>
>
>
> *From: *Tathagata Das <tathagata.das1...@gmail.com>
> *Date: *Friday, 21 April 2017 at 0.03
> *To: *Hemanth Gudela <hemanth.gud...@qvantel.com>
> *Cc: *Georg Heiler <georg.kf.hei...@gmail.com>, "user@spark.apache.org" <
> user@spark.apache.org>
>
>
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> Here are couple of ideas.
>
> 1. You can set up a Structured Streaming query to update in-memory table.
>
> Look at the memory sink in the programming guide -
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
>
> So you can query the latest table using a specified table name, and also
> join that table with another stream. However, note that this in-memory
> table is maintained in the driver, and so you have be careful about the
> size of the table.
>
>
>
> 2. If you cannot define a streaming query in the slow moving due to
> unavailability of connector for your streaming data source, then you can
> always define a batch Dataframe and register it as view, and then run a
> background then periodically creates a new Dataframe with updated data and
> re-registers it as a view with the same name. Any streaming query that
> joins a streaming dataframe with the view will automatically start using
> the most updated data as soon as the view is updated.
>
>
>
> Hope this helps.
>
>
>
>
>
> On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela <
> hemanth.gud...@qvantel.com> wrote:
>
> Thanks Georg for your reply.
>
> But I’m not sure if I fully understood your answer.
>
>
>
> If you meant to join two streams (one reading Kafka, and another reading
> database table), then I think it’s not possible, because
>
> 1.   According to documentation
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#data-sources>,
> Structured streaming does not support database as a streaming source
>
> 2.   Joining between two streams is not possible yet.
>
>
>
> Regards,
>
> Hemanth
>
>
>
> *From: *Georg Heiler <georg.kf.hei...@gmail.com>
> *Date: *Thursday, 20 April 2017 at 23.11
> *To: *Hemanth Gudela <hemanth.gud...@qvantel.com>, "user@spark.apache.org"
> <user@spark.apache.org>
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> What about treating the static data as a (slow) stream as well?
>
>
>
> Hemanth Gudela <hemanth.gud...@qvantel.com> schrieb am Do., 20. Apr. 2017
> um 22:09 Uhr:
>
> Hello,
>
>
>
> I am working on a use case where there is a need to join streaming data
> frame with a static data frame.
>
> The streaming data frame continuously gets data from Kafka topics, whereas
> static data frame fetches data from a database table.
>
>
>
> However, as the underlying database table is getting updated often, I must
> somehow manage to refresh my static data frame periodically to get the
> latest information from underlying database table.
>
>
>
> My questions:
>
> 1.   Is it possible to periodically refresh static data frame?
>
> 2.   If refreshing static data frame is not possible, is there a
> mechanism to automatically stop & restarting spark structured streaming
> job, so that every time the job restarts, the static data frame gets
> updated with latest information from underlying database table.
>
> 3.   If 1) and 2) are not possible, please suggest alternatives to
> achieve my requirement described above.
>
>
>
> Thanks,
>
> Hemanth
>
>
>


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Georg Heiler
What about treating the static data as a (slow) stream as well?

Hemanth Gudela  schrieb am Do., 20. Apr. 2017
um 22:09 Uhr:

> Hello,
>
>
>
> I am working on a use case where there is a need to join streaming data
> frame with a static data frame.
>
> The streaming data frame continuously gets data from Kafka topics, whereas
> static data frame fetches data from a database table.
>
>
>
> However, as the underlying database table is getting updated often, I must
> somehow manage to refresh my static data frame periodically to get the
> latest information from underlying database table.
>
>
>
> My questions:
>
> 1.   Is it possible to periodically refresh static data frame?
>
> 2.   If refreshing static data frame is not possible, is there a
> mechanism to automatically stop & restarting spark structured streaming
> job, so that every time the job restarts, the static data frame gets
> updated with latest information from underlying database table.
>
> 3.   If 1) and 2) are not possible, please suggest alternatives to
> achieve my requirement described above.
>
>
>
> Thanks,
>
> Hemanth
>


Re: Problem with Execution plan using loop

2017-04-16 Thread Georg Heiler
Hi

I had a similar problem. For me, using the RDD StatCounter helped a lot.
Check out
http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag
and
http://stackoverflow.com/questions/41445571/spark-migrate-sql-window-function-to-rdd-for-better-performance

I did not calculate a lag, but a percentage. Having had a brief look at
your code, you seem to compute min/max/count per column, so my links should
help you make these calculations more efficient.

Regards Georg
Javier Rey  schrieb am So. 16. Apr. 2017 um 05:34:

> Hi guys,
>
> I have this situation:
>
> 1. Data frame with 22 columns
> 2. I need to add some columns (feature engineering) using existing
> columns, 12 columns will be add by each column in list.
> 3. I created a loop, but in the 5 item(col) on the loop this starts to go
> very slow in the join part, I can observe that the execution plan is
> getting bigger.
> 4. I tried to save to parquet by each iteration, but parquet is immutable,
> I got an error.
> 5. I really appreciate any help
>
> Here the code:
>
> def create_lag_columns(df, months, columns_to_lag):
>
> columns_aggregate = []
> data_with_period = df
> w = Window().partitionBy("idpersona").orderBy("idpersona", "fecha")
> for column_lag in columns_to_lag:
>
> print("Calculating lag for column: " + column_lag)
>
> # Create lag columns
> for i in range(1,months + 1):
> column_name_lag = column_lag + "_t_" + str(i)
> data_with_period =
> data_with_period.withColumn(column_name_lag, lag(column_lag, i).over(w))
> columns_aggregate.append(column_name_lag)
>
> # Convert to long it's convenience to do aggregate operations
> df_long = data_with_period.select('idpersona', "fecha",
>
> explode(array(columns_aggregate)).alias('values'))
> # Aggregate operations
> df_agg = (df_long.groupBy("idpersona", "fecha")
>  .agg(F.min("values").alias("min_" + column_lag),
>   F.sum("values").alias("sum_" + column_lag),
>   F.max("values").alias("max_" + column_lag),
>   F.avg("values").alias("avg_" + column_lag),
>   F.count("values").alias("count_" +
> column_lag),
>   F.stddev("values").alias("std_" +
> column_lag))
>   )
>
> # Merge with result
> data_with_period = (data_with_period.join(df_agg, ['idpersona',
> "fecha"]))
>
> # Set null for next loop
> columns_aggregate = []
>
> return data_with_period
>
> -
>
> lag_columns = ["indice_charlson", "pam", "framingham", "tfg",
> "perimetro_abdominal",
>"presion_sistolica", "presion_diastolica", "imc",
> "peso", "talla",
>"frecuencia_cardiaca", "saturacion_oxigeno",
> "porcentaje_grasa"]
>
>
> --
> df = create_lag_columns(df, 6, columns_to_lag)
>
> Thanks,
>
> Javier Rey
>
>
>


Re: Any NLP library for sentiment analysis in Spark?

2017-04-12 Thread Georg Heiler
I upgraded some dependencies here https://github.com/geoHeil/spark-corenlp
and currently use it for a university project.

Would also be interested in better libraries for Spark.

Tokenization and lemmatization work fine.

Regards Georg

hosur narahari  schrieb am Mi. 12. Apr. 2017 um 06:53:

> Tensorflow provides NLP implementation which uses deep learning
> technology. But it's not distributed. So you can try to integrate spark
> with Tensorflow.
>
> Best Regards,
> Hari
>
> On 11 Apr 2017 11:44 p.m., "Gabriel James" 
> wrote:
>
> Me too. Experiences and recommendations please.
>
>
>
> Gabriel
>
>
>
> *From:* Kevin Wang [mailto:buz...@gmail.com]
> *Sent:* Wednesday, April 12, 2017 6:11 AM
> *To:* Alonso Isidoro Roman 
> *Cc:* Gaurav1809 ; user@spark.apache.org
> *Subject:* Re: Any NLP library for sentiment analysis in Spark?
>
>
>
> I am also interested in this topic.  Anything else anyone can recommend?
> Thanks.
>
>
>
> Best,
>
>
>
> Kevin
>
>
>
> On Tue, Apr 11, 2017 at 5:00 AM, Alonso Isidoro Roman 
> wrote:
>
> i did not use it yet, but this library looks promising:
>
>
>
> https://github.com/databricks/spark-corenlp
>
>
>
>
> *Alonso Isidoro Roman*
>
> about.me/alonso.isidoro.roman
>
>
>
> 2017-04-11 11:02 GMT+02:00 Gaurav1809 :
>
> Hi All,
>
> I need to determine sentiment for given document (statement, paragraph
> etc.)
> Is there any NLP library available with Apache Spark that I can use here?
>
> Any other pointers towards this would be highly appreciated.
>
> Thanks in advance.
> Gaurav Pandya
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-NLP-library-for-sentiment-analysis-in-Spark-tp28586.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
>


spark off heap memory

2017-04-09 Thread Georg Heiler
Hi,
I thought that with the integration of project Tungsten, Spark would
automatically use off-heap memory.

What are spark.memory.offHeap.size and spark.memory.offHeap.enabled for? Do
I manually need to specify the amount of off-heap memory for Tungsten here?

Regards,
Georg


Re: Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!

2017-03-24 Thread Georg Heiler
Maybe a UDF to flatten is an interesting option as well
(http://stackoverflow.com/q/42888711/2587904). Would a UDAF be more
performant?
shyla deshpande  schrieb am Fr. 24. März 2017 um
04:04:

> Thanks a million Yong. Great help!!! It solved my problem.
>
> On Thu, Mar 23, 2017 at 6:00 PM, Yong Zhang  wrote:
>
> Change:
>
> val arrayinput = input.getAs[Array[String]](0)
>
> to:
>
> val arrayinput = input.getAs[*Seq*[String]](0)
>
>
> Yong
>
>
> --
> *From:* shyla deshpande 
> *Sent:* Thursday, March 23, 2017 8:18 PM
> *To:* user
> *Subject:* Spark dataframe, UserDefinedAggregateFunction(UDAF) help!!
>
> This is my input data. The UDAF needs to aggregate the goals for a team
> and return a map that  gives the count for every goal in the team.
> I am getting the following error
>
> java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef
> cannot be cast to [Ljava.lang.String;
> at com.whil.common.GoalAggregator.update(GoalAggregator.scala:27)
>
> +--+--+
> |teamid|goals |
> +--+--+
> |t1|[Goal1, Goal2]|
> |t1|[Goal1, Goal3]|
> |t2|[Goal1, Goal2]|
> |t3|[Goal2, Goal3]|
> +--+--+
>
> root
>  |-- teamid: string (nullable = true)
>  |-- goals: array (nullable = true)
>  ||-- element: string (containsNull = true)
>
> /Calling the UDAF//
>
> object TestUDAF {
>   def main(args: Array[String]): Unit = {
>
> val spark = SparkSession
>   .builder
>   .getOrCreate()
>
> val sc: SparkContext = spark.sparkContext
> val sqlContext = spark.sqlContext
>
> import sqlContext.implicits._
>
> val data = Seq(
>   ("t1", Seq("Goal1", "Goal2")),
>   ("t1", Seq("Goal1", "Goal3")),
>   ("t2", Seq("Goal1", "Goal2")),
>   ("t3", Seq("Goal2", "Goal3"))).toDF("teamid","goals")
>
> data.show(truncate = false)
> data.printSchema()
>
> import spark.implicits._
>
> val sumgoals = new GoalAggregator
> val result = data.groupBy("teamid").agg(sumgoals(col("goals")))
>
> result.show(truncate = false)
>
>   }
> }
>
> ///UDAF/
>
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types._
>
> class GoalAggregator extends UserDefinedAggregateFunction{
>
>   override def inputSchema: org.apache.spark.sql.types.StructType =
>   StructType(StructField("value", ArrayType(StringType)) :: Nil)
>
>   override def bufferSchema: StructType = StructType(
>   StructField("combined", MapType(StringType,IntegerType)) :: Nil
>   )
>
>   override def dataType: DataType = MapType(StringType,IntegerType)
>
>   override def deterministic: Boolean = true
>
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {
> buffer.update(0, Map[String, Integer]())
>   }
>
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
> val mapbuf = buffer.getAs[Map[String, Int]](0)
> val arrayinput = input.getAs[Array[String]](0)
> val result = mapbuf ++ arrayinput.map(goal => {
>   val cnt  = mapbuf.get(goal).getOrElse(0) + 1
>   goal -> cnt
> })
> buffer.update(0, result)
>   }
>
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = 
> {
> val map1 = buffer1.getAs[Map[String, Int]](0)
> val map2 = buffer2.getAs[Map[String, Int]](0)
> val result = map1 ++ map2.map { case (k,v) =>
>   val cnt = map1.get(k).getOrElse(0) + 1
>   k -> cnt
> }
> buffer1.update(0, result)
>   }
>
>   override def evaluate(buffer: Row): Any = {
> buffer.getAs[Map[String, Int]](0)
>   }
> }
>
>
>
>
>


Re: Spark Job Performance monitoring approaches

2017-02-15 Thread Georg Heiler
I know of the following tools
https://sites.google.com/site/sparkbigdebug/home
https://engineering.linkedin.com/blog/2016/04/dr-elephant-open-source-self-serve-performance-tuning-hadoop-spark
 https://github.com/SparkMonitor/varOne https://github.com/groupon/sparklint


Chetan Khatri  schrieb am Do., 16. Feb. 2017
um 06:15 Uhr:

> Hello All,
>
> What would be the best approches to monitor Spark Performance, is there
> any tools for Spark Job Performance monitoring ?
>
> Thanks.
>


Re: MultiLabelBinarizer

2017-02-08 Thread Georg Heiler
I believe only
http://stackoverflow.com/questions/34167105/using-spark-mls-onehotencoder-on-multiple-columns
is currently possible, i.e. using multiple StringIndexers and then multiple
OneHotEncoders, one per column.
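
For illustration, a rough sketch of that per-column approach (assuming Spark 2.x and a DataFrame df with string columns colA and colB):

import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val cols = Seq("colA", "colB")
val indexers = cols.map(c => new StringIndexer().setInputCol(c).setOutputCol(c + "_idx"))
val encoders = cols.map(c => new OneHotEncoder().setInputCol(c + "_idx").setOutputCol(c + "_vec"))

val stages: Array[PipelineStage] = (indexers ++ encoders).toArray
val encoded = new Pipeline().setStages(stages).fit(df).transform(df)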

Madabhattula Rajesh Kumar  schrieb am Do., 9. Feb.
2017 um 06:38 Uhr:

> Hi,
>
> Do we have a below equivalent preprocessing function in Spark ML?
>
> from sklearn.preprocessing import MultiLabelBinarizer
>
> Regards,
>
> Rajesh
>
>
>


Re: Anyone has any experience using spark in the banking industry?

2017-01-18 Thread Georg Heiler
Have a look at Mesos together with Myriad, i.e. YARN on Mesos.
kant kodali  schrieb am Mi. 18. Jan. 2017 um 22:51:

> Anyone has any experience using spark in the banking industry? I have
> couple of questions.
>
> 1. Most of the banks seem to care about number of pending transaction at
> any given time and I wonder if this is processing time or event time? I am
> just trying to understand how this is normally done in the banking industry?
>
> 2. How can I make spark cluster highly available across multi datacenter?
> Any pointers?
>
> Thanks,
> kant
>


Re: Nested ifs in sparksql

2017-01-11 Thread Georg Heiler
I was using the DataFrame API, not SQL. The main problem was that too much
code was generated.
Using a UDF turned out to be quicker as well.
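
For illustration, a rough sketch of pushing the branching into one UDF instead of ~40 chained when/otherwise calls (column name and rules are made up):

import org.apache.spark.sql.functions.{col, udf}

val bucket = udf { v: Int =>
  // the real logic would hold all 41 branches as plain Scala if/else
  if (v < 10) "low"
  else if (v < 100) "mid"
  else "high"
}

val result = df.withColumn("bucket", bucket(col("value")))

The trade-off is that Catalyst cannot look inside the UDF, but it avoids generating one huge code block for the nested CASE WHEN expression.
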
Olivier Girardot <o.girar...@lateral-thoughts.com> schrieb am Di. 10. Jan.
2017 um 21:54:

> Are you using the "case when" functions ? what do you mean by slow ? can
> you share a snippet ?
>
>
>
> On Tue, Jan 10, 2017 8:15 PM, Georg Heiler georg.kf.hei...@gmail.com
> wrote:
>
> Maybe you can create an UDF?
>
> Raghavendra Pandey <raghavendra.pan...@gmail.com> schrieb am Di., 10.
> Jan. 2017 um 20:04 Uhr:
>
> I have of around 41 level of nested if else in spark sql. I have
> programmed it using apis on dataframe. But it takes too much time.
> Is there anything I can do to improve on time here?
>
>
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>


Re: Nested ifs in sparksql

2017-01-10 Thread Georg Heiler
Maybe you can create a UDF?

Raghavendra Pandey  schrieb am Di., 10. Jan.
2017 um 20:04 Uhr:

> I have of around 41 level of nested if else in spark sql. I have
> programmed it using apis on dataframe. But it takes too much time.
> Is there anything I can do to improve on time here?
>


Re: top-k function for Window

2017-01-04 Thread Georg Heiler
What about
https://github.com/myui/hivemall/wiki/Efficient-Top-k-computation-on-Apache-Hive-using-Hivemall-UDTF
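
For illustration, a rough sketch of the bounded "local top-k" idea discussed below, staying in the Dataset API (assuming a SparkSession named spark, a Dataset[Event] named ds where Event is a stand-in for the real schema, and an arbitrary k):

import scala.collection.mutable
import spark.implicits._

case class Event(key: String, score: Double)

val k = 10
val topKPerKey = ds.groupByKey(_.key).flatMapGroups { (_, events) =>
  // bounded min-heap: keeps only the k largest scores per key, no full sort
  val heap = mutable.PriorityQueue.empty[Event](Ordering.by((e: Event) => -e.score))
  events.foreach { e =>
    heap.enqueue(e)
    if (heap.size > k) heap.dequeue()   // dequeue drops the current minimum
  }
  heap.toList
}

As noted below, groupByKey still moves all rows of a key to one task; an Aggregator (or algebird's SpaceSaver) adds map-side partial aggregation on top of this.
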
Koert Kuipers  schrieb am Mi. 4. Jan. 2017 um 16:11:

> i assumed topk of frequencies in one pass. if its topk by known
> sorting/ordering then use priority queue aggregator instead of spacesaver.
>
> On Tue, Jan 3, 2017 at 3:11 PM, Koert Kuipers  wrote:
>
> i dont know anything about windowing or about not using developer apis...
>
> but
>
> but a trivial implementation of top-k requires a total sort per group.
> this can be done with dataset. we do this using spark-sorted (
> https://github.com/tresata/spark-sorted) but its not hard to do it
> yourself for datasets either. for rdds its actually a little harder i think
> (if you want to avoid in memory assumption, which i assume you do)..
>
> a perhaps more efficient implementation uses an aggregator. it is not hard
> to adapt algebirds topk aggregator (spacesaver) to use as a spark
> aggregator. this requires a simple adapter class. we do this in-house as
> well. although i have to say i would recommend spark 2.1.0 for this. spark
> 2.0.x aggregator codegen is too buggy in my experience.
>
> On Tue, Jan 3, 2017 at 2:09 PM, Andy Dang  wrote:
>
> Hi Austin,
>
> It's trivial to implement top-k in the RDD world - however I would like to
> stay in the Dataset API world instead of flip-flopping between the two APIs
> (consistency, wholestage codegen etc).
>
> The twitter library appears to support only RDD, and the solution you gave
> me is very similar to what I did - it doesn't work very well with skewed
> dataset :) (it has to perform the sort to work out the row number).
>
> I've been toying with the UDAF idea, but the more I write the code the
> more I see myself digging deeper into the developer API land  - not very
> ideal to be honest. Also, UDAF doesn't have any concept of sorting, so it
> gets messy really fast.
>
> ---
> Regards,
> Andy
>
> On Tue, Jan 3, 2017 at 6:59 PM, HENSLEE, AUSTIN L  wrote:
>
> Andy,
>
>
>
> You might want to also checkout the Algebird libraries from Twitter. They
> have topK and a lot of other helpful functions. I’ve used the Algebird topk
> successfully on very large data sets.
>
>
>
> You can also use Spark SQL to do a “poor man’s” topK. This depends on how
> scrupulous you are about your TopKs (I can expound on this, if needed).
>
>
>
> I obfuscated the field names, before pasting this into email – I think I
> got them all consistently.
>
>
>
> Here’s the meat of the TopK part (found on SO, but I don’t have a
> reference) – this one takes the top 4, hence “rowNum <= 4”:
>
>
>
> SELECT time_bucket,
>
>identifier1,
>
>identifier2,
>
>incomingCount
>
>   FROM (select time_bucket,
>
> identifier1,
>
> identifier2,
>
> incomingCount,
>
>ROW_NUMBER() OVER (PARTITION BY time_bucket,
>
>identifier1
>
>   ORDER BY count DESC) as rowNum
>
>   FROM tablename) tmp
>
>   WHERE rowNum <=4
>
>   ORDER BY time_bucket, identifier1, rowNum
>
>
>
> The count and order by:
>
>
>
>
>
> SELECT time_bucket,
>
>identifier1,
>
>identifier2,
>
>count(identifier2) as myCount
>
>   FROM table
>
>   GROUP BY time_bucket,
>
>identifier1,
>
>identifier2
>
>   ORDER BY time_bucket,
>
>identifier1,
>
>count(identifier2) DESC
>
>
>
>
>
> *From: *Andy Dang 
> *Date: *Tuesday, January 3, 2017 at 7:06 AM
> *To: *user 
> *Subject: *top-k function for Window
>
>
>
> Hi all,
>
>
>
> What's the best way to do top-k with Windowing in Dataset world?
>
>
>
> I have a snippet of code that filters the data to the top-k, but with
> skewed keys:
>
>
>
> val windowSpec = Window.parititionBy(skewedKeys).orderBy(dateTime)
>
> val rank = row_number().over(windowSpec)
>
>
>
> input.withColumn("rank", rank).filter("rank <= 10").drop("rank")
>
>
>
> The problem with this code is that Spark doesn't know that it can sort the
> data locally, get the local rank first. What it ends up doing is performing
> a sort by key using the skewed keys, and this blew up the cluster since the
> keys are heavily skewed.
>
>
>
> In the RDD world we can do something like:
>
> rdd.mapPartitioins(iterator -> topK(iterator))
>
> but I can't really think of an obvious to do this in the Dataset API,
> especially with Window function. I guess some UserAggregateFunction would
> do, but I wonder if there's obvious way that I missed.
>
>
>
> ---
> Regards,
> Andy
>
>
>
>
>


Re: Spark kryo serialization register Datatype[]

2016-12-21 Thread Georg Heiler
I already set

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

to enable kryo and

.set("spark.kryo.registrationRequired", "true")

to force Kryo. Strangely, I see the issue with the missing DataType[] registration.

Trying to register regular classes like Date

.registerKryoClasses(Array(classOf[Date]))

works just fine, but registering the Spark-internal DataType[] is not
working; as far as I read the docs, this should be handled by Spark.
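
For what it's worth, array classes can be listed explicitly just like regular ones; a minimal sketch building on the settings above:

import org.apache.spark.SparkConf
import org.apache.spark.sql.types.DataType

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[Array[DataType]]))   // the DataType[] named in the error message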


Vadim Semenov  schrieb am Mi., 21. Dez. 2016
um 17:12 Uhr:

> to enable kryo serializer you just need to pass
> `spark.serializer=org.apache.spark.serializer.KryoSerializer`
>
> the `spark.kryo.registrationRequired` controls the following behavior:
>
> Whether to require registration with Kryo. If set to 'true', Kryo will
> throw an exception if an unregistered class is serialized. If set to false
> (the default), Kryo will write unregistered class names along with each
> object. Writing class names can cause significant performance overhead, so
> enabling this option can enforce strictly that a user has not omitted
> classes from registration.
>
>
> as described here http://spark.apache.org/docs/latest/configuration.html
>
> if it's set to `true` you need to manually register classes as described
> here: http://spark.apache.org/docs/latest/tuning.html#data-serialization
>
>
> On Wed, Dec 21, 2016 at 8:49 AM, geoHeil 
> wrote:
>
> To force spark to use kryo serialization I set
> spark.kryo.registrationRequired to true.
>
> Now spark complains that: Class is not registered:
> org.apache.spark.sql.types.DataType[] is not registered.
> How can I fix this? So far I could not successfully register this class.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-kryo-serialization-register-Datatype-tp28243.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: spark reshape hive table and save to parquet

2016-12-08 Thread Georg Heiler
https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html

Anton Kravchenko  schrieb am Do., 8. Dez.
2016 um 17:53 Uhr:

> Hello,
>
> I wonder if there is a way (preferably efficient) in Spark to reshape hive
> table and save it to parquet.
>
> Here is a minimal example, input hive table:
> col1 col2 col3
> 1 2 3
> 4 5 6
>
> output parquet:
> col1 newcol2
> 1 [2 3]
> 4 [5 6]
>
> p.s. The real input hive table has ~1000 columns.
>
> Thank you,
> Anton
>


Re: Spark sql generated dynamically

2016-12-02 Thread Georg Heiler
Are you sure? I think this is a column-wise and not a row-wise operation.
ayan guha <guha.a...@gmail.com> schrieb am Fr. 2. Dez. 2016 um 15:17:

> You are looking for window functions.
> On 2 Dec 2016 22:33, "Georg Heiler" <georg.kf.hei...@gmail.com> wrote:
>
> Hi,
>
> how can I perform a group-wise operation in Spark more elegantly? Possibly
> dynamically generate SQL? Or would you suggest a custom UDAF?
> http://stackoverflow.com/q/40930003/2587904
>
> Kind regards,
> Georg
>
>


Spark sql generated dynamically

2016-12-02 Thread Georg Heiler
Hi,

how can I perform a group-wise operation in Spark more elegantly? Possibly
dynamically generate SQL? Or would you suggest a custom UDAF?
http://stackoverflow.com/q/40930003/2587904

Kind regards,
Georg


Re: build models in parallel

2016-11-29 Thread Georg Heiler
They use such functionality via PySpark:
https://www.youtube.com/watch?v=R-6nAwLyWCI

Xiaomeng Wan  schrieb am Di., 29. Nov. 2016 um
17:54 Uhr:

> I want to divide big data into groups (eg groupby some id), and build one
> model for each group. I am wondering whether I can parallelize the model
> building process by implementing a UDAF (eg running linearregression in its
> evaluate mothod). is it good practice? anybody has experience? Thanks!
>
> Regards,
> Shawn
>


Fill na with last value

2016-11-17 Thread Georg Heiler
How can I fill nan values in spark with the last or the last good known
value?

Here is a minimal example http://stackoverflow.com/q/40592207/2587904
So far I tried a window function but unfortunately received only nan
values.
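
For what it's worth, a rough sketch of the usual window-based forward fill (assuming Spark 2.1+, a DataFrame df with a grouping column id, an ordering column time, and a value column v whose gaps are nulls; NaN entries would have to be converted to null first):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.last

val w = Window.partitionBy("id").orderBy("time")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// last(..., ignoreNulls = true) picks the most recent non-null value up to the current row
val filled = df.withColumn("v_filled", last("v", ignoreNulls = true).over(w))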

Kind regards
Georg