[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197591#comment-15197591 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1219#issuecomment-197404109

    +1

> add storm-mongodb connector
> ---------------------------
>
>                 Key: STORM-1483
>                 URL: https://issues.apache.org/jira/browse/STORM-1483
>             Project: Apache Storm
>          Issue Type: Improvement
>            Reporter: Xin Wang
>            Assignee: Xin Wang
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198506#comment-15198506 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1219#issuecomment-197625582

    Thanks @vesense, merged into 1.x-branch.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198507#comment-15198507 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1219
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15197325#comment-15197325 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1219#issuecomment-197335297

    The build error is unrelated.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196911#comment-15196911 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1219#issuecomment-197188155

    cc @harshach
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196896#comment-15196896 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-197184879

    @harshach Done. -> https://github.com/apache/storm/pull/1219
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15196895#comment-15196895 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

GitHub user vesense opened a pull request:

    https://github.com/apache/storm/pull/1219

    STORM-1483: add storm-mongodb connector for 1.x-branch

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vesense/storm STORM-1483-1.x

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1219.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1219

commit 45fe4595c64bb7772cafd3d4bd9923b2429e835b
Author: Xin Wang
Date:   2016-01-24T14:05:23Z

    STORM-1483: add storm-mongodb connector
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15195608#comment-15195608 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-196913861

    @vesense thanks for the patch. Merged into master. Can you open the same PR against 1.x-branch as well?
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15195606#comment-15195606 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/1038
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193319#comment-15193319 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-196319894

    ping @harshach
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192163#comment-15192163 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-195888153

    @harshach Done.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192144#comment-15192144 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-195881629

    @harshach OK. I'll update it right away.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192142#comment-15192142 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-195881377

    +1. Can you please squash the additional commits?
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192141#comment-15192141 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-195881257

    @vesense I can sponsor this connector. Please add me as a sponsor in the README.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15182561#comment-15182561 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-193087867

    I filed some follow-on JIRAs ([STORM-1573](https://issues.apache.org/jira/browse/STORM-1573), [STORM-1607](https://issues.apache.org/jira/browse/STORM-1607)) for the next phase.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15146340#comment-15146340 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-183808322

    @2new Maybe Storm 1.0.0 or 2.0.0 will include this (Storm PMC members will make the final decision). Actually, I hope the 1.0 release can include this.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15135674#comment-15135674 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user 2new commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-180716979

    What's the status of this PR? Which Storm version will include `storm-mongodb`?
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123560#comment-15123560 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-176802552

    +1 the changes look good to me.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123982#comment-15123982 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-176912782

    +1

    I've not used MongoDB in ages, so I'm in the same boat as @revans2. Are there any other committers that are willing to sponsor this so it is maintained moving forward?
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123065#comment-15123065 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1038#discussion_r51230032

    --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.mongodb.trident.state;
    +
    +import java.io.Serializable;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.commons.lang.Validate;
    +import org.apache.storm.mongodb.common.MongoDBClient;
    +import org.apache.storm.mongodb.common.mapper.MongoMapper;
    +import org.apache.storm.trident.operation.TridentCollector;
    +import org.apache.storm.trident.state.State;
    +import org.apache.storm.trident.tuple.TridentTuple;
    +import org.bson.Document;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.collect.Lists;
    +
    +public class MongoState implements State {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
    +
    +    private Options options;
    +    private MongoDBClient mongoClient;
    +    private Map map;
    +
    +    protected MongoState(Map map, int partitionIndex, int numPartitions, Options options) {
    +        this.options = options;
    +        this.map = map;
    +    }
    +
    +    public static class Options implements Serializable {
    +        private String url;
    +        private String collectionName;
    +        private MongoMapper mapper;
    +
    +        public Options withUrl(String url) {
    +            this.url = url;
    +            return this;
    +        }
    +
    +        public Options withCollectionName(String collectionName) {
    +            this.collectionName = collectionName;
    +            return this;
    +        }
    +
    +        public Options withMapper(MongoMapper mapper) {
    +            this.mapper = mapper;
    +            return this;
    +        }
    +    }
    +
    +    protected void prepare() {
    +        Validate.notEmpty(options.url, "url can not be blank or null");
    +        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
    +        Validate.notNull(options.mapper, "MongoMapper can not be null");
    +
    +        this.mongoClient = new MongoDBClient(options.url, options.collectionName);
    +    }
    +
    +    @Override
    +    public void beginCommit(Long txid) {
    +        LOG.debug("beginCommit is noop.");
    +    }
    +
    +    @Override
    +    public void commit(Long txid) {
    +        LOG.debug("commit is noop.");
    +    }
    +
    +    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    +        List<Document> documents = Lists.newArrayList();
    +        for (TridentTuple tuple : tuples) {
    +            Document document = options.mapper.toDocument(tuple);
    +            documents.add(document);
    +        }
    +        this.mongoClient.insert(documents);
    --- End diff --

    Updated. Thanks @revans2
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123103#comment-15123103 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1038#discussion_r51231367

    --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java ---
    @@ -0,0 +1,69 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.mongodb.bolt;
    +
    +import org.apache.commons.lang.Validate;
    +import org.apache.storm.mongodb.common.QueryFilterCreator;
    +import org.apache.storm.mongodb.common.mapper.MongoMapper;
    +import org.apache.storm.topology.OutputFieldsDeclarer;
    +import org.apache.storm.tuple.Tuple;
    +import org.bson.Document;
    +import org.bson.conversions.Bson;
    +
    +/**
    + * Basic bolt for updating from MongoDB.
    + *
    + * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
    + *
    + */
    +public class MongoUpdateBolt extends AbstractMongoBolt {
    --- End diff --

    I added an `upsert` option to `MongoUpdateBolt`.
    Thanks @arunmahadevan
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15123150#comment-15123150 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-176628173

    @revans2 I think we can merge this to master and 1.x branch. What do you think?
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15121625#comment-15121625 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1038#discussion_r51131897

    --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java ---
    @@ -0,0 +1,69 @@
    [...]
    +/**
    + * Basic bolt for updating from MongoDB.
    + *
    + * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection.
    + *
    + */
    +public class MongoUpdateBolt extends AbstractMongoBolt {
    --- End diff --

    In MongoDB, `upsert` is an option of `UpdateRequest`:
    > whether this update will insert a new document if no documents match the filter. The default is false.

    I don't think merging insert into the update path is a better approach for the insert-only scenario.
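
To make the quoted `upsert` rule concrete, here is a minimal sketch that does not touch the MongoDB driver. `UpsertSketch`, `updateOne`, and the in-memory list are hypothetical names chosen for illustration, and the match is a simple equality on one field; only the rule itself (update on a match, insert on a miss only when `upsert` is true) is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a collection; this is NOT the MongoDB driver API.
class UpsertSketch {
    private final List<Map<String, Object>> docs = new ArrayList<>();

    // Applies `update` to the first document whose `key` field equals `value`.
    // On a miss: does nothing when upsert is false, inserts a new document when true.
    String updateOne(String key, Object value, Map<String, Object> update, boolean upsert) {
        for (Map<String, Object> doc : docs) {
            if (value.equals(doc.get(key))) {
                doc.putAll(update);
                return "updated";
            }
        }
        if (!upsert) {
            return "none";
        }
        // The new document combines the filter field with the update fields.
        Map<String, Object> doc = new HashMap<>(update);
        doc.put(key, value);
        docs.add(doc);
        return "inserted";
    }

    int size() {
        return docs.size();
    }

    public static void main(String[] args) {
        UpsertSketch collection = new UpsertSketch();
        Map<String, Object> update = new HashMap<>();
        update.put("count", 1);
        System.out.println(collection.updateOne("word", "storm", update, false));
        System.out.println(collection.updateOne("word", "storm", update, true));
        System.out.println(collection.updateOne("word", "storm", update, true));
        System.out.println(collection.size());
    }
}
```

In this sketch an update with `upsert` left at false never creates a document, which is one way to read why a dedicated insert path may still be useful alongside the update bolt.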
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15121627#comment-15121627 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-176224452

    @ptgoetz @HeartSaVioR Could you take a look when you have a chance?
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15121626#comment-15121626 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1038#discussion_r51132020

    --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java ---
    @@ -0,0 +1,97 @@
    [...]
    +    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    +        List<Document> documents = Lists.newArrayList();
    +        for (TridentTuple tuple : tuples) {
    +            Document document = options.mapper.toDocument(tuple);
    +            documents.add(document);
    +        }
    +        this.mongoClient.insert(documents);
    --- End diff --

    @revans2 As with an RDBMS, if no unique index has been created, or if the documents carry a different primary key (`_id`), existing documents can be inserted again.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15121638#comment-15121638 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1038#discussion_r51133208

    --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java ---
    @@ -0,0 +1,97 @@
    [...]
    +    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    +        List<Document> documents = Lists.newArrayList();
    +        for (TridentTuple tuple : tuples) {
    +            Document document = options.mapper.toDocument(tuple);
    +            documents.add(document);
    +        }
    +        this.mongoClient.insert(documents);
    --- End diff --

    Then my only comment would be to add that to the documentation. If no primary key is provided, Trident state inserts may result in duplicate documents when failures occur.
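
The duplicate-document concern can be illustrated with a small simulation. This is only a sketch under stated assumptions: `ReplaySketch`, `insert`, and `countAfterReplay` are hypothetical names, the map keyed by `_id` stands in for a collection whose `_id` is unique, and a replay is modeled simply as the same batch being delivered twice. It does not use Storm or the MongoDB driver.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical stand-in for a collection with a unique _id; NOT the MongoDB driver API.
class ReplaySketch {
    private final Map<String, Map<String, Object>> byId = new LinkedHashMap<>();

    // Inserts a copy of the document, generating a fresh _id when none is supplied.
    // Returns false when a document with the same _id already exists.
    boolean insert(Map<String, Object> doc) {
        Map<String, Object> copy = new LinkedHashMap<>(doc);
        String id = (String) copy.computeIfAbsent("_id", k -> UUID.randomUUID().toString());
        if (byId.containsKey(id)) {
            return false;
        }
        byId.put(id, copy);
        return true;
    }

    int count() {
        return byId.size();
    }

    // Delivers the same batch twice (first attempt plus one replay) and counts stored documents.
    static int countAfterReplay(List<Map<String, Object>> batch) {
        ReplaySketch collection = new ReplaySketch();
        for (int attempt = 0; attempt < 2; attempt++) {
            for (Map<String, Object> doc : batch) {
                collection.insert(doc);
            }
        }
        return collection.count();
    }

    public static void main(String[] args) {
        Map<String, Object> withoutId = new LinkedHashMap<>();
        withoutId.put("word", "storm");

        Map<String, Object> withId = new LinkedHashMap<>();
        withId.put("_id", "storm");
        withId.put("word", "storm");

        System.out.println(countAfterReplay(List.of(withoutId)));
        System.out.println(countAfterReplay(List.of(withId)));
    }
}
```

Without a caller-supplied `_id`, each delivery gets a fresh identifier and the replayed batch is stored again; with a deterministic `_id` the second delivery is rejected. A mapper that derives `_id` from the tuple is one possible way to get the second behavior, though the thread itself only asks for the caveat to be documented.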
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117299#comment-15117299 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on the pull request:

    https://github.com/apache/storm/pull/1038#issuecomment-175048700

    I fixed some issues. Work on `upsert` is in progress, and I will file a new JIRA for the next phase, including `MongoMapState`.
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117301#comment-15117301 ]

ASF GitHub Bot commented on STORM-1483:
---------------------------------------

Github user vesense commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1038#discussion_r50841890

    --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java ---
    @@ -0,0 +1,97 @@
    [...]
    +public class MongoState implements State {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
    +
    +    private Options options;
    +    private MongoDBClient mongoClient;
    +    private Map map;
    +
    +    protected MongoState(Map map, int partitionIndex, int numPartitions, Options options) {
    --- End diff --

    OK
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117300#comment-15117300 ] ASF GitHub Bot commented on STORM-1483: --- Github user vesense commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50841871 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java --- @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common; + +import java.util.List; + +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +public class MongoDBClient { + +private MongoClient client; +private MongoCollection collection; + +public MongoDBClient(String url, String collectionName) { +//Creates a MongoURI from the given string. +MongoClientURI uri = new MongoClientURI(url); +//Creates a MongoClient described by a URI. +this.client = new MongoClient(uri); +//Gets a Database. +MongoDatabase db = client.getDatabase(uri.getDatabase()); +//Gets a collection. 
+this.collection = db.getCollection(collectionName); +} + +public void insert(Document document) { +collection.insertOne(document); +} + +public void insert(List documents) { +//This method is equivalent to a call to the bulkWrite method. +collection.insertMany(documents); +} + +public void update(Bson filter, Bson update) { +//Update all documents in the collection --- End diff -- OK > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117351#comment-15117351 ] ASF GitHub Bot commented on STORM-1483: --- Github user vesense commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50846026 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java --- @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.bolt; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.QueryFilterCreator; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.bson.Document; +import org.bson.conversions.Bson; + +/** + * Basic bolt for updating from MongoDB. + * + * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection. + * + */ +public class MongoUpdateBolt extends AbstractMongoBolt { --- End diff -- Since `MongoUpdateBolt` uses `QueryFilterCreator` for creating a Mongo query Filter, maybe it's more clear to the user. 
@arunmahadevan's advice also makes sense. I'd like to hear other people's opinions. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117361#comment-15117361 ] ASF GitHub Bot commented on STORM-1483: --- Github user vesense commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50847058 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.mongodb.trident.state; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.MongoDBClient; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class MongoState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(MongoState.class); + +private Options options; +private MongoDBClient mongoClient; +private Map map; + +protected MongoState(Map map, int partitionIndex, int numPartitions, Options options) { +this.options = options; +this.map = map; +} + +public static class Options implements Serializable { +private String url; +private String collectionName; +private MongoMapper mapper; + +public Options withUrl(String url) { +this.url = url; +return this; +} + +public Options withCollectionName(String collectionName) { +this.collectionName = collectionName; +return this; +} + +public Options withMapper(MongoMapper mapper) { +this.mapper = mapper; +return this; +} +} + +protected void prepare() { +Validate.notEmpty(options.url, "url can not be blank or null"); +Validate.notEmpty(options.collectionName, "collectionName can not be blank or null"); +Validate.notNull(options.mapper, "MongoMapper can not be null"); + +this.mongoClient = new MongoDBClient(options.url, options.collectionName); +} + +@Override +public void beginCommit(Long txid) { +LOG.debug("beginCommit is noop."); +} + +@Override +public void commit(Long txid) { +LOG.debug("commit is noop."); +} + +public void updateState(List tuples, TridentCollector collector) { +List documents = Lists.newArrayList(); +for (TridentTuple tuple : tuples) { 
+Document document = options.mapper.toDocument(tuple); +documents.add(document); +} +this.mongoClient.insert(documents); --- End diff -- The `MongoStateUpdater` is similar to `JdbcUpdater`: it just inserts data into a database. But your point is also valid. I'll think about it. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117572#comment-15117572 ] ASF GitHub Bot commented on STORM-1483: --- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50866045 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.mongodb.trident.state; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.MongoDBClient; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class MongoState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(MongoState.class); + +private Options options; +private MongoDBClient mongoClient; +private Map map; + +protected MongoState(Map map, int partitionIndex, int numPartitions, Options options) { +this.options = options; +this.map = map; +} + +public static class Options implements Serializable { +private String url; +private String collectionName; +private MongoMapper mapper; + +public Options withUrl(String url) { +this.url = url; +return this; +} + +public Options withCollectionName(String collectionName) { +this.collectionName = collectionName; +return this; +} + +public Options withMapper(MongoMapper mapper) { +this.mapper = mapper; +return this; +} +} + +protected void prepare() { +Validate.notEmpty(options.url, "url can not be blank or null"); +Validate.notEmpty(options.collectionName, "collectionName can not be blank or null"); +Validate.notNull(options.mapper, "MongoMapper can not be null"); + +this.mongoClient = new MongoDBClient(options.url, options.collectionName); +} + +@Override +public void beginCommit(Long txid) { +LOG.debug("beginCommit is noop."); +} + +@Override +public void commit(Long txid) { +LOG.debug("commit is noop."); +} + +public void updateState(List tuples, TridentCollector collector) { +List documents = Lists.newArrayList(); +for (TridentTuple tuple : tuples) { 
+Document document = options.mapper.toDocument(tuple); +documents.add(document); +} +this.mongoClient.insert(documents); --- End diff -- The issue here is with failures. I am not a MongoDB expert, so I don't know what happens if you insert a document that is already there. If you have a distributed state, there will be multiple instances of this class spread throughout your topology. If a single tuple fails as part of `updateState`, Trident will replay the entire batch. In that case we may try to insert documents that were already inserted. I am not worried about consistency, because there is no query possible from Trident. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > --
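The replay concern above can be illustrated with a minimal sketch. It uses plain in-memory collections as a stand-in for a MongoDB collection, not the MongoDB driver, and the `txid:position` key scheme is a hypothetical choice made only for this illustration. The point it shows: an append-style insert doubles the data when the same batch is written twice, while a keyed write leaves the store unchanged on replay.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory stand-in for a collection; this is NOT the MongoDB driver. */
final class ReplaySketch {

    /** Append-style write: every call adds new entries, like an insert with no caller-supplied key. */
    static void insertBatch(List<String> store, List<String> batch) {
        store.addAll(batch);
    }

    /** Keyed write: each entry is stored under a deterministic key, so rewriting it replaces it. */
    static void upsertBatch(Map<String, String> store, long txid, List<String> batch) {
        for (int i = 0; i < batch.size(); i++) {
            // Hypothetical key scheme (an assumption): batch id plus position within the batch.
            store.put(txid + ":" + i, batch.get(i));
        }
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b", "c");

        List<String> appended = new ArrayList<>();
        insertBatch(appended, batch);
        insertBatch(appended, batch); // replay of the same batch
        System.out.println(appended.size()); // prints 6

        Map<String, String> keyed = new LinkedHashMap<>();
        upsertBatch(keyed, 7L, batch);
        upsertBatch(keyed, 7L, batch); // replay of the same batch
        System.out.println(keyed.size()); // prints 3
    }
}
```

Whether a deterministic key is available depends on the mapper and the spout; the sketch only assumes that a replayed batch carries the same id and the same contents.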
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15117573#comment-15117573 ] ASF GitHub Bot commented on STORM-1483: --- Github user revans2 commented on the pull request: https://github.com/apache/storm/pull/1038#issuecomment-175123625 I am no expert on MongoDB, but overall it looks rather good. @vesense if you do need a +1 from me to get this in, please let me know. The code is simple enough that I feel OK with doing it, but I would rather have someone else who knows the DB better than I do lend their support. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114322#comment-15114322 ] ASF GitHub Bot commented on STORM-1483: --- Github user vesense commented on the pull request: https://github.com/apache/storm/pull/1037#issuecomment-174301486 Thanks @arunmahadevan > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114319#comment-15114319 ] ASF GitHub Bot commented on STORM-1483: --- Github user vesense commented on the pull request: https://github.com/apache/storm/pull/1037#issuecomment-174301313 Yes, I ran into some conflicts in `pom.xml`, and resolving them pulled too many files into the PR. I have created a new PR based on the latest master. Please move to https://github.com/apache/storm/pull/1038 > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114320#comment-15114320 ] ASF GitHub Bot commented on STORM-1483: --- Github user vesense closed the pull request at: https://github.com/apache/storm/pull/1037 > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114299#comment-15114299 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1037#issuecomment-174293822 This PR shows many more files than what's actually changed, probably due to some merge issues. Since almost all the files for the mongodb connector are new, you could just cut a fresh branch from the latest master, cherry-pick the commit, and force-push the changes to the same PR. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114841#comment-15114841 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50658241 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.mongodb.trident.state; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.MongoDBClient; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class MongoState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(MongoState.class); + +private Options options; +private MongoDBClient mongoClient; +private Map map; + +protected MongoState(Map map, int partitionIndex, int numPartitions, Options options) { --- End diff -- partitionIndex and numPartitions are not used so could be removed. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114840#comment-15114840 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50658230 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/common/MongoDBClient.java --- @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.common; + +import java.util.List; + +import org.bson.Document; +import org.bson.conversions.Bson; + +import com.mongodb.MongoClient; +import com.mongodb.MongoClientURI; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +public class MongoDBClient { + +private MongoClient client; +private MongoCollection collection; + +public MongoDBClient(String url, String collectionName) { +//Creates a MongoURI from the given string. +MongoClientURI uri = new MongoClientURI(url); +//Creates a MongoClient described by a URI. +this.client = new MongoClient(uri); +//Gets a Database. +MongoDatabase db = client.getDatabase(uri.getDatabase()); +//Gets a collection. 
+this.collection = db.getCollection(collectionName); +} + +public void insert(Document document) { +collection.insertOne(document); +} + +public void insert(List documents) { +//This method is equivalent to a call to the bulkWrite method. +collection.insertMany(documents); +} + +public void update(Bson filter, Bson update) { +//Update all documents in the collection --- End diff -- These could be proper javadocs > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114839#comment-15114839 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50658222 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/bolt/MongoUpdateBolt.java --- @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.mongodb.bolt; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.QueryFilterCreator; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.bson.Document; +import org.bson.conversions.Bson; + +/** + * Basic bolt for updating from MongoDB. + * + * Note: Each MongoUpdateBolt defined in a topology is tied to a specific collection. + * + */ +public class MongoUpdateBolt extends AbstractMongoBolt { --- End diff -- Why do we need two separate bolts - one for inserting and one for updating ? 
Can't we merge both of them into one using something like an `upsert` operation? 'Insert if not exists' could be an option that, when specified, inserts the document if it does not exist. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114842#comment-15114842 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/storm/pull/1038#discussion_r50658249 --- Diff: external/storm-mongodb/src/main/java/org/apache/storm/mongodb/trident/state/MongoState.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.mongodb.trident.state; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.storm.mongodb.common.MongoDBClient; +import org.apache.storm.mongodb.common.mapper.MongoMapper; +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.State; +import org.apache.storm.trident.tuple.TridentTuple; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class MongoState implements State { + +private static final Logger LOG = LoggerFactory.getLogger(MongoState.class); + +private Options options; +private MongoDBClient mongoClient; +private Map map; + +protected MongoState(Map map, int partitionIndex, int numPartitions, Options options) { +this.options = options; +this.map = map; +} + +public static class Options implements Serializable { +private String url; +private String collectionName; +private MongoMapper mapper; + +public Options withUrl(String url) { +this.url = url; +return this; +} + +public Options withCollectionName(String collectionName) { +this.collectionName = collectionName; +return this; +} + +public Options withMapper(MongoMapper mapper) { +this.mapper = mapper; +return this; +} +} + +protected void prepare() { +Validate.notEmpty(options.url, "url can not be blank or null"); +Validate.notEmpty(options.collectionName, "collectionName can not be blank or null"); +Validate.notNull(options.mapper, "MongoMapper can not be null"); + +this.mongoClient = new MongoDBClient(options.url, options.collectionName); +} + +@Override +public void beginCommit(Long txid) { +LOG.debug("beginCommit is noop."); +} + +@Override +public void commit(Long txid) { +LOG.debug("commit is noop."); +} + +public void updateState(List tuples, TridentCollector collector) { +List documents = Lists.newArrayList(); +for (TridentTuple tuple : tuples) { 
+Document document = options.mapper.toDocument(tuple); +documents.add(document); +} +this.mongoClient.insert(documents); --- End diff -- I think we should use `update` (with upsert) here. `insert` is always going to create new documents. For instance, if using `MongoState` to store word counts, it will create multiple documents for the same word. Unless that's what we want, the values should be updated. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
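The word-count case mentioned above can be sketched as follows. This is an in-memory illustration of the two write semantics only; `WordCountSketch` and its methods are hypothetical names, and no MongoDB driver calls are involved. With the real driver, the upsert variant would roughly correspond to an update call that is passed `new UpdateOptions().upsert(true)` with a filter on the word field, which is an assumption about how the connector might be wired rather than what the PR does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Contrasts insert and update-with-upsert semantics using in-memory collections. */
final class WordCountSketch {

    /** Plain insert: always appends a new record, even if the word was stored before. */
    static void insert(List<String[]> collection, String word, int count) {
        collection.add(new String[] {word, Integer.toString(count)});
    }

    /** Update with upsert: overwrite the record for the word, creating it if absent. */
    static void upsert(Map<String, Integer> collection, String word, int count) {
        collection.put(word, count);
    }

    public static void main(String[] args) {
        List<String[]> inserted = new ArrayList<>();
        insert(inserted, "storm", 1);
        insert(inserted, "storm", 2);
        System.out.println(inserted.size()); // prints 2: two records for the same word

        Map<String, Integer> upserted = new HashMap<>();
        upsert(upserted, "storm", 1);
        upsert(upserted, "storm", 2);
        System.out.println(upserted.size() + " " + upserted.get("storm")); // prints "1 2"
    }
}
```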
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114845#comment-15114845 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1038#issuecomment-174418642 Github shows `1,058 additions, 1,057 deletions not shown` for pom.xml but does not show any changes. If there are no changes, you could just revert the pom.xml. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (STORM-1483) add storm-mongodb connector
[ https://issues.apache.org/jira/browse/STORM-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114848#comment-15114848 ] ASF GitHub Bot commented on STORM-1483: --- Github user arunmahadevan commented on the pull request: https://github.com/apache/storm/pull/1038#issuecomment-174419061 Maybe not as part of this PR, but it would be good to have a `MapState` implementation to support Trident's exactly-once semantics. Take a look at `RedisMapState`, `HbaseMapState`, etc. > add storm-mongodb connector > --- > > Key: STORM-1483 > URL: https://issues.apache.org/jira/browse/STORM-1483 > Project: Apache Storm > Issue Type: Improvement >Reporter: Xin Wang >Assignee: Xin Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
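As background for the exactly-once remark above, here is a minimal sketch of the general idea behind a transactional state: store the id of the last applied batch next to each value and skip an update whose batch id was already recorded. `TxCountSketch` and `TxValue` are hypothetical names for this illustration; this is not the code of `RedisMapState` or `HbaseMapState`, and it assumes each batch contributes at most one pre-aggregated delta per key and that a replayed batch has the same id and contents.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory sketch of a count store that ignores replays of an already applied batch. */
final class TxCountSketch {

    /** A value together with the id of the last batch that updated it. */
    static final class TxValue {
        final long txid;
        final long count;

        TxValue(long txid, long count) {
            this.txid = txid;
            this.count = count;
        }
    }

    private final Map<String, TxValue> store = new HashMap<>();

    /** Adds delta to the key's count unless this batch id was already applied to that key. */
    void apply(long txid, String key, long delta) {
        TxValue current = store.get(key);
        if (current != null && current.txid == txid) {
            return; // replayed batch: the update is already recorded
        }
        long base = current == null ? 0L : current.count;
        store.put(key, new TxValue(txid, base + delta));
    }

    /** Returns the stored count, or 0 if the key is absent. */
    long count(String key) {
        TxValue value = store.get(key);
        return value == null ? 0L : value.count;
    }

    public static void main(String[] args) {
        TxCountSketch state = new TxCountSketch();
        state.apply(1L, "storm", 2L);
        state.apply(1L, "storm", 2L); // replay of batch 1 is ignored
        state.apply(2L, "storm", 3L);
        System.out.println(state.count("storm")); // prints 5
    }
}
```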