Re: Elasticsearch connector support?
Hi Hang

Thanks for the link. I will wait for the 3.1 connector release and hope it will be included.

Med venlig hilsen / Best regards
Lasse Nedergaard

On 27 Nov 2023, at 12:00, Hang Ruan wrote:

> Hi, Lasse.
>
> There is already a discussion about the connector releases for 1.18 [1].
>
> Best,
> Hang
>
> [1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2
>
> On Fri, 24 Nov 2023 at 22:57, Lasse Nedergaard wrote:
>
> > Hi
> >
> > From the documentation I can see there isn't any ES support in Flink 1.18 right now, and FLINK-26088 (ES 8 support) is still open.
> >
> > Does anyone have an idea when ES connector support will be available in 1.18?
> >
> > Please let me know.
> >
> > Med venlig hilsen / Best regards
> > Lasse Nedergaard
Re: Elasticsearch connector support?
Hi, Lasse.

There is already a discussion about the connector releases for 1.18 [1].

Best,
Hang

[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2

On Fri, 24 Nov 2023 at 22:57, Lasse Nedergaard wrote:

> Hi
>
> From the documentation I can see there isn't any ES support in Flink 1.18 right now, and FLINK-26088 (ES 8 support) is still open.
>
> Does anyone have an idea when ES connector support will be available in 1.18?
>
> Please let me know.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
Elasticsearch connector support?
Hi

From the documentation I can see there isn't any ES support in Flink 1.18 right now, and FLINK-26088 (ES 8 support) is still open.

Does anyone have an idea when ES connector support will be available in 1.18?

Please let me know.

Med venlig hilsen / Best regards
Lasse Nedergaard
Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Hi Madhu,

Not yet. The API has changed slightly. We'll add one very soon. In the meantime I've created an issue to keep track of the status: https://issues.apache.org/jira/browse/FLINK-3115

Thanks,
Max

On Thu, Dec 3, 2015 at 10:50 PM, Madhukar Thota wrote:

> Does the current elasticsearch-flink connector support Elasticsearch 2.x?
>
> -Madhu
Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Hi Madhu,

Great. Do you want to contribute it back via a GitHub pull request? If not, that's also fine. We will try to look into the 2.0 connector next week.

Best,
Max

On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota wrote:

> I have created a working connector for Elasticsearch 2.0 based on the elasticsearch-flink connector. I am using it right now, but I want an official connector from Flink.
>
> ElasticsearchSink.java
>
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.net.InetAddress;
> import java.net.UnknownHostException;
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicReference;
>
> import org.elasticsearch.action.bulk.BulkItemResponse;
> import org.elasticsearch.action.bulk.BulkProcessor;
> import org.elasticsearch.action.bulk.BulkRequest;
> import org.elasticsearch.action.bulk.BulkResponse;
> import org.elasticsearch.action.index.IndexRequest;
> import org.elasticsearch.client.Client;
> import org.elasticsearch.client.transport.TransportClient;
> import org.elasticsearch.cluster.node.DiscoveryNode;
> import org.elasticsearch.common.settings.Settings;
> import org.elasticsearch.common.transport.InetSocketTransportAddress;
> import org.elasticsearch.common.unit.ByteSizeUnit;
> import org.elasticsearch.common.unit.ByteSizeValue;
> import org.elasticsearch.common.unit.TimeValue;
>
>
> public class ElasticsearchSink<T> extends RichSinkFunction<T> {
>
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
>     public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
>     public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
>
>     private static final long serialVersionUID = 1L;
>     private static final int DEFAULT_PORT = 9300;
>     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSink.class);
>
>     /**
>      * The user specified config map that we forward to Elasticsearch when we create the Client.
>      */
>     private final Map<String, String> userConfig;
>
>     /**
>      * The builder that is used to construct an {@link IndexRequest} from the incoming element.
>      */
>     private final IndexRequestBuilder<T> indexRequestBuilder;
>
>     /**
>      * The Client that was either retrieved from a Node or is a TransportClient.
>      */
>     private transient Client client;
>
>     /**
>      * Bulk processor that was created using the client
>      */
>     private transient BulkProcessor bulkProcessor;
>
>     /**
>      * This is set from inside the BulkProcessor listener if there were failures in processing.
>      */
>     private final AtomicBoolean hasFailure = new AtomicBoolean(false);
>
>     /**
>      * This is set from inside the BulkProcessor listener if a Throwable was thrown during processing.
>      */
>     private final AtomicReference<Throwable> failureThrowable = new AtomicReference<Throwable>();
>
>     public ElasticsearchSink(Map<String, String> userConfig, IndexRequestBuilder<T> indexRequestBuilder) {
>         this.userConfig = userConfig;
>         this.indexRequestBuilder = indexRequestBuilder;
>     }
>
>
>     @Override
>     public void open(Configuration configuration) {
>
>         ParameterTool params = ParameterTool.fromMap(userConfig);
>         Settings settings = Settings.settingsBuilder()
>                 .put(userConfig)
>                 .build();
>
>         TransportClient transportClient = TransportClient.builder().settings(settings).build();
>         for (String server : params.get("esHost").split(";"))
>         {
>             String[] components = server.trim().split(":");
>             String host = components[0];
>             int port = DEFAULT_PORT;
>             if (components.length > 1)
>             {
>                 port = Integer.parseInt(components[1]);
>             }
>
>             try {
>                 transportClient = transportClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
>             } catch (UnknownHostException e) {
>                 e.printStackTrace();
>             }
>         }
>
>         List<DiscoveryNode> nodes = transportClient.connectedNodes();
>         if (nodes.isEmpty()) {
>             throw new RuntimeException("Client is not connected to any Elasticsearch nodes!");
>         } else {
>             if (LOG.isDebugEnabled()) {
>                 LOG.info("Connected to nodes: " + nodes.toString());
>             }
>         }
>         client = transportClient;
>
>         BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
>                 client,
>                 new BulkProcessor.Listener() {
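The `open()` method quoted above reads an `esHost` setting, splits it on `;` into servers and on `:` into host and port, and falls back to port 9300 when no port is given. The following is a minimal standalone sketch of just that parsing step, using only the JDK. The class name `EsHostParser` and its `parse` method are hypothetical and are not part of Flink or Elasticsearch. Unlike the quoted code, this sketch rejects malformed entries instead of printing a stack trace and continuing, and it does not resolve host names.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink or Elasticsearch.
final class EsHostParser {

    // Same default as DEFAULT_PORT in the quoted sink.
    static final int DEFAULT_PORT = 9300;

    private EsHostParser() {}

    // Parses a string such as "host1:9301;host2" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String esHost) {
        if (esHost == null || esHost.trim().isEmpty()) {
            throw new IllegalArgumentException("esHost must not be empty");
        }
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String server : esHost.split(";")) {
            String trimmed = server.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as "a;;b"
            }
            String[] components = trimmed.split(":");
            if (components.length > 2 || components[0].isEmpty()) {
                throw new IllegalArgumentException("Invalid host entry: " + trimmed);
            }
            int port = DEFAULT_PORT;
            if (components.length == 2) {
                try {
                    port = Integer.parseInt(components[1].trim());
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Invalid port in: " + trimmed, e);
                }
            }
            // createUnresolved avoids a DNS lookup while parsing.
            addresses.add(InetSocketAddress.createUnresolved(components[0].trim(), port));
        }
        return addresses;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("es-node-1:9301; es-node-2")) {
            System.out.println(address.getHostString() + ":" + address.getPort());
        }
    }
}
```

Failing fast on a bad entry is a design choice of this sketch. The quoted sink would only notice the problem later, when no nodes are connected. Because entries with more than one `:` are rejected, IPv6 literals are not handled here.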
Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Sure. I can submit the pull request.

On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels wrote:

> Hi Madhu,
>
> Great. Do you want to contribute it back via a GitHub pull request? If not that's also fine. We will try to look into the 2.0 connector next week.
>
> Best,
> Max
>
> On Fri, Dec 4, 2015 at 4:16 PM, Madhukar Thota wrote:
>
> > i have created working connector for Elasticsearch 2.0 based on elasticsearch-flink connector. I am using it right now but i want official connector from flink.
Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Wouldn't it be better to have both connectors for ES, one for 1.x and another for 2.x?

On 4 Dec 2015 20:55, "Madhukar Thota" wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels wrote:
>
> > Hi Madhu,
> >
> > Great. Do you want to contribute it back via a GitHub pull request? If not that's also fine. We will try to look into the 2.0 connector next week.
> >
> > Best,
> > Max
Re: Flink-Elasticsearch connector support for elasticsearch 2.0
Hi Madhu,

Could you describe your use case for Elasticsearch with Flink?

Thanks
Deepak

On Sat, Dec 5, 2015 at 1:25 AM, Madhukar Thota wrote:

> Sure. I can submit the pull request.
>
> On Fri, Dec 4, 2015 at 12:37 PM, Maximilian Michels wrote:
>
> > Hi Madhu,
> >
> > Great. Do you want to contribute it back via a GitHub pull request? If not that's also fine. We will try to look into the 2.0 connector next week.
> >
> > Best,
> > Max