[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-22 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I should have clarified: by documentation I meant that, 
apart from the docstrings you have added now, we also have to include 
documentation in the Flink 
[docs](https://github.com/apache/flink/tree/master/docs/dev/libs/ml) for each 
new addition.

See for example the docs for the [standard 
scaler](https://github.com/apache/flink/blob/master/docs/dev/libs/ml/standard_scaler.md).


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-22 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @thvasilo @greghogan  

OK, I've updated the documentation and am standing by to update the code.

Regards
Thomas


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-21 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314,

This PR is still missing documentation. After that is done, a project 
committer will have to review it before it gets merged, which might take a 
while.

Regards,
Theodore


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-21 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan @thvasilo What's the next step? More tests and reviews?


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-16 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan OK, I've pushed the code with my tests and some modifications to 
the mapping.

@thvasilo It seems to work perfectly! 


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan Excuse my ignorance, I'm only now learning about Flink internals 
:)
It seems like the issue here is that `partitionByRange` partitions keys in 
ascending order, but we want the end result in descending order.

@tfournier314 I think the following should work; here I use a key extractor 
that negates the count, so the range partitioning lines up with the 
descending sort:

```Scala
fitData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => -x._2) // Take the negative count as the key
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
```
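
With the sample input used further down in this thread (`a` ×7, `b` ×5, 
`c` ×4, `d` ×1), and assuming `partitionByRange` sends smaller keys to 
earlier partitions, this should print something like:

```
(0,(a,7))
(1,(b,5))
(2,(c,4))
(3,(d,1))
```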


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2740
  
`zipWithIndex` preserves the order between partitions 
(DataSetUtils.java:121). @tfournier314, I don't think it's a problem pushing 
your current code since we're still discussing the PR.
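
For intuition, here is a rough, untested sketch of the two-pass scheme behind 
`zipWithIndex` (the real code in `DataSetUtils` differs in detail; the names 
below are mine, not Flink's): a first pass counts the elements of each 
partition, and a second pass gives partition `i` an offset equal to the sum 
of the counts of partitions `0..i-1`, which is exactly what keeps the indices 
ordered across partitions:

```Scala
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// Rough sketch of the idea, NOT Flink's actual implementation.
def zipWithIndexSketch[T: TypeInformation: ClassTag](input: DataSet[T]): DataSet[(Long, T)] = {
  // Pass 1: emit (partitionIndex, elementCount) for every partition.
  val counts = input.mapPartition(new RichMapPartitionFunction[T, (Int, Long)] {
    override def mapPartition(values: java.lang.Iterable[T],
                              out: Collector[(Int, Long)]): Unit =
      out.collect((getRuntimeContext.getIndexOfThisSubtask, values.asScala.size.toLong))
  })

  // Pass 2: partition i starts numbering at the total count of partitions 0..i-1.
  input.mapPartition(new RichMapPartitionFunction[T, (Long, T)] {
    private var offset = 0L

    override def open(parameters: Configuration): Unit = {
      val allCounts = getRuntimeContext.getBroadcastVariable[(Int, Long)]("counts").asScala
      val me = getRuntimeContext.getIndexOfThisSubtask
      offset = allCounts.filter(_._1 < me).map(_._2).sum
    }

    override def mapPartition(values: java.lang.Iterable[T],
                              out: Collector[(Long, T)]): Unit = {
      var i = offset
      for (v <- values.asScala) { out.collect((i, v)); i += 1 }
    }
  }).withBroadcastSet(counts, "counts")
}
```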


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-15 Thread thvasilo
Github user thvasilo commented on the issue:

https://github.com/apache/flink/pull/2740
  
Hello @tfournier314, I tested your code and it does seem that partitions are 
sorted only internally, which is expected. AFAIK `zipWithIndex` is unaware of 
the sorted (as in key range) order of partitions, so it's not guaranteed that 
the "first" partition will get the `[0, partitionSize-1]` indices, the second 
`[partitionSize, 2*partitionSize-1]`, etc. Maybe @greghogan knows a solution 
for global sorting?

If it's not possible, I think we can take a step back and see what we are 
trying to achieve here.

The task is to count the frequencies of labels and assign integer ids to 
them in order of frequency. The labels should either be categorical variables 
(e.g. {Male, Female, Unknown}) or class labels. The case with the most unique 
values might be vocabulary words, which might reach a few million unique 
values at worst.

I would argue then that after we have performed the frequency count in a 
distributed manner, there is no need to do the last step, assigning ordered 
indices, in a distributed manner as well: we can assume that all the 
(label -> frequency) pairs fit into the memory of one machine.

So I would recommend gathering all the data into one partition after getting 
the counts; that way we guarantee a global ordering:

```Scala
fitData.map(s => (s,1))
  .groupBy(0)
  .sum(1)
  .partitionByRange(x => 0) // Constant key: all records land in one partition
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

Of course we would need to clarify this restriction in the docstrings and 
documentation.


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-14 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@greghogan I've not pushed the code yet because my tests are still 
incorrect. Indeed, the following code:

```Scala
val env = ExecutionEnvironment.getExecutionEnvironment
val fitData =
  env.fromCollection(List("a","b","c","a","a","d","a","a","a","b","b","c","a","c","b","c","b"))
fitData.map(s => (s,1)).groupBy(0)
  .reduce((a,b) => (a._1, a._2 + b._2))
  .partitionByRange(1)
  .sortPartition(1, Order.DESCENDING)
  .zipWithIndex
  .print()
```

returns:

```
(0,(b,5))
(1,(c,4))
(2,(d,1))
(3,(a,7))
```

whereas I would like the following:

```
(1,(b,5))
(2,(c,4))
(3,(d,1))
(0,(a,7))
```

Even if the order inside each partition is preserved (with `mapPartition`), 
the order between partitions is not right, is it?


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-09 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
@thvasilo @greghogan I've updated my code so that I'm streaming instead of 
caching with a collect(). Does it seem OK to you?
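
For context, a minimal sketch of the distinction, using the `rawInput` and 
`mapping` from the message below (names are illustrative): `collect()` pulls 
the whole mapping to the client and eagerly triggers a job, while the 
streaming variant keeps the mapping as a `DataSet` that composes into a 
single plan:

```Scala
// Caching: materializes the mapping on the client; only viable when the
// mapping is small, and it forces an eager execution of the plan so far.
val localMapping: Seq[(String, Long)] = mapping.collect()

// Streaming: the mapping stays distributed and is consumed lazily, e.g.
// in a join against the raw labels (see the join sketch further down).
val indexed: DataSet[(String, (String, Long))] = rawInput
  .join(mapping)
  .where(s => s)
  .equalTo(_._1)
```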


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-04 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
I've changed my code so that I now have `mapping: DataSet[(String, Long)]`:

```Scala
val mapping = input
  .mapWith( s => (s, 1) )
  .groupBy( 0 )
  .reduce( (a, b) => (a._1, a._2 + b._2) )
  .partitionByRange( 1 )
  .zipWithIndex
  .mapWith { case (id, (label, count)) => (label, id) }
```

Parsing a new DataSet[String] called rawInput, I'd like to use this mapping 
to associate with each "label" of rawInput an ID (the Long value in the 
mapping).

Is that possible with a streaming approach (with a join, for example)?
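
A rough, untested sketch of what such a join could look like, assuming 
`rawInput: DataSet[String]` and the `mapping` above; both key selectors 
extract the label, so the key types match:

```Scala
import org.apache.flink.api.scala._

// Hypothetical: attach to every raw label its ID via a join on the label.
val indexed: DataSet[(String, Long)] = rawInput
  .join(mapping)
  .where(s => s)      // key of rawInput: the label itself
  .equalTo(_._1)      // key of mapping: the label field
  .apply((label, labelAndId) => (label, labelAndId._2))
```

Since the mapping is expected to be comparatively small, a broadcast-side 
join (`joinWithTiny`) might also be worth trying here.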


[GitHub] flink issue #2740: [FLINK-4964] [ml]

2016-11-02 Thread tfournier314
Github user tfournier314 commented on the issue:

https://github.com/apache/flink/pull/2740
  
Yes, I've just updated the PR title

