Hi, I have a problem with Structured Streaming and Kafka. I have 2 brokers, and a topic with 8 partitions and a replication factor of 2.
This is my driver program:

public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("StreamFromKafka")
        .config("spark.sql.streaming.minBatchesToRetain", 5)
        .getOrCreate();

    spark.sessionState().conf().setConf(
        org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS(), 8);

    Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host100:9092,host200:9092")
        .option("subscribe", "acrmonitor")
        .option("startingOffsets", "earliest")
        .load();

    String windowSize = "10 minutes";
    String slide = "10 minutes";
    String startTime = "0 minutes";

    Dataset<Row> df3 = df.select(
            get_json_object(col("value").cast("string"), "$.address").alias("address"),
            get_json_object(col("value").cast("string"), "$.type").alias("type"),
            get_json_object(col("value").cast("string"), "$.insertDate").cast("timestamp").alias("insertDate"))
        .withWatermark("insertDate", "15 minutes")
        .groupBy(
            col("address"),
            col("type"),
            window(col("insertDate"), windowSize, slide, startTime))
        .count();

    String chkptDir = "/tmp/checkPoint";

    StreamingQuery query = df3
        .writeStream()
        .outputMode(OutputMode.Update())
        .option("checkpointLocation", chkptDir)
        .foreach(new JDBCsink())
        .trigger(Trigger.ProcessingTime(30000))
        .start();

    try {
        query.awaitTermination();
    } catch (Exception e) {
        System.err.println("ERROR: " + e.getMessage());
    }

    spark.cloneSession();
}

I use a checkpoint directory. When I stop one Kafka broker (the other one remains alive), the driver program stops reading messages from the topic and processing them. I waited for more than 5 minutes. If I restart the driver program (with that one broker still down), it reads and processes messages again. If I stop one Kafka broker (so the driver stops reading) and then restart that broker after a few minutes, the driver program resumes reading and processing messages without being restarted.

Is there a way to let the driver program keep reading the Kafka topic when only one of the brokers goes down, without restarting the driver? Thanks.
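[Editor's note: the JDBCsink class referenced above is not shown in the post. Presumably it is a ForeachWriter<Row> along the lines of the minimal sketch below; the JDBC URL, credentials, table name (acr_counts), and column mapping are assumptions for illustration, not the original code.]

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;

public class JDBCsink extends ForeachWriter<Row> {
    private Connection connection;
    private PreparedStatement statement;

    @Override
    public boolean open(long partitionId, long version) {
        try {
            // Placeholder JDBC URL and credentials; opened once per partition per epoch
            connection = DriverManager.getConnection(
                "jdbc:postgresql://dbhost:5432/monitoring", "user", "password");
            statement = connection.prepareStatement(
                "INSERT INTO acr_counts (address, type, window_start, cnt) VALUES (?, ?, ?, ?)");
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    @Override
    public void process(Row value) {
        try {
            // Rows from the aggregation above carry address, type, the window struct, and count
            statement.setString(1, value.getString(value.fieldIndex("address")));
            statement.setString(2, value.getString(value.fieldIndex("type")));
            statement.setTimestamp(3, value.getStruct(value.fieldIndex("window")).getTimestamp(0));
            statement.setLong(4, value.getLong(value.fieldIndex("count")));
            statement.executeUpdate();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (Exception ignored) {
        }
    }
}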