IGNITE-3314 Implement Serializable in org.apache.ignite.cache.store.cassandra.datasource.DataSource. Fixes #974.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ba74af4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ba74af4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ba74af4 Branch: refs/heads/ignite-3443 Commit: 2ba74af4ebfc66631b7f0cd764c7493952da4112 Parents: 799b8ec Author: Igor Rudyak <irud...@gmail.com> Authored: Tue Sep 13 18:07:10 2016 +0700 Committer: Alexey Kuznetsov <akuznet...@apache.org> Committed: Tue Sep 13 18:07:10 2016 +0700 ---------------------------------------------------------------------- .../store/cassandra/datasource/Credentials.java | 4 +- .../store/cassandra/datasource/DataSource.java | 119 ++++++++++++-- .../cassandra/datasource/PlainCredentials.java | 3 + .../tests/DatasourceSerializationTest.java | 158 +++++++++++++++++++ .../tests/utils/CassandraAdminCredentials.java | 6 +- .../utils/CassandraRegularCredentials.java | 6 +- 6 files changed, 280 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java index e1fd60c..a2358a6 100644 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java +++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java @@ -17,10 +17,12 @@ package org.apache.ignite.cache.store.cassandra.datasource; +import java.io.Serializable; + /** * Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML). */ -public interface Credentials { +public interface Credentials extends Serializable { /** * Returns user name * http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java index 1ecb28f..f582aac 100644 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java +++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java @@ -31,19 +31,37 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy; import com.datastax.driver.core.policies.ReconnectionPolicy; import com.datastax.driver.core.policies.RetryPolicy; import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; +import java.util.UUID; + import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.store.cassandra.session.CassandraSession; import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Data source abstraction to specify configuration of the Cassandra session to be used. */ -public class DataSource { +public class DataSource implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Null object, used as a replacement for those Cassandra connection options which + * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc). + */ + private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9"); + /** Number of rows to immediately fetch in CQL statement execution. */ private Integer fetchSize; @@ -324,7 +342,7 @@ public class DataSource { * @param plc Load balancing policy. */ public void setLoadBalancingPolicy(LoadBalancingPolicy plc) { - this.loadBalancingPlc = plc; + loadBalancingPlc = plc; invalidate(); } @@ -336,7 +354,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setReconnectionPolicy(ReconnectionPolicy plc) { - this.reconnectionPlc = plc; + reconnectionPlc = plc; invalidate(); } @@ -348,7 +366,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setRetryPolicy(RetryPolicy plc) { - this.retryPlc = plc; + retryPlc = plc; invalidate(); } @@ -360,7 +378,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setAddressTranslator(AddressTranslator translator) { - this.addrTranslator = translator; + addrTranslator = translator; invalidate(); } @@ -372,7 +390,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) { - this.speculativeExecutionPlc = plc; + speculativeExecutionPlc = plc; invalidate(); } @@ -384,7 +402,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setAuthProvider(AuthProvider provider) { - this.authProvider = provider; + authProvider = provider; invalidate(); } @@ -396,7 +414,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setSslOptions(SSLOptions options) { - this.sslOptions = options; + sslOptions = options; invalidate(); } @@ -408,7 +426,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setPoolingOptions(PoolingOptions options) { - this.poolingOptions = options; + poolingOptions = options; invalidate(); } @@ -420,7 +438,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setSocketOptions(SocketOptions options) { - this.sockOptions = options; + sockOptions = options; invalidate(); } @@ -432,7 +450,7 @@ public class DataSource { */ @SuppressWarnings("UnusedDeclaration") public void setNettyOptions(NettyOptions options) { - this.nettyOptions = options; + nettyOptions = options; invalidate(); } @@ -522,6 +540,85 @@ public class DataSource { return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log); } + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(fetchSize); + out.writeObject(readConsistency); + out.writeObject(writeConsistency); + U.writeString(out, user); + U.writeString(out, pwd); + out.writeObject(port); + out.writeObject(contactPoints); + out.writeObject(contactPointsWithPorts); + out.writeObject(maxSchemaAgreementWaitSeconds); + out.writeObject(protoVer); + U.writeString(out, compression); + out.writeObject(useSSL); + out.writeObject(collectMetrix); + out.writeObject(jmxReporting); + out.writeObject(creds); + writeObject(out, loadBalancingPlc); + writeObject(out, reconnectionPlc); + writeObject(out, addrTranslator); + writeObject(out, speculativeExecutionPlc); + writeObject(out, authProvider); + writeObject(out, sslOptions); + writeObject(out, poolingOptions); + writeObject(out, sockOptions); + writeObject(out, nettyOptions); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fetchSize = (Integer)in.readObject(); + readConsistency = (ConsistencyLevel)in.readObject(); + writeConsistency = (ConsistencyLevel)in.readObject(); + user = U.readString(in); + pwd = U.readString(in); + port = (Integer)in.readObject(); + contactPoints = (List<InetAddress>)in.readObject(); + contactPointsWithPorts = (List<InetSocketAddress>)in.readObject(); + maxSchemaAgreementWaitSeconds = (Integer)in.readObject(); + protoVer = (Integer)in.readObject(); + compression = U.readString(in); + useSSL = (Boolean)in.readObject(); + collectMetrix = (Boolean)in.readObject(); + jmxReporting = (Boolean)in.readObject(); + creds = (Credentials)in.readObject(); + loadBalancingPlc = (LoadBalancingPolicy)readObject(in); + reconnectionPlc = (ReconnectionPolicy)readObject(in); + addrTranslator = (AddressTranslator)readObject(in); + speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in); + authProvider = (AuthProvider)readObject(in); + sslOptions = (SSLOptions)readObject(in); + poolingOptions = (PoolingOptions)readObject(in); + sockOptions = (SocketOptions)readObject(in); + nettyOptions = (NettyOptions)readObject(in); + } + + /** + * Helper method used to serialize class members + * @param out the stream to write the object to + * @param obj the object to be written + * @throws IOException Includes any I/O exceptions that may occur + */ + private void writeObject(ObjectOutput out, Object obj) throws IOException { + out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj); + } + + /** + * Helper method used to deserialize class members + * @param in the stream to read data from in order to restore the object + * @throws IOException Includes any I/O exceptions that may occur + * @throws ClassNotFoundException If the class for an object being restored cannot be found + * @return deserialized object + */ + private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException { + Object obj = in.readObject(); + return NULL_OBJECT.equals(obj) ? null : obj; + } + /** * Parses consistency level provided as string. * http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java index 9d0710e..46ebdc5 100644 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java +++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java @@ -21,6 +21,9 @@ package org.apache.ignite.cache.store.cassandra.datasource; * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values. */ public class PlainCredentials implements Credentials { + /** */ + private static final long serialVersionUID = 0L; + /** User name. */ private String user; http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java new file mode 100644 index 0000000..ceb90e0 --- /dev/null +++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Host; +import com.datastax.driver.core.HostDistance; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.policies.LoadBalancingPolicy; +import com.datastax.driver.core.policies.RoundRobinPolicy; +import com.datastax.driver.core.policies.TokenAwarePolicy; + +import java.io.Serializable; +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +import org.apache.ignite.cache.store.cassandra.datasource.Credentials; +import org.apache.ignite.cache.store.cassandra.datasource.DataSource; +import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer; +import org.apache.ignite.tests.utils.CassandraAdminCredentials; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +/** + * Test for datasource serialization. + */ +public class DatasourceSerializationTest { + /** + * Sample class for serialization test. + */ + private static class MyLoadBalancingPolicy implements LoadBalancingPolicy, Serializable { + /** */ + private transient LoadBalancingPolicy plc = new TokenAwarePolicy(new RoundRobinPolicy()); + + /** {@inheritDoc} */ + @Override public void init(Cluster cluster, Collection<Host> hosts) { + plc.init(cluster, hosts); + } + + /** {@inheritDoc} */ + @Override public HostDistance distance(Host host) { + return plc.distance(host); + } + + /** {@inheritDoc} */ + @Override public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) { + return plc.newQueryPlan(loggedKeyspace, statement); + } + + /** {@inheritDoc} */ + @Override public void onAdd(Host host) { + plc.onAdd(host); + } + + /** {@inheritDoc} */ + @Override public void onUp(Host host) { + plc.onUp(host); + } + + /** {@inheritDoc} */ + @Override public void onDown(Host host) { + plc.onDown(host); + } + + /** {@inheritDoc} */ + @Override public void onRemove(Host host) { + plc.onRemove(host); + } + + /** {@inheritDoc} */ + @Override public void close() { + plc.close(); + } + } + + /** + * Serialization test. + */ + @Test + public void serializationTest() { + DataSource src = new DataSource(); + + Credentials cred = new CassandraAdminCredentials(); + String[] points = new String[]{"127.0.0.1", "10.0.0.2", "10.0.0.3"}; + LoadBalancingPolicy plc = new MyLoadBalancingPolicy(); + + src.setCredentials(cred); + src.setContactPoints(points); + src.setReadConsistency("ONE"); + src.setWriteConsistency("QUORUM"); + src.setLoadBalancingPolicy(plc); + + JavaSerializer serializer = new JavaSerializer(); + + ByteBuffer buff = serializer.serialize(src); + DataSource _src = (DataSource)serializer.deserialize(buff); + + Credentials _cred = (Credentials)getFieldValue(_src, "creds"); + List<InetAddress> _points = (List<InetAddress>)getFieldValue(_src, "contactPoints"); + ConsistencyLevel _readCons = (ConsistencyLevel)getFieldValue(_src, "readConsistency"); + ConsistencyLevel _writeCons = (ConsistencyLevel)getFieldValue(_src, "writeConsistency"); + LoadBalancingPolicy _plc = (LoadBalancingPolicy)getFieldValue(_src, "loadBalancingPlc"); + + assertTrue("Incorrectly serialized/deserialized credentials for Cassandra DataSource", + cred.getPassword().equals(_cred.getPassword()) && cred.getUser().equals(_cred.getUser())); + + assertTrue("Incorrectly serialized/deserialized contact points for Cassandra DataSource", + "/127.0.0.1".equals(_points.get(0).toString()) && + "/10.0.0.2".equals(_points.get(1).toString()) && + "/10.0.0.3".equals(_points.get(2).toString())); + + assertTrue("Incorrectly serialized/deserialized consistency levels for Cassandra DataSource", + ConsistencyLevel.ONE == _readCons && ConsistencyLevel.QUORUM == _writeCons); + + assertTrue("Incorrectly serialized/deserialized load balancing policy for Cassandra DataSource", + _plc instanceof MyLoadBalancingPolicy); + } + + /** + * @param obj Object. + * @param field Field name. + * @return Field value. + */ + private Object getFieldValue(Object obj, String field) { + try { + Field f = obj.getClass().getDeclaredField(field); + + f.setAccessible(true); + + return f.get(obj); + } + catch (Throwable e) { + throw new RuntimeException("Failed to get field '" + field + "' value", e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java index 66df6e7..e7047f3 100644 --- a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java +++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java @@ -20,10 +20,12 @@ package org.apache.ignite.tests.utils; import org.apache.ignite.cache.store.cassandra.datasource.Credentials; /** - * Implementation of {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials} - * providing admin user/password to establish Cassandra session. + * Implementation of {@link Credentials} providing admin user/password to establish Cassandra session. */ public class CassandraAdminCredentials implements Credentials { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public String getUser() { return CassandraHelper.getAdminUser(); http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java index 52937ea..7546c9b 100644 --- a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java +++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java @@ -20,10 +20,12 @@ package org.apache.ignite.tests.utils; import org.apache.ignite.cache.store.cassandra.datasource.Credentials; /** - * Implementation of {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials} - * providing regular user/password to establish Cassandra session. + * Implementation of {@link Credentials} providing regular user/password to establish Cassandra session. */ public class CassandraRegularCredentials implements Credentials { + /** */ + private static final long serialVersionUID = 0L; + /** {@inheritDoc} */ @Override public String getUser() { return CassandraHelper.getRegularUser();