[ https://issues.apache.org/jira/browse/SPARK-14737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15278375#comment-15278375 ]
Faisal commented on SPARK-14737:
--------------------------------

{code}
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class DataProcessor2 implements Serializable {
    private static final long serialVersionUID = 3071125481526170241L;
    private static Logger log = LoggerFactory.getLogger("DataProcessor");

    public static void main(String[] args) {
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        DataProcessorContextFactory3 factory = new DataProcessorContextFactory3();
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory);

        // Start the process
        jssc.start();
        jssc.awaitTermination();
    }
}

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable {
    private static final long serialVersionUID = 6070911284191531450L;
    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory.class);

    DataProcessorContextFactory3() {
    }

    @Override
    public JavaStreamingContext create() {
        logger.debug("creating new context..!");

        final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME);
        final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME);
        final String app = "app1";
        final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest");

        logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}",
                brokers, topic, app, offset);

        if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) {
            System.err.println("Usage: DataProcessor <brokers> <topic>\n"
                    + Consts.KAFKA_BROKERS_NAME + " is a list of one or more Kafka brokers separated by comma\n"
                    + Consts.KAFKA_TOPIC_NAME + " is a kafka topic to consume from \n\n\n");
            System.exit(1);
        }

        final String majorVersion = "1.0";
        final String minorVersion = "3";
        final String version = majorVersion + "." + minorVersion;
        final String applicationName = "DataProcessor-" + topic + "-" + version;

        // for dev environment
        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName);
        // for cluster environment
        //SparkConf sparkConf = new SparkConf().setAppName(applicationName);

        final long sparkBatchDuration = Long
                .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10"));
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration));
        logger.debug("setting checkpoint directory={}", sparkCheckPointDir);
        jssc.checkpoint(sparkCheckPointDir);

        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

        HashMap<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("auto.offset.reset", offset);
        kafkaParams.put("group.id", "app1");

        // @formatter:off
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );
        // @formatter:on

        processRDD(messages, app);
        return jssc;
    }

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) {
        JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction());

        rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() {
            private static final long serialVersionUID = 250647626267731218L;

            @Override
            public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception {
                if (!currentRdd.isEmpty()) {
                    logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName());
                    currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() {
                        @Override
                        public void call(Iterator<MsgStruct> arg0) throws Exception {
                            while (arg0.hasNext()) {
                                System.out.println(arg0.next().toString());
                            }
                        }
                    });
                } else {
                    logger.debug("Current RDD is empty.");
                }
                return null;
            }
        });
    }

    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> {
        @Override
        public MsgStruct call(Tuple2<String, String> data) throws Exception {
            String message = data._2();
            System.out.println("message:" + message);
            return MsgStruct.parse(message);
        }
    }

    public static class MsgStruct implements Serializable {
        private String message;

        public static MsgStruct parse(String msg) {
            MsgStruct m = new MsgStruct();
            m.message = msg;
            return m;
        }

        public String toString() {
            return "content inside=" + message;
        }
    }
}
{code}
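One way to keep the driver process alive while the brokers are unreachable could be a retry loop around context creation, assuming the batch failure surfaces through awaitTermination() and reusing the ApplicationProperties/Consts helpers and DataProcessorContextFactory3 from the code above (same package assumed). The attempt count and back-off below are made-up values, so treat this as a sketch rather than a tested fix:

{code}
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class ResilientLauncher {
    public static void main(String[] args) throws InterruptedException {
        final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR);
        final int maxAttempts = 5;         // illustrative value
        final long backoffMs = 30 * 1000L; // illustrative value: give the brokers time to come back

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            JavaStreamingContext jssc = null;
            try {
                // Recover from the checkpoint (or build a fresh context) on every attempt.
                jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, new DataProcessorContextFactory3());
                jssc.start();
                jssc.awaitTermination();
                break; // clean shutdown, stop retrying
            } catch (Exception e) {
                System.err.println("Streaming context failed (attempt " + attempt + "): " + e.getMessage());
                if (jssc != null) {
                    jssc.stop(); // also stops the underlying SparkContext so the next attempt can create a new one
                }
                Thread.sleep(backoffMs);
            }
        }
    }
}
{code}

Since each attempt goes through getOrCreate with the same checkpoint directory, batches that never completed should be regenerated from the checkpoint on the next attempt.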

> Kafka Brokers are down - spark stream should retry
> --------------------------------------------------
>
>                 Key: SPARK-14737
>                 URL: https://issues.apache.org/jira/browse/SPARK-14737
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.3.0
>        Environment: Suse Linux, Cloudera Enterprise 5.4.8 (#7 built by jenkins on 20151023-1205 git: d7dbdf29ac1d57ae9fb19958502d50dcf4e4fffd), kafka_2.10-0.8.2.2
>            Reporter: Faisal
>
> I have a Spark Streaming application that uses the direct stream approach, listening to a Kafka topic.
> {code}
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", "broker1,broker2,broker3");
> kafkaParams.put("auto.offset.reset", "largest");
> HashSet<String> topicsSet = new HashSet<String>();
> topicsSet.add("Topic1");
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>     jssc,
>     String.class,
>     String.class,
>     StringDecoder.class,
>     StringDecoder.class,
>     kafkaParams,
>     topicsSet
> );
> {code}
> I notice that when I stop/shut down the Kafka brokers, my Spark application also shuts down.
> Here is the spark-submit script:
> {code}
> spark-submit \
>   --master yarn-cluster \
>   --files /home/siddiquf/spark/log4j-spark.xml \
>   --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-spark.xml" \
>   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-spark.xml" \
>   --class com.example.MyDataStreamProcessor \
>   myapp.jar
> {code}
> The Spark job is submitted successfully and I can track the application driver and worker/executor nodes.
> Everything works fine; my only concern is that when the Kafka brokers are offline or being restarted, my application, which is managed by YARN, shuts down. Shouldn't it keep running? But it does shut down.
> If this is the expected behavior, how do we handle such a situation with the least maintenance? Keep in mind that the Kafka cluster is not part of the Hadoop cluster and is managed by a different team, which is why our application needs to be resilient enough.
> Thanks
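For the direct stream itself, a couple of existing knobs may help it ride out short broker hiccups, although they will not keep the job alive through a long outage. A rough fragment against the Spark 1.3 direct Kafka API, with illustrative values (the property names are existing Spark/Kafka settings, the numbers and broker list are guesses):

{code}
// Illustrative values only.
String applicationName = "DataProcessor-Topic1-1.0.3";
String brokers = "broker1,broker2,broker3";

SparkConf sparkConf = new SparkConf()
    .setAppName(applicationName)
    // How many times the driver retries fetching leader offsets for the direct stream
    // before failing the batch (spark.streaming.kafka.maxRetries, default 1).
    .set("spark.streaming.kafka.maxRetries", "5");

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("auto.offset.reset", "largest");
// Kafka consumer property: how long to back off before re-resolving a partition leader
// when a broker is unreachable (milliseconds).
kafkaParams.put("refresh.leader.backoff.ms", "5000");
{code}

If all brokers stay down longer than those retries cover, the batch (and eventually the context) still fails, so a longer outage still needs the driver to be restarted, for example through YARN application re-attempts (spark.yarn.maxAppAttempts, where available) or a retry loop like the one sketched in the comment above.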