Re: CqlInputFormat and retired CqlPagingInputFormat create lots of connections to query the server
If you are using replication factor 1 and 3 cassandra nodes, 256 virtual nodes should be evenly distributed on the 3 nodes. So there are 256 virtual nodes in total. But in your experiment, you saw 3*257 mappers. Is that because of the setting cassandra.input.split.size=3? It has nothing to do with the node number being 3. Otherwise, I am confused why there are 256 virtual nodes on every cassandra node. On Wed, Jan 28, 2015 at 12:29 AM, Shenghua(Daniel) Wan < wansheng...@gmail.com> wrote: > I did another experiment to verify that indeed 3*257 (1 of the 257 ranges is > effectively null) mappers were created. > > Thanks mck for the information! > > On Wed, Jan 28, 2015 at 12:17 AM, mck wrote: > >> Shenghua, >> >> > The problem is the user might only want all the data via a "select *" >> > like statement. It seems that 257 connections to query the rows are >> necessary. >> > However, is there any way to prohibit 257 concurrent connections? >> >> >> Your reasoning is correct. >> The number of connections should be tunable via the >> "cassandra.input.split.size" property. See >> ConfigHelper.setInputSplitSize(..) >> >> The problem is that vnodes completely trash this, since the splits >> returned don't span across vnodes. >> There's an issue out for this – >> https://issues.apache.org/jira/browse/CASSANDRA-6091 >> but part of the problem is that the thrift stuff involved here is >> getting rewritten¹ to be pure cql. >> >> In the meantime you can override the CqlInputFormat and manually re-merge >> splits together where the location sets match, so as to better honour >> inputSplitSize and return to a more reasonable number of connections. >> We do this, using code similar to this patch >> https://github.com/michaelsembwever/cassandra/pull/2/files >> >> ~mck >> >> ¹ https://issues.apache.org/jira/browse/CASSANDRA-8358 >> > > > > -- > > Regards, > Shenghua (Daniel) Wan >
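[Editor's note] The split re-merging that mck describes can be sketched in miniature. The following is a hypothetical simplification in Python (the linked patch is the real, Java implementation against the Hadoop InputSplit API): consecutive splits whose replica location sets match are merged until a merged split covers roughly the configured input split size.

```python
def merge_splits(splits, target_size):
    """Merge consecutive vnode splits that share the same replica set,
    so the total number of splits (and hence mapper connections) drops.

    `splits` is a list of (start_token, end_token, row_estimate, locations)
    tuples, a made-up simplification of ColumnFamilySplit.
    """
    merged = []
    for start, end, rows, locs in splits:
        if merged and merged[-1][3] == locs and merged[-1][2] + rows <= target_size:
            merged[-1][1] = end       # extend the previous split's token range
            merged[-1][2] += rows     # and its row estimate
        else:
            merged.append([start, end, rows, locs])
    return merged

# 256 tiny vnode ranges on a single node collapse into far fewer splits.
splits = [(i, i + 1, 10, ("localhost",)) for i in range(256)]
print(len(merge_splits(splits, target_size=1000)))  # prints 3
```

With 256 ranges of ~10 rows each and a target of 1000 rows per split, the 256 connections of the original behaviour shrink to 3.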
Re: CqlInputFormat and retired CqlPagingInputFormat create lots of connections to query the server
In that case, each node will have 256/3 connections at most. Still 256 mappers. Someone please correct me if I am wrong. On Tue, Jan 27, 2015 at 11:04 PM, Shenghua(Daniel) Wan < wansheng...@gmail.com> wrote: > Hi, Huiliang, > Great to hear from you, again! > Imagine you have 3 nodes, replication factor=1, and the default number of > tokens. You will have 3*256 mappers... In that case, you will soon run out > of mappers or reach the limit. > > > On Tue, Jan 27, 2015 at 10:59 PM, Huiliang Zhang wrote: > >> Hi Shenghua, as I understand, each range is assigned to a mapper. Mappers >> will not share connections. So, it needs at least 256 connections to read >> them all. But all 256 connections should not be set up at the same time unless >> you have 256 mappers running at the same time. >> >> On Tue, Jan 27, 2015 at 9:34 PM, Shenghua(Daniel) Wan < >> wansheng...@gmail.com> wrote: >> >>> By default, each C* node is set up with 256 tokens. On a local 1-node C* >>> server, my hadoop job creates 256 connections to the server. Is there any >>> way to control this behavior? e.g. reduce the number of connections to a >>> pre-configured cap. >>> >>> I debugged the C* source code and found the client asks for partition >>> ranges, or virtual nodes. Then the client was told by the server there were 257 >>> ranges, corresponding to 257 column family splits. >>> >>> Here is a snapshot of my logs >>> >>> 15/01/27 18:02:20 DEBUG hadoop.AbstractColumnFamilyInputFormat: adding >>> ColumnFamilySplit((9121856086738887846, '-9223372036854775808] @[localhost]) >>> ... >>> 257 splits in total. >>> >>> The problem is the user might only want all the data via a "select *" >>> like statement. It seems that 257 connections to query the rows are >>> necessary. However, is there any way to prohibit 257 concurrent >>> connections? >>> >>> My C* version is 2.0.11 and I also tried CqlPagingInputFormat, which has >>> the same behavior. >>> >>> Thank you. 
>>> >>> -- >>> >>> Regards, >>> Shenghua (Daniel) Wan >>> >> >> > > > -- > > Regards, > Shenghua (Daniel) Wan >
Re: CqlInputFormat and retired CqlPagingInputFormat create lots of connections to query the server
Hi Shenghua, as I understand, each range is assigned to a mapper. Mappers will not share connections. So, it needs at least 256 connections to read them all. But all 256 connections should not be set up at the same time unless you have 256 mappers running at the same time. On Tue, Jan 27, 2015 at 9:34 PM, Shenghua(Daniel) Wan wrote: > By default, each C* node is set up with 256 tokens. On a local 1-node C* > server, my hadoop job creates 256 connections to the server. Is there any > way to control this behavior? e.g. reduce the number of connections to a > pre-configured cap. > > I debugged the C* source code and found the client asks for partition ranges, > or virtual nodes. Then the client was told by the server there were 257 ranges, > corresponding to 257 column family splits. > > Here is a snapshot of my logs > > 15/01/27 18:02:20 DEBUG hadoop.AbstractColumnFamilyInputFormat: adding > ColumnFamilySplit((9121856086738887846, '-9223372036854775808] @[localhost]) > ... > 257 splits in total. > > The problem is the user might only want all the data via a "select *" like > statement. It seems that 257 connections to query the rows are necessary. > However, is there any way to prohibit 257 concurrent > connections? > > My C* version is 2.0.11 and I also tried CqlPagingInputFormat, which has the > same behavior. > > Thank you. > > -- > > Regards, > Shenghua (Daniel) Wan >
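[Editor's note] The arithmetic behind these counts can be made explicit. A trivial sketch, assuming the stock behaviour of one split (and hence one mapper and one connection) per vnode range, with the default num_tokens=256:

```python
def expected_splits(nodes, tokens_per_node=256):
    """Total token ranges on the ring: with vnodes and no split re-merging,
    each range becomes one Hadoop split, i.e. one mapper and one connection."""
    return nodes * tokens_per_node

print(expected_splits(1))  # 256 ranges on the 1-node cluster (the thread observed 257 splits, one effectively empty)
print(expected_splits(3))  # 768 for the 3-node, RF=1 cluster discussed above
```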
Re: EC2 cassandra cluster node address problem
It should be fine to use broadcast_address for my purpose. The only remaining problem comes from opscenter, because it cannot recognize the datacenter from the ip range. On Wed, Jun 25, 2014 at 7:13 PM, Michael Shuler wrote: > On 06/25/2014 09:05 PM, Huiliang Zhang wrote: > >> Thanks. In 2.0.6, the setting looks like this: >> >> # Address to broadcast to other Cassandra nodes >> # Leaving this blank will set it to the same value as listen_address >> # broadcast_address: 1.2.3.4 >> >> If it is changed to the elastic ip, it will cause other kinds of problems, >> like Unknown Datacenter in opscenter. >> > > Right. That setting serves a different purpose :) > > -- > Kind regards, > Michael >
Re: EC2 cassandra cluster node address problem
Thanks. In 2.0.6, the setting looks like this: # Address to broadcast to other Cassandra nodes # Leaving this blank will set it to the same value as listen_address # broadcast_address: 1.2.3.4 If it is changed to the elastic ip, it will cause other kinds of problems, like Unknown Datacenter in opscenter. On Wed, Jun 25, 2014 at 6:49 PM, Michael Shuler wrote: > On 06/25/2014 08:31 PM, Huiliang Zhang wrote: > >> Thanks, got it. It is working after I translate the private ips to >> elastic ips. >> > > This sounds like a nice way to work around a known networking limitation > when using EC2. Glad that worked out OK. In 2.1+, cassandra.yaml includes a > broadcast_rpc_address setting to help mitigate this limitation for client > connections. > > # RPC address to broadcast to drivers and other Cassandra nodes. This > cannot > # be set to 0.0.0.0. If left blank, this will be set to the value of > # rpc_address. If rpc_address is set to 0.0.0.0, broadcast_rpc_address must > # be set. > # broadcast_rpc_address: 1.2.3.4 > > -- > Kind regards, > Michael >
Re: EC2 cassandra cluster node address problem
Thanks, got it. It is working after I translate the private ips to elastic ips. On Wed, Jun 25, 2014 at 1:51 PM, Andrey Ilinykh wrote: > yes, of course. The private ip is the real ip address of the node. Cassandra can > listen on this ip only. The elastic ip is external. It belongs to the AWS firewall. > It is similar to your web router. You can forward your external port to a > local one, but an application running on your local node doesn't know anything > about it. > > > On Wed, Jun 25, 2014 at 1:25 PM, Huiliang Zhang wrote: > >> Thanks. In fact, it is Cassandra that returns the private ips of the nodes to my >> program by: >> >> client.describe_ring(keyspace) >> >> Then the program will start communicating with Cassandra through the >> private ips. One way is to translate the ips myself. >> >> >> On Tue, Jun 24, 2014 at 10:40 PM, Andrey Ilinykh >> wrote: >> >>> you can set rpc_address to 0.0.0.0, then it will listen on all >>> interfaces. Also you have to modify the security group settings to allow >>> incoming connections on port 9160. But it is a really bad idea. This >>> way you open your cluster to the whole world; an ssh tunnel is the best way. >>> >>> >>> On Tue, Jun 24, 2014 at 10:01 PM, Huiliang Zhang >>> wrote: >>> >>>> Thanks. Is there a way to configure Cassandra to use the elastic ip instead >>>> of the private ip? >>>> >>>> >>>> On Tue, Jun 24, 2014 at 9:29 PM, Andrey Ilinykh >>>> wrote: >>>> >>>>> Cassandra knows nothing about elastic ips. You have to use an ssh tunnel >>>>> or run your client on an ec2 instance. >>>>> >>>>> Thank you, >>>>> Andrey >>>>> >>>>> >>>>> On Tue, Jun 24, 2014 at 8:55 PM, Huiliang Zhang >>>>> wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> I am using Cassandra on EC2 instances. My cassandra always returns >>>>>> the private ips of the instances to the thrift program. Then the program >>>>>> cannot >>>>>> connect to the private ips. >>>>>> >>>>>> I already changed the >>>>>> rpc_address: elastic ip >>>>>> rpc_address: elastic ip >>>>>> >>>>>> Then I restarted the cassandra cluster. 
But the system.peers still >>>>>> save the private ips as peer address. >>>>>> >>>>>> How to fix this? >>>>>> >>>>>> Thanks, >>>>>> Huiliang >>>>>> >>>>>> >>>>> >>>> >>> >> >
Re: EC2 cassandra cluster node address problem
Thanks. In fact, it is Cassandra that returns the private ips of the nodes to my program by: client.describe_ring(keyspace) Then the program will start communicating with Cassandra through the private ips. One way is to translate the ips myself. On Tue, Jun 24, 2014 at 10:40 PM, Andrey Ilinykh wrote: > you can set rpc_address to 0.0.0.0, then it will listen on all interfaces. > Also you have to modify the security group settings to allow incoming > connections on port 9160. But it is a really bad idea. This way you > open your cluster to the whole world; an ssh tunnel is the best way. > > > On Tue, Jun 24, 2014 at 10:01 PM, Huiliang Zhang wrote: > >> Thanks. Is there a way to configure Cassandra to use the elastic ip instead >> of the private ip? >> >> >> On Tue, Jun 24, 2014 at 9:29 PM, Andrey Ilinykh >> wrote: >> >>> Cassandra knows nothing about elastic ips. You have to use an ssh tunnel or >>> run your client on an ec2 instance. >>> >>> Thank you, >>> Andrey >>> >>> >>> On Tue, Jun 24, 2014 at 8:55 PM, Huiliang Zhang >>> wrote: >>> >>>> Hi, >>>> >>>> I am using Cassandra on EC2 instances. My cassandra always returns >>>> the private ips of the instances to the thrift program. Then the program cannot >>>> connect to the private ips. >>>> >>>> I already changed the >>>> rpc_address: elastic ip >>>> rpc_address: elastic ip >>>> >>>> Then I restarted the cassandra cluster. But the system.peers still saves >>>> the private ips as the peer address. >>>> >>>> How to fix this? >>>> >>>> Thanks, >>>> Huiliang >>>> >>>> >>> >> >
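[Editor's note] The translation Huiliang mentions can be as simple as a static lookup from each node's private address to its elastic IP, applied to whatever describe_ring returns. A rough sketch; the addresses and the shape of the endpoint list are made-up assumptions:

```python
# Hypothetical private-IP -> elastic-IP mapping for a 3-node cluster.
PRIVATE_TO_ELASTIC = {
    "10.0.0.11": "54.1.2.11",
    "10.0.0.12": "54.1.2.12",
    "10.0.0.13": "54.1.2.13",
}

def translate(endpoints):
    """Rewrite the private IPs returned by describe_ring into the
    externally reachable elastic IPs; unknown addresses pass through."""
    return [PRIVATE_TO_ELASTIC.get(ip, ip) for ip in endpoints]

print(translate(["10.0.0.11", "10.0.0.13"]))  # ['54.1.2.11', '54.1.2.13']
```

The client would apply `translate` to each TokenRange's endpoint list before opening connections; the mapping itself has to be maintained by hand (or fetched from the EC2 API), since Cassandra only knows the private addresses.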
Re: Use Cassandra thrift API with collection type
Yes, I realized the way to use CQL. I checked how map data is represented by using cassandra-cli. For each element in the map, it uses the key as part of the column name and the value as the column value. I just cannot insert this by using the thrift API because I already defined a CompositeType column comparator. Is there a way to run a second program to insert map data with a different comparator? Thanks. On Mon, Jun 23, 2014 at 10:21 AM, Sylvain Lebresne wrote: > On Mon, Jun 23, 2014 at 6:19 PM, James Campbell < > ja...@breachintelligence.com> wrote: > >> Huiliang, >> >> >> Since there hasn't been another reply yet, I'll throw out an idea that >> worked for us as part of a test, though it does not seem exactly like a >> "preferred" way since it crosses code-bases. We built the type using >> straight Java types, then used the Datastax v2 driver's DataType class >> serializer. >> >> >> Concretely, it would look like the following (adapting your code): >> >> Column column = new Column(); >> column.name=columnSerializer.toByteBuffer(colname); // the >> column name of the map type, it works with other kinds of data type >> >> column.value = DataType.map(DataType.ascii, >> DataType.decimal).serialize(yourMapGoesHere); >> column.timestamp = new Date().getTime(); >> >> ... >> > > This is exactly equivalent to what Huiliang posted and will thus not work > any better. > > Collections are internally not stored as one "thrift column" per > collection. Each element of the collection is a separate "thrift column" > and the exact encoding depends on the collection. The fact is, updating CQL > collections from thrift is technically possible but it is not recommended in > any way. I strongly advise you to stick to CQL if you want to use CQL > collections. 
> > -- > Sylvain > >> >> >> -- >> *From:* Huiliang Zhang >> *Sent:* Friday, June 20, 2014 10:10 PM >> *To:* user@cassandra.apache.org >> *Subject:* Use Cassnadra thrift API with collection type >> >> Hi, >> >> I have a problem when insert data of the map type into a cassandra >> table. I tried all kinds of MapSerializer to serialize the Map data and did >> not succeed. >> >> My code is like this: >> Column column = new Column(); >> column.name=columnSerializer.toByteBuffer(colname); // the >> column name of the map type, it works with other kinds of data type >> column.value = >> MapSerializer.getInstance(AsciiSerializer.instance, >> DecimalSerializer.instance).serialize(someMapData); >> column.timestamp = new Date().getTime(); >> >> Mutation mutation = new Mutation(); >> mutation.column_or_supercolumn = new ColumnOrSuperColumn(); >> mutation.column_or_supercolumn.column = column; >> mutationList.add(mutation); >> >> The data was input into the cassandra DB however it cannot be retrieved >> by CQL3 with the following error: >> ERROR 14:32:48,192 Exception in thread Thread[Thrift:4,5,main] >> java.lang.AssertionError >> at >> org.apache.cassandra.cql3.statements.ColumnGroupMap.getCollection(ColumnGroupMap.java:88) >> at >> org.apache.cassandra.cql3.statements.SelectStatement.getCollectionValue(SelectStatement.java:1185) >> at >> org.apache.cassandra.cql3.statements.SelectStatement.handleGroup(SelectStatement.java:1169) >> at >> org.apache.cassandra.cql3.statements.SelectStatement.processColumnFamily(SelectStatement.java:1076) >> ... >> >> So the question is how to write map data into cassandra by thrift API. >> Appreciated for any help. >> >> Thanks, >> Huiliang >> >> >> >> >
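[Editor's note] For reference, the pure-CQL route Sylvain recommends looks like this. A hedged sketch with a made-up table name; the map<ascii, decimal> type mirrors the serializers used in the thrift attempt:

```cql
CREATE TABLE demo (
    id   int PRIMARY KEY,
    vals map<ascii, decimal>
);

-- Insert a whole map literal, or update a single entry in place;
-- Cassandra handles the per-element cell encoding internally.
INSERT INTO demo (id, vals) VALUES (1, {'a': 1.5, 'b': 2.25});
UPDATE demo SET vals['c'] = 3.0 WHERE id = 1;
```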
Re: EC2 cassandra cluster node address problem
Thanks. Is there a way to configure Cassandra to use elastic ip instead of private ip? On Tue, Jun 24, 2014 at 9:29 PM, Andrey Ilinykh wrote: > Cassandra knows nothing about elastic ip. You have to use ssh tunnel or > run your client on ec2 instance. > > Thank you, > Andrey > > > On Tue, Jun 24, 2014 at 8:55 PM, Huiliang Zhang wrote: > >> Hi, >> >> I am using Cassandra on EC2 instances. My cassandra always returns >> private ips of the instances to the thrift program. Then the program cannot >> connect to the private ips. >> >> I already changed the >> rpc_address: elastic ip >> rpc_address: elastic ip >> >> Then I restarted the cassandra cluster. But the system.peers still save >> the private ips as peer address. >> >> How to fix this? >> >> Thanks, >> Huiliang >> >> >
EC2 cassandra cluster node address problem
Hi, I am using Cassandra on EC2 instances. My cassandra always returns the private ips of the instances to the thrift program. Then the program cannot connect to the private ips. I already changed the rpc_address: elastic ip rpc_address: elastic ip Then I restarted the cassandra cluster. But the system.peers still saves the private ips as the peer address. How to fix this? Thanks, Huiliang
Use Cassandra thrift API with collection type
Hi, I have a problem when inserting data of the map type into a cassandra table. I tried all kinds of MapSerializer to serialize the Map data and did not succeed. My code is like this: Column column = new Column(); column.name=columnSerializer.toByteBuffer(colname); // the column name of the map type, it works with other kinds of data type column.value = MapSerializer.getInstance(AsciiSerializer.instance, DecimalSerializer.instance).serialize(someMapData); column.timestamp = new Date().getTime(); Mutation mutation = new Mutation(); mutation.column_or_supercolumn = new ColumnOrSuperColumn(); mutation.column_or_supercolumn.column = column; mutationList.add(mutation); The data was inserted into the cassandra DB; however, it cannot be retrieved by CQL3, which fails with the following error: ERROR 14:32:48,192 Exception in thread Thread[Thrift:4,5,main] java.lang.AssertionError at org.apache.cassandra.cql3.statements.ColumnGroupMap.getCollection(ColumnGroupMap.java:88) at org.apache.cassandra.cql3.statements.SelectStatement.getCollectionValue(SelectStatement.java:1185) at org.apache.cassandra.cql3.statements.SelectStatement.handleGroup(SelectStatement.java:1169) at org.apache.cassandra.cql3.statements.SelectStatement.processColumnFamily(SelectStatement.java:1076) ... So the question is how to write map data into cassandra via the thrift API. Any help is appreciated. Thanks, Huiliang
A problem with truncate and bulk loader
Hi, I have a very strange problem with the Cassandra bulk loader. Explanations would be appreciated. I am using a local cassandra server 2.0.5 with default settings. 1. I created a table A and loaded 108 rows into it by using a hadoop program with "org.apache.cassandra.hadoop.BulkOutputFormat". 2. I ran "truncate A" to remove all the records in cqlsh. Now 0 rows are returned when running "select * from A". 3. I used the same hadoop program to load only the first 12 rows into A. 4. Ran "select * from A". Now all 108 rows are back. 5. I stopped the cassandra server by pressing ^c. I removed all files in /var/log/cassandra and started the cassandra server again using "./cassandra -f". 6. I repeated steps 3-4. All 108 rows are back again. 7. In cqlsh, I ran "delete from A where A.a='100'". I used the same program to load the first 12 rows into A. This time, the rows with A.a='100' never appear when I run "select * from A". 8. The rows with A.a='100' reappear after I truncate the table and repeat steps 3-4. Again, all 108 rows are back. Too many strange things here. Each one seems inexplicable. My local cassandra and hadoop program both run on a Mac machine. The table A is defined as: CREATE TABLE A ( a text, b text, value text, PRIMARY KEY (a, b) ) WITH COMPACT STORAGE AND bloom_filter_fp_chance=0.10 AND caching='KEYS_ONLY' AND comment='' AND dclocal_read_repair_chance=0.00 AND gc_grace_seconds=864000 AND index_interval=128 AND read_repair_chance=1.00 AND replicate_on_write='true' AND populate_io_cache_on_flush='false' AND default_time_to_live=0 AND speculative_retry='99.0PERCENTILE' AND memtable_flush_period_in_ms=0 AND compaction={'class': 'LeveledCompactionStrategy'} AND compression={'sstable_compression': 'SnappyCompressor'}; Thanks, Huiliang
Re: Can Cassandra client programs use hostnames instead of IPs?
Thanks. My case is that there is no public ip and a VPN cannot be set up. It seems that I have to run an EMR job to operate on the AWS cassandra cluster. I got some timeout errors while running the EMR job: java.lang.RuntimeException: Could not retrieve endpoint ranges: at org.apache.cassandra.hadoop.BulkRecordWriter$ExternalClient.init(BulkRecordWriter.java:333) at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:149) at org.apache.cassandra.io.sstable.SSTableLoader.stream(SSTableLoader.java:144) at org.apache.cassandra.hadoop.BulkRecordWriter.close(BulkRecordWriter.java:228) at org.apache.cassandra.hadoop.BulkRecordWriter.close(BulkRecordWriter.java:213) at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.close(MapTask.java:658) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:773) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375) at org.apache.hadoop.mapred.Child$4.run(Child.java:255) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132) at org.apache.hadoop.mapred.Child.main(Child.java:249) Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection timed out at org.apache.thrift.transport.TSocket.open(TSocket.java:183) at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) at org.apache.cassandra.hadoop.BulkRecordWriter$ExternalClient.createThriftClient(BulkRecordWriter.java:348) at org.apache.cassandra.hadoop.BulkRecordWriter$ExternalClient.init(BulkRecordWriter.java:293) ... 
12 more Caused by: java.net.ConnectException: Connection timed out at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:579) at org.apache.thrift.transport.TSocket.open(TSocket.java:178) ... 15 more Any suggestions would be appreciated. On Tue, May 13, 2014 at 7:45 AM, Ben Bromhead wrote: > You can set listen_address in cassandra.yaml to a hostname ( > http://www.datastax.com/documentation/cassandra/2.0/cassandra/configuration/configCassandra_yaml_r.html > ). > > Cassandra will use the IP address returned by a DNS query for that > hostname. On AWS you don't have to assign an elastic IP; all instances will > come with a public IP that lasts their lifetime (if you use ec2-classic or > your VPC is set up to assign them). > > Note that whatever hostname you set in a node's listen_address, it will > need to return the private IP, as AWS instances only have network access via > their private address. Traffic to an instance's public IP is NATed and > forwarded to the private address. So you may as well just use the node's IP > address. > > If you run hadoop on instances in the same AWS region, it will be able to > access your Cassandra cluster via private IPs. If you run hadoop externally, > just use the public IPs. > > If you run in a VPC without public addressing and want to connect from > external hosts, you will want to look at a VPN ( > http://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/VPC_VPN.html). 
> > Ben Bromhead > Instaclustr | www.instaclustr.com | > @instaclustr<http://twitter.com/instaclustr> | > +61 415 936 359 > > > > > On 13/05/2014, at 4:31 AM, Huiliang Zhang wrote: > > Hi, > > Cassandra returns ips of the nodes in the cassandra cluster for further > communication between hadoop program and the casandra cluster. Is there a > way to configure the cassandra cluster to return hostnames instead of ips? > My cassandra cluster is on AWS and has no elastic ips which can be accessed > outside AWS. > > Thanks, > Huiliang > > > >
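[Editor's note] A small aside on Ben's DNS point: a hostname in listen_address is resolved to a single address up front, so configuring a name is equivalent to configuring whatever the resolver returns for it. A minimal stdlib-only illustration in Python:

```python
import socket

# Cassandra stores the resolved address, not the name: putting a hostname
# in listen_address is equivalent to configuring whatever this returns.
print(socket.gethostbyname("localhost"))  # typically 127.0.0.1
```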
Can Cassandra client programs use hostnames instead of IPs?
Hi, Cassandra returns the ips of the nodes in the cassandra cluster for further communication between the hadoop program and the cassandra cluster. Is there a way to configure the cassandra cluster to return hostnames instead of ips? My cassandra cluster is on AWS and has no elastic ips which can be accessed from outside AWS. Thanks, Huiliang