Direct Kafka input stream and window(…) function

2016-03-22 Thread Martin Soch

Hi all,

I am using a direct Kafka input stream in my Spark app. When I use the 
window(...) function in the chain, it causes the processing pipeline 
to stall: when I open the Spark UI I can see that the streaming batches 
keep queuing up while the pipeline still reports that it is processing 
one of the first batches.


To be more precise: the issue happens only when the windows overlap (i.e. 
when sliding_interval < window_length). Otherwise the system behaves as 
expected.
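
To illustrate the two cases (just a minimal sketch; "stream" stands in 
for the direct Kafka stream and the durations are arbitrary):

// overlapping: slide (5s) < window length (10s) -> this is the case that stalls
stream.window(Durations.seconds(10), Durations.seconds(5));

// non-overlapping: slide == window length -> works as expected
stream.window(Durations.seconds(10), Durations.seconds(10));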


Derivatives of the window(..) function, like reduceByKeyAndWindow(..), etc., 
also work as expected - the pipeline doesn't stop. The same applies when 
using a different type of stream.
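
For example (again only a sketch; "pairs" is a placeholder for a keyed 
stream derived from the direct Kafka stream):

// same overlapping window, but via reduceByKeyAndWindow - this works fine
pairs.reduceByKeyAndWindow((a, b) -> a + b,
        Durations.seconds(10), Durations.seconds(5));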


Is this a known limitation of the window(..) function when used with 
a direct Kafka input stream?


Java pseudo code:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream s;
s.window(Durations.seconds(10)).print();  // the pipeline will stop

Thanks
Martin

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Direct Kafka input stream and window(…) function

2016-03-22 Thread Cody Koeninger
I definitely have direct stream jobs that use window() without
problems... Can you post a minimal code example that reproduces the
problem?

Using print() will confuse the issue, since print() will try to use
only the first partition.

Use foreachRDD { rdd => rdd.foreach(println) }

or something comparable
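
In the Java API that would look roughly like this (a sketch assuming the 
Spark 1.5 signature, where foreachRDD takes a Function<JavaRDD<T>, Void>; 
"lines" is a placeholder for your stream):

lines.foreachRDD(rdd -> {
    // materializes every partition, unlike print()
    rdd.foreach(record -> System.out.println(record));
    return null; // Spark 1.5's foreachRDD expects a Function returning Void
});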




Re: Direct Kafka input stream and window(…) function

2016-03-24 Thread Cody Koeninger
If this is related to
https://issues.apache.org/jira/browse/SPARK-14105 , are you windowing
before doing any transformations at all?  Try using map to extract the
data you care about before windowing.
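
Something like this (names are placeholders, just a sketch):

// map first, so the windowed stream holds plain values rather than
// Kafka-specific pairs, then window the result
JavaDStream<String> values = directStream.map(tuple -> tuple._2());
JavaDStream<String> windowed = values.window(Durations.seconds(10));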




Re: Direct Kafka input stream and window(…) function

2016-03-29 Thread Martin Soch

Hi Cody,

Thanks for your answer. I have finally managed to create a simple code 
sample. Here it is:


import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.*;

public class SparkTest {

    private static JavaPairInputDStream<String, String> setupKafkaStream(final JavaStreamingContext jssc) {

        final Map<String, String> params = new HashMap<>();
        params.put("group.id", "TestApp");
        params.put("metadata.broker.list", "localhost:9092");

        final Set<String> topics = new HashSet<>();
        topics.add("test");

        return KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                params,
                topics);
    }

    public static void main(String[] args) {
        final SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkTest");
        // 1-second batch interval
        final JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // setup
        // final JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", );
        final JavaPairInputDStream<String, String> s = setupKafkaStream(jssc);

        final JavaDStream<String> lines = s.map(tuple -> tuple._2());

        // 10-second window; the slide defaults to the 1-second batch
        // interval, so the windows overlap
        final JavaDStream<String> words =
                lines.window(Durations.seconds(10))
                        .flatMap(line -> Arrays.asList(line.split(" ")));

        final JavaPairDStream<String, Integer> wordCounts =
                words.mapToPair(word -> new Tuple2<>(word, 1))
                        .reduceByKey((a, b) -> a + b);

        wordCounts.print();

        // custom helper (not shown) that stops the context on console input
        final ConsoleShutdownTrigger consoleShutdownTrigger = new ConsoleShutdownTrigger(() -> {
            jssc.stop(false, true);
        });
        consoleShutdownTrigger.start();

        jssc.start();
        jssc.awaitTermination();
    }
}

When I run this app the pipeline stalls (blocks). But if I switch from 
the direct Kafka stream to (for instance) the socket text stream, the app 
works as expected.


Since this is a possible use case (the API allows it), I would like to 
know whether I have hit some limitation (or bug) in the Spark Kafka 
integration.


I am using Spark 1.5.0.

Thanks
Martin


