Thanks Ron for starting this FLIP! It will complete the user story for FLIP-435[1].
Regarding the WorkflowOperation, I have a question: should we also add a
Delete/DropWorkflowOperation for the case where the Materialized Table is
dropped, or its refresh mode is changed from full to continuous?

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-435%3A+Introduce+a+New+Materialized+Table+for+Simplifying+Data+Pipelines?src=contextnavpagetreemode

Best,
Lincoln Lee

<lorenzo.affe...@ververica.com.invalid> wrote on Tue, Apr 30, 2024 at 15:37:

> Hello Ron, thank you for your detailed answers!
>
> For the Visitor pattern, I thought about it the other way around, so that
> operations visit the scheduler, and not vice-versa :) In this way
> operations can get the required information in order to be executed in a
> tailored way.
>
> Thank you for your effort, but, as you say:
> > furthermore, I do not see the benefit at this time and would favor the
> > simpler approach, similar to the design of
> > CatalogModificationEvent[2] and CatalogModificationListener[3], where the
> > developer only needs an instanceof check.
>
> In Java, most of the time, `instanceof` is considered an anti-pattern,
> that's why I was also thinking about a command pattern (every operation
> defines an `execute` method). However, I also understand this part is not
> crucial for the FLIP under discussion, and the implementation details can
> simply wait for the PRs to come.
>
> > After discussing with Shengkai offline, there is no need for this REST API
> > to support multiple tables to be refreshed at the same time, so it would be
> > more appropriate to put the materialized table identifier in the path of
> > the URL, thanks for the suggestion.
>
> Very good!
>
> Thank you!
> On Apr 29, 2024 at 05:04 +0200, Ron Liu <ron9....@gmail.com>, wrote:
> > Hi, Lorenzo
> >
> > > I have a question there: how can the gateway update the refreshHandler in
> > > the Catalog before getting it from the scheduler?
> >
> > The refreshHandler in CatalogMaterializedTable is null before it is
> > obtained from the scheduler; you can look at
> > CatalogMaterializedTable.Builder[1] for more details.
> >
> > > You have a typo here: WorkflowScheudler -> WorkflowScheduler :)
> >
> > Fixed now, thanks very much.
> >
> > > For the operations part, I still think that the FLIP would benefit from
> > > providing a specific pattern for operations. You could either propose a
> > > command pattern [1] or a visitor pattern (where the scheduler visits the
> > > operation to get relevant info) [2] for those operations at your choice.
> >
> > Thank you for your input, I find it very useful. I tried to understand your
> > thinking through code and wrote the following pseudo code using the
> > visitor design pattern:
> > 1. First, define WorkflowOperationVisitor, providing several overloaded
> > visit methods.
> >
> > public interface WorkflowOperationVisitor {
> >
> >     <T extends RefreshHandler> T visit(CreateWorkflowOperation createWorkflowOperation);
> >
> >     void visit(ModifyWorkflowOperation operation);
> > }
> >
> > 2. Then add the accept method to WorkflowOperation.
> >
> > @PublicEvolving
> > public interface WorkflowOperation {
> >
> >     void accept(WorkflowOperationVisitor visitor);
> > }
> >
> >
> > 3. In the WorkflowScheduler, call the implementation class of
> > WorkflowOperationVisitor to complete the corresponding operations.
> >
> > I appreciate this design pattern purely from a code design point of view,
> > but from the point of view of our specific scenario:
> > 1. For CreateWorkflowOperation, the visit method needs to return a
> > RefreshHandler; for ModifyWorkflowOperation, such as suspend and resume,
> > the visit method doesn't need to return a RefreshHandler. Because the
> > return types currently differ across WorkflowOperations, the accept
> > method can't be unified, so I think visitor may not be applicable here.
> >
> > 2. In addition, I think using the visitor pattern will add complexity for
> > the WorkflowScheduler implementer, who needs to implement one more
> > interface, WorkflowOperationVisitor. This interface is not for the engine
> > to use, so I don't see any benefit from this design at the moment.
> >
> > 3. Furthermore, I do not see the benefit at this time and would favor the
> > simpler approach, similar to the design of
> > CatalogModificationEvent[2] and CatalogModificationListener[3], where the
> > developer only needs an instanceof check.
> >
> > To summarize, I don't think there is a need to introduce a command or
> > visitor pattern at present.
> >
> > > About the REST API, I will wait for your offline discussion :)
> >
> > After discussing with Shengkai offline, there is no need for this REST API
> > to support multiple tables to be refreshed at the same time, so it would be
> > more appropriate to put the materialized table identifier in the path of
> > the URL, thanks for the suggestion.
> >
> > [1]
> > https://github.com/apache/flink/blob/e412402ca4dfc438e28fb990dc53ea7809430aee/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java#L264
> > [2]
> > https://github.com/apache/flink/blob/b1544e4e513d2b75b350c20dbb1c17a8232c22fd/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationEvent.java#L28
> > [3]
> > https://github.com/apache/flink/blob/b1544e4e513d2b75b350c20dbb1c17a8232c22fd/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/listener/CatalogModificationListener.java#L31
> >
> > Best,
> > Ron
> >
> > Ron Liu <ron9....@gmail.com> wrote on Sun, Apr 28, 2024 at 23:53:
> >
> > > Hi, Shengkai
> > >
> > > Thanks for your feedback and suggestions, they are very useful for this
> > > proposal. In response to your questions, I made the following changes:
> > >
> > > > > *WorkflowScheduler*
> > > > > 1. How to get the exception details if `modifyRefreshWorkflow` fails?
> > > > > 2. Could you give us an example about how to configure the scheduler?
> > >
> > > 1. Added a new WorkflowException. The related WorkflowScheduler method
> > > signatures now throw WorkflowException when creating or modifying a
> > > workflow fails, so that the framework can detect and handle it.
> > >
> > > 2. Added a new Configuration section, introduced a new Option, and gave an
> > > example of how to define the Scheduler in flink-conf.yaml.
> > >
> > > > > *SQL Gateway*
> > > > > 1. SqlGatewayService requires Session as the input, but the REST API
> > > > > doesn't need any Session information.
> > > > > 2. Use "-" instead of "_" in the REST URI and camel case for fields in
> > > > > request/response
> > > > > 3. Do we need scheduleTime and scheduleTimeFormat together?
> > >
> > > 1. If it is designed as a synchronous API, it may suffer from network
> > > jitter, thread resource exhaustion and other problems, which I had not
> > > considered before. The asynchronous API, although it increases the cost
> > > of use for the user, is friendly to the SqlGatewayService as well as to
> > > the client's thread resources. In summary, as discussed offline, I also
> > > tend to think that all APIs of SqlGateway should be unified: all should
> > > be asynchronous and bound to a session. I have updated the REST API
> > > section in the FLIP.
> > >
> > > 2. Thanks for the reminder, it has been updated.
> > >
> > > 3. After rethinking, I think it can indeed be simpler. There is no need
> > > to pass in a custom time format; scheduleTime can be unified to the SQL
> > > standard timestamp format 'yyyy-MM-dd HH:mm:ss', which is able to
> > > satisfy the time-related needs of the materialized table.
> > >
> > > Based on your feedback, I have updated the related sections of the FLIP.
> > >
> > > Best,
> > > Ron
> > >
> > >
> > > Shengkai Fang <fskm...@gmail.com> wrote on Sun, Apr 28, 2024 at 15:47:
> > >
> > > > > Hi, Liu.
> > > > >
> > > > > Thanks for your proposal. I have some questions about the FLIP:
> > > > >
> > > > > *WorkflowScheduler*
> > > > >
> > > > > 1. How to get the exception details if `modifyRefreshWorkflow` fails?
> > > > > 2. Could you give us an example about how to configure the scheduler?
> > > > >
> > > > > *SQL Gateway*
> > > > >
> > > > > 1. SqlGatewayService requires Session as the input, but the REST API
> > > > > doesn't need any Session information.
> > > > >
> > > > > From the perspective of a gateway developer, I tend to unify the API of
> > > > > the SQL gateway, binding all concepts to the session. On the one hand,
> > > > > this approach allows us to reduce maintenance and understanding costs,
> > > > > as we only need to maintain one set of architecture to complete basic
> > > > > concepts. On the other hand, the benefits of an asynchronous
> > > > > architecture are evident: we maintain state on the server side. If the
> > > > > request is a long connection, even in the face of network layer jitter,
> > > > > we can still find the original result through session and operation
> > > > > handles.
> > > > >
> > > > > Using asynchronous APIs may increase the development cost for users,
> > > > > but from a platform perspective, if a request remains in a blocking
> > > > > state for a long time, it also becomes a burden on the platform's JVM.
> > > > > This is because thread switching and maintenance require certain
> > > > > resources.
> > > > >
> > > > > 2. Use "-" instead of "_" in the REST URI and camel case for fields in
> > > > > request/response
> > > > >
> > > > > Please follow the Flink REST Design.
> > > > >
> > > > > 3. Do we need scheduleTime and scheduleTimeFormat together?
> > > > >
> > > > > I think we can use SQL timestamp format or ISO timestamp format. It is
> > > > > not necessary to pass time in any specific format.
> > > > >
> > > > > https://en.wikipedia.org/wiki/ISO_8601
> > > > >
> > > > > Best,
> > > > > Shengkai
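
For reference on the scheduleTime question in this thread, here is a minimal
sketch of how the two candidate formats (the SQL timestamp format
'yyyy-MM-dd HH:mm:ss' and an ISO 8601 local date-time) could be parsed with
the standard java.time API. The class name ScheduleTimeSketch and its method
names are made up for illustration; they are not part of the FLIP or of
Flink, and the FLIP may handle this differently.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: shows that both formats discussed
// in the thread map to the same LocalDateTime value.
public class ScheduleTimeSketch {

    // SQL-style timestamp pattern mentioned in the thread.
    public static final DateTimeFormatter SQL_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses a schedule time written in the SQL timestamp format.
    public static LocalDateTime parseSqlTimestamp(String text) {
        return LocalDateTime.parse(text, SQL_TIMESTAMP);
    }

    // Parses a schedule time written as an ISO 8601 local date-time.
    public static LocalDateTime parseIsoTimestamp(String text) {
        return LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
    }

    public static void main(String[] args) {
        LocalDateTime fromSql = parseSqlTimestamp("2024-04-28 15:47:00");
        LocalDateTime fromIso = parseIsoTimestamp("2024-04-28T15:47:00");
        // Both inputs describe the same instant on the local timeline.
        System.out.println(fromSql.equals(fromIso));
        // Re-render the ISO input in the SQL timestamp format.
        System.out.println(SQL_TIMESTAMP.format(fromIso));
    }
}
```

With a single fixed format there is nothing for the caller to configure,
which is the simplification the thread converges on by dropping
scheduleTimeFormat.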