Hi Cody,

Thanks for your answer. I have finally managed to put together a simple code sample. Here it is:

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;

public class SparkTest {

    // Creates a direct Kafka stream reading topic "test" from a local broker.
    private static JavaPairInputDStream<String, String> setupKafkaStream(final JavaStreamingContext jssc) {
        final Map<String, String> params = new HashMap<>();
        params.put("group.id", "TestApp");
        params.put("metadata.broker.list", "localhost:9092");

        final Set<String> topics = new HashSet<>();
        topics.add("test");

        return KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                params,
                topics
        );
    }

    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkTest");
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // set up the input stream
        // final JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        final JavaPairInputDStream<String, String> s = setupKafkaStream(jssc);
        final JavaDStream<String> lines = s.map(tuple -> tuple._2());

        final JavaDStream<String> words =
                lines.window(Durations.seconds(10))
                        .flatMap(line -> Arrays.asList(line.split(" ")));

        final JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(word -> new Tuple2<>(word, 1))
                        .reduceByKey((a, b) -> a + b);

        wordCounts.print();

        // ConsoleShutdownTrigger is a small helper class of mine (not shown here);
        // it invokes the given callback so the streaming context is stopped gracefully.
        final ConsoleShutdownTrigger consoleShutdownTrigger = new ConsoleShutdownTrigger(() -> {
            jssc.stop(false, true);
        });
        consoleShutdownTrigger.start();

        jssc.start();
        jssc.awaitTermination();
    }
}

When I run this app the pipeline stops (or blocks). But if I switch from the direct Kafka stream to, for instance, a socket text stream, the app works as expected.

Since this is a possible use case (the API allows it), I would like to know whether I have hit some limitation (or bug) in the Spark-Kafka integration.
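
Concretely, the working variant differs from the sample above only in the source; a minimal sketch of the swap (assuming a text server on localhost:9999, e.g. started with "nc -lk 9999"), the rest of the pipeline being unchanged:

// Instead of the Kafka direct stream:
// final JavaPairInputDStream<String, String> s = setupKafkaStream(jssc);
// final JavaDStream<String> lines = s.map(tuple -> tuple._2());

// ...read from a plain TCP socket:
final JavaReceiverInputDStream<String> lines =
        jssc.socketTextStream("localhost", 9999);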

I am using Spark 1.5.0.

Thanks
Martin



On 03/22/2016 06:24 PM, Cody Koeninger wrote:
I definitely have direct stream jobs that use window() without
problems... Can you post a minimal code example that reproduces the
problem?

Using print() will confuse the issue, since print() will try to only
use the first partition.

Use foreachRDD { rdd => rdd.foreach(println) }

or something comparable
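
In the Java API that would be roughly (just a sketch, e.g. on a
JavaPairDStream<String, Integer> named wordCounts; note that in Spark 1.5
the Java foreachRDD still takes a Function returning Void):

wordCounts.foreachRDD(rdd -> {
    rdd.foreach(record -> System.out.println(record));
    return null;  // Spark 1.5 Java API: foreachRDD expects a Function<..., Void>
});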

On Tue, Mar 22, 2016 at 10:14 AM, Martin Soch <martin.s...@oracle.com> wrote:
Hi all,

I am using the direct Kafka input stream in my Spark app. When I use the
window(...) function in the chain, it causes the processing pipeline to stop -
when I open the Spark UI I can see that the streaming batches are being queued
and that the pipeline is still processing one of the first batches.

To be more precise: the issue happens only when the windows overlap (i.e. when
sliding_interval < window_length). Otherwise the system behaves as expected.
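
To illustrate with concrete durations (just an example; s stands for the
direct Kafka stream, as in the pseudo code below):

// Overlapping windows (slide < window length): the pipeline stops
s.window(Durations.seconds(10), Durations.seconds(5)).print();

// Non-overlapping windows (slide == window length): works as expected
s.window(Durations.seconds(10), Durations.seconds(10)).print();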

Variants of the window(..) function - like reduceByKeyAndWindow(..), etc. -
also work as expected; the pipeline doesn't stop. The same applies when using
a different type of stream.
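
For example, a variant along these lines runs fine (a sketch; assume s is the
JavaPairInputDStream<String, String> created with KafkaUtils.createDirectStream
as in the sample at the top of this mail):

// Windowed word count via reduceByKeyAndWindow instead of window() + reduceByKey;
// this works even when the windows overlap (10 s window, 5 s slide).
final JavaPairDStream<String, Integer> counts =
        s.map(tuple -> tuple._2())
                .flatMap(line -> Arrays.asList(line.split(" ")))
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKeyAndWindow((a, b) -> a + b,
                        Durations.seconds(10), Durations.seconds(5));
counts.print();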

Is this a known limitation of the window(..) function when used with the
direct Kafka input stream?

Java pseudo code:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream s;
s.window(Durations.seconds(10)).print();  // the pipeline will stop

Thanks
Martin
