SQL interval join question

2020-10-19 Thread Mic
I have the following SQL statements:
create table source1(
  id varchar PRIMARY KEY,
  a varchar,
  proctime AS PROCTIME()
) with (
'connector' = 'kafka'
...
);
create table source2(
  id varchar PRIMARY KEY,
  a varchar,
  proctime AS PROCTIME()
) with (
'connector' = 'kafka'
...
);
select
  case
when s1.id is not null then s1.id
else s2.id
  end as ids,
  s1.a, s2.b
from source1 as s1 full outer join source2 as s2 on s1.id = s2.id where 
s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime + 
INTERVAL '5' SECOND;


My expectation for the final join statement: if the messages from the two sources arrive more than 10 seconds apart, it should output
two separate rows (each side with the other side's fields as null).


What I observe is that when the two messages arrive more than 10 seconds apart, a joined row is still output.
Why is a joined result still emitted even though the 10-second bound has passed?

Does Flink currently support dynamic rescaling?

2020-10-19 Thread 林影
Does Flink currently support dynamic rescaling, or does the community have any plans in this area?


Re:Re: flink sql: updating MySQL columns

2020-10-19 Thread Michael Ran
We use custom SQL. But different SQL statements updating different subsets of columns cause lock conflicts; writing single rows reduces the contention, while batch writes deadlock.
On 2020-09-28 21:36:11, "Leonard Xu" wrote:
>Hi
>
>Inserting into a specified subset of columns is a common requirement; the community already has an issue [1] tracking it, which you can follow.
>
>
>Best,
>Leonard
>[1] https://issues.apache.org/jira/browse/FLINK-18726 
> 
>
>> On Sep 28, 2020, at 17:46, lemon wrote:
>> 
>> Hi all:
>> A question: if a MySQL table has 20 columns, and multiple INSERT INTO statements each update a specified subset of columns, the same row may be updated by several INSERT statements touching different columns.
>> The problem is that an INSERT INTO statement has to list all of the MySQL table's columns, so each update overwrites the values of the other columns.
>> For example: insert into mysql select a, b, c from
>> kafka, but I only want to update columns a and b and keep the existing value of c. How should this be handled?
>> Flink 1.10.1, blink planner
>
>
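For reference, the behavior lemon is after corresponds to a partial upsert on the MySQL side. A minimal sketch in plain MySQL (table and column names are made up for illustration; the Flink 1.10/1.11 JDBC sink does not generate this column-subset form, which is exactly what FLINK-18726 tracks):

-- Hypothetical target table; only a and b are updated on conflict, c keeps its old value.
CREATE TABLE t_target (
  id BIGINT PRIMARY KEY,
  a  VARCHAR(64),
  b  VARCHAR(64),
  c  VARCHAR(64)
);

INSERT INTO t_target (id, a, b)
VALUES (1, 'new_a', 'new_b')
ON DUPLICATE KEY UPDATE
  a = VALUES(a),
  b = VALUES(b);   -- c is not listed, so it is left untouched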


Help: how to handle state that cannot be cleaned up because the data is discontinuous

2020-10-19 Thread x
1224




-- Original --
From: "user-zh" <584680...@qq.com>
Sent: 2020-10-15 3:47
To: "user-zh"

Re: Help: how to handle state that cannot be cleaned up because the data is discontinuous

2020-10-19 Thread Congxian Qiu
Hi
 Perhaps you can use a timer as a fallback: register a timer for some point in the future, and clear the state when the timer fires.
Best,
Congxian


x <35907...@qq.com> wrote on Mon, Oct 19, 2020 at 2:55 PM:

> The version is v1.10.1.
> We do real-time aggregation with AggregateFunction + ProcessWindowFunction; the ProcessWindowFunction accumulates state. We use event time, key by dimension + date, and open one-minute windows. At the day boundary the state has to be cleared so it does not keep growing. The cleanup logic overrides the clear method of ProcessWindowFunction and checks whether the window start time is "23:59:00", like this:
> override def clear(ctx: Context): Unit = {
>   val dt = new SimpleDateFormat("HH:mm:00").format(ctx.window.getStart)
>   if (dt.equals("23:59:00")) {
>     state.clear()
>   }
> }
> The problem is that before windowing, when keyBy partitions the data, some keys receive data only intermittently and very sparsely, so the last window of the day may contain no data at all. The cleanup logic is then never triggered and the total state keeps growing. Does anyone have a good way to avoid this?


Help: how to handle state that cannot be cleaned up because the data is discontinuous

2020-10-19 Thread x
1224




-- Original --
From: "user-zh"



flink1.10 stop with a savepoint fails

2020-10-19 Thread Robin Zhang
A plain source -> map -> filter -> sink test application.

Script used to trigger the savepoint:
${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
The exact error message:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job
"81990282a4686ebda3d04041e3620776".
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
at
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.util.concurrent.TimeoutException
at
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
... 9 more


Looking at the error I suspect a permission problem. I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the job works fine. I further traced it to the root user hitting a permission problem when stopping the Hadoop
application, but I don't know how to solve it and am currently stuck here.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10 stop with a savepoint fails

2020-10-19 Thread Congxian Qiu
Hi
You can check the JM log to see why this savepoint failed. If the savepoint timed out, look at which task
finished slowly (a savepoint can be slower than a checkpoint).
Best,
Congxian


Robin Zhang wrote on Mon, Oct 19, 2020 at 3:42 PM:

> A plain source -> map -> filter -> sink test application.
>
> Script used to trigger the savepoint:
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> The exact error message:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at
> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more
>
>
>
> Looking at the error I suspect a permission problem. I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the job works fine. I further traced it to the root user hitting a permission problem when stopping the Hadoop
> application, but I don't know how to solve it and am currently stuck here.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re:SQL interval join question

2020-10-19 Thread Mic
After a quick search, there is currently an issue that looks related: https://issues.apache.org/jira/browse/FLINK-18996 . Does anyone know its status?
On 2020-10-19 15:03:54, "Mic" wrote:
>I have the following SQL statements:
>create table source1(
>  id varchar PRIMARY KEY,
>  a varchar,
>  proctime AS PROCTIME()
>) with (
>'connector' = 'kafka'
>...
>);
>create table source2(
>  id varchar PRIMARY KEY,
>  a varchar,
>  proctime AS PROCTIME()
>) with (
>'connector' = 'kafka'
>...
>);
>select
>  case
>when s1.id is not null then s1.id
>else s2.id
>  end as ids,
>  s1.a, s2.b
>from source1 as s1 full outer join source2 as s2 on s1.id = s2.id where 
>s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime + 
>INTERVAL '5' SECOND;
>
>
>My expectation for the final join statement: if the messages from the two sources arrive more than 10 seconds apart, it should output
>two separate rows (each side with the other side's fields as null).
>
>
>What I observe is that when the two messages arrive more than 10 seconds apart, a joined row is still output.
>Why is a joined result still emitted even though the 10-second bound has passed?


Re: flink1.10 stop with a savepoint fails

2020-10-19 Thread zilong xiao
Hi Robin Zhang
You have probably hit the problem reported in this issue: https://issues.apache.org/jira/browse/FLINK-16626
Take a look at the issue description. Best~

Robin Zhang wrote on Mon, Oct 19, 2020 at 3:42 PM:

> A plain source -> map -> filter -> sink test application.
>
> Script used to trigger the savepoint:
> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
> The exact error message:
>
> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
> "81990282a4686ebda3d04041e3620776".
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
> at
>
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
> at
> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
> at
>
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.util.concurrent.TimeoutException
> at
>
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
> ... 9 more
>
>
>
> Looking at the error I suspect a permission problem. I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the job works fine. I further traced it to the root user hitting a permission problem when stopping the Hadoop
> application, but I don't know how to solve it and am currently stuck here.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10 stop with a savepoint fails

2020-10-19 Thread Robin Zhang
Hi, Congxian
Thanks for the idea. I had a look, but the JM side does not expose such a log; I can only see the normal checkpoint entries.

Best,
Robin



Congxian Qiu wrote
> Hi
> You can check the JM log to see why this savepoint failed. If the savepoint timed out, look at which task
> finished slowly (a savepoint can be slower than a checkpoint).
> Best,
> Congxian
> 
> 
> Robin Zhang <

> vincent2015qdlg@

> > wrote on Mon, Oct 19, 2020 at 3:42 PM:
> 
>> A plain source -> map -> filter -> sink test application.
>>
>> Script used to trigger the savepoint:
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> The exact error message:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at
>> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>> at
>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>> ... 9 more
>>
>>
>>
>> Looking at the error I suspect a permission problem. I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the job works fine. I further traced it to the root user hitting a permission problem when stopping the Hadoop
>> application, but I don't know how to solve it and am currently stuck here.
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10 stop with a savepoint fails

2020-10-19 Thread Robin Zhang
Hi, zilong
It is indeed that problem. Thanks for the help.
Best,
Robin


zilong xiao wrote
> Hi Robin Zhang
> You have probably hit the problem reported in this issue: https://issues.apache.org/jira/browse/FLINK-16626
> Take a look at the issue description. Best~
> 
> Robin Zhang <

> vincent2015qdlg@

> > wrote on Mon, Oct 19, 2020 at 3:42 PM:
> 
>> A plain source -> map -> filter -> sink test application.
>>
>> Script used to trigger the savepoint:
>> ${FLINK_HOME} stop -p ${TARGET_DIR} -d ${JOB_ID}
>> The exact error message:
>>
>> org.apache.flink.util.FlinkException: Could not stop with a savepoint job
>> "81990282a4686ebda3d04041e3620776".
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:462)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:843)
>> at
>> org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:454)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:907)
>> at
>>
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>>
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1962)
>> at
>>
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.util.concurrent.TimeoutException
>> at
>>
>> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>> at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:460)
>> ... 9 more
>>
>>
>>
>> Looking at the error I suspect a permission problem. I start the application as root, and the HDFS path holding the savepoint directory is also owned by root. Triggering a savepoint without stopping the job works fine. I further traced it to the root user hitting a permission problem when stopping the Hadoop
>> application, but I don't know how to solve it and am currently stuck here.
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


flinkSQL1.11 writing data to jdbc: field types do not match

2020-10-19 Thread 奔跑的小飞袁
hello
When writing data to JDBC with flinksql 1.11 I ran into a field type mismatch. Is something wrong with my type definitions?
Below are my exception log and SQL file.

SET stream.enableCheckpointing=1000*60;
SET stream.setParallelism=3;

-- Kafka cdbp zdao source table
create TABLE cloud_behavior_source(
operation STRING,
operation_channel STRING,
`time` STRING,
ip STRING,
lat STRING,
lng STRING,
user_id STRING,
device_id STRING,
imei STRING,
targets ARRAY>,
product_name STRING,
product_version STRING,
product_vendor STRING,
platform STRING,
platform_version STRING,
`languaage` STRING,
locale STRING,
other_para MAP
) with (
'connector'='kafka',
'topic'='cloud_behavior',
'properties.bootstrap.servers'='',
'properties.group.id'='testGroup',
'format'='avro',
'scan.startup.mode'='earliest-offset'
);

-- Hbase zdao uv statistics sink table
create TABLE cloud_behavior_sink(
operation STRING,
operation_channel STRING,
ip STRING,
lat STRING,
lng STRING,
user_id STRING,
device_id STRING
) with (
'connector'='jdbc',
'url'='jdbc:mysql://hosts:3306/d_bigdata',
'table-name'='flink_sql_test',
'username'='',
'password'='',
'sink.buffer-flush.max-rows'='100'
);

-- Business logic
insert into cloud_behavior_sink
select
 *
from cloud_behavior_source;

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type
[org.apache.logging.slf4j.Log4jLoggerFactory]


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Field types of query result and registered TableSink
default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
VARCHAR(2147483647), targets: ARRAY>, product_name: VARCHAR(2147483647),
product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
MAP]
Sink schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng:
VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
VARCHAR(2147483647)]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Field types of
query result and registered TableSink
default_catalog.default_database.cloud_behavior_sink do not match.
Query schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
VARCHAR(2147483647), targets: ARRAY>, product_name: VARCHAR(2147483647),
product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
MAP]
Sink schema: [operation: VARCHAR(2147483647), operation_channel:
VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng:
VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
VARCHAR(2147483647)]
at
org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplici

Re: SQL interval join question

2020-10-19 Thread Benchao Li
Hi Mic,

Thanks for following this issue; it is still under discussion.
I think the root cause has been identified, and I have cc'ed other committers to discuss and confirm it further.

Mic wrote on Mon, Oct 19, 2020 at 3:51 PM:

> After a quick search, there is currently an issue that looks related: https://issues.apache.org/jira/browse/FLINK-18996
> Does anyone know its status?
> On 2020-10-19 15:03:54, "Mic" wrote:
> >I have the following SQL statements:
> >create table source1(
> >  id varchar PRIMARY KEY,
> >  a varchar,
> >  proctime AS PROCTIME()
> >) with (
> >'connector' = 'kafka'
> >...
> >);
> >create table source2(
> >  id varchar PRIMARY KEY,
> >  a varchar,
> >  proctime AS PROCTIME()
> >) with (
> >'connector' = 'kafka'
> >...
> >);
> >select
> >  case
> >when s1.id is not null then s1.id
> >else s2.id
> >  end as ids,
> >  s1.a, s2.b
> >from source1 as s1 full outer join source2 as s2 on s1.id = s2.id where
> s1.proctime between s2.proctime - INTERVAL '5' SECOND and s2.proctime +
> INTERVAL '5' SECOND;
> >
> >
> >My expectation for the final join statement: if the messages from the two sources arrive more than 10 seconds apart, it should output
>  two separate rows (each side with the other side's fields as null).
> >
> >
> >What I observe is that when the two messages arrive more than 10 seconds apart, a joined row is still output.
> >Why is a joined result still emitted even though the 10-second bound has passed?
>


-- 

Best,
Benchao Li


How to parse SQL into an unresolved plan in Flink 1.11

2020-10-19 Thread 马阳阳
The parse method of org.apache.flink.table.planner.ParserImpl in Flink 1.11 also calls into Planner-related methods, so it fails if some prerequisite SQL (for example, the CREATE TABLE statements for the tables used by an INSERT INTO) has not been executed yet. If I only want to use Calcite's functionality to parse SQL statements, is there a way to do that? One option I can think of is to obtain the calciteParserSupplier inside ParserImpl via reflection. I would like to know whether Flink provides a direct interface or method for pure SQL parsing.


Thanks~

Re: On memory sizing and estimation

2020-10-19 Thread Xintong Song
Estimating in advance is fairly hard; different jobs can vary a lot.
If it is only a heap OOM, there is no need to enlarge the whole JM/TM memory; you can tune just the heap portion.
You can refer to this document [1].

Thank you~

Xintong Song


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/memory/mem_tuning.html

On Sun, Oct 18, 2020 at 8:54 PM guangyong yang 
wrote:

> You can use the JVM's built-in jstat command, or the ManagementFactory class, to monitor the heap/metaspace memory, GC activity, and other metrics of the host where the TaskManager runs.
>
> Kyle Zhang wrote on Fri, Oct 16, 2020 at 5:34 PM:
>
> > Hi all,
> >   Recently I also hit the fairly common OutOfMemoryError: Java heap space, with JM:1g
> > TM:2g; crudely bumping them to 2g and 4g makes the job run,
> > INFO  [] - Loading configuration property:
> > cluster.termination-message-path, /flink/log/termination.log
> > INFO  [] - Final TaskExecutor Memory configuration:
> > INFO  [] -   Total Process Memory:  3.906gb (4194304000 bytes)
> > INFO  [] - Total Flink Memory:  3.266gb (3506438138 bytes)
> > INFO  [] -   Total JVM Heap Memory: 1.508gb (1619001315 bytes)
> > INFO  [] - Framework:   128.000mb (134217728 bytes)
> > INFO  [] - Task:1.383gb (1484783587 bytes)
> > INFO  [] -   Total Off-heap Memory: 1.758gb (1887436823 bytes)
> > INFO  [] - Managed: 1.306gb (1402575276 bytes)
> > INFO  [] - Total JVM Direct Memory: 462.400mb (484861547 bytes)
> > INFO  [] -   Framework: 128.000mb (134217728 bytes)
> > INFO  [] -   Task:  0 bytes
> > INFO  [] -   Network:   334.400mb (350643819 bytes)
> > INFO  [] - JVM Metaspace:   256.000mb (268435456 bytes)
> > INFO  [] - JVM Overhead:400.000mb (419430406 bytes)
> >
> > Is there any metric that can be used to estimate in advance how much memory the JM and TM need?
> >
> > Best
> >
>


Does Flink currently support dynamic rescaling?

2020-10-19 Thread 林影
Does Flink currently support dynamic rescaling, or does the community have any plans in this area?


Re: Help: how to handle state that cannot be cleaned up because the data is discontinuous

2020-10-19 Thread x
Use KeyedProcessFunction to replace ProcessWindowFunction.




-- Original --
From: "user-zh"



Re: flinkSQL1.11 writing data to jdbc: field types do not match

2020-10-19 Thread Benchao Li
Your source and sink do not have the same number of fields; the schema of the query in the INSERT statement must match the schema of the sink table.
For example, you can write with the following SQL:
```SQL
insert into cloud_behavior_sink
select
operation,
operation_channel,
ip,
lat,
lng,
user_id,
device_id
from cloud_behavior_source;
```

奔跑的小飞袁 wrote on Mon, Oct 19, 2020 at 4:29 PM:

> hello
> When writing data to JDBC with flinksql 1.11 I ran into a field type mismatch. Is something wrong with my type definitions?
> Below are my exception log and SQL file.
>
> SET stream.enableCheckpointing=1000*60;
> SET stream.setParallelism=3;
>
> -- Kafka cdbp zdao source table
> create TABLE cloud_behavior_source(
> operation STRING,
> operation_channel STRING,
> `time` STRING,
> ip STRING,
> lat STRING,
> lng STRING,
> user_id STRING,
> device_id STRING,
> imei STRING,
> targets ARRAY>,
> product_name STRING,
> product_version STRING,
> product_vendor STRING,
> platform STRING,
> platform_version STRING,
> `languaage` STRING,
> locale STRING,
> other_para MAP
> ) with (
> 'connector'='kafka',
> 'topic'='cloud_behavior',
> 'properties.bootstrap.servers'='',
> 'properties.group.id'='testGroup',
> 'format'='avro',
> 'scan.startup.mode'='earliest-offset'
> );
>
> -- Hbase zdao uv statistics sink table
> create TABLE cloud_behavior_sink(
> operation STRING,
> operation_channel STRING,
> ip STRING,
> lat STRING,
> lng STRING,
> user_id STRING,
> device_id STRING
> ) with (
> 'connector'='jdbc',
> 'url'='jdbc:mysql://hosts:3306/d_bigdata',
> 'table-name'='flink_sql_test',
> 'username'='',
> 'password'='',
> 'sink.buffer-flush.max-rows'='100'
> );
>
> -- Business logic
> insert into cloud_behavior_sink
> select
>  *
> from cloud_behavior_source;
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
>
> [jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
>
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Field types of query result and registered TableSink
> default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
> lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
> VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
> VARCHAR(2147483647), targets: ARRAY `value` VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647),
> product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647),
> platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647),
> languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para:
> MAP]
> Sink schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647),
> lng:
> VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id:
> VARCHAR(2147483647)]
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at
> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink
> default_catalog.default_database.cloud_behavior_sink do not match.
> Query schema: [operation: VARCHAR(2147483647), operation_channel:
> VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647),
> lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id:
> VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei:
> VARCHAR(2147483647), targets: ARRAY `va

Re: Help optimizing simultaneous reads and writes to MySQL with pyflink

2020-10-19 Thread hailongwang
Hi,
  I think the cost is mainly the network IO of the writes and the lookups. For writes, you can write in batches to reduce network IO.
For lookups, if the data is suitable for caching, you can add an LRU cache, use asynchronous lookups, and so on.
Best,
Hailong Wang
On 2020-10-19 12:19:34, "小学生" <201782...@qq.com> wrote:
>Hi all, I currently pull Kafka messages in real time with pyflink. One branch writes them straight into MySQL, and another branch uses a UDF to query that MySQL table's historical data for computation. Once the data volume reaches about one million rows, performance drops. How can I optimize this?
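For reference, if the history lookup can be expressed as a dimension-table join instead of a UDF, the JDBC connector's lookup cache provides the LRU cache mentioned above. A rough sketch (the lookup options come from the 1.11 JDBC connector; table, column, and connection values are made up):

-- Hypothetical MySQL dimension table with an LRU lookup cache.
CREATE TABLE mysql_history (
  id BIGINT,
  metric DOUBLE
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://host:3306/db',
  'table-name' = 'history',
  'username' = '...',
  'password' = '...',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);

-- Processing-time lookup join from the Kafka stream (kafka_source and proctime are placeholders).
SELECT s.id, s.val, h.metric
FROM kafka_source AS s
JOIN mysql_history FOR SYSTEM_TIME AS OF s.proctime AS h
ON s.id = h.id;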


Re: Converting a flink table to a datastream fails

2020-10-19 Thread hailongwang
Hi Dream,
Could you share your complete program? I suspect this is caused by mismatched types in the JOIN ON condition; the full program would help confirm that.
Best,
Hailong Wang

On 2020-10-19 09:50:33, "Dream-底限" wrote:
>Hi, I am converting a flink Table to a DataStream and it throws the exception below; it looks like a bug...
>
>table.printSchema();
>streamTableEnv.toRetractStream(table,
>Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
>
>
>
>root
> |-- register_id: BIGINT
> |-- asi_uid: BIGINT
> |-- person_uuid: BIGINT
> |-- app_id: BIGINT
> |-- country_id: BIGINT
> |-- channel_id: STRING
> |-- device_id: STRING
> |-- adjust_id: STRING
> |-- google_adid: STRING
> |-- referrer: BIGINT
> |-- login_pwd: STRING
> |-- sync_data_flag: INT
> |-- register_phone_number: STRING
> |-- device_type: INT
> |-- imei: STRING
> |-- device_model: STRING
> |-- os_version: STRING
> |-- app_name: STRING
> |-- app_version: STRING
> |-- app_package_name: STRING
> |-- network_type: STRING
> |-- wifi_mac: STRING
> |-- longitude: DECIMAL(38, 18)
> |-- latitude: DECIMAL(38, 18)
> |-- geo_hash7: STRING
> |-- ip: STRING
> |-- register_time: BIGINT
> |-- etl_time: BIGINT NOT NULL
>
>
>org.apache.flink.table.api.TableException: BIGINT and
>VARCHAR(2147483647) does not have common type now


flink1.10.0 batch mode write to hive fails: TableSinkFactory not found

2020-10-19 Thread Allen
How can the error shown in the attached picture be resolved?



Sent from my iPhone

Re: Does Flink currently support dynamic rescaling?

2020-10-19 Thread 熊云昆
It is not supported yet, as far as I know.


熊云昆
Email: xiongyun...@163.com

On Oct 19, 2020 at 18:22, 林影 wrote:
Does Flink currently support dynamic rescaling, or does the community have any plans in this area?


Re: Converting a flink table to a datastream fails

2020-10-19 Thread Dream-底限
Hi,
I checked, and the types in the join condition are the same. With a JDBC downstream sink the job runs fine, but it fails when converting to a DataStream. The program and the exception are below:

streamTableEnv.executeSql(kafkaDDL); // the DDL statements are shown below


Table table = streamTableEnv.sqlQuery("SELECT cast(t1.id as bigint) as
register_id,cast(t1.uid as bigint) as asi_uid,cast(null as bigint) as
person_uuid,cast(t1.app_id as bigint) as app_id,cast(t1.country_id as
bigint) as country_id,t2.channel_id as channel_id,t2.device_id as
device_id,t2.adjust_id as adjust_id,t2.google_adid as
google_adid,cast(t3.referrer as bigint) as referrer,t3.login_pwd as
login_pwd,cast(t1.sync_data_flag as int) as
sync_data_flag,t3.phone_number as
register_phone_number,cast(t2.device_type as int) as
device_type,t2.imei as imei,t2.device_model as
device_model,t2.os_version as os_version,t2.app_name as
app_name,t2.app_version as app_version,t2.app_package_name as
app_package_name,cast(t2.network_type as string) as
network_type,t2.wifi_mac as wifi_mac,t2.lgt as longitude,t2.lat as
latitude,cast(null as string) as geo_hash7,t2.ip as
ip,unix_timestamp(t1.create_time,'-MM-dd HH:mm:ss') as
register_time,UNIX_TIMESTAMP() as etl_time from (SELECT
`uid`,`update_time`,`adjust_id`,`create_time`,`source_type`,`sync_data_flag`,`id`,`app_id`,`country_id`
FROM (SELECT 
`rowData`.`uid`,`rowData`.`update_time`,`rowData`.`adjust_id`,`rowData`.`create_time`,`rowData`.`source_type`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`app_id`,`rowData`.`country_id`,`binlogTime`,ROW_NUMBER()
OVER (PARTITION BY `rowData`.`uid`,`rowData`.`id` ORDER BY
`binlogTime` desc) AS rownum FROM
asi_user_user_service_t_user_register_source) WHERE rownum = 1) t1
left join  (SELECT
`refer_id`,`device_id`,`channel_id`,`device_type`,`imei`,`adjust_id`,`google_adid`,`device_model`,`os_version`,`app_name`,`app_package_name`,`app_version`,`ip`,`network_type`,`wifi_mac`,`lgt`,`lat`,`event_id`,`country_id`,`uid`,`create_time`,`api_version`
FROM (SELECT 
`data`.`data`.`refer_id`,`data`.`data`.`device_id`,`data`.`data`.`channel_id`,`data`.`data`.`device_type`,`data`.`data`.`imei`,`data`.`data`.`adjust_id`,`data`.`data`.`google_adid`,`data`.`data`.`device_model`,`data`.`data`.`os_version`,`data`.`data`.`app_name`,`data`.`data`.`app_package_name`,`data`.`data`.`app_version`,`data`.`data`.`ip`,`data`.`data`.`network_type`,`data`.`data`.`wifi_mac`,`data`.`data`.`lgt`,`data`.`data`.`lat`,`data`.`event_id`,`data`.`country_id`,`data`.`uid`,`data`.`create_time`,`data`.`api_version`,ROW_NUMBER()
OVER (PARTITION BY `data`.`data`.`refer_id`,`data`.`event_id` ORDER BY
createTime desc) AS rownum FROM eventDeviceInfo where
`data`.`event_id`=1002) WHERE rownum = 1) t2 on t1.uid = t2.refer_id
left join (SELECT
`register_channel_source`,`last_login_time`,`create_time`,`language`,`avatar`,`login_pwd`,`email_status`,`storage_source`,`uid`,`referrer`,`update_time`,`nickname`,`phone_number`,`sync_data_flag`,`id`,`country_id`,`email`,`status`
FROM (SELECT 
`rowData`.`register_channel_source`,`rowData`.`last_login_time`,`rowData`.`create_time`,`rowData`.`language`,`rowData`.`avatar`,`rowData`.`login_pwd`,`rowData`.`email_status`,`rowData`.`storage_source`,`rowData`.`uid`,`rowData`.`referrer`,`rowData`.`update_time`,`rowData`.`nickname`,`rowData`.`phone_number`,`rowData`.`sync_data_flag`,`rowData`.`id`,`rowData`.`country_id`,`rowData`.`email`,`rowData`.`status`,`binlogTime`,ROW_NUMBER()
OVER (PARTITION BY `rowData`.`uid` ORDER BY `binlogTime` desc) AS
rownum FROM asi_user_user_service_t_user) WHERE rownum = 1) t3 on
t1.uid=t3.uid");

table.printSchema();
streamTableEnv.toRetractStream(table,
Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes(.print();
streamExecEnv.execute("kafka-json-test");




CREATE TABLE eventDeviceInfo (`data` ROW<`event_id`
BIGINT,`country_id` BIGINT,`uid` BIGINT,`create_time` BIGINT,`data`
ROW<`refer_id` STRING,`device_id` STRING,`channel_id`
STRING,`device_type` BIGINT,`imei` STRING,`adjust_id`
STRING,`google_adid` STRING,`device_model` STRING,`os_version`
STRING,`app_name` STRING,`app_package_name` STRING,`app_version`
STRING,`ip` STRING,`network_type` BIGINT,`wifi_mac` STRING,`lgt`
DECIMAL(38,18),`lat` DECIMAL(38,18)>,`api_version`
STRING>,`createTime` BIGINT) WITH ('connector' = 'kafka-0.11','topic'
= 'eventDeviceInfo','properties.bootstrap.servers' =
'127.0.0.1:9092','properties.group.id' =
'test','properties.max.poll.records' =
'1000','properties.flink.poll-timeout' = '10','scan.startup.mode' =
'earliest-offset','format' = 'json','json.fail-on-missing-field' =
'false','json.ignore-parse-errors' = 'true')
CREATE TABLE asi_user_user_service_t_user (`binlogTime`
BIGINT,`rowData` ROW<`register_channel_source`
STRING,`last_login_time` STRING,`create_time` STRING,`language`
STRING,`avatar` STRING,`login_pwd` STRING,`email_status`
STRING,`storage_source` STRING,`uid` STRING,`referrer`
STRING,`update_time` STRING,`nickname` STRING,`phone_number`
STRING,`sync_data_flag` STRING,`id` STRING,`country_id` STRING,`email`
STRING,`status` STRING>) WITH ('connector'

Re: How to parse SQL into an unresolved plan in Flink 1.11

2020-10-19 Thread 刘首维
Hi,



 I had the same requirement before, and my implementation is basically the same as your idea: mock an env and then obtain the calciteParserSupplier via reflection.


It has been running well in production so far.

FYI


From: 马阳阳
Sent: October 19, 2020 17:57:47
To: Flink Chinese user mailing list
Subject: How to parse SQL into an unresolved plan in Flink 1.11

The parse method of org.apache.flink.table.planner.ParserImpl in Flink 1.11 also calls into Planner-related methods, so it fails if some prerequisite SQL (for example, the CREATE TABLE statements for the tables used by an INSERT INTO) has not been executed yet. If I only want to use Calcite's functionality to parse SQL statements, is there a way to do that? One option I can think of is to obtain the calciteParserSupplier inside ParserImpl via reflection. I would like to know whether Flink provides a direct interface or method for pure SQL parsing.


Thanks~


pyflink1.11.0 kafka connector when the broker requires authentication

2020-10-19 Thread whh_960101
CREATE TABLE kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)
Hello, when creating the kafkaTable with a SQL statement like the above and the Kafka brokers require authentication, there is no option listed for a username and password. How can this be handled?
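For what it's worth, the Kafka SQL connector forwards options carrying the 'properties.' prefix to the underlying Kafka client, so SASL authentication can usually be configured that way. A sketch, worth verifying against the 1.11 connector docs; the mechanism and credentials below are placeholders:

CREATE TABLE kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset',
  -- authentication settings, passed through to the Kafka client
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="your-user" password="your-password";'
);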

Question about switching flink's Scala version

2020-10-19 Thread 赵一旦
As the subject says: if I switch the Scala version, will checkpoints and so on still be compatible? This is for a production cluster.

The main reason for switching is that I plan to use flinksql, but Zeppelin seems to only support Scala 2.11, while everything I have used so far is 2.12.

So I am thinking of switching everything to 2.11 in one go.


How to make flink-sql count/sum values restart each day

2020-10-19 Thread 夜思流年梦
The scenario:
computing each employee's performance for the day (only that day's data counts).


Right now I do it with flink-sql: insert into  select current_date, count(1), worker from XX
where writeTime>=current_date group by worker;
and sink the data to MySQL partitioned by day.


But the data landed in MySQL turns out to include the previous days' data as well. How can I count only today's data?
Another question: how can I compute today's figure and at the same time also cover all of this month's data?



Error in a single job with multiple streams

2020-10-19 Thread freeza1...@outlook.com
Hi all:

I am using flink 1.10.2 and wrote a job that runs fine locally, but when it is deployed to Flink as a job it fails at startup with the exception below. What could be the reason?
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:144)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class 
invalid for deserialization
at 
java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
at java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)

The rough logic is as follows; I have two streams:
1. Multiple kafka sources are unioned into one stream, which then passes through two operators and is finally sunk to kafka.
2. A single kafka source produces a stream that passes through one operator and is sunk to kafka.
The code:
 StreamExecutionEnvironment streamExecutionEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
List kafkaSourceConfiguration = 
this.kafkaConfiguration.getSource0();
KafkaInfo kafkaSinkConfiguration = this.kafkaConfiguration.getSink();
RecordTransformOperator transformOperator = new 
RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
RecordKeySelector keySelector = new RecordKeySelector();
RecordComputeOperator computeOperator = new RecordComputeOperator();
Properties sinkProperties = new Properties();
sinkProperties.setProperty("bootstrap.servers", 
kafkaSinkConfiguration.getBootstrapServer());
FlinkKafkaProducer011 flinkKafkaProducer
= new 
FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new 
KafkaSerializer(), sinkProperties);

List>> dataStreamList 
= new ArrayList<>();
for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
Properties sourceProperties = new Properties();
sourceProperties.setProperty("bootstrap.servers", 
kafkaInfo.getBootstrapServer());
sourceProperties.setProperty("group.id", kafkaInfo.getGroupId());
sourceProperties.setProperty("max.poll.records", 
kafkaInfo.getMaxPollRecord());
sourceProperties.put("max.poll.interval.ms", 
kafkaInfo.getMaxPollIntervalMs());
String topicName = kafkaInfo.getTopicName();
FlinkKafkaConsumer011> flinkKafkaConsumer
= new FlinkKafkaConsumer011(topicName,
new KafkaDeserializer(),
sourceProperties);
SingleOutputStreamOperator> 
singleOutputStreamOperator =
streamExecutionEnvironment.addSource(flinkKafkaConsumer);
dataStreamList.add(singleOutputStreamOperator);
}

DataStream> unionDataStream = 
dataStreamList.get(0);
for(int i = 1; i> flinkKafkaConsumer
= new FlinkKafkaConsumer011(topicName,
new KafkaDeserializer(),
sourceProperties);
streamExecutionEnvironment
.addSource(flinkKafkaConsumer)
.flatMap(transformOperator1)
.addSink(flinkKafkaProducer1);
streamExecutionEnvironment.execute();



freeza1...@outlook

Unsubscribe

2020-10-19 Thread 费文杰
hi:
 unsubscribe!

Re: Unsubscribe

2020-10-19 Thread Congxian Qiu
Hi
   To unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org ; see [1] for more details.
[1] https://flink.apache.org/community.html#mailing-lists
Best,
Congxian


费文杰 <15171440...@163.com> wrote on Tue, Oct 20, 2020 at 1:51 PM:

> hi:
>  unsubscribe!


Re: Re: Does Flink currently support dynamic rescaling?

2020-10-19 Thread Yun Tang
Hi

Before Flink 1.8, rescaling through a REST command was supported
[1], but that feature was disabled during a later refactoring [2]. It is still some distance from true dynamic auto-scaling; think of it as the groundwork for rescaling a job from the outside.
Currently, Alibaba's enterprise edition has a dynamic scaling plugin named libra [3] that provides this kind of functionality.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling
[2] https://issues.apache.org/jira/browse/FLINK-12312
[3] https://developer.aliyun.com/article/727376

Best,
Yun Tang


From: 熊云昆 
Sent: Tuesday, October 20, 2020 7:15
To: 林影 
Cc: user-zh@flink.apache.org 
Subject: Re: Does Flink currently support dynamic rescaling?

It is not supported yet, as far as I know.


熊云昆
Email: xiongyun...@163.com

On Oct 19, 2020 at 18:22, 林影 wrote:
Does Flink currently support dynamic rescaling, or does the community have any plans in this area?


flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 Thread 奔跑的小飞袁
hello 
When using a flinksql connector: if I put flink-sql-connector-elasticsearch6_2.11-1.11.1.jar under lib, the program runs fine, but when I declare it in the pom instead, I get the error below. The same problem happens with the hbase and jdbc connectors. What could be causing this?
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Unable to create a sink for writing table
'default_catalog.default_database.cloud_behavior_sink'.

Table options are:

'connector'='elasticsearch-6'
'document-type'='cdbp'
'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
'index'='flink_sql_test'
'sink.bulk-flush.max-actions'='100'
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.ValidationException: Unable to create
a sink for writing table
'default_catalog.default_database.cloud_behavior_sink'.

Table options are:

'connector'='elasticsearch-6'
'document-type'='cdbp'
'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
'index'='flink_sql_test'
'sink.bulk-flush.max-actions'='100'
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
at
com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='elasticsearch-6''.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 37 more
Caused by: org.apache.fli

Re: Error in a single job with multiple streams

2020-10-19 Thread Robin Zhang
Hi,
   From the error, the problem is located in your code at
  at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
  at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
  at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
  at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
The failure happens in InstantiationUtil, so it should be a deserialization problem. The local test passes because no serialization is involved locally.

Best,
Robin




freeza1...@outlook.com wrote
> Hi all:
> 
> I am using flink 1.10.2 and wrote a job that runs fine locally, but when it is deployed to Flink as a job it fails at startup with the exception below. What could be the reason?
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:430)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> 
> (OperatorChain.java:144)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:432)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.InvalidClassException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; class
> invalid for deserialization
> at
> java.io.ObjectStreamClass$ExceptionInfo.newInvalidClassException(ObjectStreamClass.java:150)
> at
> java.io.ObjectStreamClass.checkDeserialize(ObjectStreamClass.java:790)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1782)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> 
> The rough logic is as follows; I have two streams:
> 1. Multiple kafka sources are unioned into one stream, which then passes through two operators and is finally sunk to kafka.
> 2. A single kafka source produces a stream that passes through one operator and is sunk to kafka.
> The code:
>  StreamExecutionEnvironment streamExecutionEnvironment =
> StreamExecutionEnvironment.getExecutionEnvironment();
> List
> 
>  kafkaSourceConfiguration = this.kafkaConfiguration.getSource0();
> KafkaInfo kafkaSinkConfiguration =
> this.kafkaConfiguration.getSink();
> RecordTransformOperator transformOperator = new
> RecordTransformOperator(DefRecordTransform.CHAIN_TYPE_TRANSFORM_COMPUTE);
> RecordKeySelector keySelector = new RecordKeySelector();
> RecordComputeOperator computeOperator = new
> RecordComputeOperator();
> Properties sinkProperties = new Properties();
> sinkProperties.setProperty("bootstrap.servers",
> kafkaSinkConfiguration.getBootstrapServer());
> FlinkKafkaProducer011 flinkKafkaProducer
> = new
> FlinkKafkaProducer011(kafkaSinkConfiguration.getTopicName(), new
> KafkaSerializer(), sinkProperties);
> 
> List String>>> dataStreamList = new ArrayList<>();
> for (KafkaInfo kafkaInfo : kafkaSourceConfiguration) {
> Properties sourceProperties = new Properties();
> sourceProperties.setProperty("bootstrap.servers",
> kafkaInfo.getBootstrapServer());
> sourceProperties.setProperty("group.id",
> kafkaInfo.getGroupId());
> sourceProperties.setProperty("max.poll.records",
> kafkaInfo.getMaxPollRecord());
> sourceProperties.put("max.poll.interval.ms",
> kafkaInfo.getMaxPollIntervalMs());
> String topicName = kafkaInfo.getTopicName();
> FlinkKafkaConsumer011>
> flinkKafkaConsumer
> = new FlinkKafkaConsumer011(topicName,
> new Kafk

Re: flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 Thread Robin Zhang
Hi, 奔跑的小飞袁
Flink's class loading is child-first, so try not to pull Flink-related dependencies into your own pom, to avoid conflicts with the Flink cluster environment. The recommendation is to put the jar under lib and let Flink load it.

Best,
Robin



奔跑的小飞袁 wrote
> hello 
> When using a flinksql connector: if I put flink-sql-connector-elasticsearch6_2.11-1.11.1.jar under lib, the program runs fine, but when I declare it in the pom instead, I get the error below. The same problem happens with the hbase and jdbc connectors. What could be causing this?
> org.apache.flink.client.program.ProgramInvocationException: The main
> method
> caused an error: Unable to create a sink for writing table
> 'default_catalog.default_database.cloud_behavior_sink'.
> 
> Table options are:
> 
> 'connector'='elasticsearch-6'
> 'document-type'='cdbp'
> 'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
> 'index'='flink_sql_test'
> 'sink.bulk-flush.max-actions'='100'
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>   at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>   at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>   at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to
> create
> a sink for writing table
> 'default_catalog.default_database.cloud_behavior_sink'.
> 
> Table options are:
> 
> 'connector'='elasticsearch-6'
> 'document-type'='cdbp'
> 'hosts'='http://10.2.11.116:9200;http://10.2.11.117:9200;http://10.2.11.118:9200;http://10.2.11.119:9200'
> 'index'='flink_sql_test'
> 'sink.bulk-flush.max-actions'='100'
>   at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>   ... 11 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover
> a
> connector using option ''connector'='elastics

Re: How to make flink-sql count/sum values restart each day

2020-10-19 Thread Robin Zhang
Hi, 夜思流年梦
I think grouping by date solves your requirement: a record only counts toward the day it belongs to and does not affect other dates' data (see the sketch after this message);
once the per-day results are computed, summing them up gives you the whole month.

Best,
Robin



夜思流年梦 wrote
> The scenario:
> computing each employee's performance for the day (only that day's data counts).
>
>
> Right now I do it with flink-sql: insert into  select current_date, count(1), worker from
> XX  where writeTime>=current_date group by worker;
> and sink the data to MySQL partitioned by day.
>
>
> But the data landed in MySQL turns out to include the previous days' data as well. How can I count only today's data?
> Another question: how can I compute today's figure and at the same time also cover all of this month's data?





--
Sent from: http://apache-flink.147419.n8.nabble.com/
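
A minimal sketch of the per-day grouping described above, reusing the table and column names from the original question (the sink table name daily_sink is made up; the month-to-date figure can be obtained by summing the daily rows, or by grouping on 'yyyy-MM' in a second query):

-- Group by worker and calendar day, so each day's total only contains that day's rows.
INSERT INTO daily_sink
SELECT
  DATE_FORMAT(writeTime, 'yyyy-MM-dd') AS stat_date,
  worker,
  COUNT(1) AS cnt
FROM XX
GROUP BY DATE_FORMAT(writeTime, 'yyyy-MM-dd'), worker;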

Re: flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 Thread 奔跑的小飞袁
Right now there is no Elasticsearch-related connector under my lib; it is referenced only in the pom. Would that cause a conflict? And where might this kind of conflict come from?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flinksql error: Could not find any factory for identifier 'elasticsearch-6' that implements 'org.apache.flink.table.factories.DynamicTableSinkFactory' in the classpath.

2020-10-19 Thread Robin Zhang
Hi, 奔跑的小飞袁
I have not tried integrating flink with ES myself, so I cannot go too deep into the details, but here is a line of thought:
  1. Check whether the ES dependency in the pom has a scope set that keeps it from being bundled;
  2.
If the dependency is bundled correctly and it still fails, while putting the same jar under lib works, it is almost certainly a dependency conflict. Which class causes it I cannot tell right now; better ideas are welcome.


Best,
Robin


奔跑的小飞袁 wrote
> Right now there is no Elasticsearch-related connector under my lib; it is referenced only in the pom. Would that cause a conflict? And where might this kind of conflict come from?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/





--
Sent from: http://apache-flink.147419.n8.nabble.com/