[ 
https://issues.apache.org/jira/browse/DRILL-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430676#comment-17430676
 ] 

ASF GitHub Bot commented on DRILL-8005:
---------------------------------------

vvysotskyi commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r730462584



##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s 
VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = 
"UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic 
strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All 
versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)

Review comment:
       Do we support repeated types or lists of lists and so on?

##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.vvysotskyi.drill-calcite</groupId>
+      <artifactId>calcite-server</artifactId>

Review comment:
       Could you please explain why this dependency is required here?

##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s 
VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = 
"UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic 
strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All 
versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()

Review comment:
       Please initialize this map directly in its declaration instead of in a static block.

##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s 
VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = 
"UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic 
strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All 
versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String 
name, JdbcWriter config) {
+    this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + 
columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), 
nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try (Statement statement = connection.createStatement()) {
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the 
schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) 
rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = 
StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = 
(NullableVarCharHolder) rowList.get(i);
+            value = 
StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) 
rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);

Review comment:
       Please wrap the statement in a try-with-resources block and close the 
connection in a `finally` block; otherwise, if an exception is thrown during 
statement execution, the connection would not be closed.

##########
File path: contrib/storage-jdbc/pom.xml
##########
@@ -46,7 +47,11 @@
       <groupId>com.zaxxer</groupId>
       <artifactId>HikariCP</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.github.vvysotskyi.drill-calcite</groupId>

Review comment:
       ```suggestion
         <groupId>${calcite.groupId}</groupId>
   ```

##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlDialect.DatabaseProduct;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s 
VALUES\n%s";
+  private static final String INSERT_QUERY_TEMPLATE_FOR_APACHE_PHOENIX = 
"UPSERT INTO %s VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic 
strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All 
versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String 
name, JdbcWriter config) {
+    this.tableName = JdbcDDLQueryUtils.addBackTicksToTable(name);
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = JdbcDDLQueryUtils.addBackTicksToField(field.getName());
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + 
columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), 
nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try (Statement statement = connection.createStatement()) {
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the 
schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString

Review comment:
       Can't we use Calcite's `JdbcTableModify` to create the insert statement 
string instead of using custom logic?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@drill.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Writer to JDBC Storage Plugin
> ---------------------------------
>
>                 Key: DRILL-8005
>                 URL: https://issues.apache.org/jira/browse/DRILL-8005
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Storage - JDBC
>    Affects Versions: 1.19.0
>            Reporter: Charles Givre
>            Assignee: Charles Givre
>            Priority: Major
>             Fix For: 1.20.0
>
>
> The current implementation of Drill only allows writing to file systems.  This 
> issue proposes extending the JDBC plugin to allow writing to JDBC data 
> sources by implementing: 
> CREATE TABLE AS
> DROP TABLE
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to