This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 3a4bedc [ZEPPELIN-3548] KSQL Interpreter for Zeppelin 3a4bedc is described below commit 3a4bedcbc837634672b08c0da233164eef8da66a Author: Andrea Santurbano <sant...@gmail.com> AuthorDate: Mon Oct 21 23:02:13 2019 +0200 [ZEPPELIN-3548] KSQL Interpreter for Zeppelin ### What is this PR for? This PR adds the support to [KSQL](https://docs.confluent.io/current/ksql/docs/developer-guide/api.html) ### What type of PR is it? [Feature] ### Todos * [x] - Created the interpreter ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-3548 ### How should this be tested? You can use the following guide in order to spin-up the KSQL stack. https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc ### Screenshots (if appropriate) ![ksql](https://user-images.githubusercontent.com/1833335/66709276-f6840000-ed60-11e9-8f2f-c7cf6603a1a0.gif) ### Questions: * Does the licenses files need update? No, I didn't add any new dependency into the project. * Is there breaking changes for older versions? No * Does this needs documentation? 
Yes Author: Andrea Santurbano <sant...@gmail.com> Closes #3485 from conker84/ksql and squashes the following commits: baa480f32 [Andrea Santurbano] PR review feedback ad3785404 [Andrea Santurbano] [ZEPPELIN-3548] KSQL Interpreter for Zeppelin --- .../assets/themes/zeppelin/img/docs-img/ksql.1.png | Bin 0 -> 197688 bytes .../assets/themes/zeppelin/img/docs-img/ksql.2.png | Bin 0 -> 133453 bytes .../assets/themes/zeppelin/img/docs-img/ksql.3.png | Bin 0 -> 235272 bytes docs/interpreter/ksql.md | 78 +++++++ ksql/README.md | 10 + ksql/pom.xml | 93 ++++++++ .../apache/zeppelin/ksql/BasicKSQLHttpClient.java | 175 ++++++++++++++ .../org/apache/zeppelin/ksql/KSQLInterpreter.java | 169 ++++++++++++++ .../java/org/apache/zeppelin/ksql/KSQLRequest.java | 51 ++++ .../org/apache/zeppelin/ksql/KSQLResponse.java | 86 +++++++ .../org/apache/zeppelin/ksql/KSQLRestService.java | 257 +++++++++++++++++++++ ksql/src/main/resources/interpreter-setting.json | 21 ++ .../apache/zeppelin/ksql/KSQLInterpreterTest.java | 169 ++++++++++++++ pom.xml | 1 + 14 files changed, 1110 insertions(+) diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png b/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png new file mode 100644 index 0000000..276602b Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/ksql.1.png differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png b/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png new file mode 100644 index 0000000..e741503 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/ksql.2.png differ diff --git a/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png b/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png new file mode 100644 index 0000000..abb44a4 Binary files /dev/null and b/docs/assets/themes/zeppelin/img/docs-img/ksql.3.png differ diff --git a/docs/interpreter/ksql.md b/docs/interpreter/ksql.md new file mode 100644 index 0000000..bc91ade --- /dev/null +++ b/docs/interpreter/ksql.md @@ -0,0 +1,78 
@@ +--- +layout: page +title: "KSQL Interpreter for Apache Zeppelin" +description: "KSQL is the streaming SQL engine for Apache Kafka and provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka." +group: interpreter +--- +<!-- +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +{% include JB/setup %} + +# KSQL Interpreter for Apache Zeppelin + +<div id="toc"></div> + +## Overview +[KSQL](https://www.confluent.io/product/ksql/) is the streaming SQL engine for Apache Kafka®. It provides an easy-to-use yet powerful interactive SQL interface for stream processing on Kafka. + +## Configuration +<table class="table-configuration"> + <thead> + <tr> + <th>Property</th> + <th>Default</th> + <th>Description</th> + </tr> + </thead> + <tbody> + <tr> + <td>ksql.url</td> + <td>http://localhost:8080</td> + <td>The KSQL Endpoint base URL</td> + </tr> + </tbody> +</table> + +Note: the interpreter supports all KSQL properties, e.g. `ksql.streams.auto.offset.reset`. +The full list of KSQL parameters is [here](https://docs.confluent.io/current/ksql/docs/installation/server-config/config-reference.html). + +## Using the KSQL Interpreter +To interact with KSQL, start a paragraph with `%ksql` followed by your KSQL statement.
+ +Here are some examples: + +``` +%ksql +PRINT 'orders'; +``` + +![PRINT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.1.png) + +``` +%ksql +CREATE STREAM ORDERS WITH + (VALUE_FORMAT='AVRO', + KAFKA_TOPIC ='orders'); +``` + +![CREATE image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.2.png) + +``` +%ksql +SELECT * +FROM ORDERS +LIMIT 10 +``` + +![LIMIT image]({{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/ksql.3.png) \ No newline at end of file diff --git a/ksql/README.md b/ksql/README.md new file mode 100644 index 0000000..22f89f2 --- /dev/null +++ b/ksql/README.md @@ -0,0 +1,10 @@ +# Overview +KSQL interpreter for Apache Zeppelin + +# Connection +The Interpreter opens a connection with the KSQL REST endpoint. + +# Confluent KSQL resources +Here is a list of useful resources: + * [Docs](https://docs.confluent.io/current/ksql/docs/index.html) + * [Getting Started](https://github.com/confluentinc/demo-scene/blob/master/ksql-intro/demo_ksql-intro.adoc) diff --git a/ksql/pom.xml b/ksql/pom.xml new file mode 100644 index 0000000..b3b25b1 --- /dev/null +++ b/ksql/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License.
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin-interpreter-parent</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.9.0-SNAPSHOT</version> + <relativePath>../zeppelin-interpreter-parent/pom.xml</relativePath> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-ksql</artifactId> + <packaging>jar</packaging> + <version>0.9.0-SNAPSHOT</version> + <name>Zeppelin: Kafka SQL interpreter</name> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <interpreter.name>ksql</interpreter.name> + </properties> + + <dependencies> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.9.8</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-io</artifactId> + <version>1.3.2</version> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>3.0.0</version> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <skip>false</skip> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git 
a/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java new file mode 100644 index 0000000..72e41db --- /dev/null +++ b/ksql/src/main/java/org/apache/zeppelin/ksql/BasicKSQLHttpClient.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.ksql; + +import org.apache.commons.io.IOUtils; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class BasicKSQLHttpClient implements Closeable { + + public static final String UTF_8 = "utf-8"; + + interface BasicHTTPClientResponse { + void onMessage(int status, String message); + + void onError(int status, String message); + } + + private final String jsonData; + private final Map<String, Object> formData; + private final String type; + private final Map<String, String> headers; + private final URL url; + private HttpURLConnection connection; + private final int timeout; + private boolean connected; + + + public BasicKSQLHttpClient(String url, String jsonData, Map<String, Object> formData, + String type, Map<String, String> headers, int timeout) + throws IOException { + this.url = new URL(url); + this.jsonData = jsonData; + this.formData = formData; + this.type = type; + this.headers = headers; + this.timeout = timeout; + this.connected = false; + } + + @Override + public void close() throws IOException { + connected = false; + if (connection != null) { + connection.disconnect(); + } + } + + private void writeOutput(String data) throws IOException { + try (OutputStream os = connection.getOutputStream()) { + byte[] input = data.getBytes(UTF_8); + os.write(input); + } + } + + public String connect() throws IOException { + int status = createConnection(); + boolean isStatusOk = isStatusOk(status); + return IOUtils.toString(isStatusOk ? 
+ connection.getInputStream() : connection.getErrorStream(), UTF_8); + } + + public void connectAsync(BasicHTTPClientResponse onResponse) throws IOException { + int status = createConnection(); + boolean isStatusOk = isStatusOk(status); + long start = System.currentTimeMillis(); + + try (InputStreamReader in = new InputStreamReader(connection.getInputStream(), UTF_8); + BufferedReader br = new BufferedReader(in)) { + while (connected && (timeout == -1 || System.currentTimeMillis() - start < timeout)) { + if (br.ready()) { + String responseLine = br.readLine(); + if (responseLine == null || responseLine.isEmpty()) { + continue; + } + if (isStatusOk) { + onResponse.onMessage(status, responseLine.trim()); + } else { + onResponse.onError(status, responseLine.trim()); + } + } + } + } + } + + private boolean isStatusOk(int status) { + return status >= 200 && status < 300; + } + + private int createConnection() throws IOException { + this.connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod(this.type); + this.headers.forEach((k, v) -> connection.setRequestProperty(k, v)); + connection.setDoOutput(true); + if (jsonData != null && !jsonData.isEmpty()) { + writeOutput(jsonData); + } else if (formData != null && !formData.isEmpty()) { + String queryStringParams = formData.entrySet() + .stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining("&")); + writeOutput(queryStringParams); + } + connected = true; + return connection.getResponseCode(); + } + + static class Builder { + private String url; + private String json; + private Map<String, Object> formData = new HashMap<>(); + private String type; + private Map<String, String> headers = new HashMap<>(); + private int timeout = -1; + + public Builder withTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public Builder withUrl(String url) { + this.url = url; + return this; + } + + public Builder withJson(String json) { + this.json = json; + return 
this; + } + + public Builder withType(String type) { + this.type = type; + return this; + } + + public Builder withHeader(String header, String value) { + this.headers.put(header, value); + return this; + } + + public Builder withFormData(String name, Object value) { + this.formData.put(name, value); + return this; + } + + public BasicKSQLHttpClient build() throws IOException { + return new BasicKSQLHttpClient(url, json, formData, type, headers, timeout); + } + + } +} diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java new file mode 100644 index 0000000..461b97a --- /dev/null +++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLInterpreter.java @@ -0,0 +1,169 @@ +/* +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 + +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.zeppelin.ksql; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + + +public class KSQLInterpreter extends Interpreter { + private static final String NEW_LINE = "\n"; + + private static final Logger LOGGER = LoggerFactory.getLogger(KSQLInterpreter.class); + public static final String TABLE_DELIMITER = "\t"; + + private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER); + + private final KSQLRestService ksqlRestService; + + private static final ObjectMapper json = new ObjectMapper(); + + public KSQLInterpreter(Properties properties) { + this(properties, new KSQLRestService(properties.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().toString(), + e -> e.getValue() != null ? 
e.getValue().toString() : null)))); + } + + // VisibleForTesting + public KSQLInterpreter(Properties properties, KSQLRestService ksqlRestService) { + super(properties); + this.ksqlRestService = ksqlRestService; + } + + @Override + public void open() throws InterpreterException {} + + @Override + public void close() throws InterpreterException { + ksqlRestService.close(); + } + + private String writeValueAsString(Object data) { + try { + if (data instanceof Collection || data instanceof Map) { + return json.writeValueAsString(data); + } + if (data instanceof String) { + return (String) data; + } + return String.valueOf(data); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + private void checkResponseErrors(String message) throws IOException { + if (StringUtils.isNotBlank(message)) { + // throw new RuntimeException(message); + interpreterOutput.getInterpreterOutput().write("%text"); + interpreterOutput.getInterpreterOutput().write(NEW_LINE); + interpreterOutput.getInterpreterOutput().write(message); + } + } + + @Override + public InterpreterResult interpret(String query, + InterpreterContext context) throws InterpreterException { + if (StringUtils.isBlank(query)) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } + interpreterOutput.setInterpreterOutput(context.out); + try { + interpreterOutput.getInterpreterOutput().flush(); + interpreterOutput.getInterpreterOutput().write("%table"); + interpreterOutput.getInterpreterOutput().write(NEW_LINE); + Set<String> header = new LinkedHashSet<>(); + executeQuery(context.getParagraphId(), query.trim(), header); + return new InterpreterResult(InterpreterResult.Code.SUCCESS); + } catch (IOException e) { + return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage()); + } + } + + private void executeQuery(final String paragraphId, + final String query, Set<String> header) throws IOException { + AtomicBoolean isFirstLine = new AtomicBoolean(true); + 
ksqlRestService + .executeQuery(paragraphId, query, (resp) -> { + try { + if (resp.getRow() == null || resp.getRow().isEmpty()) { + return; + } + if (isFirstLine.get()) { + isFirstLine.set(false); + header.addAll(resp.getRow().keySet()); + interpreterOutput.getInterpreterOutput().write(header.stream() + .collect(Collectors.joining(TABLE_DELIMITER))); + interpreterOutput.getInterpreterOutput().write(NEW_LINE); + } + interpreterOutput.getInterpreterOutput().write(resp.getRow().values().stream() + .map(this::writeValueAsString) + .collect(Collectors.joining(TABLE_DELIMITER))); + interpreterOutput.getInterpreterOutput().write(NEW_LINE); + checkResponseErrors(resp.getFinalMessage()); + checkResponseErrors(resp.getErrorMessage()); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + logger.info("Trying to cancel paragraphId {}", context.getParagraphId()); + try { + ksqlRestService.closeClient(context.getParagraphId()); + logger.info("Removed"); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public FormType getFormType() throws InterpreterException { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + KSQLInterpreter.class.getName() + this.hashCode()); + } +} diff --git a/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java new file mode 100644 index 0000000..a05a70d --- /dev/null +++ b/ksql/src/main/java/org/apache/zeppelin/ksql/KSQLRequest.java @@ -0,0 +1,51 @@ +/* + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/*
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.ksql;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

/**
 * Body of a KSQL REST request, serialized through its getters as
 * {@code {"ksql": ..., "streamsProperties": {...}}}.
 *
 * <p>The statement is normalized to a single line terminated by {@code ;}. The properties
 * are copied into an unmodifiable map, so neither the caller's map nor this request can
 * be mutated through the other.
 */
public class KSQLRequest {

  private static final String EXPLAIN_QUERY = "EXPLAIN %s";
  // Newlines, tabs and carriage returns are flattened to spaces.
  private static final Pattern LINE_BREAKS_AND_TABS = Pattern.compile("[\\n\\t\\r]");

  private final String ksql;
  private final Map<String, String> streamsProperties;

  /**
   * @param ksql the statement; must not be null
   * @param streamsProperties properties sent along with the statement; null is treated as
   *     an empty map
   * @throws NullPointerException if {@code ksql} is null
   */
  KSQLRequest(final String ksql, final Map<String, String> streamsProperties) {
    String inputQuery = LINE_BREAKS_AND_TABS
        .matcher(Objects.requireNonNull(ksql, "ksql"))
        .replaceAll(" ")
        .trim();
    this.ksql = inputQuery.endsWith(";") ? inputQuery : inputQuery + ";";
    this.streamsProperties = streamsProperties == null
        ? Collections.emptyMap()
        : Collections.unmodifiableMap(new LinkedHashMap<>(streamsProperties));
  }

  KSQLRequest(final String ksql) {
    this(ksql, Collections.emptyMap());
  }

  /** Returns a request for {@code EXPLAIN <statement>} carrying the same properties. */
  KSQLRequest toExplainRequest() {
    return new KSQLRequest(String.format(EXPLAIN_QUERY, this.ksql), this.streamsProperties);
  }

  public String getKsql() {
    return ksql;
  }

  /** Returns an unmodifiable view of the request properties (never null). */
  public Map<String, String> getStreamsProperties() {
    return streamsProperties;
  }
}
/*
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.ksql;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * One message from the KSQL server: an ordered row (column name to value), an optional
 * final message, an optional error message and a terminal flag.
 */
public class KSQLResponse {
  private final Map<String, Object> row;
  private final String finalMessage;
  private final String errorMessage;
  private final boolean terminal;

  /**
   * Builds a response for a streamed query message.
   *
   * @param fields column names, matched by position to the values in {@code row.columns}
   * @param row the raw row object holding a {@code columns} list, or null when the message
   *     has no row (the resulting {@link #getRow()} is then null as well)
   * @throws IndexOutOfBoundsException if there are more column values than field names
   * @throws IllegalStateException if the same field name is used twice
   */
  @SuppressWarnings("unchecked")
  KSQLResponse(final List<String> fields, final Map<String, Object> row,
               final String finalMessage, final String errorMessage, boolean terminal) {
    this.row = row == null ? null : zipColumns(fields, (List<Object>) row.get("columns"));
    this.finalMessage = finalMessage;
    this.errorMessage = errorMessage;
    this.terminal = terminal;
  }

  /**
   * Builds a response from a parsed streaming message with the keys {@code row},
   * {@code finalMessage}, {@code errorMessage} and {@code terminal}. A missing or null
   * {@code terminal} is treated as false instead of failing on unboxing.
   */
  @SuppressWarnings("unchecked")
  KSQLResponse(final List<String> fields, final Map<String, Object> resp) {
    this(fields, (Map<String, Object>) resp.get("row"),
        (String) resp.get("finalMessage"),
        (String) resp.get("errorMessage"),
        Boolean.TRUE.equals(resp.get("terminal")));
  }

  /** Wraps an already-shaped row; the response is marked terminal and has no messages. */
  KSQLResponse(final Map<String, Object> resp) {
    this.row = resp;
    this.finalMessage = null;
    this.errorMessage = null;
    this.terminal = true;
  }

  /**
   * Pairs each column value with the field name at the same index, preserving order.
   * A null or absent {@code columns} list yields an empty row.
   */
  private static Map<String, Object> zipColumns(List<String> fields, List<Object> columns) {
    Map<String, Object> result = new LinkedHashMap<>();
    if (columns == null) {
      return result;
    }
    for (int i = 0; i < columns.size(); i++) {
      String name = fields.get(i);
      if (result.containsKey(name)) {
        throw new IllegalStateException(String.format("Duplicate key %s", name));
      }
      // Plain put() on purpose: a column value can be JSON null, and Collectors.toMap
      // rejects null values with a NullPointerException.
      result.put(name, columns.get(i));
    }
    return result;
  }

  public Map<String, Object> getRow() {
    return row;
  }

  public String getFinalMessage() {
    return finalMessage;
  }

  public String getErrorMessage() {
    return errorMessage;
  }

  public boolean isTerminal() {
    return terminal;
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.ksql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang.StringUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +public class KSQLRestService { + + private static final String KSQL_ENDPOINT = "%s/ksql"; + private static final String QUERY_ENDPOINT = "%s/query"; + + private static final String KSQL_V1_CONTENT_TYPE = "application/vnd.ksql.v1+json; charset=utf-8"; + + private static final List<String> KSQL_COMMON_FIELDS = Arrays + .asList("statementText", "warnings", "@type"); + private static final String KSQL_URL = "ksql.url"; + + private static final ObjectMapper json = new ObjectMapper(); + + private final String ksqlUrl; + private final String queryUrl; + private final String baseUrl; + private final Map<String, String> streamsProperties; + + private final Map<String, BasicKSQLHttpClient> clientCache; + + public KSQLRestService(Map<String, String> props) { + baseUrl = Objects.requireNonNull(props.get(KSQL_URL), KSQL_URL).toString(); + ksqlUrl = String.format(KSQL_ENDPOINT, baseUrl); + queryUrl = String.format(QUERY_ENDPOINT, baseUrl); + clientCache = new ConcurrentHashMap<>(); + this.streamsProperties = props.entrySet().stream() + .filter(e -> e.getKey().startsWith("ksql.") && !e.getKey().equals(KSQL_URL)) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + } + + + public void executeQuery(final String paragraphId, final String query, + final Consumer<KSQLResponse> callback) throws IOException { + KSQLRequest request = new KSQLRequest(query, streamsProperties); + if 
(isSelect(request)) { + executeSelect(paragraphId, callback, request); + } else if (isPrint(request)) { + executePrint(paragraphId, callback, request); + } else { + executeKSQL(paragraphId, callback, request); + } + } + + private void executeKSQL(String paragraphId, Consumer<KSQLResponse> callback, + KSQLRequest request) throws IOException { + try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, ksqlUrl)) { + List<Map<String, Object>> queryResponse = json.readValue(client.connect(), List.class); + queryResponse.stream() + .map(map -> excludeKSQLCommonFields(map)) + .flatMap(map -> map.entrySet().stream() + .filter(e -> e.getValue() instanceof List) + .flatMap(e -> ((List<Map<String, Object>>) e.getValue()).stream())) + .map(KSQLResponse::new) + .forEach(callback::accept); + queryResponse.stream() + .map(map -> excludeKSQLCommonFields(map)) + .flatMap(map -> map.entrySet().stream() + .filter(e -> e.getValue() instanceof Map) + .map(e -> (Map<String, Object>) e.getValue())) + .map(KSQLResponse::new) + .forEach(callback::accept); + } + } + + private Map<String, Object> excludeKSQLCommonFields(Map<String, Object> map) { + return map.entrySet().stream() + .filter(e -> !KSQL_COMMON_FIELDS.contains(e.getKey())) + .collect(Collectors + .toMap(e -> e.getKey(), e -> e.getValue())); + } + + private BasicKSQLHttpClient createNewClient(String paragraphId, KSQLRequest request, + String url) throws IOException { + BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder() + .withUrl(url) + .withJson(json.writeValueAsString(request)) + .withType("POST") + .withHeader("Content-type", KSQL_V1_CONTENT_TYPE) + .build(); + BasicKSQLHttpClient oldClient = clientCache.put(paragraphId, client); + if (oldClient != null) { + oldClient.close(); + } + return client; + } + + private void executeSelect(String paragraphId, Consumer<KSQLResponse> callback, + KSQLRequest request) throws IOException { + List<String> fieldNames = getFields(request); + if (fieldNames.isEmpty()) 
{
+      throw new RuntimeException("Field are empty");
+    }
+    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
+      client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
+        @Override
+        public void onMessage(int status, String message) {
+          try {
+            Map<String, Object> queryResponse = json.readValue(message, LinkedHashMap.class);
+            KSQLResponse resp = new KSQLResponse(fieldNames, queryResponse);
+            callback.accept(resp);
+            if (resp.isTerminal() || StringUtils.isNotBlank(resp.getErrorMessage())
+                || StringUtils.isNotBlank(resp.getFinalMessage())) {
+              client.close();
+            }
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+
+        @Override
+        public void onError(int status, String message) {
+          try {
+            KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
+            callback.accept(resp);
+            client.close();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Runs a PRINT statement and forwards each printed line to {@code callback} as a row
+   * with "timestamp", "offset" and "record" columns. Errors are forwarded as a single
+   * "error" entry, after which the client is closed.
+   */
+  private void executePrint(String paragraphId, Consumer<KSQLResponse> callback,
+      KSQLRequest request) throws IOException {
+    try (BasicKSQLHttpClient client = createNewClient(paragraphId, request, queryUrl)) {
+      client.connectAsync(new BasicKSQLHttpClient.BasicHTTPClientResponse() {
+        @Override
+        public void onMessage(int status, String message) {
+          // Lines starting with "Format:" describe the stream and carry no record.
+          if (message.toUpperCase(java.util.Locale.ROOT).startsWith("FORMAT:")) {
+            return;
+          }
+          // A printed line is "timestamp,offset,record". Limit the split to three
+          // parts so commas inside the record (e.g. a JSON payload) are kept;
+          // splitting on every comma and re-joining would drop them.
+          List<String> elements = Arrays.asList(message.split(",", 3));
+          Map<String, Object> row = new LinkedHashMap<>();
+          row.put("timestamp", elements.get(0));
+          // Short lines yield empty columns instead of an IndexOutOfBoundsException
+          // thrown from inside the response callback.
+          row.put("offset", elements.size() > 1 ? elements.get(1) : "");
+          row.put("record", elements.size() > 2 ? elements.get(2) : "");
+          callback.accept(new KSQLResponse(row));
+        }
+
+        @Override
+        public void onError(int status, String message) {
+          try {
+            KSQLResponse resp = new KSQLResponse(Collections.singletonMap("error", message));
+            callback.accept(resp);
+            client.close();
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+  }
+
+  /** Returns true when the statement's first keyword is SELECT (case-insensitive). */
+  private boolean isSelect(KSQLRequest request) {
+    return startsWithKeyword(request, "SELECT");
+  }
+
+  /** Returns true when the statement's first keyword is PRINT (case-insensitive). */
+  private boolean isPrint(KSQLRequest request) {
+    return startsWithKeyword(request, "PRINT");
+  }
+
+  /**
+   * Locale-independent keyword check. Locale.ROOT matters: under a Turkish default
+   * locale "print".toUpperCase() is "PRİNT" and would not match "PRINT".
+   */
+  private static boolean startsWithKeyword(KSQLRequest request, String keyword) {
+    return request.getKsql().trim().toUpperCase(java.util.Locale.ROOT).startsWith(keyword);
+  }
+
+  /** Removes the cached client for {@code paragraphId} and closes it, if one exists. */
+  public void closeClient(final String paragraphId) throws IOException {
+    BasicKSQLHttpClient toClose = clientCache.remove(paragraphId);
+    if (toClose != null) {
+      toClose.close();
+    }
+  }
+
+  private List<String> getFields(KSQLRequest request) throws IOException {
+    return getFields(request, false);
+  }
+
+  /**
+   * Resolves the output column names of {@code request} by posting its explain form
+   * ({@code toExplainRequest()}) to the KSQL endpoint.
+   *
+   * @param tryCoerce when true, the WHERE clause is dropped before explaining
+   * @return the non-empty "name" values of the query description's "fields"
+   * @throws IOException if the request fails and no WHERE-less retry is possible
+   */
+  private List<String> getFields(KSQLRequest request, boolean tryCoerce) throws IOException {
+    if (tryCoerce) {
+      /*
+       * this because a query like
+       * `EXPLAIN SELECT * FROM ORDERS WHERE ADDRESS->STATE = 'New York' LIMIT 10;`
+       * fails with the message `Column STATE cannot be resolved`
+       * so we try to coerce the field resolution by dropping the WHERE clause
+       */
+      String query = request.getKsql().substring(0, whereClauseIndex(request));
+      request = new KSQLRequest(query, request.getStreamsProperties());
+    }
+    try (BasicKSQLHttpClient client = new BasicKSQLHttpClient.Builder()
+        .withUrl(ksqlUrl)
+        .withJson(json.writeValueAsString(request.toExplainRequest()))
+        .withType("POST")
+        .withHeader("Content-type", KSQL_V1_CONTENT_TYPE)
+        .build()) {
+      List<Map<String, Object>> explainResponseList = json.readValue(client.connect(), List.class);
+      // An empty response has no description; let the caller's empty-fields check handle it.
+      if (explainResponseList == null || explainResponseList.isEmpty()) {
+        return Collections.emptyList();
+      }
+      Map<String, Object> explainResponse = explainResponseList.get(0);
+      Map<String, Object> queryDescription = (Map<String, Object>) explainResponse
+          .getOrDefault("queryDescription", Collections.emptyMap());
+      List<Map<String, Object>> fields = (List<Map<String, Object>>) queryDescription
+          .getOrDefault("fields", Collections.emptyList());
+      return fields.stream()
+          .map(elem -> elem.getOrDefault("name", "").toString())
+          .filter(s -> !s.isEmpty())
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      // Retry once without the WHERE clause, but only if there is one to drop;
+      // otherwise substring(0, -1) would replace the real error with an
+      // unrelated StringIndexOutOfBoundsException.
+      if (!tryCoerce && whereClauseIndex(request) >= 0) {
+        return getFields(request, true);
+      }
+      throw e;
+    }
+  }
+
+  /** Index of the first "WHERE" in the statement (case-insensitive), or -1. */
+  private static int whereClauseIndex(KSQLRequest request) {
+    return request.getKsql().toUpperCase(java.util.Locale.ROOT).indexOf("WHERE");
+  }
+
+  /** Closes every cached client. The first failure is rethrown unchecked. */
+  public void close() {
+    // Iterate over a snapshot: closeClient() removes entries from clientCache, and
+    // mutating a map while iterating its live key set is not safe in general.
+    Set<String> keys = clientCache.keySet();
+    for (String key : keys.toArray(new String[0])) {
+      try {
+        closeClient(key);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/ksql/src/main/resources/interpreter-setting.json b/ksql/src/main/resources/interpreter-setting.json
new file mode 100644
index 0000000..cf15bbf
--- /dev/null
+++ b/ksql/src/main/resources/interpreter-setting.json
@@ -0,0 +1,21 @@
+[
+  {
+    "group": "ksql",
+    "name": "ksql",
+    "className": "org.apache.zeppelin.ksql.KSQLInterpreter",
+    "properties": {
+      "ksql.url": {
+        "envName": null,
+        "propertyName": "ksql.url",
+        "defaultValue": "http://localhost:8088",
+        "description": "KSQL Endpoint base URL",
+        "type": "string"
+      }
+    },
+    "editor": {
+      "language": "sql",
+      "editOnDblClick": false,
+      "completionSupport": false
+    }
+  }
+]
diff --git a/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
new file mode 100644
index 0000000..b9520fb
--- /dev/null
+++ b/ksql/src/test/java/org/apache/zeppelin/ksql/KSQLInterpreterTest.java
@@ -0,0 +1,169 @@
+/*
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.zeppelin.ksql;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.stubbing.Stubber;
+
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+
+public class KSQLInterpreterTest {
+
+  /** Interpreter properties shared by every test. */
+  private static final Map<String, String> PROPS;
+
+  static {
+    // A plain static initializer instead of double-brace initialization, which
+    // would create an anonymous HashMap subclass.
+    Map<String, String> props = new HashMap<>();
+    props.put("ksql.url", "http://localhost:8088");
+    props.put("ksql.streams.auto.offset.reset", "earliest");
+    PROPS = Collections.unmodifiableMap(props);
+  }
+
+  /** Pause between the mocked streaming rows in the SELECT test. */
+  private static final long ROW_DELAY_MS = 3000;
+
+  private InterpreterContext context;
+
+  @Before
+  public void setUpZeppelin() throws IOException {
+    context = InterpreterContext.builder()
+        .setInterpreterOut(new InterpreterOutput(null))
+        .setParagraphId("ksql-test")
+        .build();
+  }
+
+  @Test
+  public void shouldRenderKSQLSelectAsTable() throws InterpreterException,
+      IOException, InterruptedException {
+    // given
+    KSQLRestService service = Mockito.mock(KSQLRestService.class);
+    Stubber stubber = Mockito.doAnswer((invocation) -> {
+      @SuppressWarnings("unchecked")
+      Consumer<KSQLResponse> callback = (Consumer<KSQLResponse>) invocation.getArguments()[2];
+      // Emit three data rows, then a terminal message without a row.
+      for (int i = 1; i <= 4; i++) {
+        Map<String, Object> map = new HashMap<>();
+        if (i == 4) {
+          map.put("row", null);
+          map.put("terminal", true);
+        } else {
+          map.put("row", Collections.singletonMap("columns", Arrays.asList("value " + i)));
+          map.put("terminal", false);
+        }
+        callback.accept(new KSQLResponse(Arrays.asList("fieldName"), map));
+        // NOTE(review): presumably simulates rows arriving over time; confirm
+        // KSQLInterpreter needs it before shortening (about 12s per run).
+        // Answer#answer may throw, so an interrupt now fails the test instead
+        // of being printed and ignored.
+        Thread.sleep(ROW_DELAY_MS);
+      }
+      return null;
+    });
+    stubber.when(service).executeQuery(Mockito.any(String.class),
+        Mockito.anyString(),
+        Mockito.any(Consumer.class));
+    Interpreter interpreter = new KSQLInterpreter(newProperties(), service);
+    try {
+      // when
+      interpreter.interpret("select * from orders", context);
+
+      // then
+      String expected = "%table fieldName\n" +
+          "value 1\n" +
+          "value 2\n" +
+          "value 3\n";
+      assertEquals(1, context.out.toInterpreterResultMessage().size());
+      assertEquals(expected, context.out.toInterpreterResultMessage().get(0).toString());
+      assertEquals(InterpreterResult.Type.TABLE, context.out
+          .toInterpreterResultMessage().get(0).getType());
+    } finally {
+      interpreter.close();
+    }
+  }
+
+  @Test
+  public void shouldRenderKSQLNonSelectAsTable() throws InterpreterException,
+      IOException, InterruptedException {
+    // given
+    KSQLRestService service = Mockito.mock(KSQLRestService.class);
+    Map<String, Object> row1 = newTopicRow();
+    Map<String, Object> row2 = newTopicRow();
+    Stubber stubber = Mockito.doAnswer((invocation) -> {
+      @SuppressWarnings("unchecked")
+      Consumer<KSQLResponse> callback = (Consumer<KSQLResponse>) invocation.getArguments()[2];
+      callback.accept(new KSQLResponse(row1));
+      callback.accept(new KSQLResponse(row2));
+      return null;
+    });
+    stubber.when(service).executeQuery(
+        Mockito.any(String.class),
+        Mockito.anyString(),
+        Mockito.any(Consumer.class));
+    Interpreter interpreter = new KSQLInterpreter(newProperties(), service);
+    try {
+      // when
+      interpreter.interpret("show topics", context);
+
+      // then
+      List<Map<String, Object>> expected = Arrays.asList(row1, row2);
+
+      String[] lines = context.out.toInterpreterResultMessage()
+          .get(0).toString()
+          .replace("%table ", "")
+          .trim()
+          .split("\n");
+      List<String[]> rows = Stream.of(lines)
+          .map(line -> line.split("\t"))
+          .collect(Collectors.toList());
+      // The first line is the header; pair each following line's cells with it.
+      String[] header = rows.get(0);
+      List<Map<String, String>> actual = rows.stream()
+          .skip(1)
+          .map(row -> IntStream.range(0, row.length)
+              .mapToObj(index -> new AbstractMap.SimpleEntry<>(header[index], row[index]))
+              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())))
+          .collect(Collectors.toList());
+      assertEquals(1, context.out.toInterpreterResultMessage().size());
+      assertEquals(expected, actual);
+      assertEquals(InterpreterResult.Type.TABLE, context.out
+          .toInterpreterResultMessage().get(0).getType());
+    } finally {
+      interpreter.close();
+    }
+  }
+
+  /** Builds a fresh Properties instance from {@link #PROPS}. */
+  private static Properties newProperties() {
+    Properties p = new Properties();
+    p.putAll(PROPS);
+    return p;
+  }
+
+  /** Builds one "show topics" result row; the non-select test uses two identical rows. */
+  private static Map<String, Object> newTopicRow() {
+    Map<String, Object> row = new HashMap<>();
+    row.put("name", "orders");
+    row.put("registered", "false");
+    row.put("replicaInfo", "[1]");
+    row.put("consumerCount", "0");
+    row.put("consumerGroupCount", "0");
+    return row;
+  }
+}
diff --git a/pom.xml b/pom.xml
index 15e6c2c..ffe1802 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,6 +85,7 @@
     <module>beam</module>
     <module>hazelcastjet</module>
     <module>geode</module>
+    <module>ksql</module>
     <module>zeppelin-web</module>
     <module>zeppelin-server</module>
     <module>zeppelin-jupyter</module>