你好:
可以自己构建 indexRequest 设置id,type,source 等字段
 ElasticsearchSinkFunction 不知道是否满足你的需求?


发件人: Jark Wu
发送时间: 2019年8月26日 18:00
主题: Re: 关于elasticSearch table sink 构造过于复杂

> ETL作业, 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.

据我所知,目前是不支持的。 可以去建个 JIRA 给社区提需求。


如果使用的 blink planner,可以使用 deduplicate with keeping first row,是一个比较轻量的去重计算,能拿到一个 
key (也就是去重 key)。
文档还在 review 中,可以先看这个PR: 
https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953
 
<https://github.com/apache/flink/pull/9511/files#diff-b56b1750a20591d2ba61ba99eb3d3539R953>


Best,
Jark


> 在 2019年8月26日,17:20,hb <343122...@163.com> 写道:
> 
> 没有group by的语句,比如就是select * from table ,表明细数据,以DDL 方式 写入 es,
> 能指定某个字段作为es的主键id么, 我试了同步数据明细到es中,但是id 却是随机生成的.
> 
> 
> 
> 在 2019-08-26 15:47:53,"Jark Wu" <imj...@gmail.com> 写道:
>> 嗯,descriptor 和 DDL 就是可以用于这个场景,将 table 查询结果直接写入 sink。
>> 
>> Best,
>> Jark
>> 
>> 
>> 
>>> 在 2019年8月26日,16:44,巫旭阳 <danxieai...@163.com> 写道:
>>> 
>>> 感谢解答,
>>> 我的意图是 构建EStablesink,可以将table 查询的结果 直接写入ES 避免再转换DataStream 通过ESSink写入
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2019-08-26 16:39:49,"Jark Wu" <imj...@gmail.com> 写道:
>>>> Hi ,
>>>> 
>>>> 
>>>> Elasticsearch6UpsertTableSink 是标记成 @internal 的,不是开放给用户直接去构造的。
>>>> 如果要注册一个 ES sink,可以使用 descriptor API,也就是 
>>>> org.apache.flink.table.descriptors.Elasticsearch。
>>>> 或者使用 DDL 方式注册。
>>>> 
>>>> 
>>>> Best,
>>>> Jark
>>>> 
>>>>> 在 2019年8月26日,16:33,aven.wu <danxieai...@163.com> 写道:
>>>>> 
>>>>> Elasticsearch6UpsertTableSink
>>>>> 的构造方法过于复杂参数非常多
>>>>> 
>>>>> public Elasticsearch6UpsertTableSink(
>>>>>    boolean isAppendOnly,
>>>>>    TableSchema schema,
>>>>>    List<Host> hosts,
>>>>>    String index,
>>>>>    String docType,
>>>>>    String keyDelimiter,
>>>>>    String keyNullLiteral,
>>>>>    SerializationSchema<Row> serializationSchema,
>>>>>    XContentType contentType,
>>>>>    ActionRequestFailureHandler failureHandler,
>>>>>    Map<SinkOption, String> sinkOptions) {
>>>>> 
>>>>> super(
>>>>>    isAppendOnly,
>>>>>    schema,
>>>>>    hosts,
>>>>>    index,
>>>>>    docType,
>>>>>    keyDelimiter,
>>>>>    keyNullLiteral,
>>>>>    serializationSchema,
>>>>>    contentType,
>>>>>    failureHandler,
>>>>>    sinkOptions,
>>>>>    UPDATE_REQUEST_FACTORY);
>>>>> }
>>>>> 
>>>>> 
>>>>> 请问,是不是我的用法不对?
>>>>> 有没有类似ElasticTableSink.bulid 的建造类,方便构造tableSink对象。
>>>>> 



回复