Does that code even compile? I'm assuming eventLogJson.foreach is
supposed to be eventLogJson.foreachRDD?
I'm also confused as to why you're repartitioning to 1 partition.

Is your streaming job lagging behind (especially given that you're
basically single-threading it by repartitioning to 1 partition)?

Have you looked for any error logs or failed tasks during the time you
noticed missing messages?

Have you verified that you aren't attempting to overwrite hdfs paths?
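For reference, a minimal sketch of how that save step is usually written with foreachRDD, using the batch time to build a unique output path per batch (eventLogJson and hdfs_path are from your snippet; the millisecond-based suffix is my suggestion, not your code):

```scala
// Sketch only, against the Spark 1.5.x Streaming API; untested here.
// eventLogJson is a DStream[String]; foreachRDD hands you each batch's RDD
// along with the batch Time.
eventLogJson.foreachRDD { (rdd, batchTime) =>
  if (!rdd.isEmpty()) {
    // Use the batch time in milliseconds in the path so two batches that
    // fire within the same wall-clock second cannot collide.
    rdd.saveAsTextFile(hdfs_path + "/eventlogs/" + batchTime.milliseconds)
  }
}
```

With a second-granularity timestamp like yyyy_MM_dd_HH_mm_ss and a 2-second batch interval, saveAsTextFile can be pointed at a path that already exists, which either fails the job or clobbers earlier output depending on configuration. That is one plausible cause of missing records.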


On Thu, May 5, 2016 at 2:09 PM, Jerry Wong <jerry.king2.w...@gmail.com> wrote:
> Hi Cody,
>
> Thank you for the quick response to my question. I've pasted the main
> part of the code:
>
>     val sparkConf = new SparkConf().setAppName("KafkaSparkConsumer")
>     sparkConf.set("spark.cassandra.connection.host", "XXXX.XXXX.XXXX.XXXX")
>     sparkConf.set("spark.broadcast.factory",
>       "org.apache.spark.broadcast.HttpBroadcastFactory")
>     sparkConf.set("spark.cores.max", args(0))
>     sparkConf.set("spark.executor.memory", args(1))
>     val kafka_broker = args(2)
>     val kafka_topic = args(3)
>     val hdfs_path = args(4)
>     val ssc = new StreamingContext(sparkConf, 2)
>     val topicsSet = Set[String](kafka_topic)
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
>     val messages = KafkaUtils.createDirectStream[String, String,
>       StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>     val lines = messages.repartition(1).map({ case (w, c) => c })
>     val eventLogJson = lines.filter(line => line.contains("eventType"))
>
>     val eventlog = eventLogJson.foreach(json => {
>       if (!json.isEmpty()) {
>         json.saveAsTextFile(hdfs_path + "/eventlogs/" + getTimeFormatToFile())
>       }
>     })
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>   def getTimeFormatToFile(): String = {
>     val dateFormat = new SimpleDateFormat("yyyy_MM_dd_HH_mm_ss")
>     val dt = new Date()
>     val cg = new GregorianCalendar()
>     cg.setTime(dt)
>     return dateFormat.format(cg.getTime())
>   }
>
> Is any other information needed?
>
> Thanks!
>
> On Thu, May 5, 2016 at 12:34 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> That's not much information to go on.  Any relevant code sample or log
>> messages?
>>
>> On Thu, May 5, 2016 at 11:18 AM, Jerry <jerry.king2.w...@gmail.com> wrote:
>> > Hi,
>> >
>> > Can anybody give me an idea of why data is lost on the Kafka consumer
>> > side? I use Kafka 0.8.2 and Spark (Streaming) 1.5.2. Sometimes I find
>> > that I don't receive the same number of messages that the Kafka
>> > producer sent. For example, I sent 1000 messages to the Kafka broker
>> > via the producer and confirmed the same count in the broker, but when
>> > I checked either HDFS or Cassandra, the count was only 363. The data
>> > isn't always lost, just sometimes... that's weird and annoying to me.
>> > Can anybody give me some reasons?
>> >
>> > Thanks!
>> > Jerry
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Missing-data-in-Kafka-Consumer-tp26887.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>
>
