Re: flink-1.14.0 SQL error when writing ARRAY

2021-10-13 Post by Caizhi Weng
Hi!

This looks like a bug. I have filed an issue [1]; you can follow its progress there.

As described in the issue, it currently appears that making the constant strings the same length, or casting them all to VARCHAR, works around the problem. You can use that as a temporary workaround; a sketch follows below.

[1] https://issues.apache.org/jira/browse/FLINK-24537
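
A minimal sketch of that workaround against the query below, casting both constants to STRING (a VARCHAR type); the > comparisons are an assumption, since the comparison operators were lost in the archived mail:

select window_start, window_end, vin,
  array[row(cast('brakes' as string), sum(if(brake > 3.0451, 1, 0))),
        row(cast('hard_tos' as string), sum(if(hard_to > 3.0451, 1, 0)))]
from TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
group by window_start, window_end, vin;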

kcz <573693...@qq.com.invalid> wrote on Wed, Oct 13, 2021 at 5:29 PM:

> Because the SELECT produces several SUM values, each belonging to one type,
> and in the end I insert them into a MySQL table whose schema is
> (id, type, value), I thought of a column-to-row style transformation.
> The SQL is as follows:
> CREATE TABLE kafka_table (
>   vin STRING,
>   speed DOUBLE,
>   brake DOUBLE,
>   hard_to DOUBLE,
>   distance DOUBLE,
>   times TIMESTAMP(3),
>   WATERMARK FOR times AS times - INTERVAL '5' SECOND
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> );
>
> select window_start, window_end, vin,
>   array[row('brakes', sum(if(brake > 3.0451, 1, 0))),
>         row('hard_tos', sum(if(hard_to > 3.0451, 1, 0)))]
> from TABLE(
>   TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
> group by window_start, window_end, vin;
>
>
> The error is as follows:
> Exception in thread "main" java.lang.AssertionError: Conversion to
> relational algebra failed to preserve datatypes:
> validated type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL
> window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin,
> RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER
> EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> converted type:
> RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL
> window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin,
> RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT
> NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
> rel:
> LogicalProject(window_start=[$0], window_end=[$1], vin=[$2],
> EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7)
> CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT
> NULL, CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET
> "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
>  LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)],
> agg#1=[SUM($4)])
>   LogicalProject(window_start=[$6], window_end=[$7], vin=[$0],
> $f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3,
> 3.0451:DECIMAL(5, 4)), 1, 0)])
>LogicalTableFunctionScan(invocation=[TUMBLE($5,
> DESCRIPTOR($5), 60:INTERVAL MINUTE)],
> rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake,
> DOUBLE hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3)
> window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
> LogicalProject(vin=[$0], speed=[$1],
> brake=[$2], hard_to=[$3], distance=[$4], times=[$5])
> 
> LogicalWatermarkAssigner(rowtime=[times], watermark=[-($5, 5000:INTERVAL
> SECOND)])
>  
> LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])
>
>
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
> at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
> Disconnected from the target VM, address: '127.0.0.1:61710', transport:
> 'socket'
>
>
> Process finished with exit code 1


Re: Flink SQL client cannot submit jobs

2021-10-13 Post by Caizhi Weng
Hi!

From the error message, the client is trying to connect to a cluster at http://127.0.0.1:8081/. Your YARN session is presumably not running locally, right? So very likely the Flink configuration used by the SQL client is wrong; check the corresponding Flink configuration file.
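
For example, the SQL client reads the target cluster address from the Flink configuration. A minimal sketch of the relevant flink-conf.yaml keys, assuming the JobManager REST endpoint is on a remote host (the hostname below is a placeholder):

rest.address: jobmanager-host.example.com
rest.port: 8081

For a YARN session, the client should instead pick up the session's connection info (for example via the YARN properties file written by yarn-session.sh) rather than fall back to localhost.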

maker_d...@foxmail.com wrote on Thu, Oct 14, 2021 at 9:18 AM:

> Hi everyone:
> Urgent help needed!
> I have always submitted SQL jobs through sql-client, but today submission suddenly fails with the following error:
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:215)
> Caused by: java.lang.RuntimeException: Error running SQL job.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:514)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:507)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:428)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:690)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:327)
> at java.util.Optional.ifPresent(Optional.java:159)
> at
> org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214)
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144)
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:115)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:511)
> ... 10 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
> at
> 

Re: Flink SQL support for side output

2021-10-13 Post by Kenyore Woo
You can use the inverse condition to write the dirty data to another table. The source will be reused, so the effect is the same as a side output.
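
For instance, a minimal sketch using a statement set; the table names and the IS NULL predicate are hypothetical stand-ins for whatever marks a record as dirty:

BEGIN STATEMENT SET;
-- clean records go to the main sink
INSERT INTO clean_sink SELECT * FROM my_source WHERE amount IS NOT NULL;
-- inverse condition: dirty records go to a side table
INSERT INTO dirty_sink SELECT * FROM my_source WHERE amount IS NULL;
END;

Because both INSERTs read from the same source within one statement set, the planner can reuse the source scan, which is what makes this behave like a side output.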
On Oct 13, 2021 at 16:28:57, Ada Luna  wrote:

> So the reason there is no plan to support this is that we currently assume the data Flink SQL processes is clean and already cleansed, right?
>
> Ada Luna wrote on Sun, Sep 19, 2021 at 7:43 PM:
>
> Mainly dirty data: dirty records produced by sources, sinks, or other
> operators. We want to side-output such data to an external database for
> storage.
>
> Caizhi Weng wrote on Thu, Sep 16, 2021 at 1:52 PM:
>
> > Hi!
> >
> > As far as I know there is currently no plan to support side output.
> > Could you describe the requirement and the scenario?
> >
> > Ada Luna wrote on Wed, Sep 15, 2021 at 8:38 PM:
> >
> > > Will Flink SQL support side output in the future, to side-output some
> > > dirty data?
>




Flink SQL client cannot submit jobs

2021-10-13 Post by maker_d...@foxmail.com
Hi everyone:
Urgent help needed!
I have always submitted SQL jobs through sql-client, but today submission suddenly fails with the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:215)
Caused by: java.lang.RuntimeException: Error running SQL job.
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:514)
at 
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:256)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:507)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:428)
at 
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:690)
at 
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:327)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:214)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:144)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:115)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.
at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeUpdateInternal$4(LocalExecutor.java:511)
... 10 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:400)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:390)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:430)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: 

Re: Flink temporal table: error when left-joining two HBase tables

2021-10-13 Post by zhisheng
Did you set 'lookup.async' = 'true'? This problem can occur when the rowkey is null.

https://issues.apache.org/jira/browse/FLINK-24528
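
If that is the cause, one possible workaround (a sketch only; table and column names are illustrative, not from this thread) is to filter out null join keys before the lookup join:

SELECT s.code, d.data
FROM (SELECT * FROM main_stream WHERE code IS NOT NULL) AS s
JOIN hbase_dim FOR SYSTEM_TIME AS OF s.proc_time AS d
  ON s.code = d.rowkey;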

Michael Ran wrote on Fri, Jul 23, 2021 at 10:44 AM:

> java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> A jar is missing.
> On 2021-07-14 09:39:53, "xie_guo...@163.com" wrote:
> > Hello, I ran into a problem with a Flink SQL temporal-table left join.
> > Specific scenario:
> > Two HBase tables joined as temporal tables; the left table defines a
> > processing time, the right table is static (rowkey, cf). Flink version is
> > 1.13.0. The error is shown below. Has anyone hit a similar problem, and
> > how should it be handled?
> >
> >2021-07-14 09:22:20.592 WARN  org.apache.flink.runtime.taskmanager.Task
> --- 2021-07-14 09:22:20.596 WARN
> org.apache.flink.runtime.taskmanager.Task  ---
> LookupJoin(table=[default_catalog.default_database.hbase_source_pollution_dwb_enterprise_wodhz],
> joinType=[LeftOuterJoin], async=[true], lookup=[code=$f4], select=[code,
> data1, data2, p, $f4, code0, data]) -> Calc(select=[code,
> ROW(,,data.activ) -> NotNullEnforcer(fields=[EXPR$1, EXPR$2, p,
> EXPR$4]) (3/3)#3 (4ada188e117c67ccd9bd6488ae95216a) switched from RUNNING
> to FAILED with failure cause: java.util.concurrent.ExecutionException:
> java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> >at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928
> >at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:168)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:131)
> >at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:448)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:671)
> >at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:629)
> >at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
> >at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> >at java.lang.Thread.run(Thread.java:748)
> >Caused by: java.lang.NoClassDefFoundError:
> org/apache/flink/hbase/shaded/org/apache/commons/io/IOUtils
> >at
> org.apache.flink.hbase.shaded.org.apache.hadoop.hbase.client.AsyncConnectionImpl.close(AsyncConnectionImpl.java:193)
> >at
> org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.close(HBaseRowDataAsyncLookupFunction.java:251)
> >at LookupFunction$3.close(Unknown Source
> >
>
> > PS: with the same code, replacing the left table with Kafka runs fine. Searching the error online suggests a missing dependency; the pom currently includes hbase-client, hbase-common, and flink-sql-connector-hbase.
> >
> >
> >
> >Sincerely,
> >xie_guo...@163.com
>


Re: Re: Re: Does Flink SQL plan to support a MySQL catalog?

2021-10-13 Post by Yuepeng Pan
Hi 旭晨,
Regarding the problem you describe: the current MySQLCatalog implementation does not support this. If you need the feature, the corresponding methods have to be overridden. In https://github.com/apache/flink/pull/16962, twalthr and jark mentioned that this part will be refactored later. You can comment directly on the corresponding JIRA or PR for further discussion.
At present, the XXXCatalog implementations based on AbstractJdbcCatalog, including PostgresCatalog and the MySQLCatalog under development, do not support creating or altering tables. GenericInMemoryCatalog and HiveCatalog do support this.
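
As a quick illustration (a sketch only; the catalog name and hive-conf-dir are placeholders), DDL executed under HiveCatalog is persisted across sessions, whereas the same CREATE TABLE against a JDBC-based catalog is not supported today:

CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive/conf'
);
USE CATALOG my_hive;
-- the table definition is stored in the Hive Metastore and survives restarts
CREATE TABLE persisted_table (id INT, v STRING) WITH ('connector' = 'print');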


Best,
Roc.

On 2021-10-13 16:42:16, "赵旭晨" wrote:
>@Roc Marshal Hi:
>I skimmed the MySQLCatalog test code on your FLINK-15352 branch and would like to ask one question:
>In the current MySQL implementation, the TableEnvironment loads MySQL metadata through the JDBC driver. Conversely, if I execute a DDL statement (CREATE catalog.database.table) through the Flink Java API or the SQL client so that the metadata is written into MySQL, then the next time I reference the table I would not need to create it again, because MySQL already holds the metadata. Can this be supported?
>
>On 2021-10-12 12:06:03, "Roc Marshal" wrote:
>>Hi 旭晨,
>>This feature is already in progress.
>>Review, discussion, and improvements are welcome: https://github.com/apache/flink/pull/16962
>>
>>Best,
>>Roc.
>>
>> The original message 
>>| From | 赵旭晨 |
>>| Date | Oct 12, 2021 10:17 |
>>| To | user-zh@flink.apache.org |
>>| Cc | |
>>| Subject | Does Flink SQL plan to support a MySQL catalog? |
>>Flink's JdbcCatalog currently supports only PostgreSQL. Is there a plan to support MySQL? Our company standardizes on MySQL for metadata storage, so introducing PostgreSQL is unlikely. Or, to ask the other way around: why has the Flink community not supported a MySQL catalog so far? Are there any concerns?


flink-1.14.0 SQL error when writing ARRAY

2021-10-13 Post by kcz
Because the SELECT produces several SUM values, each belonging to one type, and in the end I insert them into a MySQL table whose schema is (id, type, value), I thought of a column-to-row style transformation.
The SQL is as follows:
CREATE TABLE kafka_table (
  vin STRING,
  speed DOUBLE,
  brake DOUBLE,
  hard_to DOUBLE,
  distance DOUBLE,
  times TIMESTAMP(3),
  WATERMARK FOR times AS times - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);




select window_start, window_end, vin,
  array[row('brakes', sum(if(brake > 3.0451, 1, 0))),
        row('hard_tos', sum(if(hard_to > 3.0451, 1, 0)))]
from TABLE(
  TUMBLE(TABLE kafka_table, DESCRIPTOR(times), INTERVAL '10' MINUTES))
group by window_start, window_end, vin;


The error is as follows:
Exception in thread "main" java.lang.AssertionError: Conversion to relational 
algebra failed to preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL 
window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, 
RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER EXPR$1) 
NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL window_start, TIMESTAMP(3) NOT NULL 
window_end, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" vin, 
RecordType(VARCHAR(7) CHARACTER SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT 
NULL EXPR$1) NOT NULL ARRAY NOT NULL EXPR$3) NOT NULL
rel:
LogicalProject(window_start=[$0], window_end=[$1], vin=[$2], 
EXPR$3=[ARRAY(CAST(ROW(_UTF-16LE'brake', $3)):RecordType(VARCHAR(7) CHARACTER 
SET "UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL, 
CAST(ROW(_UTF-16LE'hard_to', $4)):RecordType(VARCHAR(7) CHARACTER SET 
"UTF-16LE" NOT NULL EXPR$0, INTEGER NOT NULL EXPR$1) NOT NULL)])
 LogicalAggregate(group=[{0, 1, 2}], agg#0=[SUM($3)], agg#1=[SUM($4)])
  LogicalProject(window_start=[$6], window_end=[$7], vin=[$0],
$f3=[IF(>($2, 3.0451:DECIMAL(5, 4)), 1, 0)], $f4=[IF(>($3,
3.0451:DECIMAL(5, 4)), 1, 0)])
   LogicalTableFunctionScan(invocation=[TUMBLE($5, 
DESCRIPTOR($5), 60:INTERVAL MINUTE)], 
rowType=[RecordType(VARCHAR(2147483647) vin, DOUBLE speed, DOUBLE brake, DOUBLE 
hard_to, DOUBLE distance, TIMESTAMP(3) *ROWTIME* times, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
LogicalProject(vin=[$0], speed=[$1], brake=[$2], 
hard_to=[$3], distance=[$4], times=[$5])
 LogicalWatermarkAssigner(rowtime=[times], 
watermark=[-($5, 5000:INTERVAL SECOND)])
  
LogicalTableScan(table=[[default_catalog, default_database, kafka_table]])


at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:467)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:582)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:177)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:169)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1057)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1026)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:301)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at com.hycan.bigdata.utils.SqlUtil.callCommand(SqlUtil.java:48)
at com.hycan.bigdata.job.SchemaJob.main(SchemaJob.java:87)
Disconnected from the target VM, address: '127.0.0.1:61710', transport: 'socket'


Process finished with exit code 1

Re: Re: Does Flink SQL plan to support a MySQL catalog?

2021-10-13 Post by 赵旭晨
@Roc Marshal Hi:
I skimmed the MySQLCatalog test code on your FLINK-15352 branch and would like to ask one question:
In the current MySQL implementation, the TableEnvironment loads MySQL metadata through the JDBC driver. Conversely, if I execute a DDL statement (CREATE catalog.database.table) through the Flink Java API or the SQL client so that the metadata is written into MySQL, then the next time I reference the table I would not need to create it again, because MySQL already holds the metadata. Can this be supported?

On 2021-10-12 12:06:03, "Roc Marshal" wrote:
>Hi 旭晨,
>This feature is already in progress.
>Review, discussion, and improvements are welcome: https://github.com/apache/flink/pull/16962
>
>Best,
>Roc.
>
> The original message 
>| From | 赵旭晨 |
>| Date | Oct 12, 2021 10:17 |
>| To | user-zh@flink.apache.org |
>| Cc | |
>| Subject | Does Flink SQL plan to support a MySQL catalog? |
>Flink's JdbcCatalog currently supports only PostgreSQL. Is there a plan to support MySQL? Our company standardizes on MySQL for metadata storage, so introducing PostgreSQL is unlikely. Or, to ask the other way around: why has the Flink community not supported a MySQL catalog so far? Are there any concerns?


Re: Flink SQL support for side output

2021-10-13 Post by Ada Luna
So the reason there is no plan to support this is that we currently assume the data Flink SQL processes is clean and already cleansed, right?

Ada Luna wrote on Sun, Sep 19, 2021 at 7:43 PM:
>
> Mainly dirty data: dirty records produced by sources, sinks, or other operators. We want to side-output such data to an external database for storage.
>
> Caizhi Weng wrote on Thu, Sep 16, 2021 at 1:52 PM:
> >
> > Hi!
> >
> > As far as I know there is currently no plan to support side output. Could you describe the requirement and the scenario?
> >
> > Ada Luna wrote on Wed, Sep 15, 2021 at 8:38 PM:
> >
> > > Will Flink SQL support side output in the future, to side-output some dirty data?
> > >


Separating per-job logs in session mode

2021-10-13 Post by Ada Luna
The problem I am facing is that logs from different jobs cannot be told apart within a single session.

I read the article written by JD.com:
https://www.infoq.cn/article/1nvlduu82ihmusxxqruq

Does the community have any plans for this in the future?

https://issues.apache.org/jira/browse/FLINK-17969
The PR for this ticket has also been closed.


Unsubscribe

2021-10-13 Post by 宋京昌
Unsubscribe


宋京昌
Email: sjc999...@126.com

Unsubscribe

2021-10-13 Post by Wayne Ding
Unsubscribe