This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a4bedc  [ZEPPELIN-3548] KSQL Interpreter for Zeppelin
3a4bedc is described below

commit 3a4bedcbc837634672b08c0da233164eef8da66a
Author: Andrea Santurbano <sant...@gmail.com>
AuthorDate: Mon Oct 21 23:02:13 2019 +0200

    [ZEPPELIN-3548] KSQL Interpreter for Zeppelin
    
    ### What is this PR for?
    This PR adds the support to 
[KSQL](https://docs.confluent.io/current/ksql/docs/developer-guide/api.html)
    
    ### What type of PR is it?
    [Feature]
    
    ### Todos
    * [x] - Created the interpreter
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-3548
    
    ### How should this be tested?
    You can use the following guide in order to spin-up the KSQL stack.
    
https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc
    
    ### Screenshots (if appropriate)
    
![ksql](https://user-images.githubusercontent.com/1833335/66709276-f6840000-ed60-11e9-8f2f-c7cf6603a1a0.gif)
    
    ### Questions:
    * Does the licenses files need update? No, I didn't add any new dependency 
into the project.
    * Is there breaking changes for older versions? No
    * Does this needs documentation? Yes
    
    Author: Andrea Santurbano <sant...@gmail.com>
    
    Closes #3485 from conker84/ksql and squashes the following commits:
    
    baa480f32 [Andrea Santurbano] PR review feedback
    ad3785404 [Andrea Santurbano] [ZEPPELIN-3548] KSQL Interpreter for Zeppelin
---
 .../assets/themes/zeppelin/img/docs-img/ksql.1.png | Bin 0 -> 197688 bytes
 .../assets/themes/zeppelin/img/docs-img/ksql.2.png | Bin 0 -> 133453 bytes
 .../assets/themes/zeppelin/img/docs-img/ksql.3.png | Bin 0 -> 235272 bytes
 docs/interpreter/ksql.md                           |  78 +++++++
 ksql/README.md                                     |  10 +
 ksql/pom.xml                                       |  93 ++++++++
 .../apache/zeppelin/ksql/BasicKSQLHttpClient.java  | 175 ++++++++++++++
 .../org/apache/zeppelin/ksql/KSQLInterpreter.java  | 169 ++++++++++++++
 .../java/org/apache/zeppelin/ksql/KSQLRequest.java |  51 ++++
 .../org/apache/zeppelin/ksql/KSQLResponse.java     |  86 +++++++
 .../org/apache/zeppelin/ksql/KSQLRestService.java  | 257 +++++++++++++++++++++
 ksql/src/main/resources/interpreter-setting.json   |  21 ++
 .../apache/zeppelin/ksql/KSQLInterpreterTest.java  | 169 ++++++++++++++
 pom.xml                                            |   1 +
 14 files changed, 1110 insertions(+)

diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png 
b/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png
new file mode 100644
index 0000000..276602b
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png 
b/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png
new file mode 100644
index 0000000..e741503
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png differ
diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png 
b/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png
new file mode 100644
index 0000000..abb44a4
Binary files /dev/null and 
b/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png differ
diff --git a/docs/interpreter/ksql.md b/docs/interpreter/ksql.md
new file mode 100644
index 0000000..bc91ade
--- /dev/null
+++ b/docs/interpreter/ksql.md
@@ -0,0 +1,78 @@
+---
+layout: page
+title: "KSQL Interpreter for Apache Zeppelin"
+description: "KSQL is the streaming SQL engine for Apache Kafka and provides an 
easy-to-use yet powerful interactive SQL interface for stream processing on 
Kafka."
+group: interpreter
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+{% include JB/setup %}
+
+# KSQL Interpreter for Apache Zeppelin
+
+<div id="toc"></div>
+
+## Overview
+[KSQL](https://www.confluent.io/product/ksql/) is the streaming SQL engine for 
Apache Kafka®. It provides an easy-to-use yet powerful interactive SQL 
interface for stream processing on Kafka.
+
+## Configuration
+<table class="table-configuration">
+  <thead>
+    <tr>
+      <th>Property</th>
+      <th>Default</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>ksql.url</td>
+      <td>http://localhost:8080</td>
+      <td>The KSQL Endpoint base URL</td>
+    </tr>
+  </tbody>
+</table>
+
+N.B. The interpreter supports all the KSQL properties, e.g. 
`ksql.streams.auto.offset.reset`.
+The full list of KSQL parameters is 
[here](https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html).
+
+## Using the KSQL Interpreter
+In a paragraph, start with `%ksql` followed by your KSQL statement in order to 
interact with KSQL.
+
+Some examples follow:
+
+```
+%ksql
+PRINT 'orders';
+```
+
+![PRINT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.1.png)
+
+```
+%ksql
+CREATE STREAM ORDERS WITH
+  (VALUE_FORMAT='AVRO',
+   KAFKA_TOPIC ='orders');
+```
+
+![CREATE image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.2.png)
+
+```
+%ksql
+SELECT *
+FROM ORDERS
+LIMIT 10
+```
+
+![LIMIT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.3.png)
\ No newline at end of file
diff --git a/ksql/README.md b/ksql/README.md
new file mode 100644
index 0000000..22f89f2
--- /dev/null
+++ b/ksql/README.md
@@ -0,0 +1,10 @@
+# Overview
+KSQL interpreter for Apache Zeppelin
+
+# Connection
+The Interpreter opens a connection with the KSQL REST endpoint.
+
+# Confluent KSQL resources
+The following is a list of useful resources:
+ * [Docs](https://docs.confluent.io/current/ksql/docs/index.html)
+ * [Getting 
Started](https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc)
diff --git a/ksql/pom.xml b/ksql/pom.xml
new file mode 100644
index 0000000..b3b25b1
--- /dev/null
+++ b/ksql/pom.xml
@@ -0,0 +1,93 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin-interpreter-parent</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-ksql</artifactId>
+  <packaging>jar</packaging>
+  <version>0.9.0-SNAPSHOT</version>
+  <name>Zeppelin: Kafka SQL interpreter</name>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <interpreter.name>ksql</interpreter.name>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.9.8</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>1.3.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>3.0.0</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <artifactId>maven-shade-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <skip>false</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git 
a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java 
b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
new file mode 100644
index 0000000..72e41db
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Minimal HTTP client for the KSQL REST API built on {@link HttpURLConnection}.
+ *
+ * <p>An instance describes one request: target URL, HTTP method, headers and an optional
+ * JSON or form body. The response is read either in one go via {@link #connect()} or line
+ * by line via {@link #connectAsync(BasicHTTPClientResponse)}.
+ */
+public class BasicKSQLHttpClient implements Closeable {
+
+  public static final String UTF_8 = "utf-8";
+
+  /** Pause, in milliseconds, between two polls of a response stream that has no data yet. */
+  private static final long POLL_INTERVAL_MS = 10L;
+
+  /** Receives the non-empty lines of a response consumed through {@code connectAsync}. */
+  interface BasicHTTPClientResponse {
+    /** Called with a trimmed response line when the HTTP status is 2xx. */
+    void onMessage(int status, String message);
+
+    /** Called with a trimmed response line when the HTTP status is not 2xx. */
+    void onError(int status, String message);
+  }
+
+  private final String jsonData;
+  private final Map<String, Object> formData;
+  private final String type;
+  private final Map<String, String> headers;
+  private final URL url;
+  private HttpURLConnection connection;
+  private final int timeout;
+  // Cleared by close() and polled by the connectAsync() loop. It is volatile so that a
+  // close() issued from another thread (presumably paragraph cancellation - confirm against
+  // KSQLRestService) is guaranteed to be seen by the polling thread.
+  private volatile boolean connected;
+
+
+  /**
+   * @param url      absolute URL of the endpoint
+   * @param jsonData request body; takes precedence over {@code formData} when non-empty
+   * @param formData request body sent as {@code key=value} pairs joined with {@code &}
+   * @param type     HTTP method, passed to {@link HttpURLConnection#setRequestMethod(String)}
+   * @param headers  request headers
+   * @param timeout  maximum duration of {@code connectAsync} in milliseconds, -1 for no limit
+   * @throws IOException if {@code url} is not a valid URL
+   */
+  public BasicKSQLHttpClient(String url, String jsonData, Map<String, Object> formData,
+                 String type, Map<String, String> headers, int timeout)
+      throws IOException {
+    this.url = new URL(url);
+    this.jsonData = jsonData;
+    this.formData = formData;
+    this.type = type;
+    this.headers = headers;
+    this.timeout = timeout;
+    this.connected = false;
+  }
+
+  /** Stops a running {@code connectAsync} loop and disconnects the underlying connection. */
+  @Override
+  public void close() throws IOException {
+    connected = false;
+    if (connection != null) {
+      connection.disconnect();
+    }
+  }
+
+  /** Writes {@code data} as the request body and closes the request stream. */
+  private void writeOutput(String data) throws IOException {
+    try (OutputStream os = connection.getOutputStream()) {
+      byte[] input = data.getBytes(UTF_8);
+      os.write(input);
+    }
+  }
+
+  /**
+   * Sends the request and returns the whole response body.
+   *
+   * @return the body for a 2xx status, otherwise the error body, or an empty string when
+   *     the server sent no error body
+   * @throws IOException if the request cannot be sent or the response cannot be read
+   */
+  public String connect() throws IOException {
+    boolean isStatusOk = isStatusOk(createConnection());
+    if (!isStatusOk && connection.getErrorStream() == null) {
+      // No error body available; reading from a null stream would fail with an NPE.
+      return "";
+    }
+    // try-with-resources so the response stream is released even if reading fails.
+    try (InputStreamReader reader = new InputStreamReader(
+        isStatusOk ? connection.getInputStream() : connection.getErrorStream(), UTF_8)) {
+      return IOUtils.toString(reader);
+    }
+  }
+
+  /**
+   * Sends the request and forwards every non-empty response line to {@code onResponse}
+   * until {@link #close()} is called or the configured timeout elapses.
+   *
+   * <p>Despite its name this method blocks the calling thread while it polls the stream.
+   *
+   * @param onResponse callback receiving the trimmed response lines
+   * @throws IOException if the request cannot be sent or the response cannot be read
+   */
+  public void connectAsync(BasicHTTPClientResponse onResponse) throws IOException {
+    int status = createConnection();
+    boolean isStatusOk = isStatusOk(status);
+    long start = System.currentTimeMillis();
+
+    // NOTE(review): for error statuses HttpURLConnection.getInputStream() is expected to
+    // throw an IOException, so those responses likely surface as exceptions and never
+    // reach onError(). Left unchanged because the loop below only ends on close() or
+    // timeout and the callers' handling of onError() is not visible here.
+    try (InputStreamReader in = new InputStreamReader(connection.getInputStream(), UTF_8);
+         BufferedReader br = new BufferedReader(in)) {
+      while (connected && (timeout == -1 || System.currentTimeMillis() - start < timeout)) {
+        if (!br.ready()) {
+          // Nothing to read yet: back off briefly so the loop does not spin a CPU core.
+          if (!pauseBeforeNextPoll()) {
+            break;
+          }
+          continue;
+        }
+        String responseLine = br.readLine();
+        if (responseLine == null || responseLine.isEmpty()) {
+          continue;
+        }
+        if (isStatusOk) {
+          onResponse.onMessage(status, responseLine.trim());
+        } else {
+          onResponse.onError(status, responseLine.trim());
+        }
+      }
+    }
+  }
+
+  /**
+   * Sleeps for one poll interval.
+   *
+   * @return {@code false} if the thread was interrupted (the interrupt flag is restored),
+   *     {@code true} otherwise
+   */
+  private boolean pauseBeforeNextPoll() {
+    try {
+      Thread.sleep(POLL_INTERVAL_MS);
+      return true;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+
+  private boolean isStatusOk(int status) {
+    return status >= 200 && status < 300;
+  }
+
+  /**
+   * Opens the connection, sends headers and body and marks the client as connected.
+   *
+   * @return the HTTP status code of the response
+   */
+  private int createConnection() throws IOException {
+    this.connection = (HttpURLConnection) url.openConnection();
+    connection.setRequestMethod(this.type);
+    this.headers.forEach(connection::setRequestProperty);
+    connection.setDoOutput(true);
+    if (jsonData != null && !jsonData.isEmpty()) {
+      writeOutput(jsonData);
+    } else if (formData != null && !formData.isEmpty()) {
+      // NOTE(review): keys and values are written as-is, without URL encoding.
+      String queryStringParams = formData.entrySet()
+          .stream()
+          .map(e -> e.getKey() + "=" + e.getValue())
+          .collect(Collectors.joining("&"));
+      writeOutput(queryStringParams);
+    }
+    connected = true;
+    return connection.getResponseCode();
+  }
+
+  /** Fluent builder for {@link BasicKSQLHttpClient}; the timeout defaults to -1 (none). */
+  static class Builder {
+    private String url;
+    private String json;
+    private Map<String, Object> formData = new HashMap<>();
+    private String type;
+    private Map<String, String> headers = new HashMap<>();
+    private int timeout = -1;
+
+    public Builder withTimeout(int timeout) {
+      this.timeout = timeout;
+      return this;
+    }
+
+    public Builder withUrl(String url) {
+      this.url = url;
+      return this;
+    }
+
+    public Builder withJson(String json) {
+      this.json = json;
+      return this;
+    }
+
+    public Builder withType(String type) {
+      this.type = type;
+      return this;
+    }
+
+    public Builder withHeader(String header, String value) {
+      this.headers.put(header, value);
+      return this;
+    }
+
+    public Builder withFormData(String name, Object value) {
+      this.formData.put(name, value);
+      return this;
+    }
+
+    /**
+     * @return a client configured with the values collected so far
+     * @throws IOException if the configured URL is malformed
+     */
+    public BasicKSQLHttpClient build() throws IOException {
+      return new BasicKSQLHttpClient(url, json, formData, type, headers, timeout);
+    }
+
+  }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java 
b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
new file mode 100644
index 0000000..461b97a
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java
@@ -0,0 +1,169 @@
+/*
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.zeppelin.ksql;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+
+/**
+ * Zeppelin interpreter that forwards the paragraph text to a {@link KSQLRestService} and
+ * renders the rows it reports as a {@code %table}.
+ */
+public class KSQLInterpreter extends Interpreter {
+  private static final String NEW_LINE = "\n";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class);
+  public static final String TABLE_DELIMITER = "\t";
+
+  private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
+
+  private final KSQLRestService ksqlRestService;
+
+  private static final ObjectMapper json = new ObjectMapper();
+
+  /**
+   * Creates the interpreter with a REST service configured from {@code properties}; every
+   * key and every non-null value is converted to its string form.
+   */
+  public KSQLInterpreter(Properties properties) {
+    this(properties, new KSQLRestService(properties.entrySet().stream()
+            .collect(Collectors.toMap(e -> e.getKey().toString(),
+                e -> e.getValue() != null ? e.getValue().toString() : null))));
+  }
+
+  // VisibleForTesting
+  public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) {
+    super(properties);
+    this.ksqlRestService = ksqlRestService;
+  }
+
+  @Override
+  public void open() throws InterpreterException {}
+
+  @Override
+  public void close() throws InterpreterException {
+    ksqlRestService.close();
+  }
+
+  /**
+   * Renders one cell value: collections and maps as JSON, everything else (including
+   * {@code null}) through {@link String#valueOf(Object)}.
+   */
+  private String writeValueAsString(Object data) {
+    if (data instanceof Collection || data instanceof Map) {
+      try {
+        return json.writeValueAsString(data);
+      } catch (JsonProcessingException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return String.valueOf(data);
+  }
+
+  /** Appends a non-blank {@code message} to the paragraph output as a {@code %text} block. */
+  private void checkResponseErrors(String message) throws IOException {
+    if (StringUtils.isNotBlank(message)) {
+      interpreterOutput.getInterpreterOutput().write("%text");
+      interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+      interpreterOutput.getInterpreterOutput().write(message);
+    }
+  }
+
+  /**
+   * Runs {@code query} and streams the result into {@code context.out}.
+   *
+   * @return {@code SUCCESS} for a blank query or a completed execution, {@code ERROR}
+   *     carrying the exception message when an {@link IOException} occurs
+   */
+  @Override
+  public InterpreterResult interpret(String query,
+        InterpreterContext context) throws InterpreterException {
+    if (StringUtils.isBlank(query)) {
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+    }
+    interpreterOutput.setInterpreterOutput(context.out);
+    try {
+      interpreterOutput.getInterpreterOutput().flush();
+      interpreterOutput.getInterpreterOutput().write("%table");
+      interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+      Set<String> header = new LinkedHashSet<>();
+      executeQuery(context.getParagraphId(), query.trim(), header);
+      return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+    } catch (IOException e) {
+      return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+    }
+  }
+
+  /**
+   * Executes the query and writes each reported row as a tab-separated line; the column
+   * names of the first row are written once as the table header.
+   */
+  private void executeQuery(final String paragraphId,
+        final String query, Set<String> header) throws IOException {
+    AtomicBoolean isFirstLine = new AtomicBoolean(true);
+    ksqlRestService
+            .executeQuery(paragraphId, query, (resp) -> {
+              try {
+                // NOTE(review): responses without a row are skipped before the final/error
+                // message checks below, so a message that arrives on a row-less response
+                // is never shown - confirm whether that is intended.
+                if (resp.getRow() == null || resp.getRow().isEmpty()) {
+                  return;
+                }
+                if (isFirstLine.getAndSet(false)) {
+                  header.addAll(resp.getRow().keySet());
+                  interpreterOutput.getInterpreterOutput()
+                      .write(String.join(TABLE_DELIMITER, header));
+                  interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+                }
+                interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream()
+                        .map(this::writeValueAsString)
+                        .collect(Collectors.joining(TABLE_DELIMITER)));
+                interpreterOutput.getInterpreterOutput().write(NEW_LINE);
+                checkResponseErrors(resp.getFinalMessage());
+                checkResponseErrors(resp.getErrorMessage());
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            });
+  }
+
+  /** Closes the REST client associated with the paragraph that is being cancelled. */
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    LOGGER.info("Trying to cancel paragraphId {}", context.getParagraphId());
+    try {
+      ksqlRestService.closeClient(context.getParagraphId());
+      LOGGER.info("Removed");
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.SIMPLE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) throws InterpreterException {
+    return 0;
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+            KSQLInterpreter.class.getName() + this.hashCode());
+  }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java 
b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
new file mode 100644
index 0000000..a05a70d
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A KSQL statement together with the streams properties it should run with.
+ *
+ * <p>The statement text is normalised on construction: newlines, tabs and carriage returns
+ * are turned into spaces, surrounding whitespace is trimmed and a terminating {@code ;} is
+ * appended when it is missing.
+ */
+public class KSQLRequest {
+
+  private static final String EXPLAIN_QUERY = "EXPLAIN %s";
+  private final String ksql;
+  private final Map<String, String> streamsProperties;
+
+  /**
+   * @param ksql              statement text, must not be {@code null}
+   * @param streamsProperties properties sent along with the statement, stored as given
+   * @throws NullPointerException if {@code ksql} is {@code null}
+   */
+  KSQLRequest(final String ksql, final Map<String, String> streamsProperties) {
+    final String singleLine = Objects.requireNonNull(ksql, "ksql")
+        .replace('\n', ' ')
+        .replace('\t', ' ')
+        .replace('\r', ' ')
+        .trim();
+    if (singleLine.endsWith(";")) {
+      this.ksql = singleLine;
+    } else {
+      this.ksql = singleLine + ";";
+    }
+    this.streamsProperties = streamsProperties;
+  }
+
+  /** Creates a request without streams properties. */
+  KSQLRequest(final String ksql) {
+    this(ksql, Collections.emptyMap());
+  }
+
+  /** Returns a new request that prefixes this statement with {@code EXPLAIN}. */
+  KSQLRequest toExplainRequest() {
+    final String explained = String.format(EXPLAIN_QUERY, this.ksql);
+    return new KSQLRequest(explained, this.streamsProperties);
+  }
+
+  public String getKsql() {
+    return ksql;
+  }
+
+  public Map<String, String> getStreamsProperties() {
+    return streamsProperties;
+  }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java 
b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
new file mode 100644
index 0000000..4646135
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLResponse.java
@@ -0,0 +1,86 @@
+/*
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * One message received from the KSQL server: an optional row keyed by field name plus the
+ * final/error message and terminal flag that accompanied it.
+ */
+public class KSQLResponse {
+  private final Map<String, Object> row;
+  private final String finalMessage;
+  private final String errorMessage;
+  private final boolean terminal;
+
+  /**
+   * Collector producing an insertion-ordered map. Unlike {@code Collectors.toMap} it
+   * accepts {@code null} values, which is needed because a column value may be null.
+   *
+   * @throws IllegalStateException (while collecting) if two elements map to the same key
+   */
+  private static <T, K, U> Collector<T, ?, Map<K, U>>
+      toLinkedHashMap(Function<? super T, ? extends K> keyMapper,
+        Function<? super T, ? extends U> valueMapper) {
+    return Collector.<T, Map<K, U>>of(
+        LinkedHashMap::new,
+        (map, item) -> putUnique(map, keyMapper.apply(item), valueMapper.apply(item)),
+        (left, right) -> {
+          right.forEach((key, value) -> putUnique(left, key, value));
+          return left;
+        });
+  }
+
+  /** Adds the mapping, rejecting keys that are already present. */
+  private static <K, U> void putUnique(Map<K, U> target, K key, U value) {
+    if (target.containsKey(key)) {
+      throw new IllegalStateException(String.format("Duplicate key %s", key));
+    }
+    target.put(key, value);
+  }
+
+  /** Pairs the i-th field name with the i-th column value, keeping the column order. */
+  private static Map<String, Object> zipColumns(List<String> fields, List<Object> columns) {
+    return IntStream.range(0, columns.size())
+        .mapToObj(index -> new AbstractMap.SimpleEntry<>(fields.get(index),
+            columns.get(index)))
+        .collect(toLinkedHashMap(e -> e.getKey(), e -> e.getValue()));
+  }
+
+  /**
+   * @param fields       field names, matched by position with the row's {@code columns}
+   * @param row          raw row object holding a {@code columns} list, or {@code null}
+   * @param finalMessage final message reported with this response, may be {@code null}
+   * @param errorMessage error message reported with this response, may be {@code null}
+   * @param terminal     terminal flag reported with this response
+   * @throws IllegalStateException if two columns map to the same field name
+   */
+  @SuppressWarnings("unchecked") // "columns" is expected to hold a JSON array
+  KSQLResponse(final List<String> fields, final Map<String, Object> row,
+         final String finalMessage, final String errorMessage, boolean terminal) {
+    if (row == null) {
+      this.row = null;
+    } else {
+      // A missing or null "columns" entry is treated as a row without columns.
+      Object columns = row.get("columns");
+      this.row = zipColumns(fields,
+          columns == null ? Collections.emptyList() : (List<Object>) columns);
+    }
+    this.finalMessage = finalMessage;
+    this.errorMessage = errorMessage;
+    this.terminal = terminal;
+  }
+
+  /**
+   * Builds the response from a raw message map, reading the {@code row},
+   * {@code finalMessage}, {@code errorMessage} and {@code terminal} entries. A missing or
+   * null {@code terminal} entry is read as {@code false}.
+   */
+  @SuppressWarnings("unchecked") // "row" is expected to hold a JSON object
+  KSQLResponse(final List<String> fields, final Map<String, Object> resp) {
+    this(fields, (Map<String, Object>) resp.get("row"),
+        (String) resp.get("finalMessage"),
+        (String) resp.get("errorMessage"),
+        Boolean.TRUE.equals(resp.get("terminal")));
+  }
+
+  /** Wraps {@code resp} itself as the row of a terminal response without messages. */
+  KSQLResponse(final Map<String, Object> resp) {
+    this.row = resp;
+    this.finalMessage = null;
+    this.errorMessage = null;
+    this.terminal = true;
+  }
+
+  public Map<String, Object> getRow() {
+    return row;
+  }
+
+  public String getFinalMessage() {
+    return finalMessage;
+  }
+
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  public boolean isTerminal() {
+    return terminal;
+  }
+}
diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java 
b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
new file mode 100644
index 0000000..3393330
--- /dev/null
+++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRestService.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.ksql;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class KSQLRestService {
+
+  // Format patterns for the two REST endpoints; %s is replaced by the base URL.
+  private static final String KSQL_ENDPOINT = "%s/ksql";
+  private static final String QUERY_ENDPOINT = "%s/query";
+
+  // Value sent as the Content-type header of every request built in this class.
+  private static final String KSQL_V1_CONTENT_TYPE = 
"application/vnd.ksql.v1+json; charset=utf-8";
+
+  // Keys removed from each /ksql response entry by excludeKSQLCommonFields.
+  private static final List<String> KSQL_COMMON_FIELDS = Arrays
+      .asList("statementText", "warnings", "@type");
+  // Name of the property holding the server base URL; required by the constructor.
+  private static final String KSQL_URL = "ksql.url";
+
+  // Shared Jackson mapper used to encode requests and decode responses.
+  private static final ObjectMapper json = new ObjectMapper();
+
+  private final String ksqlUrl;
+  private final String queryUrl;
+  private final String baseUrl;
+  // Every "ksql."-prefixed property except the URL; passed to each KSQLRequest.
+  private final Map<String, String> streamsProperties;
+
+  // Most recently created client per paragraph id, so that it can be closed later.
+  private final Map<String, BasicKSQLHttpClient> clientCache;
+
+  /**
+   * Creates the service from the interpreter properties.
+   *
+   * <p>The "ksql.url" property is mandatory and is used to derive both endpoint
+   * URLs. Every other property whose name starts with "ksql." is kept and
+   * attached to each request.
+   *
+   * @param props interpreter properties
+   * @throws NullPointerException if "ksql.url" is not set
+   */
+  public KSQLRestService(Map<String, String> props) {
+    this.baseUrl = Objects.requireNonNull(props.get(KSQL_URL), KSQL_URL);
+    this.ksqlUrl = String.format(KSQL_ENDPOINT, this.baseUrl);
+    this.queryUrl = String.format(QUERY_ENDPOINT, this.baseUrl);
+    this.clientCache = new ConcurrentHashMap<>();
+    this.streamsProperties = props.entrySet().stream()
+        .filter(entry -> entry.getKey().startsWith("ksql."))
+        .filter(entry -> !KSQL_URL.equals(entry.getKey()))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+
+  /**
+   * Runs a statement and hands every resulting response to the callback.
+   * SELECT and PRINT statements go to the streaming handlers; everything else
+   * is executed as a plain KSQL statement.
+   *
+   * @param paragraphId id under which the HTTP client is cached
+   * @param query       statement text
+   * @param callback    receives each response
+   * @throws IOException if the request fails
+   */
+  public void executeQuery(final String paragraphId, final String query,
+               final Consumer<KSQLResponse> callback) throws IOException {
+    final KSQLRequest ksqlRequest = new KSQLRequest(query, streamsProperties);
+    if (isSelect(ksqlRequest)) {
+      executeSelect(paragraphId, callback, ksqlRequest);
+      return;
+    }
+    if (isPrint(ksqlRequest)) {
+      executePrint(paragraphId, callback, ksqlRequest);
+      return;
+    }
+    executeKSQL(paragraphId, callback, ksqlRequest);
+  }
+
+  /**
+   * Posts a statement to the /ksql endpoint and reports its result entries.
+   *
+   * <p>After the common fields are removed from each response entry, every
+   * element of a list-valued field is reported first, followed by every
+   * map-valued field.
+   *
+   * @param paragraphId id under which the HTTP client is cached
+   * @param callback    receives one response per reported element
+   * @param request     request to send
+   * @throws IOException if the request fails or the answer cannot be decoded
+   */
+  @SuppressWarnings("unchecked")
+  private void executeKSQL(String paragraphId, Consumer<KSQLResponse> callback,
+        KSQLRequest request) throws IOException {
+    try (BasicKSQLHttpClient httpClient = createNewClient(paragraphId, request, ksqlUrl)) {
+      List<Map<String, Object>> statements = json.readValue(httpClient.connect(), List.class);
+      // First pass: list-valued fields, one response per list element.
+      statements.stream()
+          .map(this::excludeKSQLCommonFields)
+          .flatMap(fields -> fields.values().stream())
+          .filter(List.class::isInstance)
+          .flatMap(value -> ((List<Map<String, Object>>) value).stream())
+          .map(KSQLResponse::new)
+          .forEach(callback);
+      // Second pass: map-valued fields, one response per field.
+      statements.stream()
+          .map(this::excludeKSQLCommonFields)
+          .flatMap(fields -> fields.values().stream())
+          .filter(Map.class::isInstance)
+          .map(value -> (Map<String, Object>) value)
+          .map(KSQLResponse::new)
+          .forEach(callback);
+    }
+  }
+
+  /**
+   * Returns a copy of the given response entry without the fields listed in
+   * {@code KSQL_COMMON_FIELDS}.
+   *
+   * <p>The copy is built with a plain loop instead of {@code Collectors.toMap},
+   * which throws a {@link NullPointerException} for null values. The copy also
+   * keeps the iteration order of the input map.
+   *
+   * @param map one decoded response entry
+   * @return a new map holding the remaining fields
+   */
+  private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) {
+    Map<String, Object> remaining = new LinkedHashMap<>();
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      if (!KSQL_COMMON_FIELDS.contains(entry.getKey())) {
+        remaining.put(entry.getKey(), entry.getValue());
+      }
+    }
+    return remaining;
+  }
+
+  private BasicKSQLHttpClient createNewClient(String paragraphId, KSQLRequest 
request,
+        String url) throws IOException {
+    BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
+            .withUrl(url)
+            .withJson(json.writeValueAsString(request))
+            .withType("POST")
+            .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
+            .build();
+    BasicKSQLHttpClient oldClient = clientCache.put(paragraphId, client);
+    if (oldClient != null) {
+      oldClient.close();
+    }
+    return client;
+  }
+
+  private void executeSelect(String paragraphId, Consumer<KSQLResponse> 
callback,
+        KSQLRequest request) throws IOException {
+    List<String> fieldNames = getFields(request);
+    if (fieldNames.isEmpty()) {
+      throw new RuntimeException("Field are empty");
+    }
+    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, 
queryUrl)) {
+      client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
+        @Override
+        public void onMessage(int status, String message) {
+          try {
+            Map<String, Object> queryResponse = json.readValue(message, 
LinkedHashMap.class);
+            KSQLResponse resp = new KSQLResponse(fieldNames, queryResponse);
+            callback.accept(resp);
+            if (resp.isTerminal() || 
StringUtils.isNotBlank(resp.getErrorMessage())
+                    || StringUtils.isNotBlank(resp.getFinalMessage())) {
+              client.close();
+            }
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public void onError(int status, String message) {
+          try {
+            KSQLResponse resp = new 
KSQLResponse(Collections.singletonMap("error", message));
+            callback.accept(resp);
+            client.close();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+  }
+
+  private void executePrint(String paragraphId, Consumer<KSQLResponse> 
callback,
+                             KSQLRequest request) throws IOException {
+    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, 
queryUrl)) {
+      client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
+        @Override
+        public void onMessage(int status, String message) {
+          if (message.toUpperCase().startsWith("FORMAT:")) {
+            return;
+          }
+          List<String> elements = Arrays.asList(message.split(","));
+          Map<String, Object> row = new LinkedHashMap<>();
+          row.put("timestamp", elements.get(0));
+          row.put("offset", elements.get(1));
+          row.put("record", String.join("", elements.subList(2, 
elements.size())));
+          KSQLResponse resp = new KSQLResponse(row);
+          callback.accept(resp);
+        }
+
+        @Override
+        public void onError(int status, String message) {
+          try {
+            KSQLResponse resp = new 
KSQLResponse(Collections.singletonMap("error", message));
+            callback.accept(resp);
+            client.close();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Tells whether the statement text starts with SELECT, ignoring case.
+   * Only the prefix is compared, so the whole statement is no longer
+   * upper-cased, and the result does not depend on the default locale.
+   */
+  private boolean isSelect(KSQLRequest request) {
+    return request.getKsql().regionMatches(true, 0, "SELECT", 0, "SELECT".length());
+  }
+
+  /**
+   * Tells whether the statement text starts with PRINT, ignoring case.
+   *
+   * <p>The comparison is locale-independent: with {@code toUpperCase()} and a
+   * Turkish default locale, "print" becomes "PRİNT" and would not be recognised.
+   */
+  private boolean isPrint(KSQLRequest request) {
+    return request.getKsql().regionMatches(true, 0, "PRINT", 0, "PRINT".length());
+  }
+
+  /**
+   * Removes the client cached for the given paragraph, if any, and closes it.
+   *
+   * @param paragraphId cache key
+   * @throws IOException if closing the client fails
+   */
+  public void closeClient(final String paragraphId) throws IOException {
+    final BasicKSQLHttpClient cached = clientCache.remove(paragraphId);
+    if (cached == null) {
+      return;
+    }
+    cached.close();
+  }
+
+  /**
+   * Resolves the field names of the given request, starting with the
+   * unmodified statement (tryCoerce = false).
+   */
+  private List<String> getFields(KSQLRequest request) throws IOException {
+    return getFields(request, false);
+  }
+
+  /**
+   * Resolves the field names of a query by sending its EXPLAIN form to the
+   * /ksql endpoint and reading "queryDescription.fields[].name" from the first
+   * entry of the answer.
+   *
+   * <p>If the request fails with an {@link IOException}, it is retried once with
+   * the statement cut off before its WHERE clause. Statements without a WHERE
+   * clause are not retried: there is nothing to strip, and the retry used to
+   * hide the original failure behind a StringIndexOutOfBoundsException.
+   *
+   * @param request   request whose fields are wanted
+   * @param tryCoerce true to strip the WHERE clause before asking
+   * @return the non-empty field names in response order; empty when the
+   *         answer contains no entry or no fields
+   * @throws IOException if the request fails and no retry is possible
+   */
+  @SuppressWarnings("unchecked")
+  private List<String> getFields(KSQLRequest request, boolean tryCoerce) throws IOException {
+    if (tryCoerce) {
+      /*
+       * this because a query like
+       * `EXPLAIN SELECT * FROM ORDERS WHERE ADDRESS->STATE = 'New York' LIMIT 10;`
+       * fails with the message `Column STATE cannot be resolved`
+       * so we try to coerce the field resolution
+       */
+      int whereIndex = request.getKsql().toUpperCase().indexOf("WHERE");
+      if (whereIndex >= 0) {
+        String query = request.getKsql().substring(0, whereIndex);
+        request = new KSQLRequest(query, request.getStreamsProperties());
+      }
+    }
+    try (BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
+        .withUrl(ksqlUrl)
+        .withJson(json.writeValueAsString(request.toExplainRequest()))
+        .withType("POST")
+        .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
+        .build()) {
+      List<Map<String, Object>> explained = json.readValue(client.connect(), List.class);
+      if (explained == null || explained.isEmpty()) {
+        // nothing to read the description from
+        return Collections.emptyList();
+      }
+      Map<String, Object> explainResponse = explained.get(0);
+      Map<String, Object> queryDescription = (Map<String, Object>) explainResponse
+          .getOrDefault("queryDescription", Collections.emptyMap());
+      List<Map<String, Object>> fields = (List<Map<String, Object>>) queryDescription
+          .getOrDefault("fields", Collections.emptyList());
+      return fields.stream()
+          // Objects.toString also covers a "name" that is present but null
+          .map(elem -> Objects.toString(elem.get("name"), ""))
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      boolean canCoerce = !tryCoerce && request.getKsql().toUpperCase().contains("WHERE");
+      if (canCoerce) {
+        return getFields(request, true);
+      }
+      throw e;
+    }
+  }
+
+
+  public void close() {
+    Set<String> keys = clientCache.keySet();
+    keys.forEach(key -> {
+      try {
+        closeClient(key);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}
diff --git a/ksql/src/main/resources/interpreter-setting.json 
b/ksql/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..cf15bbf
--- /dev/null
+++ b/ksql/src/main/resources/interpreter-setting.json
@@ -0,0 +1,21 @@
+[
+  {
+    "group": "ksql",
+    "name": "ksql",
+    "className": "org.apache.zeppelin.ksql.KSQLInterpreter",
+    "properties": {
+      "ksql.url": {
+        "envName": null,
+        "propertyName": "ksql.url",
+        "defaultValue": "http://localhost:8088",
+        "description": "KSQL Endpoint base URL",
+        "type": "string"
+      }
+    },
+    "editor": {
+      "language": "sql",
+      "editOnDblClick": false,
+      "completionSupport": false
+    }
+  }
+]
diff --git 
a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java 
b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
new file mode 100644
index 0000000..b9520fb
--- /dev/null
+++ b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
@@ -0,0 +1,169 @@
+/*
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.zeppelin.ksql;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+public class KSQLInterpreterTest {
+
+  // Rebuilt before every test by setUpZeppelin.
+  private InterpreterContext context;
+
+  // Interpreter properties shared by all tests: the server URL plus one
+  // "ksql."-prefixed streams property.
+  private static final Map<String, String> PROPS = new HashMap<String, String>() {{
+      put("ksql.url", "http://localhost:8088");
+      put("ksql.streams.auto.offset.reset", "earliest");
+    }};
+
+
+  /** Builds a fresh interpreter context with the paragraph id "ksql-test". */
+  @Before
+  public void setUpZeppelin() throws IOException {
+    context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setParagraphId("ksql-test")
+        .build();
+  }
+
+  /**
+   * A SELECT whose mocked service emits three rows followed by a terminal
+   * message without a row must be rendered as a single %table message.
+   */
+  @Test
+  public void shouldRenderKSQLSelectAsTable() throws InterpreterException,
+      IOException, InterruptedException {
+    // given
+    Properties p = new Properties();
+    p.putAll(PROPS);
+    KSQLRestService service = Mockito.mock(KSQLRestService.class);
+    Stubber stubber = Mockito.doAnswer((invocation) -> {
+      Consumer<  KSQLResponse> callback = (Consumer<  KSQLResponse>)
+            invocation.getArguments()[2];
+      IntStream.range(1, 5)
+          .forEach(i -> {
+            Map<String, Object> map = new HashMap<>();
+            if (i == 4) {
+              // last message: no row, terminal flag set
+              map.put("row", null);
+              map.put("terminal", true);
+            } else {
+              map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i)));
+              map.put("terminal", false);
+            }
+            callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map));
+            // NOTE(review): the pause presumably mimics rows arriving over time;
+            // confirm whether the interpreter actually depends on it.
+            try {
+              Thread.sleep(3000);
+            } catch (InterruptedException e) {
+              // restore the flag instead of swallowing the interruption
+              Thread.currentThread().interrupt();
+            }
+          });
+      return null;
+    });
+    stubber.when(service).executeQuery(Mockito.any(String.class),
+          Mockito.anyString(),
+          Mockito.any(Consumer.class));
+    Interpreter interpreter = new KSQLInterpreter(p, service);
+
+    // when
+    String query = "select * from orders";
+    interpreter.interpret(query, context);
+
+    // then
+    String expected = "%table fieldName\n" +
+        "value 1\n" +
+        "value 2\n" +
+        "value 3\n";
+    assertEquals(1, context.out.toInterpreterResultMessage().size());
+    assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString());
+    assertEquals(InterpreterResult.Type.TABLE, context.out
+        .toInterpreterResultMessage().get(0).getType());
+    interpreter.close();
+  }
+
+  /**
+   * A non-SELECT statement whose mocked service emits two flat rows must be
+   * rendered as a single %table message. The table is parsed back into maps
+   * and compared with the emitted rows.
+   */
+  @Test
+  public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException,
+      IOException, InterruptedException {
+    // given
+    Properties p = new Properties();
+    p.putAll(PROPS);
+    KSQLRestService service = Mockito.mock(KSQLRestService.class);
+    Map<String, Object> row1 = new HashMap<>();
+    row1.put("name", "orders");
+    row1.put("registered", "false");
+    row1.put("replicaInfo", "[1]");
+    row1.put("consumerCount", "0");
+    row1.put("consumerGroupCount", "0");
+    Map<String, Object> row2 = new HashMap<>();
+    row2.put("name", "orders");
+    row2.put("registered", "false");
+    row2.put("replicaInfo", "[1]");
+    row2.put("consumerCount", "0");
+    row2.put("consumerGroupCount", "0");
+    Stubber stubber = Mockito.doAnswer((invocation) -> {
+      Consumer<  KSQLResponse> callback = (Consumer<  KSQLResponse>)
+            invocation.getArguments()[2];
+      callback.accept(new KSQLResponse(row1));
+      callback.accept(new KSQLResponse(row2));
+      return null;
+    });
+    stubber.when(service).executeQuery(
+        Mockito.any(String.class),
+        Mockito.anyString(),
+        Mockito.any(Consumer.class));
+    Interpreter interpreter = new KSQLInterpreter(p, service);
+
+    // when
+    String query = "show topics";
+    interpreter.interpret(query, context);
+
+    // then
+    List<Map<String, Object>> expected = Arrays.asList(row1, row2);
+
+    String[] lines = context.out.toInterpreterResultMessage()
+        .get(0).toString()
+        .replace("%table ", "")
+        .trim()
+        .split("\n");
+    List<String[]> rows = Stream.of(lines)
+        .map(line -> line.split("\t"))
+        .collect(Collectors.toList());
+    // the first line is the header; each later line becomes a header -> cell map
+    List<Map<String, String>> actual = rows.stream()
+        .skip(1)
+        .map(row -> IntStream.range(0, row.length)
+            .mapToObj(index -> new AbstractMap.SimpleEntry<>(rows.get(0)[index], row[index]))
+            .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())))
+        .collect(Collectors.toList());
+    assertEquals(1, context.out.toInterpreterResultMessage().size());
+    assertEquals(expected, actual);
+    assertEquals(InterpreterResult.Type.TABLE, context.out
+        .toInterpreterResultMessage().get(0).getType());
+    // close the interpreter, as the sibling SELECT test does
+    interpreter.close();
+  }
+}
diff --git a/pom.xml b/pom.xml
index 15e6c2c..ffe1802 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
     <module>beam</module>
     <module>hazelcastjet</module>
     <module>geode</module>
+    <module>ksql</module>
     <module>zeppelin-web</module>
     <module>zeppelin-server</module>
     <module>zeppelin-jupyter</module>

Reply via email to