Are you writing the output of multiple streaming queries to the same
location? If so, I can see this error occurring: multiple streaming queries
writing to the same directory is not supported. The file sink keeps a
_spark_metadata log inside the output directory, and two queries sharing it
can leave that log inconsistent, which would match the missing 399.compact
file below.
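
If you do need several sinks, the pattern I'd expect to work is one query
per output directory, each with its own checkpoint. A rough sketch
(parsed_df stands in for whatever streaming DataFrame you are writing, and
all paths are placeholders):

    # each query gets a private checkpoint dir and a private output dir;
    # nothing else writes to either path
    q1 = parsed_df.writeStream \
        .format("json") \
        .option("checkpointLocation", "hdfs:///checkpoints/query1") \
        .start("hdfs:///data/query1")

    q2 = parsed_df.writeStream \
        .format("json") \
        .option("checkpointLocation", "hdfs:///checkpoints/query2") \
        .start("hdfs:///data/query2")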

On Tue, Jul 24, 2018 at 3:38 PM, dddaaa <danv...@gmail.com> wrote:

> I'm trying to read JSON messages from Kafka and store them in HDFS with
> Spark Structured Streaming.
>
> I followed the example here:
> https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html
>
> and when my code looks like this:
>
>     df = spark \
>       .readStream \
>       .format("kafka") \
>       .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>       .option("subscribe", "topic1") \
>       .load()
>     df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>     df.writeStream.format("json") \
>       .option("checkpointLocation", "some/hdfs/path") \
>       .start("/data")
> Then I get rows with binary values in HDFS:
>
> {"value":"BINARY DATA","topic":"test_hdfs2","partition":0,"offset":3463075,"timestamp":"2018-07-24T20:51:33.655Z","timestampType":0}
>
> These rows are continually written as expected, but the value is still in
> binary form.
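>
> Reading the docs again, I suspect selectExpr returns a new DataFrame
> rather than changing df in place, so the cast never reaches the writer.
> A sketch of what I think it should be (untested):
>
>     # write the DataFrame returned by selectExpr, not the original df
>     strings = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>     strings.writeStream \
>         .format("json") \
>         .option("checkpointLocation", "some/hdfs/path") \
>         .start("/data")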
>
> I found this post:
>
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> and I'm trying to implement this example:
>
>     from pyspark.sql.functions import col, from_json
>     from pyspark.sql.types import IntegerType, StringType, StructType
>
>     schema = StructType().add("a", IntegerType()).add("b", StringType())
>     df.select(
>         col("key").cast("string"),
>         from_json(col("value").cast("string"), schema))
>
> But here I get odd behaviour: a small file is written to HDFS with
> multiple empty JSON rows - {}
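>
> If I understand from_json correctly, it yields nulls for records that
> don't match the schema, and the JSON writer drops null fields, which
> would produce exactly these {} rows. A sketch of the flattened version
> I'm aiming for, assuming the messages really carry fields "a" and "b"
> (untested):
>
>     # alias the parsed struct, then flatten it so the sink sees real columns
>     parsed = df.select(
>         from_json(col("value").cast("string"), schema).alias("data"))
>     parsed.select("data.*").writeStream \
>         .format("json") \
>         .option("checkpointLocation", "some/hdfs/path") \
>         .start("/data")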
>
> and very quickly the job fails with the following exception:
>
> 18/07/24 22:25:47 ERROR datasources.FileFormatWriter: Aborting job null.
> java.lang.IllegalStateException: hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting batch 409 (compactInterval: 10)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
> at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
> at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> 18/07/24 22:25:47 ERROR streaming.MicroBatchExecution: Query [id = 4f6c4ebc-f330-4697-b2db-7989b93dfba3, runId = 57575397-9fda-4370-9dcb-4550ae1576ec] terminated with error
> org.apache.spark.SparkException: Job aborted.
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:224)
> at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.IllegalStateException: hdfs://SOME_PATH/_spark_metadata/399.compact doesn't exist when compacting batch 409 (compactInterval: 10)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
> at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
> at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
> ... 17 more
>
> Any idea how to implement this in the right way?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
