spark streaming to jdbc

2021-09-03 Thread igyu
val lines = spark.readStream
  .format("socket")
  //  .schema(StructType(schemas))
  .option("host", "10.3.87.23")
  .option("port", )
  .load()
  .selectExpr("CAST(value AS STRING)").as[String]

val DF = lines.map(x => {
  val obj = JSON.parseObject(x)
  val ls = new util.ArrayList()
  (obj.getString("timestamp"), obj.getString("msg"))
}).toDF("timestamp", "msg")
val q = DF.writeStream
  .foreachBatch((row, l) => {
    if (row.count() != 0) {
      row.write.format("jdbc")
        .mode(mode)
        .options(cfg)
        .save()
    }
  })
  .start()

I get an error:

Logical Plan:
Project [_1#10 AS timestamp#13, _2#11 AS msg#14]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS 
_1#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, 
true, false) AS _2#11]
   +- MapElements , class java.lang.String, 
[StructField(value,StringType,true)], obj#9: scala.Tuple2
  +- DeserializeToObject cast(value#2 as string).toString, obj#8: 
java.lang.String
 +- Project [cast(value#0 as string) AS value#2]
+- StreamingExecutionRelation TextSocketV2[host: 10.3.87.23, port: 
], [value#0]

at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.&lt;init&gt;(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at scala.util.matching.Regex.findFirstIn(Regex.scala:388)
at 
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at 
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2695)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2693)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.util.Utils$.redact(Utils.scala:2693)
at org.apache.spark.util.Utils$.redact(Utils.scala:2660)
at 
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at 
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:2071)
at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:177)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:548)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:99)
at 
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:75)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:46)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:41)
at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at 

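The NullPointerException is raised while Spark builds the plan description and redacts the JDBC options (SQLConf.redactOptions -> Utils.redact -> Regex.findFirstIn), which typically means one of the entries in the cfg map passed to .options(cfg) is null, e.g. an unset user or password. Below is a minimal sketch of guarding against that before the write; the option keys, connection URL, table, user, and password are placeholders, not taken from the original post:

import org.apache.spark.sql.DataFrame

// Hypothetical options map; any entry whose key or value is null is dropped so that
// Spark's option redaction never runs a regex over a null string.
val rawCfg: Map[String, String] = Map(
  "url"      -> "jdbc:mysql://db-host:3306/test",       // placeholder
  "dbtable"  -> "messages",                              // placeholder
  "user"     -> "spark",                                 // placeholder
  "password" -> sys.env.getOrElse("DB_PASSWORD", null)   // may be null if unset
)
val cfg: Map[String, String] = rawCfg.filter { case (k, v) => k != null && v != null }

val q = DF.writeStream
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    if (batchDF.count() != 0) {
      batchDF.write.format("jdbc")
        .mode("append")      // stand-in for the original `mode` variable
        .options(cfg)
        .save()
    }
  })
  .start()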
Spark Streaming Reusing JDBC Connections

2014-12-05 Thread Asim Jalis
Is there a way I can keep a JDBC connection open throughout a streaming job? I
have a foreach which runs once per batch. However, I don't want to open the
connection for each batch, but would rather have a persistent connection that
I can reuse. How can I do this?

Thanks.

Asim


RE: Spark Streaming Reusing JDBC Connections

2014-12-05 Thread Ashic Mahtab
I've done this:

1. foreachPartition
2. Open a connection.
3. foreach inside the partition.
4. Close the connection.

Slightly crufty, but works. Would love to see a better approach.
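
A minimal sketch of that pattern, assuming a DStream of (timestamp, msg) string pairs called stream and a hypothetical MySQL table; the URL, table, and credentials below are made up:

import java.sql.DriverManager

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // One connection per partition, opened on the executor that processes it.
    val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/test", "user", "password")
    try {
      val stmt = conn.prepareStatement("INSERT INTO messages (ts, msg) VALUES (?, ?)")
      records.foreach { case (ts, msg) =>
        stmt.setString(1, ts)
        stmt.setString(2, msg)
        stmt.executeUpdate()
      }
      stmt.close()
    } finally {
      conn.close()  // closed once the whole partition has been written
    }
  }
}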

Regards,
Ashic.
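
As for a better approach: the usual way to get a connection that persists across batches is to hold it (or, better, a small connection pool) in a lazily initialized singleton, so each executor JVM creates it once and foreachPartition just borrows it. A sketch under the same hypothetical names as above; a real implementation would use a proper pool (e.g. HikariCP), since a single java.sql.Connection is not safe to share across concurrent tasks:

object ConnectionProvider {
  // Initialized once per executor JVM on first use, then reused by every
  // partition and every batch that runs in that JVM.
  lazy val connection: java.sql.Connection =
    java.sql.DriverManager.getConnection("jdbc:mysql://db-host:3306/test", "user", "password")
}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val stmt = ConnectionProvider.connection.prepareStatement(
      "INSERT INTO messages (ts, msg) VALUES (?, ?)")
    records.foreach { case (ts, msg) =>
      stmt.setString(1, ts)
      stmt.setString(2, msg)
      stmt.executeUpdate()
    }
    stmt.close()
  }
}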
