Re: [DISCUSS] Support pandas API layer on PySpark

2021-03-17 Thread Georg Heiler
Would you plan to keep the existing indexing mechanism then?
https://koalas.readthedocs.io/en/latest/user_guide/best_practices.html#use-distributed-or-distributed-sequence-default-index
For me, even when trying to use the distributed version, it always resulted
in various window functions being chained, a query plan different from the
default one, and slower execution of the job due to this overhead.

Especially since some people here are thinking about making it the
default/replacing the regular API, I would strongly suggest defaulting to an
indexing mechanism that does not change the query plan.

Best,
Georg

On Wed, Mar 17, 2021 at 12:13 PM Hyukjin Kwon wrote:

> > Just out of curiosity, does Koalas pretty much implement all of the
> Pandas APIs now? If there are some that are yet to be implemented or others
> that have differences, are these documented so users won't be caught
> off-guard?
>
> It's roughly 75% done so far (in Series, DataFrame and Index).
> Yeah, and it properly throws an exception that says it's not implemented yet
> (or is intentionally not implemented, e.g. Series.__iter__, which would easily
> let users shoot themselves in the foot with, for example, a for loop ...).
>
>
> On Wed, Mar 17, 2021 at 2:17 PM Bryan Cutler wrote:
>
>> +1 the proposal sounds good to me. Having a familiar API built in will
>> really help new users who might only have Pandas experience get into using
>> Spark. It sounds like maintenance costs should be manageable once the
>> hurdle of setting up tests is cleared. Just out of curiosity, does Koalas
>> pretty much implement all of the Pandas APIs now? If there are some that
>> are yet to be implemented or others that have differences, are these
>> documented so users won't be caught off-guard?
>>
>> On Tue, Mar 16, 2021 at 6:54 PM Andrew Melo 
>> wrote:
>>
>>> Hi,
>>>
>>> Integrating Koalas with pyspark might help enable a richer integration
>>> between the two. Something that would be useful with a tighter
>>> integration is support for custom column array types. Currently, Spark
>>> takes dataframes, converts them to arrow buffers then transmits them
>>> over the socket to Python. On the other side, pyspark takes the arrow
>>> buffer and converts it to a Pandas dataframe. Unfortunately, the
>>> default Pandas representation of a list-type column turns what were
>>> contiguous value/offset arrays in Arrow into
>>> deserialized Python objects for each row. Obviously, this kills
>>> performance.
>>>
>>> A PR to extend the pyspark API to elide the pandas conversion
>>> (https://github.com/apache/spark/pull/26783) was submitted and
>>> rejected, which is unfortunate, but perhaps this proposed integration
>>> would provide the hooks via Pandas' ExtensionArray interface to allow
>>> Spark to performantly interchange jagged/ragged lists to/from python
>>> UDFs.
>>>
>>> Cheers
>>> Andrew
>>>
>>> On Tue, Mar 16, 2021 at 8:15 PM Hyukjin Kwon 
>>> wrote:
>>> >
>>> > Thank you guys for all your feedback. I will start working on SPIP
>>> with Koalas team.
>>> > I would expect the SPIP can be sent late this week or early next week.
>>> >
>>> >
>>> > I inlined and answered the questions unanswered as below:
>>> >
>>> > Is the community developing the pandas API layer for Spark interested
>>> in being part of Spark or do they prefer having their own release cycle?
>>> >
>>> > Yeah, the Koalas team used to have its own release cycle so it could
>>> > develop and move quickly. It has now become pretty mature, having reached
>>> > 1.7.0, and the team thinks it's fine to have less frequent releases; they
>>> > are happy to work together with Spark by contributing to it. The active
>>> > contributors in the Koalas community will continue to contribute in Spark.
>>> >
>>> > How about test code? Does it fit into the PySpark test framework?
>>> >
>>> > Yes, this will be one of the places where some effort is needed. Koalas
>>> > currently uses pytest with various dependency version combinations (e.g.,
>>> > Python version, conda vs pip) whereas PySpark uses plain unittest with
>>> > fewer dependency version combinations.
>>> >
>>> > For pytest in Koalas <> unittests in PySpark:
>>> >
>>> >   I am currently thinking we will have to convert the Koalas tests to
>>> >   use unittest to match with PySpark for now.
>>> >   It is a feasible option for PySpark to migrate to pytest too, but it
>>> >   will need extra effort to make it work seamlessly with our own PySpark
>>> >   testing framework.
>>> >   Koalas team (presumably and likely I) will take a look in any event.
>>> >
>>> > For the combinations of dependency versions:
>>> >
>>> >   Due to the lack of resources in GitHub Actions, I currently plan to
>>> >   just add the Koalas tests into the matrix PySpark is currently using.
>>> >
>>> > One question I have: what's the initial goal of the proposal?
>>> > Is it to port all the pandas interfaces that Koalas has already
>>> > implemented?

Re: pip/conda distribution headless mode

2020-08-30 Thread Georg Heiler
Many thanks.

Best,
Georg

On Mon, Aug 31, 2020 at 1:12 AM Xiao Li wrote:

> Hi, Georg,
>
> This is being tracked by https://issues.apache.org/jira/browse/SPARK-32017. You
> can leave comments in the JIRA.
>
> Thanks,
>
> Xiao
>
> On Sun, Aug 30, 2020 at 3:06 PM Georg Heiler 
> wrote:
>
>> Hi,
>>
>> I want to use pyspark as distributed via conda in headless mode.
>> It looks like the Hadoop binaries are bundled (= pip distributes a
>> default version)
>> https://stackoverflow.com/questions/63661404/bootstrap-spark-itself-on-yarn
>> .
>>
>> I want to ask if it would be possible to A) distribute the headless
>> version (=without hadoop) instead or B) distribute the headless version
>> additionally for pip & conda-forge distribution channels.
>>
>> Best,
>> Georg
>>
>
>
>


pip/conda distribution headless mode

2020-08-30 Thread Georg Heiler
Hi,

I want to use pyspark as distributed via conda in headless mode.
It looks like the Hadoop binaries are bundled (= pip distributes a default
version)
https://stackoverflow.com/questions/63661404/bootstrap-spark-itself-on-yarn.

I want to ask if it would be possible to A) distribute the headless version
(=without hadoop) instead or B) distribute the headless version
additionally for pip & conda-forge distribution channels.

Best,
Georg


custom FileStreamSource which reads from one partition onwards

2019-09-20 Thread Georg Heiler
Hi,

To the best of my knowledge, the existing FileStreamSource reads all the files
in a directory (Hive table). However, I need to be able to specify an initial
partition it should start from (i.e. like a Kafka offset / initial warmed-up
state) and then only read data which is semantically greater (i.e. has a
lexicographically greater file path) than the minimum committed initial state.

After playing around with the internals of the file format I have come to
the conclusion that manually modifying it and setting some values (i.e. the
last processed & committed partition) is not feasible, as Spark will regardless
pick up all the files (even the older partitions):
https://stackoverflow.com/questions/58004832/spark-structured-streaming-file-source-read-from-a-certain-partition-onwards

Is this correct?

This leads me to the conclusion that I need a custom StatefulFileStreamSource.
I tried to create one
(https://gist.github.com/geoHeil/6c0c51e43469ace71550b426cfcce1c1), but so far
I fail to instantiate it (even though it is just a copy of the original one),
as the constructor without any arguments does not seem to be defined:

NoSuchMethodException:
org.apache.spark.sql.execution.streaming.StatefulFileStreamSource.<init>()

Why is the default constructor not found, even for a simple copy of an
existing (and presumably working) class?

Note: I am currently working on Spark 2.2.3.
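
(In case it helps frame the question, my current understanding - which may well be wrong - is that Spark instantiates the class named in .format(...) reflectively via a no-arg constructor, so presumably the piece that needs the default constructor is a provider rather than the Source itself. A sketch of what I mean; the class, option and constructor names below are hypothetical:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.StructType

// hypothetical provider wrapping the custom source; Spark would create this
// class via its no-arg constructor when .format("stateful-file") (or the
// fully qualified class name) is used
class StatefulFileStreamProvider extends StreamSourceProvider with DataSourceRegister {

  override def shortName(): String = "stateful-file"

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) =
    (shortName(), schema.getOrElse(sys.error("a user-defined schema is required")))

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = {
    // the custom source can now take arbitrary constructor arguments,
    // e.g. the minimum partition/path to start from (constructor is hypothetical)
    val startFrom = parameters.getOrElse("startingPartition", "")
    new StatefulFileStreamSource(
      sqlContext.sparkSession, parameters("path"), schema.get, metadataPath, startFrom)
  }
}

Is that the intended extension point, or is there a simpler way?)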

Best,
Georg


Re: Custom Window Function

2019-01-25 Thread Georg Heiler
Hi,

https://stackoverflow.com/questions/32100973/how-to-define-and-use-a-user-defined-aggregate-function-in-spark-sql
has a good overview and the best sample I have found so far (besides the Spark
source code).
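
To make it concrete, a minimal UDAF along the lines of that sample (my own toy aggregate, not code taken from the Spark sources):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// toy aggregate: geometric mean of a double column
class GeometricMean extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
  override def bufferSchema: StructType =
    StructType(StructField("count", LongType) :: StructField("product", DoubleType) :: Nil)
  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 1.0
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + 1
    buffer(1) = buffer.getDouble(1) * input.getDouble(0)
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getDouble(1) * buffer2.getDouble(1)
  }
  override def evaluate(buffer: Row): Double =
    math.pow(buffer.getDouble(1), 1.0 / buffer.getLong(0))
}

// usage, also over a window:
//   val gm = new GeometricMean
//   df.withColumn("gm", gm(col("v")).over(Window.partitionBy(col("k"))))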

Best,
Georg

On Wed, Jan 23, 2019 at 5:16 PM Georg Heiler <georg.kf.hei...@gmail.com> wrote:

> Hi Herman,
>
> Thanks a lot. So far I only found most of the documentation about UDAF.
> Could you point me anywhere (besides just reading spark's source code)
> which explains how to work with custom AggregateFunctions?
>
> Best,
> Georg
>
> On Wed, Jan 23, 2019 at 4:02 PM Herman van Hovell <her...@databricks.com> wrote:
>
>> Hi Georg,
>>
>> In most cases you want to implement an aggregate function. You can either
>> define a UDAF, or an AggregateFunction. The latter is an internal
>> expression, so it is faster but we might break it across versions of spark.
>>
>> Hope this helps.
>>
>> Cheers,
>> Herman
>>
>> On Wed, Jan 23, 2019 at 3:55 PM Georg Heiler 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to write custom window functions in spark which are also
>>> optimisable for catalyst.
>>> Can you provide some hints where to start?
>>>
>>> Also posting to DEVLIST as I believe this is a rather exotic topic.
>>>
>>> Best,
>>> Georg
>>>
>>


Re: Custom Window Function

2019-01-23 Thread Georg Heiler
Hi Herman,

Thanks a lot. So far I only found most of the documentation about UDAF.
Could you point me anywhere (besides just reading spark's source code)
which explains how to work with custom AggregateFunctions?

Best,
Georg

On Wed, Jan 23, 2019 at 4:02 PM Herman van Hovell <her...@databricks.com> wrote:

> Hi Georg,
>
> In most cases you want to implement an aggregate function. You can either
> define a UDAF, or an AggregateFunction. The latter is an internal
> expression, so it is faster but we might break it across versions of spark.
>
> Hope this helps.
>
> Cheers,
> Herman
>
> On Wed, Jan 23, 2019 at 3:55 PM Georg Heiler 
> wrote:
>
>> Hi,
>>
>> I want to write custom window functions in spark which are also
>> optimisable for catalyst.
>> Can you provide some hints where to start?
>>
>> Also posting to DEVLIST as I believe this is a rather exotic topic.
>>
>> Best,
>> Georg
>>
>


Custom Window Function

2019-01-23 Thread Georg Heiler
Hi,

I want to write custom window functions in spark which are also optimisable
for catalyst.
Can you provide some hints where to start?

Also posting to DEVLIST as I believe this is a rather exotic topic.

Best,
Georg


Derby dependency missing for spark-hive form 2.2.1 onwards

2018-03-31 Thread Georg Heiler
Hi,

I noticed that Spark standalone (locally, for development) no longer supports
the integrated Hive metastore, as some driver classes for Derby seem to be
missing from 2.2.1 onwards (including 2.3.0). With 2.2.0 or previous versions
it works just fine to execute the following script:

spark.sql("CREATE database foobar")
The exception I see for newer versions of spark is:

NoClassDefFoundError: Could not initialize class
org.apache.derby.jdbc.EmbeddedDriver

Simply adding derby as a dependency in SBT did not solve this issue for me.
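
(For reference, this is roughly what I mean by adding it; the version below is only an example:

// build.sbt
libraryDependencies += "org.apache.derby" % "derby" % "10.12.1.1"
)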

Best,
Georg


Re: Compiling Spark UDF at runtime

2018-01-12 Thread Georg Heiler
You could store the jar in HDFS. Then your given workaround should work even
in YARN cluster mode.
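Roughly like this (a sketch: the Hive-style registration below needs Hive
support enabled, and all paths and class names are placeholders):

// the jar can be generated on the fly, uploaded to HDFS and registered from there
val jarOnHdfs = "hdfs:///tmp/generated-udfs.jar"

spark.sql(s"ADD JAR $jarOnHdfs")
spark.sql(
  s"""CREATE TEMPORARY FUNCTION my_generated_udf
     |AS 'com.example.generated.MyUdf'
     |USING JAR '$jarOnHdfs'""".stripMargin)
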
On Fri, Jan 12, 2018 at 12:58 PM Michael Shtelma wrote:

> Hi all,
>
> I would like to be able to compile Spark UDF at runtime. Right now I
> am using Janino for that.
> My problem is that in order to make my compiled functions visible to
> spark, I have to set janino classloader (janino gives me classloader
> with compiled UDF classes) as context class loader before I create
> Spark Session. This approach is working locally for debugging purposes
> but is not going to work in cluster mode, because the UDF classes will
> not be distributed to the worker nodes.
>
> An alternative is to register the UDF via Hive functionality and generate a
> temporary jar somewhere, which at least in standalone cluster mode
> will be made available to Spark workers using the embedded HTTP server. As
> far as I understand, this is not going to work in YARN mode.
>
> I am now wondering how best to approach this problem. My current best idea
> is to develop my own small Netty-based file web server and use it to
> distribute my custom jar, which can be created on the fly, to workers both
> in standalone and in YARN modes. Can I reference the jar in the form of an
> HTTP URL using extra driver options and then register the UDFs contained in
> this jar using the spark.udf().* methods?
>
> Does anybody have any better ideas?
> Any assistance would be greatly appreciated!
>
> Thanks,
> Michael
>
>
>


Re: Schema Evolution in Apache Spark

2018-01-11 Thread Georg Heiler
Isn't this related to the data format used, i.e. Parquet, Avro, ..., which
already support schema evolution?
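
For example, Parquet already exposes schema merging on read; a small sketch (the paths are placeholders):

// two writes with different but compatible schemas into the same base path
spark.range(3).selectExpr("id", "id AS a").write.parquet("/tmp/evolve/key=1")
spark.range(3).selectExpr("id", "id AS b").write.parquet("/tmp/evolve/key=2")

// mergeSchema reconciles the union of the columns (id, a, b, key)
spark.read.option("mergeSchema", "true").parquet("/tmp/evolve").printSchema()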

On Fri, Jan 12, 2018 at 2:30 AM Dongjoon Hyun wrote:

> Hi, All.
>
> A data schema can evolve in several ways and Apache Spark 2.3 already
> supports the followings for file-based data sources like
> CSV/JSON/ORC/Parquet.
>
> 1. Add a column
> 2. Remove a column
> 3. Change a column position
> 4. Change a column type
>
> Can we guarantee users some schema evolution coverage on file-based data
> sources by adding schema evolution test suites explicitly? So far, there
> are some test cases.
>
> For simplicity, I have several assumptions on schema evolution.
>
> 1. A safe evolution without data loss.
> - e.g. from small types to larger types like int-to-long, not vice
> versa.
> 2. Final schema is given by users (or Hive)
> 3. Simple Spark data types supported by Spark vectorized execution.
>
> I made a test case PR to receive your opinions for this.
>
> [SPARK-23007][SQL][TEST] Add schema evolution test suite for file-based
> data sources
> - https://github.com/apache/spark/pull/20208
>
> Could you take a look and give some opinions?
>
> Bests,
> Dongjoon.
>


Re: In structured streamin, multiple streaming aggregations are not yet supported.

2017-11-28 Thread Georg Heiler
2.3 around January
On Wed, Nov 29, 2017 at 5:08 AM 0.0 <407216...@qq.com> wrote:

> Hi, all:
> Multiple streaming aggregations are not yet supported. When will it be
> supported? Is it in the plan?
>
> Thanks.
>


custom column types for JDBC datasource writer

2017-07-05 Thread Georg Heiler
Hi,
is it possible to somehow make Spark not use VARCHAR(255) but something
bigger, i.e. CLOB, for strings?
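
(One direction I am considering - a sketch using the JdbcDialect developer API, so treat it as an assumption on my side; the dialect below only handles Oracle URLs and must be registered before the df.write.jdbc(...) call:

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// map StringType to CLOB instead of the default VARCHAR(255)
object StringAsClobDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("CLOB", Types.CLOB))
    case _          => None // fall back to the default mapping
  }
}

JdbcDialects.registerDialect(StringAsClobDialect)
)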

If not, is it at least possible to catch the exception which is thrown? To
me, it seems that Spark is catching and logging it - so I can no longer
intervene and handle it:

https://stackoverflow.com/questions/44927764/spark-jdbc-oracle-long-string-fields

Regards,
Georg


spark encoder not working for UDF

2017-06-25 Thread Georg Heiler
Hi,

I have a custom Spark Kryo encoder, but it is not in scope for the
UDFs to work.
https://stackoverflow.com/questions/44735235/spark-custom-kryo-encoder-not-providing-schema-for-udf
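
(For context, a minimal reproduction of what I mean - the Kryo encoder collapses the case class into a single binary column, so there is no per-field schema left for a UDF to bind to; `spark` below is an existing SparkSession:

import org.apache.spark.sql.Encoders

case class Foo(a: Int, b: String)

implicit val fooKryoEncoder = Encoders.kryo[Foo]
spark.createDataset(Seq(Foo(1, "x"))).printSchema()
// root
//  |-- value: binary (nullable = true)
)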


Regards,
Georg


Re: spark messing up handling of native dependency code?

2017-06-05 Thread Georg Heiler
I read http://techblog.applift.com/upgrading-spark and conducted further
research. I think there is some problem with the class
loader. Unfortunately, so far, I did not get it to work.

On Sat, Jun 3, 2017 at 8:27 AM Georg Heiler wrote:

> When tested without any parallelism the same problem persists. Actually,
> NiFi shows the same issues. So probably it is not related to spark.
>
> On Sat, Jun 3, 2017 at 1:37 AM Maciej Szymkiewicz wrote:
>
>> Maybe not related, but in general geotools are not thread safe, so using
>> them from workers is most likely a gamble.
>> On 06/03/2017 01:26 AM, Georg Heiler wrote:
>>
>> Hi,
>>
>> There is a weird problem with spark when handling native dependency code:
>> I want to use a library (JAI) with spark to parse some spatial raster
>> files. Unfortunately, there are some strange issues. JAI only works when
>> running via the build tool i.e. `sbt run` when executed in spark.
>>
>> When executed via spark-submit the error is:
>>
>> java.lang.IllegalArgumentException: The input argument(s) may not be
>> null.
>> at
>> javax.media.jai.ParameterBlockJAI.getDefaultMode(ParameterBlockJAI.java:136)
>> at
>> javax.media.jai.ParameterBlockJAI.<init>(ParameterBlockJAI.java:157)
>> at
>> javax.media.jai.ParameterBlockJAI.<init>(ParameterBlockJAI.java:178)
>> at
>> org.geotools.process.raster.PolygonExtractionProcess.execute(PolygonExtractionProcess.java:171)
>>
>> Which looks like some native dependency (I think GEOS is running in the
>> background) is not there correctly.
>>
>> Assuming something is wrong with the class path I tried to run a plain
>> java/scala function. but this one works just fine.
>>
>> Is spark messing with the class paths?
>>
>> I created a minimal example here:
>> https://github.com/geoHeil/jai-packaging-problem
>>
>>
>> Hope someone can shed some light on this problem,
>> Regards,
>> Georg
>>
>>
>>


Re: spark messing up handling of native dependency code?

2017-06-02 Thread Georg Heiler
When tested without any parallelism the same problem persists. Actually,
NiFi shows the same issues. So probably it is not related to spark.

On Sat, Jun 3, 2017 at 1:37 AM Maciej Szymkiewicz wrote:

> Maybe not related, but in general geotools are not thread safe, so using
> them from workers is most likely a gamble.
> On 06/03/2017 01:26 AM, Georg Heiler wrote:
>
> Hi,
>
> There is a weird problem with spark when handling native dependency code:
> I want to use a library (JAI) with spark to parse some spatial raster
> files. Unfortunately, there are some strange issues. JAI only works when
> running via the build tool i.e. `sbt run` when executed in spark.
>
> When executed via spark-submit the error is:
>
> java.lang.IllegalArgumentException: The input argument(s) may not be
> null.
> at
> javax.media.jai.ParameterBlockJAI.getDefaultMode(ParameterBlockJAI.java:136)
> at
> javax.media.jai.ParameterBlockJAI.<init>(ParameterBlockJAI.java:157)
> at
> javax.media.jai.ParameterBlockJAI.<init>(ParameterBlockJAI.java:178)
> at
> org.geotools.process.raster.PolygonExtractionProcess.execute(PolygonExtractionProcess.java:171)
>
> Which looks like some native dependency (I think GEOS is running in the
> background) is not there correctly.
>
> Assuming something is wrong with the class path I tried to run a plain
> java/scala function. but this one works just fine.
>
> Is spark messing with the class paths?
>
> I created a minimal example here:
> https://github.com/geoHeil/jai-packaging-problem
>
>
> Hope someone can shed some light on this problem,
> Regards,
> Georg
>
>
>


spark messing up handling of native dependency code?

2017-06-02 Thread Georg Heiler
Hi,

There is a weird problem with Spark when handling native dependency code:
I want to use a library (JAI) with Spark to parse some spatial raster
files. Unfortunately, there are some strange issues. When executed in Spark,
JAI only works when running via the build tool, i.e. `sbt run`.

When executed via spark-submit the error is:

java.lang.IllegalArgumentException: The input argument(s) may not be
null.
at
javax.media.jai.ParameterBlockJAI.getDefaultMode(ParameterBlockJAI.java:136)
at javax.media.jai.ParameterBlockJAI.<init>(ParameterBlockJAI.java:157)
at javax.media.jai.ParameterBlockJAI.<init>(ParameterBlockJAI.java:178)
at
org.geotools.process.raster.PolygonExtractionProcess.execute(PolygonExtractionProcess.java:171)

It looks like some native dependency (I think GEOS is running in the
background) is not set up correctly.

Assuming something is wrong with the class path, I tried to run a plain
Java/Scala function, but this one works just fine.

Is spark messing with the class paths?

I created a minimal example here:
https://github.com/geoHeil/jai-packaging-problem


Hope someone can shed some light on this problem,
Regards,
Georg


Generic datasets implicit encoder missing

2017-05-29 Thread Georg Heiler
Hi,

Does anyone know what is wrong with using a generic type
(https://stackoverflow.com/q/44247874/2587904) to construct a Dataset? Even
though the implicits are imported, they are missing.
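
(For reference, the workaround I have seen suggested - requiring the Encoder explicitly via a context bound so it is resolved at the call site; FooBar below is a placeholder case class:

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

// the context bound defers implicit resolution to the call site, where the
// concrete type and spark.implicits._ are in scope
def toDS[T: Encoder](spark: SparkSession, data: Seq[T]): Dataset[T] =
  spark.createDataset(data)

// import spark.implicits._
// val ds = toDS(spark, Seq(FooBar(None, "x")))
)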

Regards Georg


Re: Spark Local Pipelines

2017-03-12 Thread Georg Heiler
Great idea. I see the same problem.
I would suggest checking the following projects as a kick start as well (not
only MLeap):
https://github.com/ucbrise/clipper and
https://github.com/Hydrospheredata/mist

Regards Georg
On Sun, Mar 12, 2017 at 11:21 PM Asher Krim wrote:

> Hi All,
>
> I spent a lot of time at Spark Summit East this year talking with Spark
> developers and committers about challenges with productizing Spark. One of
> the biggest shortcomings I've encountered in Spark ML pipelines is the lack
> of a way to serve single requests with any reasonable performance.
> SPARK-10413 explores adding methods for single item prediction, but I'd
> like to explore a more holistic approach - a separate local api, with
> models that support transformations without depending on Spark at all.
>
> I've written up a doc
> 
> detailing the approach, and I'm happy to discuss alternatives. If this
> gains traction, I can create a branch with a minimal example on a simple
> transformer (probably something like CountVectorizerModel) so we have
> something concrete to continue the discussion on.
>
> Thanks,
> Asher Krim
> Senior Software Engineer
>


Re: Spark Job Performance monitoring approaches

2017-02-15 Thread Georg Heiler
I know of the following tools:
https://sites.google.com/site/sparkbigdebug/home
https://engineering.linkedin.com/blog/2016/04/dr-elephant-open-source-self-serve-performance-tuning-hadoop-spark
https://github.com/SparkMonitor/varOne
https://github.com/groupon/sparklint


On Thu, Feb 16, 2017 at 6:15 AM Chetan Khatri wrote:

> Hello All,
>
> What would be the best approaches to monitor Spark performance? Are there
> any tools for Spark job performance monitoring?
>
> Thanks.
>


Re: handling of empty partitions

2017-01-11 Thread Georg Heiler
I see that there is the possibility to improve and make the algorithm more
fault tolerant as outlined by both of you.
Could you explain a little bit more why

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-04|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

i.e. why the records are not filled with the last known good value but
rather with "the next one" - this is not so clear to me.
Why does it depend on the partitions?
As the broadcast map is available to all the partitions, shouldn't this be
the same regardless of partitioning?

The fix (too simple to be applicable generally) of choosing the next element,

if (lastNotNullRow == None) {
  lastNotNullRow = toCarryBd.value.get(i + 1).get
}

is only applied when the partition does not contain new values.
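
To make this general, as Holden suggests, the lookup would presumably have to walk backwards through earlier partitions until a non-empty one is found; a sketch (assuming toCarry maps each partition index to the last valid row seen in that partition, as in the gist):

// walk from the previous partition downwards and take the first carried value;
// returns None only if no earlier partition holds a valid row
def lastGoodBefore(partitionIndex: Int, toCarry: Map[Int, Option[FooBar]]): Option[FooBar] =
  (partitionIndex - 1 to 0 by -1).iterator
    .map(i => toCarry.getOrElse(i, None))
    .collectFirst { case Some(row) => row }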

Kind regards,
Georg

On Thu, Jan 12, 2017 at 3:48 AM Liang-Chi Hsieh wrote:

>
> Hi Georg,
>
> It is not strange. As I said before, it depends how the data is
> partitioned.
>
> When you try to get the available value from next partition like this:
>
> var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
>   if (lastNotNullRow == None) {
> lastNotNullRow = toCarryBd.value.get(i + 1).get
>   }
>
> You may need to make sure the next partition has a value too. Holden has
> pointed out before, you need to deal with the case that the previous/next
> partition is empty too and go next until you find a non-empty partition.
>
>
>
> geoHeil wrote
> > Hi Liang-Chi Hsieh,
> >
> > Strange:
> > As the "toCarry" returned is the following when I tested your codes:
> >
> > Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> > Some(FooBar(Some(2016-01-02),second)))
> > For me it always looked like:
> >
> > ## carry
> > Map(2 -> None, 5 -> None, 4 -> None, 7 ->
> > Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 ->
> > Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)),
> > 6 -> None, 0 -> None)
> > (2,None)
> > (5,None)
> > (4,None)
> > (7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
> > (1,Some(FooBar(2016-01-01,first)))
> > (3,Some(FooBar(2016-01-02,second)))
> > (6,None)
> > (0,None)
> > ()
> > ## carry
> >
> >
> > I updated the code to contain a fixed default parallelism
> > .set("spark.default.parallelism", "12")
> >
> > Also:
> > I updated the sample code:
> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
> >
> > To cope with "empty/ none" partitions I added
> >
> > var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
> >   if (lastNotNullRow == None) {
> > lastNotNullRow = toCarryBd.value.get(i + 1).get
> >   }
> >
> >
> > But that will result in
> >
> > +----------+--------------------+
> > |       foo|                 bar|
> > +----------+--------------------+
> > |2016-01-01|               first|
> > |2016-01-02|              second|
> > |      null|       noValidFormat|
> > |2016-01-04|lastAssumingSameDate|
> > +----------+--------------------+
> >
> > +----------+--------------------+
> > |       foo|                 bar|
> > +----------+--------------------+
> > |2016-01-01|               first|
> > |2016-01-02|              second|
> > |2016-01-04|       noValidFormat|
> > |2016-01-04|lastAssumingSameDate|
> > +----------+--------------------+
> >
> > You see that noValidFormat should have been filled with 2016-01-02 to be
> > filled with last good known value (forward fill)
> > Cheers,
> > Georg
> >
> > On Mon, Jan 9, 2017 at 9:08 AM Liang-Chi Hsieh <viirya@> wrote:
> >
> >>
> >> The map "toCarry" will return you (partitionIndex, None) for empty
> >> partition.
> >>
> >> So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is
> >> None.
> >> You of course should check if an Option has value or not before you
> >> access
> >> it.
> >>
> >> As the "toCarry" returned is the following when I tested your codes:
> >>
> >> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> >> Some(FooBar(Some(2016-01-02),second)))
> >>
> >> As you have seen, there is no None, so the code works without failure. But of
> >> course it depends on how your data is partitioned.
> >>
> >> For empty partition, when you do mapPartitions, it just gives you an
> >> empty
> >> iterator as input. You can do what you need. You already return a None
> >> when
> >> you find an empty iterator in preparing "toCarry". So I was wondering
> >> what
> >> you want to ask in the previous reply.
> >>
> >>
> >>
> >> geoHeil wrote
> >> > Thanks a lot, Holden.
> >> >
> >> > @Liang-Ch

Re: handling of empty partitions

2017-01-09 Thread Georg Heiler
Hi Liang-Chi Hsieh,

Strange:
As the "toCarry" returned is the following when I tested your codes:

Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
Some(FooBar(Some(2016-01-02),second)))
For me it always looked like:

## carry
Map(2 -> None, 5 -> None, 4 -> None, 7 ->
Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 ->
Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)),
6 -> None, 0 -> None)
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
()
## carry


I updated the code to contain a fixed default parallelism
.set("spark.default.parallelism", "12")

Also:
I updated the sample code:
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

To cope with "empty/ none" partitions I added

var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
  if (lastNotNullRow == None) {
lastNotNullRow = toCarryBd.value.get(i + 1).get
  }


But that will result in

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-04|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

You see that noValidFormat should have been filled with 2016-01-02, i.e. with
the last known good value (forward fill).
Cheers,
Georg

On Mon, Jan 9, 2017 at 9:08 AM Liang-Chi Hsieh wrote:

>
> The map "toCarry" will return you (partitionIndex, None) for empty
> partition.
>
> So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is
> None.
> You of course should check if an Option has value or not before you access
> it.
>
> As the "toCarry" returned is the following when I tested your codes:
>
> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> Some(FooBar(Some(2016-01-02),second)))
>
> As you have seen, there is no None, so the code works without failure. But of
> course it depends on how your data is partitioned.
>
> For empty partition, when you do mapPartitions, it just gives you an empty
> iterator as input. You can do what you need. You already return a None when
> you find an empty iterator in preparing "toCarry". So I was wondering what
> you want to ask in the previous reply.
>
>
>
> geoHeil wrote
> > Thanks a lot, Holden.
> >
> > @Liang-Chi Hsieh did you try to run
> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 for me
> > that is crashing in either line 51 or 58. Holden described the problem
> > pretty well. Ist it clear for you now?
> >
> > Cheers,
> > Georg
> >
> > On Mon, Jan 9, 2017 at 6:40 AM Holden Karau [via Apache Spark Developers List] <ml-node+s1001551n20516h45@.nabble> wrote:
> >
> >> Hi Georg,
> >>
> >> Thanks for the question along with the code (as well as posting to stack
> >> overflow). In general if a question is well suited for stackoverflow its
> >> probably better suited to the user@ list instead of the dev@ list so
> I've
> >> cc'd the user@ list for you.
> >>
> >> As far as handling empty partitions when working mapPartitions (and
> >> similar), the general approach is to return an empty iterator of the
> >> correct type when you have an empty input iterator.
> >>
> >> It looks like your code is doing this, however it seems like you likely
> >> have a bug in your application logic (namely it assumes that if a
> >> partition
> >> has a record missing a value it will either have had a previous row in
> >> the
> >> same partition which is good OR that the previous partition is not empty
> >> and has a good row - which need not necessarily be the case). You've
> >> partially fixed this problem by going through and for each partition
> >> collecting the last previous good value, and then if you don't have a
> >> good
> >> value at the start of a partition look up the value in the collected
> >> array.
> >>
> >> However, if this also happens at the same time the previous partition is
> >> empty, you will need to go and lookup the previous previous partition
> >> value
> >> until you find the one you are looking for. (Note this assumes that the
> >> first record in your dataset is valid, if it isn't your code will still
> >> fail).
> >>
> >> Your solution is really close to working but just has some minor
> >> assumptions which don't always necessarily hold.
> >>
> >> Cheers,
> >>
> >> Holden :)
> >> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]> wrote:
> >>
> >>
> >> Hi Georg,
> >>
> >> Can you describe your question more c

Re: modifications to ALS.scala

2016-12-08 Thread Georg Heiler
You can write some code, e.g. a custom estimator/transformer, in Spark's
namespace.
http://stackoverflow.com/a/40785438/2587904 might help you get started.
Be aware that private, i.e. Spark-internal, APIs might be subject to change
from release to release.

You will definitely require the spark-mllib dependency.

Currently for my usage I was not required to build a separate version of
mllib.
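
Concretely, that looks roughly like this (the version and object name are only placeholders):

// build.sbt: depend on spark-mllib as "provided" so the cluster's Spark is used at runtime
// libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.0.2" % "provided"

// src/main/scala/org/apache/spark/ml/recommendation/CustomALS.scala
package org.apache.spark.ml.recommendation

// living in this package gives access to Spark's package-private members
// (e.g. pieces of the internal ALS implementation), which may change between releases
object CustomALS {
  // adapt the objective / call the internal training code here
}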
On Thu, Dec 8, 2016 at 12:23 AM harini wrote:

> I am new to development with spark, how do I do that? Can I write up a
> custom
> implementation under package org.apache.spark.ml.recommendation, and
> specify
> "spark-mllib" along with others as a library dependency?
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/modifications-to-ALS-scala-tp20167p20169.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


Re: modifications to ALS.scala

2016-12-07 Thread Georg Heiler
What about putting a custom ALS implementation into Spark's namespace?
On Thu, Dec 8, 2016 at 12:01 AM harini wrote:

> Hi all, I am trying to implement ALS with a slightly modified objective
> function, which will require minor changes to fit -> train ->
> computeFactors within ALS.scala
> 
> - Is there a way to do this without having to build spark in its entirety?
> --
> View this message in context: modifications to ALS.scala
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>


Re: SparkUI via proxy

2016-11-24 Thread Georg Heiler
SSH port forwarding will help you out.
On Thu, Nov 24, 2016 at 4:33 PM marco rocchi wrote:

> Hi,
> I'm working with Apache Spark in order to develop my master thesis. I'm new
> to Spark and to working with clusters. I searched the internet but I didn't
> find a way to solve this.
> My problem is the following one: from my PC I can access a master node
> of a cluster only via a proxy.
> To connect to the proxy and then to the master node, I have to set up an SSH
> tunnel, but from a practical point of view I have no idea how I can
> interact with the Spark Web UI this way.
> Can anyone help me?
> Thanks in advance
>


Re: Develop custom Estimator / Transformer for pipeline

2016-11-20 Thread Georg Heiler
The estimator should perform data cleaning tasks. This means some rows will
be dropped, some columns dropped, some columns added, some values replaced
in existing columns. It should also store the mean or min for some numeric
columns as a NaN replacement.

However,

override def transformSchema(schema: StructType): StructType = {
  schema.add(StructField("foo", IntegerType))
}

only supports adding fields? I am curious how I am supposed to handle this.
Should I create a new column for each affected column, drop the old one, and
rename afterward?
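
(What I am currently leaning towards - building the output schema explicitly instead of only calling add; the column names below are placeholders, and I assume transform() has to stay consistent with whatever is returned here:

import org.apache.spark.sql.types._

// inside the custom Transformer
override def transformSchema(schema: StructType): StructType = {
  val kept = schema.fields.filterNot(_.name == "colToDrop")
  StructType(kept :+ StructField("foo", IntegerType) :+ StructField("barCleaned", DoubleType))
}
)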


Regards,

Georg

On Fri, Nov 18, 2016 at 7:39 AM Georg Heiler wrote:
>
> Yes that would be really great. Thanks a lot
> On Fri, Nov 18, 2016 at 7:38 AM Holden Karau wrote:
>
> Hi Greg,
>
> So while the post isn't 100% finished if you would want to review a draft
> copy I can share a google doc with you. Would that be useful?
>
> Cheers,
>
> Holden :)
>
> On Fri, Nov 18, 2016 at 7:07 AM Georg Heiler 
> wrote:
>
> Looking forward to the blog post.
> Thanks for pointing me to some of the simpler classes.
> On Fri, Nov 18, 2016 at 2:53 AM Nick Pentreath wrote:
>
> @Holden look forward to the blog post - I think a user guide PR based on
> it would also be super useful :)
>
>
> On Fri, 18 Nov 2016 at 05:29 Holden Karau  wrote:
>
> I've been working on a blog post around this and hope to have it published
> early next month 😀
>
> On Nov 17, 2016 10:16 PM, "Joseph Bradley"  wrote:
>
> Hi Georg,
>
> It's true we need better documentation for this.  I'd recommend checking
> out simple algorithms within Spark for examples:
> ml.feature.Tokenizer
> ml.regression.IsotonicRegression
>
> You should not need to put your library in Spark's namespace.  The shared
> Params in SPARK-7146 are not necessary to create a custom algorithm; they
> are just niceties.
>
> Though there aren't great docs yet, you should be able to follow existing
> examples.  And I'd like to add more docs in the future!
>
> Good luck,
> Joseph
>
> On Wed, Nov 16, 2016 at 6:29 AM, Georg Heiler 
> wrote:
>
> HI,
>
> I want to develop a library with custom Estimator / Transformers for
> spark. So far not a lot of documentation could be found but
> http://stackoverflow.com/questions/37270446/how-to-roll-a-custom-estimator-in-pyspark-mllib
>
>
> Suggest that:
> Generally speaking, there is no documentation because as for Spark 1.6 /
> 2.0 most of the related API is not intended to be public. It should change
> in Spark 2.1.0 (see SPARK-7146
> <https://issues.apache.org/jira/browse/SPARK-7146>).
>
> Where can I already find documentation today?
> Is it true that my library would require residing in Sparks`s namespace
> similar to https://github.com/collectivemedia/spark-ext to utilize all
> the handy functionality?
>
> Kind Regards,
> Georg
>
>
>
>


Re: Develop custom Estimator / Transformer for pipeline

2016-11-17 Thread Georg Heiler
Looking forward to the blog post.
Thanks for pointing me to some of the simpler classes.
On Fri, Nov 18, 2016 at 2:53 AM Nick Pentreath wrote:

> @Holden look forward to the blog post - I think a user guide PR based on
> it would also be super useful :)
>
>
> On Fri, 18 Nov 2016 at 05:29 Holden Karau  wrote:
>
> I've been working on a blog post around this and hope to have it published
> early next month 😀
>
> On Nov 17, 2016 10:16 PM, "Joseph Bradley"  wrote:
>
> Hi Georg,
>
> It's true we need better documentation for this.  I'd recommend checking
> out simple algorithms within Spark for examples:
> ml.feature.Tokenizer
> ml.regression.IsotonicRegression
>
> You should not need to put your library in Spark's namespace.  The shared
> Params in SPARK-7146 are not necessary to create a custom algorithm; they
> are just niceties.
>
> Though there aren't great docs yet, you should be able to follow existing
> examples.  And I'd like to add more docs in the future!
>
> Good luck,
> Joseph
>
> On Wed, Nov 16, 2016 at 6:29 AM, Georg Heiler 
> wrote:
>
> HI,
>
> I want to develop a library with custom Estimator / Transformers for
> spark. So far not a lot of documentation could be found but
> http://stackoverflow.com/questions/37270446/how-to-roll-a-custom-estimator-in-pyspark-mllib
>
>
> Suggest that:
> Generally speaking, there is no documentation because as for Spark 1.6 /
> 2.0 most of the related API is not intended to be public. It should change
> in Spark 2.1.0 (see SPARK-7146
> <https://issues.apache.org/jira/browse/SPARK-7146>).
>
> Where can I already find documentation today?
> Is it true that my library would require residing in Sparks`s namespace
> similar to https://github.com/collectivemedia/spark-ext to utilize all
> the handy functionality?
>
> Kind Regards,
> Georg
>
>
>
>


Develop custom Estimator / Transformer for pipeline

2016-11-16 Thread Georg Heiler
HI,

I want to develop a library with custom Estimators / Transformers for Spark.
So far I could not find a lot of documentation, but
http://stackoverflow.com/questions/37270446/how-to-roll-a-custom-estimator-in-pyspark-mllib

suggests that:
Generally speaking, there is no documentation because as of Spark 1.6 /
2.0 most of the related API is not intended to be public. It should change
in Spark 2.1.0 (see SPARK-7146
).

Where can I already find documentation today?
Is it true that my library would need to reside in Spark's namespace,
similar to https://github.com/collectivemedia/spark-ext, to utilize all the
handy functionality?

Kind Regards,
Georg