[ https://issues.apache.org/jira/browse/SPARK-14737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278375#comment-15278375 ]

Faisal commented on SPARK-14737:
--------------------------------


{code}
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;

    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    public static void main(String[] args) {
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);

        // Start the process
        jssc.start();
        jssc.awaitTermination();
    }

}

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;

    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory3.class);

    DataProcessorContextFactory3() {
    }

    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");

        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");

        logger.debug("Data processing configuration. brokers={}, topic={}, 
app={}, offset={}", brokers, topic, app,
                offset);
        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || 
StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n" + 
Consts.KAFKA_BROKERS_NAME
                    + " is a list of one or more Kafka brokers separated by 
comma\n" + Consts.KAFKA_TOPIC_NAME
                    + " is a kafka topic to consume from \n\n\n");
            System.exit(1);
        }
        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;
        // for dev environment
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName);
        // for cluster environment
        //SparkConf sparkConf = new SparkConf().setAppName(applicationName);
        final long sparkBatchDuration = Long.valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));

        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);

        HashSet<String> topicsSet = new 
HashSet<String>(Arrays.asList(topic.split(",")));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        kafkaParams.put("group.id", "app1");

        // @formatter:off
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // @formatter:on
        processRDD(messages, app);
        return jssc;
    }

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());

        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {

            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at 
HOST={}", FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new 
VoidFunction<Iterator<MsgStruct>>() {

                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while(arg0.hasNext()){
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:"+message);
            return MsgStruct.parse(message);
        }

    }
    public static class MsgStruct implements Serializable{
        private String message;
        public static MsgStruct parse(String msg){
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }
        public String toString(){
            return "content inside="+message;
        }
    }

}
{code}
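
A minimal, untested sketch (not part of the code above): retry/restart settings that could be added to the SparkConf so that short broker outages are retried instead of failing the application immediately. The config keys below assume Spark 1.3 with the direct Kafka stream running on YARN, and the values are placeholders only.

{code}
// Hedged sketch only -- not taken from the original report.
// Keys assume Spark 1.3 on YARN; values are illustrative.
SparkConf sparkConf = new SparkConf()
        .setAppName(applicationName)
        // retries for the direct stream's offset lookups against the brokers
        .set("spark.streaming.kafka.maxRetries", "5")
        // let YARN re-attempt the application master after a failure
        .set("spark.yarn.maxAppAttempts", "4")
        // tolerate more task failures before a job is aborted
        .set("spark.task.maxFailures", "8");
{code}

Even with these settings, the direct stream's offset lookups still need the brokers to come back within the retry window, so relying on YARN re-attempts (or an external supervisor for the submitting process) may still be necessary for longer outages.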

> Kafka Brokers are down - spark stream should retry
> --------------------------------------------------
>
>                 Key: SPARK-14737
>                 URL: https://issues.apache.org/jira/browse/SPARK-14737
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.3.0
>         Environment: Suse Linux, Cloudera Enterprise 5.4.8 (#7 built by 
> jenkins on 20151023-1205 git: d7dbdf29ac1d57ae9fb19958502d50dcf4e4fffd), 
> kafka_2.10-0.8.2.2
>            Reporter: Faisal
>
> I have a Spark Streaming application that uses direct streaming, listening 
> to a Kafka topic.
> {code}
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>     kafkaParams.put("metadata.broker.list", "broker1,broker2,broker3");
>     kafkaParams.put("auto.offset.reset", "largest");
>     HashSet<String> topicsSet = new HashSet<String>();
>     topicsSet.add("Topic1");
>     JavaPairInputDStream<String, String> messages = 
> KafkaUtils.createDirectStream(
>             jssc, 
>             String.class, 
>             String.class,
>             StringDecoder.class, 
>             StringDecoder.class, 
>             kafkaParams, 
>             topicsSet
>     );
> {code}
> I notice that when I stop or shut down the Kafka brokers, my Spark 
> application also shuts down.
> Here is the Spark execution script:
> {code}
> spark-submit \
> --master yarn-cluster \
> --files /home/siddiquf/spark/log4j-spark.xml \
> --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark.xml" \
> --conf 
> "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-spark.xml" \
> --class com.example.MyDataStreamProcessor \
> myapp.jar 
> {code}
> The Spark job is submitted successfully and I can track the application 
> driver and worker/executor nodes.
> Everything works fine, but my only concern is: if the Kafka brokers are 
> offline or restarted, my application, which is controlled by YARN, should 
> not shut down, yet it does.
> If this is the expected behavior, how do we handle such a situation with 
> the least maintenance? Keep in mind that the Kafka cluster is not part of 
> the Hadoop cluster and is managed by a different team; that is why our 
> application needs to be resilient enough.
> Thanks


