Hi,

PyFlink 1.11 does not support the DataStream API yet; it is only available starting with 1.12.

Best,
Xingbo

On Tue, Oct 27, 2020 at 2:58 PM whh_960101 <whh_960...@163.com> wrote:

> Is there any other way to set the username and password? As far as I know,
> the Java Flink API has an entry point for username and password when
> accessing Elasticsearch. Since PyFlink calls into Java to execute, it
> should have that entry point too, right? Could anyone point me in the
> right direction? Thanks!
>
> At 2020-10-22 16:34:56, "Yangze Guo" <karma...@gmail.com> wrote:
> >Setting username and password is not yet supported in 1.11. These two
> >options were added to the new ES connector in 1.12 [1].
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo
> >
> >On Thu, Oct 22, 2020 at 3:47 PM whh_960101 <whh_960...@163.com> wrote:
> >>
> >> Hi all,
> >>
> >> If I want to sink data to an Elasticsearch host that requires
> >> authentication, how do I pass the username and password to the
> >> elasticsearch connector? I followed the example format from the
> >> official docs and could not find any option for setting the username
> >> and password. The program fails with Unauthorized.
> >>
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
> >>
> >> CREATE TABLE myUserTable (
> >>   user_id STRING,
> >>   user_name STRING,
> >>   uv BIGINT,
> >>   pv BIGINT,
> >>   PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'elasticsearch-7',
> >>   'hosts' = 'http://localhost:9200',
> >>   'index' = 'users'
> >> );
> >>
> >> Connector Options
> >>
> >> | Option | Required | Default | Type | Description |
> >> | connector | required | (none) | String | Specify what connector to use. Valid values are: elasticsearch-6 (connect to an Elasticsearch 6.x cluster); elasticsearch-7 (connect to an Elasticsearch 7.x or later cluster). |
> >> | hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> >> | index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the following Dynamic Index section for more details. |
> >> | document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
> >> | document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3". |
> >> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail (throws an exception if a request fails and thus causes a job failure); ignore (ignores failures and drops the request); retry_rejected (re-adds requests that have failed due to queue capacity saturation); custom class name (for failure handling with an ActionRequestFailureHandler subclass). |
> >> | sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
> >> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
> >> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
> >> | sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions. |
> >> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED (no retry performed, i.e. fail after the first request error); CONSTANT (wait for backoff delay between retries); EXPONENTIAL (initially wait for backoff delay and increase exponentially between retries). |
> >> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
> >> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
> >> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
> >> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1'. |
> >> | format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default, the built-in 'json' format is used. Please refer to the JSON Format page for more details. |
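
The suggestion in the thread (move to 1.12 and use the new ES connector) might look roughly like the sketch below from PyFlink. It assumes the connector added by FLINK-18361 exposes the credentials as 'username' and 'password' options; please check the 1.12 connector docs to confirm the exact names. The helper `build_es_sink_ddl` and the credentials "elastic" / "changeme" are hypothetical and only there for illustration.

```python
def build_es_sink_ddl(table_name, hosts, index, username, password):
    """Build a CREATE TABLE statement for an Elasticsearch 7 sink with credentials.

    Assumption: Flink 1.12+, where the connector is expected to accept
    'username' and 'password' options (see FLINK-18361).
    """

    def quote(value):
        # SQL string literal: wrap in single quotes and double any embedded quote.
        return "'" + value.replace("'", "''") + "'"

    options = {
        "connector": "elasticsearch-7",
        "hosts": hosts,
        "index": index,
        "username": username,  # assumed option name, 1.12+ only
        "password": password,  # assumed option name, 1.12+ only
    }
    with_clause = ",\n".join(
        f"  {quote(key)} = {quote(value)}" for key, value in options.items()
    )
    return (
        f"CREATE TABLE {table_name} (\n"
        "  user_id STRING,\n"
        "  user_name STRING,\n"
        "  uv BIGINT,\n"
        "  pv BIGINT,\n"
        "  PRIMARY KEY (user_id) NOT ENFORCED\n"
        ") WITH (\n"
        f"{with_clause}\n"
        ")"
    )


ddl = build_es_sink_ddl(
    "myUserTable", "http://localhost:9200", "users", "elastic", "changeme"
)
print(ddl)

# With PyFlink 1.12 installed and an existing TableEnvironment `t_env`,
# the statement would be registered with:
#     t_env.execute_sql(ddl)
```

Building the statement as a plain string keeps the sketch runnable without a Flink installation. On 1.11 these two options are not available, as noted in the thread, so the same DDL would not solve the Unauthorized error there.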