Re: flink-1.14.0 SQL: error writing an array
Hi!

This looks like a bug. I have filed an issue [1]; you can follow its progress there.

As described in the issue, it currently looks like the problem can be avoided if the constant strings all have the same length, or if they are all cast to VARCHAR. You can use that as a workaround for now.

[1] https://issues.apache.org/jira/browse/FLINK-24537

kcz <573693...@qq.com.invalid> wrote on Wed, Oct 13, 2021 at 5:29 PM:

> The select produces several sum values, and each sum value is the data for one type. In the end I insert them into a MySQL table whose structure is (id, type, value), so I thought of doing this by turning columns into rows.
> The SQL is as follows:
> CREATE TABLE kafka_table (
>   vin STRING,
>   speed DOUBLE,
>   brake DOUBLE,
>   hard_to DOUBLE,
>   distance DOUBLE,
>   times TIMESTAMP(3),
>   WATERMARK FOR times AS times - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
>
> select window_start, window_end,vin,array[row('brakes',sum(if(brake 3.0451,1,0))),row('hard_tos',sum(if(hard_to 3.0451,1,0)))]
> from TABLE(
>   TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES)) group by window_start, window_end,vin;
>
> The error is as follows:
> Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> converted type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> rel:
> LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
>   LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
>     LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(($3, 3.0451:DECIMAL(5, 4)), 1, 0)])
>       LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 60:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
>         LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
>           LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)])
>             LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])
>
>     at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
>     at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
>     at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
> Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'
>
> Process finished with exit code 1
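The workaround mentioned in the reply (giving every string constant the same type) might look like the sketch below. It reuses the kafka_table definition from the quoted message. Two things here are assumptions and not taken from the thread: the `>` comparison, since the operator is missing from the archived query, and the choice of STRING as the cast target (in Flink SQL, STRING is a synonym for VARCHAR(2147483647)).

```sql
-- Sketch of the workaround for FLINK-24537: cast the differently sized
-- string constants to one common type so both ROW values have identical fields.
-- Assumption: the comparison operator is '>' (it is missing in the archived query).
SELECT
  window_start,
  window_end,
  vin,
  ARRAY[
    ROW(CAST('brakes' AS STRING), SUM(IF(brake > 3.0451, 1, 0))),
    ROW(CAST('hard_tos' AS STRING), SUM(IF(hard_to > 3.0451, 1, 0)))
  ]
FROM TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, vin;
```

The other option named in the reply, constants of equal length, would instead mean picking labels with the same number of characters.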
Re: Flink SQL client cannot submit jobs
Hi!

Judging from the error message, the client is trying to connect to a cluster at http://127.0.0.1:8081/. Your YARN session is presumably not running locally, is it? So the Flink configuration used by the SQL client is most likely wrong. Please check the corresponding Flink configuration file.

maker_d...@foxmail.com wrote on Thu, Oct 14, 2021 at 9:18 AM:

> Hello everyone,
> Urgent help needed!
> I have always submitted SQL jobs with sql-client, and today it suddenly stopped working. The error is as follows:
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:215)
> Caused by: java.lang.RuntimeException: Error running SQL job.
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:514)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:507)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:428)
>     at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:690)
>     at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:327)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:511)
>     ... 10 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>     at
Re: Does Flink SQL support side output
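You can use the inverse condition to write the dirty data to a separate table. The source will be reused, so the effect is really the same as a side output.

On Oct 13, 2021 at 16:28:57, Ada Luna wrote:

> So the reason there is no plan to support this is that we currently assume the data Flink SQL processes is already clean, right?
>
> Ada Luna wrote on Sun, Sep 19, 2021 at 7:43 PM:
> >
> > It is mainly dirty data: dirty data produced by the source, the sink, or other operators. I want to side-output this data and store it in an external database.
> >
> > Caizhi Weng wrote on Thu, Sep 16, 2021 at 1:52 PM:
> > >
> > > Hi!
> > >
> > > As far as I know, there is currently no plan to support side output. Could you describe your requirements and use case?
> > >
> > > Ada Luna wrote on Wed, Sep 15, 2021 at 8:38 PM:
> > >
> > > > Will Flink SQL support side output in the future, to side-output some dirty data?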
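The suggestion in the reply (route rows that fail a condition to a second table) can be sketched as two INSERT statements with inverse predicates, submitted as one statement set. Everything named here is hypothetical: the tables `source_table`, `clean_sink`, and `dirty_sink`, the columns `id` and `amount`, and the validity rule itself. The tables are assumed to exist already. `BEGIN STATEMENT SET; ... END;` is the SQL Client form available around Flink 1.13/1.14.

```sql
-- Hypothetical tables: source_table, clean_sink and dirty_sink are assumed to be
-- created beforehand with matching schemas (id STRING, amount DOUBLE).
BEGIN STATEMENT SET;

-- Rows that pass the validity check go to the main sink.
INSERT INTO clean_sink
SELECT id, amount
FROM source_table
WHERE amount IS NOT NULL AND amount >= 0;

-- The inverse condition sends every other row to the "dirty data" sink.
INSERT INTO dirty_sink
SELECT id, amount
FROM source_table
WHERE amount IS NULL OR amount < 0;

END;
```

Putting both statements into one job is what gives the planner the chance to reuse the source scan, as the reply describes. Note that this only covers rows that can be identified by a SQL predicate; it is not a general side-output mechanism for records that fail inside a connector.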
user-zh
Flink SQL client cannot submit jobs
Hello everyone,

Urgent help needed!
I have always submitted SQL jobs with sql-client, and today it suddenly stopped working. The error is as follows:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:215)
Caused by: java.lang.RuntimeException: Error running SQL job.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:514)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:507)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:428)
    at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:690)
    at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:327)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:511)
    ... 10 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by:
Re: Flink temporal table: error when left-joining two HBase tables
Did you set 'lookup.async' = 'true'? This problem occurs when the rowkey is null.
https://issues.apache.org/jira/browse/FLINK-24528

Michael Ran wrote on Fri, Jul 23, 2021 at 10:44 AM:

> java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> A jar is missing.
>
> At 2021-07-14 09:39:53, "xie_guo...@163.com" wrote:
> >Hello, I ran into a problem with a left join against a temporal table in Flink SQL.
> >Scenario:
> >Two HBase tables are joined with a temporal table join. The left table defines a processing-time attribute, and the right table is left as it is (rowkey, cf). The Flink version is 1.13.0. The error is shown below. Has anyone run into a similar problem, and how should it be handled?
> >
> >2021-07-14 09:22:20.592 WARN org.apache.flink.runtime.taskmanager.Task ---
> >2021-07-14 09:22:20.596 WARN org.apache.flink.runtime.taskmanager.Task --- LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz], joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code, data1, data2, p, $f4, code0, data]) -> Calc(select=[code, ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p, EXPR$4]) (3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING to FAILED with failure cause: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> >    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
> >    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
> >    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
> >    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
> >    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
> >    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
> >    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> >    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >    at java.lang.Thread.run(Thread.java:748)
> >Caused by: java.lang.NoClassDefFoundError: org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> >    at org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
> >    at org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
> >    at LookupFunction$3.close(Unknown Source)
> >
> >P.S. The same code runs fine when the left table is replaced with Kafka. I searched online for this error and it seems to be a missing package. The pom file currently includes hbase-client, hbase-common, and flink-sql-connector-hbase.
> >
> >Sincerely,
> >xie_guo...@163.com
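For context, the 'lookup.async' option asked about in the reply is set in the DDL of the HBase lookup table. Below is a minimal sketch with hypothetical names (`hbase_dim`, `left_table`, the column family `cf` with qualifier `q1`, the namespace, and the ZooKeeper address are all placeholders). Switching the option to 'false' is only one way to check whether the asynchronous lookup path is involved; the thread does not confirm it as a fix.

```sql
-- Hypothetical HBase dimension table; names and addresses are placeholders.
CREATE TABLE hbase_dim (
  rowkey STRING,
  cf ROW<q1 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-2.2',
  'table-name' = 'my_namespace:my_table',
  'zookeeper.quorum' = 'localhost:2181',
  -- 'true' selects the asynchronous lookup function seen in the stack trace;
  -- 'false' uses the synchronous one.
  'lookup.async' = 'false'
);

-- Processing-time lookup join; left_table is assumed to declare
-- proc_time AS PROCTIME() and a STRING column named code.
SELECT l.code, d.cf.q1
FROM left_table AS l
LEFT JOIN hbase_dim FOR SYSTEM_TIME AS OF l.proc_time AS d
  ON l.code = d.rowkey;
```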
Re:Re:Re: Does Flink SQL plan to support a MySQL catalog?
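Hi Xuchen,

What you describe is not supported by the current MySQLCatalog implementation. If you need this capability, the corresponding methods would have to be overridden. In https://github.com/apache/flink/pull/16962, twalthr and jark mentioned that this part will be refactored later. You can comment directly on the corresponding JIRA issue or PR to discuss it further.

At present, the XXXCatalog implementations based on AbstractJdbcCatalog, including PostgresCatalog and the MySQLCatalog that is being implemented, do not support creating or altering tables. GenericInMemoryCatalog and HiveCatalog do support it.

Best regards,
Roc.

At 2021-10-13 16:42:16, "赵旭晨" wrote:
>@Roc Marshal Hello:
>I skimmed the MySQL catalog test code on your FLINK-15352 branch and have a question:
>In the current MySQL implementation, the TableEnvironment loads MySQL metadata through the JDBC driver. Conversely, if I run a DDL statement (create catalog.database.table) through the Flink Java API or the SQL client so that the metadata is written to MySQL, then the next time I use that table I would not need to create it again, because MySQL already holds the metadata. Can this be supported?
>
>At 2021-10-12 12:06:03, "Roc Marshal" wrote:
>>Hi Xuchen,
>>This feature is already being worked on.
>>Reviews, discussion, and improvements are welcome: https://github.com/apache/flink/pull/16962
>>
>>Best regards,
>>Roc.
>>
>>---- Original message ----
>>| From | 赵旭晨 |
>>| Date | 2021-10-12 10:17 |
>>| To | user-zh@flink.apache.org |
>>| Cc | |
>>| Subject | Does Flink SQL plan to support a MySQL catalog? |
>>Flink's JdbcCatalog currently supports only PG. Is there a plan to support MySQL? Our company stores all its metadata in MySQL, so introducing PG is unlikely. Or, to ask it the other way around: why has the Flink community not supported a MySQL catalog so far? Are there any concerns?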
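To illustrate the distinction drawn in the reply, here is a sketch of registering a JDBC catalog against PostgreSQL, the backend the thread says is supported. All connection values, the catalog name `my_pg`, and the table `public.orders` are placeholders and not taken from the thread.

```sql
-- Sketch: register a JDBC catalog backed by PostgreSQL.
-- All names and connection values below are placeholders.
CREATE CATALOG my_pg WITH (
  'type' = 'jdbc',
  'default-database' = 'mydb',
  'username' = 'flink_user',
  'password' = 'secret',
  'base-url' = 'jdbc:postgresql://localhost:5432'
);

USE CATALOG my_pg;

-- Tables that already exist in the database can be listed and queried ...
SHOW TABLES;
SELECT * FROM `public.orders`;

-- ... but, per the reply, catalogs based on AbstractJdbcCatalog do not support
-- creating or altering tables, so a CREATE TABLE issued inside this catalog
-- is expected to be rejected.
```

In other words, the metadata flows from the database into Flink, not the other way around; persisting Flink DDL would need a catalog such as HiveCatalog, which the reply names as supporting table creation.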
flink-1.14.0 SQL: error writing an array
The select produces several sum values, and each sum value is the data for one type. In the end I insert them into a MySQL table whose structure is (id, type, value), so I thought of doing this by turning columns into rows.
The SQL is as follows:

CREATE TABLE kafka_table (
  vin STRING,
  speed DOUBLE,
  brake DOUBLE,
  hard_to DOUBLE,
  distance DOUBLE,
  times TIMESTAMP(3),
  WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

select window_start, window_end,vin,array[row('brakes',sum(if(brake 3.0451,1,0))),row('hard_tos',sum(if(hard_to 3.0451,1,0)))]
from TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES)) group by window_start, window_end,vin;

The error is as follows:

Exception in thread "main" java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
  LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
    LogicalProject(window_start=[$6], window_end=[$7], vin=[$0], $f3=[IF(($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(($3, 3.0451:DECIMAL(5, 4)), 1, 0)])
      LogicalTableFunctionScan(invocation=[TUMBLE($5, DESCRIPTOR($5), 60:INTERVAL MINUTE)], rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
        LogicalProject(vin=[$0], speed=[$1], brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
          LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL SECOND)])
            LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])

    at org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
    at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
    at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'

Process finished with exit code 1
Re:Re: Does Flink SQL plan to support a MySQL catalog?
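@Roc Marshal Hello:

I skimmed the MySQL catalog test code on your FLINK-15352 branch and have a question:
In the current MySQL implementation, the TableEnvironment loads MySQL metadata through the JDBC driver. Conversely, if I run a DDL statement (create catalog.database.table) through the Flink Java API or the SQL client so that the metadata is written to MySQL, then the next time I use that table I would not need to create it again, because MySQL already holds the metadata. Can this be supported?

At 2021-10-12 12:06:03, "Roc Marshal" wrote:
>Hi Xuchen,
>This feature is already being worked on.
>Reviews, discussion, and improvements are welcome: https://github.com/apache/flink/pull/16962
>
>Best regards,
>Roc.
>
>---- Original message ----
>| From | 赵旭晨 |
>| Date | 2021-10-12 10:17 |
>| To | user-zh@flink.apache.org |
>| Cc | |
>| Subject | Does Flink SQL plan to support a MySQL catalog? |
>Flink's JdbcCatalog currently supports only PG. Is there a plan to support MySQL? Our company stores all its metadata in MySQL, so introducing PG is unlikely. Or, to ask it the other way around: why has the Flink community not supported a MySQL catalog so far? Are there any concerns?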
Re: Does Flink SQL support side output
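So the reason there is no plan to support this is that we currently assume the data Flink SQL processes is already clean, right?

Ada Luna wrote on Sun, Sep 19, 2021 at 7:43 PM:
>
> It is mainly dirty data: dirty data produced by the source, the sink, or other operators. I want to side-output this data and store it in an external database.
>
> Caizhi Weng wrote on Thu, Sep 16, 2021 at 1:52 PM:
> >
> > Hi!
> >
> > As far as I know, there is currently no plan to support side output. Could you describe your requirements and use case?
> >
> > Ada Luna wrote on Wed, Sep 15, 2021 at 8:38 PM:
> >
> > > Will Flink SQL support side output in the future, to side-output some dirty data?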
Separating the logs of different jobs in session mode
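The problem I am facing is that the logs of different jobs cannot be told apart within one session.
I read the article written by JD.com:
https://www.infoq.cn/article/1nvlduu82ihmusxxqruq
Does the community have any plans in this area?
https://issues.apache.org/jira/browse/FLINK-17969
The PR for this ticket has also been closed.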
Unsubscribe
Unsubscribe

宋京昌
Email: sjc999...@126.com
Unsubscribe
Unsubscribe