For now, is there any way or workaround to use broadcast variables in Scala? I
tried to use the Java classes, but it did not work out nicely (see below).

val center = X_2 map {
    x => (0.0f, x, -1)
} reduce(new JReduceFunction{
    var y: Float = 0.0f
    // Read the single broadcast value once, before apply() is called.
    override def open(parameters: Configuration) = {
      val ySet = getRuntimeContext().getBroadcastVariable("Y")
      y = ySet.iterator().next()
    }
    override def apply(x1: (Float, Vector, Int), x2: (Float, Vector, Int)): (Float, Vector, Int) = {
      if (x1._3 != -1)
        x1 // center already found, keep it
      else {
        if (x1._1 + x2._1 > y)
          (x1._1 + x2._1, x2._2, x2._2.id)
        else
          (x1._1 + x2._1, x2._2, -1)
      }
    }
}).withBroadcastSet(y, "Y") map { x => x._1 } // fails: the Scala reduce result has no withBroadcastSet


The problem is that reduce returns a Scala class, and that class has no
member withBroadcastSet.

Thank you!

Best regards,
Max!


On Thu, Aug 14, 2014 at 7:45 PM, Stephan Ewen <se...@apache.org> wrote:

> Support for getting elements back from a DataSet is in progress. There is a
> pull request with a temporary solution:
> https://github.com/apache/incubator-flink/pull/94
>
>
> On Thu, Aug 14, 2014 at 9:05 AM, Maximilian Alber <
> alber.maximil...@gmail.com> wrote:
>
>> Ok, thank you!
>>
>> Best regards,
>> Max!
>>
>>
>> On Thu, Aug 14, 2014 at 6:00 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Yes, you are right. But to my knowledge, broadcast variables are not yet
>>> supported in the Scala API. We are working on this, but it is not ready
>>> yet.
>>>
>>>
>>> On Thu, Aug 14, 2014 at 5:41 PM, Maximilian Alber <
>>> alber.maximil...@gmail.com> wrote:
>>>
>>>> Yeah, I got that. What I had in mind was something like a variable that
>>>> can be used as a broadcast variable, i.e. one that Flink supplies at
>>>> runtime to a function, e.g. a map function.
>>>>
>>>> It would be something like a shortcut. Right now I could already use a
>>>> broadcast variable, extract the only value it holds inside the open
>>>> function, and then supply it to the apply function. Am I right about
>>>> that?
>>>>
>>>> Best regards,
>>>> Max!
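The open()/apply() pattern Max describes can be illustrated without Flink. The following is a minimal plain-Scala simulation under stated assumptions, not Flink code: `FakeRuntimeContext`, `RichMapSketch`, `ScaleBy` and `run` are hypothetical stand-ins that only mimic the lifecycle in which the runtime injects a context, calls `open()` once and then calls `apply()` for each element. In real Java-API code the value would be registered with `withBroadcastSet` and read with `getRuntimeContext().getBroadcastVariable(...)`, as in the attempt at the top of this thread.

```scala
object BroadcastPatternSketch {
  // Hypothetical stand-in for a runtime context (NOT the Flink class):
  // it only looks up named "broadcast" collections in a map.
  final class FakeRuntimeContext(vars: Map[String, Seq[Any]]) {
    def getBroadcastVariable[T](name: String): Seq[T] =
      vars(name).asInstanceOf[Seq[T]]
  }

  // Hypothetical stand-in for a rich map function with an open() hook.
  abstract class RichMapSketch[I, O] {
    protected var ctx: FakeRuntimeContext = null
    def setContext(c: FakeRuntimeContext): Unit = { ctx = c }
    def open(): Unit = ()
    def apply(in: I): O
  }

  // The pattern from the thread: read the single broadcast value once in
  // open(), keep it in a field, and use it in every apply() call.
  final class ScaleBy extends RichMapSketch[Float, Float] {
    private var y: Float = 0.0f
    override def open(): Unit = {
      y = ctx.getBroadcastVariable[Float]("Y").head
    }
    def apply(x: Float): Float = x * y
  }

  // Mimics the engine: inject the context, call open() once,
  // then call apply() for each element.
  def run[I, O](f: RichMapSketch[I, O], ctx: FakeRuntimeContext, data: Seq[I]): Seq[O] = {
    f.setContext(ctx)
    f.open()
    data.map(x => f.apply(x))
  }
}
```

Reading the value once in `open()` avoids a lookup per element; the cost is a mutable field inside the function object.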
>>>>
>>>>
>>>> On Thu, Aug 14, 2014 at 5:24 PM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> No, unfortunately that's not possible right now, because a DataSet only
>>>>> represents an execution that is run when the program is executed. So
>>>>> while you build your program by chaining operations together, the actual
>>>>> data is not yet available.
>>>>>
>>>>> I hope that helps, but the whole thing can be a bit confusing, so just
>>>>> ask if you need clarification.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
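Aljoscha's point about deferred execution can be sketched with a toy plan type. `Plan`, `source`, `execute` and `sourceReads` are hypothetical and deliberately much simpler than a real DataSet: building a pipeline only composes functions, and no data is read until `execute()` is called.

```scala
object LazyPlanSketch {
  // Hypothetical stand-in for a DataSet: it only stores *how* to compute
  // its elements; nothing runs until execute() is called.
  final class Plan[A](compute: () => Seq[A]) {
    def map[B](f: A => B): Plan[B] = new Plan(() => compute().map(f))
    def reduce(f: (A, A) => A): Plan[A] = new Plan(() => Seq(compute().reduce(f)))
    def execute(): Seq[A] = compute()
  }

  // Counts how often the source is actually read, to make laziness visible.
  var sourceReads = 0

  def source[A](data: Seq[A]): Plan[A] =
    new Plan(() => { sourceReads += 1; data })
}
```

Here the result of `reduce` is a `Plan[Float]`, not a `Float`, which mirrors why `y` in the original program cannot be used as a plain number inside another operator while the program is still being assembled.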
>>>>>
>>>>>
>>>>> On Thu, Aug 14, 2014 at 3:01 PM, Maximilian Alber <
>>>>> alber.maximil...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the quick reply.
>>>>>>
>>>>>> Ok, but is there a way to get the only element out of a DataSet into
>>>>>> a variable?
>>>>>>
>>>>>> Best regards,
>>>>>> Max!
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 14, 2014 at 11:13 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> for the Java API, there are the so-called broadcast variables. These
>>>>>>> can be used to make the output of one operation an additional input of
>>>>>>> another operator. The feature is not available in the Scala API, though,
>>>>>>> or am I wrong here?
>>>>>>>
>>>>>>> I'm currently working on bringing the Scala API to feature parity
>>>>>>> with the Java API.
>>>>>>>
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 5:51 PM, Maximilian Alber <
>>>>>>> alber.maximil...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flinker,
>>>>>>>>
>>>>>>>> I am trying to implement a quadratic distribution, i.e. I would like to
>>>>>>>> choose an element from a dataset with probability proportional to its
>>>>>>>> squared value.
>>>>>>>>
>>>>>>>> In Python this would look like this:
>>>>>>>>
>>>>>>>> s = numpy.cumsum(residual**2)
>>>>>>>> x = numpy.random.rand() * s[-1]
>>>>>>>> return residual[numpy.sum(x > s)]
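For reference, the numpy logic above translates to sequential Scala roughly as follows. This is a local, single-machine sketch with no Flink involved; `QuadraticSampling`, `pickIndex` and `sample` are hypothetical names, and the input is assumed to be non-empty. Splitting out `pickIndex` with an explicit uniform draw `u` keeps the selection rule deterministic and testable.

```scala
object QuadraticSampling {
  // Index picked for a uniform draw u in [0, 1): the number of cumulative
  // sums of squares that are strictly smaller than u * total, like
  // numpy.sum(x > s) in the Python version. Assumes a non-empty input.
  def pickIndex(residual: Array[Float], u: Float): Int = {
    val cumSums = residual.map(r => r * r).scanLeft(0.0f)(_ + _).tail
    val x = u * cumSums.last
    cumSums.count(s => x > s)
  }

  // Draws one element with probability proportional to its squared value.
  def sample(residual: Array[Float], rng: scala.util.Random): Float =
    residual(pickIndex(residual, rng.nextFloat()))
}
```

For example, with `residual = Array(1f, 2f, 3f)` the cumulative sums are 1, 5 and 14, so a draw of `u = 0.5f` (x = 7) selects index 2.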
>>>>>>>>
>>>>>>>> With Flink it is somewhat more complicated. I gave it a try:
>>>>>>>>
>>>>>>>> import util.Random
>>>>>>>>
>>>>>>>> val X = DataSource(XFile, CsvInputFormat[Float])
>>>>>>>> val Y = DataSource(YFile, CsvInputFormat[Float])
>>>>>>>>
>>>>>>>> // take square of them
>>>>>>>> val X_2 = X map { x => (x*x, x) }
>>>>>>>> // calc sum of squares
>>>>>>>> val X_sum = X_2 reduce { (x1, x2) => (x1._1 + x2._1, 0) } map { x => x._1 }
>>>>>>>> // choose random value in our range
>>>>>>>> val y = X_sum map { Random.nextFloat * _ }
>>>>>>>>
>>>>>>>> // build the cumulative sum and find the value we are searching for
>>>>>>>> val center = X_2 map {
>>>>>>>>     x => (0.0f, x._1, x._2) //sum, x^2, x
>>>>>>>> } reduce {
>>>>>>>>     (x1, x2) =>
>>>>>>>>     if(x1._1 > y){// already found value we searched for
>>>>>>>>        x1
>>>>>>>>      } else {
>>>>>>>>        if(x1._1 + x2._2 > y){// this is the value we search for
>>>>>>>>           (x1._1 + x2._2, x2._2, x2._3)
>>>>>>>>        } else {
>>>>>>>>           (x1._1 + x2._2, x1._2, x2._3) // just go on with cumulative sum
>>>>>>>>       }
>>>>>>>>    }
>>>>>>>> } map { _._3 } // we just need the initial value
>>>>>>>>
>>>>>>>> val output = center //map { x => println(x); x }
>>>>>>>> val sink = output.write("/tmp/test", CsvOutputFormat[Float],
>>>>>>>> "Center output")
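If `y` were available as an ordinary `Float`, the search in the reduce above would amount to the following sequential fold. This is a plain-Scala sketch, not a Flink program: `CumulativeSearch` and `findCenter` are hypothetical names, and it uses `foldLeft` because the result depends on visiting elements in order, which a parallel reduce may not guarantee. It keeps the same `(running sum, x^2, x)` layout as the attempt above and returns `Float.NaN` for empty input.

```scala
object CumulativeSearch {
  // Sequential version of the search, assuming y is already a plain Float.
  // Accumulator layout as in the Flink attempt: (running sum, x^2, x).
  // Returns the first x at which the running sum exceeds y,
  // or the last x if it never does.
  def findCenter(xs: Seq[Float], y: Float): Float = {
    val init = (0.0f, 0.0f, Float.NaN)
    val (_, _, center) = xs.map(x => (x * x, x)).foldLeft(init) {
      case (acc @ (sum, _, _), _) if sum > y => acc // already found
      case ((sum, _, _), (sq, x))            => (sum + sq, sq, x)
    }
    center
  }
}
```

With `xs = Seq(1f, 2f, 3f)` the running sums are 1, 5 and 14, so `y = 3f` stops at the second element and returns `2f`.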
>>>>>>>>
>>>>>>>> My problem now is that I need to get the information stored in y
>>>>>>>> into the reduce statement to find the center value. Unfortunately, I
>>>>>>>> have no idea how to achieve that. If somebody knows a way, I would be
>>>>>>>> very thankful. The same goes for an easier way to solve this problem!
>>>>>>>>
>>>>>>>> Many thanks in advance!
>>>>>>>>
>>>>>>>> Cheers Max
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
