Re: Compilation Error in WindowStream.fold()
It seems the type of your initial accumulator, which is Map[EWayCoordinates,Set[VehicleID]], does not match the accumulator type on your FoldFunction, which is Map[EWayCoordinates,Set[Int]]. Could you change that?

On Sat, 25 Feb 2017 at 04:09 nsengupta wrote:

> Hello Aljoscha,
>
> Many thanks for taking this up.
>
> This is the modified code:
>
> --
> val uniqueVehicles = envDefault
>   .fromCollection(readings)
>   .map(e => MITSIMUtils.preparePositionReport(e))
>   .assignAscendingTimestamps(e => e.timeOfReport)
>   .keyBy(e => (
>     e.eWayCoordinates.eWayID,
>     e.eWayCoordinates.eWayDir,
>     e.eWayCoordinates.eWaySegment,
>     e.vehicleDetails.vehicleID))
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>   .fold(
>     // Seed
>     Map[EWayCoordinates,Set[VehicleID]](),
>
>     // FoldFunction
>     folder,
>
>     // WindowFunction
>     windower
>
>     // I have taken the TupleTypeInfo out, to see what the compiler says!
>     // Satisfying the compiler:
>
>     /*new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
>     new TupleTypeInfo[(EWayCoordinates,Int)]*/
>   )
> --
>
> And, this is what the compiler says:
>
> --
> [INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
> [ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
> [ERROR] [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
> [ERROR] [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
> [ERROR] cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
> [ERROR] .fold(
> [ERROR]^
> [ERROR] one error found
> [INFO]
> [INFO] BUILD FAILURE
> [INFO]
> --
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830p11910.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
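To illustrate the accumulator-type point in the reply above, here is a minimal sketch in plain Scala, without Flink, in which the seed and the fold step share one named accumulator type. It assumes that VehicleID is an alias for Int (the thread does not show its definition); the names FoldTypesSketch, Acc, seed, step and uniqueVehicles are hypothetical and exist only for this illustration. The same step body could presumably be moved into a FoldFunction[PositionReport, Acc], but that is not shown or tested here.

```scala
// Plain-Scala sketch: no Flink classes are used here.
object FoldTypesSketch {
  // Assumption: VehicleID is an alias for Int (not shown in the thread).
  type VehicleID = Int

  case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
  case class VehicleDetails(vehicleID: VehicleID, vehicleSpeed: Int, vehiclePos: Int)
  case class PositionReport(
    timeOfReport: Int,
    eWayCoordinates: EWayCoordinates,
    vehicleDetails: VehicleDetails)

  // One name for the accumulator type, used by both the seed and the step,
  // so the two cannot drift apart (Set[VehicleID] in one, Set[Int] in the other).
  type Acc = Map[EWayCoordinates, Set[VehicleID]]

  val seed: Acc = Map.empty

  // Adds the reporting vehicle to the set kept for its expressway coordinates.
  def step(acc: Acc, report: PositionReport): Acc = {
    val seen = acc.getOrElse(report.eWayCoordinates, Set.empty[VehicleID])
    acc + (report.eWayCoordinates -> (seen + report.vehicleDetails.vehicleID))
  }

  // Folds all reports and counts the distinct vehicles per coordinate.
  def uniqueVehicles(reports: Seq[PositionReport]): Map[EWayCoordinates, Int] =
    reports.foldLeft(seed)(step).map { case (coords, ids) => coords -> ids.size }
}
```

Separately, the compiler output quoted above lists the expected window function as org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,TimeWindow], while the argument passed is org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[..., Window], so the window function's type may be worth checking as well.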
Re: Compilation Error in WindowStream.fold()
Hello Aljoscha,

Many thanks for taking this up.

This is the modified code:

--
val uniqueVehicles = envDefault
  .fromCollection(readings)
  .map(e => MITSIMUtils.preparePositionReport(e))
  .assignAscendingTimestamps(e => e.timeOfReport)
  .keyBy(e => (
    e.eWayCoordinates.eWayID,
    e.eWayCoordinates.eWayDir,
    e.eWayCoordinates.eWaySegment,
    e.vehicleDetails.vehicleID))
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
  .fold(
    // Seed
    Map[EWayCoordinates,Set[VehicleID]](),

    // FoldFunction
    folder,

    // WindowFunction
    windower

    // I have taken the TupleTypeInfo out, to see what the compiler says!
    // Satisfying the compiler:

    /*new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
    new TupleTypeInfo[(EWayCoordinates,Int)]*/
  )
--

And, this is what the compiler says:

--
[INFO] Compiling 3 source files to /home/nirmalya/Workspace-Flink/LinearRoad/target/test-classes at 1487991829901
[ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:137: error: overloaded method value fold with alternatives:
[ERROR] [ACC, R](initialValue: ACC, preAggregator: (ACC, org.nirmalya.exercise.Elements.PositionReport) => ACC, windowFunction: (org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[ACC], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$8: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] [ACC, R](initialValue: ACC, preAggregator: org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,ACC], windowFunction: org.apache.flink.streaming.api.scala.function.AllWindowFunction[ACC,R,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$5: org.apache.flink.api.common.typeinfo.TypeInformation[ACC], implicit evidence$6: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
[ERROR] cannot be applied to (scala.collection.immutable.Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]], org.apache.flink.api.common.functions.FoldFunction[org.nirmalya.exercise.Elements.PositionReport,Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[Int]]], org.apache.flink.streaming.api.functions.windowing.AllWindowFunction[Map[org.nirmalya.exercise.Elements.EWayCoordinates,Set[org.nirmalya.exercise.Elements.VehicleID]],(org.nirmalya.exercise.Elements.EWayCoordinates, Int),org.apache.flink.streaming.api.windowing.windows.Window])
[ERROR] .fold(
[ERROR]^
[ERROR] one error found
[INFO]
[INFO] BUILD FAILURE
[INFO]
--

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Compilation-Error-in-WindowStream-fold-tp11830p11910.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Compilation Error in WindowStream.fold()
Hi Nirmalya,

what does the compiler say if you use the variant without explicit TypeInfo? Like this:

.fold(
  // Seed
  Map[EWayCoordinates,Set[VehicleID]](),

  // FoldFunction
  folder,

  // WindowFunction
  windower,
)

Best,
Aljoscha

On Thu, 23 Feb 2017 at 14:41 nsengupta wrote:

> For reasons I cannot grasp, I am unable to move ahead.
>
> Here's the code:
>
> -
>
> import org.apache.flink.api.common.functions.FoldFunction
> import org.apache.flink.api.java.typeutils.TupleTypeInfo
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
> import org.apache.flink.util.Collector
> import org.nirmalya.exercise.Elements.{EWayCoordinates, PositionReport, RawMITSIMTuple, VehicleID}
>
> case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int, eWaySegment: Int)
>
> case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos: Int)
>
> case class PositionReport(
>   // tupletype: Int,
>   timeOfReport: Int,
>   eWayCoordinates: EWayCoordinates,
>   vehicleDetails: VehicleDetails
> )
>
> //
>
> val envDefault = StreamExecutionEnvironment.createLocalEnvironment(4)
> envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val readings = IndexedSeq [RawMITSIMTuple] (
>   RawMITSIMTuple(0,0,107,32,0,0,0,10, 53320,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,0,109,20,0,0,0,10,100644,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,0,106,28,0,0,0,10,137745,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,107,32,0,0,0,67,354281,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,105,30,0,0,1,94,501089,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,110,26,0,0,1,95,495898,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,1,102,30,0,0,1,85,453562,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,104,32,0,0,1,80,427144,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,102,20,0,0,1,73,390383,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,111,27,1,0,1,69,369135,-1,-1,-1,-1,-1,-1),
>   RawMITSIMTuple(0,2,112,28,1,0,0, 1, 5757,-1,-1,-1,-1,-1,-1)
> )
>
> val folder = new FoldFunction[PositionReport, Map[EWayCoordinates,Set[Int]]] {
>   override
>   def fold(
>     t: Map[EWayCoordinates, Set[VehicleID]], o: PositionReport
>   ): Map[EWayCoordinates, Set[VehicleID]] = {
>     t + (o.eWayCoordinates -> (t.getOrElse(o.eWayCoordinates,Set.empty) + (o.vehicleDetails.vehicleID)))
>   }
> }
>
> val windower = new AllWindowFunction[Map[EWayCoordinates, Set[VehicleID]],(EWayCoordinates,Int),Window] {
>   override
>   def apply(
>     w: Window,
>     bunch: Iterable[Map[EWayCoordinates, Set[VehicleID]]],
>     collector: Collector[(EWayCoordinates, VehicleID)]): Unit = {
>
>     val allVehiclesInLast30Mins = bunch.iterator().next.mapValues(e => e.size)
>
>     allVehiclesInLast30Mins.foreach(e => println(e))
>
>     collector.collect((EWayCoordinates(-1,-1,-1,-1),0))
>
>   }
> }
>
> val uniqueVehicles = envDefault
>   .fromCollection(readings)
>   .map(e => MITSIMUtils.preparePositionReport(e))
>   .assignAscendingTimestamps(e => e.timeOfReport)
>   .keyBy(e => (
>     e.eWayCoordinates.eWayID,
>     e.eWayCoordinates.eWayDir,
>     e.eWayCoordinates.eWaySegment,
>     e.vehicleDetails.vehicleID))
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))
>   .fold(
>     // Seed
>     Map[EWayCoordinates,Set[VehicleID]](),
>
>     // FoldFunction
>     folder,
>
>     // WindowFunction
>     windower,
>
>     // Satisfying the compiler
>     new TupleTypeInfo[Map[EWayCoordinates,Set[VehicleID]]](),
>     new TupleTypeInfo[(EWayCoordinates,Int)]
>   )
>
> -
>
> The compiler is unhappy:
>
> [ERROR] /home/nirmalya/Workspace-Flink/LinearRoad/src/test/scala/org/nirmalya/exercise/StreamStateTest.scala:136: error: missing argument list for method fold in class AllWindowedStream
> [ERROR] Unapplied methods are only converted to functions when a function type is expected.
> [ERROR] You can make this conversion explicit by writing `fold _` or `fol