// Reads newline-delimited JSON text from a socket, extracts the "timestamp" and
// "msg" fields of every record, and writes each non-empty micro-batch over JDBC.
// `spark`, `mode`, `cfg` and `JSON` are defined outside this snippet.
val lines = spark.readStream
  .format("socket")
  // .schema(StructType(schemas))
  .option("host", "10.3.87.23")
  // The port argument was missing from the original snippet; `port` must be
  // defined with the port the socket server listens on.
  .option("port", port)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Parse every line as a JSON object and keep only the two fields of interest.
val DF = lines.map { line =>
  val obj = JSON.parseObject(line)
  (obj.getString("timestamp"), obj.getString("msg"))
}.toDF("timestamp", "msg")

val q = DF.writeStream
  .foreachBatch((batch, batchId) => {
    // Skip the JDBC write for micro-batches that carry no rows.
    if (batch.count() != 0) {
      // NOTE(review): Spark runs a redaction regex over every option key and
      // value when it renders this write command; a null value in `cfg` would
      // raise a NullPointerException there — confirm all values are non-null.
      batch.write.format("jdbc")
        .mode(mode)
        .options(cfg)
        .save()
    }
  })
  .start()
I get an error:
Logical Plan:
Project [_1#10 AS timestamp#13, _2#11 AS msg#14]
+- SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS
_1#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType,
fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2,
true, false) AS _2#11]
+- MapElements <function1>, class java.lang.String,
[StructField(value,StringType,true)], obj#9: scala.Tuple2
+- DeserializeToObject cast(value#2 as string).toString, obj#8:
java.lang.String
+- Project [cast(value#0 as string) AS value#2]
+- StreamingExecutionRelation TextSocketV2[host: 10.3.87.23, port:
], [value#0]
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.<init>(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at scala.util.matching.Regex.findFirstIn(Regex.scala:388)
at
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2695)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2693)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.util.Utils$.redact(Utils.scala:2693)
at org.apache.spark.util.Utils$.redact(Utils.scala:2660)
at
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:2071)
at
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52)
at
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:177)
at
org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:548)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:472)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:99)
at
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:197)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:75)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:46)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:41)
at
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at