/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.zeppelin.cassandra

import com.datastax.driver.core._
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
import org.apache.zeppelin.interpreter.InterpreterException


/**
 * Enhance the Java driver session with special statements
 * (DESCRIBE CLUSTER/KEYSPACE/TABLE/TYPE, HELP) that render
 * schema information as HTML, while delegating regular
 * [[Statement]]s to the underlying driver session.
 *
 * @param session the Java driver session being wrapped
 */
class EnhancedSession(val session: Session) {

  val clusterDisplay = DisplaySystem.ClusterDisplay
  val keyspaceDisplay = DisplaySystem.KeyspaceDisplay
  val tableDisplay = DisplaySystem.TableDisplay
  val udtDisplay = DisplaySystem.UDTDisplay
  val helpDisplay = DisplaySystem.HelpDisplay
  private val noResultDisplay = DisplaySystem.NoResultDisplay

  // Prefix telling Zeppelin to render the rest of the payload as HTML.
  val HTML_MAGIC = "%html \n"

  // Pre-rendered message shown when a statement produces no rows.
  val displayNoResult: String = HTML_MAGIC + noResultDisplay.formatNoResult

  /** Render execution statistics (as HTML) for a query that returned no rows. */
  def displayExecutionStatistics(query: String, execInfo: ExecutionInfo): String =
    HTML_MAGIC + noResultDisplay.noResultWithExecutionInfo(query, execInfo)

  // Metadata is fetched on every call so that schema changes are always visible.
  private def clusterMetadata: Metadata = session.getCluster.getMetadata

  private def execute(describeCluster: DescribeClusterCmd): String =
    HTML_MAGIC + clusterDisplay.formatClusterOnly(describeCluster.statement, clusterMetadata)

  private def execute(describeKeyspaces: DescribeKeyspacesCmd): String =
    HTML_MAGIC + clusterDisplay.formatClusterContent(describeKeyspaces.statement, clusterMetadata)

  private def execute(describeTables: DescribeTablesCmd): String =
    HTML_MAGIC + clusterDisplay.formatAllTables(describeTables.statement, clusterMetadata)

  private def execute(describeKeyspace: DescribeKeyspaceCmd): String = {
    val ksMeta: KeyspaceMetadata = clusterMetadata.getKeyspace(describeKeyspace.keyspace)
    HTML_MAGIC + keyspaceDisplay.formatKeyspaceContent(describeKeyspace.statement, ksMeta)
  }

  private def execute(describeTable: DescribeTableCmd): String = {
    val tableName: String = describeTable.table
    // Fall back to the logged keyspace, then to "system", when none was given explicitly.
    val keyspace: String = describeTable.keyspace
      .orElse(Option(session.getLoggedKeyspace))
      .getOrElse("system")

    val maybeTable: Option[TableMetadata] = Option(clusterMetadata.getKeyspace(keyspace))
      .flatMap(ks => Option(ks.getTable(tableName)))

    maybeTable match {
      case Some(tableMeta) => HTML_MAGIC + tableDisplay.format(describeTable.statement, tableMeta, true)
      case None => throw new InterpreterException(s"Cannot find table $keyspace.$tableName")
    }
  }

  private def execute(describeUDT: DescribeUDTCmd): String = {
    val udtName: String = describeUDT.udtName
    // Same keyspace-resolution fallback chain as for tables.
    val keyspace: String = describeUDT.keyspace
      .orElse(Option(session.getLoggedKeyspace))
      .getOrElse("system")

    val maybeType: Option[UserType] = Option(clusterMetadata.getKeyspace(keyspace))
      .flatMap(ks => Option(ks.getUserType(udtName)))

    maybeType match {
      case Some(userType) => HTML_MAGIC + udtDisplay.format(describeUDT.statement, userType, true)
      case None => throw new InterpreterException(s"Cannot find type $keyspace.$udtName")
    }
  }

  private def execute(helpCmd: HelpCmd): String =
    HTML_MAGIC + helpDisplay.formatHelp()

  /**
   * Execute either a schema-description / help command (returning an HTML String)
   * or a plain driver [[Statement]] (returning the driver's ResultSet).
   *
   * @throws InterpreterException when the argument is neither a known command nor a Statement
   */
  def execute(st: Any): Any = st match {
    case cmd: DescribeClusterCmd => execute(cmd)
    case cmd: DescribeKeyspacesCmd => execute(cmd)
    case cmd: DescribeTablesCmd => execute(cmd)
    case cmd: DescribeKeyspaceCmd => execute(cmd)
    case cmd: DescribeTableCmd => execute(cmd)
    case cmd: DescribeUDTCmd => execute(cmd)
    case cmd: HelpCmd => execute(cmd)
    case statement: Statement => session.execute(statement)
    case _ => throw new InterpreterException(s"Cannot execute statement '$st' of type ${st.getClass}")
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.zeppelin.cassandra

import java.io.{ByteArrayOutputStream, PrintStream}
import java.net.InetAddress
import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util
import java.util.Date
import java.util.concurrent.ConcurrentHashMap

import com.datastax.driver.core.DataType.Name._
import com.datastax.driver.core._
import com.datastax.driver.core.exceptions.DriverException
import com.datastax.driver.core.policies.{LoggingRetryPolicy, FallthroughRetryPolicy, DowngradingConsistencyRetryPolicy, Policies}
import org.apache.zeppelin.cassandra.TextBlockHierarchy._
import org.apache.zeppelin.display.Input.ParamOption
import org.apache.zeppelin.interpreter.InterpreterResult.Code
import org.apache.zeppelin.interpreter.{InterpreterException, InterpreterResult, InterpreterContext}
import org.slf4j.LoggerFactory
// NOTE(review): JavaConversions is deprecated (implicit conversions); all code below
// now uses explicit JavaConverters (.asScala). The import is kept untouched on purpose.
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


/**
 * Value object to store runtime query parameters
 * @param consistency consistency level
 * @param serialConsistency serial consistency level
 * @param timestamp timestamp
 * @param retryPolicy retry policy
 * @param fetchSize query fetch size
 */
case class CassandraQueryOptions(consistency: Option[ConsistencyLevel],
                                 serialConsistency: Option[ConsistencyLevel],
                                 timestamp: Option[Long],
                                 retryPolicy: Option[RetryPolicy],
                                 fetchSize: Option[Int])

/**
 * Singleton object to store constants, regexes for the {{variable=default}}
 * syntax, retry-policy singletons, and the prepared-statement cache that is
 * shared across all interpreter instances of the JVM.
 */
object InterpreterLogic {

  val CHOICES_SEPARATOR: String = """\|"""

  // {{...}} anywhere in a statement marks a bound variable to resolve via the GUI.
  val VARIABLE_PATTERN = """\{\{[^}]+\}\}""".r
  // {{variable=defaultValue}}
  val SIMPLE_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=([^=]+)\}\}""".r
  // {{variable=choice1|choice2|...|choiceN}}
  val MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN = """\{\{([^=]+)=((?:[^=]+\|)+[^|]+)\}\}""".r

  val STANDARD_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss"
  val ACCURATE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS"

  val defaultRetryPolicy = Policies.defaultRetryPolicy()
  val downgradingConsistencyRetryPolicy = DowngradingConsistencyRetryPolicy.INSTANCE
  val fallThroughRetryPolicy = FallthroughRetryPolicy.INSTANCE
  val loggingDefaultRetryPolicy = new LoggingRetryPolicy(defaultRetryPolicy)
  val loggingDownGradingRetryPolicy = new LoggingRetryPolicy(downgradingConsistencyRetryPolicy)
  val loggingFallThrougRetryPolicy = new LoggingRetryPolicy(fallThroughRetryPolicy)

  // Shared, thread-safe cache of @prepare'd statements, keyed by user-chosen name.
  val preparedStatements: mutable.Map[String, PreparedStatement] =
    new ConcurrentHashMap[String, PreparedStatement]().asScala

  val logger = LoggerFactory.getLogger(classOf[InterpreterLogic])

  val paragraphParser = new ParagraphParser
  val boundValuesParser = new BoundValuesParser

}

/**
 * Real class to implement the interpreting logic of CQL statements
 * and parameters blocks.
 *
 * @param session java driver session
 */
class InterpreterLogic(val session: Session) {

  val enhancedSession: EnhancedSession = new EnhancedSession(session)

  import InterpreterLogic._

  /**
   * Parse and execute a paragraph of CQL statements / directives.
   *
   * Processing order: parameter blocks are extracted first, then
   * @remove_prepare directives, then @prepare directives, and finally the
   * remaining statements are built and executed in paragraph order. Only the
   * LAST result is rendered.
   *
   * @param session the driver session to use (shadows the constructor field on purpose:
   *                callers pass the session explicitly)
   * @param stringStatements raw paragraph text
   * @param context Zeppelin interpreter context (GUI forms for bound variables)
   * @return a SUCCESS result with a %table/%html payload, or an ERROR result
   */
  def interpret(session: Session, stringStatements: String, context: InterpreterContext): InterpreterResult = {

    logger.info(s"Executing CQL statements : \n\n$stringStatements\n")

    try {
      val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersionEnum

      val queries: List[AnyBlock] = parseInput(stringStatements)

      val queryOptions = extractQueryOptions(queries
        .filter(_.blockType == ParameterBlock)
        .map(_.get[QueryParameters]))

      logger.info(s"Current Cassandra query options = $queryOptions")

      val queryStatements = queries.filter(_.blockType == StatementBlock).map(_.get[QueryStatement])

      // @remove_prepare[name] : evict from the shared prepared-statement cache
      queryStatements
        .filter(_.statementType == RemovePrepareStatementType)
        .map(_.getStatement[RemovePrepareStm])
        .foreach(remove => {
          logger.debug(s"Removing prepared statement '${remove.name}'")
          preparedStatements.remove(remove.name)
        })

      // @prepare[name] : prepare once, reuse on subsequent runs
      queryStatements
        .filter(_.statementType == PrepareStatementType)
        .map(_.getStatement[PrepareStm])
        .foreach(statement => {
          logger.debug(s"Get or prepare statement '${statement.name}' : ${statement.query}")
          preparedStatements.getOrElseUpdate(statement.name, session.prepare(statement.query))
        })

      // Build executable statements out of everything that is not a prepare directive.
      val statements: List[Any] = queryStatements
        .filter(st => (st.statementType != PrepareStatementType) && (st.statementType != RemovePrepareStatementType))
        .map {
          case x: SimpleStm => generateSimpleStatement(x, queryOptions, context)
          case x: BatchStm => {
            val builtStatements: List[Statement] = x.statements.map {
              case st: SimpleStm => generateSimpleStatement(st, queryOptions, context)
              case st: BoundStm => generateBoundStatement(st, queryOptions, context)
              case _ => throw new InterpreterException(s"Unknown statement type")
            }
            generateBatchStatement(x.batchType, queryOptions, builtStatements)
          }
          case x: BoundStm => generateBoundStatement(x, queryOptions, context)
          case x: DescribeCommandStatement => x
          case x: HelpCmd => x
          case x => throw new InterpreterException(s"Unknown statement type : ${x}")
        }

      // Execute everything in order; only the last (result, statement) pair is displayed.
      val results: List[(Any, Any)] =
        for (statement <- statements) yield (enhancedSession.execute(statement), statement)

      if (results.nonEmpty) {
        results.last match {
          case (res: ResultSet, st: Statement) => buildResponseMessage((res, st), protocolVersion)
          case (output: String, _) => new InterpreterResult(Code.SUCCESS, output)
          case _ => throw new InterpreterException(s"Cannot parse result type : ${results.last}")
        }
      } else {
        new InterpreterResult(Code.SUCCESS, enhancedSession.displayNoResult)
      }

    } catch {
      case dex: DriverException => {
        logger.error(dex.getMessage, dex)
        new InterpreterResult(Code.ERROR, parseException(dex))
      }
      case pex: ParsingException => {
        logger.error(pex.getMessage, pex)
        new InterpreterResult(Code.ERROR, pex.getMessage)
      }
      case iex: InterpreterException => {
        logger.error(iex.getMessage, iex)
        new InterpreterResult(Code.ERROR, iex.getMessage)
      }
      case ex: java.lang.Exception => {
        logger.error(ex.getMessage, ex)
        new InterpreterResult(Code.ERROR, parseException(ex))
      }
    }
  }

  /**
   * Render the last result set as a Zeppelin %table (tab-separated), or as
   * execution statistics when the result set is empty.
   *
   * @param lastResultSet pair of (result set, statement that produced it)
   * @param protocolVersion native protocol version, needed to deserialize cells
   */
  def buildResponseMessage(lastResultSet: (ResultSet, Statement), protocolVersion: ProtocolVersion): InterpreterResult = {
    val output = new StringBuilder()
    val resultSet: ResultSet = lastResultSet._1

    // Materialize all rows (explicit JavaConverters instead of implicit JavaConversions).
    val rows: List[Row] = resultSet.iterator().asScala.toList

    val columnsDefinitions: List[(String, DataType)] = resultSet
      .getColumnDefinitions
      .asList
      .asScala
      .toList // Java list -> Scala list
      .map(definition => (definition.getName, definition.getType))

    if (rows.nonEmpty) {
      // Create table headers
      output
        .append("%table ")
        .append(columnsDefinitions.map { case (columnName, _) => columnName }.mkString("\t")).append("\n")

      // Deserialize Data (null cells are rendered as the literal "null")
      rows.foreach {
        row => {
          val data = columnsDefinitions.map {
            case (name, dataType) => {
              if (row.isNull(name)) null else dataType.deserialize(row.getBytesUnsafe(name), protocolVersion)
            }
          }
          output.append(data.mkString("\t")).append("\n")
        }
      }
    } else {
      val lastQuery: String = lastResultSet._2.toString
      val executionInfo: ExecutionInfo = resultSet.getExecutionInfo
      output.append(enhancedSession.displayExecutionStatistics(lastQuery, executionInfo))
    }

    val result: String = output.toString()
    logger.debug(s"CQL result : \n\n$result\n")
    new InterpreterResult(Code.SUCCESS, result)
  }

  /**
   * Parse the paragraph into blocks (statements and parameter blocks).
   * @throws InterpreterException when the paragraph cannot be parsed
   */
  def parseInput(input: String): List[AnyBlock] = {
    val parsingResult: ParagraphParser#ParseResult[List[AnyBlock]] = paragraphParser.parseAll(paragraphParser.queries, input)
    parsingResult match {
      case paragraphParser.Success(blocks, _) => blocks
      case paragraphParser.Failure(msg, next) => {
        throw new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
      }
      case paragraphParser.Error(msg, next) => {
        throw new InterpreterException(s"Error parsing input:\n\t'$input'\nDid you forget to add ; (semi-colon) at the end of each CQL statement ?")
      }
      case _ => throw new InterpreterException(s"Error parsing input: $input")
    }
  }

  /**
   * Collapse the parameter blocks of a paragraph into a single options object.
   * For each option the FIRST occurrence wins (headOption).
   */
  def extractQueryOptions(parameters: List[QueryParameters]): CassandraQueryOptions = {

    logger.debug(s"Extracting query options from $parameters")

    val consistency: Option[ConsistencyLevel] = parameters
      .filter(_.paramType == ConsistencyParam)
      .map(_.getParam[Consistency])
      .flatMap(x => Option(x.value))
      .headOption

    val serialConsistency: Option[ConsistencyLevel] = parameters
      .filter(_.paramType == SerialConsistencyParam)
      .map(_.getParam[SerialConsistency])
      .flatMap(x => Option(x.value))
      .headOption

    val timestamp: Option[Long] = parameters
      .filter(_.paramType == TimestampParam)
      .map(_.getParam[Timestamp])
      .flatMap(x => Option(x.value))
      .headOption

    val retryPolicy: Option[RetryPolicy] = parameters
      .filter(_.paramType == RetryPolicyParam)
      .map(_.getParam[RetryPolicy])
      .headOption

    val fetchSize: Option[Int] = parameters
      .filter(_.paramType == FetchSizeParam)
      .map(_.getParam[FetchSize])
      .flatMap(x => Option(x.value))
      .headOption

    CassandraQueryOptions(consistency, serialConsistency, timestamp, retryPolicy, fetchSize)
  }

  /** Build a simple statement, resolving {{variables}} and applying query options. */
  def generateSimpleStatement(st: SimpleStm, options: CassandraQueryOptions, context: InterpreterContext): SimpleStatement = {
    logger.debug(s"Generating simple statement : '${st.text}'")
    val statement = new SimpleStatement(maybeExtractVariables(st.text, context))
    applyQueryOptions(options, statement)
    statement
  }

  /**
   * Bind values to a previously @prepare'd statement.
   * NOTE(review): query options are NOT applied to bound statements here
   * (unlike simple/batch statements) — confirm whether this is intentional.
   * @throws InterpreterException when no prepared statement with this name exists
   */
  def generateBoundStatement(st: BoundStm, options: CassandraQueryOptions, context: InterpreterContext): BoundStatement = {
    logger.debug(s"Generating bound statement with name : '${st.name}' and bound values : ${st.values}")
    preparedStatements.get(st.name) match {
      case Some(ps) => {
        val boundValues = maybeExtractVariables(st.values, context)
        createBoundStatement(st.name, ps, boundValues)
      }
      case None => throw new InterpreterException(s"The statement '${st.name}' can not be bound to values. " +
        s"Are you sure you did prepare it with @prepare[${st.name}] ?")
    }
  }

  /** Wrap the given statements into a batch and apply the query options to it. */
  def generateBatchStatement(batchType: BatchStatement.Type, options: CassandraQueryOptions, statements: List[Statement]): BatchStatement = {
    // Fixed log message: closing quote was misplaced ('${batchType} for ...')
    logger.debug(s"""Generating batch statement of type '$batchType' for ${statements.mkString(",")}""")
    val batch = new BatchStatement(batchType)
    statements.foreach(batch.add(_))
    applyQueryOptions(options, batch)
    batch
  }

  /**
   * Replace every {{variable=default}} / {{variable=v1|v2|...}} occurrence with
   * the value chosen through the Zeppelin GUI (input field or select box).
   * @throws ParsingException when a {{...}} expression matches neither form
   */
  def maybeExtractVariables(statement: String, context: InterpreterContext): String = {

    def extractVariableAndDefaultValue(statement: String, exp: String): String = {
      exp match {
        case MULTIPLE_CHOICES_VARIABLE_DEFINITION_PATTERN(variable, choices) => {
          // Escape regex meta-characters so the raw expression can be used in replaceAll
          val escapedExp: String = exp.replaceAll("""\{""", """\\{""").replaceAll("""\}""", """\\}""").replaceAll("""\|""", """\\|""")
          val listChoices: List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
          val paramOptions = listChoices.map(choice => new ParamOption(choice, choice))
          val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
          statement.replaceAll(escapedExp, selected.toString)
        }
        case SIMPLE_VARIABLE_DEFINITION_PATTERN(variable, defaultVal) => {
          val escapedExp: String = exp.replaceAll("""\{""", """\\{""").replaceAll("""\}""", """\\}""")
          val value = context.getGui.input(variable, defaultVal)
          statement.replaceAll(escapedExp, value.toString)
        }
        case _ => throw new ParsingException(s"Invalid bound variable definition for '$exp' in '$statement'. It should be of form 'variable=defaultValue' or 'variable=value1|value2|...|valueN'")
      }
    }

    VARIABLE_PATTERN.findAllIn(statement).foldLeft(statement)(extractVariableAndDefaultValue _)
  }

  /** Apply the paragraph-level query options (consistency, retry policy, ...) to a statement. */
  def applyQueryOptions(options: CassandraQueryOptions, statement: Statement): Unit = {
    options.consistency.foreach(statement.setConsistencyLevel(_))
    options.serialConsistency.foreach(statement.setSerialConsistencyLevel(_))
    options.timestamp.foreach(statement.setDefaultTimestamp(_))
    options.retryPolicy.foreach {
      case DefaultRetryPolicy => statement.setRetryPolicy(defaultRetryPolicy)
      case DowngradingRetryPolicy => statement.setRetryPolicy(downgradingConsistencyRetryPolicy)
      case FallThroughRetryPolicy => statement.setRetryPolicy(fallThroughRetryPolicy)
      case LoggingDefaultRetryPolicy => statement.setRetryPolicy(loggingDefaultRetryPolicy)
      case LoggingDowngradingRetryPolicy => statement.setRetryPolicy(loggingDownGradingRetryPolicy)
      case LoggingFallThroughRetryPolicy => statement.setRetryPolicy(loggingFallThrougRetryPolicy)
      case _ => throw new InterpreterException(s"""Unknown retry policy ${options.retryPolicy.getOrElse("???")}""")
    }
    options.fetchSize.foreach(statement.setFetchSize(_))
  }

  /**
   * Convert the textual bound values to typed values matching the prepared
   * statement's variable definitions, then bind them.
   * @throws InterpreterException on arity mismatch or unparseable value/type
   */
  private def createBoundStatement(name: String, ps: PreparedStatement, rawBoundValues: String): BoundStatement = {
    // Explicit JavaConverters instead of implicit JavaConversions
    val dataTypes = ps.getVariables.asScala.toList
      .map(cfDef => cfDef.getType)

    val boundValuesAsText = parseBoundValues(name, rawBoundValues)

    if (dataTypes.size != boundValuesAsText.size) throw new InterpreterException(s"Invalid @bind values for prepared statement '$name'. " +
      s"Prepared parameters has ${dataTypes.size} variables whereas bound values have ${boundValuesAsText.size} parameters ...")

    val convertedValues: List[AnyRef] = boundValuesAsText
      .zip(dataTypes).map {
        case (value, dataType) => {
          if (value.trim == "null") {
            null
          } else {
            dataType.getName match {
              // Strip unescaped single quotes around text values
              case (ASCII | TEXT | VARCHAR) => value.trim.replaceAll("(?<!')'", "")
              case (INT | VARINT) => value.trim.toInt
              case (BIGINT | COUNTER) => value.trim.toLong
              case BLOB => ByteBuffer.wrap(value.trim.getBytes)
              case BOOLEAN => value.trim.toBoolean
              case DECIMAL => BigDecimal(value.trim)
              case DOUBLE => value.trim.toDouble
              case FLOAT => value.trim.toFloat
              case INET => InetAddress.getByName(value.trim)
              case TIMESTAMP => parseDate(value.trim)
              case (UUID | TIMEUUID) => java.util.UUID.fromString(value.trim)
              case LIST => dataType.parse(boundValuesParser.parse(boundValuesParser.list, value).get)
              case SET => dataType.parse(boundValuesParser.parse(boundValuesParser.set, value).get)
              case MAP => dataType.parse(boundValuesParser.parse(boundValuesParser.map, value).get)
              case UDT => dataType.parse(boundValuesParser.parse(boundValuesParser.udt, value).get)
              case TUPLE => dataType.parse(boundValuesParser.parse(boundValuesParser.tuple, value).get)
              case _ => throw new InterpreterException(s"Cannot parse data of type : ${dataType.toString}")
            }
          }
        }
      }.asInstanceOf[List[AnyRef]] // primitives above are auto-boxed; cast keeps the driver's varargs happy

    ps.bind(convertedValues.toArray: _*)
  }

  /**
   * Split the raw @bind text into one string per bound value.
   * @throws InterpreterException when the values cannot be parsed
   */
  protected def parseBoundValues(psName: String, boundValues: String): List[String] = {
    val result: BoundValuesParser#ParseResult[List[String]] = boundValuesParser.parseAll(boundValuesParser.values, boundValues)

    result match {
      case boundValuesParser.Success(list, _) => list
      case _ => throw new InterpreterException(s"Cannot parse bound values for prepared statement '$psName' : $boundValues. Did you forget to wrap text with ' (simple quote) ?")
    }
  }

  /**
   * Parse a date in 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd HH:mm:ss.SSS' format.
   * @throws InterpreterException when the string matches neither pattern
   */
  def parseDate(dateString: String): Date = {
    dateString match {
      case boundValuesParser.STANDARD_DATE_PATTERN(datePattern) => new SimpleDateFormat(STANDARD_DATE_FORMAT).parse(datePattern)
      case boundValuesParser.ACCURATE_DATE_PATTERN(datePattern) => new SimpleDateFormat(ACCURATE_DATE_FORMAT).parse(datePattern)
      case _ => throw new InterpreterException(s"Cannot parse date '$dateString'. " +
        s"Accepted formats : $STANDARD_DATE_FORMAT OR $ACCURATE_DATE_FORMAT")
    }
  }

  /** Render an exception's full stack trace as a UTF-8 string for the ERROR result. */
  def parseException(ex: Exception): String = {
    val os = new ByteArrayOutputStream()
    val ps = new PrintStream(os)
    ex.printStackTrace(ps)
    os.toString("UTF-8")
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala ---------------------------------------------------------------------- diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala new file mode 100644 index 0000000..d64ad90 --- /dev/null +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/JavaDriverConfig.scala @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cassandra + +import java.lang.Boolean._ + +import com.datastax.driver.core.HostDistance._ +import com.datastax.driver.core.ProtocolOptions.Compression +import com.datastax.driver.core._ +import com.datastax.driver.core.policies._ +import org.apache.commons.lang3.StringUtils._ +import org.apache.zeppelin.interpreter.Interpreter +import org.apache.zeppelin.cassandra.CassandraInterpreter._ +import org.slf4j.LoggerFactory + +/** + * Utility class to extract and configure the Java driver + */ +class JavaDriverConfig { + + val LOGGER = LoggerFactory.getLogger(classOf[JavaDriverConfig]) + + def getSocketOptions(intpr: Interpreter): SocketOptions = { + val socketOptions: SocketOptions = new SocketOptions + val socketOptionsInfo: StringBuilder = new StringBuilder("Socket options : \n\n") + + val connectTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS).toInt + socketOptions.setConnectTimeoutMillis(connectTimeoutMillis) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_CONNECTION_TIMEOUT_MILLIS) + .append(" : ") + .append(connectTimeoutMillis).append("\n") + + val readTimeoutMillis: Int = intpr.getProperty(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS).toInt + socketOptions.setReadTimeoutMillis(readTimeoutMillis) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_READ_TIMEOUT_MILLIS) + .append(" : ") + .append(readTimeoutMillis).append("\n") + + val tcpNoDelay: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_SOCKET_TCP_NO_DELAY)) + socketOptions.setTcpNoDelay(tcpNoDelay) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_TCP_NO_DELAY) + .append(" : ") + .append(tcpNoDelay) + .append("\n") + + val keepAlive: String = intpr.getProperty(CASSANDRA_SOCKET_KEEP_ALIVE) + if (isNotBlank(keepAlive)) { + val keepAliveValue: Boolean = parseBoolean(keepAlive) + 
socketOptions.setKeepAlive(keepAliveValue) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_KEEP_ALIVE) + .append(" : ") + .append(keepAliveValue).append("\n") + } + + val receivedBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES) + if (isNotBlank(receivedBuffSize)) { + val receiveBufferSizeValue: Int = receivedBuffSize.toInt + socketOptions.setReceiveBufferSize(receiveBufferSizeValue) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_RECEIVED_BUFFER_SIZE_BYTES) + .append(" : ") + .append(receiveBufferSizeValue) + .append("\n") + } + + val sendBuffSize: String = intpr.getProperty(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES) + if (isNotBlank(sendBuffSize)) { + val sendBufferSizeValue: Int = sendBuffSize.toInt + socketOptions.setSendBufferSize(sendBufferSizeValue) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_SEND_BUFFER_SIZE_BYTES) + .append(" : ") + .append(sendBufferSizeValue) + .append("\n") + } + + val reuseAddress: String = intpr.getProperty(CASSANDRA_SOCKET_REUSE_ADDRESS) + if (isNotBlank(reuseAddress)) { + val reuseAddressValue: Boolean = parseBoolean(reuseAddress) + socketOptions.setReuseAddress(reuseAddressValue) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_REUSE_ADDRESS) + .append(" : ") + .append(reuseAddressValue) + .append("\n") + } + + val soLinger: String = intpr.getProperty(CASSANDRA_SOCKET_SO_LINGER) + if (isNotBlank(soLinger)) { + val soLingerValue: Int = soLinger.toInt + socketOptions.setSoLinger(soLingerValue) + socketOptionsInfo + .append("\t") + .append(CASSANDRA_SOCKET_SO_LINGER) + .append(" : ") + .append(soLingerValue) + .append("\n") + } + + LOGGER.debug(socketOptionsInfo.append("\n").toString) + + return socketOptions + } + + def getQueryOptions(intpr: Interpreter): QueryOptions = { + val queryOptions: QueryOptions = new QueryOptions + val queryOptionsInfo: StringBuilder = new StringBuilder("Query options : \n\n") + + val consistencyLevel: 
ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_CONSISTENCY)) + queryOptions.setConsistencyLevel(consistencyLevel) + queryOptionsInfo + .append("\t") + .append(CASSANDRA_QUERY_DEFAULT_CONSISTENCY) + .append(" : ") + .append(consistencyLevel) + .append("\n") + + val serialConsistencyLevel: ConsistencyLevel = ConsistencyLevel.valueOf(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY)) + queryOptions.setSerialConsistencyLevel(serialConsistencyLevel) + queryOptionsInfo + .append("\t") + .append(CASSANDRA_QUERY_DEFAULT_SERIAL_CONSISTENCY) + .append(" : ") + .append(serialConsistencyLevel) + .append("\n") + + val fetchSize: Int = intpr.getProperty(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE).toInt + queryOptions.setFetchSize(fetchSize) + queryOptionsInfo + .append("\t") + .append(CASSANDRA_QUERY_DEFAULT_FETCH_SIZE) + .append(" : ") + .append(fetchSize) + .append("\n") + + val defaultIdempotence: Boolean = parseBoolean(intpr.getProperty(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE)) + queryOptions.setDefaultIdempotence(defaultIdempotence) + queryOptionsInfo + .append("\t") + .append(CASSANDRA_QUERY_DEFAULT_IDEMPOTENCE) + .append(" : ") + .append(defaultIdempotence) + .append("\n") + + LOGGER.debug(queryOptionsInfo.append("\n").toString) + + return queryOptions + } + + def getProtocolVersion(intpr: Interpreter): ProtocolVersion = { + val protocolVersion: String = intpr.getProperty(CASSANDRA_PROTOCOL_VERSION) + LOGGER.debug("Protocol version : " + protocolVersion) + + protocolVersion match { + case "1" => + DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8" + DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2" + DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2" + DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100" + DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1" + DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128" + DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128" + return ProtocolVersion.V1 + case "2" => + 
DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "8" + DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "2" + DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "2" + DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "100" + DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "1" + DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "128" + DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "128" + return ProtocolVersion.V2 + case "3" => + DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1" + DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1" + DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800" + DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200" + DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024" + DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256" + return ProtocolVersion.V3 + case _ => + DEFAULT_MAX_CONNECTION_PER_HOST_LOCAL = "1" + DEFAULT_MAX_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_CORE_CONNECTION_PER_HOST_LOCAL = "1" + DEFAULT_CORE_CONNECTION_PER_HOST_REMOTE = "1" + DEFAULT_NEW_CONNECTION_THRESHOLD_LOCAL = "800" + DEFAULT_NEW_CONNECTION_THRESHOLD_REMOTE = "200" + DEFAULT_MAX_REQUEST_PER_CONNECTION_LOCAL = "1024" + DEFAULT_MAX_REQUEST_PER_CONNECTION_REMOTE = "256" + return ProtocolVersion.NEWEST_SUPPORTED + } + } + + def getPoolingOptions(intpr: Interpreter): PoolingOptions = { + val poolingOptions: PoolingOptions = new PoolingOptions + val poolingOptionsInfo: StringBuilder = new StringBuilder("Pooling options : \n\n") + + val maxConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL).toInt + poolingOptions.setMaxConnectionsPerHost(LOCAL, maxConnPerHostLocal) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_LOCAL) + .append(" : ") + .append(maxConnPerHostLocal) + .append("\n") + + val maxConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE).toInt + 
poolingOptions.setMaxConnectionsPerHost(REMOTE, maxConnPerHostRemote) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_MAX_CONNECTION_PER_HOST_REMOTE) + .append(" : ") + .append(maxConnPerHostRemote) + .append("\n") + + val coreConnPerHostLocal: Int = intpr.getProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL).toInt + poolingOptions.setCoreConnectionsPerHost(LOCAL, coreConnPerHostLocal) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_LOCAL) + .append(" : ") + .append(coreConnPerHostLocal) + .append("\n") + + val coreConnPerHostRemote: Int = intpr.getProperty(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE).toInt + poolingOptions.setCoreConnectionsPerHost(REMOTE, coreConnPerHostRemote) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_CORE_CONNECTION_PER_HOST_REMOTE) + .append(" : ") + .append(coreConnPerHostRemote) + .append("\n") + + val newConnThresholdLocal: Int = intpr.getProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL).toInt + poolingOptions.setNewConnectionThreshold(LOCAL, newConnThresholdLocal) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_LOCAL) + .append(" : ") + .append(newConnThresholdLocal) + .append("\n") + + val newConnThresholdRemote: Int = intpr.getProperty(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE).toInt + poolingOptions.setNewConnectionThreshold(REMOTE, newConnThresholdRemote) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_NEW_CONNECTION_THRESHOLD_REMOTE) + .append(" : ") + .append(newConnThresholdRemote) + .append("\n") + + val maxReqPerConnLocal: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL).toInt + poolingOptions.setMaxRequestsPerConnection(LOCAL, maxReqPerConnLocal) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_LOCAL) + .append(" : ") + .append(maxReqPerConnLocal) + .append("\n") + + val 
maxReqPerConnRemote: Int = intpr.getProperty(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE).toInt + poolingOptions.setMaxRequestsPerConnection(REMOTE, maxReqPerConnRemote) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_MAX_REQUESTS_PER_CONNECTION_REMOTE) + .append(" : ") + .append(maxReqPerConnRemote) + .append("\n") + + val heartbeatIntervalSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS).toInt + poolingOptions.setHeartbeatIntervalSeconds(heartbeatIntervalSeconds) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_HEARTBEAT_INTERVAL_SECONDS) + .append(" : ") + .append(heartbeatIntervalSeconds) + .append("\n") + + val idleTimeoutSeconds: Int = intpr.getProperty(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS).toInt + poolingOptions.setIdleTimeoutSeconds(idleTimeoutSeconds) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_IDLE_TIMEOUT_SECONDS) + .append(" : ") + .append(idleTimeoutSeconds) + .append("\n") + + val poolTimeoutMillis: Int = intpr.getProperty(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS).toInt + poolingOptions.setPoolTimeoutMillis(poolTimeoutMillis) + poolingOptionsInfo + .append("\t") + .append(CASSANDRA_POOLING_POOL_TIMEOUT_MILLIS) + .append(" : ") + .append(poolTimeoutMillis) + .append("\n") + + LOGGER.debug(poolingOptionsInfo.append("\n").toString) + + return poolingOptions + } + + def getCompressionProtocol(intpr: Interpreter): ProtocolOptions.Compression = { + var compression: ProtocolOptions.Compression = null + val compressionProtocol: String = intpr.getProperty(CASSANDRA_COMPRESSION_PROTOCOL) + + LOGGER.debug("Compression protocol : " + compressionProtocol) + + if (compressionProtocol == null) "NONE" + else compressionProtocol.toUpperCase match { + case "NONE" => + compression = Compression.NONE + case "SNAPPY" => + compression = Compression.SNAPPY + case "LZ4" => + compression = Compression.LZ4 + case _ => + compression = Compression.NONE + } + return compression + } + + def 
getLoadBalancingPolicy(intpr: Interpreter): LoadBalancingPolicy = { + val loadBalancingPolicy: String = intpr.getProperty(CASSANDRA_LOAD_BALANCING_POLICY) + LOGGER.debug("Load Balancing Policy : " + loadBalancingPolicy) + + if (isBlank(loadBalancingPolicy) || (DEFAULT_POLICY == loadBalancingPolicy)) { + return Policies.defaultLoadBalancingPolicy + } + else { + try { + return (Class.forName(loadBalancingPolicy).asInstanceOf[Class[LoadBalancingPolicy]]).newInstance + } + catch { + case e: Any => { + e.printStackTrace + throw new RuntimeException("Cannot instantiate " + CASSANDRA_LOAD_BALANCING_POLICY + " = " + loadBalancingPolicy) + } + } + } + } + + def getRetryPolicy(intpr: Interpreter): RetryPolicy = { + val retryPolicy: String = intpr.getProperty(CASSANDRA_RETRY_POLICY) + LOGGER.debug("Retry Policy : " + retryPolicy) + + if (isBlank(retryPolicy) || (DEFAULT_POLICY == retryPolicy)) { + return Policies.defaultRetryPolicy + } + else { + try { + return (Class.forName(retryPolicy).asInstanceOf[Class[RetryPolicy]]).newInstance + } + catch { + case e: Any => { + e.printStackTrace + throw new RuntimeException("Cannot instantiate " + CASSANDRA_RETRY_POLICY + " = " + retryPolicy) + } + } + } + } + + def getReconnectionPolicy(intpr: Interpreter): ReconnectionPolicy = { + val reconnectionPolicy: String = intpr.getProperty(CASSANDRA_RECONNECTION_POLICY) + LOGGER.debug("Reconnection Policy : " + reconnectionPolicy) + + if (isBlank(reconnectionPolicy) || (DEFAULT_POLICY == reconnectionPolicy)) { + return Policies.defaultReconnectionPolicy + } + else { + try { + return (Class.forName(reconnectionPolicy).asInstanceOf[Class[ReconnectionPolicy]]).newInstance + } + catch { + case e: Any => { + e.printStackTrace + throw new RuntimeException("Cannot instantiate " + CASSANDRA_RECONNECTION_POLICY + " = " + reconnectionPolicy) + } + } + } + } + + def getSpeculativeExecutionPolicy(intpr: Interpreter): SpeculativeExecutionPolicy = { + val specExecPolicy: String = 
intpr.getProperty(CASSANDRA_SPECULATIVE_EXECUTION_POLICY) + LOGGER.debug("Speculative Execution Policy : " + specExecPolicy) + + if (isBlank(specExecPolicy) || (DEFAULT_POLICY == specExecPolicy)) { + return Policies.defaultSpeculativeExecutionPolicy + } + else { + try { + return (Class.forName(specExecPolicy).asInstanceOf[Class[SpeculativeExecutionPolicy]]).newInstance + } + catch { + case e: Any => { + e.printStackTrace + throw new RuntimeException("Cannot instantiate " + CASSANDRA_SPECULATIVE_EXECUTION_POLICY + " = " + specExecPolicy) + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala ---------------------------------------------------------------------- diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala new file mode 100644 index 0000000..66a0776 --- /dev/null +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/MetaDataHierarchy.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.cassandra + +import java.util.UUID + +import com.datastax.driver.core.utils.UUIDs +import com.datastax.driver.core.{DataType, TableMetadata} + +import scala.util.parsing.json.JSONObject + +/** + * Define a hierarchy for CQL meta data + */ +object MetaDataHierarchy { + object OrderConverter { + def convert(clusteringOrder: TableMetadata.Order): ClusteringOrder = { + clusteringOrder match { + case TableMetadata.Order.ASC => ASC + case TableMetadata.Order.DESC => DESC + } + } + } + + + sealed trait ClusteringOrder + object ASC extends ClusteringOrder + object DESC extends ClusteringOrder + + sealed trait ColumnType + object PartitionKey extends ColumnType + case class ClusteringColumn(order: ClusteringOrder) extends ColumnType + object StaticColumn extends ColumnType + object NormalColumn extends ColumnType + case class IndexDetails(name: String, info: String) + case class ColumnDetails(name: String, columnType: ColumnType, dataType: DataType, index: Option[IndexDetails]) + + + case class ClusterDetails(name: String, partitioner: String) + case class ClusterContent(clusterName: String, clusterDetails: String, keyspaces: List[(UUID, String, String)]) + case class AllTables(tables: Map[String,List[String]]) + case class KeyspaceDetails(name: String, replication: Map[String,String], durableWrites: Boolean, asCQL: String, uniqueId: UUID = UUIDs.timeBased()) { + def getReplicationMap: String = { + JSONObject(replication).toString().replaceAll(""""""","'") + } + } + case class KeyspaceContent(keyspaceName: String, keyspaceDetails: String, tables: List[(UUID,String, String)], udts: List[(UUID, String, String)]) + case class TableDetails(tableName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased()) + case class UDTDetails(typeName: String, columns: List[ColumnDetails], asCQL: String, uniqueId: UUID = UUIDs.timeBased()) + + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala ---------------------------------------------------------------------- diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala new file mode 100644 index 0000000..0cd98ad --- /dev/null +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/ParagraphParser.scala @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.cassandra + +import com.datastax.driver.core._ +import org.apache.zeppelin.cassandra.CassandraInterpreter._ +import org.apache.zeppelin.interpreter.InterpreterException +import scala.util.parsing.combinator._ +import org.apache.zeppelin.cassandra.TextBlockHierarchy._ + +object ParagraphParser { + val CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList + .map(_.name()).filter(!_.contains("SERIAL")).mkString("""^\s*@consistency\s*=\s*(""", "|" , """)\s*$""").r + + val SERIAL_CONSISTENCY_LEVEL_PATTERN = ConsistencyLevel.values().toList + .map(_.name()).filter(_.contains("SERIAL")).mkString("""^\s*@serialConsistency\s*=\s*(""", "|", """)\s*$""").r + val TIMESTAMP_PATTERN = """^\s*@timestamp\s*=\s*([0-9]+)\s*$""".r + + val RETRY_POLICIES_PATTERN = List(DEFAULT_POLICY,DOWNGRADING_CONSISTENCY_RETRY, FALLTHROUGH_RETRY, + LOGGING_DEFAULT_RETRY, LOGGING_DOWNGRADING_RETRY, LOGGING_FALLTHROUGH_RETRY) + .mkString("""^\s*@retryPolicy\s*=\s*(""", "|" , """)\s*$""").r + val FETCHSIZE_PATTERN = """^\s*@fetchSize\s*=\s*([0-9]+)\s*$""".r + + val SIMPLE_STATEMENT_PATTERN = """([^;]+;)""".r + val PREPARE_STATEMENT_PATTERN = """^\s*@prepare\[([^]]+)\]\s*=\s*([^;]+)$""".r + val REMOVE_PREPARE_STATEMENT_PATTERN = """^\s*@remove_prepare\[([^]]+)\]\s*$""".r + + val BIND_PATTERN = """^\s*@bind\[([^]]+)\](?:=([^;]+))?""".r + val BATCH_PATTERN = """^(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r + + val GENERIC_STATEMENT_PREFIX = + """(?is)\s*(?:INSERT|UPDATE|DELETE|SELECT|CREATE|UPDATE| + |DROP|GRANT|REVOKE|TRUNCATE|LIST|USE)\s+""".r + + val VALID_IDENTIFIER = "[a-z][a-z0-9_]*" + + val DESCRIBE_CLUSTER_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER;\s*$""".r + val DESCRIBE_KEYSPACES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES;\s*$""".r + val DESCRIBE_TABLES_PATTERN = """^(?i)\s*(?:DESCRIBE|DESC)\s+TABLES;\s*$""".r + val DESCRIBE_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACE\s*("""+VALID_IDENTIFIER+""");\s*$""").r + val 
DESCRIBE_TABLE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*("""+VALID_IDENTIFIER+""");\s*$""").r + val DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s*(""" + + VALID_IDENTIFIER + + """)\.(""" + + VALID_IDENTIFIER + + """);\s*$""").r + + val DESCRIBE_TYPE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*("""+VALID_IDENTIFIER+""");\s*$""").r + val DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN = ("""^(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s*(""" + + VALID_IDENTIFIER + + """)\.(""" + + VALID_IDENTIFIER + + """);\s*$""").r + + val HELP_PATTERN = """^(?i)\s*HELP;\s*$""".r +} + +class ParagraphParser extends RegexParsers{ + + + import ParagraphParser._ + + def singleLineComment: Parser[Comment] = """\s*#.*""".r ^^ {case text => Comment(text.trim.replaceAll("#",""))} + def multiLineComment: Parser[Comment] = """(?s)/\*(.*)\*/""".r ^^ {case text => Comment(text.trim.replaceAll("""/\*""","").replaceAll("""\*/""",""))} + + //Query parameters + def consistency: Parser[Consistency] = """\s*@consistency.+""".r ^^ {case x => extractConsistency(x.trim)} + def serialConsistency: Parser[SerialConsistency] = """\s*@serialConsistency.+""".r ^^ {case x => extractSerialConsistency(x.trim)} + def timestamp: Parser[Timestamp] = """\s*@timestamp.+""".r ^^ {case x => extractTimestamp(x.trim)} + def retryPolicy: Parser[RetryPolicy] = """\s*@retryPolicy.+""".r ^^ {case x => extractRetryPolicy(x.trim)} + def fetchSize: Parser[FetchSize] = """\s*@fetchSize.+""".r ^^ {case x => extractFetchSize(x.trim)} + + //Statements + def genericStatement: Parser[SimpleStm] = s"""$GENERIC_STATEMENT_PREFIX[^;]+;""".r ^^ {case x => extractSimpleStatement(x.trim)} + def prepare: Parser[PrepareStm] = """\s*@prepare.+""".r ^^ {case x => extractPreparedStatement(x.trim)} + def removePrepare: Parser[RemovePrepareStm] = """\s*@remove_prepare.+""".r ^^ {case x => extractRemovePreparedStatement(x.trim)} + def bind: Parser[BoundStm] = """\s*@bind.+""".r ^^ {case x => extractBoundStatement(x.trim)} + 
+ + //Meta data + private def describeCluster: Parser[DescribeClusterCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+CLUSTER.*""".r ^^ {extractDescribeClusterCmd(_)} + private def describeKeyspaces: Parser[DescribeKeyspacesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+KEYSPACES.*""".r ^^ {extractDescribeKeyspacesCmd(_)} + private def describeTables: Parser[DescribeTablesCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLES.*""".r ^^ {extractDescribeTablesCmd(_)} + private def describeKeyspace: Parser[DescribeKeyspaceCmd] = """\s*(?i)(?:DESCRIBE|DESC)\s+KEYSPACE\s+.+""".r ^^ {extractDescribeKeyspaceCmd(_)} + private def describeTable: Parser[DescribeTableCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TABLE\s+.+""".r ^^ {extractDescribeTableCmd(_)} + private def describeType: Parser[DescribeUDTCmd] = """(?i)\s*(?:DESCRIBE|DESC)\s+TYPE\s+.*""".r ^^ {extractDescribeTypeCmd(_)} + + //Help + private def helpCommand: Parser[HelpCmd] = """(?i)\s*HELP.*""".r ^^{extractHelpCmd(_)} + + private def beginBatch: Parser[String] = """(?i)\s*BEGIN\s+(UNLOGGED|COUNTER)?\s*BATCH""".r + private def applyBatch: Parser[String] = """(?i)APPLY BATCH;""".r + private def insert: Parser[SimpleStm] = """(?i)INSERT [^;]+;""".r ^^{SimpleStm(_)} + private def update: Parser[SimpleStm] = """(?i)UPDATE [^;]+;""".r ^^{SimpleStm(_)} + private def delete: Parser[SimpleStm] = """(?i)DELETE [^;]+;""".r ^^{SimpleStm(_)} + + private def mutationStatements: Parser[List[QueryStatement]] = rep(insert | update | delete | bind) + + def batch: Parser[BatchStm] = beginBatch ~ mutationStatements ~ applyBatch ^^ { + case begin ~ cqls ~ end => BatchStm(extractBatchType(begin),cqls)} + + def queries:Parser[List[AnyBlock]] = rep(singleLineComment | multiLineComment | consistency | serialConsistency | + timestamp | retryPolicy | fetchSize | removePrepare | prepare | bind | batch | describeCluster | describeKeyspaces | + describeTables | describeKeyspace | describeTable | describeType | helpCommand | genericStatement) + + def extractConsistency(text: 
String): Consistency = { + text match { + case CONSISTENCY_LEVEL_PATTERN(consistency) => Consistency(ConsistencyLevel.valueOf(consistency)) + case _ => throw new InterpreterException(s"Invalid syntax for @consistency. " + + s"It should comply to the pattern ${CONSISTENCY_LEVEL_PATTERN.toString}") + } + } + + def extractSerialConsistency(text: String): SerialConsistency = { + text match { + case SERIAL_CONSISTENCY_LEVEL_PATTERN(consistency) => SerialConsistency(ConsistencyLevel.valueOf(consistency)) + case _ => throw new InterpreterException(s"Invalid syntax for @serialConsistency. " + + s"It should comply to the pattern ${SERIAL_CONSISTENCY_LEVEL_PATTERN.toString}") + } + } + + def extractTimestamp(text: String): Timestamp = { + text match { + case TIMESTAMP_PATTERN(timestamp) => Timestamp(timestamp.trim.toLong) + case _ => throw new InterpreterException(s"Invalid syntax for @timestamp. " + + s"It should comply to the pattern ${TIMESTAMP_PATTERN.toString}") + } + } + + def extractRetryPolicy(text: String): RetryPolicy = { + text match { + case RETRY_POLICIES_PATTERN(retry) => retry.trim match { + case DEFAULT_POLICY => DefaultRetryPolicy + case DOWNGRADING_CONSISTENCY_RETRY => DowngradingRetryPolicy + case FALLTHROUGH_RETRY => FallThroughRetryPolicy + case LOGGING_DEFAULT_RETRY => LoggingDefaultRetryPolicy + case LOGGING_DOWNGRADING_RETRY => LoggingDowngradingRetryPolicy + case LOGGING_FALLTHROUGH_RETRY => LoggingFallThroughRetryPolicy + } + case _ => throw new InterpreterException(s"Invalid syntax for @retryPolicy. " + + s"It should comply to the pattern ${RETRY_POLICIES_PATTERN.toString}") + } + } + + def extractFetchSize(text: String): FetchSize = { + text match { + case FETCHSIZE_PATTERN(fetchSize) => FetchSize(fetchSize.trim.toInt) + case _ => throw new InterpreterException(s"Invalid syntax for @fetchSize. 
" + + s"It should comply to the pattern ${FETCHSIZE_PATTERN.toString}") + } + } + + def extractSimpleStatement(text: String): SimpleStm = { + text match { + case SIMPLE_STATEMENT_PATTERN(statement) => SimpleStm(statement) + case _ => throw new InterpreterException(s"Invalid statement '$text'. Did you forget to add ; (semi-colon) at the end of each CQL statement ?") + } + } + + def extractPreparedStatement(text: String): PrepareStm = { + text match { + case PREPARE_STATEMENT_PATTERN(name,queryString) => PrepareStm(name.trim,queryString.trim) + case _ => throw new InterpreterException(s"Invalid syntax for @prepare. " + + s"It should comply to the pattern: @prepare[prepared_statement_name]=CQL Statement (without semi-colon)") + } + } + + def extractRemovePreparedStatement(text: String): RemovePrepareStm= { + text match { + case REMOVE_PREPARE_STATEMENT_PATTERN(name) => RemovePrepareStm(name.trim) + case _ => throw new InterpreterException(s"Invalid syntax for @remove_prepare. " + + s"It should comply to the pattern: @remove_prepare[prepared_statement_name]") + } + } + + def extractBoundStatement(text: String): BoundStm = { + text match { + case BIND_PATTERN(name,boundValues) => BoundStm(name.trim, Option(boundValues).map(_.trim).getOrElse("")) + case _ => throw new InterpreterException("Invalid syntax for @bind. It should comply to the pattern: " + + "@bind[prepared_statement_name]=10,'jdoe','John DOE',12345,'2015-07-32 12:04:23.234' " + + "OR @bind[prepared_statement_name] with no bound value. No semi-colon") + } + } + + def extractBatchType(text: String): BatchStatement.Type = { + text match { + case BATCH_PATTERN(batchType) => + val inferredType = Option(batchType).getOrElse("LOGGED") + BatchStatement.Type.valueOf(inferredType.toUpperCase) + case _ => throw new InterpreterException(s"Invalid syntax for BEGIN BATCH. 
" + + s"""It should comply to the pattern: ${BATCH_PATTERN.toString}""") + } + } + + def extractDescribeClusterCmd(text: String): DescribeClusterCmd = { + text match { + case DESCRIBE_CLUSTER_PATTERN() => new DescribeClusterCmd + case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE CLUSTER. " + + s"""It should comply to the pattern: ${DESCRIBE_CLUSTER_PATTERN.toString}""") + } + } + + def extractDescribeKeyspacesCmd(text: String): DescribeKeyspacesCmd = { + text match { + case DESCRIBE_KEYSPACES_PATTERN() => new DescribeKeyspacesCmd + case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACES. " + + s"""It should comply to the pattern: ${DESCRIBE_KEYSPACES_PATTERN.toString}""") + } + } + + def extractDescribeTablesCmd(text: String): DescribeTablesCmd = { + text match { + case DESCRIBE_TABLES_PATTERN() => new DescribeTablesCmd + case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLES. " + + s"""It should comply to the pattern: ${DESCRIBE_TABLES_PATTERN.toString}""") + } + } + + def extractDescribeKeyspaceCmd(text: String): DescribeKeyspaceCmd = { + text match { + case DESCRIBE_KEYSPACE_PATTERN(keyspace) => new DescribeKeyspaceCmd(keyspace) + case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE KEYSPACE. " + + s"""It should comply to the pattern: ${DESCRIBE_KEYSPACE_PATTERN.toString}""") + } + } + + def extractDescribeTableCmd(text: String): DescribeTableCmd = { + text match { + case DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeTableCmd(Option(keyspace),table) + case DESCRIBE_TABLE_PATTERN(table) => new DescribeTableCmd(Option.empty,table) + case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TABLE. 
" + + s"""It should comply to the patterns: ${DESCRIBE_TABLE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TABLE_PATTERN.toString}""".stripMargin) + } + } + + def extractDescribeTypeCmd(text: String): DescribeUDTCmd = { + text match { + case DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN(keyspace,table) => new DescribeUDTCmd(Option(keyspace),table) + case DESCRIBE_TYPE_PATTERN(table) => new DescribeUDTCmd(Option.empty,table) + case _ => throw new InterpreterException(s"Invalid syntax for DESCRIBE TYPE. " + + s"""It should comply to the patterns: ${DESCRIBE_TYPE_WITH_KEYSPACE_PATTERN.toString} or ${DESCRIBE_TYPE_PATTERN.toString}""".stripMargin) + } + } + + def extractHelpCmd(text: String): HelpCmd = { + text match { + case HELP_PATTERN() => new HelpCmd + case _ => throw new InterpreterException(s"Invalid syntax for HELP. " + + s"""It should comply to the patterns: ${HELP_PATTERN.toString}""".stripMargin) + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/b9583c6e/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala ---------------------------------------------------------------------- diff --git a/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala new file mode 100644 index 0000000..70b2ce2 --- /dev/null +++ b/cassandra/src/main/scala/org/apache/zeppelin/cassandra/TextBlockHierarchy.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.cassandra + +import com.datastax.driver.core._ + +/** + * Define a Scala object hierarchy + * for input text parsing + */ +object TextBlockHierarchy { + + sealed trait BlockType + object ParameterBlock extends BlockType + object StatementBlock extends BlockType + object DescribeBlock extends BlockType + object CommentBlock extends BlockType + + abstract class AnyBlock(val blockType: BlockType) { + def get[U <: AnyBlock]: U = { + this.asInstanceOf[U] + } + } + + case class Comment(text:String) extends AnyBlock(CommentBlock) + + sealed trait ParameterType + object ConsistencyParam extends ParameterType + object SerialConsistencyParam extends ParameterType + object TimestampParam extends ParameterType + object RetryPolicyParam extends ParameterType + object FetchSizeParam extends ParameterType + + + abstract class QueryParameters(val paramType: ParameterType) extends AnyBlock(ParameterBlock) { + def getParam[U <: QueryParameters]: U = { + this.asInstanceOf[U] + } + } + + case class Consistency(value: ConsistencyLevel) extends QueryParameters(ConsistencyParam) + + case class SerialConsistency(value: ConsistencyLevel) extends QueryParameters(SerialConsistencyParam) + + case class Timestamp(value: Long) extends QueryParameters(TimestampParam) + + case class FetchSize(value: Int) extends QueryParameters(FetchSizeParam) + + abstract class RetryPolicy extends QueryParameters(RetryPolicyParam) + + object DefaultRetryPolicy extends RetryPolicy + object DowngradingRetryPolicy extends RetryPolicy + object FallThroughRetryPolicy 
extends RetryPolicy + object LoggingDefaultRetryPolicy extends RetryPolicy + object LoggingDowngradingRetryPolicy extends RetryPolicy + object LoggingFallThroughRetryPolicy extends RetryPolicy + + sealed trait StatementType + object PrepareStatementType extends StatementType + object RemovePrepareStatementType extends StatementType + object BoundStatementType extends StatementType + object SimpleStatementType extends StatementType + object BatchStatementType extends StatementType + object DescribeClusterStatementType extends StatementType + object DescribeAllKeyspacesStatementType extends StatementType + object DescribeKeyspaceStatementType extends StatementType + object DescribeAllTablesStatementType extends StatementType + object DescribeTableStatementType extends StatementType + object DescribeTypeStatementType extends StatementType + object HelpStatementType extends StatementType + + abstract class QueryStatement(val statementType: StatementType) extends AnyBlock(StatementBlock) { + def getStatement[U<: QueryStatement]: U = { + this.asInstanceOf[U] + } + } + + case class SimpleStm(text:String) extends QueryStatement(SimpleStatementType) + + case class PrepareStm(name: String, query:String) extends QueryStatement(PrepareStatementType) + + case class RemovePrepareStm(name:String) extends QueryStatement(RemovePrepareStatementType) + + case class BoundStm(name: String, values:String) extends QueryStatement(BoundStatementType) + + case class BatchStm(batchType: BatchStatement.Type, statements: List[QueryStatement]) + extends QueryStatement(BatchStatementType) + + sealed trait DescribeCommandStatement { + val statement: String + } + + class DescribeClusterCmd(override val statement: String = "DESCRIBE CLUSTER;") + extends QueryStatement(DescribeClusterStatementType) with DescribeCommandStatement + + class DescribeKeyspacesCmd(override val statement: String = "DESCRIBE KEYSPACES;") + extends QueryStatement(DescribeAllKeyspacesStatementType) with 
DescribeCommandStatement + + class DescribeTablesCmd(override val statement: String = "DESCRIBE TABLES;") + extends QueryStatement(DescribeAllTablesStatementType) with DescribeCommandStatement + + case class DescribeKeyspaceCmd(keyspace: String) extends QueryStatement(DescribeKeyspaceStatementType) + with DescribeCommandStatement { + override val statement: String = s"DESCRIBE KEYSPACE $keyspace;" + } + + case class DescribeTableCmd(keyspace:Option[String],table: String) extends QueryStatement(DescribeTableStatementType) + with DescribeCommandStatement { + override val statement: String = keyspace match { + case Some(ks) => s"DESCRIBE TABLE $ks.$table;" + case None => s"DESCRIBE TABLE $table;" + } + } + + case class DescribeUDTCmd(keyspace:Option[String],udtName: String) extends QueryStatement(DescribeTypeStatementType) + with DescribeCommandStatement { + override val statement: String = keyspace match { + case Some(ks) => s"DESCRIBE TYPE $ks.$udtName;" + case None => s"DESCRIBE TYPE $udtName;" + } + } + + class HelpCmd extends QueryStatement(HelpStatementType) + +}
