// Read a socket stream of JSON lines and persist every non-empty micro-batch
// over JDBC. The original paste crashed with an NPE thrown from
// org.apache.spark.util.Utils.redact — see the note on `.options(...)` below.
val lines = spark.readStream
  .format("socket")
  // .schema(StructType(schemas))
  .option("host", "10.3.87.23")
  .option("port", 6666)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// Parse each JSON record into (timestamp, msg).
// NOTE(review): fastjson's JSONObject.getString returns null when the key is
// absent, which would put null values in the columns; default to "" instead.
// (The original also built an unused java.util.ArrayList here — removed.)
val DF = lines.map { raw =>
  val obj = JSON.parseObject(raw)
  (Option(obj.getString("timestamp")).getOrElse(""),
   Option(obj.getString("msg")).getOrElse(""))
}.toDF("timestamp", "msg")

val q = DF.writeStream
  .foreachBatch { (batchDf, batchId) =>
    // isEmpty inspects at most one row; count() forces a full scan of the
    // micro-batch just to compare against zero.
    if (!batchDf.isEmpty) {
      batchDf.write
        .format("jdbc")
        .mode(mode)
        // Root cause of the reported NullPointerException: while rendering the
        // query plan, Spark's Utils.redact runs Regex.findFirstIn over every
        // option VALUE, and Pattern.matcher(null) throws NPE. A null-valued
        // entry in `cfg` therefore kills the whole stream. Drop null values
        // before handing the options to the JDBC writer.
        .options(cfg.filter { case (_, v) => v != null })
        .save()
    }
  }
  .start()
}) .start() I get a error Logical Plan: Project [_1#10 AS timestamp#13, _2#11 AS msg#14] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#11] +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#9: scala.Tuple2 +- DeserializeToObject cast(value#2 as string).toString, obj#8: java.lang.String +- Project [cast(value#0 as string) AS value#2] +- StreamingExecutionRelation TextSocketV2[host: 10.3.87.23, port: 6666], [value#0] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Caused by: java.lang.NullPointerException at java.util.regex.Matcher.getTextLength(Matcher.java:1283) at java.util.regex.Matcher.reset(Matcher.java:309) at java.util.regex.Matcher.<init>(Matcher.java:229) at java.util.regex.Pattern.matcher(Pattern.java:1093) at scala.util.matching.Regex.findFirstIn(Regex.scala:388) at org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695) at org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695) at scala.Option.orElse(Option.scala:289) at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2695) at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2693) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.spark.util.Utils$.redact(Utils.scala:2693) at org.apache.spark.util.Utils$.redact(Utils.scala:2660) at org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071) at org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:2071) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52) at org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:177) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:548) at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:472) at org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197) at org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197) at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:99) at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:197) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:75) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) at 
com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:46) at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:41) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) ... 1 more 21/09/03 15:18:56 INFO SparkContext: Invoking stop() from shutdown hook igyu