Hmm, I'm kind of stuck here. It seems that a SQL GROUP BY is translated to a
*GroupAggProcessFunction*, which stores state for every aggregate
(so the map entries are flattened in the state store). It seems there's
no way around it. Am I wrong? Is there any way to evolve the map entries
when doing *SELECT map['a', sum(a), 'b', sum(b), ...] FROM ... GROUP BY ...*?
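
To make the concern concrete, here is a toy model of what I think is happening. It is only a sketch under my own assumptions: ToyGroupAgg and StateSchemaMismatch are made-up names, not Flink classes, and the arity check merely stands in for the serializer compatibility check that fails on restore. The point it illustrates is that the map is assembled from the accumulators at emit time, so the stored state still has one slot per aggregate.

```python
# Toy model only: none of these names are Flink classes.

class StateSchemaMismatch(Exception):
    """Raised when restored state does not fit the new query's accumulator layout."""


class ToyGroupAgg:
    """Keeps one accumulator per aggregate and per group key (a flattened layout)."""

    def __init__(self, sum_columns):
        self.sum_columns = list(sum_columns)
        self.state = {}  # group key -> list of running sums, one per aggregate

    def process(self, key, row):
        accs = self.state.setdefault(key, [0] * len(self.sum_columns))
        for i, col in enumerate(self.sum_columns):
            accs[i] += row[col]
        # The map only exists in the emitted result, not in the stored state.
        return {"sum_" + col: accs[i] for i, col in enumerate(self.sum_columns)}

    def snapshot(self):
        return {
            "arity": len(self.sum_columns),
            "state": {k: list(v) for k, v in self.state.items()},
        }

    def restore(self, snap):
        # Stand-in for the serializer compatibility check on restore.
        if snap["arity"] != len(self.sum_columns):
            raise StateSchemaMismatch(
                "stored %d accumulators, query needs %d"
                % (snap["arity"], len(self.sum_columns))
            )
        self.state = {k: list(v) for k, v in snap["state"].items()}


old_query = ToyGroupAgg(["b", "c"])
old_query.process("k1", {"b": 1, "c": 10})
result = old_query.process("k1", {"b": 2, "c": 20})
snap = old_query.snapshot()

# Adding one more map entry means one more accumulator, so restore is rejected.
new_query = ToyGroupAgg(["b", "c", "d"])
try:
    new_query.restore(snap)
    restored = True
except StateSchemaMismatch:
    restored = False
```

If this model is roughly right, wrapping the sums in a map keeps the output schema stable but does not help with restoring from the savepoint.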

On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> I think this would work.
> However, you should be aware that all keys are kept forever in state
> (unless you configure idle state retention time [1]).
> This includes previous versions of keys.
>
> Also note that we are not guaranteeing savepoint compatibility across
> Flink versions yet.
> If the state of the aggregation operator changes in a later version (say
> Flink 1.9.x), it might not be possible to migrate to a later Flink version.
> Compatibility for bugfix releases (1.8.0 -> 1.8.1) is of course provided.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/streaming/query_configuration.html#idle-state-retention-time
>
> On Tue, Mar 19, 2019 at 10:27 PM Shahar Cizer Kobrinsky <
> shahar.kobrin...@gmail.com> wrote:
>
>> My bad, it actually did work with
>> Select a, map['sumB', sum(b), 'sumC', sum(c)] as metric_map
>> group by a
>>
>> Do you think that's OK as a workaround? That way the main schema would not
>> have to change, only the keys in the map.
>>
>> On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky <
>> shahar.kobrin...@gmail.com> wrote:
>>
>>> Thanks Fabian,
>>>
>>> I'm thinking about how to work around that issue, and one thing that came
>>> to mind is to create a map that holds keys and values that can be edited
>>> without changing the schema, though I'm still working out how to implement
>>> it in Calcite.
>>> Consider the following original SQL, in which "metrics" can be
>>> added/deleted/renamed:
>>> Select a, sum(b) as metric_sum_b, sum(c) as metric_sum_c
>>> Group by a
>>>
>>> I'm looking at both json_objectagg and map to change it, but it seems that
>>> json_objectagg is only in a later Calcite version, and map doesn't work for me.
>>> Trying something like
>>> Select a, map(sum(b) as metric_sum_b, sum(c) as metric_sum_c) as
>>> metric_map
>>> group by a
>>>
>>> results in "Non-query expression encountered in illegal context".
>>> Is my train of thought the right one? If so, do I have a mistake in the
>>> way I'm trying to implement it?
>>>
>>> Thanks!
>>>
>>> On Tue, Mar 19, 2019 at 2:03 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Restarting a changed query from a savepoint is currently not supported.
>>>> In general this is a very difficult problem as new queries might result
>>>> in completely different execution plans.
>>>> The special case of adding and removing aggregates is easier to solve,
>>>> but the schema of the stored state changes and we would need to analyze the
>>>> previous and current query and generate compatible serializers.
>>>> So far we did not explore this rabbit hole.
>>>>
>>>> Also, starting a different query from a savepoint can lead to
>>>> weird result semantics.
>>>> I'd recommend bootstrapping the state of the new query from scratch.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> On Mon, Mar 18, 2019 at 8:02 PM Shahar Cizer Kobrinsky <
>>>> shahar.kobrin...@gmail.com> wrote:
>>>>
>>>>> Or is it the SQL state that is incompatible?
>>>>>
>>>>> On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky <
>>>>> shahar.kobrin...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Guys,
>>>>>>
>>>>>> I actually got an error now adding some fields into the select
>>>>>> statement:
>>>>>>
>>>>>> java.lang.RuntimeException: Error while getting state
>>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>>>> at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:135)
>>>>>> at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.open(GroupAggProcessFunction.scala:74)
>>>>>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>>> at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.open(LegacyKeyedProcessOperator.java:60)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer must not be incompatible.
>>>>>> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:301)
>>>>>> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:341)
>>>>>> at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>>>>>> at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:63)
>>>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:241)
>>>>>> at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:290)
>>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
>>>>>> at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>>> ... 9 more
>>>>>>
>>>>>> Does that mean I should move from having a POJO store the result of
>>>>>> the SQL retract stream to Avro? I'm trying to understand how to
>>>>>> mitigate it.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sat, Mar 9, 2019 at 4:41 PM Rong Rong <walter...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Shahar,
>>>>>>>
>>>>>>> From my understanding, if you use "groupby" with AggregateFunctions,
>>>>>>> they save the accumulators to SQL internal state, which is invariant to
>>>>>>> your input schema. Based on what you described, I think that's why it is
>>>>>>> fine for recovering from existing state.
>>>>>>> I think one confusion you might have is the "toRetractStream"
>>>>>>> syntax. This actually passes the "retracting" flag to the Flink planner to
>>>>>>> indicate how the DataStream operator gets generated based on your SQL.
>>>>>>>
>>>>>>> So in my understanding, there's really no "state" associated with
>>>>>>> the "retracting stream"; rather, the state is associated with the
>>>>>>> generated operators.
>>>>>>> However, I am not an expert in Table/SQL state recovery. I recall there
>>>>>>> was an open JIRA [1] that might be related to your question regarding
>>>>>>> SQL/Table generated operator recovery. Maybe @Fabian can provide more
>>>>>>> insight here?
>>>>>>>
>>>>>>> Regarding the rest of the pipeline, both "filter" and "map"
>>>>>>> operators are stateless; and sink state recovery depends on what you do.
>>>>>>>
>>>>>>> --
>>>>>>> Rong
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-6966
>>>>>>>
>>>>>>> On Fri, Mar 8, 2019 at 12:07 PM shkob1 <shahar.kobrin...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Rong,
>>>>>>>>
>>>>>>>> I ran a quick test: I changed the SQL select (adding a select field
>>>>>>>> in the middle), reran the job from a savepoint, and it worked without
>>>>>>>> any errors. I want to make sure I understand at what point the state
>>>>>>>> is stored and how it works.
>>>>>>>>
>>>>>>>> Let's simplify the scenario and forget my specific case of dynamically
>>>>>>>> generated POJOs. Let's focus on the generic steps:
>>>>>>>> Source -> register table -> SQL select and group by session -> retracted
>>>>>>>> stream (Row) -> transformToPojo (custom Map function) -> pushToSink
>>>>>>>>
>>>>>>>> And let's assume the SQL select is changed (a field is added somewhere
>>>>>>>> in the middle of the select list).
>>>>>>>> So:
>>>>>>>> We had intermediate results in the old format that are loaded from
>>>>>>>> state into the new Row object in the retracted stream. Is that an
>>>>>>>> accurate statement? At what operator, and in what format, is the state
>>>>>>>> stored in this case? Is it the SQL result/Row? Is it the POJO? As this
>>>>>>>> scenario does not fail for me, I'm trying to understand how and where
>>>>>>>> it is handled in Flink.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Sent from:
>>>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>>>
>>>>>>>
