Re: Schema Evolution on Dynamic Schema

2019-04-09 Thread Fabian Hueske
(preview shows only quoted text: Shahar's "… an awesome conference, I had learned a lot", followed by the header of Fabian's April 8 reply - From: Fabian Hueske, Sent: Monday, April 8, 02:54, Subject: Re: Schema Evolution on Dynamic Schema, To: Shahar Cizer Kobrinsky, Cc: Rong Rong, user - and its opening "Hi Shahar, Sorry for the late response.")

Re: Schema Evolution on Dynamic Schema

2019-04-08 Thread Shahar Cizer Kobrinsky
(quoting Fabian's reply) Hi Shahar, Sorry for the late response. The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator. The optimizer is converting the plan into an aggregation operator that computes all

Re: Schema Evolution on Dynamic Schema

2019-04-08 Thread Fabian Hueske
Hi Shahar, Sorry for the late response. The problem is not with the type of the retract stream, but with the GROUP BY aggregation operator. The optimizer is converting the plan into an aggregation operator that computes all aggregates followed by a projection that inserts the aggregation results
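[Editor's note: to make the incompatibility concrete, here is a minimal sketch, not from the thread itself; the query shapes are assumed from the original post further down.]

    // v1: the GROUP BY operator stores one accumulator per aggregate in a Row
    String v1 = "SELECT a, MAX(b) AS MaxB FROM registered_table GROUP BY a";
    // state per key: Row[ MaxAccumulator(b) ]

    // v2: adding an aggregate changes the arity of that stored Row
    String v2 = "SELECT a, MAX(b) AS MaxB, MAX(c) AS MaxC FROM registered_table GROUP BY a";
    // state per key: Row[ MaxAccumulator(b), MaxAccumulator(c) ]
    // -> the row serializer restored from the savepoint no longer matches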

Re: Schema Evolution on Dynamic Schema

2019-03-28 Thread Shahar Cizer Kobrinsky
Hmm, kinda stuck here. Seems like SQL GROUP BY is translated to a *GroupAggProcessFunction* which stores a state for every aggregation element (thus flattening the map items for the state store). Seems like there's no way around it. Am I wrong? Is there any way to evolve the map elements when doing

Re: Schema Evolution on Dynamic Schema

2019-03-26 Thread shkob1
Sorry to flood this thread, but keeping up with my experiments: so far I've been using retract to a Row and then mapping to a dynamic POJO that is created (using ByteBuddy) according to the select fields in the SQL. Considering the error, I'm now trying to remove the usage of Row and use the dynamic type
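[Editor's note: for readers unfamiliar with the technique, a minimal ByteBuddy sketch of generating such a POJO at runtime; the class and field names here are hypothetical, chosen to mirror the SELECT list.]

    import net.bytebuddy.ByteBuddy;
    import net.bytebuddy.description.modifier.Visibility;

    // Build a POJO type at runtime whose public fields mirror the query's
    // select fields, so results can be mapped onto a stable Java type.
    Class<?> dynamicType = new ByteBuddy()
        .subclass(Object.class)
        .name("com.example.DynamicResult")                   // hypothetical name
        .defineField("a", String.class, Visibility.PUBLIC)
        .defineField("maxB", Long.class, Visibility.PUBLIC)
        .defineField("maxC", Long.class, Visibility.PUBLIC)
        .make()
        .load(Thread.currentThread().getContextClassLoader())
        .getLoaded();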

Re: Schema Evolution on Dynamic Schema

2019-03-26 Thread shkob1
Debugging locally, it seems like the state descriptor of "GroupAggregateState" is creating an additional field serializer (a TupleSerializer of SumAccumulator) within the RowSerializer. I'm guessing this is what's causing the incompatibility? Is there any workaround I can do?

Re: Schema Evolution on Dynamic Schema

2019-03-26 Thread shkob1
Hi Fabian, It seems like it didn't work. Let me specify what I have done: I have a SQL query that looks something like: SELECT a, sum(b), map['sum_c', sum(c), 'sum_d', sum(d)] AS my_map FROM ... GROUP BY a. As you said, I'm preventing keys from being kept in state forever by setting idle state retention time (+ I'm

Re: Schema Evolution on Dynamic Schema

2019-03-20 Thread Fabian Hueske
Hi, I think this would work. However, you should be aware that all keys are kept forever in state (unless you configure idle state retention time [1]). This includes previous versions of keys. Also note that we are not guaranteeing savepoint compatibility across Flink versions yet. If the state
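[Editor's note: the idle state retention setting Fabian references looks roughly like this in the Flink 1.7/1.8 Table API; a sketch, with placeholder retention times and an assumed tableEnv variable.]

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.table.api.StreamQueryConfig;

    // State for a key is dropped after it has been idle between min and max time.
    StreamQueryConfig qConfig = tableEnv.queryConfig();
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
    // pass qConfig when converting the result, e.g.:
    // tableEnv.toRetractStream(resultTable, Row.class, qConfig);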

Re: Schema Evolution on Dynamic Schema

2019-03-19 Thread Shahar Cizer Kobrinsky
My bad, it actually did work with SELECT a, map['sumB', sum(b), 'sumC', sum(c)] AS metric_map GROUP BY a. Do you think that's OK as a workaround? The main schema shouldn't change that way - only keys in the map. On Tue, Mar 19, 2019 at 11:50 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com>
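[Editor's note: in code form the workaround would look roughly like this; a sketch assuming a tableEnv variable. As the later messages above show, the operator state still stores one accumulator per aggregate, so this stabilizes only the declared row type.]

    import org.apache.flink.table.api.Table;

    // All aggregates live in one MAP column, so adding or removing an entry
    // changes the map's contents but not the row schema of the query result.
    Table result = tableEnv.sqlQuery(
        "SELECT a, map['sumB', SUM(b), 'sumC', SUM(c)] AS metric_map " +
        "FROM registered_table GROUP BY a");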

Re: Schema Evolution on Dynamic Schema

2019-03-19 Thread Shahar Cizer Kobrinsky
Thanks Fabian, I'm thinking about how to work around that issue, and one thing that came to mind is to create a map that holds keys & values that can be edited without changing the schema, though I'm still thinking about how to implement it in Calcite. Considering the following original SQL in which

Re: Schema Evolution on Dynamic Schema

2019-03-19 Thread Fabian Hueske
Hi, Restarting a changed query from a savepoint is currently not supported. In general this is a very difficult problem as new queries might result in completely different execution plans. The special case of adding and removing aggregates is easier to solve, but the schema of the stored state

Re: Schema Evolution on Dynamic Schema

2019-03-18 Thread Shahar Cizer Kobrinsky
Or is it the SQL state that is incompatible? On Mon, Mar 18, 2019 at 11:44 AM Shahar Cizer Kobrinsky < shahar.kobrin...@gmail.com> wrote: > Thanks guys, > > I actually got an error now when adding some fields to the select statement: > > java.lang.RuntimeException: Error while getting state > at

Re: Schema Evolution on Dynamic Schema

2019-03-18 Thread Shahar Cizer Kobrinsky
Thanks guys, I actually got an error now when adding some fields to the select statement: java.lang.RuntimeException: Error while getting state at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62) at

Re: Schema Evolution on Dynamic Schema

2019-03-09 Thread Rong Rong
Hi Shahar, From my understanding, if you use GROUP BY with AggregateFunctions, they save the accumulators to SQL internal state, which is invariant to your input schema. Based on what you described, I think that's why it is fine for recovering from existing state. I think one confusion you
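[Editor's note: for context, the "accumulator" Rong mentions is the ACC type of Flink's user-defined AggregateFunction contract; a simplified sketch below, the built-in SUM/MAX aggregates work analogously.]

    import org.apache.flink.table.functions.AggregateFunction;

    public class LongSum extends AggregateFunction<Long, LongSum.Acc> {
        // The accumulator is what the GROUP BY operator keeps in keyed state.
        public static class Acc {
            public long sum = 0L;
        }
        @Override
        public Acc createAccumulator() { return new Acc(); }
        // accumulate(...) is resolved by reflection on the function class
        public void accumulate(Acc acc, Long value) {
            if (value != null) { acc.sum += value; }
        }
        @Override
        public Long getValue(Acc acc) { return acc.sum; }
    }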

Re: Schema Evolution on Dynamic Schema

2019-03-08 Thread shkob1
Thanks Rong, I made a quick test changing the SQL select (adding a select field in the middle), reran the job from a savepoint, and it worked without any errors. I want to make sure I understand at what point the state is stored and how it works. Let's simplify the scenario and

Re: Schema Evolution on Dynamic Schema

2019-03-08 Thread Rong Rong
Hi Shahar, 1. Are you saying that the incoming data source is published as JSON and you have a customized POJO source function / table source that converts it? In that case it is you who maintains the schema evolution support, am I correct? For Avro I think you can refer to [1]. 2. If you

Re: Schema Evolution on Dynamic Schema

2019-03-07 Thread Shahar Cizer Kobrinsky
Thanks for the response Rong. Would be happy to clarify more. So there are two possible changes that could happen: 1. There could be a change in the incoming source schema. Since there's a deserialization phase here (JSON -> POJO), I expect a couple of options. Backward compatible changes

Re: Schema Evolution on Dynamic Schema

2019-03-07 Thread Rong Rong
Hi Shahar, I wasn't sure which schema you are describing that is going to "evolve" (is it the registered_table or the output sink?). It would be great if you could clarify more. For the example you provided, IMO it is more of a logic change than schema evolution: - if you are

Schema Evolution on Dynamic Schema

2019-03-06 Thread shkob1
Hey, my job is built on SQL that is injected as an input to the job, so let's take an example of SELECT a, max(b) AS MaxB, max(c) AS MaxC FROM registered_table GROUP BY a (side note: in order for the state not to grow indefinitely, I'm transforming to a retract stream and filtering based on a
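[Editor's note: a minimal sketch of that setup, with names taken from the message and an assumed tableEnv variable; the retract-stream conversion is the standard Table API call.]

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    Table result = tableEnv.sqlQuery(
        "SELECT a, MAX(b) AS MaxB, MAX(c) AS MaxC FROM registered_table GROUP BY a");
    // f0 == false marks a retraction; the job filters on this flag so the
    // downstream output does not grow indefinitely
    DataStream<Tuple2<Boolean, Row>> stream =
        tableEnv.toRetractStream(result, Row.class);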