Hmm, kinda stuck here. It seems like the SQL GROUP BY is translated to a *GroupAggProcessFunction*, which stores state for every aggregation element (thus flattening the map items for the state store). Seems like there's no way around it. Am I wrong? Is there any way to evolve the map elements when doing *SELECT map['a', sum(a), 'b', sum(b), ...] FROM ... GROUP BY ...*?
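For intuition on why the map elements can't evolve, here is a minimal, hypothetical sketch (illustration only, not Flink's actual GroupAggProcessFunction internals): the grouped aggregation keeps one flat accumulator slot per aggregate call in the SELECT, so the state layout is tied to the number and order of aggregates, and changing them changes the state schema.

```java
import java.util.Arrays;

// Illustration only, not Flink internals: a grouped aggregation that
// keeps one flat accumulator slot per aggregate call. Adding or removing
// an aggregate changes the slot count, i.e. the state schema, which is
// why a savepoint taken with the old query can no longer be restored.
class FlatAccumulators {
    private final long[] slots; // e.g. [sum(a), sum(b)] for two aggregates

    FlatAccumulators(int numAggregates) {
        this.slots = new long[numAggregates];
    }

    // one input row contributes one value per aggregate slot
    void accumulate(long[] values) {
        for (int i = 0; i < slots.length; i++) {
            slots[i] += values[i];
        }
    }

    long[] snapshot() {
        return Arrays.copyOf(slots, slots.length);
    }
}
```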
On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> I think this would work.
> However, you should be aware that all keys are kept forever in state
> (unless you configure idle state retention time [1]).
> This includes previous versions of keys.
>
> Also note that we are not guaranteeing savepoint compatibility across
> Flink versions yet.
> If the state of the aggregation operator changes in a later version (say
> Flink 1.9.x), it might not be possible to migrate to that later Flink version.
> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>
> Best,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> On Tue, Mar 19, 2019 at 22:27, Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>
>> My bad, it actually did work with
>>
>> Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
>> group by a
>>
>> Do you think that's OK as a workaround? The main schema doesn't change
>> that way - only the keys in the map do.
>>
>> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>>
>>> Thanks Fabian,
>>>
>>> I'm thinking about how to work around that issue, and one thing that came
>>> to mind is to create a map that holds keys & values that can be edited
>>> without changing the schema, though I'm still figuring out how to
>>> implement it in Calcite.
>>> Considering the following original SQL, in which "metrics" can be
>>> added/deleted/renamed:
>>>
>>> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
>>> Group by a
>>>
>>> I'm looking at both json_objectagg & map to change it, but it seems that
>>> json_objectagg is in a later Calcite version and map doesn't work for me.
>>> Trying something like
>>>
>>> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as metric_map
>>> group by a
>>>
>>> results in "Non-query expression encountered in illegal context".
>>> Is my train of thought the right one? If so, do I have a mistake in the
>>> way I'm trying to implement it?
>>>
>>> Thanks!
>>>
>>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Restarting a changed query from a savepoint is currently not supported.
>>>> In general this is a very difficult problem, as new queries might result
>>>> in completely different execution plans.
>>>> The special case of adding and removing aggregates is easier to solve,
>>>> but the schema of the stored state changes, and we would need to analyze
>>>> the previous and current query and generate compatible serializers.
>>>> So far we did not explore this rabbit hole.
>>>>
>>>> Also, starting a different query from a savepoint can lead to
>>>> weird result semantics.
>>>> I'd recommend bootstrapping the state of the new query from scratch.
>>>>
>>>> Best, Fabian
>>>>
>>>> On Mon, Mar 18, 2019 at 20:02, Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>>>>
>>>>> Or is it the SQL state that is incompatible..?
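The "generate compatible serializers" problem Fabian describes can be pictured with a toy example (hypothetical, not Flink's actual serialization format): bytes written for a two-aggregate row cannot be read back by a reader that now expects three fields, which is exactly what restoring an old savepoint into a changed query would require.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Toy example, not Flink's serialization format: a "v1" writer emits
// two long fields; a "v2" reader expects three. Restoring v1 bytes with
// the v2 reader fails mid-read, which is the essence of the state
// incompatibility after adding an aggregate to the SELECT.
class SerializerMismatch {
    static byte[] writeV1(long sumB, long sumC) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(sumB);
        out.writeLong(sumC);
        return bos.toByteArray();
    }

    static long[] readV2(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        // v2 expects a third field that v1 never wrote -> EOFException
        return new long[] {in.readLong(), in.readLong(), in.readLong()};
    }
}
```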
>>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>>>>>
>>>>>> Thanks guys,
>>>>>>
>>>>>> I actually got an error now after adding some fields to the select
>>>>>> statement:
>>>>>>
>>>>>> java.lang.RuntimeException: Error while getting state
>>>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>>>     at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>>     at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap
>>>>>> backends, the new state serializer must not be incompatible.
>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>>>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>>>     at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>>>     ... 9 more
>>>>>>
>>>>>> Does that mean I should move from having a Pojo storing the result of
>>>>>> the SQL retracted stream to Avro? Trying to understand how to mitigate it.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Shahar,
>>>>>>>
>>>>>>> From my understanding, if you use "group by" with AggregateFunctions,
>>>>>>> they save the accumulators to SQL internal states, which are invariant
>>>>>>> to your input schema. Based on what you described, I think that's why it
>>>>>>> is fine for recovering from existing state.
>>>>>>> I think one confusion you might have is the "toRetractStream"
>>>>>>> syntax. This actually passes the "retracting" flag to the Flink planner
>>>>>>> to indicate how the DataStream operator gets generated based on your SQL.
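Rong's description of retract-stream semantics can be sketched as follows (a simplified model, not Flink's actual API): each change to a grouped result is delivered downstream as a retraction of the old row (flag=false) followed by an emission of the new row (flag=true), so downstream operators see pairs of messages rather than in-place updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of retract-stream semantics, not Flink's API:
// when a group's aggregate changes, the old result is retracted
// (flag=false) and the new result is emitted (flag=true).
class RetractModel {
    private final Map<String, Long> current = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    void update(String key, long delta) {
        Long old = current.get(key);
        if (old != null) {
            emitted.add("(false," + key + "," + old + ")"); // retract old row
        }
        long next = (old == null ? 0 : old) + delta;
        current.put(key, next);
        emitted.add("(true," + key + "," + next + ")"); // emit new row
    }

    List<String> log() {
        return emitted;
    }
}
```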
>>>>>>> So in my understanding, there's really no "state" associated with
>>>>>>> the "retracting stream", but rather with the generated operators.
>>>>>>> However, I am not an expert in Table/SQL state recovery: I recall there
>>>>>>> was an open JIRA [1] that might be related to your question regarding
>>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>>>> insight here?
>>>>>>>
>>>>>>> Regarding the rest of the pipeline, both "filter" and "map"
>>>>>>> operators are stateless; sink state recovery depends on what you do.
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>>
>>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Rong,
>>>>>>>>
>>>>>>>> I made a quick test changing the SQL select (adding a select field
>>>>>>>> in the middle) and reran the job from a savepoint, and it worked
>>>>>>>> without any errors. I want to make sure I understand at what point
>>>>>>>> the state is stored and how it works.
>>>>>>>>
>>>>>>>> Let's simplify the scenario and forget my specific case of a
>>>>>>>> dynamically generated pojo. Let's focus on the generic steps:
>>>>>>>> Source -> register table -> SQL select and group by session ->
>>>>>>>> retracted stream (Row) -> transformToPojo (custom Map function) ->
>>>>>>>> pushToSink
>>>>>>>>
>>>>>>>> And let's assume the SQL select is changed (a field is added
>>>>>>>> somewhere in the middle of the select fields).
>>>>>>>> So: we had intermediate results in the old format that are loaded
>>>>>>>> from state into the new Row object in the retracted stream. Is that
>>>>>>>> an accurate statement? At what operator/format is the state stored in
>>>>>>>> this case? Is it the SQL result/Row? Is it the Pojo?
>>>>>>>> As this scenario does not fail for me, I'm trying to understand
>>>>>>>> how/where it is handled in Flink.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
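The map-based workaround discussed earlier in the thread can be summarized with a small sketch (illustration only, not Flink code): when the query's output is a single MAP<STRING, BIGINT> column, the record type exposed to the sink stays fixed, so adding, renaming, or dropping a metric changes only keys inside the map, never the Pojo/Row schema that downstream operators and serializers depend on.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: a result record whose Java type stays
// Map<String, Long> no matter which metrics the query computes.
// Metric changes alter keys inside the map, not the record's schema.
class MetricMapRecord {
    private final Map<String, Long> metrics = new HashMap<>();

    void put(String metricName, long value) {
        metrics.merge(metricName, value, Long::sum);
    }

    Map<String, Long> asMap() {
        return new HashMap<>(metrics);
    }
}
```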