Hello,

  I'm trying to organize the processing of messages from Kafka. A typical
case is that the number of messages in Kafka's queue is more than the
Spark app can process in time. But I need a strict time limit to
produce a result for at least part of the data.

Code example:

        SparkConf sparkConf = new SparkConf()
                .setAppName("Spark")
                .setMaster("local");

        JavaStreamingContext jssc =
                new JavaStreamingContext(sparkConf, Milliseconds.apply(5000));

        jssc.checkpoint("/tmp/spark_checkpoint");

        Set<String> topicMap =
                new HashSet<>(Arrays.asList(topicList.split(",")));
        Map<String, String> kafkaParams = new HashMap<String, String>() {
            {
                put("metadata.broker.list", bootstrapServers);
                put("auto.offset.reset", "smallest");
            }
        };

        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topicMap);

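        // Count messages over a 10 s window that slides every 5 s,
        // print each count and save it as text files.
        messages.countByWindow(Seconds.apply(10), Milliseconds.apply(5000))
                .map(x -> {System.out.println(x); return x;})
                .dstream().saveAsTextFiles("/tmp/spark", "spark-streaming");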


  I need to see the result of the window operation every 10 seconds (this is
only the simplest example). But in practice, with my test data of ~10M
messages, the first result appears about a minute later, and after that I
see only zeros. Is there a way to limit processing time so that a response
is guaranteed within a specified time, like Apache Flink's triggers?
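
  For reference, one possible direction (an assumption, not something
verified on this setup): Spark Streaming has a setting,
spark.streaming.kafka.maxRatePerPartition, that caps how many records per
second the direct stream reads from each Kafka partition, so a single batch
cannot swallow the whole backlog. The sketch below only does the arithmetic
for choosing a value. The class name RateLimitSketch, the partition count
and the throughput figure are hypothetical, not measurements.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RateLimitSketch {

    // Upper bound on the records in one batch once a per-partition rate is set:
    // rate (records/sec/partition) * partitions * batch length in seconds.
    static long maxRecordsPerBatch(long ratePerPartition, int partitions, long batchMillis) {
        return ratePerPartition * partitions * batchMillis / 1000;
    }

    // Split an assumed total throughput (records/sec) evenly across partitions.
    static long ratePerPartitionFor(long recordsPerSecond, int partitions) {
        return recordsPerSecond / partitions;
    }

    // Settings that would be passed to SparkConf.set(key, value).
    static Map<String, String> settings(long ratePerPartition) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.streaming.kafka.maxRatePerPartition", Long.toString(ratePerPartition));
        return conf;
    }

    public static void main(String[] args) {
        long throughput = 10_000_000L / 60;  // rough guess: ~10M messages in about a minute
        int partitions = 4;                  // hypothetical partition count
        long rate = ratePerPartitionFor(throughput, partitions);
        System.out.println(settings(rate));
        System.out.println("max records per 5 s batch: "
                + maxRecordsPerBatch(rate, partitions, 5000));
    }
}
```

  Each entry of the returned map would be applied with sparkConf.set(key,
value) before the JavaStreamingContext is created. The cap bounds the size
of a batch, not its processing time, so the value would still need tuning
against the real job.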

Thanks.
