[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17282167#comment-17282167 ]

Jark Wu commented on FLINK-20995:
---------------------------------

I downgraded the priority to Critical: this is a known limitation rather than a blocker bug that prevents users from using an existing feature.

> Calling execute() on org.apache.flink.table.api.Table Throws Exception
> ----------------------------------------------------------------------
>
>                 Key: FLINK-20995
>                 URL: https://issues.apache.org/jira/browse/FLINK-20995
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Web Frontend, Table SQL / API
>    Affects Versions: 1.11.2, 1.12.0
>        Environment: Flink version 1.12.0
>                     Kubernetes Standalone Cluster (Session Mode)
>                     uname -a
>                     Linux flink-jobmanager-664c4b8f46-77llc 3.10.0-957.el7.x86_64 #1 SMP Thu Nov 8 23:39:32 UTC 2018 x86_64 GNU/Linux
>            Reporter: Robert Cullen
>            Priority: Critical
>             Fix For: 1.13.0
>
>        Attachments: 1611570438847.jpg
>
>
> Exception on this line:
> {code:java}
> try (CloseableIterator<Row> iterator = log_counts.execute().collect()) {
>     ...
> {code}
> Here's the code snippet (see the stack trace below):
> {code:java}
> ...
>
> final EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inStreamingMode().build();
> final TableEnvironment tEnv = TableEnvironment.create(settings);
>
> String ddl = "CREATE TABLE log_counts (\n" +
>         "  msg_id STRING,\n" +
>         "  hostname STRING,\n" +
>         "  last_updated TIMESTAMP(3),\n" +
>         "  WATERMARK FOR last_updated AS last_updated - INTERVAL '5' SECOND\n" +
>         ") WITH (\n" +
>         "  'connector.type' = 'jdbc',\n" +
>         "  'connector.url' = 'jdbc:postgresql://cmdaa-postgres.cmdaa.svc.cluster.local:5432/postgres',\n" +
>         "  'connector.table' = 'chi_logger_intake',\n" +
>         "  'connector.driver' = 'org.postgresql.Driver',\n" +
>         "  'connector.username' = 'user',\n" +
>         "  'connector.password' = 'password'\n" +
>         ")";
> tEnv.executeSql(ddl);
>
> Table log_counts = tEnv.from("log_counts")
>         .filter($("hostname").isNotNull()
>                 .and($("hostname").isNotEqual("")))
>         .window(Tumble
>                 .over(lit(5).minutes())
>                 .on($("last_updated")).as("w"))
>         .groupBy($("msg_id"), $("hostname"), $("w"))
>         .select($("msg_id"),
>                 $("hostname"),
>                 $("msg_id").count().as("cnt"));
>
> try (CloseableIterator<Row> iterator = log_counts.execute().collect()) {
>     final List<Row> materializedUpdates = new ArrayList<>();
>     iterator.forEachRemaining(
>             row -> {
>                 final RowKind kind = row.getKind();
>                 switch (kind) {
>                     case INSERT:
>                     case UPDATE_AFTER:
>                         row.setKind(RowKind.INSERT); // for full equality
>                         materializedUpdates.add(row);
>                         break;
>                     case UPDATE_BEFORE:
>                     case DELETE:
>                         row.setKind(RowKind.INSERT); // for full equality
>                         materializedUpdates.remove(row);
>                         break;
>                 }
>             });
>     // show the final output table if the result is bounded,
>     // the output should exclude San Antonio because it has a smaller population than
>     // Houston or Dallas in Texas (TX)
>     materializedUpdates.forEach(System.out::println);
> }
> {code}
>
> Stack Trace:
> {code:java}
> 2021-01-15 16:52:00,628 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
> 2021-01-15 16:52:00,640 INFO  org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
> 2021-01-15 16:52:00,678 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 84c9f12fe943bc7f32ee637666ed3bc1 is submitted.
> 2021-01-15 16:52:00,678 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=84c9f12fe943bc7f32ee637666ed3bc1.
> 2021-01-15 16:52:00,830 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission 84c9f12fe943bc7f32ee637666ed3bc1 (collect).
> 2021-01
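The changelog-materialization loop in the reporter's snippet can be illustrated without any Flink dependency. The following standalone sketch uses minimal, hypothetical stand-ins for Flink's {{Row}} and {{RowKind}} (not the real API) to show how INSERT/UPDATE_AFTER rows are added and UPDATE_BEFORE/DELETE rows are retracted:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class ChangelogMaterializer {

    // Minimal stand-ins for Flink's RowKind and Row (assumptions, not the real API).
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Row {
        RowKind kind;
        final Object[] fields;

        Row(RowKind kind, Object... fields) {
            this.kind = kind;
            this.fields = fields;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Row
                    && kind == ((Row) o).kind
                    && Arrays.equals(fields, ((Row) o).fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(kind, Arrays.hashCode(fields));
        }

        @Override
        public String toString() {
            return Arrays.toString(fields);
        }
    }

    // The same switch as in the report: fold a changelog stream into its final rows.
    static List<Row> materialize(List<Row> changelog) {
        final List<Row> materialized = new ArrayList<>();
        for (Row row : changelog) {
            switch (row.kind) {
                case INSERT:
                case UPDATE_AFTER:
                    row.kind = RowKind.INSERT; // normalize kind so equals() matches later
                    materialized.add(row);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    row.kind = RowKind.INSERT; // normalize kind so remove() finds the row
                    materialized.remove(row);
                    break;
            }
        }
        return materialized;
    }

    public static void main(String[] args) {
        List<Row> changelog = List.of(
                new Row(RowKind.INSERT, "host-a", 1L),
                new Row(RowKind.UPDATE_BEFORE, "host-a", 1L),
                new Row(RowKind.UPDATE_AFTER, "host-a", 2L),
                new Row(RowKind.INSERT, "host-b", 5L));
        // prints [host-a, 2] then [host-b, 5]
        materialize(changelog).forEach(System.out::println);
    }
}
```

Note that the pattern only produces a final table when the changelog is bounded; on an unbounded streaming result the iterator never finishes, which is one reason the docs discussion below matters.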
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281814#comment-17281814 ]

Timo Walther commented on FLINK-20995:
--------------------------------------

Having a nicer exception is a good solution. We should also properly document in the docs when to use {{collect()}} and when not to.
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17281804#comment-17281804 ]

Jark Wu commented on FLINK-20995:
---------------------------------

Yes, I think the root cause is that {{WebSubmissionJobClient}} doesn't implement the {{CoordinationRequestGateway}} interface. However, I'm worried that {{WebSubmissionJobClient}} cannot support {{CoordinationRequestGateway}} at all, because it has no communication channel to the cluster: all of its methods throw an unsupported-operation exception, and its Javadoc says: "This is used in web submission, where we do not want the Web UI to have jobs blocking threads while waiting for their completion." Therefore, I don't think we can support this feature for Web UI submission, but we can definitely improve the exception. What do you think [~twalthr], [~dwysakowicz]?
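The "improve this exception" idea could look roughly like the following self-contained sketch. The interface and class names mirror the ones discussed in this thread, but their bodies are simplified stand-ins, not Flink code: before issuing the coordination request that backs {{collect()}}, check whether the {{JobClient}} actually implements {{CoordinationRequestGateway}} and fail with an actionable message instead of an opaque cast or unsupported-operation error.

```java
public class GatewayCheck {

    // Simplified stand-ins mirroring the Flink interfaces named in this thread
    // (assumptions for illustration, not the real Flink API).
    interface JobClient {
        String getJobId();
    }

    interface CoordinationRequestGateway {
    }

    // Mimics WebSubmissionJobClient: a JobClient with no channel back to the cluster.
    static final class WebSubmissionJobClient implements JobClient {
        @Override
        public String getJobId() {
            return "84c9f12fe943bc7f32ee637666ed3bc1";
        }
    }

    // The guard: fail early with a descriptive message when the client
    // cannot serve coordination requests (e.g. jobs submitted via the Web UI).
    static CoordinationRequestGateway asGateway(JobClient client) {
        if (!(client instanceof CoordinationRequestGateway)) {
            throw new IllegalStateException(
                    "Table#execute().collect() requires the client to communicate with "
                            + "the cluster, but " + client.getClass().getSimpleName()
                            + " does not support this. Jobs submitted through the Web UI "
                            + "cannot use collect(); submit via the CLI or write the "
                            + "result to a table sink instead.");
        }
        return (CoordinationRequestGateway) client;
    }

    public static void main(String[] args) {
        try {
            asGateway(new WebSubmissionJobClient());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The instanceof check itself is cheap; the value is entirely in the error message, which names both the unsupported entry point and the workaround.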
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271232#comment-17271232 ]

Roc Marshal commented on FLINK-20995:
-------------------------------------

!1611570438847.jpg! [~jark], [~tinny] From the exception message, can we infer that the implementation class {{WebSubmissionJobClient}} lacks an implementation of the {{CoordinationRequestGateway}} interface?
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17268030#comment-17268030 ]

Robert Cullen commented on FLINK-20995:
---------------------------------------

Yes
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17267996#comment-17267996 ]

shizhengchao commented on FLINK-20995:
--------------------------------------

Did you submit the task through the web interface?
[jira] [Commented] (FLINK-20995) Calling execute() on org.apache.flink.table.api.Table Throws Exception
[ https://issues.apache.org/jira/browse/FLINK-20995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17266604#comment-17266604 ]

Jark Wu commented on FLINK-20995:
---------------------------------

[~TsReaper], could you help to check this?