Re: flink sql: executing a statement with a tiny LIMIT still causes memory to spike

2021-01-24 post by zhang hao
flink run -py new_jdbc_source.py
Traceback (most recent call last):
  File "new_jdbc_source.py", line 66, in 
st_env.execute_sql("select * from feature_bar_sink").print()
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 543, in execute_sql
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o10.executeSql.
: java.lang.UnsupportedOperationException: Currently, a DynamicTableSource with SupportsLimitPushDown ability is not supported.
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:210)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:208)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.validateTableSource(CatalogSourceTable.scala:208)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:142)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


org.apache.flink.client.program.ProgramAbortException
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at ...

Re: flink sql: executing a statement with a tiny LIMIT still causes memory to spike

2021-01-22 post by zhang hao
OK, got it, thanks everyone. That should be exactly this issue; I will merge it into our branch and verify.

On Fri, Jan 22, 2021 at 11:35 AM Shengkai Fang  wrote:

> hi, LIMIT push down was recently merged into the master branch.
>
> [1] https://github.com/apache/flink/pull/13800
>
> Land wrote on Fri, Jan 22, 2021 at 11:28 AM:
>
> > The LIMIT is probably not being pushed down to MySQL for execution.
> > The problem looks similar to the one I ran into:
> > http://apache-flink.147419.n8.nabble.com/Flink-MySQL-td10374.html
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


flink sql: executing a statement with a tiny LIMIT still causes memory to spike

2021-01-20 post by zhang hao
A question: I am using Flink SQL to pull data from MySQL, and the MySQL source table has tens of millions of rows. I ran select * from sourceTable
limit 10;
Even limiting to just a few rows makes memory spike. Is the LIMIT here executed against the MySQL source table, or does Flink read everything and apply the LIMIT directly in the TaskManager?
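For context: in the 1.11 JDBC connector the scan reads the whole table and the LIMIT is applied inside Flink, which explains the memory spike. Until the limit push-down referenced above lands in a release, one possible mitigation (a sketch only, assuming the Flink 1.11 JDBC connector; the table name, URL, and credentials are placeholders) is to partition the scan and give the driver a fetch-size hint so no single subtask materializes the whole result set:

CREATE TABLE source_table (
  id BIGINT,
  name STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'big_table',
  'username' = 'root',
  'password' = 'root',
  -- split the scan into parallel range queries instead of one big query
  'scan.partition.column' = 'id',
  'scan.partition.num' = '10',
  'scan.partition.lower-bound' = '0',
  'scan.partition.upper-bound' = '10000000',
  -- hint the JDBC driver to fetch rows in batches rather than all at once
  'scan.fetch-size' = '1000'
);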


Re: Re: flink-sql field type problem

2021-01-14 post by zhang hao
I took a look at the source code: converting from BigInteger will fail in every case, because none of the getters match that type:

// Each getter below simply casts this.fields[pos] to the requested type,
// so a java.math.BigInteger stored in fields[] can never be read out.
public boolean isNullAt(int pos) {
   return this.fields[pos] == null;
}

@Override
public boolean getBoolean(int pos) {
   return (boolean) this.fields[pos];
}

@Override
public byte getByte(int pos) {
   return (byte) this.fields[pos];
}

@Override
public short getShort(int pos) {
   return (short) this.fields[pos];
}

@Override
public int getInt(int pos) {
   return (int) this.fields[pos];
}

@Override
public long getLong(int pos) {
   return (long) this.fields[pos];
}

@Override
public float getFloat(int pos) {
   return (float) this.fields[pos];
}

@Override
public double getDouble(int pos) {
   return (double) this.fields[pos];
}
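For the DDL quoted below, the usual fix is to follow the JDBC connector's documented MySQL type mapping: BIGINT UNSIGNED columns map to DECIMAL(20, 0) on the Flink side, so the value is read through the decimal path instead of the casts above, and checksum is a signed bigint in MySQL, so it should be BIGINT rather than INTEGER. A sketch of just the columns that need to change:

CREATE TABLE info_table (
  -- unchanged columns omitted
  TABLE_ROWS DECIMAL(20, 0),
  AVG_ROW_LENGTH DECIMAL(20, 0),
  DATA_LENGTH DECIMAL(20, 0),
  MAX_DATA_LENGTH DECIMAL(20, 0),
  INDEX_LENGTH DECIMAL(20, 0),
  DATA_FREE DECIMAL(20, 0),
  AUTO_INCREMENT DECIMAL(20, 0),
  CHECKSUM BIGINT
  -- remaining columns and WITH options as in the original DDL
);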


On Thu, Jan 14, 2021 at 6:24 PM yinghua...@163.com wrote:

>
> Replied to the wrong thread, sorry!
>
>
> yinghua...@163.com
>
> From: yinghua...@163.com
> Sent: 2021-01-14 18:16
> To: user-zh
> Subject: Re: Fwd: flink-sql field type problem
>
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in org.apache.log4j.RollingFileAppender.
> 21/01/14 17:05:50 INFO util.NativeCodeLoader: Loaded the native-hadoop library
> Found 1 items
> -rw-rw-r--   3 yarn hdfs   5388 2021-01-14 17:03 hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6/_metadata
> // This listing was taken after the JobManager showed the checkpoint had completed; the directory was indeed created and already contains the _metadata file.
> [root@sdp-10-88-100-147 flink-1.11.3]# hdfs dfs -ls hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6
> // When I queried again after stopping the job, the directory had already been deleted; the error is below.
> Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
> log4j:WARN No such property [datePattern] in org.apache.log4j.RollingFileAppender.
> 21/01/14 17:06:17 INFO util.NativeCodeLoader: Loaded the native-hadoop library
> ls: `hdfs://hdfsCluster/apps/ccp/flink/checkpoints/10001/39ed8aee0a2c4497be9a9d826355f595/chk-6': No such file or directory  // the error message
>
>
>
> yinghua...@163.com
> From: 郝文强
> Sent: 2021-01-14 17:24
> To: user-zh
> Subject: Fwd: flink-sql field type problem
> 郝文强 <18846086...@163.com>
> ----- Forwarded message -----
> From: 郝文强 <18846086...@163.com>
> Date: 2021-01-14 17:23
> To: d...@flink.apache.org
> Subject: Fwd: flink-sql field type problem
> 郝文强 <18846086...@163.com>
> ----- Forwarded message -----
> From: 郝文强 <18846086...@163.com>
> Date: 2021-01-14 17:22
> To: dev-h...@flink.apache.org
> Subject: flink-sql field type problem
> Creating the table from sql-client fails with: java.math.BigInteger cannot be cast to java.lang.Long
> Could anyone take a look?
> The source is MySQL's information_schema.tables table.
> Its schema is:
> table_catalog varchar(64)
> table_schema  varchar(64)
> table_name  varchar(64)
> table_type  enum('base table','view','system view')
> engine  varchar(64)
> version int
> row_format  enum('fixed','dynamic','compressed','redundant','compact','paged')
> table_rows  bigint unsigned
> avg_row_length  bigint unsigned
> data_length bigint unsigned
> max_data_length bigint unsigned
> index_length  bigint unsigned
> data_free bigint unsigned
> auto_increment  bigint unsigned
> create_time timestamp
> update_time datetime
> check_time  datetime
> table_collation varchar(64)
> checksum  bigint
> create_options  varchar(256)
> table_comment text
> My Flink SQL DDL:
>CREATE TABLE info_table (
>   TABLE_CATALOG STRING,
>   TABLE_SCHEMA STRING,
>   TABLE_NAME STRING,
>   TABLE_TYPE STRING,
>   ENGINE STRING,
>   VERSION INT,
>   ROW_FORMAT STRING,
>   TABLE_ROWS BIGINT,
>   AVG_ROW_LENGTH BIGINT,
>   DATA_LENGTH BIGINT,
>   MAX_DATA_LENGTH BIGINT,
>   INDEX_LENGTH BIGINT,
>   DATA_FREE BIGINT,
>   AUTO_INCREMENT BIGINT,
>   CREATE_TIME TIMESTAMP,
>   UPDATE_TIME TIMESTAMP,
>   CHECK_TIME TIMESTAMP,
>   TABLE_COLLATION STRING,
>   CHECKSUM INTEGER,
>   CREATE_OPTIONS STRING,
>   TABLE_COMMENT STRING,
>   PRIMARY KEY (`TABLE_NAME`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://localhost:3306/information_schema',
>   'username' = 'root',
>   'password' = 'root',
>   'table-name' = 'TABLES'
> );
> I changed the types back and forth several times, and it always fails with one of these errors:
> java.math.BigInteger cannot be cast to java.lang.Integer
> java.lang.Long cannot be cast to java.math.BigDecimal
> java.lang.Long cannot be cast to java.lang.Integer
> 郝文强 <18846086...@163.com>
>


Question about upgrading Flink versions

2021-01-06 post by zhang hao
Current situation: all of our company's Flink jobs run on Flink 1.7 and are submitted to a Flink-on-YARN cluster through an in-house stream-computing platform, deployed in Flink session mode. We run close to 500 streaming jobs, many of them stateful. If we want to upgrade the cluster to 1.11 or 1.12, how can we perform the version upgrade smoothly without affecting the existing jobs?
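No answer appears in this digest, but the standard route is a per-job savepoint upgrade (a sketch only; the job IDs, savepoint paths, and jar/class names below are placeholders): trigger a savepoint and cancel each job on the 1.7 cluster, then resubmit the job, rebuilt against the new Flink version, from that savepoint on the 1.11/1.12 cluster. Savepoint compatibility between 1.7 and 1.11/1.12 should be verified against the compatibility table in the Flink documentation before migrating all 500 jobs.

# On the old (1.7) cluster: take a savepoint and cancel the job in one step.
flink cancel -s hdfs:///flink/savepoints <jobId>

# On the new (1.11/1.12) cluster: resubmit the rebuilt job from that savepoint.
# -n (--allowNonRestoredState) can help if some state was intentionally dropped.
flink run -s hdfs:///flink/savepoints/savepoint-xxxx -n -c com.example.MyJob my-job.jar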


Error reading complex data types from Hive and writing them to MySQL

2020-12-29 post by zhang hao
A question: when reading complex types from Hive, which MySQL cannot store directly, is there a way to write the data to MySQL as strings? The error:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.0_es_hive_094679318_tmp do not match.

Query schema: [id: INT, name: VARCHAR(2147483647), hobby: ARRAY, add: MAP]
Sink schema: [id: INT, name: VARCHAR(2147483647), hobby: VARCHAR(2147483647), add: VARCHAR(2147483647)]
    at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
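One workaround for the schema mismatch above (a sketch, assuming PyFlink 1.11; st_env, the table names, and the ARRAY/MAP element types are placeholders, since the real element types are cut off in the error) is to register scalar UDFs that render the complex values as strings before inserting into the JDBC sink:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# The element types here are assumptions; adjust to the actual Hive column types.
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING())],
     result_type=DataTypes.STRING())
def array_to_str(a):
    # Render the array as a single string so it fits a VARCHAR sink column.
    return str(a) if a is not None else None

@udf(input_types=[DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())],
     result_type=DataTypes.STRING())
def map_to_str(m):
    return str(m) if m is not None else None

# st_env is the job's TableEnvironment; hive_table / mysql_sink are placeholders.
st_env.register_function("array_to_str", array_to_str)
st_env.register_function("map_to_str", map_to_str)
st_env.execute_sql(
    "INSERT INTO mysql_sink "
    "SELECT id, name, array_to_str(hobby), map_to_str(`add`) FROM hive_table")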