Flink OpenSearch Connector - Avro JSON to JSON
Hello Team,

Please help answer the queries below.

1. OpenSearch supports writing data in JSON format, but in Flink the default data format is Avro JSON. What is the best practice for writing data to OpenSearch using the Flink OpenSearch connector? Do we need to manually convert Avro JSON to JSON, or is there built-in support in Flink for writing JSON to OpenSearch?

2. How can I write existing complex nested JSON data into OpenSearch using the Flink connector, rather than writing each key/value (field/value) pair individually?

3. The OpenSearch Java REST client supports writing a Java object directly to OpenSearch with the code below:

IndexRequest indexRequest = new IndexRequest.Builder().index(index).document(indexData).build();

https://opensearch.org/docs/latest/clients/java/

Thanks!

Regards,
Praveen Chandna
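On questions (1) and (2): the connector's emitter hands you each stream element and lets you decide how the index request is built, so if your elements are (or can be mapped to) nested Maps or POJOs, the whole nested document can be indexed in one request rather than field by field — the client serializes the nested structure to nested JSON for you. As a self-contained illustration of that nested-map-to-JSON shape (the toJson helper below is demonstration-only; in a real job the OpenSearch client or a library such as Jackson performs this serialization):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NestedJsonSketch {

    // Illustration-only JSON writer for nested Map/List/String/Number/Boolean
    // values. In a real Flink job you would NOT write this yourself: the
    // OpenSearch client (or Jackson) serializes a nested Map passed as the
    // document source into exactly this kind of nested JSON.
    static String toJson(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof String) {
            return "\"" + ((String) v).replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        if (v instanceof Map) {
            return ((Map<?, ?>) v).entrySet().stream()
                    .map(e -> toJson(String.valueOf(e.getKey())) + ":" + toJson(e.getValue()))
                    .collect(Collectors.joining(",", "{", "}"));
        }
        if (v instanceof List) {
            return ((List<?>) v).stream()
                    .map(NestedJsonSketch::toJson)
                    .collect(Collectors.joining(",", "[", "]"));
        }
        return String.valueOf(v); // numbers and booleans
    }

    public static void main(String[] args) {
        // A complex nested document, e.g. mapped from an Avro GenericRecord.
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("user", "praveen");
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("city", "Pune");
        address.put("zip", 411001);
        doc.put("address", address);
        doc.put("tags", List.of("flink", "opensearch"));
        System.out.println(toJson(doc));
        // prints {"user":"praveen","address":{"city":"Pune","zip":411001},"tags":["flink","opensearch"]}
    }
}
```

Inside the sink itself, a hedged sketch of the emitter (assuming the flink-connector-opensearch dependency, with "my-index" as a placeholder) would look like `new OpensearchSinkBuilder<Map<String, Object>>().setHosts(...).setEmitter((doc, ctx, indexer) -> indexer.add(Requests.indexRequest().index("my-index").source(doc))).build()`, so no manual per-field conversion is needed.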
Re: Flink Kubernetes operator keeps reporting REST client timeout.
Seems the operator didn't get restarted automatically after the ConfigMap was changed. After a rollout restart, the exception disappeared. Never mind this issue. Thanks.

On Tue, Nov 21, 2023 at 11:31 AM Xiaolong Wang wrote:

> Hi,
>
> Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to
> use Flink 1.18. After that, the operator kept reporting the following
> exception:
>
> 2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource fully reconciled, nothing to do...
> 2023-11-21 03:26:50,727 o.a.f.r.r.RestClient [WARN ][realtime-streaming/realtime-perf-report-main-prd-test] Rest endpoint shutdown failed.
> java.util.concurrent.TimeoutException
>     at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
>     at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>     at org.apache.flink.runtime.rest.RestClient.shutdown(RestClient.java:227)
>     at org.apache.flink.client.program.rest.RestClusterClient.close(RestClusterClient.java:270)
>     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getTaskManagersInfo(AbstractFlinkService.java:925)
>     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getClusterInfo(AbstractFlinkService.java:621)
>     at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeClusterInfo(AbstractFlinkDeploymentObserver.java:85)
>     at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:75)
>     at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
>     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:129)
>     at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>     at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>     at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.base/java.lang.Thread.run(Unknown Source)
>
> I tried to increase the rest timeout param "job.autoscaler.flink.rest-client.timeout" to 60 s, yet it does not resolve the issue.
>
> Could you help check this out? Thanks in advance.
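For anyone hitting the same warning: the timeout for the operator's own Flink REST calls is governed by the operator-level option `kubernetes.operator.flink.client.timeout`, not the `job.autoscaler.*` option tried above (which only affects the autoscaler's client). Also note the resolution in this thread: the operator does not reload its ConfigMap until its pod restarts. A hedged sketch, with the namespace and deployment names as placeholders for a typical Helm install:

```shell
# Raise the operator's Flink client timeout (default 10 s) in the operator's
# flink-conf section, e.g. via the Helm values / operator ConfigMap:
#   kubernetes.operator.flink.client.timeout: 60 s
#
# Then restart the operator so the changed ConfigMap is actually loaded:
kubectl -n flink-operator rollout restart deployment/flink-kubernetes-operator
```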
Flink Kubernetes operator keeps reporting REST client timeout.
Hi,

Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to use Flink 1.18. After that, the operator kept reporting the following exception:

2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource fully reconciled, nothing to do...
2023-11-21 03:26:50,727 o.a.f.r.r.RestClient [WARN ][realtime-streaming/realtime-perf-report-main-prd-test] Rest endpoint shutdown failed.
java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.flink.runtime.rest.RestClient.shutdown(RestClient.java:227)
    at org.apache.flink.client.program.rest.RestClusterClient.close(RestClusterClient.java:270)
    at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getTaskManagersInfo(AbstractFlinkService.java:925)
    at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getClusterInfo(AbstractFlinkService.java:621)
    at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeClusterInfo(AbstractFlinkDeploymentObserver.java:85)
    at org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:75)
    at org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:129)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
    at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
    at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
    at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

I tried to increase the rest timeout param "job.autoscaler.flink.rest-client.timeout" to 60 s, yet it does not resolve the issue.

Could you help check this out? Thanks in advance.
Handling Errors and Message Replay in Flink
Hello Flink community,

We are currently working on a Flink job that consumes messages from RabbitMQ, with checkpointing configured in at-least-once mode. In our job, we make external API requests to retrieve information. If the external API is down or a timeout occurs, we currently throw an exception so that the message is not acknowledged in RabbitMQ, since we aim to replay it. However, this approach causes all tasks to be redeployed.

I'm reaching out to ask whether there are alternative solutions in Flink that avoid throwing an exception. We are interested in a way to instruct Flink not to acknowledge a message when a problem occurred during its processing.

Any suggestions would be appreciated.

Best regards,
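One common alternative, as a hedged sketch: retry the external call inside the user function with a bounded backoff, and only rethrow (triggering the restart/replay relied on today) once retries are exhausted. Newer Flink versions (1.16+) also ship built-in retry support for async I/O (`AsyncDataStream.unorderedWaitWithRetry` with an `AsyncRetryStrategy`), which is the more idiomatic route when using an `AsyncFunction`. The helper below is plain JDK code that could be called from a map/process function; the API that fails twice then succeeds is hypothetical:

```java
import java.util.concurrent.Callable;

public class RetryingCall {

    /**
     * Retries the call up to maxAttempts times with a fixed delay, so a
     * transient outage of an external API does not immediately fail the task
     * (and with it the whole job). Rethrows only once retries are exhausted,
     * at which point Flink's normal restart/replay takes over.
     */
    static <T> T callWithRetry(Callable<T> call, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMillis); // fixed backoff; make exponential if needed
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical external API that times out twice, then succeeds.
        int[] remainingFailures = {2};
        String result = callWithRetry(() -> {
            if (remainingFailures[0]-- > 0) {
                throw new RuntimeException("API timeout");
            }
            return "ok";
        }, 5, 10);
        System.out.println(result); // prints ok
    }
}
```

Caveat: the blocking sleep stalls the subtask (and checkpoint barriers) while retrying, which is why the async-I/O retry route, or routing persistently failing records to a side output / dead-letter queue, is preferable for longer outages.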
Re: Avoid dynamic classloading in native mode with Kubernetes Operator
Hi Trystan,

I'm actually not very familiar with the operator's internals, but I'd guess that limitation is in Flink itself: application mode is a feature of core Flink, and the operator just configures it based on the CRDs it defines. Maybe one of the maintainers can confirm.

Regards,
Alexis.

On Mon, 20 Nov 2023, 19:25 Trystan wrote:

> Thanks Alexis, I can give that a try. However, that seems less than ideal
> from the user's perspective.
>
> Is there a technical reason why the operator can't support this
> combination of modes? I'd really like to just let the system do its thing
> rather than build a complicated two-jar approach.
>
> Thanks,
> Trystan
>
> On Fri, Nov 17, 2023 at 12:19 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:
>
>> Hi Trystan,
>>
>> I imagine you can create 2 jars: one should only have a class with the
>> main method, and the other should be a fat jar with everything else for
>> your job. If you create a custom image where your fat jar is placed under
>> /opt/flink/lib/, then I think it would "just work" when specifying the
>> main-method jar in jarURI.
>>
>> Nevertheless, even though Flink shades a lot of the libraries it uses
>> internally, I suppose you could still end up with dependency conflicts, so
>> you would probably have some added complexity managing what's bundled in
>> your fat jar.
>>
>> Regards,
>> Alexis.
>>
>> On Thu, Nov 16, 2023 at 7:42 PM, Trystan wrote:
>>
>>> Is it possible to avoid dynamic classloading when using the operator
>>> with a native Kubernetes application deployment?
>>>
>>> If I put the job jar into /opt/flink/lib, there are two possible
>>> outcomes:
>>>
>>> 1. If I point jarURI to the jar, I get linkage errors (presumably the
>>>    classes have already been loaded by both the AppClassLoader and the
>>>    FlinkUserCodeClassLoader).
>>> 2. If I do not include jarURI, the operator pods encounter a
>>>    NullPointerException. The docs state this is optional, but that
>>>    appears to only pertain to standalone mode.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
>>> jarURI (apparently only for standalone deployments).
>>>
>>> Are there any additional configurations (configs, jar locations, etc.)
>>> needed to avoid dynamic classloading in this case?
Re: Avoid dynamic classloading in native mode with Kubernetes Operator
Thanks Alexis, I can give that a try. However, that seems less than ideal from the user's perspective.

Is there a technical reason why the operator can't support this combination of modes? I'd really like to just let the system do its thing rather than build a complicated two-jar approach.

Thanks,
Trystan

On Fri, Nov 17, 2023 at 12:19 PM Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Trystan,
>
> I imagine you can create 2 jars: one should only have a class with the
> main method, and the other should be a fat jar with everything else for
> your job. If you create a custom image where your fat jar is placed under
> /opt/flink/lib/, then I think it would "just work" when specifying the
> main-method jar in jarURI.
>
> Nevertheless, even though Flink shades a lot of the libraries it uses
> internally, I suppose you could still end up with dependency conflicts, so
> you would probably have some added complexity managing what's bundled in
> your fat jar.
>
> Regards,
> Alexis.
>
> On Thu, Nov 16, 2023 at 7:42 PM, Trystan wrote:
>
>> Is it possible to avoid dynamic classloading when using the operator
>> with a native Kubernetes application deployment?
>>
>> If I put the job jar into /opt/flink/lib, there are two possible
>> outcomes:
>>
>> 1. If I point jarURI to the jar, I get linkage errors (presumably the
>>    classes have already been loaded by both the AppClassLoader and the
>>    FlinkUserCodeClassLoader).
>> 2. If I do not include jarURI, the operator pods encounter a
>>    NullPointerException. The docs state this is optional, but that
>>    appears to only pertain to standalone mode.
>>
>> https://issues.apache.org/jira/browse/FLINK-29288 enabled the optional
>> jarURI (apparently only for standalone deployments).
>>
>> Are there any additional configurations (configs, jar locations, etc.)
>> needed to avoid dynamic classloading in this case?
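For reference, the two-jar workaround described in this thread could look roughly like the following FlinkDeployment. This is a hedged sketch: the image name, paths, and class name are placeholders. The fat job jar is baked into /opt/flink/lib/ in the custom image so it is loaded by the parent classloader, while jarURI points at a thin jar containing only the main class:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-job                                       # placeholder
spec:
  image: registry.example.com/my-flink-job:latest    # custom image bundling the fat jar in /opt/flink/lib/
  flinkVersion: v1_18
  mode: native
  job:
    jarURI: local:///opt/flink/usrlib/launcher.jar   # thin jar: main class only
    entryClass: com.example.Main                     # placeholder
```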
Re: [DISCUSS] FLIP-391: Deprecate RuntimeContext#getExecutionConfig
Hi Junrui,

Thanks for the clarification. On one hand, adding more methods to the RuntimeContext flat will increase the effort for users of the RuntimeContext, but that impact is limited; it is fine. The bigger impact, on the other hand, is that users who want to focus on the execution config will need to find the needle in the haystack. I just shared my thoughts to help you look at the issue from different angles, and I am open to learning opinions from other contributors. Please feel free to proceed if there are no other objections.

Best regards,
Jing

On Mon, Nov 20, 2023 at 6:50 AM Junrui Lee wrote:

> Hi Jing,
>
> Thank you for your feedback. I understand your concerns regarding putting
> all methods into the RuntimeContext flat.
>
> I would like to share some of my thoughts on this matter. Firstly, this
> FLIP only proposes the addition of three methods, which should not impose
> too much extra burden on users. Secondly, I agree that it is important to
> make the RuntimeContext clearer for users. However, reorganizing the
> RuntimeContext to achieve this requires further discussion. We should
> focus on a more specific and unified reorganization of the RuntimeContext
> interface in future work, rather than implementing a temporary solution
> now. Therefore, I prefer not to add a separate abstraction layer for
> these three methods in this FLIP.
>
> Please feel free to share any further thoughts.
>
> Best regards,
> Junrui
>
> Jing Ge wrote on Mon, Nov 20, 2023 at 05:46:
>
>> Hi Junrui,
>>
>> Thanks for bringing this to our attention. First of all, it makes sense
>> to deprecate RuntimeContext#getExecutionConfig.
>>
>> Afaic, this is an issue of how we design APIs with clean
>> concepts/aspects. There are two issues mentioned in the FLIP:
>>
>> 1. Short of user-facing abstraction: we just exposed ExecutionConfig,
>>    which mixes methods for users with methods that should only be used
>>    internally.
>> 2. Mutable vs. immutable: do we want users to be able to modify configs
>>    during job execution?
>>
>> An immutable user-facing abstraction design can solve both issues. All
>> execution-related configs would still be consolidated into the
>> abstraction class and easy to access. This is another design decision:
>> flat vs. hierarchical. The current FLIP removes the execution config
>> abstraction and puts all methods into RuntimeContext flat, which will
>> end up with more than 30 methods offered flat by the RuntimeContext. I
>> am not sure this helps users find the right method in the context of
>> execution config better than before.
>>
>> I might have missed something and look forward to your thoughts. Thanks!
>>
>> Best regards,
>> Jing
>>
>> On Sat, Nov 18, 2023 at 11:21 AM Junrui Lee wrote:
>>
>>> Hello Wencong,
>>>
>>> Thank you for your valuable feedback and suggestions. I want to clarify
>>> that reviewing the existing methods in the ExecutionConfig is not
>>> directly related to the proposal in this FLIP. The main focus of this
>>> FLIP is to deprecate the specific method
>>> RuntimeContext#getExecutionConfig(). I believe it is important to keep
>>> the scope of this FLIP limited. However, your suggestion can certainly
>>> be considered as a separate FLIP in the future.
>>>
>>> Best regards,
>>> Junrui
>>>
>>> Wencong Liu wrote on Fri, Nov 17, 2023 at 22:08:
>>>
>>>> Hello Junrui,
>>>>
>>>> Thanks for the effort. I agree with the proposal to deprecate the
>>>> getExecutionConfig() method in the RuntimeContext class. Exposing the
>>>> complex ExecutionConfig to user-defined functions can lead to
>>>> unnecessary complexity and risks.
>>>>
>>>> I also have a suggestion. We could consider reviewing the existing
>>>> methods in ExecutionConfig. If there are methods that are defined in
>>>> ExecutionConfig but currently have no callers, we could consider
>>>> annotating them as @Internal or directly removing them. Since users
>>>> are no longer able to access and invoke these methods, it would be
>>>> beneficial to clean up the codebase.
>>>>
>>>> +1 (non-binding).
>>>>
>>>> Best,
>>>> Wencong
>>>>
>>>> At 2023-11-15 16:51:15, "Junrui Lee" wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'd like to start a discussion of FLIP-391: Deprecate
>>>>> RuntimeContext#getExecutionConfig [1].
>>>>>
>>>>> Currently, the Flink RuntimeContext is important for connecting user
>>>>> functions to the underlying runtime details. It provides users with
>>>>> necessary runtime information during job execution. However, the
>>>>> current implementation of the Flink RuntimeContext exposes the
>>>>> ExecutionConfig to users, resulting in two issues. Firstly, the
>>>>> ExecutionConfig contains much unrelated information that can confuse
>>>>> users and complicate management. Secondly, exposing the
>>>>> ExecutionConfig allows users to modify it during job execution, which
>>>>> can cause inconsistencies.