Repository: incubator-zeppelin Updated Branches: refs/heads/master 0292f30a8 -> 81b1e2271
ZEPPELIN-497 ] pyspark completion  Added auto-complete for pyspark. autocomplete function will operate through the spark python tag '% pyspark'. short-cut = Ctrl + Shift + Space. jira issue : https://issues.apache.org/jira/browse/ZEPPELIN-497 Author: CloverHearts <[email protected]> Closes #530 from cloverhearts/ZEPPELIN-497 and squashes the following commits: 6832409 [CloverHearts] Changed PySpark Completion Filter regex d7efe88 [CloverHearts] change filter regex character 1f82825 [CloverHearts] remove variable resultComplaetion f4dc487 [CloverHearts] Pyspark completion __ filtering ab05f86 [CloverHearts] implement pyspark completion Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/81b1e227 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/81b1e227 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/81b1e227 Branch: refs/heads/master Commit: 81b1e2271233721b6361796730cf5634ad0427a3 Parents: 0292f30 Author: CloverHearts <[email protected]> Authored: Wed Dec 16 20:12:17 2015 -0800 Committer: Lee moon soo <[email protected]> Committed: Mon Dec 21 08:53:49 2015 +0900 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 93 +++++++++++++++++++- .../main/resources/python/zeppelin_pyspark.py | 47 +++++++++- 2 files changed, 137 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/81b1e227/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 273c897..0bfad6a 100644 --- 
a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -58,6 +58,11 @@ import org.apache.zeppelin.spark.dep.DependencyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; + import py4j.GatewayServer; /** @@ -368,12 +373,96 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand return sparkInterpreter.getProgress(context); } + @Override public List<String> completion(String buf, int cursor) { - // not supported - return new LinkedList<String>(); + if (buf.length() < cursor) { + cursor = buf.length(); + } + String completionString = getCompletionTargetString(buf, cursor); + String completionCommand = "completion.getCompletion('" + completionString + "')"; + + //start code for completion + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + if (sparkInterpreter.getSparkVersion().isUnsupportedVersion() == false + && pythonscriptRunning == false) { + return new LinkedList<String>(); + } + + outputStream.reset(); + + pythonInterpretRequest = new PythonInterpretRequest(completionCommand, ""); + statementOutput = null; + + synchronized (statementSetNotifier) { + statementSetNotifier.notify(); + } + + synchronized (statementFinishedNotifier) { + while (statementOutput == null) { + try { + statementFinishedNotifier.wait(1000); + } catch (InterruptedException e) { + // not working + logger.info("wait drop"); + return new LinkedList<String>(); + } + } + } + + if (statementError) { + return new LinkedList<String>(); + } + InterpreterResult completionResult = new InterpreterResult(Code.SUCCESS, statementOutput); + //end code for completion + + Gson gson = new Gson(); + + return gson.fromJson(completionResult.message(), LinkedList.class); } + private String getCompletionTargetString(String 
text, int cursor) { + String[] completionSeqCharaters = {" ", "\n", "\t"}; + int completionEndPosition = cursor; + int completionStartPosition = cursor; + int indexOfReverseSeqPostion = cursor; + + String resultCompletionText = ""; + String completionScriptText = ""; + try { + completionScriptText = text.substring(0, cursor); + } + catch (Exception e) { + logger.error(e.toString()); + return null; + } + completionEndPosition = completionScriptText.length(); + + String tempReverseCompletionText = new StringBuilder(completionScriptText).reverse().toString(); + + for (String seqCharacter : completionSeqCharaters) { + indexOfReverseSeqPostion = tempReverseCompletionText.indexOf(seqCharacter); + + if (indexOfReverseSeqPostion < completionStartPosition && indexOfReverseSeqPostion > 0) { + completionStartPosition = indexOfReverseSeqPostion; + } + + } + + if (completionStartPosition == completionEndPosition) { + completionStartPosition = 0; + } + else + { + completionStartPosition = completionEndPosition - completionStartPosition; + } + resultCompletionText = completionScriptText.substring( + completionStartPosition , completionEndPosition); + + return resultCompletionText; + } + + private SparkInterpreter getSparkInterpreter() { InterpreterGroup intpGroup = getInterpreterGroup(); LazyOpenInterpreter lazy = null; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/81b1e227/spark/src/main/resources/python/zeppelin_pyspark.py ---------------------------------------------------------------------- diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 1b17772..7ab0be9 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -15,7 +15,7 @@ # limitations under the License. 
class PySparkCompletion:
  """Builds completion candidate lists for the %pyspark interpreter.

  getCompletion() prints (to the interpreter's redirected stdout) a JSON array
  of candidate names drawn from the global namespace plus dir() of the object
  the user is dotting into, with dunder/private '__' names filtered out.
  """

  def getGlobalCompletion(self):
    # Names visible in the interpreter's global namespace, or None on failure.
    # keys() works on both py2 and py3; iterkeys() was removed in py3.
    try:
      return list(globals().keys())
    except Exception:
      return None

  def getMethodCompletion(self, text_value):
    # dir() of the object named by text_value (a trailing '.' is stripped),
    # or None when the name is empty or cannot be evaluated.
    if text_value is None or len(text_value) <= 0:
      return None
    completion_target = text_value
    if text_value[-1] == ".":
      completion_target = text_value[:-1]
    try:
      # NOTE: evaluates user-typed text -- same trust model as running the
      # paragraph itself; the original used a string-built exec to this effect.
      return dir(eval(completion_target, globals()))
    except Exception:
      return None

  def getCompletion(self, text_value):
    # Print a JSON array of completion candidates for text_value; an empty
    # line when nothing matched. '__'-prefixed names are filtered out.
    completionList = set()

    globalCompletionList = self.getGlobalCompletion()
    if globalCompletionList is not None:
      completionList.update(globalCompletionList)

    if text_value is not None:
      objectCompletionList = self.getMethodCompletion(text_value)
      if objectCompletionList is not None:
        completionList.update(objectCompletionList)

    if len(completionList) <= 0:
      print("")
    else:
      candidates = [x for x in completionList if not re.match("^__.*", x)]
      print(json.dumps(candidates))
