Problem: how do we recover from user errors (connectivity issues / storage service down / etc.)?

Environment: Spark Streaming using the Kafka direct stream API

Code Snippet:
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");

// Receiver-less direct stream; Spark tracks the Kafka offsets itself.
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
    kafkaParams, topicsSet);

// Keep only the message value from each (key, value) pair.
JavaDStream<String> inputStream = messages
    .map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
  @Override
  public Void call(JavaRDD<String> rdd) throws Exception {
    if (!rdd.isEmpty()) {
      rdd.foreach(new VoidFunction<String>() {
        @Override
        public void call(String msg) throws Exception {
          System.out.println("------------------------rdd----------" + msg);
          Thread.sleep(1000); // stand-in for the real per-record processing
          // Simulated user / service failure (bad record, store downtime, ...)
          throw new Exception("user and/or service exception: " + msg);
        }
      });
    }
    return null;
  }
});
Detailed Description: Using Spark Streaming, I read text messages from Kafka via the direct API. For the sake of simplicity, all the processing does is print each message to the console and sleep for one second as a placeholder for the actual work. Now assume we hit a user error, perhaps due to a bad record, a format error, a service connectivity issue, or downtime of the persistent store; I've represented that by throwing an Exception from the foreach block. I understand Spark retries the failed task a configurable number of times and then moves on. The question is: what happens to those failed messages? Does Spark retry them, and if so, when? If not, does it offer any callback so the user can log them or dump them to an error queue, and provision them for further analysis and/or manual retries? Also, FYI, checkpointing is enabled and the above code lives in the create-context method so we can recover from Spark driver / worker failures.
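
For reference, by "create-context method" I mean the standard checkpoint-recovery wiring, roughly like the sketch below (assuming Spark 1.4+, where getOrCreate takes a Function0; the checkpoint path and the createContext() name are placeholders from my setup, not the exact code):

import org.apache.spark.api.java.function.Function0;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

final String checkpointDir = "hdfs:///tmp/spark-checkpoints"; // placeholder path

// Rebuild the context from the checkpoint if one exists, otherwise create it.
JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir,
    new Function0<JavaStreamingContext>() {
      @Override
      public JavaStreamingContext call() throws Exception {
        // createContext() (placeholder) builds the stream shown above and
        // calls checkpoint(checkpointDir) on the new context before returning.
        return createContext(checkpointDir);
      }
    });
jssc.start();
jssc.awaitTermination();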
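
In the absence of a built-in callback, the workaround I'm considering is to catch the failure per record and ship the bad message to a separate Kafka "error" topic for later analysis / manual retries. Here is a minimal sketch of that idea for the foreachRDD body above, assuming a hypothetical errorTopic and process() method (neither is in the code above) and the plain kafka-clients producer:

import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.spark.api.java.function.VoidFunction;

rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
  @Override
  public void call(Iterator<String> msgs) {
    // One producer per partition; these settings are placeholders.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer =
        new KafkaProducer<String, String>(props);
    try {
      while (msgs.hasNext()) {
        String msg = msgs.next();
        try {
          process(msg); // the real per-record work (hypothetical)
        } catch (Exception e) {
          // Dead-letter the failed record instead of failing the whole task,
          // so Spark's task-level retries never see the bad message.
          producer.send(new ProducerRecord<String, String>("errorTopic", msg));
        }
      }
    } finally {
      producer.close();
    }
  }
});

That would keep good records flowing while the bad ones end up somewhere inspectable; whether this is the idiomatic approach, or whether Spark exposes something better, is exactly my question.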
