Hi,
I have a problem with Structured Streaming and Kafka.
I have 2 brokers and a topic with 8 partitions and replication factor 2.
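For reference, the topic was created roughly like this (equivalent sketch using the Kafka AdminClient; the broker host names match the ones in the driver below):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host100:9092,host200:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // topic "acrmonitor": 8 partitions, replication factor 2
            NewTopic topic = new NewTopic("acrmonitor", 8, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}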

This is my driver program:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.get_json_object;
import static org.apache.spark.sql.functions.window;

public class StreamFromKafka {

    public static void main(String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .appName("StreamFromKafka")
                .config("spark.sql.streaming.minBatchesToRetain", 5)
                .getOrCreate();

        // 8 shuffle partitions, one per Kafka partition
        spark.sessionState().conf().setConf(
                org.apache.spark.sql.internal.SQLConf.SHUFFLE_PARTITIONS(), 8);

        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "host100:9092,host200:9092")
                .option("subscribe", "acrmonitor")
                .option("startingOffsets", "earliest")
                .load();

        String windowSize = "10 minutes";
        String slide = "10 minutes";
        String startTime = "0 minutes";

        // Extract the JSON fields from the Kafka message value and
        // count events per address/type over 10-minute windows.
        Dataset<Row> df3 = df.select(
                        get_json_object(col("value").cast("string"), "$.address").alias("address"),
                        get_json_object(col("value").cast("string"), "$.type").alias("type"),
                        get_json_object(col("value").cast("string"), "$.insertDate").alias("insertDate").cast("timestamp"))
                .withWatermark("insertDate", "15 minutes")
                .groupBy(
                        col("address"),
                        col("type"),
                        window(col("insertDate").cast("timestamp"), windowSize, slide, startTime))
                .count();

        String chkptDir = "/tmp/checkPoint";

        StreamingQuery query = df3
                .writeStream()
                .outputMode(OutputMode.Update())
                .option("checkpointLocation", chkptDir)
                .foreach(new JDBCsink())
                .trigger(Trigger.ProcessingTime(30000))
                .start();

        try {
            query.awaitTermination();
        } catch (Exception e) {
            System.err.println("ERROR: " + e.getMessage());
        }

        spark.cloneSession();
    }
}
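
JDBCsink is a custom ForeachWriter that writes each aggregated row to a database via JDBC. A simplified sketch is below; the JDBC URL, credentials, table and column names here are placeholders, not the real ones:

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.Row;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JDBCsink extends ForeachWriter<Row> {
    private Connection connection;
    private PreparedStatement statement;

    @Override
    public boolean open(long partitionId, long epochId) {
        try {
            // placeholder connection details
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://dbhost:5432/mydb", "user", "password");
            statement = connection.prepareStatement(
                    "INSERT INTO acr_counts (address, type, window_start, cnt) VALUES (?, ?, ?, ?)");
            return true;
        } catch (SQLException e) {
            return false;
        }
    }

    @Override
    public void process(Row row) {
        try {
            statement.setString(1, row.getString(row.fieldIndex("address")));
            statement.setString(2, row.getString(row.fieldIndex("type")));
            // window is a struct<start, end>; take the window start
            statement.setTimestamp(3, row.getStruct(row.fieldIndex("window")).getTimestamp(0));
            statement.setLong(4, row.getLong(row.fieldIndex("count")));
            statement.executeUpdate();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            // ignore errors on close
        }
    }
}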

I use a checkpoint directory.

When I stop one Kafka broker (the other one remains alive), the driver program stops reading messages from the topic and processing them. I waited for more than 5 minutes.
If I restart the driver program (with one broker still down), it reads and processes messages.

If I stop one Kafka broker (so the driver stops reading) and restart that broker after a few minutes, the driver program starts reading and processing messages again (without restarting the driver program).

Is there a way to let the driver program keep reading from the Kafka topic when one of the brokers goes down, without restarting the driver?

Thanks.

