[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173157#comment-16173157 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/bahir-flink/pull/21

> Add InfluxDB Sink for Flink Streaming
> -------------------------------------
>
>                 Key: BAHIR-134
>                 URL: https://issues.apache.org/jira/browse/BAHIR-134
>             Project: Bahir
>          Issue Type: Wish
>          Components: Flink Streaming Connectors
>            Reporter: Hai Zhou_UTC+8
>            Assignee: Hai Zhou_UTC+8
>
> InfluxDBSink via implementation RichSinkFunction.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173156#comment-16173156 ]

ASF subversion and git services commented on BAHIR-134:
-------------------------------------------------------

Commit f07276eef2babc52ffdb43c5fcb76f9d51b9153f in bahir-flink's branch refs/heads/master from zhouhai02
[ https://git-wip-us.apache.org/repos/asf?p=bahir-flink.git;h=f07276e ]

[BAHIR-134] Add InfluxDb sink for flink stream

This closes #21

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173121#comment-16173121 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/21

    Thanks. I'll merge the change.

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168973#comment-16168973 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user yew1eb commented on the issue:

    https://github.com/apache/bahir-flink/pull/21

    Hi @rmetzger, thanks for your review.
    I have updated the PR.

    Best,
    Hai Zhou :beers:

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168970#comment-16168970 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user yew1eb commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/21#discussion_r139290380

    --- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.influxdb;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.CollectionUtil;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Point;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Sink to save data into a InfluxDB cluster.
    + */
    +public class InfluxDBSink extends RichSinkFunction {
    +
    +    private transient InfluxDB influxDB = null;
    +    private final String dbName;
    +    private final String username;
    +    private final String password;
    +    private final String host;
    +    private boolean batchEnabled = true;
    +
    +    /**
    +     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
    +     *
    +     * @param host the url to connect to.
    +     * @param username the username which is used to authorize against the influxDB instance.
    +     * @param password the password for the username which is used to authorize against the influxDB instance.
    +     * @param dbName the database to write to.
    +     */
    +    public InfluxDBSink(String host, String username, String password, String dbName) {
    +        this.host = Preconditions.checkNotNull(host, "host can not be null");
    +        this.username = Preconditions.checkNotNull(username, "username can not be null");
    +        this.password = Preconditions.checkNotNull(password, "password can not be null");
    +        this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
    +    }
    +
    +    public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
    +        this(host, username, password, dbName);
    +        this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
    +    }
    +
    +    /**
    +     * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
    +     */
    +    @Override
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
    +
    +        influxDB = InfluxDBFactory.connect(host, username, password);
    +        if (!influxDB.databaseExists(dbName)) {
    +            influxDB.createDatabase(dbName);
    --- End diff --

    Thank you for the review.
    I will throw a `RuntimeException` instead of creating the database for the user, because the `open` method is executed by all parallel sink tasks.

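
The fail-fast check proposed in the comment above could look roughly like the sketch below. It is only an illustration under stated assumptions: `DatabaseClient` is a hypothetical stand-in for the `databaseExists` call quoted in the diff, and the class name, method name and error message are made up here rather than taken from the merged code.

```java
/**
 * Hypothetical stand-in for the single client call the check needs.
 * The diff above shows the real client exposing databaseExists(String).
 */
interface DatabaseClient {
    boolean databaseExists(String name);
}

final class FailFastOpenSketch {

    private FailFastOpenSketch() {
    }

    /**
     * Verifies that the target database exists and never creates it.
     * open() runs once per parallel sink task, so a check without side
     * effects behaves the same no matter how many tasks run it at once.
     */
    static void requireDatabase(DatabaseClient client, String dbName) {
        if (!client.databaseExists(dbName)) {
            // Assumed wording; the actual message in the sink may differ.
            throw new RuntimeException("Database '" + dbName + "' does not exist");
        }
    }
}
```

With this shape, a missing database makes every parallel task fail in the same way at startup, and no task races another to create it.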
[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168860#comment-16168860 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/21

    Flink 1.3, the current release, still supports Java 7, but Java 7 support will be dropped starting with Flink 1.4.
    I'll start a quick discussion on the dev@ list to make sure nobody disagrees.

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168422#comment-16168422 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user mridulm commented on the issue:

    https://github.com/apache/bahir-flink/pull/21

    As of Spark 2.2, Java 7 support has been removed. I think Flink also requires Java 8, right?
    Perhaps we can do the same for Bahir.

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168416#comment-16168416 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user rmetzger commented on the issue:

    https://github.com/apache/bahir-flink/pull/21

    Overall, I like the change. The only problem is that it breaks the build because of missing Java 7 support.
    I guess we need to decide whether we want to drop Java 7 support.

[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168413#comment-16168413 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/21#discussion_r139234041

    --- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.influxdb;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.CollectionUtil;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Point;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Sink to save data into a InfluxDB cluster.
    + */
    +public class InfluxDBSink extends RichSinkFunction {
    +
    +    private transient InfluxDB influxDB = null;
    +    private final String dbName;
    +    private final String username;
    +    private final String password;
    +    private final String host;
    +    private boolean batchEnabled = true;
    +
    +    /**
    +     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
    +     *
    +     * @param host the url to connect to.
    +     * @param username the username which is used to authorize against the influxDB instance.
    +     * @param password the password for the username which is used to authorize against the influxDB instance.
    +     * @param dbName the database to write to.
    +     */
    +    public InfluxDBSink(String host, String username, String password, String dbName) {
    +        this.host = Preconditions.checkNotNull(host, "host can not be null");
    +        this.username = Preconditions.checkNotNull(username, "username can not be null");
    +        this.password = Preconditions.checkNotNull(password, "password can not be null");
    +        this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
    +    }
    +
    +    public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
    +        this(host, username, password, dbName);
    +        this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
    +    }
    +
    +    /**
    +     * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
    +     */
    +    @Override
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
    +
    +        influxDB = InfluxDBFactory.connect(host, username, password);
    +        if (!influxDB.databaseExists(dbName)) {
    +            influxDB.createDatabase(dbName);
    +        }
    +
    +        if (batchEnabled) {
    +            // Flush every 2000 Points, at least every 100ms
    +            influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS);
    --- End diff --

    Ideally we'll make this configurable.

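
One way the batching parameters could be made configurable, as the review comment suggests, is a small serializable options object passed to the sink's constructor. The sketch below is an assumption for illustration only: the class `BatchOptionsSketch` and its accessor names are hypothetical and do not appear in the pull request. Only the defaults (2000 points, 100 ms) and the `enableBatch(int, int, TimeUnit)` call come from the quoted diff.

```java
import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/** Hypothetical holder for the values hard-coded in the quoted open() method. */
final class BatchOptionsSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Defaults taken from the diff: flush every 2000 points, at least every 100 ms.
    static final int DEFAULT_BATCH_ACTIONS = 2000;
    static final int DEFAULT_FLUSH_DURATION = 100;

    private final int batchActions;
    private final int flushDuration;
    private final TimeUnit flushDurationUnit;

    BatchOptionsSketch(int batchActions, int flushDuration, TimeUnit flushDurationUnit) {
        if (batchActions <= 0) {
            throw new IllegalArgumentException("batchActions must be positive");
        }
        if (flushDuration <= 0) {
            throw new IllegalArgumentException("flushDuration must be positive");
        }
        if (flushDurationUnit == null) {
            throw new NullPointerException("flushDurationUnit can not be null");
        }
        this.batchActions = batchActions;
        this.flushDuration = flushDuration;
        this.flushDurationUnit = flushDurationUnit;
    }

    /** Same values as the hard-coded call in the diff. */
    static BatchOptionsSketch defaults() {
        return new BatchOptionsSketch(
                DEFAULT_BATCH_ACTIONS, DEFAULT_FLUSH_DURATION, TimeUnit.MILLISECONDS);
    }

    int getBatchActions() {
        return batchActions;
    }

    int getFlushDuration() {
        return flushDuration;
    }

    TimeUnit getFlushDurationUnit() {
        return flushDurationUnit;
    }
}
```

Inside `open`, the sink would then call `influxDB.enableBatch(options.getBatchActions(), options.getFlushDuration(), options.getFlushDurationUnit())` in place of the literal values. The object is `Serializable` because Flink serializes sink functions, including their fields, when it ships them to the task managers.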
[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16168412#comment-16168412 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/bahir-flink/pull/21#discussion_r139233921

    --- Diff: flink-connector-influxdb/src/main/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSink.java ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.influxdb;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.util.CollectionUtil;
    +import org.apache.flink.util.Preconditions;
    +import org.apache.flink.util.StringUtils;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Point;
    +
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Sink to save data into a InfluxDB cluster.
    + */
    +public class InfluxDBSink extends RichSinkFunction {
    +
    +    private transient InfluxDB influxDB = null;
    +    private final String dbName;
    +    private final String username;
    +    private final String password;
    +    private final String host;
    +    private boolean batchEnabled = true;
    +
    +    /**
    +     * Creates a new {@link InfluxDBSink} that connects to the InfluxDB server.
    +     *
    +     * @param host the url to connect to.
    +     * @param username the username which is used to authorize against the influxDB instance.
    +     * @param password the password for the username which is used to authorize against the influxDB instance.
    +     * @param dbName the database to write to.
    +     */
    +    public InfluxDBSink(String host, String username, String password, String dbName) {
    +        this.host = Preconditions.checkNotNull(host, "host can not be null");
    +        this.username = Preconditions.checkNotNull(username, "username can not be null");
    +        this.password = Preconditions.checkNotNull(password, "password can not be null");
    +        this.dbName = Preconditions.checkNotNull(dbName, "dbName can not be null");
    +    }
    +
    +    public InfluxDBSink(String host, String username, String password, String dbName, boolean batchEnabled) {
    +        this(host, username, password, dbName);
    +        this.batchEnabled = Preconditions.checkNotNull(batchEnabled, "batchEnabled can not be null");
    +    }
    +
    +    /**
    +     * Initializes the connection to InfluxDB by either cluster or sentinels or single server.
    +     */
    +    @Override
    +    public void open(Configuration parameters) throws Exception {
    +        super.open(parameters);
    +
    +        influxDB = InfluxDBFactory.connect(host, username, password);
    +        if (!influxDB.databaseExists(dbName)) {
    +            influxDB.createDatabase(dbName);
    --- End diff --

    Maybe for the future: I would actually log an info message stating that you've created the database. I think it's good if code always tells the user when it is doing magic.

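
The logging suggestion in the comment above might be sketched as follows. This is a hedged illustration, not code from the pull request: `AdminClient` is a hypothetical stand-in for the two client calls quoted in the diff, the helper name `ensureDatabase` is invented, and `java.util.logging` is used only to keep the example self-contained (the connector may well use a different logging API).

```java
import java.util.logging.Logger;

/** Hypothetical stand-in for the two client calls shown in the quoted diff. */
interface AdminClient {
    boolean databaseExists(String name);

    void createDatabase(String name);
}

final class LoggedCreateSketch {

    private static final Logger LOG = Logger.getLogger(LoggedCreateSketch.class.getName());

    private LoggedCreateSketch() {
    }

    /**
     * Creates the database only if it is missing and says so at INFO level,
     * so the implicit creation is visible to whoever reads the task logs.
     *
     * @return true if the database was created by this call
     */
    static boolean ensureDatabase(AdminClient client, String dbName) {
        if (client.databaseExists(dbName)) {
            return false;
        }
        client.createDatabase(dbName);
        LOG.info("Database '" + dbName + "' did not exist and was created by the sink.");
        return true;
    }
}
```

Returning a flag keeps the helper easy to check in a unit test with a fake client, without having to capture log output.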
[jira] [Commented] (BAHIR-134) Add InfluxDB Sink for Flink Streaming

[ https://issues.apache.org/jira/browse/BAHIR-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143347#comment-16143347 ]

ASF GitHub Bot commented on BAHIR-134:
--------------------------------------

GitHub user yew1eb opened a pull request:

    https://github.com/apache/bahir-flink/pull/21

    [BAHIR-134] Add InfluxDB sink for flink stream

    add InfluxDBSink for flink stream

    ## Verifying this change
    Add example `InfluxDBSinkExample`
    Running environment:
    flink version: 1.3.0
    influxdb version: 1.3.4
    influxdb-java (influxdb client) version: 2.7 (compatible with InfluxDB versions 0.9 ~ 1.3.x)

    It works well.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yew1eb/bahir-flink BAHIR-134

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/bahir-flink/pull/21.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21

commit 1249a8c26fa261719431a269f4b46203e748774f
Author: zhouhai02
Date:   2017-08-27T11:35:31Z

    add InfluxDb sink for flink stream
