Hi,

This looks like a good solution to me.
The conversion mappers in steps 1 and 4 should not cause a lot of overhead as they are chained to their predecessors.

Best, Fabian

On Tue, May 14, 2019 at 01:08, Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

> Hey Hequn & Fabian,
>
> It seems like I found a reasonable way using both Row and my own TypeInfo:
>
> - I started by just using my own TypeInfo, following your example. So I'm
>   using a serializer which is basically a compound of the original event type
>   serializer and a string array serializer
>   (used BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO.createSerializer for it).
>   The rest is pretty much boilerplate, similar to the MapSerializer (as well
>   as the snapshot class). I extended TypeInformation<TaggedEvent<T>>, and the
>   downside was that, unlike Row/Pojo, I didn't have the field names for the SQL
>   query, so I went back to looking at Row.
>
> - I realized that I can use Row without knowing the internals of the
>   origin event class in advance, basically mapping to Row with an explicit
>   RowTypeInfo as follows:
>
>   final TypeInformation<Row> rowTypeInformation = Types.ROW(
>       new String[]{"originalEvent", "tags"},
>       new TypeInformation[]{dataStream.getType(),
>           BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO});
>
>   final SingleOutputStreamOperator<Row> mappedStream =
>       this.dataStream.map((MapFunction<T, Row>) value -> {
>           final Row row = new Row(2);
>           row.setField(0, value);
>           row.setField(1, new String[0]);
>           return row;
>       }).returns(rowTypeInformation);
>
>   tableEnvironment.registerDataStream(ORIGIN_EVENT_STREAM_TABLE_NAME,
>       mappedStream);
>
>   So that worked well.
>
> - Giving it a last thought, I didn't want to leave my users with a Row
>   stream but rather with a Pojo, so I ended up keeping the
>   TaggedEventTypeInfo as an additional mapping past the SQL query.
>   Therefore my process is:
>   1. Map DataStream<T> to Row, register as a table (code above)
>   2. Run SQL over the table
>   3. Transform the result to an append stream of Row (trying to directly
>      convert to my new type info results in *Requested result type is an
>      atomic type but result[..] has more or less than a single field*; not
>      sure if there's a better way)
>   4. Map the Row object to TaggedEvent<T>, explicitly implementing
>      ResultTypeQueryable.getProducedType to use the TaggedEventTypeInfo
>
> Would love to get feedback on that, whether that seems like the best
> solution or whether it can be more efficient.
>
> Thanks!
> Shahar
>
> On Mon, May 13, 2019 at 9:25 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks for looking into it Hequn!
>>
>> I do not have a requirement to use TaggedEvent vs Row. But correct me if
>> I am wrong: creating a Row will require me to know the internal fields of
>> the original event at compile time, is that correct? I do have a
>> requirement to support a generic original event type, so unless I can map T
>> to a Row without knowing the object fields, I won't be able to use it. Can
>> you confirm that?
>> I will look at MapView and let you know, thanks again!
>>
>> On Sun, May 12, 2019 at 1:30 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi shahar,
>>>
>>> An easier way to solve your problem is to use a Row to store your data
>>> instead of the `TaggedEvent`. I think this is what Fabian means. In this
>>> way, you don't have to define the user-defined TypeFactory and can use the
>>> Row type directly. Take `TaggedEvent<Car>` as an example: the corresponding
>>> row type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
>>> Types.OBJECT_ARRAY(Types.STRING))`, in which Types is
>>> `org.apache.flink.table.api.Types`. Furthermore, the row type is also easier
>>> to use together with Table API & SQL.
>>>
>>> However, if the `TaggedEvent` is a must-have for you, you can take a
>>> look at the MapView[1] as an example of how to define a user-defined table
>>> factory.
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
>>>
>>> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> I'm having trouble implementing the type for this operation, and I wonder
>>>> how I can do that.
>>>> So given a generic type T, I want to create a TypeInformation for:
>>>> class TaggedEvent<T> {
>>>>     String[] tags;
>>>>     T originalEvent;
>>>> }
>>>>
>>>> I was trying a few different things but am not sure how to do it.
>>>> It doesn't seem like I can use TypeHint, as I need to know the actual
>>>> generics class for it, right?
>>>> Do I need a TaggedEventTypeFactory? If so, how do I create the
>>>> TaggedEventTypeInfo for it? Do you have an example for it? I was trying to
>>>> follow this[1] but it doesn't seem to really work. I'm getting null as my
>>>> genericParameter for some reason. Also, how would you create the serializer
>>>> for the type info? Can I reuse some built-in Kryo functionality?
>>>>
>>>> Thanks
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>>>>
>>>> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
>>>> shahar.kobrin...@gmail.com> wrote:
>>>>
>>>>> Thanks Fabian,
>>>>>
>>>>> I'm looking into a way to enrich it without having to know the
>>>>> internal fields of the original event type.
>>>>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>>>>> prior to the SQL query, with tags being empty, then run the SQL query
>>>>> selecting *origin, enrich(..) as tags*.
>>>>> Not sure there's a better way, but I guess that works.
>>>>>
>>>>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> you can use the value construction function ROW to create a nested
>>>>>> row (or object).
>>>>>> However, you have to explicitly reference all attributes that you
>>>>>> will add.
>>>>>>
>>>>>> If you have a table Cars with (year, modelName), a query could look
>>>>>> like this:
>>>>>>
>>>>>> SELECT
>>>>>>   ROW(year, modelName) AS car,
>>>>>>   enrich(year, modelName) AS tags
>>>>>> FROM Cars;
>>>>>>
>>>>>> Handling many attributes is always a bit painful in SQL.
>>>>>> There is an effort to make the Table API easier to use for these use
>>>>>> cases (for example Column Operations [1]).
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>>>>
>>>>>> On Thu, May 9, 2019 at 01:44, shkob1 <
>>>>>> shahar.kobrin...@gmail.com> wrote:
>>>>>>
>>>>>>> Just to be more clear on my goal:
>>>>>>> I'm trying to enrich the incoming stream with some meaningful tags
>>>>>>> based on conditions from the event itself.
>>>>>>> So the input stream could be an event that looks like:
>>>>>>> class Car {
>>>>>>>     int year;
>>>>>>>     String modelName;
>>>>>>> }
>>>>>>>
>>>>>>> I will have a config that defines tags as:
>>>>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>>>>>
>>>>>>> So ideally my output will be in the structure of
>>>>>>>
>>>>>>> class TaggedEvent<Car> {
>>>>>>>     Car origin;
>>>>>>>     String[] tags;
>>>>>>> }
>>>>>>>
>>>>>>> --
>>>>>>> Sent from:
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
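
For readers following the thread, the two conversion steps (event to a two-field row before the query, and row back to TaggedEvent<T> after it) can be sketched without any Flink dependency. This is only an illustration under stated assumptions: a plain Object[] stands in for org.apache.flink.types.Row, the class name TaggedEventSketch and the helpers toRow/fromRow are hypothetical, and the SQL step is replaced by a hand-written assignment to the tags field. It is not the code used in the thread.

```java
import java.util.Arrays;

public class TaggedEventSketch {

    /** Generic wrapper discussed in the thread: the original event plus its tags. */
    static final class TaggedEvent<T> {
        final T originalEvent;
        final String[] tags;

        TaggedEvent(T originalEvent, String[] tags) {
            this.originalEvent = originalEvent;
            this.tags = tags;
        }
    }

    /** Example event type taken from the thread. */
    static final class Car {
        final int year;
        final String modelName;

        Car(int year, String modelName) {
            this.year = year;
            this.modelName = modelName;
        }
    }

    /**
     * Step 1 (assumption: Object[] stands in for Flink's Row).
     * Field 0 holds the untouched event, field 1 an empty tag array,
     * so the event's internal fields never need to be known.
     */
    static <T> Object[] toRow(T event) {
        return new Object[] {event, new String[0]};
    }

    /** Step 4: turn a two-field row back into a TaggedEvent<T>. */
    @SuppressWarnings("unchecked")
    static <T> TaggedEvent<T> fromRow(Object[] row) {
        if (row.length != 2) {
            throw new IllegalArgumentException("expected exactly 2 fields, got " + row.length);
        }
        return new TaggedEvent<>((T) row[0], (String[]) row[1]);
    }

    public static void main(String[] args) {
        Car car = new Car(2018, "Luxury GT");
        Object[] row = toRow(car);

        // Stand-in for steps 2 and 3: pretend the SQL query filled in the tags field.
        row[1] = new String[] {"NiceCar"};

        TaggedEvent<Car> tagged = fromRow(row);
        System.out.println(tagged.originalEvent.modelName + " " + Arrays.toString(tagged.tags));
    }
}
```

In an actual Flink job the same two mappings would be MapFunctions with explicit type information, as in the code quoted above; the sketch only shows that the mapping itself needs no knowledge of the fields of T.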