For now, is there any way or workaround to use broadcast variables in Scala? I tried to use the Java classes, but it did not work out nicely (see below):

    val center = X_2 map { x => (0.0f, x, -1) } reduce(new JReduceFunction {
      var y: Float = 0.0f

      override def open(parameters: Configuration) = {
        val ySet = getRuntimeContext().getBroadcastVariable("Y")
        y = ySet.iterator().next()
      }

      override def apply(x1: (Float, Vector, Int), x2: (Float, Vector, Int)): (Float, Vector, Int) = {
        if (x1._3 != -1) x1
        else {
          if (x1._1 + x2._1 > y) (x1._1 + x2._1, x2._2, x2._2.id)
          else (x1._1 + x2._1, x2._2, -1)
        }
      }
    }).withBroadcastSet(y, "Y") map { x => x._1 }

The problem is that the reduce call returns a Scala class, and that class has no member withBroadcastSet.

Thank you!

Kind regards,
Max!

On Thu, Aug 14, 2014 at 7:45 PM, Stephan Ewen <se...@apache.org> wrote:

> Support for getting elements back from a DataSet is in progress. There is a
> pull request with a temporary solution:
> https://github.com/apache/incubator-flink/pull/94
>
>
> On Thu, Aug 14, 2014 at 9:05 AM, Maximilian Alber <
> alber.maximil...@gmail.com> wrote:
>
>> Ok, thank you!
>>
>> Kind regards,
>> Max!
>>
>>
>> On Thu, Aug 14, 2014 at 6:00 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Yes, you are right. But to my knowledge, broadcast variables are not yet
>>> supported in the Scala API. We are working on this, but it is not
>>> ready yet.
>>>
>>>
>>> On Thu, Aug 14, 2014 at 5:41 PM, Maximilian Alber <
>>> alber.maximil...@gmail.com> wrote:
>>>
>>>> Yeah, I got that. What I had in mind was something like a variable that
>>>> can be used as a broadcast variable, i.e. one that Flink supplies at
>>>> runtime to the function, e.g. a map function.
>>>>
>>>> It would be something like a shortcut. Right now I could already use a
>>>> broadcast variable, extract the only value it holds inside the open
>>>> function, and then supply it to the apply function. Am I right about
>>>> that?
>>>>
>>>> Kind regards,
>>>> Max!
>>>>
>>>>
>>>> On Thu, Aug 14, 2014 at 5:24 PM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> No, unfortunately that's not possible right now, because a DataSet only
>>>>> represents an execution that is run when the program is executed. So
>>>>> while you build your program by chaining together operations, the actual
>>>>> data is not yet available.
>>>>>
>>>>> I hope that helps, but the whole thing can be a bit confusing. So just
>>>>> ask if you need clarification.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On Thu, Aug 14, 2014 at 3:01 PM, Maximilian Alber <
>>>>> alber.maximil...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the quick reply.
>>>>>>
>>>>>> Ok, but is there a way to get the only element out of a DataSet into
>>>>>> a variable?
>>>>>>
>>>>>> Kind regards,
>>>>>> Max!
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 14, 2014 at 11:13 AM, Aljoscha Krettek <
>>>>>> aljos...@apache.org> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> for the Java API there are the so-called broadcast variables. Those
>>>>>>> can be used to set the output of an operation as an additional input of
>>>>>>> another operator. The feature is not available in the Scala API, though.
>>>>>>> Or am I wrong here?
>>>>>>>
>>>>>>> I'm currently working on bringing the Scala API to feature parity
>>>>>>> with the Java API.
>>>>>>>
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 5:51 PM, Maximilian Alber <
>>>>>>> alber.maximil...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Flinker,
>>>>>>>>
>>>>>>>> I am trying to implement a quadratic distribution, i.e. I would like to
>>>>>>>> choose an element from a dataset with probability proportional to its
>>>>>>>> squared value.
>>>>>>>>
>>>>>>>> In Python this would look like this:
>>>>>>>>
>>>>>>>>     s = numpy.cumsum(residual**2)
>>>>>>>>     x = numpy.random.rand() * s[-1]
>>>>>>>>     return residual[numpy.sum(x > s)]
>>>>>>>>
>>>>>>>> With Flink it is somewhat more complicated. I gave it a try:
>>>>>>>>
>>>>>>>>     import util.Random
>>>>>>>>
>>>>>>>>     val X = DataSource(XFile, CsvInputFormat[Float])
>>>>>>>>     val Y = DataSource(YFile, CsvInputFormat[Float])
>>>>>>>>
>>>>>>>>     // take square of them
>>>>>>>>     val X_2 = X map { x => (x*x, x) }
>>>>>>>>     // calc sum of squares
>>>>>>>>     val X_sum = X_2 reduce { (x1, x2) => (x1._1 + x2._1, 0) } map { x => x._1 }
>>>>>>>>     // choose random value in our range
>>>>>>>>     val y = X_sum map { Random.nextFloat * _ }
>>>>>>>>
>>>>>>>>     // make cumulative sum and find value we search for
>>>>>>>>     val center = X_2 map {
>>>>>>>>       x => (0.0f, x._1, x._2) // sum, x^2, x
>>>>>>>>     } reduce {
>>>>>>>>       (x1, x2) =>
>>>>>>>>         if (x1._1 > y) { // already found value we searched for
>>>>>>>>           x1
>>>>>>>>         } else {
>>>>>>>>           if (x1._1 + x2._2 > y) { // this is the value we search for
>>>>>>>>             (x1._1 + x2._2, x2._2, x2._3)
>>>>>>>>           } else {
>>>>>>>>             (x1._1 + x2._2, x1._2, x2._3) // just go on with cumulative sum
>>>>>>>>           }
>>>>>>>>         }
>>>>>>>>     } map { _._3 } // we just need the initial value
>>>>>>>>
>>>>>>>>     val output = center //map { x => println(x); x }
>>>>>>>>     val sink = output.write("/tmp/test", CsvOutputFormat[Float], "Center output")
>>>>>>>>
>>>>>>>> My problem now is that I need to get the information stored in y
>>>>>>>> into the reduce statement to gather the center value. Unfortunately, I
>>>>>>>> have no idea how to achieve that. If somebody knows a way, I would be
>>>>>>>> very thankful. I would also be glad to hear about an easier way to
>>>>>>>> solve this problem!
>>>>>>>>
>>>>>>>> Many thanks in advance!
>>>>>>>>
>>>>>>>> Cheers, Max
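
For comparison, the quadratic sampling from the original message (the numpy snippet) can be expressed on an in-memory Scala collection. This is a minimal sketch under stated assumptions: it does not use Flink at all, it assumes a non-empty input, and the names `QuadraticSampling`, `cumulativeSquares`, `indexForThreshold` and `sample` are hypothetical. Here `s.last` plays the role of `X_sum` and `t` the role of `y` in the Flink attempt.

```scala
import scala.util.Random

// Hypothetical helper object: an in-memory port of
//   s = numpy.cumsum(residual**2)
//   x = numpy.random.rand() * s[-1]
//   return residual[numpy.sum(x > s)]
// Assumes xs is non-empty; no Flink involved.
object QuadraticSampling {

  // Running sums of squares: s(i) = xs(0)^2 + ... + xs(i)^2
  def cumulativeSquares(xs: Seq[Float]): Vector[Float] =
    xs.map(x => x * x).scanLeft(0.0f)(_ + _).tail.toVector

  // Number of running sums strictly below t, i.e. numpy.sum(t > s);
  // this count is the index of the selected element.
  def indexForThreshold(s: Seq[Float], t: Float): Int =
    s.count(_ < t)

  // Draws one element with probability proportional to its squared value.
  def sample(xs: Seq[Float], rnd: Random): Float = {
    val s = cumulativeSquares(xs)
    val t = rnd.nextFloat() * s.last // uniform in [0, total)
    xs(indexForThreshold(s, t))
  }
}
```

For example, with `xs = Seq(1f, 2f, 3f)` the running sums are `1, 5, 14`, so a threshold of `3f` selects index 1 (the value `2f`), and `3f` is drawn roughly 9/14 of the time. Any fixed ordering of the elements yields a correctly weighted draw, but the logic assumes the elements are consumed one at a time, left to right; a parallel reduce that merges partial results in arbitrary order would need a different combine step.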
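
The workaround discussed in the thread (read the single broadcast value once in `open`, then use it as a plain `Float` in the per-record function) can be illustrated without Flink. The sketch below is only an analogy under stated assumptions: `BroadcastContext`, `RichReduce`, `FirstCrossing` and `SequentialRuntime` are hypothetical stand-ins, not Flink classes, and the "runtime" simply calls `open` once and then folds the records left to right. Each record is `(runningSum, x)` and starts out as `(x * x, x)`, so the first element's own weight is already part of the running sum.

```scala
// Hypothetical stand-ins for illustration only; these are NOT Flink APIs.
// They mimic the split discussed in the thread: side inputs are handed to
// the function once in open(), and apply() then works with a plain value.
trait BroadcastContext {
  def broadcastVariable(name: String): Seq[Float]
}

abstract class RichReduce[T] {
  def open(ctx: BroadcastContext): Unit = ()
  def apply(a: T, b: T): T
}

// Records are (runningSum, x). Keeps the first x whose running sum exceeds y.
final class FirstCrossing extends RichReduce[(Float, Float)] {
  private var y: Float = 0.0f

  // Assumes the broadcast set "Y" holds exactly one value.
  override def open(ctx: BroadcastContext): Unit =
    y = ctx.broadcastVariable("Y").head

  override def apply(acc: (Float, Float), next: (Float, Float)): (Float, Float) =
    if (acc._1 > y) acc              // threshold already crossed: keep the winner
    else (acc._1 + next._1, next._2) // extend the running sum with next's weight
}

object SequentialRuntime {
  // Hypothetical driver: open() once, then a strict left-to-right fold.
  def runReduce[T](f: RichReduce[T], ctx: BroadcastContext, records: Seq[T]): T = {
    f.open(ctx)
    records.reduceLeft[T]((a, b) => f.apply(a, b))
  }
}
```

With records `Seq(1f, 2f, 3f).map(x => (x * x, x))` and a broadcast value of `3f`, the fold goes from `(1, 1)` to `(5, 2)`; at that point the running sum exceeds 3, so `2f` is kept, which matches the numpy version for the same threshold.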