Re: Re: pyflink 1.11.0: if the Elasticsearch host requires authentication, how can the connector supply a username and password?

2020-10-26, posted by whh_960101
Is there any other way to pass in a username and password? As far as I know, the Java Flink Elasticsearch connector does have an entry point for a username and password, and since PyFlink delegates to Java underneath, there should be some way to reach it. Could anyone point me in the right direction? Thanks!
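
For reference, a rough sketch of what this could look like from PyFlink once the new connector mentioned in the quoted reply below is available: FLINK-18361 adds 'username' and 'password' options to the 1.12 Elasticsearch connector, so the credentials would simply go into the WITH clause. This is only a sketch of the planned 1.12 options; it does not work with the 1.11 connector, and the credential values are placeholders.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Sketch only: 'username' and 'password' are the option names added by
# FLINK-18361 for the new 1.12 Elasticsearch connector; the 1.11 connector
# does not accept them. The credential values below are placeholders.
t_env.execute_sql("""
    CREATE TABLE myUserTable (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://localhost:9200',
      'index' = 'users',
      'username' = '<es-user>',
      'password' = '<es-password>'
    )
""")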







On 2020-10-22 16:34:56, "Yangze Guo" wrote:
>Flink 1.11 does not yet support setting a username and password; these two options are added in 1.12 together with the new ES connector [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>>
>> Hi everyone: when sinking data to an Elasticsearch host that requires authentication, how do I give the Elasticsearch connector a username and password? I wrote the table definition following the example in the official documentation, but I could not find any options for a username or password, and the job fails with an Unauthorized error.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>>
>> CREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING,
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );
>>
>> Connector Options
>> | Option | Required | Default | Type | Description |
>> | connector | required | (none) | String | Specify what connector to use. Valid values are: 'elasticsearch-6' (connect to Elasticsearch 6.x clusters) and 'elasticsearch-7' (connect to Elasticsearch 7.x and later clusters). |
>> | hosts | required | (none) | String | One or more Elasticsearch hosts to connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> | index | required | (none) | String | Elasticsearch index for every record. Can be a static index (e.g. 'myIndex') or a dynamic index (e.g. 'index-{log_ts|yyyy-MM-dd}'). See the Dynamic Index section for more details. |
>> | document-type | required in 6.x | (none) | String | Elasticsearch document type. No longer necessary in elasticsearch-7. |
>> | document-id.key-delimiter | optional | _ | String | Delimiter for composite keys ("_" by default), e.g. "$" would result in IDs like "KEY1$KEY2$KEY3". |
>> | failure-handler | optional | fail | String | Failure handling strategy in case a request to Elasticsearch fails. Valid strategies are: 'fail' (throw an exception if a request fails, causing a job failure), 'ignore' (ignore failures and drop the request), 'retry_rejected' (re-add requests that failed due to queue capacity saturation), or a custom class name (failure handling with an ActionRequestFailureHandler subclass). |
>> | sink.flush-on-checkpoint | optional | true | Boolean | Whether to flush on checkpoint. When disabled, the sink does not wait for all pending action requests to be acknowledged by Elasticsearch on checkpoints, and therefore provides no strong at-least-once delivery guarantees for action requests. |
>> | sink.bulk-flush.max-actions | optional | 1000 | Integer | Maximum number of buffered actions per bulk request. Can be set to '0' to disable it. |
>> | sink.bulk-flush.max-size | optional | 2mb | MemorySize | Maximum size in memory of buffered actions per bulk request. Must be in MB granularity. Can be set to '0' to disable it. |
>> | sink.bulk-flush.interval | optional | 1s | Duration | The interval at which to flush buffered actions. Can be set to '0' to disable it. Note that both 'sink.bulk-flush.max-size' and 'sink.bulk-flush.max-actions' can be set to '0' while keeping the flush interval, which allows fully asynchronous processing of buffered actions. |
>> | sink.bulk-flush.backoff.strategy | optional | DISABLED | String | How to perform retries if a flush action fails due to a temporary request error. Valid strategies are: 'DISABLED' (no retry, i.e. fail after the first request error), 'CONSTANT' (wait the backoff delay between retries), and 'EXPONENTIAL' (start with the backoff delay and increase it exponentially between retries). |
>> | sink.bulk-flush.backoff.max-retries | optional | 8 | Integer | Maximum number of backoff retries. |
>> | sink.bulk-flush.backoff.delay | optional | 50ms | Duration | Delay between each backoff attempt. For CONSTANT backoff this is simply the delay between retries; for EXPONENTIAL backoff it is the initial base delay. |
>> | connection.max-retry-timeout | optional | (none) | Duration | Maximum timeout between retries. |
>> | connection.path-prefix | optional | (none) | String | Prefix string to be added to every REST communication, e.g. '/v1'. |
>> | format | optional | json | String | The Elasticsearch connector supports specifying a format. The format must produce a valid JSON document. By default the built-in 'json' format is used. See the JSON Format page for more details. |
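
For context, a minimal, self-contained PyFlink 1.11 sketch of how the table above is registered and written to. The 'user_source' datagen table is made up purely to have rows to insert, and the flink-sql-connector-elasticsearch7 jar must be available to the job. Against a cluster that requires authentication, submitting the INSERT is where the Unauthorized error shows up, because none of the 1.11 options listed above carry credentials.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Throwaway datagen source (hypothetical, for illustration only) so the job
# has rows to write into Elasticsearch.
t_env.execute_sql("""
    CREATE TABLE user_source (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      pv BIGINT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1'
    )
""")

# Elasticsearch sink exactly as in the 1.11 documentation example above;
# note that none of the available options takes a username or password.
t_env.execute_sql("""
    CREATE TABLE myUserTable (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://localhost:9200',
      'index' = 'users'
    )
""")

# Submits the streaming job; with a secured cluster this request is rejected
# with an Unauthorized response from Elasticsearch.
t_env.execute_sql(
    "INSERT INTO myUserTable SELECT user_id, user_name, uv, pv FROM user_source"
)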





 

Re: Re: pyflink 1.11.0: if the Elasticsearch host requires authentication, how can the connector supply a username and password?

2020-10-22, posted by whh_960101
Can we upgrade to 1.12 now? It does not seem to have been released yet. Is there any other workaround in the meantime?

On 2020-10-22 16:34:56, "Yangze Guo" wrote:
>Flink 1.11 does not yet support setting a username and password; these two options are added in 1.12 together with the new ES connector [1].
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo

