[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-11 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344797969
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
+  private final Set objectMethods =
+  new HashSet<>(Arrays.asList(Object.class.getMethods()));
+  
+  private AggregatedReplStageState state;
+  private Map vars;
+  private Set functions;
+
+  public ContextUpdater(AggregatedReplStageState state,
+Map vars, 
+Set functions) {
+this.state = state;
+this.vars = vars;
+this.functions = functions;
+  }
+
+  public void update() {
+try {
+  List lines = getLines();
+  refreshVariables(lines);
+  refreshMethods(lines);
+} catch (ReflectiveOperationException | NullPointerException e) {
+  logger.error("Exception updating current variables", e);
+}
+  }
+
+  private void refreshMethods(List lines) {
+functions.clear();
+for (Object line : lines) {
+  Method[] methods = line.getClass().getMethods();
+  for (Method method : methods) {
+if (objectMethods.contains(method) || method.getName().equals("main")) 
{
 
 Review comment:
   ✔️ 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-09 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344469311
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
 ##
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.kotlin.completion.KotlinCompleter;
+import org.apache.zeppelin.kotlin.context.KotlinReceiver;
+import org.apache.zeppelin.kotlin.reflect.KotlinFunctionInfo;
+import org.apache.zeppelin.kotlin.reflect.KotlinVariableInfo;
+import org.apache.zeppelin.kotlin.repl.KotlinRepl;
+import org.apache.zeppelin.kotlin.repl.building.KotlinReplProperties;
+import org.apache.zeppelin.scheduler.Job;
+
+public class KotlinInterpreter extends Interpreter {
+
+  private static Logger logger = 
LoggerFactory.getLogger(KotlinInterpreter.class);
+
+  private InterpreterOutputStream out;
+  private KotlinRepl interpreter;
+  private KotlinReplProperties replProperties;
+  private KotlinCompleter completer;
+
+  public KotlinInterpreter(Properties properties) {
+super(properties);
+replProperties = new KotlinReplProperties();
+
+int maxResult = Integer.parseInt(
+properties.getProperty("zeppelin.kotlin.maxResult", "1000"));
+
+boolean shortenTypes = Boolean.parseBoolean(
+properties.getProperty("zeppelin.kotlin.shortenTypes", "true"));
+String imports = properties.getProperty("zeppelin.interpreter.localRepo", 
"");
+
+completer = new KotlinCompleter();
+replProperties
+.receiver(new KotlinReceiver())
+.maxResult(maxResult)
+.codeOnLoad("")
+.classPath(getImportClasspath(imports))
+.shortenTypes(shortenTypes);
+  }
+
+  public KotlinReplProperties getKotlinReplProperties() {
+return replProperties;
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+interpreter = new KotlinRepl(replProperties);
+
+completer.setCtx(interpreter.getKotlinContext());
+out = new InterpreterOutputStream(logger);
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String code,
+ InterpreterContext context) throws 
InterpreterException{
+// saving job's running thread for cancelling
+Job runningJob = getRunningJob(context.getParagraphId());
+if (runningJob != null) {
+  runningJob.info().put("CURRENT_THREAD", Thread.currentThread());
+}
+
+return runWithOutput(code, context.out);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+Job runningJob = getRunningJob(context.getParagraphId());
+if (runningJob != null) {
+  Map info = runningJob.info();
+  Object object = info.get("CURRENT_THREAD");
+  if (object instanceof Thread) {
+try {
+  Thread t = (Thread) object;
+  t.interrupt();
+} catch (Throwable t) {
+  logger.error("Failed to cancel script: " + t, t);
+}
+  }
+}
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws 
InterpreterException {
+return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor,
+  InterpreterContext interpreterContext) throws InterpreterException {
+return 

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-09 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344469220
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
+  private final Set objectMethods =
+  new HashSet<>(Arrays.asList(Object.class.getMethods()));
+  
+  private AggregatedReplStageState state;
+  private Map vars;
+  private Set functions;
+
+  public ContextUpdater(AggregatedReplStageState state,
+Map vars, 
+Set functions) {
+this.state = state;
+this.vars = vars;
+this.functions = functions;
+  }
+
+  public void update() {
+try {
+  List lines = getLines();
+  refreshVariables(lines);
+  refreshMethods(lines);
+} catch (ReflectiveOperationException | NullPointerException e) {
+  logger.error("Exception updating current variables", e);
+}
+  }
+
+  private void refreshMethods(List lines) {
+functions.clear();
+for (Object line : lines) {
+  Method[] methods = line.getClass().getMethods();
+  for (Method method : methods) {
+if (objectMethods.contains(method) || method.getName().equals("main")) 
{
 
 Review comment:
   We've decided to add a comment to clarify the nature of `lines`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-09 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344468835
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
+  private final Set objectMethods =
+  new HashSet<>(Arrays.asList(Object.class.getMethods()));
+  
+  private AggregatedReplStageState state;
+  private Map vars;
+  private Set functions;
+
+  public ContextUpdater(AggregatedReplStageState state,
+Map vars, 
+Set functions) {
+this.state = state;
+this.vars = vars;
+this.functions = functions;
+  }
+
+  public void update() {
+try {
+  List lines = getLines();
+  refreshVariables(lines);
+  refreshMethods(lines);
+} catch (ReflectiveOperationException | NullPointerException e) {
+  logger.error("Exception updating current variables", e);
+}
+  }
+
+  private void refreshMethods(List lines) {
+functions.clear();
+for (Object line : lines) {
+  Method[] methods = line.getClass().getMethods();
+  for (Method method : methods) {
+if (objectMethods.contains(method) || method.getName().equals("main")) 
{
 
 Review comment:
   OK, do you mean that `line` is an instance of `KotlinReceiver` (or one of 
its subclasses) here?
   If so, let's add an explicit type cast (maybe earlier in the code).
   If not, please clarify the nature of this variable and how it may be 
instantiated.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-09 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344463578
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/KotlinReflectUtil.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import kotlin.reflect.KFunction;
+
+/**
+ * Util class for pretty-printing Kotlin variables and functions.
+ */
+public class KotlinReflectUtil {
+  public static String functionSignature(KFunction function) {
+return function.toString().replaceAll("Line_\\d+\\.", "");
 
 Review comment:
   Ah, OK. I hadn't noticed the dot in the regex.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344190697
 
 

 ##
 File path: 
zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
 ##
 @@ -128,7 +128,8 @@ static void atomicWriteToFile(String content, File file) 
throws IOException {
 }
 try {
   file.getParentFile().mkdirs();
-  Files.move(tempFile.toPath(), destinationFilePath,  
StandardCopyOption.ATOMIC_MOVE);
+  Files.move(tempFile.toPath(), destinationFilePath,
+  StandardCopyOption.REPLACE_EXISTING); 
//StandardCopyOption.ATOMIC_MOVE);
 
 Review comment:
   Why has this changed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344190697
 
 

 ##
 File path: 
zeppelin-zengine/src/main/java/org/apache/zeppelin/storage/LocalConfigStorage.java
 ##
 @@ -128,7 +128,8 @@ static void atomicWriteToFile(String content, File file) 
throws IOException {
 }
 try {
   file.getParentFile().mkdirs();
-  Files.move(tempFile.toPath(), destinationFilePath,  
StandardCopyOption.ATOMIC_MOVE);
+  Files.move(tempFile.toPath(), destinationFilePath,
+  StandardCopyOption.REPLACE_EXISTING); 
//StandardCopyOption.ATOMIC_MOVE);
 
 Review comment:
   Why has this changed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344188577
 
 

 ##
 File path: 
spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import static org.apache.zeppelin.spark.Utils.buildJobDesc;
+import static org.apache.zeppelin.spark.Utils.buildJobGroupId;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import scala.Console;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.kotlin.KotlinInterpreter;
+import org.apache.zeppelin.spark.kotlin.KotlinZeppelinBindings;
+import org.apache.zeppelin.spark.kotlin.SparkKotlinReceiver;
+
+public class KotlinSparkInterpreter extends Interpreter {
+  private static Logger logger = 
LoggerFactory.getLogger(KotlinSparkInterpreter.class);
+  private static final SparkVersion KOTLIN_SPARK_SUPPORTED_VERSION = 
SparkVersion.SPARK_2_4_0;
+
+  private InterpreterResult unsupportedMessage;
+  private KotlinInterpreter interpreter;
+  private SparkInterpreter sparkInterpreter;
+  private BaseZeppelinContext z;
+  private JavaSparkContext jsc;
+
+  public KotlinSparkInterpreter(Properties properties) {
+super(properties);
+logger.debug("Creating KotlinSparkInterpreter");
+interpreter = new KotlinInterpreter(properties);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+sparkInterpreter =
+getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+jsc = sparkInterpreter.getJavaSparkContext();
+
+SparkVersion sparkVersion = SparkVersion.fromVersionString(jsc.version());
+if (sparkVersion.olderThan(KOTLIN_SPARK_SUPPORTED_VERSION)) {
+  unsupportedMessage = new InterpreterResult(
+  InterpreterResult.Code.ERROR,
+  "Spark version is " + sparkVersion + ", only " +
+  KOTLIN_SPARK_SUPPORTED_VERSION + " and newer are supported");
+}
+
+z = sparkInterpreter.getZeppelinContext();
+
+SparkKotlinReceiver ctx = new SparkKotlinReceiver(
+sparkInterpreter.getSparkSession(),
+jsc,
+sparkInterpreter.getSQLContext(),
+z);
+
+List classpath = sparkClasspath();
+
+String outputDir = null;
+SparkConf conf = jsc.getConf();
+if (conf != null) {
+  outputDir =  
conf.getOption("spark.repl.class.outputDir").getOrElse(null);
+}
+
+interpreter.getKotlinReplProperties()
+.receiver(ctx)
+.classPath(classpath)
+.outputDir(outputDir)
+.codeOnLoad(KotlinZeppelinBindings.Z_SELECT_KOTLIN_SYNTAX)
+.codeOnLoad(KotlinZeppelinBindings.SPARK_UDF_IMPORTS)
+.codeOnLoad(KotlinZeppelinBindings.CAST_SPARK_SESSION);
+interpreter.open();
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+interpreter.close();
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context)
+  throws InterpreterException {
+
+if (isUnsupported()) {
+  return unsupportedMessage;
+}
+
+z.setInterpreterContext(context);
+z.setGui(context.getGui());
+z.setNoteGui(context.getNoteGui());
+InterpreterContext.set(context);
+
+jsc.setJobGroup(buildJobGroupId(context), buildJobDesc(context), 

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344187107
 
 

 ##
 File path: 
spark/interpreter/src/main/java/org/apache/zeppelin/spark/KotlinSparkInterpreter.java
 ##
 @@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.spark;
+
+import static org.apache.zeppelin.spark.Utils.buildJobDesc;
+import static org.apache.zeppelin.spark.Utils.buildJobGroupId;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import scala.Console;
+import org.apache.zeppelin.interpreter.BaseZeppelinContext;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.kotlin.KotlinInterpreter;
+import org.apache.zeppelin.spark.kotlin.KotlinZeppelinBindings;
+import org.apache.zeppelin.spark.kotlin.SparkKotlinReceiver;
+
+public class KotlinSparkInterpreter extends Interpreter {
+  private static Logger logger = 
LoggerFactory.getLogger(KotlinSparkInterpreter.class);
+  private static final SparkVersion KOTLIN_SPARK_SUPPORTED_VERSION = 
SparkVersion.SPARK_2_4_0;
+
+  private InterpreterResult unsupportedMessage;
+  private KotlinInterpreter interpreter;
+  private SparkInterpreter sparkInterpreter;
+  private BaseZeppelinContext z;
+  private JavaSparkContext jsc;
+
+  public KotlinSparkInterpreter(Properties properties) {
+super(properties);
+logger.debug("Creating KotlinSparkInterpreter");
+interpreter = new KotlinInterpreter(properties);
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+sparkInterpreter =
+getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
+jsc = sparkInterpreter.getJavaSparkContext();
+
+SparkVersion sparkVersion = SparkVersion.fromVersionString(jsc.version());
+if (sparkVersion.olderThan(KOTLIN_SPARK_SUPPORTED_VERSION)) {
+  unsupportedMessage = new InterpreterResult(
+  InterpreterResult.Code.ERROR,
+  "Spark version is " + sparkVersion + ", only " +
+  KOTLIN_SPARK_SUPPORTED_VERSION + " and newer are supported");
+}
+
+z = sparkInterpreter.getZeppelinContext();
+
+SparkKotlinReceiver ctx = new SparkKotlinReceiver(
+sparkInterpreter.getSparkSession(),
+jsc,
+sparkInterpreter.getSQLContext(),
+z);
+
+List classpath = sparkClasspath();
+
+String outputDir = null;
+SparkConf conf = jsc.getConf();
+if (conf != null) {
+  outputDir =  
conf.getOption("spark.repl.class.outputDir").getOrElse(null);
+}
+
+interpreter.getKotlinReplProperties()
+.receiver(ctx)
+.classPath(classpath)
+.outputDir(outputDir)
+.codeOnLoad(KotlinZeppelinBindings.Z_SELECT_KOTLIN_SYNTAX)
+.codeOnLoad(KotlinZeppelinBindings.SPARK_UDF_IMPORTS)
+.codeOnLoad(KotlinZeppelinBindings.CAST_SPARK_SESSION);
+interpreter.open();
+  }
+
+  @Override
+  public void close() throws InterpreterException {
+interpreter.close();
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context)
+  throws InterpreterException {
+
+if (isUnsupported()) {
+  return unsupportedMessage;
+}
+
+z.setInterpreterContext(context);
+z.setGui(context.getGui());
+z.setNoteGui(context.getNoteGui());
+InterpreterContext.set(context);
+
+jsc.setJobGroup(buildJobGroupId(context), buildJobDesc(context), 

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344181585
 
 

 ##
 File path: kotlin/src/main/java/org/apache/zeppelin/kotlin/repl/KotlinRepl.java
 ##
 @@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.repl;
+
+import static org.apache.zeppelin.kotlin.reflect.KotlinReflectUtil.shorten;
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.InvokeWrapper;
+import org.jetbrains.kotlin.cli.common.repl.ReplCodeLine;
+import org.jetbrains.kotlin.cli.common.repl.ReplCompileResult;
+import org.jetbrains.kotlin.cli.common.repl.ReplEvalResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import kotlin.jvm.functions.Function0;
+import kotlin.script.experimental.jvmhost.repl.JvmReplCompiler;
+import kotlin.script.experimental.jvmhost.repl.JvmReplEvaluator;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.kotlin.reflect.ContextUpdater;
+import org.apache.zeppelin.kotlin.reflect.KotlinFunctionInfo;
+import org.apache.zeppelin.kotlin.reflect.KotlinVariableInfo;
+import org.apache.zeppelin.kotlin.repl.building.KotlinReplProperties;
+import org.apache.zeppelin.kotlin.repl.building.ReplBuilding;
+
+/**
+ * Read-evaluate-print loop for Kotlin code.
+ * Each code snippet is compiled into Line_N class and evaluated.
+ *
+ * Outside variables and functions can be bound to REPL
+ * by inheriting KotlinReceiver class and passing it to REPL properties on 
creation.
+ * After that, all fields and methods of receiver are seen inside the snippet 
scope
+ * as if the code was run in Kotlin's `with` block.
+ *
+ * By default, KotlinReceiver has KotlinContext bound by the name `kc`.
+ * It can be used to show user-defined variables and functions
+ * and setting invokeWrapper to add effects to snippet evaluation.
+ */
+public class KotlinRepl {
+  private static Logger logger = LoggerFactory.getLogger(KotlinRepl.class);
+
+  private JvmReplCompiler compiler;
+  private JvmReplEvaluator evaluator;
+  private AggregatedReplStageState state;
+  private AtomicInteger counter;
+  private ClassWriter writer;
+  private KotlinContext ctx;
+  private InvokeWrapper wrapper;
+  private int maxResult;
+  private ContextUpdater contextUpdater;
+  boolean shortenTypes;
+
+  private KotlinRepl() { }
+
+  @SuppressWarnings("unchecked")
+  public KotlinRepl(KotlinReplProperties properties) {
+compiler = ReplBuilding.buildCompiler(properties);
+evaluator = ReplBuilding.buildEvaluator(properties);
+ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
+state = new AggregatedReplStageState(
+compiler.createState(stateLock),
+evaluator.createState(stateLock),
+stateLock);
+counter = new AtomicInteger(0);
+
+writer = new ClassWriter(properties.getOutputDir());
+maxResult = properties.getMaxResult();
+shortenTypes = properties.getShortenTypes();
+
+ctx = new KotlinContext();
+properties.getReceiver().kc = ctx;
+
+contextUpdater = new ContextUpdater(
+state, ctx.vars, ctx.functions);
+
+for (String line: properties.getCodeOnLoad()) {
+  eval(line);
+}
+  }
+
+  public List getVariables() {
+return ctx.getVars();
+  }
+
+  public List getFunctions() {
+return ctx.getFunctions();
+  }
+
+  public KotlinContext getKotlinContext() {
+return ctx;
+  }
+
+  /**
+   * Evaluates code snippet and returns interpreter result.
+   * REPL evaluation consists of:
+   * - Compiling code in JvmReplCompiler
+   * - Writing compiled classes to disk
+   * - Evaluating compiled classes inside InvokeWrapper
+   * - Updating list of user-defined functions and 

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344156928
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/repl/ClassWriter.java
 ##
 @@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.repl;
+
+import org.jetbrains.kotlin.cli.common.repl.CompiledClassData;
+import org.jetbrains.kotlin.cli.common.repl.ReplCompileResult;
+import 
org.jetbrains.kotlin.scripting.compiler.plugin.impl.KJvmCompiledModuleInMemory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+import kotlin.script.experimental.jvm.impl.KJvmCompiledScript;
+
+/**
+ *  Kotlin REPL compiler generates compiled classes consisting of
+ * compiled in-memory module and some other classes.
+ *  Spark may need saving them somewhere to send them to the executors,
+ * so this class provides writing classes on disk.
+ */
+public class ClassWriter {
+  private static Logger logger = LoggerFactory.getLogger(ClassWriter.class);
+
+  private String outputDir;
+
+  public ClassWriter(String outputDir) {
+this.outputDir = outputDir;
+  }
+
+  public void writeClasses(ReplCompileResult.CompiledClasses classes) {
+if (outputDir == null) {
+  return;
+}
+
+for (CompiledClassData compiledClass: classes.getClasses()) {
+  String filePath = compiledClass.getPath();
+  if (filePath.contains("/")) {
 
 Review comment:
   Is this condition still relevant under Windows?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344155885
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/KotlinVariableInfo.java
 ##
 @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import static org.apache.zeppelin.kotlin.reflect.KotlinReflectUtil.shorten;
+import kotlin.reflect.KProperty;
+
+public class KotlinVariableInfo {
+  private final Object value;
+  private final KProperty descriptor;
+
+  public KotlinVariableInfo(Object value, KProperty descriptor) {
+this.value = value;
+this.descriptor = descriptor;
+  }
+
+  public Object getValue() {
+return value;
+  }
+
+  public KProperty getDescriptor() {
+return descriptor;
+  }
+
+  public String getName() {
+return descriptor.getName();
+  }
+
+  public String getType() {
+return descriptor.getReturnType().toString();
+  }
+
+  public String toString(boolean shortenTypes) {
+if (shortenTypes) {
+  return getName() + ": " + shorten(getType()) + " = " + getValue();
+}
+return toString();
+  }
+
+  @Override
+  public String toString() {
+return getName() + ": " + getType() + " = " + getValue();
 
 Review comment:
   Let's please avoid code duplication:
   ```
   public String toString(boolean shortenTypes) {
     String type = getType();
     if (shortenTypes) {
       type = shorten(type);
     }
     return getName() + ": " + type + " = " + getValue();
   }

   @Override
   public String toString() {
     return toString(false);
   }
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344154442
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/KotlinReflectUtil.java
 ##
 @@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import kotlin.reflect.KFunction;
+
+/**
+ * Util class for pretty-printing Kotlin variables and functions.
+ */
+public class KotlinReflectUtil {
+  public static String functionSignature(KFunction function) {
+return function.toString().replaceAll("Line_\\d+\\.", "");
 
 Review comment:
   So, function names store line numbers.
   1) What is the reason for that?
   2) What if a function itself is named like `Line_42`? Can this happen, and can we 
check this in a test?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344151750
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/KotlinFunctionInfo.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import static 
org.apache.zeppelin.kotlin.reflect.KotlinReflectUtil.functionSignature;
+import static org.apache.zeppelin.kotlin.reflect.KotlinReflectUtil.shorten;
+import org.jetbrains.annotations.NotNull;
+import kotlin.reflect.KFunction;
+
+public class KotlinFunctionInfo implements Comparable {
+  private final KFunction function;
+
+  public KotlinFunctionInfo(KFunction function) {
+this.function = function;
+  }
+
+  public KFunction getFunction() {
+return function;
+  }
+
+  public String getName() {
+return function.getName();
+  }
+
+  public String toString(boolean shortenTypes) {
+if (shortenTypes) {
+  return shorten(toString());
+}
+return toString();
+  }
+
+  @Override
+  public String toString() {
+return functionSignature(function);
+  }
+
+  @Override
+  public int compareTo(@NotNull KotlinFunctionInfo f) {
+return this.toString().hashCode() - f.toString().hashCode();
 
 Review comment:
   `return this.toString().compareTo(f.toString());` seems to be more correct. 
Equality of hash codes doesn't guarantee that objects are equal (though the 
converse is true).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344149188
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
+  private final Set objectMethods =
+  new HashSet<>(Arrays.asList(Object.class.getMethods()));
+  
+  private AggregatedReplStageState state;
+  private Map vars;
+  private Set functions;
+
+  public ContextUpdater(AggregatedReplStageState state,
+Map vars, 
+Set functions) {
+this.state = state;
+this.vars = vars;
+this.functions = functions;
+  }
+
+  public void update() {
+try {
+  List lines = getLines();
+  refreshVariables(lines);
+  refreshMethods(lines);
+} catch (ReflectiveOperationException | NullPointerException e) {
+  logger.error("Exception updating current variables", e);
+}
+  }
+
+  private void refreshMethods(List lines) {
+functions.clear();
+for (Object line : lines) {
+  Method[] methods = line.getClass().getMethods();
+  for (Method method : methods) {
+if (objectMethods.contains(method) || method.getName().equals("main")) 
{
 
 Review comment:
   Why do we skip only the methods of `Object`? Shouldn't we skip other 
superclass methods? The reasons are not clear; maybe a comment is needed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344147575
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
+  private final Set objectMethods =
+  new HashSet<>(Arrays.asList(Object.class.getMethods()));
+  
+  private AggregatedReplStageState state;
+  private Map vars;
+  private Set functions;
+
+  public ContextUpdater(AggregatedReplStageState state,
+Map vars, 
+Set functions) {
+this.state = state;
+this.vars = vars;
+this.functions = functions;
+  }
+
+  public void update() {
+try {
+  List lines = getLines();
+  refreshVariables(lines);
+  refreshMethods(lines);
+} catch (ReflectiveOperationException | NullPointerException e) {
+  logger.error("Exception updating current variables", e);
+}
+  }
+
+  private void refreshMethods(List lines) {
+functions.clear();
+for (Object line : lines) {
+  Method[] methods = line.getClass().getMethods();
+  for (Method method : methods) {
+if (objectMethods.contains(method) || method.getName().equals("main")) 
{
+  continue;
+}
+KFunction function = ReflectJvmMapping.getKotlinFunction(method);
+if (function == null) {
+  continue;
+}
+functions.add(new KotlinFunctionInfo(function));
+  }
+}
+  }
+
+  private List getLines() {
+List lines = state.getHistory().stream()
+.map(this::getLineFromRecord)
+.collect(Collectors.toList());
+
+Collections.reverse(lines);
+return lines;
+  }
+
+  private Object getLineFromRecord(ReplHistoryRecord> 
record) {
+Object statePair = record.getItem().getSecond();
+return ((Pair) statePair).getSecond();
+  }
+
+  private Object getImplicitReceiver(Object script)
+  throws ReflectiveOperationException {
+Field receiverField = 
script.getClass().getDeclaredField("$$implicitReceiver0");
+return receiverField.get(script);
+  }
+
+  private void refreshVariables(List lines) throws 
ReflectiveOperationException {
+vars.clear();
+if (!lines.isEmpty()) {
+  Object receiver = getImplicitReceiver(lines.get(0));
+  findReceiverVariables(receiver);
+}
+for (Object line : lines) {
+  findLineVariables(line);
+}
+  }
+
+  // For lines, we only want fields from top level class
+  private void findLineVariables(Object line) throws IllegalAccessException {
+Field[] fields = line.getClass().getDeclaredFields();
+findVariables(fields, line);
+  }
+
+  // For implicit receiver, we want to also get fields in parent classes
+  private void findReceiverVariables(Object receiver) throws 
IllegalAccessException {
+List fieldsList = new ArrayList<>();
+for (Class cl = receiver.getClass(); cl != null; cl = 
cl.getSuperclass()) {
+  fieldsList.addAll(Arrays.asList(cl.getDeclaredFields()));
+}
+findVariables(fieldsList.toArray(new Field[0]), receiver);
+  }
+
+  private void findVariables(Field[] fields, Object o) throws 
IllegalAccessException {
+for (Field field : fields) {
+   

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344139312
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
+  private final Set objectMethods =
 
 Review comment:
   `objectMethods` may also be static


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344139117
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/reflect/ContextUpdater.java
 ##
 @@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin.reflect;
+
+import org.jetbrains.kotlin.cli.common.repl.AggregatedReplStageState;
+import org.jetbrains.kotlin.cli.common.repl.ReplHistoryRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import kotlin.Pair;
+import kotlin.reflect.KFunction;
+import kotlin.reflect.KProperty;
+import kotlin.reflect.jvm.ReflectJvmMapping;
+
+/**
+ * ContextUpdater updates current user-defined functions and variables
+ * to use in completion and KotlinContext.
+ */
+public class ContextUpdater {
+  private final Logger logger = LoggerFactory.getLogger(ContextUpdater.class);
 
 Review comment:
   `logger` may be static


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344133375
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
 ##
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.kotlin.completion.KotlinCompleter;
+import org.apache.zeppelin.kotlin.context.KotlinReceiver;
+import org.apache.zeppelin.kotlin.reflect.KotlinFunctionInfo;
+import org.apache.zeppelin.kotlin.reflect.KotlinVariableInfo;
+import org.apache.zeppelin.kotlin.repl.KotlinRepl;
+import org.apache.zeppelin.kotlin.repl.building.KotlinReplProperties;
+import org.apache.zeppelin.scheduler.Job;
+
+public class KotlinInterpreter extends Interpreter {
+
+  private static Logger logger = 
LoggerFactory.getLogger(KotlinInterpreter.class);
+
+  private InterpreterOutputStream out;
+  private KotlinRepl interpreter;
+  private KotlinReplProperties replProperties;
+  private KotlinCompleter completer;
+
+  public KotlinInterpreter(Properties properties) {
+super(properties);
+replProperties = new KotlinReplProperties();
+
+int maxResult = Integer.parseInt(
+properties.getProperty("zeppelin.kotlin.maxResult", "1000"));
+
+boolean shortenTypes = Boolean.parseBoolean(
+properties.getProperty("zeppelin.kotlin.shortenTypes", "true"));
+String imports = properties.getProperty("zeppelin.interpreter.localRepo", 
"");
+
+completer = new KotlinCompleter();
+replProperties
+.receiver(new KotlinReceiver())
+.maxResult(maxResult)
+.codeOnLoad("")
+.classPath(getImportClasspath(imports))
+.shortenTypes(shortenTypes);
+  }
+
+  public KotlinReplProperties getKotlinReplProperties() {
+return replProperties;
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+interpreter = new KotlinRepl(replProperties);
+
+completer.setCtx(interpreter.getKotlinContext());
+out = new InterpreterOutputStream(logger);
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public InterpreterResult interpret(String code,
+ InterpreterContext context) throws 
InterpreterException{
+// saving job's running thread for cancelling
+Job runningJob = getRunningJob(context.getParagraphId());
+if (runningJob != null) {
+  runningJob.info().put("CURRENT_THREAD", Thread.currentThread());
+}
+
+return runWithOutput(code, context.out);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+Job runningJob = getRunningJob(context.getParagraphId());
+if (runningJob != null) {
+  Map info = runningJob.info();
+  Object object = info.get("CURRENT_THREAD");
+  if (object instanceof Thread) {
+try {
+  Thread t = (Thread) object;
+  t.interrupt();
+} catch (Throwable t) {
+  logger.error("Failed to cancel script: " + t, t);
+}
+  }
+}
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws 
InterpreterException {
+return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor,
+  InterpreterContext interpreterContext) throws InterpreterException {
+return 

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344132769
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
 ##
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.kotlin.completion.KotlinCompleter;
+import org.apache.zeppelin.kotlin.context.KotlinReceiver;
+import org.apache.zeppelin.kotlin.reflect.KotlinFunctionInfo;
+import org.apache.zeppelin.kotlin.reflect.KotlinVariableInfo;
+import org.apache.zeppelin.kotlin.repl.KotlinRepl;
+import org.apache.zeppelin.kotlin.repl.building.KotlinReplProperties;
+import org.apache.zeppelin.scheduler.Job;
+
+public class KotlinInterpreter extends Interpreter {
+
+  private static Logger logger = 
LoggerFactory.getLogger(KotlinInterpreter.class);
+
+  private InterpreterOutputStream out;
+  private KotlinRepl interpreter;
+  private KotlinReplProperties replProperties;
+  private KotlinCompleter completer;
+
+  /**
+   * Builds the REPL configuration from the interpreter properties.
+   *
+   * <p>Reads {@code zeppelin.kotlin.maxResult} (default {@code 1000}),
+   * {@code zeppelin.kotlin.shortenTypes} (default {@code true}) and
+   * {@code zeppelin.interpreter.localRepo} (default empty string). The last
+   * value is passed through {@code getImportClasspath} to form the classpath
+   * handed to the REPL properties.
+   *
+   * @param properties interpreter settings
+   * @throws NumberFormatException if {@code zeppelin.kotlin.maxResult} is not
+   *     a parsable integer
+   */
+  public KotlinInterpreter(Properties properties) {
+    super(properties);
+    replProperties = new KotlinReplProperties();
+
+    String maxResultSetting = properties.getProperty("zeppelin.kotlin.maxResult", "1000");
+    int resultLimit = Integer.parseInt(maxResultSetting);
+
+    String shortenSetting = properties.getProperty("zeppelin.kotlin.shortenTypes", "true");
+    boolean useShortTypes = Boolean.parseBoolean(shortenSetting);
+
+    String localRepo = properties.getProperty("zeppelin.interpreter.localRepo", "");
+
+    completer = new KotlinCompleter();
+    replProperties
+        .receiver(new KotlinReceiver())
+        .maxResult(resultLimit)
+        .codeOnLoad("")
+        .classPath(getImportClasspath(localRepo))
+        .shortenTypes(useShortTypes);
+  }
+
+  /**
+   * Exposes the REPL configuration held by this interpreter.
+   *
+   * @return the {@code replProperties} instance itself, not a copy
+   */
+  public KotlinReplProperties getKotlinReplProperties() {
+    return this.replProperties;
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+interpreter = new KotlinRepl(replProperties);
+
+completer.setCtx(interpreter.getKotlinContext());
+out = new InterpreterOutputStream(logger);
+  }
+
+  /** Currently performs no cleanup. */
+  @Override
+  public void close() {
+    // nothing to do
+  }
+
+  /**
+   * Executes {@code code} and returns whatever {@code runWithOutput} produces
+   * for it, writing to the paragraph's output.
+   *
+   * <p>Before running, the current thread is stored in the running job's info
+   * map under "CURRENT_THREAD" so that it can be used for cancelling.
+   *
+   * @param code source text to run
+   * @param context context of the paragraph being executed
+   * @return the result of {@code runWithOutput(code, context.out)}
+   * @throws InterpreterException declared by the overridden method
+   */
+  @Override
+  public InterpreterResult interpret(String code, InterpreterContext context)
+      throws InterpreterException {
+    // remember which thread runs this job so it can be cancelled later
+    Job job = getRunningJob(context.getParagraphId());
+    if (job != null) {
+      job.info().put("CURRENT_THREAD", Thread.currentThread());
+    }
+
+    return runWithOutput(code, context.out);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+Job runningJob = getRunningJob(context.getParagraphId());
+if (runningJob != null) {
+  Map info = runningJob.info();
+  Object object = info.get("CURRENT_THREAD");
+  if (object instanceof Thread) {
+try {
+  Thread t = (Thread) object;
+  t.interrupt();
+} catch (Throwable t) {
+  logger.error("Failed to cancel script: " + t, t);
+}
+  }
+}
+  }
+
+  /**
+   * Reports the form type used by this interpreter.
+   *
+   * @return always {@code FormType.NATIVE}
+   * @throws InterpreterException declared by the overridden method
+   */
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws 
InterpreterException {
+return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor,
+  InterpreterContext interpreterContext) throws InterpreterException {
+return 

[GitHub] [zeppelin] ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin support for Spark interpreter

2019-11-08 Thread GitBox
ileasile commented on a change in pull request #3440: [ZEPPELIN-4323] Kotlin 
support for Spark interpreter
URL: https://github.com/apache/zeppelin/pull/3440#discussion_r344132769
 
 

 ##
 File path: 
kotlin/src/main/java/org/apache/zeppelin/kotlin/KotlinInterpreter.java
 ##
 @@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.kotlin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.kotlin.completion.KotlinCompleter;
+import org.apache.zeppelin.kotlin.context.KotlinReceiver;
+import org.apache.zeppelin.kotlin.reflect.KotlinFunctionInfo;
+import org.apache.zeppelin.kotlin.reflect.KotlinVariableInfo;
+import org.apache.zeppelin.kotlin.repl.KotlinRepl;
+import org.apache.zeppelin.kotlin.repl.building.KotlinReplProperties;
+import org.apache.zeppelin.scheduler.Job;
+
+public class KotlinInterpreter extends Interpreter {
+
+  private static Logger logger = 
LoggerFactory.getLogger(KotlinInterpreter.class);
+
+  private InterpreterOutputStream out;
+  private KotlinRepl interpreter;
+  private KotlinReplProperties replProperties;
+  private KotlinCompleter completer;
+
+  /**
+   * Builds the REPL configuration from the interpreter properties.
+   *
+   * <p>Reads {@code zeppelin.kotlin.maxResult} (default {@code 1000}),
+   * {@code zeppelin.kotlin.shortenTypes} (default {@code true}) and
+   * {@code zeppelin.interpreter.localRepo} (default empty string). The last
+   * value is passed through {@code getImportClasspath} to form the classpath
+   * handed to the REPL properties.
+   *
+   * @param properties interpreter settings
+   * @throws NumberFormatException if {@code zeppelin.kotlin.maxResult} is not
+   *     a parsable integer
+   */
+  public KotlinInterpreter(Properties properties) {
+    super(properties);
+    replProperties = new KotlinReplProperties();
+
+    String maxResultSetting = properties.getProperty("zeppelin.kotlin.maxResult", "1000");
+    int resultLimit = Integer.parseInt(maxResultSetting);
+
+    String shortenSetting = properties.getProperty("zeppelin.kotlin.shortenTypes", "true");
+    boolean useShortTypes = Boolean.parseBoolean(shortenSetting);
+
+    String localRepo = properties.getProperty("zeppelin.interpreter.localRepo", "");
+
+    completer = new KotlinCompleter();
+    replProperties
+        .receiver(new KotlinReceiver())
+        .maxResult(resultLimit)
+        .codeOnLoad("")
+        .classPath(getImportClasspath(localRepo))
+        .shortenTypes(useShortTypes);
+  }
+
+  /**
+   * Exposes the REPL configuration held by this interpreter.
+   *
+   * @return the {@code replProperties} instance itself, not a copy
+   */
+  public KotlinReplProperties getKotlinReplProperties() {
+    return this.replProperties;
+  }
+
+  @Override
+  public void open() throws InterpreterException {
+interpreter = new KotlinRepl(replProperties);
+
+completer.setCtx(interpreter.getKotlinContext());
+out = new InterpreterOutputStream(logger);
+  }
+
+  /** Currently performs no cleanup. */
+  @Override
+  public void close() {
+    // nothing to do
+  }
+
+  /**
+   * Executes {@code code} and returns whatever {@code runWithOutput} produces
+   * for it, writing to the paragraph's output.
+   *
+   * <p>Before running, the current thread is stored in the running job's info
+   * map under "CURRENT_THREAD" so that it can be used for cancelling.
+   *
+   * @param code source text to run
+   * @param context context of the paragraph being executed
+   * @return the result of {@code runWithOutput(code, context.out)}
+   * @throws InterpreterException declared by the overridden method
+   */
+  @Override
+  public InterpreterResult interpret(String code, InterpreterContext context)
+      throws InterpreterException {
+    // remember which thread runs this job so it can be cancelled later
+    Job job = getRunningJob(context.getParagraphId());
+    if (job != null) {
+      job.info().put("CURRENT_THREAD", Thread.currentThread());
+    }
+
+    return runWithOutput(code, context.out);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+Job runningJob = getRunningJob(context.getParagraphId());
+if (runningJob != null) {
+  Map info = runningJob.info();
+  Object object = info.get("CURRENT_THREAD");
+  if (object instanceof Thread) {
+try {
+  Thread t = (Thread) object;
+  t.interrupt();
+} catch (Throwable t) {
+  logger.error("Failed to cancel script: " + t, t);
+}
+  }
+}
+  }
+
+  /**
+   * Reports the form type used by this interpreter.
+   *
+   * @return always {@code FormType.NATIVE}
+   * @throws InterpreterException declared by the overridden method
+   */
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws 
InterpreterException {
+return 0;
+  }
+
+  @Override
+  public List completion(String buf, int cursor,
+  InterpreterContext interpreterContext) throws InterpreterException {
+return