Vijay created BAHIR-301:
---------------------------
Summary: Unable to persist Oracle's Number(16,9) using
sql-streaming-jdbc
Key: BAHIR-301
URL: https://issues.apache.org/jira/browse/BAHIR-301
Project: Bahir
Issue Type: Bug
Components: Spark Streaming Connectors
Affects Versions: Spark-2.4.0
Reporter: Vijay
Fix For: Spark-2.4.0
While trying to persist a value of Oracle's NUMBER(16,9) type using
sql-streaming-jdbc, I get the error shown below.
+*Java Code Snippet:*+
{{Dataset<Row> rawData = spark.readStream()}}
{{ .format("csv").schema(getSchema())}}
{{ .csv("/rates-streaming/*.csv");}}
{{ }}
{{ rawData.createOrReplaceTempView("testRates");}}
{{ }}
{{ rawData.printSchema();}}
{{ results.printSchema();}}
{{ }}
{{ Dataset<Row> results = spark.sql("select rate from testRates ");}}
{{ }}
{{ StreamingQuery query =
results.writeStream().outputMode("append").format("streaming-jdbc")}}
{{
.outputMode(OutputMode.Append()).option(JDBCOptions.JDBC_URL(), datasource)}}
{{ .option(JDBCOptions.JDBC_TABLE_NAME(), "TABLENAME")}}
{{ .option(JDBCOptions.JDBC_DRIVER_CLASS(),
"oracle.jdbc.driver.OracleDriver")}}
{{ .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(),
"100").option("user", username)}}
{{ .option("password", password).option("truncate",
false).trigger(Trigger.ProcessingTime("15 seconds")).start();}}
{{ query.awaitTermination();}}
{{ }}
{{ }}
{{ }}
{{ public StructType getSchema() {}}
{{ List<StructField> fields = new ArrayList<>();}}
{{ fields.add(DataTypes.createStructField("rate",
DataTypes.createDecimalType(16, 9), true));}}
{{ StructType schema = DataTypes.createStructType(fields);}}
{{ return schema;}}
{{ }}}
+*Logs:*+
{{java.lang.ClassCastException: org.apache.spark.sql.types.Decimal cannot be
cast to java.math.BigDecimal}}
{{ at org.apache.spark.sql.Row.getDecimal(Row.scala:262)
~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}}
{{ at org.apache.spark.sql.Row.getDecimal$(Row.scala:262)
~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.spark.sql.catalyst.expressions.GenericRow.getDecimal(rows.scala:166)
~[spark-catalyst_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12(JdbcUtil.scala:102)
~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.bahir.sql.streaming.jdbc.JdbcUtil$.$anonfun$makeSetter$12$adapted(JdbcUtil.scala:101)
~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.doWriteAndResetBuffer(JdbcStreamWriter.scala:174)
~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:156)
~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.bahir.sql.streaming.jdbc.JdbcStreamDataWriter.write(JdbcStreamWriter.scala:79)
~[spark-sql-streaming-jdbc_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:118)
~[spark-sql_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
~[spark-sql_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
~[spark-sql_2.12-2.4.0.jar:2.4.0]}}
{{ at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{ at org.apache.spark.scheduler.Task.run(Task.scala:121)
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{ at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:405)
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{ at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{ at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
~[spark-core_2.12-2.4.0.jar:2.4.0]}}
{{ at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[na:1.8.0_291]}}
{{ at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[na:1.8.0_291]}}
{{ at java.lang.Thread.run(Thread.java:748) [na:1.8.0_291]}}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)