Re: flink sql: memory still spikes even when the statement has a very small LIMIT
flink run -py new_jdbc_source.py
Traceback (most recent call last):
  File "new_jdbc_source.py", line 66, in <module>
    st_env.execute_sql("select * from feature_bar_sink").print()
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.UnsupportedOperationException: Currently, a DynamicTableSource with SupportsLimitPushDown ability is not supported.
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:210)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:208)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.validateTableSource(CatalogSourceTable.scala:208)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:142)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at
Re: flink sql: memory still spikes even when the statement has a very small LIMIT
OK, got it, thanks everyone. That should indeed be the problem; I'll merge the change into our branch and verify.

On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang wrote:
> hi, LIMIT push down was merged into the master branch recently.
>
> [1] https://github.com/apache/flink/pull/13800
>
> Land wrote on Fri, Jan 22, 2021 at 11:28 AM:
> >
> > It is probably because the LIMIT is not pushed down to MySQL for execution.
> > The problem looks similar to the one I ran into:
> > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql: memory still spikes even when the statement has a very small LIMIT
A question: I am using Flink SQL to pull data from MySQL. The MySQL source table has tens of millions of rows, and I ran "select * from sourcTable limit 10;". Even with a LIMIT of only a few rows, memory usage spikes. Is the LIMIT here applied only inside the Flink TaskManager after the whole MySQL source table has been read, rather than being executed on the MySQL side?
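For context, a minimal PyFlink sketch of the scenario described above; the table name, schema, and connection options are placeholders rather than the original job's. As the error and the reply in this thread indicate, the Flink 1.11 JDBC scan source has no LIMIT push-down, so the scan reads the whole MySQL table and the LIMIT is applied inside Flink.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Hypothetical reproduction of the reported setup; adjust names and credentials.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
st_env = StreamTableEnvironment.create(environment_settings=env_settings)

st_env.execute_sql("""
    CREATE TABLE source_table (
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydb',
        'table-name' = 'source_table',
        'username' = 'root',
        'password' = 'root'
    )
""")

# In Flink 1.11 the JDBC source does not push the LIMIT down to MySQL,
# so this still scans the full table and applies the limit inside Flink.
st_env.execute_sql("SELECT * FROM source_table LIMIT 10").print()

Independently of LIMIT push-down, the JDBC connector exposes a 'scan.fetch-size' option as a hint for the per-round-trip fetch size; whether that actually reduces client-side memory depends on the MySQL driver configuration.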
Re: Re: flink-sql field type issue
I looked at the source code: any conversion from BigInteger will fail here, since there is no matching branch for that type:

public boolean isNullAt(int pos) {
    return this.fields[pos] == null;
}

@Override
public boolean getBoolean(int pos) {
    return (boolean) this.fields[pos];
}

@Override
public byte getByte(int pos) {
    return (byte) this.fields[pos];
}

@Override
public short getShort(int pos) {
    return (short) this.fields[pos];
}

@Override
public int getInt(int pos) {
    return (int) this.fields[pos];
}

@Override
public long getLong(int pos) {
    return (long) this.fields[pos];
}

@Override
public float getFloat(int pos) {
    return (float) this.fields[pos];
}

@Override
public double getDouble(int pos) {
    return (double) this.fields[pos];
}

On Thu, Jan 14, 2021 at 6:24 PM yinghua...@163.com wrote:
>
> Replied to the wrong thread, sorry!
>
> yinghua...@163.com
>
> From: yinghua...@163.com
> Sent: 2021-01-14 18:16
> To: user-zh
> Subject: Re: Fwd: flink-sql field type issue
>
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03 hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
> // This is the listing I ran after the JobManager showed the checkpoint as completed: the directory was indeed created and already contains the _metadata file.
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> // When I queried again after stopping the job, the directory had already been deleted; the error is shown below.
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
> ls: `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6': No such file or directory  // error message
>
> yinghua...@163.com
>
> From: 郝文强
> Sent: 2021-01-14 17:24
> To: user-zh
> Subject: Fwd: flink-sql field type issue
>
> ----- Forwarded message -----
> From: 郝文强 <18846086...@163.com>
> Date: 2021-01-14 17:23
> To: d...@flink.apache.org
> Subject: Fwd: flink-sql field type issue
>
> ----- Forwarded message -----
> From: 郝文强 <18846086...@163.com>
> Date: 2021-01-14 17:22
> To: dev-h...@flink.apache.org
> Subject: flink-sql field type issue
>
> Creating a table in sql-client fails with "java.math.BigInteger cannot be cast to java.lang.Long".
> Could someone please take a look?
> The source table is MySQL's information_schema.tables table, with the following structure:
>
> table_catalog    varchar(64)
> table_schema     varchar(64)
> table_name       varchar(64)
> table_type       enum('base table','view','system view')
> engine           varchar(64)
> version          int
> row_format       enum('fixed','dynamic','compressed','redundant','compact','paged')
> table_rows       bigint unsigned
> avg_row_length   bigint unsigned
> data_length      bigint unsigned
> max_data_length  bigint unsigned
> index_length     bigint unsigned
> data_free        bigint unsigned
> auto_increment   bigint unsigned
> create_time      timestamp
> update_time      datetime
> check_time       datetime
> table_collation  varchar(64)
> checksum         bigint
> create_options   varchar(256)
> table_comment    text
>
> My Flink SQL CREATE TABLE statement:
>
> CREATE TABLE info_table (
>   TABLE_CATALOG STRING,
>   TABLE_SCHEMA STRING,
>   TABLE_NAME STRING,
>   TABLE_TYPE STRING,
>   ENGINE STRING,
>   VERSION INT,
>   ROW_FORMAT STRING,
>   TABLE_ROWS BIGINT,
>   AVG_ROW_LENGTH BIGINT,
>   DATA_LENGTH BIGINT,
>   MAX_DATA_LENGTH BIGINT,
>   INDEX_LENGTH BIGINT,
>   DATA_FREE BIGINT,
>   AUTO_INCREMENT BIGINT,
>   CREATE_TIME TIMESTAMP,
>   UPDATE_TIME TIMESTAMP,
>   CHECK_TIME TIMESTAMP,
>   TABLE_COLLATION STRING,
>   CHECKSUM INTEGER,
>   CREATE_OPTIONS STRING,
>   TABLE_COMMENT STRING,
>   PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/information_schema',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'TABLES'
> );
>
> I changed the types back and forth several times, and each attempt fails with one of:
> java.math.BigInteger cannot be cast to java.lang.Integer
> java.lang.Long cannot be cast to java.math.BigDecimal
> java.lang.Long cannot be cast to java.lang.Integer
>
> 郝文强
> 18846086...@163.com
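As a side note on the cast errors quoted above: the MySQL columns declared as bigint unsigned are reported by the JDBC driver as java.math.BigInteger, and the Flink JDBC connector documents DECIMAL(20, 0), not BIGINT, as the Flink type for MySQL BIGINT UNSIGNED. Below is a hedged sketch of the adjusted declarations, showing only the affected columns and issued through PyFlink's execute_sql purely for illustration; the same DDL applies in sql-client.

from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# Sketch only: re-declare the "bigint unsigned" columns as DECIMAL(20, 0),
# the mapping documented by the Flink JDBC connector for that MySQL type.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t_env.execute_sql("""
    CREATE TABLE info_table (
        TABLE_NAME STRING,
        TABLE_ROWS DECIMAL(20, 0),      -- bigint unsigned
        AVG_ROW_LENGTH DECIMAL(20, 0),  -- bigint unsigned
        DATA_LENGTH DECIMAL(20, 0),     -- bigint unsigned
        MAX_DATA_LENGTH DECIMAL(20, 0), -- bigint unsigned
        INDEX_LENGTH DECIMAL(20, 0),    -- bigint unsigned
        DATA_FREE DECIMAL(20, 0),       -- bigint unsigned
        AUTO_INCREMENT DECIMAL(20, 0),  -- bigint unsigned
        CHECKSUM BIGINT,                -- signed bigint can stay BIGINT
        PRIMARY KEY (TABLE_NAME) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/information_schema',
        'username' = 'root',
        'password' = 'root',
        'table-name' = 'TABLES'
    )
""")

The remaining columns may need similar adjustment depending on which Java class the driver reports for them (the "Long cannot be cast to Integer" errors suggest at least one INT column should be BIGINT).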
Question about upgrading the Flink version
Current situation: all of our company's Flink jobs run on Flink 1.7 and are submitted to a Flink YARN cluster through a stream-computing platform developed in-house; Flink on YARN is deployed in session mode. There are close to 500 streaming jobs, many of them stateful. If we now want to upgrade the cluster to 1.11 or 1.12, how can we perform the version upgrade smoothly without affecting the existing jobs?
Error when reading complex data types from Hive and writing them to MySQL
A question: I am reading complex types (ARRAY / MAP columns) from Hive and writing to MySQL. Is there a way to store these values in MySQL as plain strings, which MySQL can support? The job currently fails with:

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.0_es_hive_094679318_tmp do not match.
Query schema: [id: INT, name: VARCHAR(2147483647), hobby: ARRAY, add: MAP]
Sink schema:  [id: INT, name: VARCHAR(2147483647), hobby: VARCHAR(2147483647), add: VARCHAR(2147483647)]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
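One possible direction, offered as a hedged sketch rather than the thread's answer: serialize the ARRAY/MAP columns to JSON strings with a scalar UDF before inserting into the MySQL sink, so that the query schema matches the VARCHAR sink columns. The element types, table names, and function names below are assumptions, and the sketch presumes the job can register a Python scalar UDF; a Java ScalarFunction taking the same arguments and returning String works the same way.

import json

from pyflink.table import DataTypes, EnvironmentSettings, BatchTableEnvironment
from pyflink.table.udf import udf

# Assumed element types; replace with the actual Hive column types.
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
     result_type=DataTypes.STRING())
def array_to_json(arr):
    return None if arr is None else json.dumps(arr)

@udf(input_types=[DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())],
     result_type=DataTypes.STRING())
def map_to_json(m):
    return None if m is None else json.dumps(m)

env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = BatchTableEnvironment.create(environment_settings=env_settings)
t_env.register_function("array_to_json", array_to_json)
t_env.register_function("map_to_json", map_to_json)

# Assuming hive_source and mysql_sink are already registered (e.g. via a Hive
# catalog and a JDBC DDL); `add` is quoted because it is a reserved keyword.
t_env.execute_sql("""
    INSERT INTO mysql_sink
    SELECT id, name, array_to_json(hobby), map_to_json(`add`) FROM hive_source
""")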