Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-22 Thread Sonam Mandal
Hello,

We've written a simple tool which takes SQL statements as input and uses a 
StreamTableEnvironment to eventually submit this to the Flink cluster. We've 
noticed that the Flink library versions we depend on must match the Flink 
version running in our Kubernetes cluster for the job submission to be 
successful. If the versions don't match, the job submission goes through but 
the job errors out for various reasons. We do not want to use the SQL shell 
(which I also believe is version specific and must run on the same pod as the 
Job Manager).
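
For context, the core of the tool boils down to something like the following
sketch (class name and argument handling are simplified, not our exact code):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlJobRunner {
    public static void main(String[] args) throws Exception {
        // The entry point compiles the SQL and generates the JobGraph,
        // which is why its Flink dependencies need to line up with the
        // version of the target cluster.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Statements are executed in order; INSERT INTO statements
        // trigger the actual job submission.
        for (String statement : args) {
            tEnv.executeSql(statement);
        }
    }
}
```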

Is there any version agnostic way to submit SQL jobs to the Flink cluster?

Thanks,
Sonam


Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-29 Thread Matthias Pohl
Hi Sonam,
what's the reason for not using the Flink SQL client? Because of the
version issue? I only know that FlinkSQL's state is not
backwards-compatible between major Flink versions [1]. But that seems to be
unrelated to what you describe.

I'm gonna add Jark and Timo to this thread. Maybe, they can add more
insights.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-20823



Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-29 Thread Sonam Mandal
Hi Matthias,

Thanks for getting back to me. We are trying to build a system where users can 
focus on writing Flink SQL applications and we handle the full lifecycle of 
their Flink cluster and job. We would like to let users focus on just their SQL 
and UDF logic. In such an environment, we cannot enforce that all users must 
use a single Flink version. We intend to run this setup in Kubernetes, where 
within the same Kubernetes cluster we can create multiple Flink clusters to 
which jobs are submitted.

Due to this, using an interactive shell will not be an option, nor do we want 
to directly expose this to users except for testing purposes.

I see the latest 1.13 release now has an option to pass a SQL file as input to 
the SQL client and it’ll take care of running the job. We will explore this 
option as well. I believe this is a new feature which wasn’t available in 1.12, 
right? Does the planning happen in the SQL client or on the job manager? We ran 
into issues with job graph incompatibility if our code directly submitted the 
SQL to the remote environment or if we used /bin/flink to run this jar that 
does the SQL conversion.
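
For reference, my understanding is that the new 1.13 non-interactive mode is
invoked along these lines (the script path is just an example):

```shell
# Execute a SQL script non-interactively with the Flink 1.13 SQL client;
# the client compiles the statements and submits the resulting job.
./bin/sql-client.sh -f /path/to/query.sql
```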

We currently have a POC idea which takes the SQL as a file and we wrote a 
simple job runner which reads this SQL and executes it. We are using Flink REST 
APIs to upload this jar and submit the job so that the job graph generation 
happens on the job manager. We no longer see the job graph incompatibility 
issues.
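
Concretely, the POC submission flow looks roughly like this (the hostname, jar
name, and the --sql-file argument of our runner are illustrative):

```shell
# 1. Upload the job-runner jar; the JSON response contains the jar id.
curl -s -X POST \
  -F "jarfile=@sql-job-runner.jar" \
  http://jobmanager:8081/jars/upload

# 2. Run the uploaded jar; its main() executes on the JobManager side,
#    so the JobGraph is generated there with the cluster's own version.
curl -s -X POST \
  -H "Content-Type: application/json" \
  -d '{"programArgs": "--sql-file /opt/sql/query.sql"}' \
  "http://jobmanager:8081/jars/<jar-id>/run"
```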

Is there any reason not to use the above approach? We noticed that the Flink 
client (/bin/flink) does job graph generation itself rather than going through 
the REST API. Is there a reason it doesn't leverage the REST API?

A nice thing about using REST is that we can now run multiple Flink cluster 
versions, and our job submission code doesn't need to know which Flink client 
version to use.

We definitely saw this job graph incompatibility with /bin/flink. We still need 
to test the SQL client with the -f option to assess whether we would need to 
keep multiple client versions around should we decide to use it. So we were 
wondering what the recommendation is within the Flink community for handling 
such cases. Hope this clarifies our use case.

Also, as for the state incompatibility between major Flink versions, I see the 
thread mentions using a tool to rewrite the savepoints. Is this the only 
recommended way to handle this? Is this safe and does it work in all scenarios?

Thanks,
Sonam





Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-06-30 Thread Stephan Ewen


Re: Recommended way to submit a SQL job via code without getting tied to a Flink version?

2021-07-01 Thread Sonam Mandal
Hi Stephan,

Thanks for the detailed explanation! This really helps understand all this 
better. Appreciate your help!

Regards,
Sonam

From: Stephan Ewen
Sent: Wednesday, June 30, 2021 3:56:22 AM
To: Sonam Mandal; user@flink.apache.org
Cc: matth...@ververica.com; Jark Wu; Timo Walther
Subject: Re: Recommended way to submit a SQL job via code without getting tied 
to a Flink version?

Hi Sonam!

To answer this, let me quickly provide some background on the two ways Flink 
deployments / job submissions work.
See also here for some background: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/overview/#deployment-modes

What is common in all setups is that the query compilation / the dataflow 
assembly happens where the entry-point program runs.

If you are programmatically setting up the application with a 
StreamExecutionEnvironment / TableEnvironment, then the query compilation (and 
JobGraph generation) happens where the program's main() method runs. If you are 
submitting via the SQL Client, then the SQL Client is the entry-point program, 
and the query compilation happens where the SQL Client runs.

Now, where is that entry-point program executed if you deploy a job? That 
depends on your deployment mode.

(1) Session Mode:

Here you have a running cluster with a Dispatcher that has the REST endpoint 
that accepts job submissions. Jobs are submitted via HTTP transporting a 
serialized JobGraph.
The entry-point can run anywhere and uses an HTTP client to send the generated 
JobGraph to the Dispatcher.

==> Here you need to worry about matching the versions of the client (the 
entry-point process, like SQL Client) and the deployed session cluster.

(2) Application Mode:

The entry-point (SQL Client or application program) spawns the JobManager when 
the program is executed. The JobGraph is passed as a Java object directly to 
the spawned JobManager component. There is an HTTP endpoint, but it is not for 
submitting jobs, only for the Web UI and for commands like cancelling execution.

This mode should allow you to encapsulate a Flink application (a SQL query) 
completely self-contained and not need to sync versions between clients and 
clusters.
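
For example, on Kubernetes an application-mode deployment is started roughly 
like this (cluster id, image, and jar path are placeholders; the jar must be 
available inside the image):

```shell
# Spawn a dedicated JobManager that runs the entry-point itself,
# so compilation happens with the cluster's own Flink version.
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-sql-app \
    -Dkubernetes.container.image=my-flink-image:1.13 \
    local:///opt/flink/usrlib/sql-job-runner.jar
```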



The internal abstraction for all ways to execute the programs are the 
PipelineExecutors.
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/PipelineExecutor.java

If you look at the subclasses of "PipelineExecutor", you basically see all the 
built-in deployment modes. To create a customized version of the Application 
deployment mode (or maybe the Job deployment mode), you can dig, for example, 
through the EmbeddedExecutor and the ApplicationDispatcherBootstrap.

Hope that helps...

Best,
Stephan


