I am trying to understand if the AllWindowedStream.apply() function can be
used for creating a DataStream of new types. 

Here is a portion of the code:
------------------------------------------------------------------------------------------------------------------------

case class RawMITSIMTuple(
                             tupletype: Int,      timeOfReport: Int,
vehicleID: Int,   vehicleSpeed: Int,
                             expressWayID: Int,   vehicleLane: Int, 
vehicleDir: Int,
                             vehicleSegment: Int, vehiclePos: Int,   queyID:
Int,
                             segmentInit: Int,    segmentEnd: Int ,
                             dayOfWeek: Int,      timeOfDay: Int,    dayID:
Int
                     )

  case class EWayCoordinates(eWayID: Int, eWayDir: Int, eWayLane: Int,
eWaySegment: Int)

  case class VehicleDetails(vehicleID: Int, vehicleSpeed: Int, vehiclePos:
Int)

  case class PositionReport(
                              tupletype: Int, timeOfReport: Int,
                              eWayCoordinates: EWayCoordinates,
                              vehicleDetails: VehicleDetails
                       )

val envDefault = StreamExecutionEnvironment.getExecutionEnvironment
envDefault.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// ...

val positionReportStream = this
      .readRawMITSIMTuplesInjected(envDefault,args(0))
      .assignAscendingTimestamps(e => e.timeOfReport)
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(30)))

------------------------------------------------------------------------------------------------------------------------

positionReportStream above is of type *AllWindowedStream*. As such, I cannot
use it as a DataStream[PositionReport]: I cannot segregate it by some kind
of KeySelection and use it further down. 

I have been thinking of using a FoldFunction on it, but that gives a
collection of PositionReport. So, I get a DataStream[Vector[PositionReport]]
(Vector is just an example).

The other alternative is to use an AllWindowedStream.apply() function, where
I can emit a DataStream[PositionReport]. But, that will mean that I am using
the apply function more as a *mapper*. Is that the right way to use it?

Could someone please push me to the correct way to deal with it?

-- Nirmalya
 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clarification-use-of-AllWindowedStream-apply-function-tp11627.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.

Reply via email to