Hi Georg,

It is not strange. As I said before, it depends on how the data is partitioned.

When you try to get the available value from the next partition like this:

    var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
    if (lastNotNullRow == None) {
      lastNotNullRow = toCarryBd.value.get(i + 1).get
    }

you also need to make sure that the next partition has a value. As Holden
pointed out before, you need to handle the case where the previous/next
partition is empty as well, and keep going until you find a non-empty partition.
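
The "keep going until you find a non-empty partition" step could be sketched
roughly as below. This is only an illustration, not the code from the gist:
the FooBar definition and the helper name lastGoodAtOrBefore are made up here,
and it assumes "toCarry" maps each partition index to the last good row of
that partition (None for an empty partition) and that a forward fill should
look at earlier partitions.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for the FooBar case class from the thread.
final case class FooBar(foo: Option[String], bar: String)

object CarryLookup {
  // Walk backwards from partition `start` until a partition with a good row is found.
  // Returns None if no partition at or before `start` has one.
  @tailrec
  def lastGoodAtOrBefore(toCarry: Map[Int, Option[FooBar]], start: Int): Option[FooBar] =
    if (start < 0) None
    else toCarry.get(start).flatten match {
      case found @ Some(_) => found
      case None            => lastGoodAtOrBefore(toCarry, start - 1)
    }
}
```

Inside partition i you would then call something like
CarryLookup.lastGoodAtOrBefore(toCarryBd.value, i - 1) and still check the
resulting Option before using it, since it is None when no earlier partition
has a good row.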



geoHeil wrote
> Hi Liang-Chi Hsieh,
> 
> Strange. You wrote that "toCarry" returned the following when you tested
> my code:
> 
> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
> Some(FooBar(Some(2016-01-02),second)))
> 
> For me it always looked like:
> 
> ###################### carry
> Map(2 -> None, 5 -> None, 4 -> None, 7 ->
> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 ->
> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)),
> 6 -> None, 0 -> None)
> (2,None)
> (5,None)
> (4,None)
> (7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
> (1,Some(FooBar(2016-01-01,first)))
> (3,Some(FooBar(2016-01-02,second)))
> (6,None)
> (0,None)
> ()
> ###################### carry
> 
> 
> I updated the code to contain a fixed default parallelism
> .set("spark.default.parallelism", "12")
> 
> Also:
> I updated the sample code:
> https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
> 
> To cope with "empty/ none" partitions I added
> 
> var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get
>       if (lastNotNullRow == None) {
>         lastNotNullRow = toCarryBd.value.get(i + 1).get
>       }
> 
> 
> But that will result in
> 
> +----------+--------------------+
> |       foo|                 bar|
> +----------+--------------------+
> |2016-01-01|               first|
> |2016-01-02|              second|
> |      null|       noValidFormat|
> |2016-01-04|lastAssumingSameDate|
> +----------+--------------------+
> 
> +----------+--------------------+
> |       foo|                 bar|
> +----------+--------------------+
> |2016-01-01|               first|
> |2016-01-02|              second|
> |2016-01-04|       noValidFormat|
> |2016-01-04|lastAssumingSameDate|
> +----------+--------------------+
> 
> As you can see, noValidFormat should have been filled with 2016-01-02,
> the last good known value (forward fill).
> Cheers,
> Georg
> 
> Liang-Chi Hsieh <viirya@> wrote on Mon, 9 Jan 2017 at 09:08:
> 
>>
>> The map "toCarry" will return (partitionIndex, None) for an empty
>> partition.
>>
>> So I think line 51 won't fail. Line 58 can fail if "lastNotNullRow" is
>> None. You should, of course, check whether an Option has a value before
>> you access it.
>>
>> "toCarry" returned the following when I tested your code:
>>
>> Map(1 -> Some(FooBar(Some(2016-01-04),lastAssumingSameDate)), 0 ->
>> Some(FooBar(Some(2016-01-02),second)))
>>
>> As you can see, there is no None, so the code works without failure. But
>> of course it depends on how your data is partitioned.
>>
>> For an empty partition, mapPartitions just gives you an empty iterator
>> as input, and you can do whatever you need with it. You already return
>> None when you find an empty iterator while preparing "toCarry". So I was
>> wondering what you wanted to ask in the previous reply.
>>
>>
>>
>> geoHeil wrote
>> > Thanks a lot, Holden.
>> >
>> > @Liang-Chi Hsieh, did you try to run
>> > https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2
>> > For me that crashes in either line 51 or 58. Holden described the
>> > problem pretty well. Is it clear to you now?
>> >
>> > Cheers,
>> > Georg
>> >
>> > Holden Karau [via Apache Spark Developers List]
>> > <ml-node+s1001551n20516h45@.nabble> wrote on Mon, 9 Jan 2017 at 06:40:
>> >
>> >> Hi Georg,
>> >>
>> >> Thanks for the question along with the code (as well as posting to
>> >> Stack Overflow). In general, if a question is well suited for Stack
>> >> Overflow, it's probably better suited to the user@ list than the dev@
>> >> list, so I've cc'd the user@ list for you.
>> >>
>> >> As far as handling empty partitions when working with mapPartitions
>> >> (and similar), the general approach is to return an empty iterator of
>> >> the correct type when you have an empty input iterator.
>> >>
>> >> It looks like your code is doing this; however, it seems like you
>> >> likely have a bug in your application logic. Namely, it assumes that if
>> >> a partition has a record missing a value, there will either be a
>> >> previous good row in the same partition OR the previous partition is
>> >> not empty and has a good row, which need not be the case. You've
>> >> partially fixed this problem by collecting the last good value of each
>> >> partition and then, if you don't have a good value at the start of a
>> >> partition, looking up the value in the collected array.
>> >>
>> >> However, if this happens while the previous partition is also empty,
>> >> you will need to look up the value of the partition before that, and
>> >> so on, until you find the one you are looking for. (Note this assumes
>> >> that the first record in your dataset is valid; if it isn't, your code
>> >> will still fail.)
>> >>
>> >> Your solution is really close to working but makes some minor
>> >> assumptions which don't always hold.
>> >>
>> >> Cheers,
>> >>
>> >> Holden :)
>> >> On Sun, Jan 8, 2017 at 8:30 PM, Liang-Chi Hsieh <[hidden email]>
>> >> wrote:
>> >>
>> >>
>> >> Hi Georg,
>> >>
>> >> Can you describe your question more clearly?
>> >>
>> >> Actually, the example code you posted on Stack Overflow doesn't crash
>> >> as you said in the post.
>> >>
>> >>
>> >> geoHeil wrote
>> >> > I am working on building a custom ML pipeline-model / estimator to
>> >> > impute missing values, e.g. I want to fill with the last good known
>> >> > value. Using a window function is slow / will put the data into a
>> >> > single partition.
>> >> > I built some sample code using the RDD API; however, it has some
>> >> > None / null problems with empty partitions.
>> >> >
>> >> > How should this be implemented properly to handle such empty
>> >> > partitions?
>> >> >
>> >> > http://stackoverflow.com/questions/41474175/spark-mappartitionswithindex-handling-empty-partitions
>> >> >
>> >> > Kind regards,
>> >> > Georg
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> -----
>> >>
>> >>
>> >> Liang-Chi Hsieh | @viirya
>> >> Spark Technology Center
>> >> http://www.spark.tc/
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> http://apache-spark-developers-list.1001551.n3.nabble.com/handling-of-empty-partitions-tp20496p20515.html
>> >>
>> >> Sent from the Apache Spark Developers List mailing list archive at
>> >> Nabble.com.
>> >>
>>
>>
>>
>>
>>
>> -----
>> Liang-Chi Hsieh | @viirya
>> Spark Technology Center
>> http://www.spark.tc/


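To make the forward fill discussed in the quoted messages concrete, here is a
small plain-Scala sketch that simulates partitions with nested Vectors, so it
runs without Spark. Everything named here (the FooBar definition with string
dates, ForwardFillSketch and its methods) is an assumption for illustration
and not the gist's code. With an RDD, the two passes would roughly correspond
to two mapPartitionsWithIndex calls, with the result of the first pass
collected and broadcast.

```scala
// Hypothetical stand-in for the FooBar rows in the thread; dates kept as strings for brevity.
final case class FooBar(foo: Option[String], bar: String)

object ForwardFillSketch {
  // Pass 1: last row with a defined `foo` in each partition
  // (None for empty partitions and for partitions without any good row).
  def lastGoodPerPartition(parts: Vector[Vector[FooBar]]): Vector[Option[FooBar]] =
    parts.map(_.filter(_.foo.isDefined).lastOption)

  // For partition i: the nearest good row from partitions 0 until i, skipping empty ones.
  def carryIn(lastGood: Vector[Option[FooBar]]): Vector[Option[FooBar]] =
    lastGood.scanLeft(Option.empty[FooBar])((carry, cur) => cur.orElse(carry)).init

  // Pass 2: fill missing `foo` values inside each partition, seeded with the carried-in row.
  // Rows before the first good row of the whole dataset keep foo = None.
  def forwardFill(parts: Vector[Vector[FooBar]]): Vector[Vector[FooBar]] = {
    val carries = carryIn(lastGoodPerPartition(parts))
    parts.zip(carries).map { case (rows, carry) =>
      rows.scanLeft(carry) { (prev, row) =>
        if (row.foo.isDefined) Some(row)
        else Some(row.copy(foo = prev.flatMap(_.foo)))
      }.tail.flatten
    }
  }
}
```

With the four sample rows spread over eight partitions, several of them
empty, the noValidFormat row picks up 2016-01-02 from the nearest earlier
non-empty partition, which is the forward-fill result Georg expected. Empty
partitions simply stay empty.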



-----
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 
--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/handling-of-empty-partitions-tp20496p20558.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
