[
https://issues.apache.org/jira/browse/BAHIR-175?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
lynn updated BAHIR-175:
-----------------------
Description:
Spark version: 2.2.0; spark-streaming-sql-mqtt version: 2.2.0
# Reading checkpoint offsets fails with the following error
{code:java}
org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to
org.apache.spark.sql.execution.streaming.LongOffset{code}
Solution:
In the MQTTStreamSource.scala source file,
line 149, getBatch method:
{code:java}
val startIndex = start.getOrElse(LongOffset(0)) match
{ case offset: SerializedOffset => offset.json.toInt case offset: LongOffset =>
offset.offset.toInt }
val endIndex = end match
{ case offset: SerializedOffset => offset.json.toInt case offset: LongOffset =>
offset.offset.toInt }
{code}
2. In the MQTTStreamSource.scala source file,
getBatch method:
{code:java}
val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
// Move consumed messages to persistent store.
(startIndex + 1 to endIndex).foreach
{ id => val element = messages.getOrElse(id, store.retrieve(id)) data +=
element store.store(id, element) messages.remove(id, element) }
The following line:
val element = messages.getOrElse(id, store.retrieve(id)) throws error:
java.lang.ClassCastException: scala.Tuple2 cannot be cast to
scala.runtime.Nothing$
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
{code}
Solution:
{code:java}
val element: (String, Timestamp) = messages.getOrElse(id,
store.retrieve[(String, Timestamp)](id))
{code}
was:
Spark Version:2.2.0 spark-streaming-sql-mqtt version: 2.2.0
# Reading checkpoints offsets error
org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to
org.apache.spark.sql.execution.streaming.LongOffset
solution:
The MqttStreamSource.scala source file:
Line 149, getBatch Method:
val startIndex = start.getOrElse(LongOffset(0)) match
{ case offset: SerializedOffset => offset.json.toInt case offset: LongOffset =>
offset.offset.toInt }
val endIndex = end match
{ case offset: SerializedOffset => offset.json.toInt case offset: LongOffset =>
offset.offset.toInt }
2. The MqttStreamSource.scala source file
getBatch Method:
val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
// Move consumed messages to persistent store.
(startIndex + 1 to endIndex).foreach
{ id => val element = messages.getOrElse(id, store.retrieve(id)) data +=
element store.store(id, element) messages.remove(id, element) }
The following line:
val element = messages.getOrElse(id, store.retrieve(id)) throws error:
java.lang.ClassCastException: scala.Tuple2 cannot be cast to
scala.runtime.Nothing$
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at
org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
solution:
val element: (String, Timestamp) = messages.getOrElse(id,
store.retrieve[(String, Timestamp)](id))
> Recovering from Failures with Checkpointing Exception(Mqtt)
> -----------------------------------------------------------
>
> Key: BAHIR-175
> URL: https://issues.apache.org/jira/browse/BAHIR-175
> Project: Bahir
> Issue Type: Bug
> Components: Spark Structured Streaming Connectors
> Reporter: lynn
> Assignee: Lukasz Antoniak
> Priority: Major
> Fix For: Spark-2.4.0
>
>
> Spark Version:2.2.0 spark-streaming-sql-mqtt version: 2.2.0
>
> # Reading checkpoints offsets error
> {code:java}
> org.apache.spark.sql.execution.streaming.SerializedOffset cannot be cast to
> org.apache.spark.sql.execution.streaming.LongOffset{code}
>
> solution:
> The MqttStreamSource.scala source file:
> Line 149, getBatch Method:
>
> {code:java}
> val startIndex = start.getOrElse(LongOffset(0)) match
> { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset
> => offset.offset.toInt }
> val endIndex = end match
> { case offset: SerializedOffset => offset.json.toInt case offset: LongOffset
> => offset.offset.toInt }
> {code}
>
> 2. The MqttStreamSource.scala source file
> getBatch Method:
>
> {code:java}
> val data: ArrayBuffer[(String, Timestamp)] = ArrayBuffer.empty
> // Move consumed messages to persistent store.
> (startIndex + 1 to endIndex).foreach
> { id => val element = messages.getOrElse(id, store.retrieve(id)) data +=
> element store.store(id, element) messages.remove(id, element) }
> The following line:
> val element = messages.getOrElse(id, store.retrieve(id)) throws error:
> java.lang.ClassCastException: scala.Tuple2 cannot be cast to
> scala.runtime.Nothing$
> at
> org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
> at
> org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1$$anonfun$3.apply(MQTTStreamSource.scala:160)
> at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> at scala.collection.concurrent.TrieMap.getOrElse(TrieMap.scala:633)
> at
> org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply$mcZI$sp(MQTTStreamSource.scala:160)
> at
> org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
> at
> org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource$$anonfun$getBatch$1.apply(MQTTStreamSource.scala:159)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at
> org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.getBatch(MQTTStreamSource.scala:159)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:470)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets$3.apply(StreamExecution.scala:466)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at
> org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:466)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
> {code}
>
>
> solution:
>
> {code:java}
> val element: (String, Timestamp) = messages.getOrElse(id,
> store.retrieve[(String, Timestamp)](id))
>
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.3.2#803003)