无法生成rowtime导致在window失败
大家好 在使用窗口的过程中遇到一个问题,麻烦大家帮忙看下! 简单描述下情况:我们是从kafka获取数据,在flink做一些相关处理后sink到elasticsearch中。没有使用window的时候没有问题,可以成功完成流程。使用窗口后报错:Exception in thread "main" org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment. 下边是我的详细流程的相关片段 1. 我们使用的jar包是flink-xx_2.12:1.10.0 / kafka版本为0.11 2. kafka的数据格式为{"acct":"acct1234", "evtime":1593396391819} 3. 使用descriptor的方式连接kafka,代码为: StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv); fsTableEnv.connect(new Kafka() .version("universal") .topic("jes_topic_evtime") .property("zookeeper.connect", "172.xx.xx.xxx:2181") .property("bootstrap.servers", "172.xx.xx.xxx:9092") .property("group.id", "grp1") .startFromEarliest() ).withFormat(new Json() .failOnMissingField(false).deriveSchema()) .withSchema(new Schema().field("acct", "STRING").field("evtime", "LONG").field("logictime","TIMESTAMP(3)").rowTime(new Rowtime().timestampsFromField("evtime").watermarksPeriodicBounded(5000))) .inAppendMode().createTemporaryTable("testTableName"); Table testTab = fsTableEnv.sqlQuery("SELECT acct, evtime, logictime FROM testTableName") .window(Tumble.over("5.seconds").on("logictime").as("w1")) .groupBy("w1, acct") .select("w1.rowtime, acctno"); 测试发现在descriptor连接kafka时定义schema时,定义的rowtime字段和使用from的方式重命名字段好像都无法成功。测试时使用from方式重命名字段返回的值是null
回复:sinktable更新部分字段问题
Hi, Leonard xu 谢谢你的回复 connector当前是支持两种模式的。但是更新的时候,select的字段数量必须等于es索引的全部字段。我这边想要根据主键更新索引的部分字段 | | naturalfree | | 邮箱:naturalf...@126.com | 签名由 网易邮箱大师 定制 在2020年05月22日 23:29,Leonard Xu 写道: Hi,naturalfree Flink SQL 里es sink 是支持Append mode和upsert mode的[1],upsert mode下支持按主键更新的,你可以看看。 Best, Leonard Xu [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connect.html#elasticsearch-connector <https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/connect.html#elasticsearch-connector>; > 在 2020年5月20日,16:07,naturalfree 写道: > > 现在有一个es索引,想通过flink sql根据主键更新部分字段。不知是否有可行方案 > > > | | > naturalfree > | > | > 邮箱:naturalf...@126.com > | > > 签名由 网易邮箱大师 定制
回复:sql client定义指向elasticsearch索引密码问题
好的,非常感谢 | | naturalfree | | 邮箱:naturalf...@126.com | 签名由 网易邮箱大师 定制 在2020年05月22日 11:15,Yangze Guo 写道: 目前1.11已经feature freeze,该功能最早1.12才能支持,着急的话可以看看DataStream API的ElasticSearchSink,这个是支持安全认证的,也可以自己实现一个TableSink Best, Yangze Guo On Fri, May 22, 2020 at 9:59 AM Rui Li wrote: > > Hi,目前还不支持,不过有PR在做这个功能:https://github.com/apache/flink/pull/11822 > > On Wed, May 20, 2020 at 4:10 PM naturalfree wrote: > > > 在flink sql client配置文件中定义指向es的索引。发现没有设置用户名密码的属性,现在的es connector是否支持安全认证呢 > > > > | | > > naturalfree > > | > > | > > 邮箱:naturalf...@126.com > > | > > > > 签名由 网易邮箱大师 定制 > > > > -- > Best regards! > Rui Li
sql client定义指向elasticsearch索引密码问题
在flink sql client配置文件中定义指向es的索引。发现没有设置用户名密码的属性,现在的es connector是否支持安全认证呢 | | naturalfree | | 邮箱:naturalf...@126.com | 签名由 网易邮箱大师 定制
sinktable更新部分字段问题
现在有一个es索引,想通过flink sql根据主键更新部分字段。不知是否有可行方案 | | naturalfree | | 邮箱:naturalf...@126.com | 签名由 网易邮箱大师 定制