Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines. Thanks for the details, Jark!
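For anyone skimming the thread below: the two-level join state Jark describes, `<col1, MapState<Row, Tuple2<count, expiredTime>>>`, can be modeled with plain Java maps. This is an illustrative sketch only (the names and types are mine, not Flink's MapState API); it shows why keeping a per-row count makes retraction a constant-time lookup instead of a scan over a list of rows.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the two-level join state: joinKey -> (row -> {count, expiredTime}).
// Not Flink's MapState API; names are illustrative.
class JoinStateSketch {
    // First level: the join key (col1). Second level: the full input row,
    // mapped to its occurrence count and a cleanup deadline.
    static final Map<Object, Map<List<Object>, long[]>> state = new HashMap<>();

    // Add one occurrence of `row` under `joinKey`; if the identical row was
    // seen before, bump its count and refresh its expiration timestamp.
    static void add(Object joinKey, List<Object> row, long expiredTime) {
        state.computeIfAbsent(joinKey, k -> new HashMap<>())
             .merge(row, new long[] {1, expiredTime},
                    (old, fresh) -> new long[] {old[0] + 1, fresh[1]});
    }

    // Retract one occurrence: a single map lookup plus a decrement, which is
    // why a counted map beats a plain list of rows for retraction.
    static void retract(Object joinKey, List<Object> row) {
        Map<List<Object>, long[]> rows = state.get(joinKey);
        if (rows == null) return;
        long[] entry = rows.get(row);
        if (entry == null) return;
        if (--entry[0] == 0) rows.remove(row);
        if (rows.isEmpty()) state.remove(joinKey);
    }

    static long count(Object joinKey, List<Object> row) {
        Map<List<Object>, long[]> rows = state.get(joinKey);
        return (rows == null || !rows.containsKey(row)) ? 0 : rows.get(row)[0];
    }
}
```

Adding the same row twice yields a count of 2 under one map entry; each retract decrements it, and the entry disappears when the count reaches zero.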
On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imj...@gmail.com> wrote:

> Hi Kant,
> Having a custom state backend is very difficult and is not recommended.
>
> Hi Benoît,
> Yes, the "Query on the intermediate state is on the roadmap" I mentioned
> is referring to integrating Table API & SQL with Queryable State.
> We also have an early issue, FLINK-6968, to track this.
>
> Best,
> Jark
>
> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:
>
>> Hi all!
>>
>> @Jark, out of curiosity, would you be so kind as to expand a bit on
>> "Query on the intermediate state is on the roadmap"?
>> Are you referring to working on QueryableStateStream/QueryableStateClient [1],
>> or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
>> there a FLIP?)?
>>
>> Cheers
>> Ben
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>
>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth...@gmail.com> wrote:
>>
>>> Is it a common practice to have a custom state backend? If so, what
>>> would be a popular custom backend?
>>>
>>> Can I use Elasticsearch as a state backend?
>>>
>>> Thanks!
>>>
>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:
>>>
>>>> Hi Kant,
>>>>
>>>> 1) A list of rows would also be sufficient here. Using a MapState lets us
>>>> retract a row faster and saves storage.
>>>>
>>>> 2) The State Processor API is usually used to process savepoints. I'm
>>>> afraid its performance is not good enough for querying.
>>>> On the other side, AFAIK, the State Processor API requires the uid of the
>>>> operator, but operator uids are not set in Table API & SQL,
>>>> so I'm not sure whether it works or not.
>>>>
>>>> 3) You can have a custom state backend by implementing the
>>>> org.apache.flink.runtime.state.StateBackend interface and using it
>>>> via `env.setStateBackend(…)`.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Hi Jark,
>>>>>
>>>>> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have the
>>>>> same joining key, right?
>>>>>
>>>>> 2) Can I use the State Processor API
>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>> from an external application to query the intermediate results in near
>>>>> real time? I thought querying RocksDB state was a widely requested feature.
>>>>> It would be really great to consider this feature for 1.11.
>>>>>
>>>>> 3) Is there any interface where I can implement my own state backend?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> 1) Yes, it will be stored in the RocksDB state backend.
>>>>>> 2) In the old planner, the left state has the same structure as the right
>>>>>> state: both are `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>> It is a 2-level map structure: `col1` is the join key and serves as the
>>>>>> first-level key of the state. The key of the MapState is the input row,
>>>>>> `count` is the number of occurrences of this row, and `expiredTime`
>>>>>> indicates when to clean up this row (to avoid infinite state size). You can
>>>>>> find the source code here [1].
>>>>>> In the Blink planner, the state structure is more complex and is
>>>>>> determined by the meta-information of the upstream. You can see the source
>>>>>> code of the Blink planner here [2].
>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>> Usually, users should write the query result to an external system (like
>>>>>> MySQL) and query the external system.
>>>>>> Querying the intermediate state is on the roadmap, but I guess it
>>>>>> is not in the 1.11 plan.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> [1]: http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>> [2]: https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>
>>>>>> On Jan 21, 2020 at 18:01, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> If I run a query like this:
>>>>>>
>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>>>>> table1.col1 = table2.col1")
>>>>>>
>>>>>> 1) Where will Flink store the intermediate result? Imagine
>>>>>> flink-conf.yaml says state.backend = 'rocksdb'.
>>>>>>
>>>>>> 2) If the intermediate results are stored in RocksDB, then what are the
>>>>>> key and value in this case (given the query above)?
>>>>>>
>>>>>> 3) What is the best way to query these intermediate results from an
>>>>>> external application, both while the job is running and while it is not?
>>>>>>
>>>>>> Thanks!
>>
>> --
>> Benoît Paris
>> Ingénieur Machine Learning Explicable
>> Tél : +33 6 60 74 23 00
>> http://benoit.paris
>> http://explicable.ml

--
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml
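To tie the thread together: the intermediate state kept by the streaming inner join in the original question is, conceptually, one map per input side from the join key (`col1`) to the rows seen so far. A self-contained, plain-Java sketch of that mechanism follows; it is an illustrative model under my own names, not Flink's actual join operator, which additionally tracks per-row counts and expiration times and stores everything in the configured state backend (e.g. RocksDB).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a streaming inner join's intermediate state: each side
// buffers its rows by join key so rows arriving later on the other side
// can still match. Illustrative only; not Flink's operator.
class StreamingJoinSketch {
    final Map<Object, List<String>> leftState = new HashMap<>();
    final Map<Object, List<String>> rightState = new HashMap<>();
    final List<String> emitted = new ArrayList<>();

    // A new left row joins against all buffered right rows with the same
    // key, then is buffered itself for future right rows to match.
    void left(Object key, String row) {
        for (String r : rightState.getOrDefault(key, List.of())) {
            emitted.add(row + "|" + r);
        }
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }

    // Symmetric handling for the right input.
    void right(Object key, String row) {
        for (String l : leftState.getOrDefault(key, List.of())) {
            emitted.add(l + "|" + row);
        }
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }
}
```

Feeding `l1`, then `r1`, then `l2` (all with key 1) emits `l1|r1` and then `l2|r1`: the state buffers are exactly what lets the late-arriving `l2` still match `r1`, and they are also why the state can grow without bound unless rows are eventually expired.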