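Hi all,

If I want to sink data to an Elasticsearch host that requires authentication, how do I pass the username and password to the elasticsearch connector? I followed the sample format from the official documentation, but could not find any option for setting a username and password, and the program fails with an Unauthorized error.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html

CREATE TABLE myUserTable (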
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users'
);

Connector Options
| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify what connector to use, valid values are: elasticsearch-6: connect to Elasticsearch 6.x cluster; elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster. |
| hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
| index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts\|yyyy-MM-dd}'). See the following Dynamic Index section for more details. |
| document-type | required in 6.x | (none) | String | Elasticsearch document type. Not necessary anymore in elasticsearch-7. |
| document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3". |
| failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: fail: throws an exception if a request fails and thus causes a job failure; ignore: ignores failures and drops the request; retry_rejected: re-adds requests that have failed due to queue capacity saturation; custom class name: for failure handling with a ActionRequestFailureHandler subclass. |
| sink.flush-on-checkpoint | optional | true | Boolean | Flush on checkpoint or not. When disabled, a sink will not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong guarantees for at-least-once delivery of action requests. |
| sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
| sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
| sink.bulk-flush.interval | optional | 1s | Duration | The interval to flush buffered actions. Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set allowing for complete async processing of buffered actions. |
| sink.bulk-flush.backoff.strategy | optional | DISABLED | String | Specify how to perform retries if any flush actions failed due to a temporary request error. Valid strategies are: DISABLED: no retry performed, i.e. fail after the first request error; CONSTANT: wait for backoff delay between retries; EXPONENTIAL: initially wait for backoff delay and increase exponentially between retries. |
| sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
| sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff, this is simply the delay between each retry. For EXPONENTIAL backoff, this is the initial base delay. |
| connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
| connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g., '/v1' |
| format | optional | json | String | Elasticsearch connector supports to specify a format. The format must produce a valid json document. By default uses built-in 'json' format. Please refer to JSON Format page for more details. |
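
For context, here is a rough sketch of how the options listed above can be combined in one DDL. The table name and all tuning values are made up for illustration and are not recommendations. It uses only option keys that appear in the table, so it shows where the bulk-flush and backoff settings go, but it does not address authentication: none of the listed options takes a username or password.

```sql
-- Sketch only: the table name and every tuning value below are illustrative assumptions.
CREATE TABLE myUserTableTuned (
  user_id STRING,
  user_name STRING,
  uv BIGINT,
  pv BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'users',
  -- Bulk request thresholds: buffered action count, buffered size, and flush interval.
  'sink.bulk-flush.max-actions' = '500',
  'sink.bulk-flush.max-size' = '5mb',
  'sink.bulk-flush.interval' = '2s',
  -- Retry temporarily failed flushes with an exponentially growing delay.
  'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL',
  'sink.bulk-flush.backoff.max-retries' = '3',
  'sink.bulk-flush.backoff.delay' = '100ms'
);
```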






 
