import com.alibaba.fastjson.JSON   // JSON.parseObject / getString (fastjson)
import spark.implicits._           // needed for .as[String] and .toDF

val lines = spark.readStream
  .format("socket")
  //      .schema(StructType(schemas))
  .option("host", "10.3.87.23")
  .option("port", 6666)
  .load()
  .selectExpr("CAST(value AS STRING)").as[String]

// parse each JSON line and keep only the timestamp and msg fields
val DF = lines.map(x => {
  val obj = JSON.parseObject(x)
  (obj.getString("timestamp"), obj.getString("msg"))
}).toDF("timestamp", "msg")
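// each line arriving on the socket is a JSON object shaped roughly like
// (example values only, not real data):
//   {"timestamp":"2021-09-03 15:18:00","msg":"hello"}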
val q = DF.writeStream
  .foreachBatch((batchDF, batchId) => {
    // skip empty micro-batches, otherwise write the batch to the database over JDBC
    if (batchDF.count() != 0) {
      batchDF.write.format("jdbc")
        .mode(mode)
        .options(cfg)
        .save()
    }
  })
  .start()
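
mode and cfg come from my own config in JdbcWriter.scala; the real values are
different, but they are roughly of this shape (everything below is a placeholder,
not my actual settings):

val mode = "append"
val cfg = Map(
  "url"      -> "jdbc:mysql://<db-host>:3306/<db>",  // placeholder JDBC URL
  "dbtable"  -> "<table>",
  "user"     -> "<user>",
  "password" -> "<password>",
  "driver"   -> "com.mysql.jdbc.Driver"              // placeholder driver class
)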

I get an error:

Logical Plan:
Project [_1#10 AS timestamp#13, _2#11 AS msg#14]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, 
assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS 
_1#10, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, 
fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, 
true, false) AS _2#11]
   +- MapElements <function1>, class java.lang.String, 
[StructField(value,StringType,true)], obj#9: scala.Tuple2
      +- DeserializeToObject cast(value#2 as string).toString, obj#8: 
java.lang.String
         +- Project [cast(value#0 as string) AS value#2]
            +- StreamingExecutionRelation TextSocketV2[host: 10.3.87.23, port: 
6666], [value#0]

at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.NullPointerException
at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
at java.util.regex.Matcher.reset(Matcher.java:309)
at java.util.regex.Matcher.<init>(Matcher.java:229)
at java.util.regex.Pattern.matcher(Pattern.java:1093)
at scala.util.matching.Regex.findFirstIn(Regex.scala:388)
at 
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at 
org.apache.spark.util.Utils$$anonfun$redact$1$$anonfun$apply$15.apply(Utils.scala:2695)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2695)
at org.apache.spark.util.Utils$$anonfun$redact$1.apply(Utils.scala:2693)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.util.Utils$.redact(Utils.scala:2693)
at org.apache.spark.util.Utils$.redact(Utils.scala:2660)
at 
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at 
org.apache.spark.sql.internal.SQLConf$$anonfun$redactOptions$1.apply(SQLConf.scala:2071)
at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.internal.SQLConf.redactOptions(SQLConf.scala:2071)
at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.simpleString(SaveIntoDataSourceCommand.scala:52)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.verboseString(QueryPlan.scala:177)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:548)
at org.apache.spark.sql.catalyst.trees.TreeNode.treeString(TreeNode.scala:472)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$4.apply(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:99)
at 
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:197)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:75)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:46)
at com.join.jdbc.writer.JdbcWriter$$anonfun$1.apply(JdbcWriter.scala:41)
at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:534)
at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:532)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:531)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
21/09/03 15:18:56 INFO SparkContext: Invoking stop() from shutdown hook


igyu
