Hi,

I am trying to run a Spark application that ingests data from Kafka using
Spark Structured Streaming and the library
org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
issue where, during every micro-batch execution, the Kafka consumer is not
able to fetch the offsets and has its offsets reset, as shown below in this
log:

19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-5 to offset 1168959116.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-1 to offset 1218619371.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-8 to offset 1157205346.
19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id,
groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
Resetting offset for partition my-topic-21 to offset 1255403059.
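
For context, the query is set up roughly like the sketch below. The broker
address, sink format, checkpoint path and trigger interval here are
placeholders/assumptions, not my exact configuration; the topic and S3 path
match the source/sink descriptions in the progress event further down.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("kafka-to-s3")
  .getOrCreate()

// Kafka source, matching KafkaV2[Subscribe[my_kafka_topic]] in the progress event
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder brokers
  .option("subscribe", "my_kafka_topic")
  .option("startingOffsets", "latest")                // default; on restart, offsets come from the checkpoint
  .load()

// File sink writing to S3, matching the FileSink description in the progress event
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")                                                            // assumed format, not shown in the logs
  .option("path", "s3://my-s3-bucket/data/kafka/my_kafka_topic")
  .option("checkpointLocation", "s3://my-s3-bucket/checkpoints/my_kafka_topic") // placeholder path
  .trigger(Trigger.ProcessingTime("1 minute"))                                  // batches appear to fire on the minute
  .start()

query.awaitTermination()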


A reset like this would be reasonable if it happened once per application
run, because the offsets stored in my checkpoint might no longer be valid
and would have to be reset to new values. But I am seeing this reset happen
on every micro-batch execution in my streaming job, and at the end of each
batch the streaming query progress emits the following:

19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
  "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
  "name" : null,
  "timestamp" : "2019-09-10T15:55:00.000Z",
  "batchId" : 189,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 127,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 24,
    "setOffsetRange" : 36,
    "triggerExecution" : 1859,
    "walCommit" : 1032
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
    "startOffset" : {
      "my_kafka_topic" : {
        "23" : 1206926686,
        "8" : 1158514946,
        "17" : 1258387219,
        "11" : 1263091642,
        "2" : 1226741128,
        "20" : 1229560889,
        "5" : 1170304913,
        "14" : 1207333901,
        "4" : 1274242728,
        "13" : 1336386658,
        "22" : 1260210993,
        "7" : 1288639296,
        "16" : 1247462229,
        "10" : 1093157103,
        "1" : 1219904858,
        "19" : 1116269615,
        "9" : 1238935018,
        "18" : 1069224544,
        "12" : 1256018541,
        "3" : 1251150202,
        "21" : 1256774117,
        "15" : 1170591375,
        "6" : 1185108169,
        "24" : 1202342095,
        "0" : 1165356330
      }
    },
    "endOffset" : {
      "my_kafka_topic" : {
        "23" : 1206928043,
        "8" : 1158516721,
        "17" : 1258389219,
        "11" : 1263093490,
        "2" : 1226743225,
        "20" : 1229562962,
        "5" : 1170307882,
        "14" : 1207335736,
        "4" : 1274245585,
        "13" : 1336388570,
        "22" : 1260213582,
        "7" : 1288641384,
        "16" : 1247464311,
        "10" : 1093159186,
        "1" : 1219906407,
        "19" : 1116271435,
        "9" : 1238936994,
        "18" : 1069226913,
        "12" : 1256020926,
        "3" : 1251152579,
        "21" : 1256776910,
        "15" : 1170593216,
        "6" : 1185110032,
        "24" : 1202344538,
        "0" : 1165358262
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
  }
}


In the above StreamingQueryProgress event the numInputRows field is zero,
and this is the case for all micro-batch executions: no data is being
produced whatsoever. So basically, for each batch my offsets are being
reset and each batch produces zero rows. Since there is no work being done,
and since dynamic allocation is enabled, all my executors get killed. I
have tried deleting my checkpoint and starting my application from scratch,
and I am still facing the same issue. What could possibly be wrong here?
What lines of investigation should I take? If you are interested in earning
Stack Overflow points, you can also answer my question on SO here
<https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
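
In case it helps, one sanity check I can run is a one-off batch read from
spark-shell over the same offset range that one of the micro-batches claims
to have covered. The broker address below is a placeholder; the offsets are
partition 0's start/end values from the progress event above.

// Run in spark-shell, where `spark` is predefined.
// Batch-read a single partition over the range batch 189 claimed for partition 0,
// to see whether the brokers actually return rows for that range.
val sanity = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder brokers
  .option("assign", """{"my_kafka_topic":[0]}""")
  .option("startingOffsets", """{"my_kafka_topic":{"0":1165356330}}""")
  .option("endingOffsets",   """{"my_kafka_topic":{"0":1165358262}}""")
  .load()

println(sanity.count())  // roughly 1,932 rows (endOffset - startOffset) expected if the data is there

If that count comes back non-zero, the data is on the brokers and the
problem is somewhere on the streaming side.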


Thanks,
Charles
