http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java new file mode 100644 index 0000000..4edd759 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains serializers implementation, to store BLOBs into Cassandra + */ +package org.apache.ignite.cache.store.cassandra.serializer; \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java new file mode 100644 index 0000000..e43db1d --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchExecutionAssistant.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; + +/** + * Provides information for batch operations (loadAll, deleteAll, writeAll) of Ignite cache + * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}. + * + * @param <R> type of the result returned from batch operation. + * @param <V> type of the value used in batch operation. + */ +public interface BatchExecutionAssistant<R, V> { + /** + * Indicates if Cassandra tables existence is required for this batch operation. + * + * @return {@code true} true if table existence required. + */ + public boolean tableExistenceRequired(); + + /** + * Returns unbind CLQ statement for to be executed inside batch operation. + * + * @return Unbind CQL statement. + */ + public String getStatement(); + + /** + * Binds prepared statement to current Cassandra session. + * + * @param statement Statement. + * @param obj Parameters for statement binding. + * @return Bounded statement. + */ + public BoundStatement bindStatement(PreparedStatement statement, V obj); + + /** + * Returns Ignite cache key/value persistence settings. + * + * @return persistence settings. + */ + public KeyValuePersistenceSettings getPersistenceSettings(); + + /** + * Display name for the batch operation. + * + * @return Operation display name. + */ + public String operationName(); + + /** + * Processes particular row inside batch operation. + * + * @param row Row to process. + * @param seqNum Sequential number of the row. + */ + public void process(Row row, int seqNum); + + /** + * Checks if row/object with specified sequential number is already processed. + * + * @param seqNum object sequential number + * @return {@code true} if object is already processed + */ + public boolean alreadyProcessed(int seqNum); + + /** + * @return number of processed objects/rows. + */ + public int processedCount(); + + /** + * @return batch operation result. + */ + public R processedData(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java new file mode 100644 index 0000000..387c98f --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/BatchLoaderAssistant.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Statement; + +/** + * Provides information for loadCache operation of {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}. + */ +public interface BatchLoaderAssistant { + /** + * Returns name of the batch load operation. + * + * @return operation name. + */ + public String operationName(); + + /** + * Returns CQL statement to use in batch load operation. + * + * @return CQL statement for batch load operation. + */ + public Statement getStatement(); + + /** + * Processes each row returned by batch load operation. + * + * @param row row selected from Cassandra table. + */ + public void process(Row row); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java new file mode 100644 index 0000000..506982f --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import java.io.Closeable; + +/** + * Wrapper around Cassandra driver session, to automatically handle: + * <ul> + * <li>Keyspace and table absence exceptions</li> + * <li>Timeout exceptions</li> + * <li>Batch operations</li> + * </ul> + */ +public interface CassandraSession extends Closeable { + /** + * Execute single synchronous operation against Cassandra database. + * + * @param assistant execution assistance to perform the main operation logic. + * @param <V> type of the result returned from operation. + * + * @return result of the operation. + */ + public <V> V execute(ExecutionAssistant<V> assistant); + + /** + * Executes batch asynchronous operation against Cassandra database. + * + * @param assistant execution assistance to perform the main operation logic. + * @param data data which should be processed in batch operation. + * @param <R> type of the result returned from batch operation. + * @param <V> type of the value used in batch operation. + * + * @return result of the operation. + */ + public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data); + + /** + * Executes batch asynchronous operation to load bunch of records + * specified by CQL statement from Cassandra database + * + * @param assistant execution assistance to perform the main operation logic. + */ + public void execute(BatchLoaderAssistant assistant); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java new file mode 100644 index 0000000..95b8581 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java @@ -0,0 +1,832 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.AlreadyExistsException; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.querybuilder.Batch; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; +import org.apache.ignite.cache.store.cassandra.common.RandomSleeper; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; +import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool; +import org.apache.ignite.internal.processors.cache.CacheEntryImpl; + +/** + * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}. + */ +public class CassandraSessionImpl implements CassandraSession { + /** Number of CQL query execution attempts. */ + private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20; + + /** Min timeout between CQL query execution attempts. */ + private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100; + + /** Max timeout between CQL query execution attempts. */ + private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500; + + /** Timeout increment for CQL query execution attempts. */ + private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100; + + /** Cassandra cluster builder. */ + private volatile Cluster.Builder builder; + + /** Cassandra driver session. */ + private volatile Session ses; + + /** Number of references to Cassandra driver session (for multithreaded environment). */ + private volatile int refCnt = 0; + + /** Storage for the session prepared statements */ + private static final Map<String, PreparedStatement> sesStatements = new HashMap<>(); + + /** Number of records to immediately fetch in CQL statement execution. */ + private Integer fetchSize; + + /** Consistency level for Cassandra READ operations (select). */ + private ConsistencyLevel readConsistency; + + /** Consistency level for Cassandra WRITE operations (insert/update/delete). */ + private ConsistencyLevel writeConsistency; + + /** Logger. */ + private IgniteLogger log; + + /** Table absence error handlers counter. */ + private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1); + + /** Prepared statement cluster disconnection error handlers counter. */ + private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1); + + /** + * Creates instance of Cassandra driver session wrapper. + * + * @param builder Builder for Cassandra cluster. + * @param fetchSize Number of rows to immediately fetch in CQL statement execution. + * @param readConsistency Consistency level for Cassandra READ operations (select). + * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete). + * @param log Logger. + */ + public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency, + ConsistencyLevel writeConsistency, IgniteLogger log) { + this.builder = builder; + this.fetchSize = fetchSize; + this.readConsistency = readConsistency; + this.writeConsistency = writeConsistency; + this.log = log; + } + + /** {@inheritDoc} */ + @Override public <V> V execute(ExecutionAssistant<V> assistant) { + int attempt = 0; + Throwable error = null; + String errorMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement(); + + RandomSleeper sleeper = newSleeper(); + + incrementSessionRefs(); + + try { + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + error = null; + + if (attempt != 0) { + log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " + + assistant.getStatement()); + } + + try { + PreparedStatement preparedSt = prepareStatement(assistant.getStatement(), + assistant.getPersistenceSettings(), assistant.tableExistenceRequired()); + + if (preparedSt == null) + return null; + + Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt)); + ResultSet res = session().execute(statement); + + Row row = res == null || !res.iterator().hasNext() ? null : res.iterator().next(); + + return row == null ? null : assistant.process(row); + } + catch (Throwable e) { + error = e; + + if (CassandraHelper.isTableAbsenceError(e)) { + if (!assistant.tableExistenceRequired()) { + log.warning(errorMsg, e); + return null; + } + + handleTableAbsenceError(assistant.getPersistenceSettings()); + } + else if (CassandraHelper.isHostsAvailabilityError(e)) + handleHostsAvailabilityError(e, attempt, errorMsg); + else if (CassandraHelper.isPreparedStatementClusterError(e)) + handlePreparedStatementClusterError(e); + else + // For an error which we don't know how to handle, we will not try next attempts and terminate. + throw new IgniteException(errorMsg, e); + } + + sleeper.sleep(); + + attempt++; + } + } + catch (Throwable e) { + error = e; + } + finally { + decrementSessionRefs(); + } + + log.error(errorMsg, error); + + throw new IgniteException(errorMsg, error); + } + + /** {@inheritDoc} */ + @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) { + if (data == null || !data.iterator().hasNext()) + return assistant.processedData(); + + int attempt = 0; + String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation"; + Throwable error = new IgniteException(errorMsg); + + RandomSleeper sleeper = newSleeper(); + + int dataSize = 0; + + incrementSessionRefs(); + + try { + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + if (attempt != 0) { + log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " + + assistant.operationName() + " operation to process rest " + + (dataSize - assistant.processedCount()) + " of " + dataSize + " elements"); + } + + //clean errors info before next communication with Cassandra + Throwable unknownEx = null; + Throwable tblAbsenceEx = null; + Throwable hostsAvailEx = null; + Throwable prepStatEx = null; + + List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>(); + + PreparedStatement preparedSt = prepareStatement(assistant.getStatement(), + assistant.getPersistenceSettings(), assistant.tableExistenceRequired()); + + if (preparedSt == null) + return null; + + int seqNum = 0; + + for (V obj : data) { + if (!assistant.alreadyProcessed(seqNum)) { + try { + Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj)); + ResultSetFuture fut = session().executeAsync(statement); + futResults.add(new CacheEntryImpl<>(seqNum, fut)); + } + catch (Throwable e) { + if (CassandraHelper.isTableAbsenceError(e)) { + // If there are table absence error and it is not required for the operation we can return. + if (!assistant.tableExistenceRequired()) + return assistant.processedData(); + + tblAbsenceEx = e; + handleTableAbsenceError(assistant.getPersistenceSettings()); + } + else if (CassandraHelper.isHostsAvailabilityError(e)) { + hostsAvailEx = e; + + // Handle host availability only once. + if (hostsAvailEx == null) + handleHostsAvailabilityError(e, attempt, errorMsg); + } + else if (CassandraHelper.isPreparedStatementClusterError(e)) { + prepStatEx = e; + handlePreparedStatementClusterError(e); + } + else + unknownEx = e; + } + } + + seqNum++; + } + + dataSize = seqNum; + + // For an error which we don't know how to handle, we will not try next attempts and terminate. + if (unknownEx != null) + throw new IgniteException(errorMsg, unknownEx); + + // Remembering any of last errors. + if (tblAbsenceEx != null) + error = tblAbsenceEx; + else if (hostsAvailEx != null) + error = hostsAvailEx; + else if (prepStatEx != null) + error = prepStatEx; + + // Clean errors info before next communication with Cassandra. + unknownEx = null; + tblAbsenceEx = null; + hostsAvailEx = null; + prepStatEx = null; + + for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) { + try { + ResultSet resSet = futureResult.getValue().getUninterruptibly(); + Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null; + + if (row != null) + assistant.process(row, futureResult.getKey()); + } + catch (Throwable e) { + if (CassandraHelper.isTableAbsenceError(e)) + tblAbsenceEx = e; + else if (CassandraHelper.isHostsAvailabilityError(e)) + hostsAvailEx = e; + else if (CassandraHelper.isPreparedStatementClusterError(e)) + prepStatEx = e; + else + unknownEx = e; + } + } + + // For an error which we don't know how to handle, we will not try next attempts and terminate. + if (unknownEx != null) + throw new IgniteException(errorMsg, unknownEx); + + // If there are no errors occurred it means that operation successfully completed and we can return. + if (tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null) + return assistant.processedData(); + + if (tblAbsenceEx != null) { + // If there are table absence error and it is not required for the operation we can return. + if (!assistant.tableExistenceRequired()) + return assistant.processedData(); + + error = tblAbsenceEx; + handleTableAbsenceError(assistant.getPersistenceSettings()); + } + + if (hostsAvailEx != null) { + error = hostsAvailEx; + handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg); + } + + if (prepStatEx != null) { + error = prepStatEx; + handlePreparedStatementClusterError(prepStatEx); + } + + sleeper.sleep(); + + attempt++; + } + } + catch (Throwable e) { + error = e; + } + finally { + decrementSessionRefs(); + } + + errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) + + " of " + dataSize + " elements, during " + assistant.operationName() + + " operation with Cassandra"; + + log.error(errorMsg, error); + + throw new IgniteException(errorMsg, error); + } + + /** {@inheritDoc} */ + @Override public void execute(BatchLoaderAssistant assistant) { + int attempt = 0; + String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation"; + Throwable error = new IgniteException(errorMsg); + + RandomSleeper sleeper = newSleeper(); + + incrementSessionRefs(); + + try { + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + if (attempt != 0) + log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache"); + + Statement statement = tuneStatementExecutionOptions(assistant.getStatement()); + + try { + ResultSetFuture fut = session().executeAsync(statement); + ResultSet resSet = fut.getUninterruptibly(); + + if (resSet == null || !resSet.iterator().hasNext()) + return; + + for (Row row : resSet) + assistant.process(row); + + return; + } + catch (Throwable e) { + error = e; + + if (CassandraHelper.isTableAbsenceError(e)) + return; + else if (CassandraHelper.isHostsAvailabilityError(e)) + handleHostsAvailabilityError(e, attempt, errorMsg); + else if (CassandraHelper.isPreparedStatementClusterError(e)) + handlePreparedStatementClusterError(e); + else + // For an error which we don't know how to handle, we will not try next attempts and terminate. + throw new IgniteException(errorMsg, e); + } + + sleeper.sleep(); + + attempt++; + } + } + catch (Throwable e) { + error = e; + } + finally { + decrementSessionRefs(); + } + + log.error(errorMsg, error); + + throw new IgniteException(errorMsg, error); + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (decrementSessionRefs() == 0 && ses != null) { + SessionPool.put(this, ses); + ses = null; + } + } + + /** + * Recreates Cassandra driver session. + */ + private synchronized void refresh() { + //make sure that session removed from the pool + SessionPool.get(this); + + //closing and reopening session + CassandraHelper.closeSession(ses); + ses = null; + session(); + + synchronized (sesStatements) { + sesStatements.clear(); + } + } + + /** + * @return Cassandra driver session. + */ + private synchronized Session session() { + if (ses != null) + return ses; + + ses = SessionPool.get(this); + + if (ses != null) + return ses; + + synchronized (sesStatements) { + sesStatements.clear(); + } + + try { + return ses = builder.build().connect(); + } + catch (Throwable e) { + throw new IgniteException("Failed to establish session with Cassandra database", e); + } + } + + /** + * Increments number of references to Cassandra driver session (required for multithreaded environment). + */ + private synchronized void incrementSessionRefs() { + refCnt++; + } + + /** + * Decrements number of references to Cassandra driver session (required for multithreaded environment). + */ + private synchronized int decrementSessionRefs() { + if (refCnt != 0) + refCnt--; + + return refCnt; + } + + /** + * Prepares CQL statement using current Cassandra driver session. + * + * @param statement CQL statement. + * @param settings Persistence settings. + * @param tblExistenceRequired Flag indicating if table existence is required for the statement. + * @return Prepared statement. + */ + private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings, + boolean tblExistenceRequired) { + + int attempt = 0; + Throwable error = null; + String errorMsg = "Failed to prepare Cassandra CQL statement: " + statement; + + RandomSleeper sleeper = newSleeper(); + + incrementSessionRefs(); + + try { + synchronized (sesStatements) { + if (sesStatements.containsKey(statement)) + return sesStatements.get(statement); + } + + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + try { + PreparedStatement prepStatement = session().prepare(statement); + + synchronized (sesStatements) { + sesStatements.put(statement, prepStatement); + } + + return prepStatement; + } + catch (Throwable e) { + if (CassandraHelper.isTableAbsenceError(e)) { + if (!tblExistenceRequired) + return null; + + handleTableAbsenceError(settings); + } + else if (CassandraHelper.isHostsAvailabilityError(e)) + handleHostsAvailabilityError(e, attempt, errorMsg); + else + throw new IgniteException(errorMsg, e); + + error = e; + } + + sleeper.sleep(); + + attempt++; + } + } + finally { + decrementSessionRefs(); + } + + throw new IgniteException(errorMsg, error); + } + + /** + * Creates Cassandra keyspace. + * + * @param settings Persistence settings. + */ + private void createKeyspace(KeyValuePersistenceSettings settings) { + int attempt = 0; + Throwable error = null; + String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'"; + + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + try { + log.info("-----------------------------------------------------------------------"); + log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'"); + log.info("-----------------------------------------------------------------------\n\n" + + settings.getKeyspaceDDLStatement() + "\n"); + log.info("-----------------------------------------------------------------------"); + session().execute(settings.getKeyspaceDDLStatement()); + log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created"); + return; + } + catch (AlreadyExistsException ignored) { + log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist"); + return; + } + catch (Throwable e) { + if (!CassandraHelper.isHostsAvailabilityError(e)) + throw new IgniteException(errorMsg, e); + + handleHostsAvailabilityError(e, attempt, errorMsg); + + error = e; + } + + attempt++; + } + + throw new IgniteException(errorMsg, error); + } + + /** + * Creates Cassandra table. + * + * @param settings Persistence settings. + */ + private void createTable(KeyValuePersistenceSettings settings) { + int attempt = 0; + Throwable error = null; + String errorMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'"; + + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + try { + log.info("-----------------------------------------------------------------------"); + log.info("Creating Cassandra table '" + settings.getTableFullName() + "'"); + log.info("-----------------------------------------------------------------------\n\n" + + settings.getTableDDLStatement() + "\n"); + log.info("-----------------------------------------------------------------------"); + session().execute(settings.getTableDDLStatement()); + log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created"); + return; + } + catch (AlreadyExistsException ignored) { + log.info("Cassandra table '" + settings.getTableFullName() + "' already exist"); + return; + } + catch (Throwable e) { + if (!CassandraHelper.isHostsAvailabilityError(e) && !CassandraHelper.isKeyspaceAbsenceError(e)) + throw new IgniteException(errorMsg, e); + + if (CassandraHelper.isKeyspaceAbsenceError(e)) { + log.warning("Failed to create Cassandra table '" + settings.getTableFullName() + + "' cause appropriate keyspace doesn't exist", e); + createKeyspace(settings); + } + else if (CassandraHelper.isHostsAvailabilityError(e)) + handleHostsAvailabilityError(e, attempt, errorMsg); + + error = e; + } + + attempt++; + } + + throw new IgniteException(errorMsg, error); + } + + /** + * Creates Cassandra table indexes. + * + * @param settings Persistence settings. + */ + private void createTableIndexes(KeyValuePersistenceSettings settings) { + if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty()) + return; + + int attempt = 0; + Throwable error = null; + String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName(); + + while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + try { + log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'"); + + for (String statement : settings.getIndexDDLStatements()) { + try { + session().execute(statement); + } + catch (AlreadyExistsException ignored) { + } + catch (Throwable e) { + if (!(e instanceof InvalidQueryException) || !e.getMessage().equals("Index already exists")) + throw new IgniteException(errorMsg, e); + } + } + + log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created"); + + return; + } + catch (Throwable e) { + if (CassandraHelper.isHostsAvailabilityError(e)) + handleHostsAvailabilityError(e, attempt, errorMsg); + else if (CassandraHelper.isTableAbsenceError(e)) + createTable(settings); + else + throw new IgniteException(errorMsg, e); + + error = e; + } + + attempt++; + } + + throw new IgniteException(errorMsg, error); + } + + /** + * Tunes CQL statement execution options (consistency level, fetch option and etc.). + * + * @param statement Statement. + * @return Modified statement. + */ + private Statement tuneStatementExecutionOptions(Statement statement) { + String qry = ""; + + if (statement instanceof BoundStatement) + qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim().toLowerCase(); + else if (statement instanceof PreparedStatement) + qry = ((PreparedStatement)statement).getQueryString().trim().toLowerCase(); + + boolean readStatement = qry.startsWith("select"); + boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement || + qry.startsWith("insert") || qry.startsWith("delete") || qry.startsWith("update"); + + if (readStatement && readConsistency != null) + statement.setConsistencyLevel(readConsistency); + + if (writeStatement && writeConsistency != null) + statement.setConsistencyLevel(writeConsistency); + + if (fetchSize != null) + statement.setFetchSize(fetchSize); + + return statement; + } + + /** + * Handles situation when Cassandra table doesn't exist. + * + * @param settings Persistence settings. + */ + private void handleTableAbsenceError(KeyValuePersistenceSettings settings) { + int hndNum = tblAbsenceHandlersCnt.incrementAndGet(); + + try { + synchronized (tblAbsenceHandlersCnt) { + // Oooops... I am not the first thread who tried to handle table absence problem. + if (hndNum != 0) { + log.warning("Table " + settings.getTableFullName() + " absence problem detected. " + + "Another thread already fixed it."); + return; + } + + log.warning("Table " + settings.getTableFullName() + " absence problem detected. " + + "Trying to create table."); + + IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName()); + + int attempt = 0; + + while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) { + error = null; + + try { + createKeyspace(settings); + createTable(settings); + createTableIndexes(settings); + } + catch (Throwable e) { + if (CassandraHelper.isHostsAvailabilityError(e)) + handleHostsAvailabilityError(e, attempt, null); + else + throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e); + + error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e); + } + + attempt++; + } + + if (error != null) + throw error; + } + } + finally { + if (hndNum == 0) + tblAbsenceHandlersCnt.set(-1); + } + } + + /** + * Handles situation when prepared statement execution failed cause session to the cluster was released. + * + */ + private void handlePreparedStatementClusterError(Throwable e) { + int hndNum = prepStatementHandlersCnt.incrementAndGet(); + + try { + synchronized (prepStatementHandlersCnt) { + // Oooops... I am not the first thread who tried to handle prepared statement problem. + if (hndNum != 0) { + log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e); + return; + } + + log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e); + + refresh(); + + log.warning("Cassandra session refreshed"); + } + } + finally { + if (hndNum == 0) + prepStatementHandlersCnt.set(-1); + } + } + + /** + * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable. + * + * @param e Exception to handle. + * @param attempt Number of attempts. + * @param msg Error message. + * @return {@code true} if host unavailability was successfully handled. + */ + private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) { + if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) { + log.error("Host availability problem detected. " + + "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT + + ", exception will be thrown to upper execution layer.", e); + throw msg == null ? new IgniteException(e) : new IgniteException(msg, e); + } + + if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4 || + attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 || + attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4 || + attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) { + log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " + + "refreshing Cassandra session", e); + + refresh(); + + log.warning("Cassandra session refreshed"); + + return true; + } + + log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " + + "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e); + + try { + Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT); + } + catch (InterruptedException ignored) { + } + + log.warning("Sleep completed"); + + return false; + } + + /** + * @return New random sleeper. + */ + private RandomSleeper newSleeper() { + return new RandomSleeper(CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT, + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT, + CQL_ATTEMPTS_TIMEOUT_INCREMENT, log); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java new file mode 100644 index 0000000..867f58d --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Row; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; + +/** + * Provides information for single operations (load, delete, write) of Ignite cache + * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}. + * + * @param <R> type of the result returned from operation. + */ +public interface ExecutionAssistant<R> { + /** + * Indicates if Cassandra table existence is required for operation. + * + * @return true if table existence required. + */ + public boolean tableExistenceRequired(); + + /** + * Returns CQL statement to be used for operation. + * + * @return CQL statement. + */ + public String getStatement(); + + /** + * Binds prepared statement. + * + * @param statement prepared statement. + * + * @return bound statement. + */ + public BoundStatement bindStatement(PreparedStatement statement); + + /** + * Persistence settings to use for operation. + * + * @return persistence settings. + */ + public KeyValuePersistenceSettings getPersistenceSettings(); + + /** + * Returns operation name. + * + * @return operation name. + */ + public String operationName(); + + /** + * Processes Cassandra database table row returned by specified CQL statement. + * + * @param row Cassandra database table row. + * + * @return result of the operation. + */ + public R process(Row row); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java new file mode 100644 index 0000000..17494dd --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.Row; +import java.util.HashSet; +import java.util.Set; + +/** + * Implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}. + * + * @param <R> Type of the result returned from batch operation + * @param <V> Type of the value used in batch operation + */ +public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> { + /** Identifiers of already processed objects. */ + private Set<Integer> processed = new HashSet<>(); + + /** {@inheritDoc} */ + @Override public void process(Row row, int seqNum) { + if (processed.contains(seqNum)) + return; + + process(row); + + processed.add(seqNum); + } + + /** {@inheritDoc} */ + @Override public boolean alreadyProcessed(int seqNum) { + return processed.contains(seqNum); + } + + /** {@inheritDoc} */ + @Override public int processedCount() { + return processed.size(); + } + + /** {@inheritDoc} */ + @Override public R processedData() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean tableExistenceRequired() { + return false; + } + + /** + * Processes particular row inside batch operation. + * + * @param row Row to process. + */ + protected void process(Row row) { + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java new file mode 100644 index 0000000..d3ace7d --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session; + +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.Statement; +import java.util.concurrent.Callable; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; +import org.apache.ignite.lang.IgniteBiInClosure; + +/** + * Worker for load cache using custom user query. + * + * @param <K> Key type. + * @param <V> Value type. + */ +public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> { + /** Cassandra session to execute CQL query */ + private final CassandraSession ses; + + /** User query. */ + private final String qry; + + /** Persistence controller */ + private final PersistenceController ctrl; + + /** Logger */ + private final IgniteLogger log; + + /** Closure for loaded values. */ + private final IgniteBiInClosure<K, V> clo; + + /** + * @param clo Closure for loaded values. + */ + public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl, + IgniteLogger log, IgniteBiInClosure<K, V> clo) { + this.ses = ses; + this.qry = qry.trim().endsWith(";") ? qry : qry + ";"; + this.ctrl = ctrl; + this.log = log; + this.clo = clo; + } + + /** {@inheritDoc} */ + @Override public Void call() throws Exception { + ses.execute(new BatchLoaderAssistant() { + /** {@inheritDoc} */ + @Override public String operationName() { + return "loadCache"; + } + + /** {@inheritDoc} */ + @Override public Statement getStatement() { + return new SimpleStatement(qry); + } + + /** {@inheritDoc} */ + @Override public void process(Row row) { + K key; + V val; + + try { + key = (K)ctrl.buildKeyObject(row); + } + catch (Throwable e) { + log.error("Failed to build Ignite key object from provided Cassandra row", e); + + throw new IgniteException("Failed to build Ignite key object from provided Cassandra row", e); + } + + try { + val = (V)ctrl.buildValueObject(row); + } + catch (Throwable e) { + log.error("Failed to build Ignite value object from provided Cassandra row", e); + + throw new IgniteException("Failed to build Ignite value object from provided Cassandra row", e); + } + + clo.apply(key, val); + } + }); + + return null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java new file mode 100644 index 0000000..ecbbe78 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains classes responsible for handling sessions and communication with Cassandra + */ +package org.apache.ignite.cache.store.cassandra.session; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java new file mode 100644 index 0000000..fc4a907 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session.pool; + +import com.datastax.driver.core.Session; +import java.lang.Thread.State; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; + +/** + * Cassandra driver sessions pool. + */ +public class SessionPool { + /** + * Monitors session pool and closes unused session. + */ + private static class SessionMonitor extends Thread { + /** {@inheritDoc} */ + @Override public void run() { + try { + while (true) { + try { + Thread.sleep(SLEEP_TIMEOUT); + } + catch (InterruptedException ignored) { + return; + } + + List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>(); + + int sessionsCnt; + + synchronized (sessions) { + sessionsCnt = sessions.size(); + + for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) { + if (entry.getValue().expired()) + expiredSessions.add(entry); + } + + for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions) + sessions.remove(entry.getKey()); + } + + for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions) + entry.getValue().release(); + + // all sessions in the pool expired, thus we don't need additional thread to manage sessions in the pool + if (sessionsCnt == expiredSessions.size()) + return; + } + } + finally { + release(); + } + } + } + + /** Sessions monitor sleep timeout. */ + private static final long SLEEP_TIMEOUT = 60000; // 1 minute. + + /** Sessions which were returned to pool. */ + private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>(); + + /** Singleton instance. */ + private static SessionMonitor monitorSingleton; + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override public void run() { + release(); + } + }); + } + + /** + * Returns Cassandra driver session to sessions pool. + * + * @param cassandraSes Session wrapper. + * @param driverSes Driver session. + */ + public static void put(CassandraSessionImpl cassandraSes, Session driverSes) { + if (cassandraSes == null || driverSes == null) + return; + + SessionWrapper old; + + synchronized (sessions) { + old = sessions.put(cassandraSes, new SessionWrapper(driverSes)); + + if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) { + monitorSingleton = new SessionMonitor(); + monitorSingleton.setDaemon(true); + monitorSingleton.setName("Cassandra-sessions-pool"); + monitorSingleton.start(); + } + } + + if (old != null) + old.release(); + } + + /** + * Extracts Cassandra driver session from pool. + * + * @param cassandraSes Session wrapper. + * @return Cassandra driver session. + */ + public static Session get(CassandraSessionImpl cassandraSes) { + if (cassandraSes == null) + return null; + + SessionWrapper wrapper; + + synchronized (sessions) { + wrapper = sessions.remove(cassandraSes); + } + + return wrapper == null ? null : wrapper.driverSession(); + } + + /** + * Releases all session from pool and closes all their connections to Cassandra database. + */ + public static void release() { + Collection<SessionWrapper> wrappers; + + synchronized (sessions) { + try { + if (sessions.size() == 0) + return; + + wrappers = new LinkedList<>(); + + for (SessionWrapper wrapper : sessions.values()) + wrappers.add(wrapper); + + sessions.clear(); + } + finally { + if (!(Thread.currentThread() instanceof SessionMonitor) && monitorSingleton != null) { + try { + monitorSingleton.interrupt(); + } + catch (Throwable ignored) { + } + } + } + } + + for (SessionWrapper wrapper : wrappers) + wrapper.release(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java new file mode 100644 index 0000000..7c5722b --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.session.pool; + +import com.datastax.driver.core.Session; +import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; + +/** + * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing. + */ +public class SessionWrapper { + /** Expiration timeout for Cassandra driver session. */ + public static final long DFLT_EXPIRATION_TIMEOUT = 300000; // 5 minutes. + + /** Cassandra driver session. */ + private Session ses; + + /** Wrapper creation time. */ + private long time; + + /** + * Creates instance of Cassandra driver session wrapper. + * + * @param ses Cassandra driver session. + */ + public SessionWrapper(Session ses) { + this.ses = ses; + this.time = System.currentTimeMillis(); + } + + /** + * Checks if Cassandra driver session expired. + * + * @return true if session expired. + */ + public boolean expired() { + return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT; + } + + /** + * Returns wrapped Cassandra driver session. + * + * @return Cassandra driver session. + */ + public Session driverSession() { + return ses; + } + + /** + * Closes wrapped Cassandra driver session + */ + public void release() { + CassandraHelper.closeSession(ses); + ses = null; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java new file mode 100644 index 0000000..21c292f --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains session pool implenetation for Cassandra sessions + */ +package org.apache.ignite.cache.store.cassandra.session.pool; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java new file mode 100644 index 0000000..4f40478 --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.store.cassandra.utils; + +import java.io.File; +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; + +/** + * Generates Cassandra DDL statements from persistence descriptor xml file. + */ +public class DDLGenerator { + /** + * DDLGenerator entry point. + * + * @param args Arguments for DDLGenerator. + */ + public static void main(String[] args) { + if (args == null || args.length == 0) + return; + + for (String arg : args) { + File file = new File(arg); + if (!file.isFile()) { + System.out.println("-------------------------------------------------------------"); + System.out.println("Incorrect file specified: " + arg); + System.out.println("-------------------------------------------------------------"); + continue; + } + + try { + KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(file); + System.out.println("-------------------------------------------------------------"); + System.out.println("DDL for keyspace/table from file: " + arg); + System.out.println("-------------------------------------------------------------"); + System.out.println(); + System.out.println(settings.getKeyspaceDDLStatement()); + System.out.println(); + System.out.println(settings.getTableDDLStatement()); + System.out.println(); + } + catch (Throwable e) { + System.out.println("-------------------------------------------------------------"); + System.out.println("Incorrect file specified: " + arg); + System.out.println("-------------------------------------------------------------"); + e.printStackTrace(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java new file mode 100644 index 0000000..2460dfe --- /dev/null +++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains utility classes + */ +package org.apache.ignite.cache.store.cassandra.utils; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/bootstrap/aws/README.txt ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/bootstrap/aws/README.txt b/modules/cassandra/store/src/test/bootstrap/aws/README.txt new file mode 100644 index 0000000..a61b235 --- /dev/null +++ b/modules/cassandra/store/src/test/bootstrap/aws/README.txt @@ -0,0 +1,13 @@ +Shell scripts to spin up Ignite, Cassandra and Load tests clusters in AWS. + +1) cassandra - bootstrap scripts for Cassandra cluster nodes +2) ganglia - bootstrap scripts for Ganglia master and agents +3) ignite - bootstrap scripts for Ignite cluster nodes +4) tests - bootstrap scripts for Load Tests cluster nodes +5) common.sh - definitions for common functions +6) env.sh - definitions for common variables +7) log-collector.sh - log collector daemon script, to collect logs and upload them to S3 + +For more details please look at the documentation: + + https://apacheignite.readme.io/docs/aws-infrastructure-deployment \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh b/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh new file mode 100644 index 0000000..017b1b1 --- /dev/null +++ b/modules/cassandra/store/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh @@ -0,0 +1,336 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# ----------------------------------------------------------------------------------------------- +# Bootstrap script to spin up Cassandra cluster +# ----------------------------------------------------------------------------------------------- + +# URL to download AWS CLI tools +AWS_CLI_DOWNLOAD_URL=https://s3.amazonaws.com/aws-cli/awscli-bundle.zip + +# URL to download JDK +JDK_DOWNLOAD_URL=http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz + +# URL to download Ignite-Cassandra tests package - you should previously package and upload it to this place +TESTS_PACKAGE_DONLOAD_URL=s3://<bucket>/<folder>/ignite-cassandra-tests-<version>.zip + +# Terminates script execution and upload logs to S3 +terminate() +{ + SUCCESS_URL=$S3_CASSANDRA_BOOTSTRAP_SUCCESS + FAILURE_URL=$S3_CASSANDRA_BOOTSTRAP_FAILURE + + if [ -n "$SUCCESS_URL" ] && [[ "$SUCCESS_URL" != */ ]]; then + SUCCESS_URL=${SUCCESS_URL}/ + fi + + if [ -n "$FAILURE_URL" ] && [[ "$FAILURE_URL" != */ ]]; then + FAILURE_URL=${FAILURE_URL}/ + fi + + host_name=$(hostname -f | tr '[:upper:]' '[:lower:]') + msg=$host_name + + if [ -n "$1" ]; then + echo "[ERROR] $1" + echo "[ERROR]-----------------------------------------------------" + echo "[ERROR] Cassandra node bootstrap failed" + echo "[ERROR]-----------------------------------------------------" + msg=$1 + + if [ -z "$FAILURE_URL" ]; then + exit 1 + fi + + reportFolder=${FAILURE_URL}${host_name} + reportFile=$reportFolder/__error__ + else + echo "[INFO]-----------------------------------------------------" + echo "[INFO] Cassandra node bootstrap successfully completed" + echo "[INFO]-----------------------------------------------------" + + if [ -z "$SUCCESS_URL" ]; then + exit 0 + fi + + reportFolder=${SUCCESS_URL}${host_name} + reportFile=$reportFolder/__success__ + fi + + echo $msg > /opt/bootstrap-result + + aws s3 rm --recursive $reportFolder + if [ $? -ne 0 ]; then + echo "[ERROR] Failed to drop report folder: $reportFolder" + fi + + aws s3 cp --sse AES256 /opt/bootstrap-result $reportFile + if [ $? -ne 0 ]; then + echo "[ERROR] Failed to report bootstrap result to: $reportFile" + fi + + rm -f /opt/bootstrap-result + + if [ -n "$1" ]; then + exit 1 + fi + + exit 0 +} + +# Downloads specified package +downloadPackage() +{ + echo "[INFO] Downloading $3 package from $1 into $2" + + for i in 0 9; + do + if [[ "$1" == s3* ]]; then + aws s3 cp $1 $2 + code=$? + else + curl "$1" -o "$2" + code=$? + fi + + if [ $code -eq 0 ]; then + echo "[INFO] $3 package successfully downloaded from $1 into $2" + return 0 + fi + + echo "[WARN] Failed to download $3 package from $i attempt, sleeping extra 5sec" + sleep 5s + done + + terminate "All 10 attempts to download $3 package from $1 are failed" +} + +# Downloads and setup JDK +setupJava() +{ + rm -Rf /opt/java /opt/jdk.tar.gz + + echo "[INFO] Downloading 'jdk'" + wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "$JDK_DOWNLOAD_URL" -O /opt/jdk.tar.gz + if [ $? -ne 0 ]; then + terminate "Failed to download 'jdk'" + fi + + echo "[INFO] Untaring 'jdk'" + tar -xvzf /opt/jdk.tar.gz -C /opt + if [ $? -ne 0 ]; then + terminate "Failed to untar 'jdk'" + fi + + rm -Rf /opt/jdk.tar.gz + + unzipDir=$(ls /opt | grep "jdk") + if [ "$unzipDir" != "java" ]; then + mv /opt/$unzipDir /opt/java + fi +} + +# Downloads and setup AWS CLI +setupAWSCLI() +{ + echo "[INFO] Installing 'awscli'" + pip install --upgrade awscli + if [ $? -eq 0 ]; then + return 0 + fi + + echo "[ERROR] Failed to install 'awscli' using pip" + echo "[INFO] Trying to install awscli using zip archive" + echo "[INFO] Downloading awscli zip" + + downloadPackage "$AWS_CLI_DOWNLOAD_URL" "/opt/awscli-bundle.zip" "awscli" + + echo "[INFO] Unzipping awscli zip" + unzip /opt/awscli-bundle.zip -d /opt + if [ $? -ne 0 ]; then + terminate "Failed to unzip awscli zip" + fi + + rm -Rf /opt/awscli-bundle.zip + + echo "[INFO] Installing awscli" + /opt/awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws + if [ $? -ne 0 ]; then + terminate "Failed to install awscli" + fi + + echo "[INFO] Successfully installed awscli from zip archive" +} + +# Setup all the pre-requisites (packages, settings and etc.) +setupPreRequisites() +{ + echo "[INFO] Installing 'wget' package" + yum -y install wget + if [ $? -ne 0 ]; then + terminate "Failed to install 'wget' package" + fi + + echo "[INFO] Installing 'net-tools' package" + yum -y install net-tools + if [ $? -ne 0 ]; then + terminate "Failed to install 'net-tools' package" + fi + + echo "[INFO] Installing 'python' package" + yum -y install python + if [ $? -ne 0 ]; then + terminate "Failed to install 'python' package" + fi + + echo "[INFO] Installing 'unzip' package" + yum -y install unzip + if [ $? -ne 0 ]; then + terminate "Failed to install 'unzip' package" + fi + + downloadPackage "https://bootstrap.pypa.io/get-pip.py" "/opt/get-pip.py" "get-pip.py" + + echo "[INFO] Installing 'pip'" + python /opt/get-pip.py + if [ $? -ne 0 ]; then + terminate "Failed to install 'pip'" + fi +} + +# Downloads and setup tests package +setupTestsPackage() +{ + downloadPackage "$TESTS_PACKAGE_DONLOAD_URL" "/opt/ignite-cassandra-tests.zip" "Tests" + + rm -Rf /opt/ignite-cassandra-tests + + unzip /opt/ignite-cassandra-tests.zip -d /opt + if [ $? -ne 0 ]; then + terminate "Failed to unzip tests package" + fi + + rm -f /opt/ignite-cassandra-tests.zip + + unzipDir=$(ls /opt | grep "ignite-cassandra") + if [ "$unzipDir" != "ignite-cassandra-tests" ]; then + mv /opt/$unzipDir /opt/ignite-cassandra-tests + fi + + find /opt/ignite-cassandra-tests -type f -name "*.sh" -exec chmod ug+x {} \; + + . /opt/ignite-cassandra-tests/bootstrap/aws/common.sh "cassandra" + + setupNTP + + echo "[INFO] Starting logs collector daemon" + + HOST_NAME=$(hostname -f | tr '[:upper:]' '[:lower:]') + /opt/ignite-cassandra-tests/bootstrap/aws/logs-collector.sh "$S3_LOGS_TRIGGER" "$S3_CASSANDRA_LOGS/$HOST_NAME" "/opt/cassandra/logs" "/opt/cassandra/cassandra-start.log" > /opt/logs-collector.log & + + echo "[INFO] Logs collector daemon started: $!" + + echo "----------------------------------------------------------------------------------------" + printInstanceInfo + echo "----------------------------------------------------------------------------------------" + tagInstance + bootstrapGangliaAgent "cassandra" 8641 +} + +# Downloads Cassandra package +downloadCassandra() +{ + downloadPackage "$CASSANDRA_DOWNLOAD_URL" "/opt/apache-cassandra.tar.gz" "Cassandra" + + rm -Rf /opt/cassandra + + echo "[INFO] Untaring Cassandra package" + tar -xvzf /opt/apache-cassandra.tar.gz -C /opt + if [ $? -ne 0 ]; then + terminate "Failed to untar Cassandra package" + fi + + rm -f /opt/apache-cassandra.tar.gz + + unzipDir=$(ls /opt | grep "cassandra" | grep "apache") + if [ "$unzipDir" != "cassandra" ]; then + mv /opt/$unzipDir /opt/cassandra + fi +} + +# Setups Cassandra +setupCassandra() +{ + echo "[INFO] Creating 'cassandra' group" + exists=$(cat /etc/group | grep cassandra) + if [ -z "$exists" ]; then + groupadd cassandra + if [ $? -ne 0 ]; then + terminate "Failed to create 'cassandra' group" + fi + fi + + echo "[INFO] Creating 'cassandra' user" + exists=$(cat /etc/passwd | grep cassandra) + if [ -z "$exists" ]; then + useradd -g cassandra cassandra + if [ $? -ne 0 ]; then + terminate "Failed to create 'cassandra' user" + fi + fi + + rm -f /opt/cassandra/conf/cassandra-env.sh /opt/cassandra/conf/cassandra-template.yaml + + cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-env.sh /opt/cassandra/conf + cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-template.yaml /opt/cassandra/conf + + chown -R cassandra:cassandra /opt/cassandra /opt/ignite-cassandra-tests + + createCassandraStorageLayout + + cat /opt/cassandra/conf/cassandra-template.yaml | sed -r "s/\\\$\{CASSANDRA_DATA_DIR\}/$CASSANDRA_DATA_DIR/g" > /opt/cassandra/conf/cassandra-template-1.yaml + cat /opt/cassandra/conf/cassandra-template-1.yaml | sed -r "s/\\\$\{CASSANDRA_COMMITLOG_DIR\}/$CASSANDRA_COMMITLOG_DIR/g" > /opt/cassandra/conf/cassandra-template-2.yaml + cat /opt/cassandra/conf/cassandra-template-2.yaml | sed -r "s/\\\$\{CASSANDRA_CACHES_DIR\}/$CASSANDRA_CACHES_DIR/g" > /opt/cassandra/conf/cassandra-template-3.yaml + + rm -f /opt/cassandra/conf/cassandra-template.yaml /opt/cassandra/conf/cassandra-template-1.yaml /opt/cassandra/conf/cassandra-template-2.yaml + mv /opt/cassandra/conf/cassandra-template-3.yaml /opt/cassandra/conf/cassandra-template.yaml + + echo "export JAVA_HOME=/opt/java" >> $1 + echo "export CASSANDRA_HOME=/opt/cassandra" >> $1 + echo "export PATH=\$JAVA_HOME/bin:\$CASSANDRA_HOME/bin:\$PATH" >> $1 +} + +################################################################################################################### + +echo "[INFO]-----------------------------------------------------------------" +echo "[INFO] Bootstrapping Cassandra node" +echo "[INFO]-----------------------------------------------------------------" + +setupPreRequisites +setupJava +setupAWSCLI +setupTestsPackage +downloadCassandra +setupCassandra "/root/.bash_profile" + +cmd="/opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-start.sh" + +#sudo -u cassandra -g cassandra sh -c "$cmd | tee /opt/cassandra/cassandra-start.log" + +$cmd | tee /opt/cassandra/cassandra-start.log \ No newline at end of file