Author: jbellis Date: Tue Jan 25 19:57:03 2011 New Revision: 1063431 URL: http://svn.apache.org/viewvc?rev=1063431&view=rev Log: CLI attempts to block for new schema to propagate patch by Pavel Yaskevich; reviewed by jbellis for CASSANDRA-2044
Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1063431&r1=1063430&r2=1063431&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 25 19:57:03 2011 @@ -38,6 +38,8 @@ * add single-line "--" comments to CLI (CASSANDRA-2032) * message serialization tests (CASSANDRA-1923) * switch from ivy to maven-ant-tasks (CASSANDRA-2017) + * CLI attempts to block for new schema to propagate (CASSANDRA-2044) + 0.7.0-final * fix offsets to ByteBuffer.get (CASSANDRA-1939) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java?rev=1063431&r1=1063430&r2=1063431&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliClient.java Tue Jan 25 19:57:03 2011 @@ -669,7 +669,10 @@ public class CliClient extends CliUserHe try { - sessionState.out.println(thriftClient.system_add_keyspace(updateKsDefAttributes(statement, ksDef))); + String mySchemaVersion = thriftClient.system_add_keyspace(updateKsDefAttributes(statement, ksDef)); + sessionState.out.println(mySchemaVersion); + validateSchemaIsSettled(mySchemaVersion); + keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName)); } catch 
(InvalidRequestException e) @@ -697,7 +700,9 @@ public class CliClient extends CliUserHe try { - sessionState.out.println(thriftClient.system_add_column_family(updateCfDefAttributes(statement, cfDef))); + String mySchemaVersion = thriftClient.system_add_column_family(updateCfDefAttributes(statement, cfDef)); + sessionState.out.println(mySchemaVersion); + validateSchemaIsSettled(mySchemaVersion); keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace)); } catch (InvalidRequestException e) @@ -726,7 +731,9 @@ public class CliClient extends CliUserHe KsDef currentKsDef = getKSMetaData(keyspaceName); KsDef updatedKsDef = updateKsDefAttributes(statement, currentKsDef); - sessionState.out.println(thriftClient.system_update_keyspace(updatedKsDef)); + String mySchemaVersion = thriftClient.system_update_keyspace(updatedKsDef); + validateSchemaIsSettled(mySchemaVersion); + sessionState.out.println(mySchemaVersion); keyspacesMap.put(keyspaceName, thriftClient.describe_keyspace(keyspaceName)); } catch (InvalidRequestException e) @@ -754,7 +761,9 @@ public class CliClient extends CliUserHe try { - sessionState.out.println(thriftClient.system_update_column_family(updateCfDefAttributes(statement, cfDef))); + String mySchemaVersion = thriftClient.system_update_column_family(updateCfDefAttributes(statement, cfDef)); + sessionState.out.println(mySchemaVersion); + validateSchemaIsSettled(mySchemaVersion); keyspacesMap.put(keySpace, thriftClient.describe_keyspace(keySpace)); } catch (InvalidRequestException e) @@ -902,7 +911,9 @@ public class CliClient extends CliUserHe return; String keyspaceName = CliCompiler.getKeySpace(statement, thriftClient.describe_keyspaces()); - sessionState.out.println(thriftClient.system_drop_keyspace(keyspaceName)); + String version = thriftClient.system_drop_keyspace(keyspaceName); + sessionState.out.println(version); + validateSchemaIsSettled(version); } /** @@ -919,7 +930,9 @@ public class CliClient extends CliUserHe return; String cfName = 
CliCompiler.getColumnFamily(statement, keyspacesMap.get(keySpace).cf_defs); - sessionState.out.println(thriftClient.system_drop_column_family(cfName)); + String mySchemaVersion = thriftClient.system_drop_column_family(cfName); + sessionState.out.println(mySchemaVersion); + validateSchemaIsSettled(mySchemaVersion); } private void executeList(Tree statement) @@ -1981,6 +1994,51 @@ public class CliClient extends CliUserHe } } + /** validates schema is propagated to all nodes */ + private void validateSchemaIsSettled(String currentVersionId) + { + Map<String, List<String>> versions; + + long start = System.currentTimeMillis(); + long limit = start + sessionState.schema_mwt; + + boolean inAgreement = false; + while (limit - start >= 0) + { + try + { + versions = thriftClient.describe_schema_versions(); // getting schema version for nodes of the ring + } + catch (Exception e) + { + sessionState.err.println((e instanceof InvalidRequestException) ? ((InvalidRequestException) e).getWhy() : e.getMessage()); + continue; + } + + boolean currentlyInAgreement = true; + for (String version : versions.keySet()) + { + if (!version.equals(currentVersionId)) + { + currentlyInAgreement = false; + break; // only one disagreement is enough + } + } + + if (currentlyInAgreement) + { + inAgreement = true; + break; // all nodes are in agreement no need to loop + } + } + + if (!inAgreement) + { + sessionState.err.printf("The schema has not settled in %d seconds and further migrations are ill-advised until it does.%n", sessionState.schema_mwt / 1000); + System.exit(-1); + } + } + private static class CfDefNamesComparator implements Comparator<CfDef> { public int compare(CfDef a, CfDef b) Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java?rev=1063431&r1=1063430&r2=1063431&view=diff 
============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliOptions.java Tue Jan 25 19:57:03 2011 @@ -44,6 +44,7 @@ public class CliOptions private static final String FILE_OPTION = "file"; private static final String JMX_PORT_OPTION = "jmxport"; private static final String VERBOSE_OPTION = "verbose"; + private static final String SCHEMA_MIGRATION_WAIT_TIME = "schema-mwt"; // Default values for optional command line arguments private static final int DEFAULT_THRIFT_PORT = 9160; @@ -59,15 +60,16 @@ public class CliOptions options.addOption("u", USERNAME_OPTION, "USERNAME", "user name for cassandra authentication"); options.addOption("pw", PASSWORD_OPTION, "PASSWORD", "password for cassandra authentication"); options.addOption("k", KEYSPACE_OPTION, "KEYSPACE", "cassandra keyspace user is authenticated against"); - options.addOption("f", FILE_OPTION, "FILENAME", "load statements from the specific file."); + options.addOption("f", FILE_OPTION, "FILENAME", "load statements from the specific file"); options.addOption(null, JMX_PORT_OPTION, "JMX-PORT", "JMX service port"); + options.addOption(null, SCHEMA_MIGRATION_WAIT_TIME, "TIME", "Schema migration wait time (secs.), default is 10 secs"); // options without argument options.addOption("B", BATCH_OPTION, "enabled batch mode (suppress output; errors are fatal)"); options.addOption(null, UNFRAME_OPTION, "use cassandra server's unframed transport"); options.addOption(null, DEBUG_OPTION, "display stack traces"); - options.addOption("?", HELP_OPTION, "usage help."); - options.addOption("v", VERBOSE_OPTION, "verbose output when using batch mode."); + options.addOption("?", HELP_OPTION, "usage help"); + options.addOption("v", VERBOSE_OPTION, "verbose output when using batch mode"); } private static void printUsage() @@ -160,6 +162,15 @@ 
public class CliOptions css.verbose = true; } + if (cmd.hasOption(SCHEMA_MIGRATION_WAIT_TIME)) + { + css.schema_mwt = Integer.parseInt(cmd.getOptionValue(SCHEMA_MIGRATION_WAIT_TIME)) * 1000; + } + else + { + css.schema_mwt = 10 * 1000; + } + // Abort if there are any unrecognized arguments left if (cmd.getArgs().length > 0) { Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java?rev=1063431&r1=1063430&r2=1063431&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/cli/CliSessionState.java Tue Jan 25 19:57:03 2011 @@ -40,6 +40,7 @@ public class CliSessionState public String filename = ""; // file to read commands from public int jmxPort = 8080;// JMX service port public boolean verbose = false; // verbose output + public int schema_mwt; // Schema migration wait time (secs.) /* * Streams to read/write from */ @@ -82,5 +83,4 @@ public class CliSessionState return null; } - }