Hi,

This looks like a good solution to me.
The conversion mappers in steps 1 and 4 should not add much overhead,
as they are chained to their predecessors.

Best, Fabian

On Tue, May 14, 2019 at 1:08 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com>:

> Hey Hequn & Fabian,
>
> It seems like I found a reasonable way using both Row and my own TypeInfo:
>
>    - I started by just using my own TypeInfo, following your example. I'm
>    using a serializer which is basically a compound of the original event type
>    serializer and a string array serializer
>    (using BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO.createSerializer for it).
>    The rest is pretty much boilerplate similar to the MapSerializer (as well
>    as the snapshot class). I extended TypeInformation<TaggedEvent<T>>, but the
>    downside was that, unlike Row/Pojo, I didn't have the field names for the SQL
>    query, so I went back to looking at Row.
>
>    - I realized that I can use Row without knowing the internals of the
>    original event class in advance, by mapping to Row with an explicit
>    RowTypeInfo as follows:
>
>    final TypeInformation<Row> rowTypeInformation = Types.ROW(
>            new String[]{"originalEvent", "tags"},
>            new TypeInformation[]{dataStream.getType(),
>                    BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO});
>
>    final SingleOutputStreamOperator<Row> mappedStream =
>            this.dataStream.map((MapFunction<T, Row>) value -> {
>                final Row row = new Row(2);
>                row.setField(0, value);
>                row.setField(1, new String[0]);
>                return row;
>            }).returns(rowTypeInformation);
>
>    tableEnvironment.registerDataStream(ORIGIN_EVENT_STREAM_TABLE_NAME,
>            mappedStream);
>
>
> So that worked well.
>
>
>    - Giving it a last thought, I didn't want to leave my users with a Row
>    stream but rather with a Pojo, so I ended up keeping the
>    TaggedEventTypeInfo as an additional mapping after the SQL query.
>    My process is therefore:
>    1. Map DataStream<T> to Row, register it as a table (code above)
>    2. Run SQL over the table
>    3. Transform the result to an append stream of Row (trying to directly
>    convert to my new type info fails with *Requested result type is an
>    atomic type but result[..] has more or less than a single field*; not
>    sure if there's a better way)
>    4. Map the Row object to TaggedEvent<T>, explicitly implementing
>    ResultTypeQueryable.getProducedType to use the TaggedEventTypeInfo
>
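For reference, steps 2 to 4 above could be wired together roughly like this. This is a sketch against the Flink 1.8-era APIs; `TaggedEvent`, `TaggedEventTypeInfo`, the `enrich` UDF, and the table name come from the thread, and the exact query shape is an assumption:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.types.Row;

// Step 4 as a reusable mapper: Row -> TaggedEvent<T>, with the custom
// TaggedEventTypeInfo (discussed in this thread, not shown here) exposed
// through ResultTypeQueryable so Flink does not fall back to Kryo.
public class RowToTaggedEvent<T>
        implements MapFunction<Row, TaggedEvent<T>>, ResultTypeQueryable<TaggedEvent<T>> {

    private final TypeInformation<T> originalType;

    public RowToTaggedEvent(TypeInformation<T> originalType) {
        this.originalType = originalType;
    }

    @Override
    @SuppressWarnings("unchecked")
    public TaggedEvent<T> map(Row row) {
        final TaggedEvent<T> tagged = new TaggedEvent<>();
        tagged.originalEvent = (T) row.getField(0);
        tagged.tags = (String[]) row.getField(1);
        return tagged;
    }

    @Override
    public TypeInformation<TaggedEvent<T>> getProducedType() {
        return new TaggedEventTypeInfo<>(originalType);
    }
}

// Steps 2 and 3 then wire it together:
//   Table result = tableEnvironment.sqlQuery(
//           "SELECT originalEvent, enrich(originalEvent) AS tags FROM " + ORIGIN_EVENT_STREAM_TABLE_NAME);
//   DataStream<Row> rows = tableEnvironment.toAppendStream(result, Row.class);
//   DataStream<TaggedEvent<T>> tagged = rows.map(new RowToTaggedEvent<>(dataStream.getType()));
```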
>
> Would love to get feedback on this: does it seem like the best
> solution, or can it be made more efficient?
>
> Thanks!
> Shahar
>
>
> On Mon, May 13, 2019 at 9:25 AM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> Thanks for looking into it Hequn!
>>
>> I do not have a requirement to use TaggedEvent vs. Row. But correct me if
>> I am wrong: creating a Row will require me to know the internal fields of
>> the original event at compile time, is that correct? I do have a
>> requirement to support a generic original event type, so unless I can map T
>> to a Row without knowing the object's fields, I won't be able to use it. Can
>> you confirm that?
>> I will look at MapView and let you know, thanks again!
>>
>> On Sun, May 12, 2019 at 1:30 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>
>>> Hi shahar,
>>>
>>> An easier way to solve your problem is to use a Row to store your data
>>> instead of the `TaggedEvent`. I think this is what Fabian means. This
>>> way, you don't have to define the user-defined TypeFactory and can use the Row
>>> type directly. Taking `TaggedEvent<Car>` as an example, the corresponding row
>>> type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
>>> Types.OBJECT_ARRAY(Types.STRING))`, in which Types is
>>> `org.apache.flink.table.api.Types`. Furthermore, the row type is also easier to
>>> use with the Table API & SQL.
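Spelled out with imports, that nested row type looks roughly as follows. This sketch uses the flink-core `Types` class (the Java counterpart of the table-api `Types` mentioned above); the field names are illustrative:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class TaggedCarType {
    // TaggedEvent<Car> expressed as a named Row type: "car" is itself a
    // nested row (year INT, modelName STRING), "tags" is a String[].
    public static TypeInformation<Row> create() {
        return Types.ROW_NAMED(
                new String[]{"car", "tags"},
                Types.ROW_NAMED(new String[]{"year", "modelName"},
                        Types.INT, Types.STRING),
                Types.OBJECT_ARRAY(Types.STRING));
    }
}
```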
>>>
>>> However, if the `TaggedEvent` is a must-have for you, you can take a
>>> look at the MapView[1] as an example of how to define a user-defined type
>>> factory.
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
>>>
>>> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Hi Fabian,
>>>>
>>>> I'm having trouble implementing the type for this operation, and I wonder how
>>>> to do it.
>>>> So given a generic type T, I want to create a TypeInformation for:
>>>> class TaggedEvent<T> {
>>>>    String[] tags
>>>>    T originalEvent
>>>> }
>>>>
>>>> I was trying a few different things but am not sure how to do it.
>>>> It doesn't seem like I can use TypeHint, as I need to know the actual
>>>> generic class for it, right?
>>>> Do I need a TaggedEventTypeFactory? If so, how do I create the
>>>> TaggedEventTypeInfo for it? Do you have an example? I was trying to
>>>> follow this[1], but it doesn't seem to really work; I'm getting null as my
>>>> genericParameter for some reason. Also, how would you create the serializer
>>>> for the type info? Can I reuse some built-in Kryo functionality?
>>>>
>>>> Thanks
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
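The factory mechanism from [1] looks roughly like the sketch below. The `TaggedEventTypeInfo` referenced here is the custom TypeInformation discussed in this thread and is not shown; note that `genericParameters.get("T")` returns null when Flink cannot infer T at the use site (type erasure), which an explicit `.returns(...)` can work around:

```java
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Register the factory on the type itself via the @TypeInfo annotation.
@TypeInfo(TaggedEventTypeFactory.class)
class TaggedEvent<T> {
    String[] tags;
    T originalEvent;
}

class TaggedEventTypeFactory extends TypeInfoFactory<TaggedEvent> {
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public TypeInformation<TaggedEvent> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // May be null if T could not be inferred from the call site.
        TypeInformation<?> originalType = genericParameters.get("T");
        return (TypeInformation) new TaggedEventTypeInfo<>(originalType);
    }
}
```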
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
>>>> shahar.kobrin...@gmail.com> wrote:
>>>>
>>>>> Thanks Fabian,
>>>>>
>>>>> I'm looking into a way to enrich it without having to know the
>>>>> internal fields of the original event type.
>>>>> Right now, what I managed to do is map Car into a TaggedEvent<Car>
>>>>> prior to the SQL query, with tags being empty, then run the SQL query
>>>>> selecting *origin,
>>>>> enrich(.. ) as tags*.
>>>>> Not sure there's a better way, but I guess that works.
>>>>>
>>>>>
>>>>>
>>>>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> you can use the value construction function ROW to create a nested
>>>>>> row (or object).
>>>>>> However, you have to explicitly reference all attributes that you
>>>>>> will add.
>>>>>>
>>>>>> If you have a table Cars with (year, modelName) a query could look
>>>>>> like this:
>>>>>>
>>>>>> SELECT
>>>>>>   ROW(year, modelName) AS car,
>>>>>>   enrich(year, modelName) AS tags
>>>>>> FROM Cars;
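The `enrich` function in this query would be a user-defined scalar function. A hypothetical sketch of what it might look like (the tagging logic and registration name are assumptions, not part of the thread):

```java
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical enrich() UDF matching the query above: it receives the car's
// fields and returns the matching tags as a String[].
public class Enrich extends ScalarFunction {
    public String[] eval(int year, String modelName) {
        return (year > 2015 && modelName.contains("Luxury"))
                ? new String[]{"NiceCar"}
                : new String[0];
    }
}

// Registration (Flink 1.8-style Table API):
//   tableEnv.registerFunction("enrich", new Enrich());
```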
>>>>>>
>>>>>> Handling many attributes is always a bit painful in SQL.
>>>>>> There is an effort to make the Table API easier to use for these use
>>>>>> cases (for example Column Operations [1]).
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>>>>
>>>>>>
>>>>>>
>>>>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 <
>>>>>> shahar.kobrin...@gmail.com>:
>>>>>>
>>>>>>> Just to be more clear about my goal:
>>>>>>> I'm trying to enrich the incoming stream with some meaningful tags
>>>>>>> based on
>>>>>>> conditions from the event itself.
>>>>>>> So the input stream could be an event looks like:
>>>>>>> Class Car {
>>>>>>>   int year;
>>>>>>>   String modelName;
>>>>>>> }
>>>>>>>
>>>>>>> I will have a config that defines tags as:
>>>>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0"
>>>>>>>
>>>>>>> So ideally my output will have the structure of
>>>>>>>
>>>>>>> Class TaggedEvent<Car> {
>>>>>>>    Car origin;
>>>>>>>    String[] tags;
>>>>>>> }
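The desired semantics, stripped of Flink entirely, can be sketched in plain Java: each configured tag name maps to a predicate over Car, mirroring the SQL condition. All names here are illustrative:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class Car {
    final int year;
    final String modelName;

    Car(int year, String modelName) {
        this.year = year;
        this.modelName = modelName;
    }
}

class Tagger {
    private final Map<String, Predicate<Car>> rules;

    Tagger(Map<String, Predicate<Car>> rules) {
        this.rules = rules;
    }

    // Collect the names of all rules whose condition matches the event.
    String[] tags(Car car) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<Car>> rule : rules.entrySet()) {
            if (rule.getValue().test(car)) {
                matched.add(rule.getKey());
            }
        }
        return matched.toArray(new String[0]);
    }
}

class TaggerDemo {
    public static void main(String[] args) {
        Map<String, Predicate<Car>> rules = new LinkedHashMap<>();
        // "NiceCar" -> "year > 2015 AND position('Luxury' in modelName) > 0"
        rules.put("NiceCar", c -> c.year > 2015 && c.modelName.contains("Luxury"));

        Tagger tagger = new Tagger(rules);
        System.out.println(String.join(",", tagger.tags(new Car(2018, "Luxury Sedan"))));
        // prints "NiceCar"
    }
}
```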
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Sent from:
>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>
>>>>>>
