Execution problem with the official PyFlink example

2020-07-20 Thread chenxuying
The official example:
https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html
I wrote a program following the example and installed pyflink:
|
python -m pip install apache-flink
|
代码:
|
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)


add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], 
DataTypes.BIGINT())


t_env.register_function("add", add)


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))
 \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')


t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))
 \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')


t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')


t_env.execute("tutorial_job")
|

Run:

|
python test_pyflink.py
|

Error:


|
Traceback (most recent call last):
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py",
 line 147, in deco
return f(*a, **kw)
  File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py",
 line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.TableException: The configured Task Off-Heap 
Memory 0 bytes is less than the least required Python worker Memory 79 mb. The 
Task Off-Heap Memory can be configured using the configuration key 
'taskmanager.memory.task.off-heap.size'.
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119)
at 
org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at 
org.apache.flink

Re:Re: Execution problem with the official PyFlink example

2020-07-20 Thread chenxuying
Hi,
Got it, thanks. I hadn't read the documentation carefully enough.



On 2020-07-21 11:44:23, "Xingbo Huang" wrote:
>Hi,
>You need to add some configuration. If you are not using RocksDB as the state backend, it is enough to set
>t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True).
>If you are using RocksDB, you need to configure off-heap memory instead:
>table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m').
>You can refer to the example in the documentation and the corresponding note [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions
>
>Best,
>Xingbo
>
>
>chenxuying wrote on Tue, Jul 21, 2020 at 11:36:
>> [quoted original message omitted]
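
For the record, a minimal sketch of where the two settings from the reply go in the job above (the '80m' value is the one suggested in the reply; only one of the two options is needed):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# If RocksDB is NOT used as the state backend, letting the Python workers
# use Flink managed memory is enough:
t_env.get_config().get_configuration().set_boolean(
    "python.fn-execution.memory.managed", True)

# Otherwise, reserve task off-heap memory for the Python workers instead:
t_env.get_config().get_configuration().set_string(
    "taskmanager.memory.task.off-heap.size", '80m')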

Flink SQL from Kafka into MySQL: INSERT syntax problem and unique primary key problem

2020-07-31 Thread chenxuying
Hi,
I am using Flink 1.11.0. The code is as follows:
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);
tableEnvironment.executeSql(" " +
" CREATE TABLE mySource ( " +
"  a bigint, " +
"  b bigint " +
" ) WITH ( " +
"  'connector.type' = 'kafka', " +
"  'connector.version' = 'universal', " +
"  'connector.topic' = 'mytesttopic', " +
"  'connector.properties.zookeeper.connect' = '172.17.0.2:2181', " +
"  'connector.properties.bootstrap.servers' = '172.17.0.2:9092', " +
"  'connector.properties.group.id' = 'flink-test-cxy', " +
"  'connector.startup-mode' = 'latest-offset', " +
"  'format.type' = 'json' " +
" ) ");
tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"  'connector.type' = 'jdbc',   " +
"  'connector.url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " 
+
"  'connector.username' = 'root' , " +
"  'connector.password' = 'root',  " +
"  'connector.table' = 'mysqlsink' , " +
"  'connector.driver' = 'com.mysql.cj.jdbc.Driver' , " +
"  'connector.write.flush.interval' = '2s',  " +
"  'connector.write.flush.max-rows' = '300'  " +
" )");
tableEnvironment.executeSql("insert into mysqlsink (`id`,`game_id`) values 
(select a,cast(b as varchar) b from mySource)");


Question 1: the INSERT statement above fails with the following error:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply 
'$SCALAR_QUERY' to arguments of type '$SCALAR_QUERY()'. Supported form(s): '$SCALAR_QUERY()'


Question 2: if the INSERT is changed to tableEnvironment.executeSql("insert into mysqlsink select 
a,cast(b as varchar) b from mySource"); it runs, but when a duplicate unique key occurs it fails with:
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 
'1' for key 'PRIMARY'





Re:Re: Flink SQL from Kafka into MySQL: INSERT syntax problem and unique primary key problem

2020-07-31 Thread chenxuying
Thanks for the answer.
With the new options I can successfully update records,
but I don't quite understand what "with the old options the primary key still has to be derived from the query" means. What do I need to do?


On 2020-07-31 16:46:41, "Leonard Xu" wrote:
>Hi, chenxuying
>
>You are still using "  'connector.type' = 'jdbc', ….  ", which are the old options; with the old options the primary key still has to be derived from the query.
>You need the new options [1]: " 'connector' = 'jdbc', …. " so that the declared primary key determines the upsert mode.
> 
>Best
>Leonard
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#connector-options
>
>> On Jul 31, 2020, at 16:12, chenxuying wrote:
>> [quoted original message omitted]
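
As a concrete illustration of Leonard's point, a hedged sketch (in PyFlink form for brevity) of the same MySQL sink declared with the new 1.11 option keys, where the declared PRIMARY KEY drives upsert writes; the URL and credentials below are placeholders:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())

# Same sink as above, but with the new option keys instead of 'connector.type' etc.
t_env.execute_sql("""
    CREATE TABLE mysqlsink (
        id BIGINT,
        game_id VARCHAR,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/flinksql?useSSL=false',
        'table-name' = 'mysqlsink',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'username' = 'root',
        'password' = 'root',
        'sink.buffer-flush.interval' = '2s',
        'sink.buffer-flush.max-rows' = '300'
    )
""")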


Re:Re: Flink SQL from Kafka into MySQL: INSERT syntax problem and unique primary key problem

2020-07-31 Thread chenxuying
Hi,
OK, thanks, understood now.

On 2020-07-31 21:27:02, "Leonard Xu" wrote:
>Hello
>
>> On Jul 31, 2020, at 21:13, chenxuying wrote:
>> 
>> But I don't quite understand what "with the old options the primary key still has to be derived from the query" means. What do I need to do?
>
>In short: if you use the old (1.10) options, the code takes the same execution path as in 1.10, and 1.10 does not support declaring a PRIMARY KEY;
>whether the write mode is upsert or append is derived from the user's query. You can look at the 1.10 documentation on deriving the write mode from the query [1];
>if you are already on 1.11, you can skip the 1.10 docs.
> 
>In 1.10 it was common that no key could be derived from the query, so upsert writes were impossible. In 1.11 this is solved by supporting an explicit PRIMARY KEY declaration; see the 1.11 documentation [2].
>
>Best
>Leonard
>
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#jdbc-connector
>[2] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#how-to-create-a-jdbc-table


flinksql insert overwrite fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-01 Thread chenxuying
Hello,
The Flink 1.11.0 documentation [1] says that INSERT OVERWRITE ... can be used. I tried it, but executing the statement "insert 
overwrite mysqlsink select a,cast(b as varchar) b from mySource" fails with the following error:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
SupportsOverwrite interface.
Do I have to write a custom connector that implements DynamicTableSink?


Best,
chenxuying
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax

Re:Re: flinksql insert overwrite fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 Thread chenxuying
Hi, I was just experimenting: I had previously used INSERT INTO plus a primary key to update DB records, and when I saw the INSERT 
OVERWRITE syntax I gave it a try. So OVERWRITE currently only supports the Filesystem connector and Hive tables.
One more question: when using INSERT INTO plus a primary key, if the key already exists, does the underlying SQL run an UPDATE, or a DELETE plus INSERT?
We had a requirement where UPDATE was too slow and we needed REPLACE INTO; does Flink support that?

On 2020-08-02 09:48:04, "Leonard Xu" wrote:
>Hi,
>
>This error occurs because the JDBC connector does not support INSERT OVERWRITE. The documentation you looked at lists the INSERT syntax that Flink SQL 
>currently supports, but not every connector supports INSERT OVERWRITE; currently only the Filesystem 
>connector and Hive tables do, and such tables usually have no primary key. Other connectors such as JDBC\ES\HBase do not support INSERT 
>OVERWRITE yet, but they all support upsert writes [1]:
>once a PK is defined on the connector table, results are updated by PK, which should cover the business needs of DB-like systems. Could you share the scenario where you need INSERT 
>OVERWRITE into a DB?
>
>Best
>Leonard
>[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>
>> On Aug 1, 2020, at 19:20, chenxuying wrote:
>> [quoted original message omitted]
>


Re:Re: flinksql insert overwrite fails with: INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement SupportsOverwrite interface

2020-08-02 Thread chenxuying
Thanks, understood.
On 2020-08-03 10:42:53, "Leonard Xu" wrote:
>If the DB supports an upsert syntax, an UPDATE is executed; if it does not, a DELETE + INSERT is used. Both MySQL and PG 
>support upsert; the underlying SQL statements are:
>
>Database   Upsert Grammar
>MySQL  INSERT .. ON DUPLICATE KEY UPDATE ..
>PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..
>
>The MySQL connector does not use REPLACE INTO; it uses ON DUPLICATE KEY UPDATE.
>
>Best
>Leonard 
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/jdbc.html#idempotent-writes
>
>
>> On Aug 3, 2020, at 10:33, chenxuying wrote:
>> 
>> 你好,我这边只是做一个尝试 , 因为之前使用了insert into + 主键做到更新db记录的操作 , 然后现在看到INSERT 
>> OVERWRITE的语法就试了一下 , 原来OVERWRITE目前只支持 Filesystem connector 和 Hive table
>> 然后还想问下在使用insert into + 主键 时,如果主键存在 , 则在底层执行sql时是update语句是吗, 还是delete+update 
>> , 我们这边之前有个需求是update执行效率太低 , 然后需要用到replace into , 不知道flink是否支持
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-08-02 09:48:04,"Leonard Xu"  写道:
>>> Hi,
>>> 
>>> 这个错是因为JDBC connector 不支持INSERT OVERWRITE, 你看的文档是列出了目前 Flink SQL 
>>> 支持的INSERT语法,但是不是所有的 connector 都支持  INSERT OVERWRITE, 目前支持的只有 Filesystem 
>>> connector 和 Hive table, 这些表一般不会有主键。其他connector 如 JDBC\ES\HBase 目前不支持  
>>> INSERT OVERWRITE,现在 JDBC\ES\HBase connector都是支持upsert 插入的[1],
>>> 就是在connector 表上定义了PK,结果可以按照PK更新,对于DB类的系统应该都是可以满足业务需求的。 可以分享下需要INSERT 
>>> OVERWRITE到DB的场景吗?
>>> 
>>> Best
>>> Leonard
>>> [1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#key-handling>
>>> 
>>>> 在 2020年8月1日,19:20,chenxuying  写道:
>>>> 
>>>> Hello
>>>> 请问在flink 1.11.0文档[1]中发现可以使用时可以使用INSERTOVERWRITE... 尝试了一下,但是在执行语句"insert 
>>>> overwrite mysqlsink select a,cast(b as varchar) b from mySource"时报如下错误
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>>> INSERT OVERWRITE requires JDBC:MySQL DynamicTableSink to implement 
>>>> SupportsOverwrite interface.
>>>> 是得自定义connector吗,实现DynamicTableSink?
>>>> 
>>>> 
>>>> 祝好
>>>> chenxuying
>>>> [1] 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#syntax
>>> 
>


Difference between batch and stream for bounded data

2020-08-03 Thread chenxuying
Hi,
Flink Table SQL 1.11.0.
In EnvironmentSettings you can choose BatchMode or StreamingMode:


EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();


With MySQL as the source, the job runs in both modes and the results look the same. My understanding of batch vs. streaming is still limited and I don't see the difference;
does anyone have an example that makes it easier to understand?
My code:
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
//.inStreamingMode()
.inBatchMode()
.build();
TableEnvironment tableEnvironment = 
TableEnvironment.create(environmentSettings);
tableEnvironment.executeSql("CREATE TABLE mysql_source ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"'connector' = 'jdbc',  " +
" 'url' = 'jdbc:mysql://47.99.181.86:3306/flinksql?useSSL=false' , " +
" 'username' = 'root' , " +
" 'password' = 'root', " +
" 'table-name' = 'mysqlsink' , " +
" 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
" 'sink.buffer-flush.interval' = '2s', " +
" 'sink.buffer-flush.max-rows' = '300' " +
" )");
tableEnvironment.executeSql("CREATE TABLE print_sink ( " +
" id bigint, " +
"  game_id varchar, " +
"  PRIMARY KEY (id) NOT ENFORCED  " +
" )  " +
" with ( " +
"'connector' = 'print'  " +
" )");
tableEnvironment.executeSql("insert into print_sink select id,game_id from 
mysql_source");

Re:Re: Difference between batch and stream for bounded data

2020-08-03 Thread chenxuying
Hi, my modified statement is
insert into print_sink select game_id,count(id) from mysql_source group by game_id
When it runs in streaming mode it prints a changelog, like this:
2> +I(12,1)
5> +I(12555,1)
1> +I(122,1)
3> +I(13,1)
6> +I(1,1)
6> -U(1,1)
6> +U(1,2)
6> -U(1,2)
6> +U(1,3)
6> -U(1,3)
6> +U(1,4)
6> -U(1,4)


But if I use batch mode instead, it fails with:
org.apache.flink.util.FlinkException: Error while shutting the TaskExecutor 
down.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.handleOnStopException(TaskExecutor.java:440)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$onStop$2(TaskExecutor.java:425)
...
Caused by: java.util.concurrent.CompletionException: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
...
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
Suppressed: org.apache.flink.util.FlinkException: Could not properly shut down 
the TaskManager services.
at 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:236)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.stopTaskExecutorServices(TaskExecutor.java:462)
at...
... 21 more
Caused by: org.apache.flink.util.FlinkException: Could not close resource.
at org.apache.flink.util.AutoCloseableAsync.close(AutoCloseableAsync.java:42)
at 
org.apache.flink.runtime.taskexecutor.TaskManagerServices.shutDown(TaskManagerServices.java:204)
... 37 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper
at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.reserveMemory(UnsafeMemoryBudget.java:94)
at 
org.apache.flink.runtime.memory.UnsafeMemoryBudget.verifyEmpty(UnsafeMemoryBudget.java:64)
...
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:422)
... 21 more
[CIRCULAR REFERENCE:java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.flink.util.JavaGcCleanerWrapper]


Do you happen to know the cause?


On 2020-08-04 12:11:32, "godfrey he" wrote:
>Logically, batch execution produces a Table, while streaming execution produces a Changelog.
>In your example the Table result and the changelog result happen to be the same, which is why they look alike.
>The simplest way to see the difference is to change the query to one with a GROUP BY and compare the results.
>For more on the Table and Changelog concepts see [1].
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/dynamic_tables.html
>
>chenxuying wrote on Tue, Aug 4, 2020 at 11:44:
>> [quoted original message omitted]

Re:Re: Re: Difference between batch and stream for bounded data

2020-08-04 Thread chenxuying
I develop and test on Win10 + IDEA. The same project code runs fine on my colleague's machine, so maybe it is OS-related.




On 2020-08-04 17:19:48, "godfrey he" wrote:
>What is your runtime environment? Can you share the relevant configuration?
>
>chenxuying wrote on Tue, Aug 4, 2020 at 14:46:
>> [quoted earlier messages omitted]


How to use the aggregate function LISTAGG in Flink SQL

2020-08-12 Thread chenxuying
Version:
Flink SQL 1.11.0
Requirement:
Aggregate multiple rows into a single row.
Code:
environmentSettings = 
EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings = 
environmentSettings)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
 'true')


a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
a_table = t_env.from_pandas(a_df,
DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
 DataTypes.FIELD("uuid", 
DataTypes.STRING())]))
t_env.create_temporary_view("table_a",a_table)


b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
table_b = t_env.from_pandas(b_df ,
DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
 DataTypes.FIELD("uuid", 
DataTypes.STRING())]))
t_env.create_temporary_view("table_b",table_b)


t_env.sql_update("""
CREATE TABLE mySink (   
 
b varchar ,
c varchar 
) WITH ( 
'connector' = 'print'   
) 
""")


t_env.sql_update("""
insert into mySink 
select t1.id ,LISTAGG(t2.val , ',') 
from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
group by t1.id
""")
t_env.execute("tutorial_job")


Error:
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData at 
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) 
at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) 
at org.apache.flink.table.data.RowData.get(RowData.java:273) at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown
 Source) at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:745)





Re:Re: How to use the aggregate function LISTAGG in Flink SQL

2020-08-12 Thread chenxuying
OK, so it is a bug. Thanks for the answer.


On 2020-08-12 21:32:40, "Benchao Li" wrote:
>This looks like a known bug [1]; it has already been fixed but not yet released.
>
>[1] https://issues.apache.org/jira/browse/FLINK-18862
>
>chenxuying wrote on Wed, Aug 12, 2020 at 21:25:
>> [quoted original message omitted]
>
>-- 
>
>Best,
>Benchao Li


JDBC connector options not taking effect when using Flink SQL

2020-09-17 Thread chenxuying
Environment: Flink 1.11.2 + IDEA
SQL:
CREATE TABLE sourceTable (
platform STRING
,game_id bigint
) WITH (
...
);
CREATE TABLE sinktable (
platform STRING
,game_id bigint
) WITH (
'connector' = 'jdbc',
'url' = '',
'table-name' = '',
'driver' = 'com.mysql.jdbc.Driver',
'username' = '',
'password' = '',
'sink.buffer-flush.max-rows' = '2',
'sink.buffer-flush.interval' = '30s'
);
insert into sinktable select platform,game_id from sourceTable;


The official documentation [1] says that the two options sink.buffer-flush.max-rows and sink.buffer-flush.interval can be set to 
'0' to disable them, but that did not work for me.
With the following settings
   'sink.buffer-flush.max-rows' = '0'
   'sink.buffer-flush.interval' = '60s'
every single incoming record is written to the database immediately.
With the following settings
   'sink.buffer-flush.max-rows' = '10'
   'sink.buffer-flush.interval' = '0'
nothing is written to the database at all.


[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html#connector-options



Re:Re: JDBC connector options not taking effect when using Flink SQL

2020-09-21 Thread chenxuying
OK, understood.




On 2020-09-17 20:29:09, "Jark Wu" wrote:
>>  sink.buffer-flush.max-rows = '0' causes every incoming record to be inserted immediately
>
>This should be a bug; I have created an issue: https://issues.apache.org/jira/browse/FLINK-19280
>
>Best,
>Jark
>
>On Thu, 17 Sep 2020 at 18:15, chenxuying  wrote:
>> [quoted original message omitted]


How to convert a field value in the format 2020-09-23T20:58:24+08:00 to TIMESTAMP in Flink SQL

2020-09-23 Thread chenxuying
Flink SQL version 1.11.2.
The source receives the time as a string field:
CREATE TABLE sourceTable (
 `time` STRING
 ) WITH(
...
 );


The sink is as follows:
CREATE TABLE sinktable (
`time1` STRING,
`time` TIMESTAMP(3)
 ) WITH (
 'connector' = 'print'
 );


The INSERT statement; I don't know how to correctly override TO_TIMESTAMP's default format:
 insert into sinktable select `time`,TO_TIMESTAMP(`time`,'yyyy-MM-ddTHH:mm:ss+08:00') from sourceTable


The error says the format is wrong:
Caused by: java.lang.IllegalArgumentException: Unknown pattern letter: T
at 
java.time.format.DateTimeFormatterBuilder.parsePattern(DateTimeFormatterBuilder.java:1663)
at 
java.time.format.DateTimeFormatterBuilder.appendPattern(DateTimeFormatterBuilder.java:1572)
at java.time.format.DateTimeFormatter.ofPattern(DateTimeFormatter.java:534)
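
For reference, a hedged sketch (untested) of one way around the "Unknown pattern letter: T" error: quote the literal T in the pattern (doubling the single quotes inside the SQL string) and strip the '+08:00' offset before parsing, since offset handling may differ across versions:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# 'sourceTable' and 'sinktable' are assumed to be the tables declared above.
# SUBSTR(`time`, 1, 19) keeps '2020-09-23T20:58:24' and drops the offset;
# the quoted 'T' keeps DateTimeFormatter from treating it as a pattern letter.
t_env.execute_sql("""
    INSERT INTO sinktable
    SELECT `time`,
           TO_TIMESTAMP(SUBSTR(`time`, 1, 19), 'yyyy-MM-dd''T''HH:mm:ss')
    FROM sourceTable
""")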

Two problems when deploying Flink in a Docker environment

2020-09-27 Thread chenxuying
根据官网[1]使用docker部署flink,session cluster模式
环境win10+docker+flink1.11.2
cmd命令
docker run ^
-d^
--rm ^
--name=jobmanager ^
--hostname=jobmanager ^
--network flink-network ^
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
-p 28081:8081 ^
flink:1.11.2-scala_2.11 jobmanager
docker run ^
-d^
--rm ^
--name=taskmanager ^
--hostname=taskmanager ^
--network flink-network ^
--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" ^
flink:1.11.2-scala_2.11 taskmanager


Problem 1:
Viewing the job's stdout in the web UI reports that the output file cannot be found:
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: 
The file STDOUT does not exist on the TaskExecutor.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]
Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not exist 
on the TaskExecutor.
... 5 more
2020-09-27 09:04:33,370 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler 
[] - Unhandled exception.
org.apache.flink.util.FlinkException: The file STDOUT does not exist on the 
TaskExecutor.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742)
 ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
 ~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
~[?:1.8.0_265]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
~[?:1.8.0_265]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_265]




Problem 2:
Do I need separate copies of the config files for the mounted src directory?
I replaced the env variable with a mount, as follows:
docker run ^
-d^
--rm ^
--name=jobmanager ^
--network flink-network ^
--mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf ^
-p 28081:8081 ^
flink:1.11.2-scala_2.11 jobmanager


docker run ^
-d^
--rm ^
--name=taskmanager ^
--network flink-network ^
--mount type=bind,src=D:/cxy/soft/flink-1.11.2/conf,target=/opt/flink/conf ^
flink:1.11.2-scala_2.11 taskmanager


The result is that the web UI shows 0 available Task Managers.
Every time the command runs, jobmanager.rpc.address in the mounted flink-conf.yaml under src is replaced with the new container's IP.
I suspect this is why, when the taskmanager starts, jobmanager.rpc.address is overwritten with the taskmanager's IP, so no Task Managers are available.
Could you tell me which step I got wrong?


[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/docker.html

Re:Re: Two problems when deploying Flink in a Docker environment

2020-09-28 Thread chenxuying
For the first problem, is there an example of the startup-command change you mentioned, or documentation I can refer to?




On 2020-09-28 12:50:25, "Yang Wang" wrote:
>For the first problem: by default the STDOUT of the JM/TM processes already goes to the console, so there is no way to view STDOUT through the web UI.
>You can check it with docker logs, or you can modify the startup command to redirect STDOUT to a concrete file.
>
>For the second problem: the docker-entrypoint.sh [1] of the JobManager and TaskManager modifies flink-conf.yaml,
>so the file you mount in gets modified.
>
>[1].
>https://github.com/apache/flink-docker/blob/dev-1.11/docker-entrypoint.sh
>
>
>Best,
>Yang
>
>chenxuying wrote on Sun, Sep 27, 2020 at 19:56:
>> [quoted original message omitted]

Flink 1.11.2 deploys fine on K8s following the official docs, but after adding a volume configuration it fails with Read-only file system

2020-09-28 Thread chenxuying
I deployed on K8s following the official docs [1] and it worked fine; then I added a volume configuration:

{
  ...
  "spec": {
    ...
    "template": {
      ...
      "spec": {
        "volumes": [
          ...
          {
            "name": "libs-volume",
            "hostPath": {
              "path": "/data/volumes/flink/jobmanager/cxylib",
              "type": ""
            }
          },
          ...
        ],
        "containers": [
          {
            ...
            "volumeMounts": [
              {
                "name": "flink-config-volume",
                "mountPath": "/opt/flink/conf"
              },
              ...
            ],
            ...
          }
        ],
        ...
      }
    },
    ...
  },
  ...
}

Then starting the jobmanager fails with:

Starting Job Manager
sed: couldn't open temporary file /opt/flink/conf/sedz0NYKX: Read-only file system
sed: couldn't open temporary file /opt/flink/conf/sede6R0BY: Read-only file system
/docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml: Permission denied
/docker-entrypoint.sh: 91: /docker-entrypoint.sh: cannot create /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
Starting standalonesession as a console application on host flink-jobmanager-66fb98869d-w7plb.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sep 28, 2020 7:11:14 AM org.apache.hadoop.util.NativeCodeLoader 
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable




[1]:https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions

Error when registering a UDTF in Flink SQL with ROW as the input/output type

2020-09-30 Thread chenxuying
Versions:
pyflink==1.0
apache-flink==1.11.2
Code:
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
 'true')


class SplitStr(TableFunction):
def eval(self, data):
for row in data:
yield row[0], row[1]
splitStr = udtf(
SplitStr(),
DataTypes.ARRAY(
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
),
DataTypes.ROW(
[
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("id", DataTypes.STRING())
]
)
)
t_env.register_function("splitStr", splitStr)


t_env.sql_update("""
CREATE TABLE mySource ( 
   
id varchar,
data array<row<name varchar, id varchar>> 
) WITH ( 
'connector' = 'kafka',
'topic' = 'mytesttopic',
'properties.bootstrap.servers' = '172.17.0.2:9092',
'properties.group.id' = 'flink-test-cxy',
'scan.startup.mode' = 'latest-offset',
'format' = 'json' 
) 
""")
t_env.sql_update("""
CREATE TABLE mysqlsink (
id varchar
,name varchar
,age  varchar
) 
with (
'connector' = 'print'
)
""")
t_env.sql_update("insert into mysqlsink select id,name,age from mySource 
,LATERAL TABLE(splitStr(data)) as T(name, age)")
t_env.execute("test")


The final error is:
TypeError: Invalid result_type: result_type should be DataType but contains 
RowField(name, VARCHAR)
The error is raised at:
File 
"C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\udf.py",
 line 264, in __init__


def __init__(self, func, input_types, result_types, deterministic=None, 
name=None):
super(UserDefinedTableFunctionWrapper, self).__init__(
func, input_types, deterministic, name)


if not isinstance(result_types, collections.Iterable):
result_types = [result_types]


for result_type in result_types:
if not isinstance(result_type, DataType):
raise TypeError(
"Invalid result_type: result_type should be DataType but contains {}".format(
result_type))


self._result_types = result_types
self._judtf_placeholder = None


At a breakpoint I can see that result_types ends up being the array of FIELDs inside the ROW, which is why the error is raised. Is this a bug?


Separately, if I create the udtf above like this instead:
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), 
DataTypes.BIGINT()])
it runs fine, even though the types obviously do not match what is actually being used.

Re: Error when registering a UDTF in Flink SQL with ROW as the input/output type

2020-09-30 Thread chenxuying
The last line mentioned above, splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.BIGINT(), DataTypes.BIGINT()]), needs to be changed to
splitStr = udtf(SplitStr(), DataTypes.STRING(), [DataTypes.STRING(), DataTypes.STRING()]).
It seems that as long as the third udtf parameter matches the sink's field types it runs, but it feels odd that it runs even though the second parameter does not match the source fields.
On 2020-09-30 19:07:06, "chenxuying" wrote:
>[quoted original message omitted]

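Putting the reply above together, a minimal sketch of the registration that was reported to work (result_types passed as a flat list of field types matching the sink columns; as noted above, it is odd but apparently accepted that the declared input type does not match the source column):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udtf, TableFunction

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

class SplitStr(TableFunction):
    def eval(self, data):
        for row in data:
            yield row[0], row[1]

# Workaround from the reply: pass the ROW's field types as a plain list for
# result_types instead of DataTypes.ROW([...]).
splitStr = udtf(
    SplitStr(),
    DataTypes.STRING(),                        # input type, as given in the reply
    [DataTypes.STRING(), DataTypes.STRING()]   # result types matching the sink fields
)
t_env.register_function("splitStr", splitStr)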

How to start the history server when deploying Flink 1.11.2 on K8s

2020-10-10 Thread chenxuying
How do I start the history server when Flink 1.11.2 is deployed on K8s?
In 1.10 the command could be added in the YAML, but the 1.11 YAML goes through docker-entrypoint.sh,
and that entrypoint script does not seem to have a corresponding history server argument.



Flink 1.11.2 web UI job detail page does not show the job's received and sent metrics

2020-10-13 Thread chenxuying
The cluster runs in local standalone mode. The job runs fine, the sink is print, and I can see the output in Stdout,
but on the job detail page the real-time metrics Bytes Received, Records Received, Bytes Sent and Records Sent are all 0.

Re:Re: Re: Loading an external jar for UDF registration in Flink 1.11

2020-10-16 Thread chenxuying
1. When using the system class loader, the interface that the job jar exposes for the external UDF jar to implement throws a ClassNotFound exception.
2. What is the thread context class loader?

I don't quite understand these two points; could you show a code example?


On 2020-10-15 19:47:20, "amen...@163.com" wrote:
>A follow-up question: when using the thread context class loader, the data is sent in triplicate. Is that caused by adding pipeline.classpaths?
>What other problems could this way of configuring the env cause?
>
>best,
>amenhub
> 
>From: amen...@163.com
>Sent: 2020-10-15 19:22
>To: user-zh
>Subject: Re: Re: Loading an external jar for UDF registration in Flink 1.11
>Thank you very much for the reply!
> 
>For me this is a very good approach, but I have one question: must the class loading be done with the system class loader?
>Because when I tried the system class loader, the interface the job jar exposes for the external UDF 
>jar to implement threw a ClassNotFound exception, while pointing the class loader at the main class (which here should mean using the default thread context class loader) avoids the problem.
> 
>Looking forward to your reply, thanks~
> 
>best, 
>amenhub
>From: cxydeve...@163.com
>Sent: 2020-10-15 17:46
>To: user-zh
>Subject: Re: Loading an external jar for UDF registration in Flink 1.11
>Our approach is to set the env configuration via reflection, adding pipeline.classpaths.
>The concrete code is as follows:
>public static void main(final String[] args) throws Exception {
>    StreamExecutionEnvironment env =
>            StreamExecutionEnvironment.getExecutionEnvironment();
>    EnvironmentSettings settings =
>            EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>    StreamTableEnvironment tableEnvironment =
>            StreamTableEnvironment.create(env, settings);
>    //String path = "file:///D:/cxy/idea_workspace/...xxx.jar";
>    String path = "https://...xxx.jar";
>    loadJar(new URL(path));
>    // reach the env's internal Configuration via reflection and add pipeline.classpaths
>    Field configuration =
>            StreamExecutionEnvironment.class.getDeclaredField("configuration");
>    configuration.setAccessible(true);
>    Configuration o = (Configuration) configuration.get(env);
>    Field confData = Configuration.class.getDeclaredField("confData");
>    confData.setAccessible(true);
>    Map<String, Object> temp = (Map<String, Object>) confData.get(o);
>    List<String> jarList = new ArrayList<>();
>    jarList.add(path);
>    temp.put("pipeline.classpaths", jarList);
>    tableEnvironment.executeSql("CREATE FUNCTION CxyTestReturnSelf AS " +
>            "'flinksql.function.udf.CxyTestReturnSelf'");
>    tableEnvironment.executeSql("CREATE TABLE sourceTable (\n" +
>            " f_sequence INT,\n" +
>            " f_random INT,\n" +
>            " f_random_str STRING,\n" +
>            " ts AS localtimestamp,\n" +
>            " WATERMARK FOR ts AS ts\n" +
>            ") WITH (\n" +
>            " 'connector' = 'datagen',\n" +
>            " 'rows-per-second'='5',\n" +
>            "\n" +
>            " 'fields.f_sequence.kind'='sequence',\n" +
>            " 'fields.f_sequence.start'='1',\n" +
>            " 'fields.f_sequence.end'='1000',\n" +
>            "\n" +
>            " 'fields.f_random.min'='1',\n" +
>            " 'fields.f_random.max'='1000',\n" +
>            "\n" +
>            " 'fields.f_random_str.length'='10'\n" +
>            ")");
>    tableEnvironment.executeSql("CREATE TABLE sinktable (\n" +
>            "f_random_str STRING" +
>            ") WITH (\n" +
>            "'connector' = 'print'\n" +
>            ")");
>    tableEnvironment.executeSql(
>            "insert into sinktable " +
>            "select CxyTestReturnSelf(f_random_str) " +
>            "from sourceTable");
>}
>
>// dynamically load a jar into the system class loader at runtime
>public static void loadJar(URL jarUrl) {
>    // obtain URLClassLoader's addURL method via reflection
>    Method method = null;
>    try {
>        method = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
>    } catch (NoSuchMethodException | SecurityException e1) {
>        e1.printStackTrace();
>    }
>    // remember the method's current accessibility
>    boolean accessible = method.isAccessible();
>    try {
>        // make the method accessible
>        if (accessible == false) {
>            method.setAccessible(true);
>        }
>        // get the system class loader
>        URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
>        // add the jar URL to the system class loader's search path
>        method.invoke(classLoader, jarUrl);
>    } catch (Exception e) {
>        e.printStackTrace();
>    } finally {
>        method.setAccessible(accessible);
>    }
>}
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/
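
A reflection-free variant of the quoted approach for newer versions (only a sketch, assuming Flink 1.12+ where StreamExecutionEnvironment.getExecutionEnvironment(Configuration) is available; the jar URL is a placeholder and must be reachable from every TaskManager):

import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClasspathsDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // ship the external UDF jar on the job's classpath instead of patching
        // the internal confData map via reflection
        conf.set(PipelineOptions.CLASSPATHS,
                Collections.singletonList("https://example.com/udf.jar")); // hypothetical URL
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the table environment and register the function as in the quoted code
    }
}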


How to set the default number of retained checkpoints in flink 1.11

2020-10-22 Thread chenxuying
According to the official docs [1] the setting should be state.checkpoints.num-retained, whose default is 1, but setting it has no effect. The docs say the default is 1,
yet in practice it looks like 10.
I also set other properties at the same time, for example
execution.checkpointing.externalized-checkpoint-retention: 
RETAIN_ON_CANCELLATION
and that one works, so the way I am setting options should be fine.


[1]:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained
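
For reference, the key in question (only a sketch; state.checkpoints.num-retained is a cluster-level option read from flink-conf.yaml, so it normally takes effect after the cluster is restarted rather than as a per-job setting):

state.checkpoints.num-retained: 3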



flinksql: when one job's kafka connection info is wrong, the whole session cluster can no longer deploy jobs properly

2021-04-25 Thread chenxuying
Environment:

flinksql 1.12.2

k8s session mode

Description:

When the kafka port is wrong, after a while the following error appears:

2021-04-25 16:49:50

org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition filebeat_json_install_log-3 could be 
determined

When the kafka IP is wrong, after a while the following error appears:

2021-04-25 20:12:53

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)

at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)

at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)

at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)

at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)

at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)

at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)

at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired 
while fetching topic metadata
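
Not a fix for the wrong connection info itself, but the "Recovery is suppressed by NoRestartBackoffTimeStrategy" part just means no restart strategy is configured for this job. A sketch of the flink-conf.yaml keys that control it (enabling checkpointing also switches the default to a fixed-delay strategy):

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s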







Then, when the job is stopped/cancelled, the following errors show up:

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job 
v2_ods_device_action_log (fcc451b8a521398b10e5b86153141fbf) switched from state 
CANCELLING to CANCELED.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Stopping 
checkpoint coordinator for job fcc451b8a521398b10e5b86153141fbf.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - 
Shutting down

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 1 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-1'
 not discarded.

2021-04-25 08:53:41,115 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 2 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-2'
 not discarded.

2021-04-25 08:53:41,116 INFO  
org.apache.flink.runtime.checkpoint.CompletedCheckpoint  [] - Checkpoint 
with ID 3 at 
'oss://tanwan-datahub/test/flinksql/checkpoints/fcc451b8a521398b10e5b86153141fbf/chk-3'
 not discarded.

2021-04-25 08:53:41,137 INFO  
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 
fcc451b8a521398b10e5b86153141fbf reached globally terminal state CANCELED.

2021-04-25 08:53:41,148 INFO  org.apache.flink.runtime.jobmaster.JobMaster  
   [] - Stopping the JobMaster for job 
v2_ods_device_action_log(fcc451b8a521398b10e5b86153141fbf).

2021-04-25 08:53:41,151 INFO  
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl [] - Suspending 
SlotPool.

2021-04-25 08:53:41,151 INFO

Mail from chenxuying

2021-06-17 Thread chenxuying



Ways to start a flink job

2020-04-20 Thread chenxuying
What are the current ways to start a flink job?
1. Via the command line:
flink run -C file:///usr/local/soft/flink/function-0.1.jar -c 
cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
2. Via the built-in web UI: upload the jar and submit it
3. In code, via createRemoteEnvironment


At the moment we mainly upload the jar and run the job programmatically through REST API HTTP requests, but the problem we are running into is that the REST API
offers no way to supply additional jars the way the command line does.
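
For option 3, a minimal sketch of shipping extra jars from code (host, port and jar paths are placeholders; every jar listed after the port is uploaded together with the job):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteSubmitDemo {
    public static void main(String[] args) throws Exception {
        // connects to the JobManager's REST endpoint and attaches the listed jars,
        // which is one way to ship additional dependency jars without a fat jar
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081,
                "/usr/local/soft/flink/flink-1.0-SNAPSHOT.jar",
                "/usr/local/soft/flink/function-0.1.jar");
        // ... build the pipeline on env and call env.execute("job-name")
    }
}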

Re:Re: Ways to start a flink job

2020-04-21 Thread chenxuying
That works, but our requirements do not allow building a fat jar.

On 2020-04-21 15:27:48, "Arnold Zai" wrote:
>Build a fat jar.
>
>chenxuying wrote on Tue, Apr 21, 2020 at 2:47 PM:
>
>> What are the current ways to start a flink job?
>> 1. Via the command line:
>> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> 2. Via the built-in web UI: upload the jar and submit it
>> 3. In code, via createRemoteEnvironment
>>
>> At the moment we mainly upload the jar and run the job programmatically through REST API HTTP requests, but the problem we are running into is that the REST API
>> offers no way to supply additional jars the way the command line does.
>>
>>
>>
>>


Re:Re: Re: Ways to start a flink job

2020-04-21 Thread chenxuying
Which submission method does the jarFiles parameter you mentioned belong to?
I also tried the plugins directory, but that does not seem to work, even after restarting the flink cluster; maybe I am doing it wrong.
My directory layout is:
xxx/flink/plugins/
    folder1/
        udf.jar


One more thing: if I put udf.jar under 
/flink/lib, it does work after a restart, but that is not the way I want. What I want is to be able to write a udf.jar at any time and use it right away, without restarting the flink 
cluster.

On 2020-04-21 17:46:00, "Arnold Zai" wrote:
>Isn't jarFiles a parameter list? Just pass several jars.
>
>Or deploy the dependencies into ${FLINK_HOME}/plugins in advance.
>
>chenxuying wrote on Tue, Apr 21, 2020 at 3:36 PM:
>
>> That works, but our requirements do not allow building a fat jar.
>>
>> On 2020-04-21 15:27:48, "Arnold Zai" wrote:
>> >Build a fat jar.
>> >
>> >chenxuying wrote on Tue, Apr 21, 2020 at 2:47 PM:
>> >
>> >> What are the current ways to start a flink job?
>> >> 1. Via the command line:
>> >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> >> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> >> 2. Via the built-in web UI: upload the jar and submit it
>> >> 3. In code, via createRemoteEnvironment
>> >>
>> >> At the moment we mainly upload the jar and run the job programmatically through REST API HTTP requests, but the problem we are running into is that the REST API
>> >> offers no way to supply additional jars the way the command line does.
>> >>
>> >>
>> >>
>> >>
>>


Re:Re: Re: Does FlinkSQL support something like a temporary intermediate table?

2020-06-23 Thread chenxuying
Hi, just to confirm, my_parse is a UDF, right?
Is there any way to use a UDTF to parse out multiple fields that directly become columns of the source table, and then pick the time field to define the watermark on?
Something like this:
CREATE TABLE sourceTable(
request_uri STRING,
(column_1,column_2,heart_time) as udtf_parse(request_uri)
)with(..);
Haha, not sure whether such syntax even exists.
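
As far as I know the DDL cannot expand a UDTF into several columns in these versions. A sketch that combines the two suggestions quoted below, assuming my_parse is a scalar UDF returning TIMESTAMP(3) and udtf_parse is a registered UDTF:

CREATE TABLE sourceTable (
  request_uri STRING,
  heart_time AS my_parse(request_uri),
  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
) WITH ( ... );

-- the remaining fields come from the UDTF in the query rather than in the DDL
SELECT t.column_1, t.column_2, s.heart_time
FROM sourceTable AS s,
     LATERAL TABLE(udtf_parse(s.request_uri)) AS t(column_1, column_2);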











On 2020-06-24 12:24:46, "Jark Wu" wrote:
>You can use a computed column directly in the DDL to derive heart_time from request_uri, and then define the watermark on that computed column.
>For example:
>
>CREATE TABLE sourceTable (
>  request_uri STRING,
>  heart_time AS my_parse(request_uri),
>  WATERMARK FOR heart_time AS heart_time - INTERVAL '1' SECOND
>) WITH ( ... );
>
>Although this does mean request_uri gets parsed twice.
>
>
>Best,
>Jark
>
>On Wed, 24 Jun 2020 at 12:09, Weixubin <18925434...@163.com> wrote:
>
>>
>>
>>
>> Thanks for the subquery idea; unfortunately, after trying it out, it still does not seem to meet our needs.
>>
>>
>> Our scenario reads messages from Alibaba Cloud SLS. Each message has a request_uri field.
>> The first processing step parses request_uri into multiple attributes (multiple columns), stored as one row, i.e. one record.
>> The second step declares heart_time of each record as the event-time attribute with a 5-second-delay watermark strategy and performs windowed aggregation.
>>
>>
>> // If applied in a subquery, Flink does not support this: a WATERMARK FOR declaration can only appear in DDL. For example:
>> select
>>   ... ts as TO_TIMESTAMP(heart_time, 'yyyy-MM-dd HH:mm:ss'),
>>   WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>> from sourceTable, LATERAL TABLE(ParseUriRow(request_uri)) as T( )….
>>
>>
>> // If applied to the source table, heart_time is not known at that point:
>> CREATE TABLE sourceTable (
>>   request_uri STRING,
>>   ... ts as TO_TIMESTAMP(heart_time, 'yyyy-MM-dd HH:mm:ss'),
>>   WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
>> ) WITH ( ... );
>>
>>
>> So we can only wait for Flink 1.11 and see whether a View can be used as the temporary intermediate table, with the WATERMARK declared on the View.
>> Thanks
>> Bin
>>
>> On 2020-06-23 15:28:50, "Leonard Xu" wrote:
>> >Hi
>> >What I meant is: if you need the intermediate result to be written out, use one sink that writes to the intermediate result table (whatever table suits your needs) and another sink that writes to your final result table. The `select
>> * from sourceTable , LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`
>> part in front of those two sinks can be reused, which is similar to what a VIEW gives you.
>> >
>> >If you do not need the intermediate result table and only want the final one, a nested sql is enough: the inner query is `select * from sourceTable ,
>> LATERAL TABLE(ParseUriRow(request_uri)) as T( )….`, the outer one is the group by,
>> and inserting into the final result table should meet the requirement.
>> >
>> >Best,
>> >Leonard Xu
>> >
>> >
>> >> On Jun 23, 2020, at 15:21, Weixubin <18925434...@163.com> wrote:
>> >>
>> >>
>> >>
>> >>
>> >> Hi,
>> >> Regarding the sentence "insert the ` select * from sourceTable , LATERAL
>> TABLE(ParseUriRow(request_uri)) as T( )….` sql into the intermediate result table and
>> write to the final result table after the group by":
>> >> I do not quite understand what the intermediate result table refers to. My understanding is to create an extra middleSinkTable in the database as the intermediate result table. Is that understanding wrong?
>> Could you give a simple example?
>> >> Thanks,
>> >> Bin
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> On 2020-06-23 11:57:28, "Leonard Xu" wrote:
>> >>> Hi,
>> >>> Yes, this is supported in FLINK 1.11. The FLINK 1.11 code is currently frozen and being tested, and should be released soon, which is why the version on the master
>> branch is 1.12-SNAPSHOT
>> >>> ; once 1.11 is released you will be able to see the corresponding documentation.
>> >>>
>> >>> Back to the question: flink sql (blink planner) supports multiple sinks, so in 1.10
>> this can also be done in a single job. Insert the `  select * from sourceTable , LATERAL
>> TABLE(ParseUriRow(request_uri)) as T( )….`  sql into the intermediate result table and
>> write to the final result table after the group by. The effect should be similar to using a VIEW, because the planner optimizes the plan in segments.
>> >>> Also, I recommend 1.10.1, which fixes quite a few bugs on top of 1.10; or wait for 1.11 and try it once it is released.
>> >>>
>> >>>
>> >>> Best,
>> >>> Leonard Xu
>> >
>>


Does the flink REST API support the -C parameter?

2020-06-24 Thread chenxuying
Currently using flink 1.10.0.
Background:
The REST API has an endpoint for submitting a job:
Endpoint: /jars/:jarid/run

Parameters: entryClass, programArgs, parallelism, jobId, allowNonRestoredState, savepointPath


When submitting a job from the command line:
flink run -C file:///usr/local/soft/flink/my-function-0.1.jar -c 
cn.xuying.flink.table.sql.ParserSqlJob 
/usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
the -C option lets you supply additional jars, which flink adds to the classpath.
Question:
The current REST API does not seem to offer anything like the command line's -C parameter, so I would like to know whether this capability will be added in the future.
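
For context, a hedged sketch of the two REST calls involved (host, port, paths and the returned jar id are placeholders); note that neither call has a field corresponding to -C, which is exactly the gap described above:

curl -X POST -H "Expect:" -F "jarfile=@/usr/local/soft/flink/flink-1.0-SNAPSHOT.jar" http://localhost:8081/jars/upload
curl -X POST http://localhost:8081/jars/<jar-id>/run -H "Content-Type: application/json" -d '{"entryClass":"cn.xuying.flink.table.sql.ParserSqlJob","parallelism":1}'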