// http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
// diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
// deleted file mode 100644
// index 95b8581..0000000
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.cache.store.cassandra.session;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.AlreadyExistsException;
import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.datastax.driver.core.querybuilder.Batch;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;

/**
 * Implementation for {@link org.apache.ignite.cache.store.cassandra.session.CassandraSession}.
 * <p>
 * Wraps a Cassandra driver {@link Session}, lazily (re)creating it from the cluster builder or the
 * {@link SessionPool}, and retries CQL operations up to {@link #CQL_EXECUTION_ATTEMPTS_COUNT} times
 * for the error kinds it knows how to recover from (table absence, hosts unavailability,
 * prepared statement invalidation). Any other error terminates the operation immediately.
 */
public class CassandraSessionImpl implements CassandraSession {
    /** Number of CQL query execution attempts. */
    private static final int CQL_EXECUTION_ATTEMPTS_COUNT = 20;

    /** Min timeout between CQL query execution attempts. */
    private static final int CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT = 100;

    /** Max timeout between CQL query execution attempts. */
    private static final int CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT = 500;

    /** Timeout increment for CQL query execution attempts. */
    private static final int CQL_ATTEMPTS_TIMEOUT_INCREMENT = 100;

    /** Cassandra cluster builder. */
    private volatile Cluster.Builder builder;

    /** Cassandra driver session. */
    private volatile Session ses;

    /** Number of references to Cassandra driver session (for multithreaded environment). */
    private volatile int refCnt = 0;

    /** Storage for the session prepared statements. All access is guarded by the map's own monitor. */
    private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();

    /** Number of records to immediately fetch in CQL statement execution. */
    private Integer fetchSize;

    /** Consistency level for Cassandra READ operations (select). */
    private ConsistencyLevel readConsistency;

    /** Consistency level for Cassandra WRITE operations (insert/update/delete). */
    private ConsistencyLevel writeConsistency;

    /** Logger. */
    private IgniteLogger log;

    /** Table absence error handlers counter. */
    private final AtomicInteger tblAbsenceHandlersCnt = new AtomicInteger(-1);

    /** Prepared statement cluster disconnection error handlers counter. */
    private final AtomicInteger prepStatementHandlersCnt = new AtomicInteger(-1);

    /**
     * Creates instance of Cassandra driver session wrapper.
     *
     * @param builder Builder for Cassandra cluster.
     * @param fetchSize Number of rows to immediately fetch in CQL statement execution.
     * @param readConsistency Consistency level for Cassandra READ operations (select).
     * @param writeConsistency Consistency level for Cassandra WRITE operations (insert/update/delete).
     * @param log Logger.
     */
    public CassandraSessionImpl(Cluster.Builder builder, Integer fetchSize, ConsistencyLevel readConsistency,
        ConsistencyLevel writeConsistency, IgniteLogger log) {
        this.builder = builder;
        this.fetchSize = fetchSize;
        this.readConsistency = readConsistency;
        this.writeConsistency = writeConsistency;
        this.log = log;
    }

    /** {@inheritDoc} */
    @Override public <V> V execute(ExecutionAssistant<V> assistant) {
        int attempt = 0;
        Throwable error = null;
        String errorMsg = "Failed to execute Cassandra CQL statement: " + assistant.getStatement();

        RandomSleeper sleeper = newSleeper();

        incrementSessionRefs();

        try {
            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
                error = null;

                if (attempt != 0) {
                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra CQL statement: " +
                        assistant.getStatement());
                }

                try {
                    PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
                        assistant.getPersistenceSettings(), assistant.tableExistenceRequired());

                    // Null means the table is absent and the operation doesn't require it.
                    if (preparedSt == null)
                        return null;

                    Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt));
                    ResultSet res = session().execute(statement);

                    Row row = res == null || !res.iterator().hasNext() ? null : res.iterator().next();

                    return row == null ? null : assistant.process(row);
                }
                catch (Throwable e) {
                    error = e;

                    if (CassandraHelper.isTableAbsenceError(e)) {
                        if (!assistant.tableExistenceRequired()) {
                            log.warning(errorMsg, e);
                            return null;
                        }

                        handleTableAbsenceError(assistant.getPersistenceSettings());
                    }
                    else if (CassandraHelper.isHostsAvailabilityError(e))
                        handleHostsAvailabilityError(e, attempt, errorMsg);
                    else if (CassandraHelper.isPreparedStatementClusterError(e))
                        handlePreparedStatementClusterError(e);
                    else
                        // For an error which we don't know how to handle, we will not try next attempts and terminate.
                        throw new IgniteException(errorMsg, e);
                }

                sleeper.sleep();

                attempt++;
            }
        }
        catch (Throwable e) {
            error = e;
        }
        finally {
            decrementSessionRefs();
        }

        log.error(errorMsg, error);

        throw new IgniteException(errorMsg, error);
    }

    /** {@inheritDoc} */
    @Override public <R, V> R execute(BatchExecutionAssistant<R, V> assistant, Iterable<? extends V> data) {
        if (data == null || !data.iterator().hasNext())
            return assistant.processedData();

        int attempt = 0;
        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
        Throwable error = new IgniteException(errorMsg);

        RandomSleeper sleeper = newSleeper();

        int dataSize = 0;

        incrementSessionRefs();

        try {
            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
                if (attempt != 0) {
                    log.warning("Trying " + (attempt + 1) + " attempt to execute Cassandra batch " +
                        assistant.operationName() + " operation to process rest " +
                        (dataSize - assistant.processedCount()) + " of " + dataSize + " elements");
                }

                // Clean errors info before next communication with Cassandra.
                Throwable unknownEx = null;
                Throwable tblAbsenceEx = null;
                Throwable hostsAvailEx = null;
                Throwable prepStatEx = null;

                List<Cache.Entry<Integer, ResultSetFuture>> futResults = new LinkedList<>();

                PreparedStatement preparedSt = prepareStatement(assistant.getStatement(),
                    assistant.getPersistenceSettings(), assistant.tableExistenceRequired());

                if (preparedSt == null)
                    return null;

                int seqNum = 0;

                // Stage 1: submit an async statement for every element which is not processed yet.
                for (V obj : data) {
                    if (!assistant.alreadyProcessed(seqNum)) {
                        try {
                            Statement statement = tuneStatementExecutionOptions(assistant.bindStatement(preparedSt, obj));
                            ResultSetFuture fut = session().executeAsync(statement);
                            futResults.add(new CacheEntryImpl<>(seqNum, fut));
                        }
                        catch (Throwable e) {
                            if (CassandraHelper.isTableAbsenceError(e)) {
                                // If there are table absence error and it is not required for the operation we can return.
                                if (!assistant.tableExistenceRequired())
                                    return assistant.processedData();

                                tblAbsenceEx = e;
                                handleTableAbsenceError(assistant.getPersistenceSettings());
                            }
                            else if (CassandraHelper.isHostsAvailabilityError(e)) {
                                // Handle host availability only once per attempt. The check must be done
                                // BEFORE remembering the error, otherwise the handler is never invoked.
                                boolean firstHostsErr = hostsAvailEx == null;

                                hostsAvailEx = e;

                                if (firstHostsErr)
                                    handleHostsAvailabilityError(e, attempt, errorMsg);
                            }
                            else if (CassandraHelper.isPreparedStatementClusterError(e)) {
                                prepStatEx = e;
                                handlePreparedStatementClusterError(e);
                            }
                            else
                                unknownEx = e;
                        }
                    }

                    seqNum++;
                }

                dataSize = seqNum;

                // For an error which we don't know how to handle, we will not try next attempts and terminate.
                if (unknownEx != null)
                    throw new IgniteException(errorMsg, unknownEx);

                // Remembering any of last errors.
                if (tblAbsenceEx != null)
                    error = tblAbsenceEx;
                else if (hostsAvailEx != null)
                    error = hostsAvailEx;
                else if (prepStatEx != null)
                    error = prepStatEx;

                // Elements whose submission failed were never sent to Cassandra, so this attempt
                // can't be treated as successful even if all submitted futures complete fine.
                boolean submitFailed = tblAbsenceEx != null || hostsAvailEx != null || prepStatEx != null;

                // Clean errors info before next communication with Cassandra.
                unknownEx = null;
                tblAbsenceEx = null;
                hostsAvailEx = null;
                prepStatEx = null;

                // Stage 2: wait for submitted statements and process their results.
                for (Cache.Entry<Integer, ResultSetFuture> futureResult : futResults) {
                    try {
                        ResultSet resSet = futureResult.getValue().getUninterruptibly();
                        Row row = resSet != null && resSet.iterator().hasNext() ? resSet.iterator().next() : null;

                        if (row != null)
                            assistant.process(row, futureResult.getKey());
                    }
                    catch (Throwable e) {
                        if (CassandraHelper.isTableAbsenceError(e))
                            tblAbsenceEx = e;
                        else if (CassandraHelper.isHostsAvailabilityError(e))
                            hostsAvailEx = e;
                        else if (CassandraHelper.isPreparedStatementClusterError(e))
                            prepStatEx = e;
                        else
                            unknownEx = e;
                    }
                }

                // For an error which we don't know how to handle, we will not try next attempts and terminate.
                if (unknownEx != null)
                    throw new IgniteException(errorMsg, unknownEx);

                // If there are no errors occurred (neither on submission nor on completion) it means that
                // operation successfully completed and we can return.
                if (!submitFailed && tblAbsenceEx == null && hostsAvailEx == null && prepStatEx == null)
                    return assistant.processedData();

                if (tblAbsenceEx != null) {
                    // If there are table absence error and it is not required for the operation we can return.
                    if (!assistant.tableExistenceRequired())
                        return assistant.processedData();

                    error = tblAbsenceEx;
                    handleTableAbsenceError(assistant.getPersistenceSettings());
                }

                if (hostsAvailEx != null) {
                    error = hostsAvailEx;
                    handleHostsAvailabilityError(hostsAvailEx, attempt, errorMsg);
                }

                if (prepStatEx != null) {
                    error = prepStatEx;
                    handlePreparedStatementClusterError(prepStatEx);
                }

                sleeper.sleep();

                attempt++;
            }
        }
        catch (Throwable e) {
            error = e;
        }
        finally {
            decrementSessionRefs();
        }

        errorMsg = "Failed to process " + (dataSize - assistant.processedCount()) +
            " of " + dataSize + " elements, during " + assistant.operationName() +
            " operation with Cassandra";

        log.error(errorMsg, error);

        throw new IgniteException(errorMsg, error);
    }

    /** {@inheritDoc} */
    @Override public void execute(BatchLoaderAssistant assistant) {
        int attempt = 0;
        String errorMsg = "Failed to execute Cassandra " + assistant.operationName() + " operation";
        Throwable error = new IgniteException(errorMsg);

        RandomSleeper sleeper = newSleeper();

        incrementSessionRefs();

        try {
            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
                if (attempt != 0)
                    log.warning("Trying " + (attempt + 1) + " attempt to load Ignite cache");

                Statement statement = tuneStatementExecutionOptions(assistant.getStatement());

                try {
                    ResultSetFuture fut = session().executeAsync(statement);
                    ResultSet resSet = fut.getUninterruptibly();

                    if (resSet == null || !resSet.iterator().hasNext())
                        return;

                    for (Row row : resSet)
                        assistant.process(row);

                    return;
                }
                catch (Throwable e) {
                    error = e;

                    // Nothing to load if the table doesn't exist.
                    if (CassandraHelper.isTableAbsenceError(e))
                        return;
                    else if (CassandraHelper.isHostsAvailabilityError(e))
                        handleHostsAvailabilityError(e, attempt, errorMsg);
                    else if (CassandraHelper.isPreparedStatementClusterError(e))
                        handlePreparedStatementClusterError(e);
                    else
                        // For an error which we don't know how to handle, we will not try next attempts and terminate.
                        throw new IgniteException(errorMsg, e);
                }

                sleeper.sleep();

                attempt++;
            }
        }
        catch (Throwable e) {
            error = e;
        }
        finally {
            decrementSessionRefs();
        }

        log.error(errorMsg, error);

        throw new IgniteException(errorMsg, error);
    }

    /** {@inheritDoc} */
    @Override public synchronized void close() throws IOException {
        // Driver session is handed over to the pool only when the last reference is released.
        if (decrementSessionRefs() == 0 && ses != null) {
            SessionPool.put(this, ses);
            ses = null;
        }
    }

    /**
     * Recreates Cassandra driver session.
     */
    private synchronized void refresh() {
        // Make sure that session removed from the pool.
        SessionPool.get(this);

        // Closing and reopening session.
        CassandraHelper.closeSession(ses);
        ses = null;
        session();

        synchronized (sesStatements) {
            sesStatements.clear();
        }
    }

    /**
     * Returns current driver session, taking it from the {@link SessionPool} or establishing a new one
     * (and dropping cached prepared statements) if there is none.
     *
     * @return Cassandra driver session.
     */
    private synchronized Session session() {
        if (ses != null)
            return ses;

        ses = SessionPool.get(this);

        if (ses != null)
            return ses;

        synchronized (sesStatements) {
            sesStatements.clear();
        }

        try {
            return ses = builder.build().connect();
        }
        catch (Throwable e) {
            throw new IgniteException("Failed to establish session with Cassandra database", e);
        }
    }

    /**
     * Increments number of references to Cassandra driver session (required for multithreaded environment).
     */
    private synchronized void incrementSessionRefs() {
        refCnt++;
    }

    /**
     * Decrements number of references to Cassandra driver session (required for multithreaded environment).
     *
     * @return Number of references left (never negative).
     */
    private synchronized int decrementSessionRefs() {
        if (refCnt != 0)
            refCnt--;

        return refCnt;
    }

    /**
     * Prepares CQL statement using current Cassandra driver session.
     *
     * @param statement CQL statement.
     * @param settings Persistence settings.
     * @param tblExistenceRequired Flag indicating if table existence is required for the statement.
     * @return Prepared statement, or {@code null} if the table is absent and its existence is not required.
     */
    private PreparedStatement prepareStatement(String statement, KeyValuePersistenceSettings settings,
        boolean tblExistenceRequired) {

        int attempt = 0;
        Throwable error = null;
        String errorMsg = "Failed to prepare Cassandra CQL statement: " + statement;

        RandomSleeper sleeper = newSleeper();

        incrementSessionRefs();

        try {
            synchronized (sesStatements) {
                if (sesStatements.containsKey(statement))
                    return sesStatements.get(statement);
            }

            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
                try {
                    PreparedStatement prepStatement = session().prepare(statement);

                    synchronized (sesStatements) {
                        sesStatements.put(statement, prepStatement);
                    }

                    return prepStatement;
                }
                catch (Throwable e) {
                    if (CassandraHelper.isTableAbsenceError(e)) {
                        if (!tblExistenceRequired)
                            return null;

                        handleTableAbsenceError(settings);
                    }
                    else if (CassandraHelper.isHostsAvailabilityError(e))
                        handleHostsAvailabilityError(e, attempt, errorMsg);
                    else
                        throw new IgniteException(errorMsg, e);

                    error = e;
                }

                sleeper.sleep();

                attempt++;
            }
        }
        finally {
            decrementSessionRefs();
        }

        throw new IgniteException(errorMsg, error);
    }

    /**
     * Creates Cassandra keyspace.
     *
     * @param settings Persistence settings.
     */
    private void createKeyspace(KeyValuePersistenceSettings settings) {
        int attempt = 0;
        Throwable error = null;
        String errorMsg = "Failed to create Cassandra keyspace '" + settings.getKeyspace() + "'";

        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
            try {
                log.info("-----------------------------------------------------------------------");
                log.info("Creating Cassandra keyspace '" + settings.getKeyspace() + "'");
                log.info("-----------------------------------------------------------------------\n\n" +
                    settings.getKeyspaceDDLStatement() + "\n");
                log.info("-----------------------------------------------------------------------");
                session().execute(settings.getKeyspaceDDLStatement());
                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' was successfully created");
                return;
            }
            catch (AlreadyExistsException ignored) {
                log.info("Cassandra keyspace '" + settings.getKeyspace() + "' already exist");
                return;
            }
            catch (Throwable e) {
                if (!CassandraHelper.isHostsAvailabilityError(e))
                    throw new IgniteException(errorMsg, e);

                handleHostsAvailabilityError(e, attempt, errorMsg);

                error = e;
            }

            attempt++;
        }

        throw new IgniteException(errorMsg, error);
    }

    /**
     * Creates Cassandra table, creating the keyspace first if it doesn't exist.
     *
     * @param settings Persistence settings.
     */
    private void createTable(KeyValuePersistenceSettings settings) {
        int attempt = 0;
        Throwable error = null;
        String errorMsg = "Failed to create Cassandra table '" + settings.getTableFullName() + "'";

        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
            try {
                log.info("-----------------------------------------------------------------------");
                log.info("Creating Cassandra table '" + settings.getTableFullName() + "'");
                log.info("-----------------------------------------------------------------------\n\n" +
                    settings.getTableDDLStatement() + "\n");
                log.info("-----------------------------------------------------------------------");
                session().execute(settings.getTableDDLStatement());
                log.info("Cassandra table '" + settings.getTableFullName() + "' was successfully created");
                return;
            }
            catch (AlreadyExistsException ignored) {
                log.info("Cassandra table '" + settings.getTableFullName() + "' already exist");
                return;
            }
            catch (Throwable e) {
                if (!CassandraHelper.isHostsAvailabilityError(e) && !CassandraHelper.isKeyspaceAbsenceError(e))
                    throw new IgniteException(errorMsg, e);

                if (CassandraHelper.isKeyspaceAbsenceError(e)) {
                    log.warning("Failed to create Cassandra table '" + settings.getTableFullName() +
                        "' cause appropriate keyspace doesn't exist", e);
                    createKeyspace(settings);
                }
                else if (CassandraHelper.isHostsAvailabilityError(e))
                    handleHostsAvailabilityError(e, attempt, errorMsg);

                error = e;
            }

            attempt++;
        }

        throw new IgniteException(errorMsg, error);
    }

    /**
     * Creates Cassandra table indexes.
     *
     * @param settings Persistence settings.
     */
    private void createTableIndexes(KeyValuePersistenceSettings settings) {
        if (settings.getIndexDDLStatements() == null || settings.getIndexDDLStatements().isEmpty())
            return;

        int attempt = 0;
        Throwable error = null;
        String errorMsg = "Failed to create indexes for Cassandra table " + settings.getTableFullName();

        while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
            try {
                log.info("Creating indexes for Cassandra table '" + settings.getTableFullName() + "'");

                for (String statement : settings.getIndexDDLStatements()) {
                    try {
                        session().execute(statement);
                    }
                    catch (AlreadyExistsException ignored) {
                        // Index is already there - nothing to do.
                    }
                    catch (Throwable e) {
                        // Constant-first comparison: exception message may be null.
                        if (!(e instanceof InvalidQueryException) || !"Index already exists".equals(e.getMessage()))
                            throw new IgniteException(errorMsg, e);
                    }
                }

                log.info("Indexes for Cassandra table '" + settings.getTableFullName() + "' were successfully created");

                return;
            }
            catch (Throwable e) {
                if (CassandraHelper.isHostsAvailabilityError(e))
                    handleHostsAvailabilityError(e, attempt, errorMsg);
                else if (CassandraHelper.isTableAbsenceError(e))
                    createTable(settings);
                else
                    throw new IgniteException(errorMsg, e);

                error = e;
            }

            attempt++;
        }

        throw new IgniteException(errorMsg, error);
    }

    /**
     * Tunes CQL statement execution options (consistency level, fetch option and etc.).
     *
     * @param statement Statement.
     * @return Modified statement.
     */
    private Statement tuneStatementExecutionOptions(Statement statement) {
        String qry = "";

        if (statement instanceof BoundStatement)
            qry = ((BoundStatement)statement).preparedStatement().getQueryString().trim();
        else if (statement instanceof PreparedStatement)
            qry = ((PreparedStatement)statement).getQueryString().trim();

        boolean readStatement = startsWithIgnoreCase(qry, "select");
        boolean writeStatement = statement instanceof Batch || statement instanceof BatchStatement ||
            startsWithIgnoreCase(qry, "insert") || startsWithIgnoreCase(qry, "delete") ||
            startsWithIgnoreCase(qry, "update");

        if (readStatement && readConsistency != null)
            statement.setConsistencyLevel(readConsistency);

        if (writeStatement && writeConsistency != null)
            statement.setConsistencyLevel(writeConsistency);

        if (fetchSize != null)
            statement.setFetchSize(fetchSize);

        return statement;
    }

    /**
     * Locale-independent, case-insensitive prefix check. Lower-casing with the default locale
     * breaks keyword detection for some locales (e.g. Turkish, where "I" doesn't map to "i").
     *
     * @param str String to check.
     * @param prefix Expected prefix.
     * @return {@code true} if {@code str} starts with {@code prefix} ignoring case.
     */
    private static boolean startsWithIgnoreCase(String str, String prefix) {
        return str.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /**
     * Handles situation when Cassandra table doesn't exist.
     *
     * @param settings Persistence settings.
     */
    private void handleTableAbsenceError(KeyValuePersistenceSettings settings) {
        // Counter starts from -1, so only the first concurrent handler gets 0 and does the job.
        int hndNum = tblAbsenceHandlersCnt.incrementAndGet();

        try {
            synchronized (tblAbsenceHandlersCnt) {
                // Oooops... I am not the first thread who tried to handle table absence problem.
                if (hndNum != 0) {
                    log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
                        "Another thread already fixed it.");
                    return;
                }

                log.warning("Table " + settings.getTableFullName() + " absence problem detected. " +
                    "Trying to create table.");

                IgniteException error = new IgniteException("Failed to create Cassandra table " + settings.getTableFullName());

                int attempt = 0;

                while (error != null && attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
                    error = null;

                    try {
                        createKeyspace(settings);
                        createTable(settings);
                        createTableIndexes(settings);
                    }
                    catch (Throwable e) {
                        if (CassandraHelper.isHostsAvailabilityError(e))
                            handleHostsAvailabilityError(e, attempt, null);
                        else
                            throw new IgniteException("Failed to create Cassandra table " + settings.getTableFullName(), e);

                        error = (e instanceof IgniteException) ? (IgniteException)e : new IgniteException(e);
                    }

                    attempt++;
                }

                if (error != null)
                    throw error;
            }
        }
        finally {
            if (hndNum == 0)
                tblAbsenceHandlersCnt.set(-1);
        }
    }

    /**
     * Handles situation when prepared statement execution failed cause session to the cluster was released.
     *
     * @param e Exception to handle.
     */
    private void handlePreparedStatementClusterError(Throwable e) {
        // Counter starts from -1, so only the first concurrent handler gets 0 and does the job.
        int hndNum = prepStatementHandlersCnt.incrementAndGet();

        try {
            synchronized (prepStatementHandlersCnt) {
                // Oooops... I am not the first thread who tried to handle prepared statement problem.
                if (hndNum != 0) {
                    log.warning("Prepared statement cluster error detected, another thread already fixed the problem", e);
                    return;
                }

                log.warning("Prepared statement cluster error detected, refreshing Cassandra session", e);

                refresh();

                log.warning("Cassandra session refreshed");
            }
        }
        finally {
            if (hndNum == 0)
                prepStatementHandlersCnt.set(-1);
        }
    }

    /**
     * Handles situation when Cassandra host which is responsible for CQL query execution became unavailable.
     * On selected attempts the driver session is refreshed, on all others the thread just sleeps
     * extra {@link #CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT} milliseconds.
     *
     * @param e Exception to handle.
     * @param attempt Number of attempts.
     * @param msg Error message.
     * @return {@code true} if the session was refreshed, {@code false} if the thread only slept.
     * @throws IgniteException If number of attempts reached the maximum.
     */
    private boolean handleHostsAvailabilityError(Throwable e, int attempt, String msg) {
        if (attempt >= CQL_EXECUTION_ATTEMPTS_COUNT) {
            log.error("Host availability problem detected. " +
                "Number of CQL execution attempts reached maximum " + CQL_EXECUTION_ATTEMPTS_COUNT +
                ", exception will be thrown to upper execution layer.", e);
            throw msg == null ? new IgniteException(e) : new IgniteException(msg, e);
        }

        if (attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 4 ||
            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 ||
            attempt == CQL_EXECUTION_ATTEMPTS_COUNT / 2 + CQL_EXECUTION_ATTEMPTS_COUNT / 4 ||
            attempt == CQL_EXECUTION_ATTEMPTS_COUNT - 1) {
            log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
                "refreshing Cassandra session", e);

            refresh();

            log.warning("Cassandra session refreshed");

            return true;
        }

        log.warning("Host availability problem detected, CQL execution attempt " + (attempt + 1) + ", " +
            "sleeping extra " + CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT + " milliseconds", e);

        try {
            Thread.sleep(CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT);
        }
        catch (InterruptedException ignored) {
            // Keep the interruption visible to the caller instead of silently dropping it.
            Thread.currentThread().interrupt();
        }

        log.warning("Sleep completed");

        return false;
    }

    /**
     * @return New random sleeper.
     */
    private RandomSleeper newSleeper() {
        return new RandomSleeper(CQL_EXECUTION_ATTEMPT_MIN_TIMEOUT,
            CQL_EXECUTION_ATTEMPT_MAX_TIMEOUT,
            CQL_ATTEMPTS_TIMEOUT_INCREMENT, log);
    }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java deleted file mode 100644 index 867f58d..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/ExecutionAssistant.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.session; - -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.Row; -import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; - -/** - * Provides information for single operations (load, delete, write) of Ignite cache - * backed by {@link org.apache.ignite.cache.store.cassandra.CassandraCacheStore}. - * - * @param <R> type of the result returned from operation. 
- */ -public interface ExecutionAssistant<R> { - /** - * Indicates if Cassandra table existence is required for operation. - * - * @return true if table existence required. - */ - public boolean tableExistenceRequired(); - - /** - * Returns CQL statement to be used for operation. - * - * @return CQL statement. - */ - public String getStatement(); - - /** - * Binds prepared statement. - * - * @param statement prepared statement. - * - * @return bound statement. - */ - public BoundStatement bindStatement(PreparedStatement statement); - - /** - * Persistence settings to use for operation. - * - * @return persistence settings. - */ - public KeyValuePersistenceSettings getPersistenceSettings(); - - /** - * Returns operation name. - * - * @return operation name. - */ - public String operationName(); - - /** - * Processes Cassandra database table row returned by specified CQL statement. - * - * @param row Cassandra database table row. - * - * @return result of the operation. - */ - public R process(Row row); -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java deleted file mode 100644 index 17494dd..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/GenericBatchExecutionAssistant.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.session; - -import com.datastax.driver.core.Row; -import java.util.HashSet; -import java.util.Set; - -/** - * Implementation of the {@link org.apache.ignite.cache.store.cassandra.session.BatchExecutionAssistant}. - * - * @param <R> Type of the result returned from batch operation - * @param <V> Type of the value used in batch operation - */ -public abstract class GenericBatchExecutionAssistant<R, V> implements BatchExecutionAssistant<R, V> { - /** Identifiers of already processed objects. */ - private Set<Integer> processed = new HashSet<>(); - - /** {@inheritDoc} */ - @Override public void process(Row row, int seqNum) { - if (processed.contains(seqNum)) - return; - - process(row); - - processed.add(seqNum); - } - - /** {@inheritDoc} */ - @Override public boolean alreadyProcessed(int seqNum) { - return processed.contains(seqNum); - } - - /** {@inheritDoc} */ - @Override public int processedCount() { - return processed.size(); - } - - /** {@inheritDoc} */ - @Override public R processedData() { - return null; - } - - /** {@inheritDoc} */ - @Override public boolean tableExistenceRequired() { - return false; - } - - /** - * Processes particular row inside batch operation. - * - * @param row Row to process. 
- */ - protected void process(Row row) { - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java deleted file mode 100644 index d3ace7d..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/LoadCacheCustomQueryWorker.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.cache.store.cassandra.session; - -import com.datastax.driver.core.Row; -import com.datastax.driver.core.SimpleStatement; -import com.datastax.driver.core.Statement; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController; -import org.apache.ignite.lang.IgniteBiInClosure; - -/** - * Worker for load cache using custom user query. - * - * @param <K> Key type. - * @param <V> Value type. - */ -public class LoadCacheCustomQueryWorker<K, V> implements Callable<Void> { - /** Cassandra session to execute CQL query */ - private final CassandraSession ses; - - /** User query. */ - private final String qry; - - /** Persistence controller */ - private final PersistenceController ctrl; - - /** Logger */ - private final IgniteLogger log; - - /** Closure for loaded values. */ - private final IgniteBiInClosure<K, V> clo; - - /** - * @param clo Closure for loaded values. - */ - public LoadCacheCustomQueryWorker(CassandraSession ses, String qry, PersistenceController ctrl, - IgniteLogger log, IgniteBiInClosure<K, V> clo) { - this.ses = ses; - this.qry = qry.trim().endsWith(";") ? 
qry : qry + ";"; - this.ctrl = ctrl; - this.log = log; - this.clo = clo; - } - - /** {@inheritDoc} */ - @Override public Void call() throws Exception { - ses.execute(new BatchLoaderAssistant() { - /** {@inheritDoc} */ - @Override public String operationName() { - return "loadCache"; - } - - /** {@inheritDoc} */ - @Override public Statement getStatement() { - return new SimpleStatement(qry); - } - - /** {@inheritDoc} */ - @Override public void process(Row row) { - K key; - V val; - - try { - key = (K)ctrl.buildKeyObject(row); - } - catch (Throwable e) { - log.error("Failed to build Ignite key object from provided Cassandra row", e); - - throw new IgniteException("Failed to build Ignite key object from provided Cassandra row", e); - } - - try { - val = (V)ctrl.buildValueObject(row); - } - catch (Throwable e) { - log.error("Failed to build Ignite value object from provided Cassandra row", e); - - throw new IgniteException("Failed to build Ignite value object from provided Cassandra row", e); - } - - clo.apply(key, val); - } - }); - - return null; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java deleted file mode 100644 index ecbbe78..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains classes responsible for handling sessions and communication with Cassandra - */ -package org.apache.ignite.cache.store.cassandra.session; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java deleted file mode 100644 index fc4a907..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.session.pool; - -import com.datastax.driver.core.Session; -import java.lang.Thread.State; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl; - -/** - * Cassandra driver sessions pool. - */ -public class SessionPool { - /** - * Monitors session pool and closes unused session. - */ - private static class SessionMonitor extends Thread { - /** {@inheritDoc} */ - @Override public void run() { - try { - while (true) { - try { - Thread.sleep(SLEEP_TIMEOUT); - } - catch (InterruptedException ignored) { - return; - } - - List<Map.Entry<CassandraSessionImpl, SessionWrapper>> expiredSessions = new LinkedList<>(); - - int sessionsCnt; - - synchronized (sessions) { - sessionsCnt = sessions.size(); - - for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : sessions.entrySet()) { - if (entry.getValue().expired()) - expiredSessions.add(entry); - } - - for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions) - sessions.remove(entry.getKey()); - } - - for (Map.Entry<CassandraSessionImpl, SessionWrapper> entry : expiredSessions) - entry.getValue().release(); - - // all sessions in the pool expired, thus we don't need additional thread to manage sessions in the pool - if (sessionsCnt == expiredSessions.size()) - return; - } - } - finally { - release(); - } - } - } - - /** Sessions monitor sleep timeout. 
*/ - private static final long SLEEP_TIMEOUT = 60000; // 1 minute. - - /** Sessions which were returned to pool. */ - private static final Map<CassandraSessionImpl, SessionWrapper> sessions = new HashMap<>(); - - /** Singleton instance. */ - private static SessionMonitor monitorSingleton; - - static { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { - release(); - } - }); - } - - /** - * Returns Cassandra driver session to sessions pool. - * - * @param cassandraSes Session wrapper. - * @param driverSes Driver session. - */ - public static void put(CassandraSessionImpl cassandraSes, Session driverSes) { - if (cassandraSes == null || driverSes == null) - return; - - SessionWrapper old; - - synchronized (sessions) { - old = sessions.put(cassandraSes, new SessionWrapper(driverSes)); - - if (monitorSingleton == null || State.TERMINATED.equals(monitorSingleton.getState())) { - monitorSingleton = new SessionMonitor(); - monitorSingleton.setDaemon(true); - monitorSingleton.setName("Cassandra-sessions-pool"); - monitorSingleton.start(); - } - } - - if (old != null) - old.release(); - } - - /** - * Extracts Cassandra driver session from pool. - * - * @param cassandraSes Session wrapper. - * @return Cassandra driver session. - */ - public static Session get(CassandraSessionImpl cassandraSes) { - if (cassandraSes == null) - return null; - - SessionWrapper wrapper; - - synchronized (sessions) { - wrapper = sessions.remove(cassandraSes); - } - - return wrapper == null ? null : wrapper.driverSession(); - } - - /** - * Releases all session from pool and closes all their connections to Cassandra database. 
- */ - public static void release() { - Collection<SessionWrapper> wrappers; - - synchronized (sessions) { - try { - if (sessions.size() == 0) - return; - - wrappers = new LinkedList<>(); - - for (SessionWrapper wrapper : sessions.values()) - wrappers.add(wrapper); - - sessions.clear(); - } - finally { - if (!(Thread.currentThread() instanceof SessionMonitor) && monitorSingleton != null) { - try { - monitorSingleton.interrupt(); - } - catch (Throwable ignored) { - } - } - } - } - - for (SessionWrapper wrapper : wrappers) - wrapper.release(); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java deleted file mode 100644 index 7c5722b..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionWrapper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.session.pool; - -import com.datastax.driver.core.Session; -import org.apache.ignite.cache.store.cassandra.common.CassandraHelper; - -/** - * Wrapper for Cassandra driver session, responsible for monitoring session expiration and its closing. - */ -public class SessionWrapper { - /** Expiration timeout for Cassandra driver session. */ - public static final long DFLT_EXPIRATION_TIMEOUT = 300000; // 5 minutes. - - /** Cassandra driver session. */ - private Session ses; - - /** Wrapper creation time. */ - private long time; - - /** - * Creates instance of Cassandra driver session wrapper. - * - * @param ses Cassandra driver session. - */ - public SessionWrapper(Session ses) { - this.ses = ses; - this.time = System.currentTimeMillis(); - } - - /** - * Checks if Cassandra driver session expired. - * - * @return true if session expired. - */ - public boolean expired() { - return System.currentTimeMillis() - time > DFLT_EXPIRATION_TIMEOUT; - } - - /** - * Returns wrapped Cassandra driver session. - * - * @return Cassandra driver session. 
- */ - public Session driverSession() { - return ses; - } - - /** - * Closes wrapped Cassandra driver session - */ - public void release() { - CassandraHelper.closeSession(ses); - ses = null; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java deleted file mode 100644 index 21c292f..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * Contains session pool implenetation for Cassandra sessions - */ -package org.apache.ignite.cache.store.cassandra.session.pool; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java deleted file mode 100644 index 4f40478..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/DDLGenerator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.store.cassandra.utils; - -import java.io.File; -import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings; - -/** - * Generates Cassandra DDL statements from persistence descriptor xml file. - */ -public class DDLGenerator { - /** - * DDLGenerator entry point. - * - * @param args Arguments for DDLGenerator. 
- */ - public static void main(String[] args) { - if (args == null || args.length == 0) - return; - - for (String arg : args) { - File file = new File(arg); - if (!file.isFile()) { - System.out.println("-------------------------------------------------------------"); - System.out.println("Incorrect file specified: " + arg); - System.out.println("-------------------------------------------------------------"); - continue; - } - - try { - KeyValuePersistenceSettings settings = new KeyValuePersistenceSettings(file); - System.out.println("-------------------------------------------------------------"); - System.out.println("DDL for keyspace/table from file: " + arg); - System.out.println("-------------------------------------------------------------"); - System.out.println(); - System.out.println(settings.getKeyspaceDDLStatement()); - System.out.println(); - System.out.println(settings.getTableDDLStatement()); - System.out.println(); - } - catch (Throwable e) { - System.out.println("-------------------------------------------------------------"); - System.out.println("Incorrect file specified: " + arg); - System.out.println("-------------------------------------------------------------"); - e.printStackTrace(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java deleted file mode 100644 index 2460dfe..0000000 --- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/utils/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains utility classes - */ -package org.apache.ignite.cache.store.cassandra.utils; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/README.txt ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/bootstrap/aws/README.txt b/modules/cassandra/src/test/bootstrap/aws/README.txt deleted file mode 100644 index 4457d81..0000000 --- a/modules/cassandra/src/test/bootstrap/aws/README.txt +++ /dev/null @@ -1,13 +0,0 @@ -Shell scripts to spin up Ignite, Cassandra and Load tests clusters in AWS. 
- -1) cassandra - bootstrap scripts for Cassandra cluster nodes -2) ganglia - bootstrap scripts for Ganglia master and agents -3) ignite - bootstrap scripts for Ignite cluster nodes -4) tests - bootstrap scripts for Load Tests cluster nodes -5) common.sh - definitions for common functions -6) env.sh - definitions for common variables -7) log-collector.sh - log collector daemon script, to collect logs and upload them to S3 - -For more details please look at the documentation: - - https://apacheignite.readme.io/docs/aws-infrastructure-deployment \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh b/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh deleted file mode 100644 index 017b1b1..0000000 --- a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-bootstrap.sh +++ /dev/null @@ -1,336 +0,0 @@ -#!/bin/sh - -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -# ----------------------------------------------------------------------------------------------- -# Bootstrap script to spin up Cassandra cluster -# ----------------------------------------------------------------------------------------------- - -# URL to download AWS CLI tools -AWS_CLI_DOWNLOAD_URL=https://s3.amazonaws.com/aws-cli/awscli-bundle.zip - -# URL to download JDK -JDK_DOWNLOAD_URL=http://download.oracle.com/otn-pub/java/jdk/8u77-b03/jdk-8u77-linux-x64.tar.gz - -# URL to download Ignite-Cassandra tests package - you should previously package and upload it to this place -TESTS_PACKAGE_DONLOAD_URL=s3://<bucket>/<folder>/ignite-cassandra-tests-<version>.zip - -# Terminates script execution and upload logs to S3 -terminate() -{ - SUCCESS_URL=$S3_CASSANDRA_BOOTSTRAP_SUCCESS - FAILURE_URL=$S3_CASSANDRA_BOOTSTRAP_FAILURE - - if [ -n "$SUCCESS_URL" ] && [[ "$SUCCESS_URL" != */ ]]; then - SUCCESS_URL=${SUCCESS_URL}/ - fi - - if [ -n "$FAILURE_URL" ] && [[ "$FAILURE_URL" != */ ]]; then - FAILURE_URL=${FAILURE_URL}/ - fi - - host_name=$(hostname -f | tr '[:upper:]' '[:lower:]') - msg=$host_name - - if [ -n "$1" ]; then - echo "[ERROR] $1" - echo "[ERROR]-----------------------------------------------------" - echo "[ERROR] Cassandra node bootstrap failed" - echo "[ERROR]-----------------------------------------------------" - msg=$1 - - if [ -z "$FAILURE_URL" ]; then - exit 1 - fi - - reportFolder=${FAILURE_URL}${host_name} - reportFile=$reportFolder/__error__ - else - echo "[INFO]-----------------------------------------------------" - echo "[INFO] Cassandra node bootstrap successfully completed" - echo "[INFO]-----------------------------------------------------" - - if [ -z "$SUCCESS_URL" ]; then - exit 0 - fi - - reportFolder=${SUCCESS_URL}${host_name} - reportFile=$reportFolder/__success__ - fi - - echo $msg > /opt/bootstrap-result - - aws s3 rm --recursive $reportFolder - if [ $? 
-ne 0 ]; then - echo "[ERROR] Failed to drop report folder: $reportFolder" - fi - - aws s3 cp --sse AES256 /opt/bootstrap-result $reportFile - if [ $? -ne 0 ]; then - echo "[ERROR] Failed to report bootstrap result to: $reportFile" - fi - - rm -f /opt/bootstrap-result - - if [ -n "$1" ]; then - exit 1 - fi - - exit 0 -} - -# Downloads specified package -downloadPackage() -{ - echo "[INFO] Downloading $3 package from $1 into $2" - - for i in 0 9; - do - if [[ "$1" == s3* ]]; then - aws s3 cp $1 $2 - code=$? - else - curl "$1" -o "$2" - code=$? - fi - - if [ $code -eq 0 ]; then - echo "[INFO] $3 package successfully downloaded from $1 into $2" - return 0 - fi - - echo "[WARN] Failed to download $3 package from $i attempt, sleeping extra 5sec" - sleep 5s - done - - terminate "All 10 attempts to download $3 package from $1 are failed" -} - -# Downloads and setup JDK -setupJava() -{ - rm -Rf /opt/java /opt/jdk.tar.gz - - echo "[INFO] Downloading 'jdk'" - wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "$JDK_DOWNLOAD_URL" -O /opt/jdk.tar.gz - if [ $? -ne 0 ]; then - terminate "Failed to download 'jdk'" - fi - - echo "[INFO] Untaring 'jdk'" - tar -xvzf /opt/jdk.tar.gz -C /opt - if [ $? -ne 0 ]; then - terminate "Failed to untar 'jdk'" - fi - - rm -Rf /opt/jdk.tar.gz - - unzipDir=$(ls /opt | grep "jdk") - if [ "$unzipDir" != "java" ]; then - mv /opt/$unzipDir /opt/java - fi -} - -# Downloads and setup AWS CLI -setupAWSCLI() -{ - echo "[INFO] Installing 'awscli'" - pip install --upgrade awscli - if [ $? -eq 0 ]; then - return 0 - fi - - echo "[ERROR] Failed to install 'awscli' using pip" - echo "[INFO] Trying to install awscli using zip archive" - echo "[INFO] Downloading awscli zip" - - downloadPackage "$AWS_CLI_DOWNLOAD_URL" "/opt/awscli-bundle.zip" "awscli" - - echo "[INFO] Unzipping awscli zip" - unzip /opt/awscli-bundle.zip -d /opt - if [ $? 
-ne 0 ]; then - terminate "Failed to unzip awscli zip" - fi - - rm -Rf /opt/awscli-bundle.zip - - echo "[INFO] Installing awscli" - /opt/awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws - if [ $? -ne 0 ]; then - terminate "Failed to install awscli" - fi - - echo "[INFO] Successfully installed awscli from zip archive" -} - -# Setup all the pre-requisites (packages, settings and etc.) -setupPreRequisites() -{ - echo "[INFO] Installing 'wget' package" - yum -y install wget - if [ $? -ne 0 ]; then - terminate "Failed to install 'wget' package" - fi - - echo "[INFO] Installing 'net-tools' package" - yum -y install net-tools - if [ $? -ne 0 ]; then - terminate "Failed to install 'net-tools' package" - fi - - echo "[INFO] Installing 'python' package" - yum -y install python - if [ $? -ne 0 ]; then - terminate "Failed to install 'python' package" - fi - - echo "[INFO] Installing 'unzip' package" - yum -y install unzip - if [ $? -ne 0 ]; then - terminate "Failed to install 'unzip' package" - fi - - downloadPackage "https://bootstrap.pypa.io/get-pip.py" "/opt/get-pip.py" "get-pip.py" - - echo "[INFO] Installing 'pip'" - python /opt/get-pip.py - if [ $? -ne 0 ]; then - terminate "Failed to install 'pip'" - fi -} - -# Downloads and setup tests package -setupTestsPackage() -{ - downloadPackage "$TESTS_PACKAGE_DONLOAD_URL" "/opt/ignite-cassandra-tests.zip" "Tests" - - rm -Rf /opt/ignite-cassandra-tests - - unzip /opt/ignite-cassandra-tests.zip -d /opt - if [ $? -ne 0 ]; then - terminate "Failed to unzip tests package" - fi - - rm -f /opt/ignite-cassandra-tests.zip - - unzipDir=$(ls /opt | grep "ignite-cassandra") - if [ "$unzipDir" != "ignite-cassandra-tests" ]; then - mv /opt/$unzipDir /opt/ignite-cassandra-tests - fi - - find /opt/ignite-cassandra-tests -type f -name "*.sh" -exec chmod ug+x {} \; - - . 
/opt/ignite-cassandra-tests/bootstrap/aws/common.sh "cassandra" - - setupNTP - - echo "[INFO] Starting logs collector daemon" - - HOST_NAME=$(hostname -f | tr '[:upper:]' '[:lower:]') - /opt/ignite-cassandra-tests/bootstrap/aws/logs-collector.sh "$S3_LOGS_TRIGGER" "$S3_CASSANDRA_LOGS/$HOST_NAME" "/opt/cassandra/logs" "/opt/cassandra/cassandra-start.log" > /opt/logs-collector.log & - - echo "[INFO] Logs collector daemon started: $!" - - echo "----------------------------------------------------------------------------------------" - printInstanceInfo - echo "----------------------------------------------------------------------------------------" - tagInstance - bootstrapGangliaAgent "cassandra" 8641 -} - -# Downloads Cassandra package -downloadCassandra() -{ - downloadPackage "$CASSANDRA_DOWNLOAD_URL" "/opt/apache-cassandra.tar.gz" "Cassandra" - - rm -Rf /opt/cassandra - - echo "[INFO] Untaring Cassandra package" - tar -xvzf /opt/apache-cassandra.tar.gz -C /opt - if [ $? -ne 0 ]; then - terminate "Failed to untar Cassandra package" - fi - - rm -f /opt/apache-cassandra.tar.gz - - unzipDir=$(ls /opt | grep "cassandra" | grep "apache") - if [ "$unzipDir" != "cassandra" ]; then - mv /opt/$unzipDir /opt/cassandra - fi -} - -# Setups Cassandra -setupCassandra() -{ - echo "[INFO] Creating 'cassandra' group" - exists=$(cat /etc/group | grep cassandra) - if [ -z "$exists" ]; then - groupadd cassandra - if [ $? -ne 0 ]; then - terminate "Failed to create 'cassandra' group" - fi - fi - - echo "[INFO] Creating 'cassandra' user" - exists=$(cat /etc/passwd | grep cassandra) - if [ -z "$exists" ]; then - useradd -g cassandra cassandra - if [ $? 
-ne 0 ]; then - terminate "Failed to create 'cassandra' user" - fi - fi - - rm -f /opt/cassandra/conf/cassandra-env.sh /opt/cassandra/conf/cassandra-template.yaml - - cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-env.sh /opt/cassandra/conf - cp /opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-template.yaml /opt/cassandra/conf - - chown -R cassandra:cassandra /opt/cassandra /opt/ignite-cassandra-tests - - createCassandraStorageLayout - - cat /opt/cassandra/conf/cassandra-template.yaml | sed -r "s/\\\$\{CASSANDRA_DATA_DIR\}/$CASSANDRA_DATA_DIR/g" > /opt/cassandra/conf/cassandra-template-1.yaml - cat /opt/cassandra/conf/cassandra-template-1.yaml | sed -r "s/\\\$\{CASSANDRA_COMMITLOG_DIR\}/$CASSANDRA_COMMITLOG_DIR/g" > /opt/cassandra/conf/cassandra-template-2.yaml - cat /opt/cassandra/conf/cassandra-template-2.yaml | sed -r "s/\\\$\{CASSANDRA_CACHES_DIR\}/$CASSANDRA_CACHES_DIR/g" > /opt/cassandra/conf/cassandra-template-3.yaml - - rm -f /opt/cassandra/conf/cassandra-template.yaml /opt/cassandra/conf/cassandra-template-1.yaml /opt/cassandra/conf/cassandra-template-2.yaml - mv /opt/cassandra/conf/cassandra-template-3.yaml /opt/cassandra/conf/cassandra-template.yaml - - echo "export JAVA_HOME=/opt/java" >> $1 - echo "export CASSANDRA_HOME=/opt/cassandra" >> $1 - echo "export PATH=\$JAVA_HOME/bin:\$CASSANDRA_HOME/bin:\$PATH" >> $1 -} - -################################################################################################################### - -echo "[INFO]-----------------------------------------------------------------" -echo "[INFO] Bootstrapping Cassandra node" -echo "[INFO]-----------------------------------------------------------------" - -setupPreRequisites -setupJava -setupAWSCLI -setupTestsPackage -downloadCassandra -setupCassandra "/root/.bash_profile" - -cmd="/opt/ignite-cassandra-tests/bootstrap/aws/cassandra/cassandra-start.sh" - -#sudo -u cassandra -g cassandra sh -c "$cmd | tee /opt/cassandra/cassandra-start.log" - 
-$cmd | tee /opt/cassandra/cassandra-start.log \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh ---------------------------------------------------------------------- diff --git a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh b/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh deleted file mode 100644 index ba76401..0000000 --- a/modules/cassandra/src/test/bootstrap/aws/cassandra/cassandra-env.sh +++ /dev/null @@ -1,287 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
#

# -----------------------------------------------------------------------------------------------
# Environment setup script from Cassandra distribution
# -----------------------------------------------------------------------------------------------

# Derives default heap settings from the memory size and CPU count of the host.
#
# Outputs (plain shell variables, not exported here):
#   MAX_HEAP_SIZE - max(min(1/2 RAM, 1024MB), min(1/4 RAM, 8192MB)), with an "M" suffix
#   HEAP_NEWSIZE  - min(100MB * CPU cores, 1/4 of the max heap), with an "M" suffix
# The intermediate variables are not declared local, so they stay set after the call.
calculate_heap_sizes()
{
    case "$(uname)" in
        Linux)
            system_memory_in_mb=$(free -m | awk '/:/ {print $2;exit}')
            system_cpu_cores=$(grep -E -c 'processor([[:space:]]+):.*' /proc/cpuinfo)
        ;;
        FreeBSD)
            system_memory_in_bytes=$(sysctl hw.physmem | awk '{print $2}')
            system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024)
            system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
        ;;
        SunOS)
            system_memory_in_mb=$(prtconf | awk '/Memory size:/ {print $3}')
            system_cpu_cores=$(psrinfo | wc -l)
        ;;
        Darwin)
            system_memory_in_bytes=$(sysctl hw.memsize | awk '{print $2}')
            system_memory_in_mb=$(expr $system_memory_in_bytes / 1024 / 1024)
            system_cpu_cores=$(sysctl hw.ncpu | awk '{print $2}')
        ;;
        *)
            # assume reasonable defaults for e.g. a modern desktop or
            # cheap server
            system_memory_in_mb="2048"
            system_cpu_cores="2"
        ;;
    esac

    # some systems like the raspberry pi don't report cores, use at least 1.
    # The ":-0" default keeps the numeric test valid when the probe printed nothing.
    if [ "${system_cpu_cores:-0}" -lt "1" ]
    then
        system_cpu_cores="1"
    fi

    # set max heap size based on the following
    # max(min(1/2 ram, 1024MB), min(1/4 ram, 8GB))
    # calculate 1/2 ram and cap to 1024MB
    # calculate 1/4 ram and cap to 8192MB
    # pick the max
    half_system_memory_in_mb=$(expr $system_memory_in_mb / 2)
    quarter_system_memory_in_mb=$(expr $half_system_memory_in_mb / 2)
    if [ "$half_system_memory_in_mb" -gt "1024" ]
    then
        half_system_memory_in_mb="1024"
    fi
    if [ "$quarter_system_memory_in_mb" -gt "8192" ]
    then
        quarter_system_memory_in_mb="8192"
    fi
    if [ "$half_system_memory_in_mb" -gt "$quarter_system_memory_in_mb" ]
    then
        max_heap_size_in_mb="$half_system_memory_in_mb"
    else
        max_heap_size_in_mb="$quarter_system_memory_in_mb"
    fi
    MAX_HEAP_SIZE="${max_heap_size_in_mb}M"

    # Young gen: min(max_sensible_per_modern_cpu_core * num_cores, 1/4 * heap size)
    max_sensible_yg_per_core_in_mb="100"
    max_sensible_yg_in_mb=$(expr $max_sensible_yg_per_core_in_mb "*" $system_cpu_cores)

    desired_yg_in_mb=$(expr $max_heap_size_in_mb / 4)

    if [ "$desired_yg_in_mb" -gt "$max_sensible_yg_in_mb" ]
    then
        HEAP_NEWSIZE="${max_sensible_yg_in_mb}M"
    else
        HEAP_NEWSIZE="${desired_yg_in_mb}M"
    fi
}

# Determine the sort of JVM we'll be running on.
java_ver_output=$("${JAVA:-java}" -version 2>&1)

# The version string is the first double-quoted token on the first line that reads
# "java version" or "openjdk version" (for example 1.8.0_91). An ERE alternation is
# used because '[openjdk|java]' is a bracket expression that matches a single
# character, not either word.
jvmver=$(echo "$java_ver_output" | grep -E '(openjdk|java) version' | awk -F'"' 'NR==1 {print $2}')
JVM_VERSION=${jvmver%_*}
JVM_PATCH_VERSION=${jvmver#*_}

if [ "$JVM_VERSION" \< "1.8" ] ; then
    echo "Cassandra 3.0 and later require Java 8u40 or later."
    exit 1;
fi

# Numeric update level used for the 8u40 check: the leading digits after the
# underscore, or 0 when the version string carries no "_<update>" part.
case "$jvmver" in
    *_*) jvm_update_level=${JVM_PATCH_VERSION%%[!0-9]*} ;;
    *)   jvm_update_level=0 ;;
esac

# Enforce the update-level half of the requirement. The previous form of this check
# repeated the "older than 1.8" condition that had already exited above, so it could
# never fire, and it compared the update level as a string.
case "$JVM_VERSION" in
    1.8|1.8.*)
        if [ "${jvm_update_level:-0}" -lt 40 ] ; then
            echo "Cassandra 3.0 and later require Java 8u40 or later."
            exit 1;
        fi
    ;;
esac

# Vendor is taken from the first word of the line that follows the version line.
# Both "java version" and "openjdk version" are accepted so that the OpenJDK branch
# below is reachable for JVMs that print "openjdk version".
jvm=$(echo "$java_ver_output" | grep -E -A 1 '(openjdk|java) version' | awk 'NR==2 {print $1}')
case "$jvm" in
    OpenJDK)
        JVM_VENDOR=OpenJDK
        # this will be "64-Bit" or "32-Bit"
        JVM_ARCH=$(echo "$java_ver_output" | awk 'NR==3 {print $2}')
        ;;
    "Java(TM)")
        JVM_VENDOR=Oracle
        # this will be "64-Bit" or "32-Bit"
        JVM_ARCH=$(echo "$java_ver_output" | awk 'NR==3 {print $3}')
        ;;
    *)
        # Help fill in other JVM values
        JVM_VENDOR=other
        JVM_ARCH=unknown
        ;;
esac

# Override these to set the amount of memory to allocate to the JVM at
# start-up. For production use you may wish to adjust this for your
# environment. MAX_HEAP_SIZE is the total amount of memory dedicated
# to the Java heap. HEAP_NEWSIZE refers to the size of the young
# generation. Both MAX_HEAP_SIZE and HEAP_NEWSIZE should be either set
# or not (if you set one, set the other).
#
# The main trade-off for the young generation is that the larger it
# is, the longer GC pause times will be. The shorter it is, the more
# expensive GC will be (usually).
#
# The example HEAP_NEWSIZE assumes a modern 8-core+ machine for decent pause
# times. If in doubt, and if you do not particularly want to tweak, go with
# 100 MB per physical CPU core.

# Uncomment both lines below to pin the heap sizes by hand.
#MAX_HEAP_SIZE="4G"
#HEAP_NEWSIZE="800M"

# Uncomment to choose the glibc MALLOC_ARENA_MAX value yourself
# (it is defaulted to 4 further down when left empty).
#export MALLOC_ARENA_MAX=4

# Heap sizes are computed only when neither value was supplied;
# supplying exactly one of the two is treated as a configuration error.
if [ -z "$MAX_HEAP_SIZE" ] && [ -z "$HEAP_NEWSIZE" ]; then
    calculate_heap_sizes
elif [ -z "$MAX_HEAP_SIZE" ] || [ -z "$HEAP_NEWSIZE" ]; then
    echo "please set or unset MAX_HEAP_SIZE and HEAP_NEWSIZE in pairs (see cassandra-env.sh)"
    exit 1
fi

# Default (and export) MALLOC_ARENA_MAX only when it is empty.
if [ -z "$MALLOC_ARENA_MAX" ]; then
    MALLOC_ARENA_MAX=4
    export MALLOC_ARENA_MAX
fi

# The GC log path is appended here because it is built from CASSANDRA_HOME.
JVM_OPTS="${JVM_OPTS} -Xloggc:${CASSANDRA_HOME}/logs/gc.log"

# From here on the arguments handed to the JVM that runs Cassandra are assembled.

# User-defined JVM options: every whitespace-separated word on the lines of
# jvm.options that start with "-" is appended to JVM_OPTS.
JVM_OPTS_FILE=${CASSANDRA_CONF}/jvm.options
for opt in $(grep "^-" $JVM_OPTS_FILE)
do
    JVM_OPTS="${JVM_OPTS} ${opt}"
done

# Record which settings are already present so they are not added twice.
# Each variable holds grep's exit status: 0 means the text was found in JVM_OPTS.
echo $JVM_OPTS | grep -q Xmn;                DEFINED_XMN=$?
echo $JVM_OPTS | grep -q Xmx;                DEFINED_XMX=$?
echo $JVM_OPTS | grep -q Xms;                DEFINED_XMS=$?
echo $JVM_OPTS | grep -q UseConcMarkSweepGC; USING_CMS=$?

# -Xms/-Xmx are added only when neither is present yet.
# Having just one of the two present is an error.
if [ "$DEFINED_XMX" -ne 0 ] && [ "$DEFINED_XMS" -ne 0 ]; then
    JVM_OPTS="${JVM_OPTS} -Xms${MAX_HEAP_SIZE} -Xmx${MAX_HEAP_SIZE}"
elif [ "$DEFINED_XMX" -ne 0 ] || [ "$DEFINED_XMS" -ne 0 ]; then
    echo "Please set or unset -Xmx and -Xms flags in pairs on jvm.options file."
    exit 1
fi

# -Xmn handling:
#   * -Xmn present but -Xmx absent -> configuration error
#   * -Xmn absent and CMS in use   -> add -Xmn from HEAP_NEWSIZE
if [ "$DEFINED_XMN" -eq 0 ]; then
    if [ "$DEFINED_XMX" -ne 0 ]; then
        echo "Please set or unset -Xmx and -Xmn flags in pairs on jvm.options file."
        exit 1
    fi
elif [ "$USING_CMS" -eq 0 ]; then
    JVM_OPTS="${JVM_OPTS} -Xmn${HEAP_NEWSIZE}"
fi

# UseCondCardMark is added for 64-bit JVMs running with CMS.
if [ "$USING_CMS" -eq 0 ] && [ "$JVM_ARCH" = "64-Bit" ]; then
    JVM_OPTS="${JVM_OPTS} -XX:+UseCondCardMark"
fi

# provides hints to the JIT compiler
JVM_OPTS="${JVM_OPTS} -XX:CompileCommandFile=${CASSANDRA_CONF}/hotspot_compiler"

# add the jamm javaagent
JVM_OPTS="${JVM_OPTS} -javaagent:${CASSANDRA_HOME}/lib/jamm-0.3.0.jar"

# set jvm HeapDumpPath with CASSANDRA_HEAPDUMP_DIR
# (file name carries the current epoch seconds and this shell's pid)
if [ -n "$CASSANDRA_HEAPDUMP_DIR" ]; then
    JVM_OPTS="${JVM_OPTS} -XX:HeapDumpPath=${CASSANDRA_HEAPDUMP_DIR}/cassandra-$(date +%s)-pid$$.hprof"
fi

# jmx: metrics and administration interface
#
# add this if you're having trouble connecting:
# JVM_OPTS="$JVM_OPTS -Djava.rmi.server.hostname=<public name>"
#
# see
# https://blogs.oracle.com/jmxetc/entry/troubleshooting_connection_problems_in_jconsole
# for more on configuring JMX through firewalls, etc. (Short version:
# get it working with no firewall first.)
#
# Cassandra ships with JMX accessible *only* from localhost.
# To enable remote JMX connections, uncomment lines below
# with authentication and/or ssl enabled. See https://wiki.apache.org/cassandra/JmxSecurity
#
# NOTE(review): the LOCAL_JMX=yes branch further down in this copy sets
# jmxremote.local.only=false, so confirm the "only from localhost" statement still holds here.
#
[ -n "$LOCAL_JMX" ] || LOCAL_JMX=yes

# Specifies the default port over which Cassandra will be available for
# JMX connections.
# For security reasons, you should not expose this port to the internet. Firewall it if needed.
JMX_PORT="7199"

# JMX / monitoring options.
#
# Reads:   LOCAL_JMX, JMX_PORT, JVM_VENDOR, CASSANDRA_HOME, MX4J_ADDRESS, MX4J_PORT, JVM_EXTRA_OPTS
# Updates: JVM_OPTS (options are only ever appended)
if [ "$LOCAL_JMX" = "yes" ]; then
#  JVM_OPTS="$JVM_OPTS -Dcassandra.jmx.local.port=$JMX_PORT -XX:+DisableExplicitGC"
  # SECURITY NOTE(review): despite the variable name, this branch turns on the remote
  # JMX connector with local.only=false, authenticate=false and ssl=false, i.e. anyone
  # who can reach $JMX_PORT can manage the JVM. Left unchanged because it looks like a
  # deliberate choice for this bootstrap script; keep the port firewalled.
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.local.only=false"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=false"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
  # The Flight Recorder switches below are Oracle JDK options. Other JVMs (OpenJDK 8,
  # for example) reject -XX:+UnlockCommercialFeatures as an unrecognized option and
  # refuse to start, so they are added only when the JVM was detected as Oracle's.
  if [ "$JVM_VENDOR" = "Oracle" ]; then
    JVM_OPTS="$JVM_OPTS -XX:+UnlockCommercialFeatures"
    JVM_OPTS="$JVM_OPTS -XX:+FlightRecorder"
    JVM_OPTS="$JVM_OPTS -XX:FlightRecorderOptions=defaultrecording=true"
  fi
else
  # Remote JMX with password authentication; SSL is off unless the commented
  # javax.net.ssl / jmxremote.ssl lines below are enabled.
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.port=$JMX_PORT"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl=false"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.authenticate=true"
  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.password.file=/etc/cassandra/jmxremote.password"
#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.keyStore=/path/to/keystore"
#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.keyStorePassword=<keystore-password>"
#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore"
#  JVM_OPTS="$JVM_OPTS -Djavax.net.ssl.trustStorePassword=<truststore-password>"
#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.need.client.auth=true"
#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.registry.ssl=true"
#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.enabled.protocols=<enabled-protocols>"
#  JVM_OPTS="$JVM_OPTS -Dcom.sun.management.jmxremote.ssl.enabled.cipher.suites=<enabled-cipher-suites>"
fi

# To use mx4j, an HTML interface for JMX, add mx4j-tools.jar to the lib/
# directory.
# See http://wiki.apache.org/cassandra/Operations#Monitoring_with_MX4J
# By default mx4j listens on 0.0.0.0:8081. Uncomment the following lines
# to control its listen address and port.
#MX4J_ADDRESS="-Dmx4jaddress=127.0.0.1"
#MX4J_PORT="-Dmx4jport=8081"

# Cassandra uses SIGAR to capture OS metrics CASSANDRA-7838
# for SIGAR we have to set the java.library.path
# to the location of the native libraries.
JVM_OPTS="$JVM_OPTS -Djava.library.path=$CASSANDRA_HOME/lib/sigar-bin"

# MX4J_ADDRESS, MX4J_PORT and JVM_EXTRA_OPTS are appended as-is; when they are
# unset this only adds blanks to JVM_OPTS.
JVM_OPTS="$JVM_OPTS $MX4J_ADDRESS"
JVM_OPTS="$JVM_OPTS $MX4J_PORT"
JVM_OPTS="$JVM_OPTS $JVM_EXTRA_OPTS"