This is an automated email from the ASF dual-hosted git repository. chinmayskulkarni pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix-connectors.git
The following commit(s) were added to refs/heads/master by this push: new 4a4308a PHOENIX-5410 Phoenix spark to hbase connector takes long time persist data 4a4308a is described below commit 4a4308a86c2224a2cf0bd9efb0f35df2680b556b Author: Manohar Chamaraju <manohar.chamar...@microfocus.com> AuthorDate: Fri Aug 2 15:43:30 2019 +0530 PHOENIX-5410 Phoenix spark to hbase connector takes long time persist data Signed-off-by: Chinmay Kulkarni <chinmayskulka...@gmail.com> --- .../spark/datasource/v2/writer/PhoenixDataWriter.java | 18 ++++++++++++++++-- .../sql/execution/datasources/jdbc/SparkJdbcUtil.scala | 4 ++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java index 04670d5..f67695c 100644 --- a/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java +++ b/phoenix-spark/src/main/java/org/apache/phoenix/spark/datasource/v2/writer/PhoenixDataWriter.java @@ -22,6 +22,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.stream.Collectors; @@ -32,6 +33,8 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder$; import org.apache.spark.sql.execution.datasources.SparkJdbcUtil; import org.apache.spark.sql.execution.datasources.jdbc.PhoenixJdbcDialect$; import org.apache.spark.sql.sources.v2.writer.DataWriter; @@ -39,6 +42,9 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$; +import org.apache.spark.sql.catalyst.expressions.AttributeReference; +import org.apache.spark.sql.catalyst.expressions.Attribute; import com.google.common.collect.Lists; @@ -55,6 +61,7 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> { private final PreparedStatement statement; private final long batchSize; private long numRecords = 0; + private ExpressionEncoder<Row> encoder = null; PhoenixDataWriter(PhoenixDataSourceWriteOptions options) { String scn = options.getScn(); @@ -68,6 +75,13 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> { overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); } this.schema = options.getSchema(); + + List<Attribute> attrs = new ArrayList<>(); + + for (AttributeReference ref : scala.collection.JavaConverters.seqAsJavaListConverter(schema.toAttributes()).asJava()) { + attrs.add(ref.toAttribute()); + } + encoder = RowEncoder$.MODULE$.apply(schema).resolveAndBind( scala.collection.JavaConverters.asScalaIteratorConverter(attrs.iterator()).asScala().toSeq(), SimpleAnalyzer$.MODULE$); try { this.conn = DriverManager.getConnection(JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + zkUrl, overridingProps); @@ -92,14 +106,14 @@ public class PhoenixDataWriter implements DataWriter<InternalRow> { public void write(InternalRow internalRow) throws IOException { try { int i=0; + Row row = SparkJdbcUtil.toRow(encoder, internalRow); for (StructField field : schema.fields()) { DataType dataType = field.dataType(); if (internalRow.isNullAt(i)) { statement.setNull(i + 1, SparkJdbcUtil.getJdbcType(dataType, PhoenixJdbcDialect$.MODULE$).jdbcNullType()); } else { - Row row = SparkJdbcUtil.toRow(schema, internalRow); - SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i); + SparkJdbcUtil.makeSetter(conn, PhoenixJdbcDialect$.MODULE$, dataType).apply(statement, row, i); } ++i; } diff --git a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala index 50cdbf5..97b0525 100644 --- a/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala +++ b/phoenix-spark/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/SparkJdbcUtil.scala @@ -34,11 +34,11 @@ Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, Metadata, Sh StructType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.NextIterator +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder object SparkJdbcUtil { - def toRow(schema: StructType, internalRow: InternalRow) : Row = { - val encoder = RowEncoder(schema).resolveAndBind() + def toRow(encoder: ExpressionEncoder[Row], internalRow: InternalRow) : Row = { encoder.fromRow(internalRow) }