Re: Reconstruct object through partial select query
Hey Hequn & Fabian,

It seems like I found a reasonable way using both Row and my own TypeInfo:

- I started by just using my own TypeInfo, following your example. I'm using a serializer which is basically a compound of the original event type serializer and a string array serializer (using BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO.createSerializer for it); the rest is pretty much boilerplate similar to the MapSerializer (as well as the snapshot class). I extended TypeInformation<TaggedEvent<T>>, but the downside was that, unlike Row/Pojo, I didn't have the field names for the SQL query, so I went back to looking at Row.

- I realized that I can use Row without knowing the internals of the origin event class in advance, basically mapping to Row with an explicit RowTypeInfo as follows:

final TypeInformation<Row> rowTypeInformation = Types.ROW(
    new String[]{"originalEvent", "tags"},
    new TypeInformation[]{dataStream.getType(), BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO});

final SingleOutputStreamOperator<Row> mappedStream = this.dataStream
    .map((MapFunction) value -> {
        final Row row = new Row(2);
        row.setField(0, value);
        row.setField(1, new String[0]);
        return row;
    })
    .returns(rowTypeInformation);

tableEnvironment.registerDataStream(ORIGIN_EVENT_STREAM_TABLE_NAME, mappedStream);

So that worked well.

- Giving it a last thought, I didn't want to leave my users with a Row stream but with a Pojo, so I ended up keeping the TaggedEventTypeInfo as an additional mapping past the SQL query. Therefore my process is:
1. Map DataStream<T> to Row, register as a table (code above)
2. Run SQL over the table
3. Transform the result to an append stream of Row (trying to convert directly to my new type info fails with *Requested result type is an atomic type but result[..] has more or less than a single field*; not sure if there's a better way)
4. Map the Row object to TaggedEvent, explicitly implementing ResultTypeQueryable.getProducedType to use the TaggedEventTypeInfo

Would love to get feedback on that, whether that seems like the best solution or whether it can be more efficient.

Thanks!
Shahar

On Mon, May 13, 2019 at 9:25 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com> wrote: > Thanks for looking into it Hequn! > > I do not have a requirement to use TaggedEvent vs Row. But correct me if I > am wrong, creating a Row will require me knowing the internal fields of the > original event in compile time, is that correct? I do have a requirement to > support a generic original event type, so unless i can map T to a Row > without knowing the object fields, i wont be able to use it. Can you > confirm that? > I will look at MapView and let you know, thanks again! > > On Sun, May 12, 2019 at 1:30 AM Hequn Cheng wrote: > >> Hi shahar, >> >> An easier way to solve your problem is to use a Row to store your data >> instead of the `TaggedEvent `. I think this is what Fabian means. In this >> way, you don't have to define the user-defined TypeFactory and use the Row >> type directly. Take `TaggedEvent` as an example, the corresponding row >> type is `Types.ROW(Types.ROW(Types.INT, Types.STRING), >> Types.OBJECT_ARRAY(Types.STRING))` in which Types is >> `org.apache.flink.table.api.Types`. Furthermore, row type is also easier to >> cooperate with Table API & SQL. >> >> However, if the `TaggedEvent` is a must-have for you, you can take a look >> at the MapView[1] as an example of how to define a user-defined table >> factory.
>> >> Best, Hequn >> >> [1] >> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala >> >> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky < >> shahar.kobrin...@gmail.com> wrote: >> >>> Hi Fabian, >>> >>> I have a trouble implementing the type for this operation, i wonder how >>> i can do that. >>> So given generic type T i want to create a TypeInformation for: >>> class TaggedEvent { >>>String[] tags >>>T originalEvent >>> } >>> >>> Was trying a few different things but not sure how to do it. >>> Doesn't seem like i can use TypeHint as i need to know the actual >>> generics class for it, right? >>> Do i need a TaggedEventTypeFactory? If so, how do i create the >>> TaggedEventTypeInfo for it? do you have an example for it? was trying to >>> follow this[1] but doesn't seem to really work. I'm getting null as my >>> genericParameter for some reason. Also, how would you create the serializer >>> for the type
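A minimal sketch of step 4 above (assuming the TaggedEvent and TaggedEventTypeInfo classes discussed in this thread and Flink 1.8-era APIs): a mapper that rebuilds the Pojo from the SQL result Row and declares its produced type explicitly, so no TypeHint on the generic T is needed.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.types.Row;

// Sketch only: field positions follow the Row produced in step 3 (0 = originalEvent, 1 = tags).
public class RowToTaggedEvent<T>
        implements MapFunction<Row, TaggedEvent<T>>, ResultTypeQueryable<TaggedEvent<T>> {

    private final TypeInformation<TaggedEvent<T>> producedType;

    public RowToTaggedEvent(TypeInformation<T> originalEventType) {
        // TaggedEventTypeInfo is the custom TypeInformation described in this thread.
        this.producedType = new TaggedEventTypeInfo<>(originalEventType);
    }

    @Override
    @SuppressWarnings("unchecked")
    public TaggedEvent<T> map(Row row) {
        TaggedEvent<T> out = new TaggedEvent<>();
        out.originalEvent = (T) row.getField(0);
        out.tags = (String[]) row.getField(1);
        return out;
    }

    @Override
    public TypeInformation<TaggedEvent<T>> getProducedType() {
        return producedType;
    }
}

// Usage after the SQL query (step 3 converts to a Row append stream first):
// tableEnv.toAppendStream(resultTable, Row.class).map(new RowToTaggedEvent<>(dataStream.getType()));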
Re: Reconstruct object through partial select query
Thanks for looking into it Hequn! I do not have a requirement to use TaggedEvent vs Row. But correct me if I am wrong, creating a Row will require me knowing the internal fields of the original event in compile time, is that correct? I do have a requirement to support a generic original event type, so unless i can map T to a Row without knowing the object fields, i wont be able to use it. Can you confirm that? I will look at MapView and let you know, thanks again! On Sun, May 12, 2019 at 1:30 AM Hequn Cheng wrote: > Hi shahar, > > An easier way to solve your problem is to use a Row to store your data > instead of the `TaggedEvent `. I think this is what Fabian means. In this > way, you don't have to define the user-defined TypeFactory and use the Row > type directly. Take `TaggedEvent` as an example, the corresponding row > type is `Types.ROW(Types.ROW(Types.INT, Types.STRING), > Types.OBJECT_ARRAY(Types.STRING))` in which Types is > `org.apache.flink.table.api.Types`. Furthermore, row type is also easier to > cooperate with Table API & SQL. > > However, if the `TaggedEvent` is a must-have for you, you can take a look > at the MapView[1] as an example of how to define a user-defined table > factory. > > Best, Hequn > > [1] > https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala > > On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky < > shahar.kobrin...@gmail.com> wrote: > >> Hi Fabian, >> >> I have a trouble implementing the type for this operation, i wonder how i >> can do that. >> So given generic type T i want to create a TypeInformation for: >> class TaggedEvent { >>String[] tags >>T originalEvent >> } >> >> Was trying a few different things but not sure how to do it. >> Doesn't seem like i can use TypeHint as i need to know the actual >> generics class for it, right? >> Do i need a TaggedEventTypeFactory? If so, how do i create the >> TaggedEventTypeInfo for it? do you have an example for it? was trying to >> follow this[1] but doesn't seem to really work. I'm getting null as my >> genericParameter for some reason. Also, how would you create the serializer >> for the type info? can i reuse some builtin Kryo functionality? >> >> Thanks >> >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer >> >> >> >> >> >> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky < >> shahar.kobrin...@gmail.com> wrote: >> >>> Thanks Fabian, >>> >>> I'm looking into a way to enrich it without having to know the internal >>> fields of the original event type. >>> Right now what I managed to do is to map Car into a TaggedEvent >>> prior to the SQL query, tags being empty, then run the SQL query selecting >>> *origin, >>> enrich(.. ) as tags* >>> Not sure there's a better way but i guess that works >>> >>> >>> >>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske wrote: >>> >>>> Hi, >>>> >>>> you can use the value construction function ROW to create a nested row >>>> (or object). >>>> However, you have to explicitly reference all attributes that you will >>>> add. >>>> >>>> If you have a table Cars with (year, modelName) a query could look like >>>> this: >>>> >>>> SELECT >>>> ROW(year, modelName) AS car, >>>> enrich(year, modelName) AS tags >>>> FROM Cars; >>>> >>>> Handling many attributes is always a bit painful in SQL. 
>>>> There is an effort to make the Table API easier to use for these use >>>> cases (for example Column Operations [1]). >>>> >>>> Best, Fabian >>>> >>>> [1] https://issues.apache.org/jira/browse/FLINK-11967 >>>> >>>> >>>> >>>> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 < >>>> shahar.kobrin...@gmail.com>: >>>> >>>>> Just to be more clear on my goal - >>>>> Im trying to enrich the incoming stream with some meaningful tags >>>>> based on >>>>> conditions from the event itself. >>>>> So the input stream could be an event looks like: >>>>> Class Car { >>>>> int year; >>>>> String modelName; >>>>> } >>>>> >>>>> i will have a config that are defining tags as: >>>>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0" >>>>> >>>>> So ideally my output will be in the structure of >>>>> >>>>> Class TaggedEvent { >>>>>Car origin; >>>>>String[] tags; >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> -- >>>>> Sent from: >>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>>>> >>>>
Re: Reconstruct object through partial select query
Hi Fabian, I have a trouble implementing the type for this operation, i wonder how i can do that. So given generic type T i want to create a TypeInformation for: class TaggedEvent { String[] tags T originalEvent } Was trying a few different things but not sure how to do it. Doesn't seem like i can use TypeHint as i need to know the actual generics class for it, right? Do i need a TaggedEventTypeFactory? If so, how do i create the TaggedEventTypeInfo for it? do you have an example for it? was trying to follow this[1] but doesn't seem to really work. I'm getting null as my genericParameter for some reason. Also, how would you create the serializer for the type info? can i reuse some builtin Kryo functionality? Thanks [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com> wrote: > Thanks Fabian, > > I'm looking into a way to enrich it without having to know the internal > fields of the original event type. > Right now what I managed to do is to map Car into a TaggedEvent prior > to the SQL query, tags being empty, then run the SQL query selecting *origin, > enrich(.. ) as tags* > Not sure there's a better way but i guess that works > > > > On Thu, May 9, 2019 at 12:50 AM Fabian Hueske wrote: > >> Hi, >> >> you can use the value construction function ROW to create a nested row >> (or object). >> However, you have to explicitly reference all attributes that you will >> add. >> >> If you have a table Cars with (year, modelName) a query could look like >> this: >> >> SELECT >> ROW(year, modelName) AS car, >> enrich(year, modelName) AS tags >> FROM Cars; >> >> Handling many attributes is always a bit painful in SQL. >> There is an effort to make the Table API easier to use for these use >> cases (for example Column Operations [1]). >> >> Best, Fabian >> >> [1] https://issues.apache.org/jira/browse/FLINK-11967 >> >> >> >> Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 < >> shahar.kobrin...@gmail.com>: >> >>> Just to be more clear on my goal - >>> Im trying to enrich the incoming stream with some meaningful tags based >>> on >>> conditions from the event itself. >>> So the input stream could be an event looks like: >>> Class Car { >>> int year; >>> String modelName; >>> } >>> >>> i will have a config that are defining tags as: >>> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0" >>> >>> So ideally my output will be in the structure of >>> >>> Class TaggedEvent { >>>Car origin; >>>String[] tags; >>> } >>> >>> >>> >>> >>> >>> >>> -- >>> Sent from: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >>
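A minimal sketch of the factory route asked about here, assuming Flink 1.8-era APIs and the custom TaggedEventTypeInfo referenced elsewhere in this thread; the null check reflects the genericParameters issue mentioned above (the parameter comes back null when T was erased at the call site).

import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

// The factory is wired up via the @TypeInfo annotation on the class itself.
@TypeInfo(TaggedEventTypeFactory.class)
public class TaggedEvent<T> {
    public String[] tags;
    public T originalEvent;
}

public class TaggedEventTypeFactory<T> extends TypeInfoFactory<TaggedEvent<T>> {

    @Override
    @SuppressWarnings("unchecked")
    public TypeInformation<TaggedEvent<T>> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {
        // genericParameters.get("T") can be null if T was erased where the type is used,
        // e.g. inside another generic function; fall back to a Kryo-backed generic type then.
        TypeInformation<?> eventType = genericParameters.get("T");
        if (eventType == null) {
            eventType = new GenericTypeInfo<>(Object.class);
        }
        return new TaggedEventTypeInfo<>((TypeInformation<T>) eventType);
    }
}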
Re: Reconstruct object through partial select query
Thanks Fabian, I'm looking into a way to enrich it without having to know the internal fields of the original event type. Right now what I managed to do is to map Car into a TaggedEvent prior to the SQL query, tags being empty, then run the SQL query selecting *origin, enrich(.. ) as tags* Not sure there's a better way but i guess that works On Thu, May 9, 2019 at 12:50 AM Fabian Hueske wrote: > Hi, > > you can use the value construction function ROW to create a nested row (or > object). > However, you have to explicitly reference all attributes that you will add. > > If you have a table Cars with (year, modelName) a query could look like > this: > > SELECT > ROW(year, modelName) AS car, > enrich(year, modelName) AS tags > FROM Cars; > > Handling many attributes is always a bit painful in SQL. > There is an effort to make the Table API easier to use for these use cases > (for example Column Operations [1]). > > Best, Fabian > > [1] https://issues.apache.org/jira/browse/FLINK-11967 > > > > Am Do., 9. Mai 2019 um 01:44 Uhr schrieb shkob1 < > shahar.kobrin...@gmail.com>: > >> Just to be more clear on my goal - >> Im trying to enrich the incoming stream with some meaningful tags based on >> conditions from the event itself. >> So the input stream could be an event looks like: >> Class Car { >> int year; >> String modelName; >> } >> >> i will have a config that are defining tags as: >> "NiceCar" -> "year > 2015 AND position("Luxury" in modelName) > 0" >> >> So ideally my output will be in the structure of >> >> Class TaggedEvent { >>Car origin; >>String[] tags; >> } >> >> >> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
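The enrich(...) function in the query above is user-defined; a hypothetical sketch of such a tag function (Flink 1.7-era ScalarFunction), with one hard-coded rule standing in for the config-driven conditions described in this thread:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical enrich(year, modelName): evaluates the tag rules and returns the names
// of the rules that matched. Real rule evaluation would be driven by configuration.
public class EnrichFunction extends ScalarFunction {

    public String[] eval(int year, String modelName) {
        List<String> tags = new ArrayList<>();
        if (year > 2015 && modelName.contains("Luxury")) {
            tags.add("NiceCar");
        }
        return tags.toArray(new String[0]);
    }
}

// Registered before the query is run, e.g.:
// tableEnv.registerFunction("enrich", new EnrichFunction());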
Re: Schema Evolution on Dynamic Schema
That makes sense Fabian! So I want to make sure I fully understand how this should look. Would the same expression look like: custom_groupby(my_group_fields, map[ 'a', sum(a)...]) ? Will I be able to use the builtin aggregation function internally such as sum/avg etc? or would I need to reimplement all such functions? In terms of schema evolution, if these are implemented as a map state, will I be OK as new items are added to that map? Thanks again, and congrats on an awesome conference, I had learned a lot Shahar From: Fabian Hueske Sent: Monday, April 8, 02:54 Subject: Re: Schema Evolution on Dynamic Schema To: Shahar Cizer Kobrinsky Cc: Rong Rong, user Hi Shahar, Sorry for the late response. The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator. The optimizer is converting the plan into an aggregation operator that computes all aggregates followed by a projection that inserts the aggregation results into a MAP type. The problem is the state of the aggregation operator. By adding a new field to the map, the state of the operator changes and you cannot restore it. The only workaround that I can think of would be to implement a user-defined aggregation function [1] that performs all aggregations internally and manually maintain state compatibility for the accumulator of the UDAGG. Best, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#aggregation-functions Am Do., 28. März 2019 um 22:28 Uhr schrieb Shahar Cizer Kobrinsky mailto:shahar.kobrin...@gmail.com>>: Hmm kinda stuck here. Seems like SQL Group by is translated to a GroupAggProcessFunction which stores a state for every aggregation element (thus flattening the map items for state store). Seems like there's no way around it. Am i wrong? is there any way to evolve the map elements when doing SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by .. ? On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske mailto:fhue...@gmail.com>> wrote: Hi, I think this would work. However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]). This includes previous versions of keys. Also note that we are not guaranteeing savepoint compatibility across Flink versions yet. If the state of the aggregation operator changes in a later version (say Flink 1.9.x), it might not be possible to migrate to a later Flink version. Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided. Best, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky mailto:shahar.kobrin...@gmail.com>>: My bad. it actually did work with Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map group by a do you think thats OK as a workaround? main schema should be changed that way - only keys in the map On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky mailto:shahar.kobrin...@gmail.com>> wrote: Thanks Fabian, Im thinking about how to work around that issue and one thing that came to my mind is to create a map that holds keys & values that can be edited without changing the schema, though im thinking how to implement it in Calcite. 
Considering the following original SQL in which "metrics" can be added/deleted/renamed Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c Group by a im looking both at json_objectagg & map to change it but it seems that json_objectagg is on a later calcite version and map doesnt work for me. Trying something like Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map group by a results with "Non-query expression encountered in illegal context" is my train of thought the right one? if so, do i have a mistake in the way im trying to implement it? Thanks! On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske mailto:fhue...@gmail.com>> wrote: Hi, Restarting a changed query from a savepoint is currently not supported. In general this is a very difficult problem as new queries might result in completely different execution plans. The special case of adding and removing aggregates is easier to solve, but the schema of the stored state changes and we would need to analyze the previous and current query and generate compatible serializers. So far we did not explore this rabbit hole. Also, starting a different query from a savepoint can also lead to weird result semantics. I'd recommend to bootstrap the state of the new query from scatch. Best, Fabian Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky mailto:shahar.kobrin...@gmail.com>>: Or is it the SQL state that is incompatible.. ? On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky ma
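One possible shape for the UDAGG Fabian describes above, assuming the old (Flink 1.7/1.8) Table API and that the query feeds the function (metricName, metricValue) pairs: all sums live in a MapView keyed by metric name, so adding or removing a metric only adds or removes map keys instead of changing the accumulator schema. The built-in SUM/AVG cannot be reused inside it, so the arithmetic is reimplemented.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.table.api.dataview.MapView;
import org.apache.flink.table.functions.AggregateFunction;

// Accumulator: a single MapView<metricName, runningSum>, backed by keyed state.
public class MetricSumsAccumulator {
    public MapView<String, Double> sums = new MapView<>(Types.STRING, Types.DOUBLE);
}

public class MetricSums extends AggregateFunction<Map<String, Double>, MetricSumsAccumulator> {

    @Override
    public MetricSumsAccumulator createAccumulator() {
        return new MetricSumsAccumulator();
    }

    // Called once per (metricName, value) pair.
    public void accumulate(MetricSumsAccumulator acc, String metricName, Double value) {
        if (value == null) {
            return;
        }
        try {
            Double current = acc.sums.get(metricName);
            acc.sums.put(metricName, current == null ? value : current + value);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Map<String, Double> getValue(MetricSumsAccumulator acc) {
        Map<String, Double> result = new HashMap<>();
        try {
            Iterable<Map.Entry<String, Double>> entries = acc.sums.entries();
            if (entries != null) {
                for (Map.Entry<String, Double> entry : entries) {
                    result.put(entry.getKey(), entry.getValue());
                }
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    @Override
    public TypeInformation<Map<String, Double>> getResultType() {
        return new MapTypeInfo<>(Types.STRING, Types.DOUBLE);
    }
}

// Hypothetical usage, with the metrics first unpivoted into (name, value) rows:
// tableEnv.registerFunction("metricSums", new MetricSums());
// SELECT a, metricSums(metric_name, metric_value) AS metric_map FROM unpivoted_source GROUP BY a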
Re: Schema Evolution on Dynamic Schema
Hmm kinda stuck here. Seems like SQL Group by is translated to a *GroupAggProcessFunction* which stores a state for every aggregation element (thus flattening the map items for state store). Seems like there's no way around it. Am i wrong? is there any way to evolve the map elements when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by .. *? On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske wrote: > Hi, > > I think this would work. > However, you should be aware that all keys are kept forever in state > (unless you configure idle state retention time [1]). > This includes previous versions of keys. > > Also note that we are not guaranteeing savepoint compatibility across > Flink versions yet. > If the state of the aggregation operator changes in a later version (say > Flink 1.9.x), it might not be possible to migrate to a later Flink version. > Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided. > > Best, > Fabian > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time > > Am Di., 19. März 2019 um 22:27 Uhr schrieb Shahar Cizer Kobrinsky < > shahar.kobrin...@gmail.com>: > >> My bad. it actually did work with >> Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map >> group by a >> >> do you think thats OK as a workaround? main schema should be changed that >> way - only keys in the map >> >> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky < >> shahar.kobrin...@gmail.com> wrote: >> >>> Thanks Fabian, >>> >>> Im thinking about how to work around that issue and one thing that came >>> to my mind is to create a map that holds keys & values that can be edited >>> without changing the schema, though im thinking how to implement it in >>> Calcite. >>> Considering the following original SQL in which "metrics" can be >>> added/deleted/renamed >>> Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c >>> Group by a >>> >>> im looking both at json_objectagg & map to change it but it seems that >>> json_objectagg is on a later calcite version and map doesnt work for me. >>> Trying something like >>> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as >>> metric_map >>> group by a >>> >>> results with "Non-query expression encountered in illegal context" >>> is my train of thought the right one? if so, do i have a mistake in the >>> way im trying to implement it? >>> >>> Thanks! >>> >>> >>> >>> >>> >>> >>> >>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske wrote: >>> >>>> Hi, >>>> >>>> Restarting a changed query from a savepoint is currently not supported. >>>> In general this is a very difficult problem as new queries might result >>>> in completely different execution plans. >>>> The special case of adding and removing aggregates is easier to solve, >>>> but the schema of the stored state changes and we would need to analyze the >>>> previous and current query and generate compatible serializers. >>>> So far we did not explore this rabbit hole. >>>> >>>> Also, starting a different query from a savepoint can also lead to >>>> weird result semantics. >>>> I'd recommend to bootstrap the state of the new query from scatch. >>>> >>>> Best, Fabian >>>> >>>> >>>> >>>> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky < >>>> shahar.kobrin...@gmail.com>: >>>> >>>>> Or is it the SQL state that is incompatible.. ? 
>>>>> >>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky < >>>>> shahar.kobrin...@gmail.com> wrote: >>>>> >>>>>> Thanks Guys, >>>>>> >>>>>> I actually got an error now adding some fields into the select >>>>>> statement: >>>>>> >>>>>> java.lang.RuntimeException: Error while getting state >>>>>> at >>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) >>>>>> at >>>>>> org.apache.flink.streaming.api.operato
Re: Calcite SQL Map to Pojo Map
Based on this discussion http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/HashMap-HashSet-Serialization-Issue-td10899.html this seems by design that HashMap/Map are handled as GenericTypes . However that doesn't work with the query result table schema which generates a Map type. On Thu, Mar 28, 2019 at 10:32 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com> wrote: > Hey Rong, > > I don't think this is about a UDF, i reproduce the same exception with a > simple map['a','b'] where the Pojo has a Map property > btw for the UDF i'm already doing it (clazz is based on the specific map > im creating): > > @Override > public TypeInformation getResultType(Class[] signature) { > return Types.MAP(Types.STRING, TypeInformation.of(clazz)); > } > > The table schema looks good but looking at the PojoTypeInfo fields the Map > field is a GenericType - this causes the exception to be thrown on > TableEnvironment.generateRowConverterFunction > > > On Thu, Mar 28, 2019 at 8:56 AM Rong Rong wrote: > >> If your conversion is done using a UDF you need to override the >> getResultType method [1] to explicitly specify the key and value type >> information. As generic erasure will not preseve the part >> of your code. >> >> Thanks, >> Rong >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions >> >> On Wed, Mar 27, 2019 at 10:14 AM shkob1 >> wrote: >> >>> Im trying to convert a SQL query that has a select map[..] into a pojo >>> with >>> Map (using tableEnv.toRestractedStream ) >>> It seems to fail when the field requestedTypeInfo is GenericTypeInfo with >>> GenericType while the field type itself is MapTypeInfo >>> with >>> Map >>> >>> >>> Exception in thread "main" org.apache.flink.table.api.TableException: >>> Result >>> field does not match requested type. Requested: >>> GenericType; >>> Actual: Map >>> >>> Any suggestion? >>> Shahar >>> >>> >>> >>> -- >>> Sent from: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >>
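One way around the GenericType-vs-MapTypeInfo mismatch described above (assuming Flink 1.7/1.8 and hypothetical table/column names): keep the query result as Row and build the Pojo in a separate map step, so the planner never has to match the Pojo's Map field against the table schema.

import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// Pojo with a Map field; as noted above, Flink's Pojo analysis treats this field as a GenericType.
public class TaggedPojo {
    public String name;
    public Map<String, String> tags;
}

// In the job (hypothetical "source" table with string columns name, env, team):
Table result = tableEnv.sqlQuery(
        "SELECT name, map['env', env, 'team', team] AS tags FROM source");

DataStream<TaggedPojo> pojos = tableEnv
        .toRetractStream(result, Row.class)
        .filter(t -> t.f0)                              // keep only accumulate messages
        .map(t -> {
            TaggedPojo pojo = new TaggedPojo();
            pojo.name = (String) t.f1.getField(0);
            pojo.tags = (Map<String, String>) t.f1.getField(1);
            return pojo;
        })
        .returns(TaggedPojo.class);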
Re: Calcite SQL Map to Pojo Map
Hey Rong, I don't think this is about a UDF, i reproduce the same exception with a simple map['a','b'] where the Pojo has a Map property btw for the UDF i'm already doing it (clazz is based on the specific map im creating): @Override public TypeInformation getResultType(Class[] signature) { return Types.MAP(Types.STRING, TypeInformation.of(clazz)); } The table schema looks good but looking at the PojoTypeInfo fields the Map field is a GenericType - this causes the exception to be thrown on TableEnvironment.generateRowConverterFunction On Thu, Mar 28, 2019 at 8:56 AM Rong Rong wrote: > If your conversion is done using a UDF you need to override the > getResultType method [1] to explicitly specify the key and value type > information. As generic erasure will not preseve the part > of your code. > > Thanks, > Rong > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions > > On Wed, Mar 27, 2019 at 10:14 AM shkob1 > wrote: > >> Im trying to convert a SQL query that has a select map[..] into a pojo >> with >> Map (using tableEnv.toRestractedStream ) >> It seems to fail when the field requestedTypeInfo is GenericTypeInfo with >> GenericType while the field type itself is MapTypeInfo with >> Map >> >> >> Exception in thread "main" org.apache.flink.table.api.TableException: >> Result >> field does not match requested type. Requested: >> GenericType; >> Actual: Map >> >> Any suggestion? >> Shahar >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
Re: Map UDF : The Nothing type cannot have a serializer
Thanks Rong,

Yes, for some reason I thought I needed a table function, but a scalar function works! And yes, the map constructor is what I started with, but then I figured out it doesn't support conditional entries.

On Thu, Mar 21, 2019 at 6:07 PM Rong Rong wrote: > Based on what I saw in the implementation, I think you meant to implement > a ScalarFunction right? since you are only trying to structure a VarArg > string into a Map. > > If my understanding was correct. I think the Map constructor[1] is > something you might be able to leverage. It doesn't resolve your > Nullability issue though. > Otherwise you can use the Scalar UDF [2] > > -- > Rong > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html#value-construction-functions > [2] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions > > > > On Thu, Mar 21, 2019 at 5:02 PM shkob1 wrote: > >> Looking further into the RowType it seems like this field is translated >> as a >> CURSOR rather than a map.. not sure why >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
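A minimal sketch of the scalar UDF route that worked here, assuming Flink 1.7-era APIs: a var-arg eval builds the map and skips entries whose value is null (the "conditional entries" the MAP constructor cannot express), and getResultType declares the map type explicitly so it is not treated as a generic type. The class and call names are illustrative.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;

public class ConditionalMap extends ScalarFunction {

    // Called as conditionalMap('keyA', a, 'keyB', b, ...); pairs with a null value are dropped.
    public Map<String, String> eval(String... keysAndValues) {
        Map<String, String> map = new HashMap<>();
        for (int i = 0; i + 1 < keysAndValues.length; i += 2) {
            if (keysAndValues[i + 1] != null) {
                map.put(keysAndValues[i], keysAndValues[i + 1]);
            }
        }
        return map;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.MAP(Types.STRING, Types.STRING);
    }
}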
Re: Schema Evolution on Dynamic Schema
My bad. it actually did work with Select a, map['sumB', sum(b) ,'sumC' , sum(c) ) as metric_map group by a do you think thats OK as a workaround? main schema should be changed that way - only keys in the map On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com> wrote: > Thanks Fabian, > > Im thinking about how to work around that issue and one thing that came to > my mind is to create a map that holds keys & values that can be edited > without changing the schema, though im thinking how to implement it in > Calcite. > Considering the following original SQL in which "metrics" can be > added/deleted/renamed > Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c > Group by a > > im looking both at json_objectagg & map to change it but it seems that > json_objectagg is on a later calcite version and map doesnt work for me. > Trying something like > Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map > group by a > > results with "Non-query expression encountered in illegal context" > is my train of thought the right one? if so, do i have a mistake in the > way im trying to implement it? > > Thanks! > > > > > > > > On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske wrote: > >> Hi, >> >> Restarting a changed query from a savepoint is currently not supported. >> In general this is a very difficult problem as new queries might result >> in completely different execution plans. >> The special case of adding and removing aggregates is easier to solve, >> but the schema of the stored state changes and we would need to analyze the >> previous and current query and generate compatible serializers. >> So far we did not explore this rabbit hole. >> >> Also, starting a different query from a savepoint can also lead to weird >> result semantics. >> I'd recommend to bootstrap the state of the new query from scatch. >> >> Best, Fabian >> >> >> >> Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky < >> shahar.kobrin...@gmail.com>: >> >>> Or is it the SQL state that is incompatible.. ? >>> >>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky < >>> shahar.kobrin...@gmail.com> wrote: >>> >>>> Thanks Guys, >>>> >>>> I actually got an error now adding some fields into the select >>>> statement: >>>> >>>> java.lang.RuntimeException: Error while getting state >>>> at >>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) >>>> at >>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135) >>>> at >>>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74) >>>> at >>>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>>> at >>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>>> at >>>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) >>>> at java.lang.Thread.run(Thread.java:748) >>>> Caused by: org.apache.flink.util.StateMigrationException: For heap >>>> backends, the new state serializer must not be incompatible. 
>>>> at >>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301) >>>> at >>>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341) >>>> at >>>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) >>>> at >>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63) >>>> at >>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241) >>>> at >>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(Abstract
Re: Schema Evolution on Dynamic Schema
Thanks Fabian, Im thinking about how to work around that issue and one thing that came to my mind is to create a map that holds keys & values that can be edited without changing the schema, though im thinking how to implement it in Calcite. Considering the following original SQL in which "metrics" can be added/deleted/renamed Select a, sum(b) as metric_sum_c ,sum(c) as metric_sum_c Group by a im looking both at json_objectagg & map to change it but it seems that json_objectagg is on a later calcite version and map doesnt work for me. Trying something like Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as metric_map group by a results with "Non-query expression encountered in illegal context" is my train of thought the right one? if so, do i have a mistake in the way im trying to implement it? Thanks! On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske wrote: > Hi, > > Restarting a changed query from a savepoint is currently not supported. > In general this is a very difficult problem as new queries might result in > completely different execution plans. > The special case of adding and removing aggregates is easier to solve, but > the schema of the stored state changes and we would need to analyze the > previous and current query and generate compatible serializers. > So far we did not explore this rabbit hole. > > Also, starting a different query from a savepoint can also lead to weird > result semantics. > I'd recommend to bootstrap the state of the new query from scatch. > > Best, Fabian > > > > Am Mo., 18. März 2019 um 20:02 Uhr schrieb Shahar Cizer Kobrinsky < > shahar.kobrin...@gmail.com>: > >> Or is it the SQL state that is incompatible.. ? >> >> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky < >> shahar.kobrin...@gmail.com> wrote: >> >>> Thanks Guys, >>> >>> I actually got an error now adding some fields into the select statement: >>> >>> java.lang.RuntimeException: Error while getting state >>> at >>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) >>> at >>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135) >>> at >>> org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74) >>> at >>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >>> at >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >>> at >>> org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) >>> at java.lang.Thread.run(Thread.java:748) >>> Caused by: org.apache.flink.util.StateMigrationException: For heap >>> backends, the new state serializer must not be incompatible. 
>>> at >>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301) >>> at >>> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341) >>> at >>> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) >>> at >>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63) >>> at >>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241) >>> at >>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290) >>> at >>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) >>> at >>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) >>> ... 9 more >>> >>> Does that mean i should move from having a Pojo storing the result of >>> the SQL retracted stream to Avro? trying to understand how to mitigate it. >>> >>> Thanks >>> >>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong wrote: >>> >>>> Hi Shahar, >>>> >>>> From my understanding, if you use "groupby" withAgg
Re: Schema Evolution on Dynamic Schema
Or is it the SQL state that is incompatible.. ? On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com> wrote: > Thanks Guys, > > I actually got an error now adding some fields into the select statement: > > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135) > at > org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at > org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.StateMigrationException: For heap > backends, the new state serializer must not be incompatible. > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341) > at > org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) > at > org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241) > at > org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) > at > org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) > ... 9 more > > Does that mean i should move from having a Pojo storing the result of the > SQL retracted stream to Avro? trying to understand how to mitigate it. > > Thanks > > On Sat, Mar 9, 2019 at 4:41 PM Rong Rong wrote: > >> Hi Shahar, >> >> From my understanding, if you use "groupby" withAggregateFunctions, they >> save the accumulators to SQL internal states: which are invariant from your >> input schema. Based on what you described I think that's why it is fine for >> recovering from existing state. >> I think one confusion you might have is the "toRetractStream" syntax. >> This actually passes the "retracting" flag to the Flink planner to indicate >> how the DataStream operator gets generated based on your SQL. >> >> So in my understanding, there's really no "state" associated with the >> "retracting stream", but rather associated with the generated operators. >> However, I am not expert in Table/SQL state recovery: I recall there were >> an open JIRA[1] that might be related to your question regarding SQL/Table >> generated operator recovery. Maybe @Fabian can provide more insight here? >> >> Regarding the rest of the pipeline, both "filter" and "map" operators are >> stateless; and sink state recovery depends on what you do. 
>> >> -- >> Rong >> >> [1] https://issues.apache.org/jira/browse/FLINK-6966 >> >> On Fri, Mar 8, 2019 at 12:07 PM shkob1 >> wrote: >> >>> Thanks Rong, >>> >>> I have made some quick test changing the SQL select (adding a select >>> field >>> in the middle) and reran the job from a savepoint and it worked without >>> any >>> errors. I want to make sure i understand how at what point the state is >>> stored and how does it work. >>> >>> Let's simplify the scenario and forget my specific case of dynamically >>> generated pojo. let's focus on generic steps of: >>> Source->register table->SQL select and group by session->retracted stream >>> (Row)->transformToPojo (Custom Map function) ->pushToSink >>> >>> And let's assume the SQL select is changed (a field is added somewhere in >>> the middle of the select field). >>> So: >>> We had intermediate results that are in the old format that are loaded >>> from >>> state to the new Row object in the retracted stream. is that an accurate >>> statement? at what operator/format is the state stored in this case? is >>> it >>> the SQL result/Row? is it the Pojo? as this scenario does not fail for >>> me im >>> trying to understand how/where it is handled in Flink? >>> >>> >>> >>> >>> >>> -- >>> Sent from: >>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>> >>
Re: Schema Evolution on Dynamic Schema
Thanks Guys, I actually got an error now adding some fields into the select statement: java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135) at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible. at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341) at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47) at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241) at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124) at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) ... 9 more Does that mean i should move from having a Pojo storing the result of the SQL retracted stream to Avro? trying to understand how to mitigate it. Thanks On Sat, Mar 9, 2019 at 4:41 PM Rong Rong wrote: > Hi Shahar, > > From my understanding, if you use "groupby" withAggregateFunctions, they > save the accumulators to SQL internal states: which are invariant from your > input schema. Based on what you described I think that's why it is fine for > recovering from existing state. > I think one confusion you might have is the "toRetractStream" syntax. This > actually passes the "retracting" flag to the Flink planner to indicate how > the DataStream operator gets generated based on your SQL. > > So in my understanding, there's really no "state" associated with the > "retracting stream", but rather associated with the generated operators. > However, I am not expert in Table/SQL state recovery: I recall there were > an open JIRA[1] that might be related to your question regarding SQL/Table > generated operator recovery. Maybe @Fabian can provide more insight here? > > Regarding the rest of the pipeline, both "filter" and "map" operators are > stateless; and sink state recovery depends on what you do. 
> > -- > Rong > > [1] https://issues.apache.org/jira/browse/FLINK-6966 > > On Fri, Mar 8, 2019 at 12:07 PM shkob1 wrote: > >> Thanks Rong, >> >> I have made some quick test changing the SQL select (adding a select field >> in the middle) and reran the job from a savepoint and it worked without >> any >> errors. I want to make sure i understand how at what point the state is >> stored and how does it work. >> >> Let's simplify the scenario and forget my specific case of dynamically >> generated pojo. let's focus on generic steps of: >> Source->register table->SQL select and group by session->retracted stream >> (Row)->transformToPojo (Custom Map function) ->pushToSink >> >> And let's assume the SQL select is changed (a field is added somewhere in >> the middle of the select field). >> So: >> We had intermediate results that are in the old format that are loaded >> from >> state to the new Row object in the retracted stream. is that an accurate >> statement? at what operator/format is the state stored in this case? is it >> the SQL result/Row? is it the Pojo? as this scenario does not fail for me >> im >> trying to understand how/where it is handled in Flink? >> >> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
Re: Schema Evolution on Dynamic Schema
Thanks for the response Rong. Would be happy to clarify more. So there are two possible changes that could happen: 1. There could be a change in the incoming source schema. Since there's a deserialization phase here (JSON -> Pojo) i expect a couple of options. Backward compatible changes to the JSON should not have an impact (as the Pojo is the same), however we might want to change the Pojo which i believe is a state evolving action. I do want to migrate the Pojo to Avro - will that suffice for Schema evolution feature to work? 2. The other possible change is the SQL select fields change, as mention someone could add/delete/change-order another field to the SQL Select. I do see this as an issue per the way i transform the Row object to the dynamically generated class. This is done today through indices of the class fields and the ones of the Row object. This seems like an issue for when for example a select field is added in the middle and now there's an older Row which fields order is not matching the (new) generated Class fields order. I'm thinking of how to solve that one - i imagine this is not something the schema evolution feature can solve (am i right?). im thinking on whether there is a way i can transform the Row object to my generated class by maybe the Row's column names corresponding to the generated class field names, though i don't see Row object has any notion of column names. Would love to hear your thoughts. If you want me to paste some code here i can do so. Shahar On Thu, Mar 7, 2019 at 10:40 AM Rong Rong wrote: > Hi Shahar, > > I wasn't sure which schema are you describing that is going to "evolve" > (is it the registered_table? or the output sink?). It will be great if you > can clarify more. > > For the example you provided, IMO it is more considered as logic change > instead of schema evolution: > - if you are changing max(c) to max(d) in your query. I don't think this > qualifies as schema evolution. > - if you are adding another column "max(d)" to your query along with your > existing "max(c)" that might be considered as a backward compatible change. > However, either case you will have to restart your logic, you can also > consult how state schema evolution [1], and there are many other problems > that can be tricky as well[2,3]. > > Thanks, > Rong > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html > [2] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html > [3] > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293 > > > On Wed, Mar 6, 2019 at 12:52 PM shkob1 wrote: > >> Hey, >> >> My job is built on SQL that is injected as an input to the job. so lets >> take >> an example of >> >> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a >> >> (side note: in order for the state not to grow indefinitely i'm >> transforming >> to a retracted stream and filtering based on a custom trigger) >> >> In order to get the output as a Json format i basically created a way to >> dynamically generate a class and registering it to the class loader, so >> when >> transforming to the retracted stream im doing something like: >> >> Table result = tableEnv.sqlQuery(sqlExpression); >> tableEnv.toRetractStream(result, Row.class, config) >> .filter(tuple -> tuple.f0) >> .map(new RowToDynamicClassMapper(sqlSelectFields)) >> .addSink(..) 
>> >> This actually works pretty good (though i do need to make sure to register >> the dynamic class to the class loader whenever the state is loaded) >> >> Im now looking into "schema evolution" - which basically means what >> happens >> when the query is changed (say max(c) is removed, and maybe max(d) is >> added). I dont know if that fits the classic "schema evolution" feature or >> should that be thought about differently. Would be happy to get some >> thoughts. >> >> Thanks! >> >> >> >> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
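A sketch of what the RowToDynamicClassMapper mentioned above could look like, assuming the generated class has already been made available on the user-code classloader as described in this thread; it copies Row fields into the generated Pojo by looking the class fields up by select-field name rather than by position, which is one way to tolerate select fields being reordered.

import java.lang.reflect.Field;
import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

public class RowToDynamicClassMapper extends RichMapFunction<Row, Object> {

    private final List<String> sqlSelectFields;   // select-field names, in query order
    private final String generatedClassName;
    private transient Class<?> generatedClass;
    private transient Field[] targetFields;

    public RowToDynamicClassMapper(List<String> sqlSelectFields, String generatedClassName) {
        this.sqlSelectFields = sqlSelectFields;
        this.generatedClassName = generatedClassName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        generatedClass = Class.forName(
                generatedClassName, true, getRuntimeContext().getUserCodeClassLoader());
        targetFields = new Field[sqlSelectFields.size()];
        for (int i = 0; i < sqlSelectFields.size(); i++) {
            // Assumes the generated Pojo exposes public fields named after the select fields.
            targetFields[i] = generatedClass.getField(sqlSelectFields.get(i));
        }
    }

    @Override
    public Object map(Row row) throws Exception {
        Object target = generatedClass.newInstance();
        for (int i = 0; i < targetFields.length; i++) {
            targetFields[i].set(target, row.getField(i));
        }
        return target;
    }
}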
Re: Dynamically Generated Classes - Cannot load user class
I have, with no luck. I wonder though - do I need to load it only in the map function? I tried to add it in the open method of the sink function and the process function I have there too, because they are still using the type... still no good. Is there a way of knowing which operator fails?

From: Hequn Cheng Sent: Monday, October 22, 2018 6:33:52 PM To: Shahar Cizer Kobrinsky Cc: user Subject: Re: Dynamically Generated Classes - Cannot load user class

Hi shkob > i tried to use getRuntimeContext().getUserCodeClassLoader() as the loader to > use for Byte Buddy - but doesnt seem to be enough.

From the log, it seems that the user class can not be found in the classloader. Cannot load user class: commodel.MyGeneratedClass Have you ever tried Thread.currentThread().getContextClassLoader(), which should have the user-code ClassLoader.

Best, Hequn

On Tue, Oct 23, 2018 at 5:47 AM shkob1 <shahar.kobrin...@gmail.com> wrote:

Hey, I'm trying to run a job which uses a dynamically generated class (through Byte Buddy). Think of me having a complex schema as yaml text and generating a class from it. Throughout the job i am using an artificial super class (MySuperClass) of the generated class (as for example i need to specify the generic class to extend RichMapFunction). MyRichMapFunction extends RichMapFunction is introducing the dynamic class. It will take the yaml in the CTOR and: 1. open - takes the schema and converts it into a Pojo class which extends MySuperClass 2. getProducedType - does the same thing in order to correctly send the Pojo with all the right fields

So basically my job is something like

env.addSource([stream of pojos])
    .filter(...)
    ... (register table, running a query which generates Rows)
    .map(myRichMapFunction)
    .returns(myRichMapFunction.getProducedType)
    .addSink(...)

My trouble now is that, when running on a cluster, the classloader fails to load my generated class. I tried to use getRuntimeContext().getUserCodeClassLoader() as the loader to use for Byte Buddy - but it doesn't seem to be enough. Was reading about it here: https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html

Is there a hook maybe to get called when a job is loaded so i can load the class?

Stacktrace:

org.apache.flink.client.program.ProgramInvocationException: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: commodel.MyGeneratedClass ClassLoader info: URL ClassLoader: file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR) Class not resolvable through given classloader.
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:264) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at com.MainClass.main(MainClass.java:46) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025) at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101) Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.model.DynamicSchema ClassLoader info: URL ClassLoader: file: '/var/folders/f7/c4pvjrf902b6c73_tbzkxnjwgn/T/blobStore-4b685b0a-b8c1-43a1-a75d-f0b9c0156f4c/job_d1187ea7e783007b92ef6c0597d72fcb/blob_p-38b9e6dce2423b0374f82842a35dcaa92e10dedd-6f1056ab61afcccb3c1fca895ccb3eb0' (valid JAR) Class not resolvable through given classloader. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOp
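A small sketch of the "define it wherever it is first needed" idea discussed above, assuming Byte Buddy and an illustrative class name and field set; it makes the definition idempotent per classloader, so several operators on the same task manager can call it from open() with getRuntimeContext().getUserCodeClassLoader().

import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;

public final class GeneratedClassRegistry {

    private GeneratedClassRegistry() {}

    public static synchronized Class<?> getOrDefine(String className, ClassLoader classLoader) {
        try {
            // Already injected into this classloader (e.g. by another operator's open())?
            return Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException notYetDefined) {
            // Field list is illustrative; in the real job it would be derived from the YAML schema.
            return new ByteBuddy()
                    .subclass(Object.class)
                    .name(className)
                    .defineField("year", int.class, Visibility.PUBLIC)
                    .defineField("modelName", String.class, Visibility.PUBLIC)
                    .make()
                    .load(classLoader, ClassLoadingStrategy.Default.INJECTION)
                    .getLoaded();
        }
    }
}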
Re: Custom Trigger + SQL Pattern
Thanks for the answer Hequn! To be honest im still trying to wrap my head around this solution, also trying to think whether it has advantages over my solution. My original thought was that my design is "backwards" because logically i would want to 1. collect raw records 2. partition them by session id to windows 3. wait for the session to end (=accumulate) 4. run the group by & aggregation on the ended session's row - emit the result my solution though (and I assume yours too) is rather doing the aggregation for every record coming in, which seems wasteful. It doesn't have any benefit of storage on state as it's emitting to a retractable stream anyway What do you think? Shahar On Tue, Oct 16, 2018 at 7:02 PM Hequn Cheng wrote: > Hi Shahar, > > The table function takes a single row but can output multi rows. You can > split the row based on the "last" event. The code looks like: > > val sessionResult = >> "SELECT " + >> " lastUDAF(line) AS lastEvents " >> "FROM MyTable " + >> "GROUP BY SESSION(rowtime, INTERVAL '4' HOUR)" >> val result = >> s"SELECT lastEvent FROM ($sessionResult), LATERAL >> TABLE(splitUDTF(lastEvents)) >> as T(lastEvent)" > > > The lastUDAF is used to process data in a session window. As your > lastEvent is base on either window end or a special "last" event, the > lastUDAF outputs multi last events. > After the window, we perform a splitUDTF to split the lastEvents to multi > single events. > > Best, Hequn > > > On Wed, Oct 17, 2018 at 12:38 AM Shahar Cizer Kobrinsky < > shahar.kobrin...@gmail.com> wrote: > >> Im wondering how does that work, it seems that a table function still >> takes a single row's values as an input, am i wrong (or at least that is >> how the examples show)? >> How would the SQL look like? >> >> On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng wrote: >> >>> Hi shkob1, >>> >>> > while one is time(session inactivity) the other is based on a specific >>> event marked as a "last" event. >>> How about using a session window and an udtf[1] to solve the problem. >>> The session window may output multi `last` elements. However, we can use a >>> udtf to split them into single ones. Thus, we can use SQL for the whole job. >>> >>> Best, Hequn. >>> >>> [1] >>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions >>> >>> On Sat, Oct 13, 2018 at 2:28 AM shkob1 >>> wrote: >>> >>>> Hey! >>>> >>>> I have a use case in which im grouping a stream by session id - so far >>>> pretty standard, note that i need to do it through SQL and not by the >>>> table >>>> api. >>>> In my use case i have 2 trigger conditions though - while one is time >>>> (session inactivity) the other is based on a specific event marked as a >>>> "last" event. >>>> AFAIK SQL does not support custom triggers - so what i end up doing is >>>> doing >>>> group by in the SQL - then converting the result to a stream along with >>>> a >>>> boolean field that marks whether at least one of the events was the end >>>> event - then adding my custom trigger on top of it. >>>> It looks something like this: >>>> >>>> Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent), >>>> sessionId, count(*) FROM source Group By sessionId"); >>>> tableEnv.toRetractStream(result, Row.class, streamQueryConfig) >>>> .filter(tuple -> tuple.f0) >>>> .map(...) >>>> .returns(...) >>>> .keyBy("sessionId") >>>> .window(EventTimeSessionWindows.withGap(Time.hours(4))) >>>> .trigger(new SessionEndedByTimeOrEndTrigger()) >>>> .process(...take last element from the group by >>>> result..) 
>>>> >>>> This seems like a weird work around to, isn't it? my window is >>>> basically of >>>> the SQL result rather than on the source stream. Ideally i would keyby >>>> the >>>> sessionId before running the SQL but then a) would I need to register a >>>> table per key? b) would i be able to use the custom trigger per window? >>>> >>>> basically i want to group by session id and have a window for every >>>> session >>>> that supports both time and custom trigger. Assuming i need to use SQL >>>> (reason is the query is dynamically loaded), is there a better solution >>>> for >>>> it? >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> -- >>>> Sent from: >>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >>>> >>>
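For reference, a sketch of what the SessionEndedByTimeOrEndTrigger mentioned above could look like, assuming the elements keyed into the session window carry the flag produced by atLeastOneTrue (the SessionRow type and its lastEvent field are illustrative stand-ins for the mapped result type):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires (and purges) either when the session gap expires or as soon as an element
// flagged as the session's last event arrives.
public class SessionEndedByTimeOrEndTrigger extends Trigger<SessionRow, TimeWindow> {

    @Override
    public TriggerResult onElement(SessionRow element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        if (element.lastEvent) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        // Session windows merge as gaps close; re-register the timer for the merged window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }
}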
Re: Custom Trigger + SQL Pattern
Im wondering how does that work, it seems that a table function still takes a single row's values as an input, am i wrong (or at least that is how the examples show)? How would the SQL look like? On Fri, Oct 12, 2018 at 9:15 PM Hequn Cheng wrote: > Hi shkob1, > > > while one is time(session inactivity) the other is based on a specific > event marked as a "last" event. > How about using a session window and an udtf[1] to solve the problem. The > session window may output multi `last` elements. However, we can use a udtf > to split them into single ones. Thus, we can use SQL for the whole job. > > Best, Hequn. > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions > > On Sat, Oct 13, 2018 at 2:28 AM shkob1 wrote: > >> Hey! >> >> I have a use case in which im grouping a stream by session id - so far >> pretty standard, note that i need to do it through SQL and not by the >> table >> api. >> In my use case i have 2 trigger conditions though - while one is time >> (session inactivity) the other is based on a specific event marked as a >> "last" event. >> AFAIK SQL does not support custom triggers - so what i end up doing is >> doing >> group by in the SQL - then converting the result to a stream along with a >> boolean field that marks whether at least one of the events was the end >> event - then adding my custom trigger on top of it. >> It looks something like this: >> >> Table result = tableEnv.sqlQuery("select atLeastOneTrue(lastEvent), >> sessionId, count(*) FROM source Group By sessionId"); >> tableEnv.toRetractStream(result, Row.class, streamQueryConfig) >> .filter(tuple -> tuple.f0) >> .map(...) >> .returns(...) >> .keyBy("sessionId") >> .window(EventTimeSessionWindows.withGap(Time.hours(4))) >> .trigger(new SessionEndedByTimeOrEndTrigger()) >> .process(...take last element from the group by result..) >> >> This seems like a weird work around to, isn't it? my window is basically >> of >> the SQL result rather than on the source stream. Ideally i would keyby the >> sessionId before running the SQL but then a) would I need to register a >> table per key? b) would i be able to use the custom trigger per window? >> >> basically i want to group by session id and have a window for every >> session >> that supports both time and custom trigger. Assuming i need to use SQL >> (reason is the query is dynamically loaded), is there a better solution >> for >> it? >> >> >> >> >> >> >> >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
Re: Fire and Purge with Idle State
Thanks! On Fri, Oct 12, 2018 at 9:29 PM Hequn Cheng wrote: > Hi shkob1, > > Currently, the idle state retention time is only used for unbounded > operators in sql/table-api. The unbounded operators include non-window > group by, non-window join, unbounded over, etc. The retention time affects > neither sql/table-api window operators nor DataStream operators. > > Best, Hequn > > On Sat, Oct 13, 2018 at 2:40 AM shkob1 wrote: > >> Hey >> >> Say im aggregating an event stream by sessionId in SQL and im emitting the >> results once the session is "over", i guess i should be using Fire and >> Purge >> - i dont expect to need to session data once over. How should i treat the >> Idle state retention time - is it needed at all if im using purge? will it >> become relevant only if a session is both never-ending AND never has more >> records? >> >> Thanks! >> >> >> >> -- >> Sent from: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ >> >
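For reference, a minimal sketch of how the idle state retention time is configured for the unbounded (non-windowed) operators Hequn mentions, assuming the Flink 1.7-era query config API (org.apache.flink.table.api.StreamQueryConfig, org.apache.flink.api.common.time.Time) and illustrative table and column names:

// Applies only to unbounded operators such as a plain (non-windowed) GROUP BY;
// it does not affect window triggers or DataStream operators.
StreamQueryConfig queryConfig = tableEnv.queryConfig();
queryConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));   // min, max retention

Table result = tableEnv.sqlQuery(
        "SELECT sessionId, count(*) AS eventCount FROM source GROUP BY sessionId");
DataStream<Tuple2<Boolean, Row>> retracted =
        tableEnv.toRetractStream(result, Row.class, queryConfig);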