Hi Nirmalya,
What does the compiler say if you use the variant without the explicit
TypeInfo arguments? Like this:

  .fold(
    // Seed
    Map[EWayCoordinates,Set[VehicleID]](),

    // FoldFunction
    folder,

    // WindowFunction
    windower
  )
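
To check the seed and the fold step on their own, independent of Flink and its
type information, a plain-Scala sketch like the one below may help. It is only
an illustration: the object name FoldSketch, the helper names foldStep and
countPerCoordinates, and the sample reports are made up, and VehicleID is
assumed to be an alias for Int. It uses foldLeft on an ordinary Seq, not the
Flink windowing API.

```scala
// Plain-Scala sketch (no Flink). The case classes mirror the ones in the quoted code.
case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
case class PositionReport(timeOfReport: Int,
                          eWayCoordinates: EWayCoordinates,
                          vehicleDetails: VehicleDetails)

object FoldSketch {
  // Assumption: VehicleID is a plain alias for Int.
  type VehicleID = Int

  // Same logic as the body of the FoldFunction: add the vehicle to the set
  // stored under the report's coordinates.
  def foldStep(acc: Map[EWayCoordinates, Set[VehicleID]],
               report: PositionReport): Map[EWayCoordinates, Set[VehicleID]] = {
    val seen = acc.getOrElse(report.eWayCoordinates, Set.empty[VehicleID])
    acc + (report.eWayCoordinates -> (seen + report.vehicleDetails.vehicleID))
  }

  // Fold all reports starting from the empty seed, then count distinct vehicles.
  def countPerCoordinates(reports: Seq[PositionReport]): Map[EWayCoordinates, Int] =
    reports
      .foldLeft(Map.empty[EWayCoordinates, Set[VehicleID]])(foldStep)
      .map { case (coordinates, vehicles) => coordinates -> vehicles.size }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data, not derived from the RawMITSIMTuple values.
    val a = EWayCoordinates(0, 0, 0, 10)
    val b = EWayCoordinates(0, 1, 0, 67)
    val reports = Seq(
      PositionReport(0, a, VehicleDetails(107, 32, 53320)),
      PositionReport(0, a, VehicleDetails(109, 20, 100644)),
      PositionReport(1, a, VehicleDetails(107, 32, 354281)), // same vehicle again
      PositionReport(1, b, VehicleDetails(105, 30, 501089))
    )
    countPerCoordinates(reports).foreach(println)
  }
}
```

If this behaves as expected, the remaining problem is in how the arguments are
passed to fold, not in the fold logic itself.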

Best,
Aljoscha

On Thu, 23 Feb 2017 at 14:41 nsengupta <sengupta.nirma...@gmail.com> wrote:

> For reasons I cannot grasp, I am unable to move ahead.
>
> Here's the code:
>
> ---------------------------------------------------------------------------------------------------------------------------------------------
>
>
> import org.apache.flink.api.common.functions.FoldFunction
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
> import org.apache.flink.util.Collector
> import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}
>
> case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int,
> eWaySegment: Int)
>
> case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos:
> Int)
>
> case class PositionReport(
>                               // tupletype: Int,
>                               timeOfReport: Int,
>                               eWayCoordinates: EWayCoordinates,
>                               vehicleDetails: VehicleDetails
>                        )
>
>
> // ....
>
>
> val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
>   envDefault
>     .setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>   val readings = IndexedSeq [RawMITSIMTuple] (
>     RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
>     RawMITSIMTuple(0,2,112,28,1,0,0, 1,  5757,-1,-1,-1,-1,-1,-1)
>   )
>
> val folder = new FoldFunction[PositionReport, Map[EWayCoordinates,Set[Int]]] {
>       override
>         def fold(
>               t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
>         ): Map[EWayCoordinates, Set[VehicleID]] = {
>         t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates,Set.empty) + (o.vehicleDetails.vehicleID)))
>       }
>     }
>
>     val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]],(EWayCoordinates,Int),Window] {
>       override
>       def apply(
>            w: Window,
>            bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
>            collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {
>
>         val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)
>
>         allVehiclesInLast30Mins.foreach(e => println(e))
>
>         collector.collect((EWayCoordinates(-1,-1,-1,-1),0))
>
>       }
>     }
>
>     val uniqueVehicles = envDefault
>       .fromCollection(readings)
>       .map(e => MITSIMUtils.preparePositionReport(e))
>       .assignAscendingTimestamps(e => e.timeOfReport)
>       .keyBy(e => (
>         e.eWayCoordinates.eWayID,
>         e.eWayCoordinates.eWayDir,
>         e.eWayCoordinates.eWaySegment,
>         e.vehicleDetails.vehicleID))
>       .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>       .fold(
>            // Seed
>            Map[EWayCoordinates,Set[VehicleID]](),
>
>            // FoldFunction
>            folder,
>
>            // WindowFunction
>            windower,
>
>            // Satisfying the compiler
>            new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
>            new TupleTypeInfo[(EWayCoordinates,Int)]
>       )
>
>
> -----------------------------------------------------------------------------------------
>
> The compiler is unhappy:
>
> [ERROR]
>
> /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136:
> error: missing argument list for method fold in class AllWindowedStream
> [ERROR] Unapplied methods are only converted to functions when a function
> type is expected.
> [ERROR] You can make this conversion explicit by writing `fold _` or
> `fold(_)(_)(_)` instead of `fold`.
> [ERROR]       .fold(
> [ERROR]            ^
> [ERROR] one error found
>
>
> ----------------------------------------------------------------------------------------
>
> I understand why the compiler is unhappy, but I am unsure whether I have to go
> through all this *devilry*. I do not see anything like it prescribed in any
> Flink example. But then, perhaps I am missing an important point.
>
> I have been through this comment
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Incremental-aggregations-Example-not-working-td10581.html#a10585
> >
> by *Yassine Marzougui* before I added those type hints. However, I am using
> *Flink 1.2.0*.
>
> I know this sounds silly, but I am simply failing to get out of this.
>
> All help appreciated.
>
> -- Nirmalya
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>
