This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e2f27c  Switch Zeppelin connector from Session to JDBC (#2414)
7e2f27c is described below

commit 7e2f27c7dc81a4faa9bca4a8147e6cfe35a21746
Author: Rong-Kang <[email protected]>
AuthorDate: Thu Jan 7 12:04:24 2021 +0800

    Switch Zeppelin connector from Session to JDBC (#2414)
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |   2 -
 zeppelin-interpreter/pom.xml                       |   2 +-
 .../apache/zeppelin/iotdb/IoTDBInterpreter.java    | 200 +++++++++++++--------
 .../zeppelin/iotdb/IoTDBInterpreterTest.java       |   9 +-
 4 files changed, 133 insertions(+), 80 deletions(-)

diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java 
b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index c9be448..a068d98 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -189,8 +189,6 @@ public class RpcUtils {
     switch (newTimeFormat.trim().toLowerCase()) {
       case "long":
       case "number":
-        timeFormat = newTimeFormat.trim().toLowerCase();
-        break;
       case DEFAULT_TIME_FORMAT:
       case "iso8601":
         timeFormat = newTimeFormat.trim().toLowerCase();
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index 222a95e..0376060 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -54,7 +54,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
-            <artifactId>iotdb-session</artifactId>
+            <artifactId>iotdb-jdbc</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
index 7574d5c..dc1c86c 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/iotdb/IoTDBInterpreter.java
@@ -17,20 +17,26 @@
 package org.apache.zeppelin.iotdb;
 
 
+import static org.apache.iotdb.rpc.IoTDBRpcDataSet.TIMESTAMP_STR;
 import static org.apache.iotdb.rpc.RpcUtils.setTimeFormat;
 
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.time.ZoneId;
 import java.util.Arrays;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.jdbc.IoTDBConnection;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.session.Session;
-import org.apache.iotdb.session.SessionDataSet;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.AbstractInterpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -52,14 +58,13 @@ public class IoTDBInterpreter extends AbstractInterpreter {
   static final String IOTDB_ZONE_ID = "iotdb.zoneId";
   static final String IOTDB_ENABLE_RPC_COMPRESSION = 
"iotdb.enable.rpc.compression";
   static final String IOTDB_TIME_DISPLAY_TYPE = "iotdb.time.display.type";
-  static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
-
   private static final String NONE_VALUE = "none";
   static final String DEFAULT_HOST = "127.0.0.1";
   static final String DEFAULT_PORT = "6667";
   static final String DEFAULT_FETCH_SIZE = "10000";
   static final String DEFAULT_ENABLE_RPC_COMPRESSION = "false";
   static final String DEFAULT_TIME_DISPLAY_TYPE = "long";
+  static final String DEFAULT_ZONE_ID = "UTC";
 
   private static final char TAB = '\t';
   private static final char NEWLINE = '\n';
@@ -67,48 +72,84 @@ public class IoTDBInterpreter extends AbstractInterpreter {
   private static final String SEMICOLON = ";";
   private static final String EQUAL_SIGN = "=";
 
-  private IoTDBConnectionException connectionException;
-  private Session session;
+
+  /**
+   * should be consistent with IoTDB client
+   */
+  private static final String QUIT_COMMAND = "quit";
+  private static final String EXIT_COMMAND = "exit";
+  private static final String HELP = "help";
+  private static final String IMPORT_CMD = "import";
+  static final String SET_TIMESTAMP_DISPLAY = "set time_display_type";
+  private static final String SET_MAX_DISPLAY_NUM = "set max_display_num";
+  private static final String SHOW_TIMESTAMP_DISPLAY = "show 
time_display_type";
+  private static final String SET_TIME_ZONE = "set time_zone";
+  private static final String SHOW_TIMEZONE = "show time_zone";
+  private static final String SET_FETCH_SIZE = "set fetch_size";
+  private static final String SHOW_FETCH_SIZE = "show fetch_size";
+
+  private static final Set<String> nonSupportCommandSet = new HashSet<>(
+      Arrays.asList(QUIT_COMMAND, EXIT_COMMAND, HELP, IMPORT_CMD, 
SET_TIME_ZONE, SET_FETCH_SIZE,
+          SET_MAX_DISPLAY_NUM, SHOW_TIMEZONE, SHOW_TIMESTAMP_DISPLAY, 
SHOW_FETCH_SIZE, IMPORT_CMD));
+
   private String timeFormat;
+
+  private IoTDBConnectionException connectionException;
+  private IoTDBConnection connection = null;
+  private int fetchSize;
   private ZoneId zoneId;
 
+
   public IoTDBInterpreter(Properties property) {
     super(property);
   }
 
   @Override
   public void open() {
+    String host;
+    int port;
+    String passWord;
     try {
-      String host = getProperty(IOTDB_HOST, DEFAULT_HOST).trim();
-      int port = Integer.parseInt(getProperty(IOTDB_PORT, 
DEFAULT_PORT).trim());
-      String userName = properties.getProperty(IOTDB_USERNAME, 
NONE_VALUE).trim();
-      String passWord = properties.getProperty(IOTDB_PASSWORD, 
NONE_VALUE).trim();
-      int fetchSize = Integer
+      host = getProperty(IOTDB_HOST, DEFAULT_HOST).trim();
+      port = Integer.parseInt(getProperty(IOTDB_PORT, DEFAULT_PORT).trim());
+      userName = properties.getProperty(IOTDB_USERNAME, NONE_VALUE).trim();
+      passWord = properties.getProperty(IOTDB_PASSWORD, NONE_VALUE).trim();
+      this.fetchSize = Integer
           .parseInt(properties.getProperty(IOTDB_FETCH_SIZE, 
DEFAULT_FETCH_SIZE).trim());
-      String zoneStr = properties.getProperty(IOTDB_ZONE_ID);
-      this.zoneId = !NONE_VALUE.equalsIgnoreCase(zoneStr) && 
StringUtils.isNotBlank(zoneStr)
-          ? ZoneId.of(zoneStr.trim()) : ZoneId.systemDefault();
+
       String timeDisplayType = properties.getProperty(IOTDB_TIME_DISPLAY_TYPE,
           DEFAULT_TIME_DISPLAY_TYPE).trim();
       this.timeFormat = setTimeFormat(timeDisplayType);
-      boolean enableRPCCompression = "true".equalsIgnoreCase(
+      Config.rpcThriftCompressionEnable = "true".equalsIgnoreCase(
           properties.getProperty(IOTDB_ENABLE_RPC_COMPRESSION,
               DEFAULT_ENABLE_RPC_COMPRESSION).trim());
-      session = new Session(host, port, userName, passWord, fetchSize, zoneId);
-      session.open(enableRPCCompression);
-    } catch (IoTDBConnectionException e) {
-      connectionException = e;
+
+      Class.forName(Config.JDBC_DRIVER_NAME);
+      this.connection = (IoTDBConnection) DriverManager
+          .getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", 
userName, passWord);
+      String zoneStr = properties.getProperty(IOTDB_ZONE_ID);
+      if (!NONE_VALUE.equalsIgnoreCase(zoneStr) && 
StringUtils.isNotBlank(zoneStr)) {
+        this.zoneId = ZoneId.of(zoneStr.trim());
+        connection.setTimeZone(zoneStr);
+      } else {
+        this.zoneId = ZoneId.systemDefault();
+        connection.setTimeZone(this.zoneId.getId());
+      }
+      connection.setTimeZone(this.zoneId.getId());
+
+    } catch (SQLException | ClassNotFoundException | TException e) {
+      connectionException = new IoTDBConnectionException(e);
     }
   }
 
   @Override
   public void close() {
     try {
-      if (session != null) {
-        session.close();
+      if (this.connection != null) {
+        this.connection.close();
       }
-    } catch (IoTDBConnectionException e) {
-      connectionException = e;
+    } catch (SQLException e) {
+      connectionException = new IoTDBConnectionException(e);
     }
   }
 
@@ -131,62 +172,75 @@ public class IoTDBInterpreter extends AbstractInterpreter 
{
     try {
       String[] scriptLines = parseMultiLinesSQL(st);
       InterpreterResult interpreterResult = null;
+
       for (String scriptLine : scriptLines) {
-        String lowercaseSc = scriptLine.toLowerCase();
-        if (lowercaseSc.startsWith(SET_TIMESTAMP_DISPLAY)) {
-          String[] values = scriptLine.split(EQUAL_SIGN);
-          if (values.length != 2) {
-            throw new StatementExecutionException(
-                String.format("Time display format error, please input like 
%s=ISO8601",
-                    SET_TIMESTAMP_DISPLAY));
-          }
-          String timeDisplayType = values[1].trim();
-          this.timeFormat = setTimeFormat(values[1]);
-          interpreterResult = new InterpreterResult(Code.SUCCESS, "Time 
display type has set to " +
-              timeDisplayType);
-        } else if (lowercaseSc.startsWith("select")) {
-          //Execute query
-          String msg;
-          msg = getResultWithCols(session, scriptLine);
-          interpreterResult = new InterpreterResult(Code.SUCCESS);
-          interpreterResult.add(Type.TABLE, msg);
-        } else {
-          //Execute non query
-          session.executeNonQueryStatement(scriptLine);
-          interpreterResult = new InterpreterResult(Code.SUCCESS, "Sql 
executed.");
-        }
+        interpreterResult = handleInputCmd(scriptLine, connection);
       }
       return interpreterResult;
     } catch (StatementExecutionException e) {
       return new InterpreterResult(Code.ERROR, "StatementExecutionException: " 
+ e.getMessage());
-    } catch (IoTDBConnectionException e) {
-      return new InterpreterResult(Code.ERROR, "IoTDBConnectionException: " + 
e.getMessage());
     }
   }
 
-  private String getResultWithCols(Session session, String sql)
-      throws StatementExecutionException, IoTDBConnectionException {
-    SessionDataSet sessionDataSet = session.executeQueryStatement(sql);
-    List<String> columnNames = sessionDataSet.getColumnNames();
-    StringBuilder stringBuilder = new StringBuilder();
-    for (String key : columnNames) {
-      stringBuilder.append(key);
-      stringBuilder.append(TAB);
+  private InterpreterResult handleInputCmd(String cmd, IoTDBConnection 
connection)
+      throws StatementExecutionException {
+    String specialCmd = cmd.toLowerCase().trim();
+    if (nonSupportCommandSet.contains(specialCmd)) {
+      return new InterpreterResult(Code.ERROR, "Not supported in Zeppelin: " + 
specialCmd);
+    }
+    if (specialCmd.startsWith(SET_TIMESTAMP_DISPLAY)) {
+      String[] values = cmd.split(EQUAL_SIGN);
+      if (values.length != 2) {
+        throw new StatementExecutionException(
+            String.format("Time display format error, please input like 
%s=ISO8601",
+                SET_TIMESTAMP_DISPLAY));
+      }
+      String timeDisplayType = values[1].trim();
+      this.timeFormat = setTimeFormat(values[1]);
+      return new InterpreterResult(Code.SUCCESS, "Time display type has set to 
" +
+          timeDisplayType);
     }
-    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
-    stringBuilder.append(NEWLINE);
-    while (sessionDataSet.hasNext()) {
-      RowRecord record = sessionDataSet.next();
-      stringBuilder.append(RpcUtils.formatDatetime(timeFormat, 
RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
-          record.getTimestamp(), zoneId));
-      for (Field f : record.getFields()) {
-        stringBuilder.append(TAB);
-        stringBuilder.append(f);
+    return executeQuery(connection, cmd);
+  }
+
+  private InterpreterResult executeQuery(IoTDBConnection connection, String 
cmd) {
+    StringBuilder stringBuilder = new StringBuilder();
+    try (Statement statement = connection.createStatement()) {
+      statement.setFetchSize(fetchSize);
+      boolean hasResultSet = statement.execute(cmd.trim());
+      InterpreterResult interpreterResult;
+      if (hasResultSet) {
+        try (ResultSet resultSet = statement.getResultSet()) {
+          final ResultSetMetaData metaData = resultSet.getMetaData();
+          final int columnCount = metaData.getColumnCount();
+
+          for (int i = 1; i <= columnCount; i++) {
+            stringBuilder.append(metaData.getColumnLabel(i));
+            stringBuilder.append(TAB);
+          }
+          stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+          stringBuilder.append(NEWLINE);
+          while (resultSet.next()) {
+            stringBuilder
+                .append(RpcUtils.formatDatetime(timeFormat, 
RpcUtils.DEFAULT_TIMESTAMP_PRECISION,
+                    resultSet.getLong(TIMESTAMP_STR), zoneId));
+            for (int i = 2; i <= columnCount; i++) {
+              stringBuilder.append(TAB);
+              stringBuilder.append(resultSet.getString(i));
+            }
+            stringBuilder.append(NEWLINE);
+          }
+          stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+          interpreterResult = new InterpreterResult(Code.SUCCESS);
+          interpreterResult.add(Type.TABLE, stringBuilder.toString());
+          return interpreterResult;
+        }
+      } else {
+        return new InterpreterResult(Code.SUCCESS, "Sql executed.");
       }
-      stringBuilder.append(NEWLINE);
+    } catch (SQLException e) {
+      return new InterpreterResult(Code.ERROR, "SQLException: " + 
e.getMessage());
     }
-    stringBuilder.deleteCharAt(stringBuilder.length() - 1);
-    return stringBuilder.toString();
   }
 
   @Override
@@ -197,8 +251,8 @@ public class IoTDBInterpreter extends AbstractInterpreter {
   @Override
   public void cancel(InterpreterContext context) {
     try {
-      session.close();
-    } catch (IoTDBConnectionException e) {
+      connection.close();
+    } catch (SQLException e) {
       LOGGER.error("Exception close failed", e);
     }
   }
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
index 2013c47..94b836f 100644
--- 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/iotdb/IoTDBInterpreterTest.java
@@ -24,6 +24,7 @@ import static 
org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_FETCH_SIZE;
 import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_HOST;
 import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_PORT;
 import static 
org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_TIME_DISPLAY_TYPE;
+import static org.apache.zeppelin.iotdb.IoTDBInterpreter.DEFAULT_ZONE_ID;
 import static 
org.apache.zeppelin.iotdb.IoTDBInterpreter.IOTDB_ENABLE_RPC_COMPRESSION;
 import static org.apache.zeppelin.iotdb.IoTDBInterpreter.IOTDB_FETCH_SIZE;
 import static org.apache.zeppelin.iotdb.IoTDBInterpreter.IOTDB_HOST;
@@ -59,7 +60,7 @@ public class IoTDBInterpreterTest {
     properties.put(IOTDB_USERNAME, "root");
     properties.put(IOTDB_PASSWORD, "root");
     properties.put(IOTDB_FETCH_SIZE, DEFAULT_FETCH_SIZE);
-    properties.put(IOTDB_ZONE_ID, "UTC");
+    properties.put(IOTDB_ZONE_ID, DEFAULT_ZONE_ID);
     properties.put(IOTDB_ENABLE_RPC_COMPRESSION, 
DEFAULT_ENABLE_RPC_COMPRESSION);
     properties.put(IOTDB_TIME_DISPLAY_TYPE, DEFAULT_TIME_DISPLAY_TYPE);
     interpreter = new IoTDBInterpreter(properties);
@@ -193,7 +194,7 @@ public class IoTDBInterpreterTest {
     Assert.assertNotNull(actual);
     Assert.assertEquals(Code.ERROR, actual.code());
     Assert.assertEquals(
-        "StatementExecutionException: 401: meet error while parsing SQL to 
physical plan: {}line 1:13 missing ROOT at '<EOF>'",
+        "SQLException: 401: line 1:13 missing ROOT at '<EOF>'",
         actual.message().get(0).getData());
 
     wrongSql = "select * from a";
@@ -201,14 +202,14 @@ public class IoTDBInterpreterTest {
     Assert.assertNotNull(actual);
     Assert.assertEquals(Code.ERROR, actual.code());
     Assert.assertEquals(
-        "StatementExecutionException: 401: meet error while parsing SQL to 
physical plan: {}line 1:14 mismatched input 'a' expecting {FROM, ',', '.'}",
+        "SQLException: 401: line 1:14 mismatched input 'a' expecting {FROM, 
',', '.'}",
         actual.message().get(0).getData());
 
     wrongSql = "select * from root a";
     Assert.assertNotNull(actual);
     Assert.assertEquals(Code.ERROR, actual.code());
     Assert.assertEquals(
-        "StatementExecutionException: 401: meet error while parsing SQL to 
physical plan: {}line 1:14 mismatched input 'a' expecting {FROM, ',', '.'}",
+        "SQLException: 401: line 1:14 mismatched input 'a' expecting {FROM, 
',', '.'}",
         actual.message().get(0).getData());
   }
 

Reply via email to