无法生成rowtime导致在window失败

2020-06-28 文章 naturalfree
大家好
   在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下!
   
简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception
 in thread "main" org.apache.flink.table.api.ValidationException: A group 
window expects a time attribute for grouping in a stream environment.
 
   下边是我的详细流程的相关片段

1. 我们使用的jar包是flink-xx_2.12:1.10.0 / kafka版本为0.11
2. kafka的数据格式为{"acct":"acct1234", "evtime":1593396391819}
3. 使用descriptor的方式连接kafka,代码为:
StreamExecutionEnvironment fsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);


fsTableEnv.connect(new Kafka()
  .version("universal")
  .topic("jes_topic_evtime")
  .property("zookeeper.connect", 
"172.xx.xx.xxx:2181")
  .property("bootstrap.servers", 
"172.xx.xx.xxx:9092")
  .property("group.id", "grp1")
  .startFromEarliest()
).withFormat(new Json()
  
.failOnMissingField(false).deriveSchema())
   .withSchema(new 
Schema().field("acct", "STRING").field("evtime", 
"LONG").field("logictime","TIMESTAMP(3)").rowTime(new 
Rowtime().timestampsFromField("evtime").watermarksPeriodicBounded(5000)))
 
.inAppendMode().createTemporaryTable("testTableName");


   Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime 
FROM testTableName")
   
.window(Tumble.over("5.seconds").on("logictime").as("w1"))
   .groupBy("w1, acct")
   .select("w1.rowtime, acctno");




测试发现在descriptor连接kafka时定义schema时,定义的rowtime字段和使用from的方式重命名字段好像都无法成功。测试时使用from方式重命名字段返回的值是null

回复:sinktable更新部分字段问题

2020-05-26 文章 naturalfree
Hi, Leonard xu

谢谢你的回复
connector当前是支持两种模式的。但是更新的时候,select的字段数量必须等于es索引的全部字段。我这边想要根据主键更新索引的部分字段


| |
naturalfree
|
|
邮箱:naturalf...@126.com
|

签名由 网易邮箱大师 定制

在2020年05月22日 23:29,Leonard Xu 写道:
Hi,naturalfree

Flink SQL 里es sink 是支持Append mode和upsert mode的[1],upsert mode下支持按主键更新的,你可以看看。


Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connect.html#elasticsearch-connector
 
<https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connect.html#elasticsearch-connector>;

> 在 2020年5月20日,16:07,naturalfree  写道:
>
> 现在有一个es索引,想通过flink sql根据主键更新部分字段。不知是否有可行方案
>
>
> | |
> naturalfree
> |
> |
> 邮箱:naturalf...@126.com
> |
>
> 签名由 网易邮箱大师 定制



回复:sql client定义指向elasticsearch索引密码问题

2020-05-22 文章 naturalfree
好的,非常感谢


| |
naturalfree
|
|
邮箱:naturalf...@126.com
|

签名由 网易邮箱大师 定制

在2020年05月22日 11:15,Yangze Guo 写道:
目前1.11已经feature freeze,该功能最早1.12才能支持,着急的话可以看看DataStream
API的ElasticSearchSink,这个是支持安全认证的,也可以自己实现一个TableSink

Best,
Yangze Guo

On Fri, May 22, 2020 at 9:59 AM Rui Li  wrote:
>
> Hi,目前还不支持,不过有PR在做这个功能:https://github.com/apache/flink/pull/11822
>
> On Wed, May 20, 2020 at 4:10 PM naturalfree  wrote:
>
> > 在flink sql client配置文件中定义指向es的索引。发现没有设置用户名密码的属性,现在的es connector是否支持安全认证呢
> >
> > | |
> > naturalfree
> > |
> > |
> > 邮箱:naturalf...@126.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>
>
>
> --
> Best regards!
> Rui Li


sql client定义指向elasticsearch索引密码问题

2020-05-20 文章 naturalfree
在flink sql client配置文件中定义指向es的索引。发现没有设置用户名密码的属性,现在的es connector是否支持安全认证呢

| |
naturalfree
|
|
邮箱:naturalf...@126.com
|

签名由 网易邮箱大师 定制

sinktable更新部分字段问题

2020-05-20 文章 naturalfree
现在有一个es索引,想通过flink sql根据主键更新部分字段。不知是否有可行方案


| |
naturalfree
|
|
邮箱:naturalf...@126.com
|

签名由 网易邮箱大师 定制