[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-02-09 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282167#comment-17282167
 ] 

Jark Wu commented on FLINK-20995:
-

I downgrade the priority to Critical, as this is a known limitation and not a 
blocker bug preventing users to use something.

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Table SQL / API
>Affects Versions: 1.11.2, 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Critical
> Fix For: 1.13.0
>
> Attachments: 1611570438847.jpg
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01

[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-02-09 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281814#comment-17281814
 ] 

Timo Walther commented on FLINK-20995:
--

Having a nicer exception is a good solution. We should also properly document 
when to use and when not to use {{collect()}} in the docs.

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend, Table SQL / API
>Affects Versions: 1.11.2, 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
> Fix For: 1.13.0
>
> Attachments: 1611570438847.jpg
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1

[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-02-09 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281804#comment-17281804
 ] 

Jark Wu commented on FLINK-20995:
-

Yes. I think the root cause is {{WebSubmissionJobClient}} doesn't implement 
{{CoordinationRequestGateway}} interface.

However, I'm worried that {{WebSubmissionJobClient}} can't support 
{{CoordinationRequestGateway}} because it doesn't have a communication with the 
cluster, and you can see all the methods of this class throws unsupported 
exception, and the Javadoc of this class says: 

"This is used in web submission, where we do not want the Web UI to have jobs 
blocking 
threads while waiting for their completion."

Therefore, I think we can't support this feature for Web UI submission. But we 
can definitely improve this exception. 

What do you think [~twalthr], [~dwysakowicz]?

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.2, 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
> Fix For: 1.13.0
>
> Attachments: 1611570438847.jpg
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting progr

[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-01-25 Thread Roc Marshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271232#comment-17271232
 ] 

Roc Marshal commented on FLINK-20995:
-

!1611570438847.jpg!

[~jark],[~tinny]

It inferred from the exception information that the implementation-class 
'WebSubmissionJobClient' lacks the implementation of the interface 
'CoordinationRequestGateway' ?

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
> Fix For: 1.13.0
>
> Attachments: 1611570438847.jpg
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph sub

[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-01-19 Thread Robert Cullen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268030#comment-17268030
 ] 

Robert Cullen commented on FLINK-20995:
---

Yes

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
> Fix For: 1.13.0
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 
> 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01-15 16:52:00,831 INFO org.ap

[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-01-19 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267996#comment-17267996
 ] 

shizhengchao commented on FLINK-20995:
--

Did you submit the task through the web interface ?

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
> Fix For: 1.13.0
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 
> 84c9f12fe943bc7f32ee637666ed3bc1 (c

[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception

2021-01-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17266604#comment-17266604
 ] 

Jark Wu commented on FLINK-20995:
-

[~TsReaper], could you help to check this?

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> --
>
> Key: FLINK-20995
> URL: https://issues.apache.org/jira/browse/FLINK-20995
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.12.0
> Environment: Flink version 1.12.0
> Kubernetes Standalone Cluster (Session Mode)
> uname -a
> Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 
> 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>Reporter: Robert Cullen
>Priority: Blocker
>
> Exception on this line:
> {code:java}
> try (CloseableIterator iterator = log_counts.execute().collect()) {
> ...
> {code}
> Here's the code snippet: (See Stack Trace below)
> {code:java}
> ...
>  
> final EnvironmentSettings settings =
> EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
> String ddl = "CREATE TABLE log_counts (\n" +
> "  msg_id STRING,\n" +
> "  hostname STRING,\n" +
> "  last_updated TIMESTAMP(3),\n" +
> "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' 
> SECOND\n" +
> ") WITH (\n" +
> "  'connector.type' = 'jdbc',\n" +
> "  'connector.url' = 
> 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
> "  'connector.table' = 'chi_logger_intake',\n" +
> "  'connector.driver' = 'org.postgresql.Driver',\n" +
> "  'connector.username' = 'user',\n" +
> "  'connector.password' = 'password'\n" +
> ")";
> tEnv.executeSql(ddl);
> Table log_counts = tEnv.from("log_counts")
> .filter($("hostname").isNotNull()
> .and($("hostname").isNotEqual("")))
> .window(Tumble
> .over(lit(5).minutes())
> .on($("last_updated")).as("w"))
> .groupBy($("msg_id"), $("hostname"), $("w"))
> .select($("msg_id"),
> $("hostname"),
> $("msg_id").count().as("cnt"));
> try (CloseableIterator iterator = 
> log_counts.execute().collect()) {
> final List materializedUpdates = new ArrayList<>();
> iterator.forEachRemaining(
> row -> {
> final RowKind kind = row.getKind();
> switch (kind) {
> case INSERT:
> case UPDATE_AFTER:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.add(row);
> break;
> case UPDATE_BEFORE:
> case DELETE:
> row.setKind(RowKind.INSERT); // for full 
> equality
> materializedUpdates.remove(row);
> break;
> }
> });
> // show the final output table if the result is bounded,
> // the output should exclude San Antonio because it has a smaller 
> population than
> // Houston or Dallas in Texas (TX)
> materializedUpdates.forEach(System.out::println);
> }{code}
>  
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN 
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring 
> the job submission via query parameters is deprecated. Please migrate to 
> submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO org.apache.flink.client.ClientUtils [] - 
> Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO 
> org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] 
> - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received 
> JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01-15 16:52:00,830 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job 
> 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01-15 16:52:00,831 INFO org.apache.