Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread Qiao, Richard
Kant, right: we cannot use the driver's producer in the executors. That is
why I mentioned the "kafka sink" pattern to solve it.
This article should be helpful:
https://allegro.tech/2015/08/spark-kafka-integration.html
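
A minimal sketch of the idea behind such a sink, assuming the goal is to
ship a serializable factory to the executors and create the producer lazily
there, rather than shipping the producer itself. LazyResource,
SerializableSupplier, fakeProducerHolder and roundTrip are hypothetical
names; a plain Object stands in for KafkaProducer so the sketch runs with
the JDK alone, and the serialize/deserialize round trip only imitates what
happens when Spark ships a closure.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

/** Hypothetical holder: serializes the factory, never the resource it creates. */
final class LazyResource<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    /** A Supplier that is allowed to travel inside a serialized closure. */
    interface SerializableSupplier<R> extends Supplier<R>, Serializable {}

    private final SerializableSupplier<T> factory;
    private transient T instance; // transient: skipped by Java serialization

    LazyResource(SerializableSupplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the resource on first use, in whichever JVM calls this. */
    synchronized T get() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    synchronized boolean isInitialized() {
        return instance != null;
    }

    /** Assumption: a plain Object plays the role of the non-serializable producer. */
    static LazyResource<Object> fakeProducerHolder() {
        return new LazyResource<>(() -> new Object());
    }

    /** Imitates shipping the holder to an executor: serialize, then deserialize. */
    static <R> LazyResource<R> roundTrip(LazyResource<R> holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                LazyResource<R> copy = (LazyResource<R>) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazyResource<Object> driverSide = fakeProducerHolder();
        Object driverInstance = driverSide.get();
        LazyResource<Object> executorSide = roundTrip(driverSide);
        // The copy arrives without an instance and builds its own on first use.
        System.out.println("initialized after shipping: " + executorSide.isInitialized());
        System.out.println("same instance as driver: " + (executorSide.get() == driverInstance));
    }
}
```

In a real job the factory would presumably build a KafkaProducer from its
configuration, and the holder would be shared so that each executor JVM
creates the producer once instead of once per record.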

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Thursday, December 7, 2017 at 12:39 PM
To: "Qiao, Richard" <richard.q...@capitalone.com>
Cc: Gerard Maas <gerard.m...@gmail.com>, "user @spark" <user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

Hi Richard,

I had tried your sample code now and several times in the past as well. The 
problem seems to be kafkaProducer is not serializable. so I get "Task not 
serializable exception" and my kafkaProducer object is created using the 
following jar.

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'

On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard 
<richard.q...@capitalone.com<mailto:richard.q...@capitalone.com>> wrote:
Thanks for sharing the code.
The 1st problem in the first code is the map is allocated in Driver, but it’s 
trying to put data in Executors, then retrieve it in driver to send to Kafka.
You are using this map as accumulator’s feature, but it doesn’t work in this 
way.

The 2nd problem is both codes are trying to collect rdd level data to generate 
a single Json string then send to Kafka, which could cause very long json 
string if your TPS is very high.
If possible you can send smaller json strings at task level, such as:
.foreachRDD(stringLongJavaPairRDD -> {
  stringLongJavaPairRDD.foreachPartition{partition ->{
  Map<String, Long> map = new HashMap<>(); //Defined in a task
  partition.foreach(stringLongTuple2 -> {
map.put(stringLongTuple2._1(), stringLongTuple2._2())
  });
  producer.send(new ProducerRecord<>("topicA", gson.toJson(map))); // 
send smaller json in a task
}
  }
});
When you do it, make sure kafka producer (seek kafka sink for it) and gson’s 
environment setup correctly in executors.

If after this, there is still OOM, let’s discuss further.

Best Regards
Richard


From: kant kodali <kanth...@gmail.com<mailto:kanth...@gmail.com>>
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas <gerard.m...@gmail.com<mailto:gerard.m...@gmail.com>>
Cc: "Qiao, Richard" 
<richard.q...@capitalone.com<mailto:richard.q...@capitalone.com>>, "user 
@spark" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Do I need to do .collect inside forEachRDD

@Richard I had pasted the two versions of the code below and I still couldn't 
figure out why it wouldn't work without .collect ?  Any help would be great


The code below doesn't work and sometime I also run into OutOfMemory error.

jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long uuantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {
Map<String, Long> map = new HashMap<>();
stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

System.out.println(stringLongTuple2._1()); // Works I can see 
values getting printed out

System.out.println(stringLongTuple2._2()); // Works I can see 
values getting printed out

map.put(stringLongTuple2._1(), stringLongTuple2._2())

});

System.out.println(gson.toJson(map)); // Prints empty json doc string 
"{}" always. But why? especially

// when the map is getting filled values as confirmed by the print 
statements above

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
});

jssc.start();

jssc.awaitTermination();



  VS

The below code works but it is slow because .collectAsMap



jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long uuantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {

LinkedHashMap<String, Long>

Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread kant kodali
Hi Richard,

I have tried your sample code now, and several times in the past as well.
The problem seems to be that kafkaProducer is not serializable, so I get a
"Task not serializable" exception. My kafkaProducer object is created using
the following jar.

group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
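
The failure described above can be reproduced without Spark. The following
is a minimal sketch, assuming the cause is Java serialization of a lambda
that captures a non-serializable object; Spark reports the same underlying
java.io.NotSerializableException wrapped as "Task not serializable".
TaskNotSerializableDemo, SerializableTask and FakeProducer are hypothetical
names, and FakeProducer only stands in for KafkaProducer.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class TaskNotSerializableDemo {
    /** Stand-in for a Spark function type: a lambda that must be serializable. */
    interface SerializableTask extends Runnable, Serializable {}

    /** Stand-in for KafkaProducer: deliberately not Serializable. */
    static final class FakeProducer {
        void send(String message) {
            // no-op in this sketch
        }
    }

    /** Captures a producer created outside the lambda, like a driver-side producer. */
    static SerializableTask capturingTask() {
        FakeProducer producer = new FakeProducer();
        return () -> producer.send("hello");
    }

    /** Creates the producer inside the lambda, so nothing is captured. */
    static SerializableTask selfContainedTask() {
        return () -> new FakeProducer().send("hello");
    }

    /** Returns the class name that blocked serialization, or null if it succeeded. */
    static String firstNonSerializable(Object closure) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(closure);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the message carries the offending class name
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O failure", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("capturing task blocked by: " + firstNonSerializable(capturingTask()));
        System.out.println("self-contained task blocked by: " + firstNonSerializable(selfContainedTask()));
    }
}
```

Creating the producer inside the task avoids the exception, but doing so for
every record would be expensive, which is why a lazily initialized,
per-executor producer is usually preferred.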

On Thu, Dec 7, 2017 at 2:46 AM, Qiao, Richard <richard.q...@capitalone.com>
wrote:

> Thanks for sharing the code.
>
> The 1st problem in the first code is the map is allocated in Driver, but
> it’s trying to put data in Executors, then retrieve it in driver to send to
> Kafka.
>
> You are using this map as accumulator’s feature, but it doesn’t work in
> this way.
>
>
>
> The 2nd problem is both codes are trying to collect rdd level data to
> generate a single Json string then send to Kafka, which could cause very
> long json string if your TPS is very high.
>
> If possible you can send smaller json strings at task level, such as:
>
> .foreachRDD(stringLongJavaPairRDD -> {
>
>   stringLongJavaPairRDD.foreachPartition{partition ->{
>
>   Map<String, Long> map = new HashMap<>(); //Defined in a task
>
>   partition.foreach(stringLongTuple2 -> {
>
> map.put(stringLongTuple2._1(), stringLongTuple2._2())
>
>   });
>
>   producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
> // send smaller json in a task
>
> }
>
>   }
>
> });
>
> When you do it, make sure kafka producer (seek kafka sink for it) and
> gson’s environment setup correctly in executors.
>
>
>
> If after this, there is still OOM, let’s discuss further.
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> *From: *kant kodali <kanth...@gmail.com>
> *Date: *Thursday, December 7, 2017 at 2:30 AM
> *To: *Gerard Maas <gerard.m...@gmail.com>
> *Cc: *"Qiao, Richard" <richard.q...@capitalone.com>, "user @spark" <
> user@spark.apache.org>
> *Subject: *Re: Do I need to do .collect inside forEachRDD
>
>
>
> @Richard I had pasted the two versions of the code below and I still
> couldn't figure out why it wouldn't work without .collect ?  Any help would
> be great
>
>
>
>
>
> *The code below doesn't work and sometime I also run into OutOfMemory
> error.*
>
> jsonMessagesDStream
> .window(*new *Duration(6), *new *Duration(1000))
> .mapToPair(val -> {
>   JsonParser parser = *new *JsonParser();
>   JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>   *if *(jsonObj.has(*"key4"*)) {
> *return new *Tuple2<>(*""*, 0L);
>   }
>   String symbol = jsonObj.get(*"key1"*).getAsString();
>   *long *uuantity = jsonObj.get(*"key2"*).getAsLong();
>   *return new *Tuple2<>(symbol, quantity);
> })
> .reduceByKey((v1, v2) -> v1 + v2)
> .foreachRDD(stringLongJavaPairRDD -> {
> Map<String, Long> map = *new *HashMap<>();
> stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
>
> *System.out.println(stringLongTuple2._1()); // Works I can see 
> values getting printed out*
>
> *System.out.println(stringLongTuple2._2()); // Works I can see 
> values getting printed out*
>
> map.put(stringLongTuple2._1(), stringLongTuple2._2())
>
> });
>
> *System.out.println(gson.toJson(map));* // Prints empty json doc 
> string "{}" always. But why? especially
>
> // when the map is getting filled values as confirmed by the print 
> statements above
>
> producer.send(*new *ProducerRecord<>(*"topicA"*, gson.toJson(map)));
> });
>
> jssc.start();
>
> jssc.awaitTermination();
>
>
>
>   VS
>
> *The below code works but it is slow because .collectAsMap*
>
>
>
> jsonMessagesDStream
> .window(*new *Duration(6), *new *Duration(1000))
> .mapToPair(val -> {
>   JsonParser parser = *new *JsonParser();
>   JsonObject jsonObj = parser.parse(val).getAsJsonObject();
>   *if *(jsonObj.has(*"key4"*)) {
> *return new *Tuple2<>(*""*, 0L);
>   }
>   String symbol = jsonObj.get(*"key1"*).getAsString();
>   *long *uuantity = jsonObj.get(*"key2"*).getAsLong();
>   *return new *Tuple2<>(symbol, quantity);
> })
> .reduceByKey((v1, v2) -> v1 + v2)
> .foreachRDD(stringLongJavaPairRDD -> {
>
> LinkedHashMap<String, 

Re: Do I need to do .collect inside forEachRDD

2017-12-07 Thread Qiao, Richard
Thanks for sharing the code.
The 1st problem in the first version is that the map is allocated in the
driver, but the code tries to put data into it in the executors and then
read it back in the driver to send to Kafka. Each task works on its own
deserialized copy of the map, so the driver's map stays empty.
You are using this map as if it were an accumulator, but it doesn't work
that way.
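
The empty "{}" can be reproduced with plain Java serialization. This is a
minimal sketch, assuming the closure is serialized before it runs, as Spark
does for rdd.foreach; ClosureCopyDemo, SerializableConsumer,
runOnFakeExecutor and fillMap are hypothetical names, and the "executor" is
just a serialize/deserialize round trip in the same JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ClosureCopyDemo {
    /** Stand-in for Spark's VoidFunction: a callback that must be serializable. */
    interface SerializableConsumer<T> extends Serializable {
        void accept(T record);
    }

    /** Imitates an executor: it runs a deserialized copy of the closure. */
    static <T> void runOnFakeExecutor(SerializableConsumer<T> closure, List<T> records) {
        SerializableConsumer<T> copy = roundTrip(closure);
        for (T record : records) {
            copy.accept(record);
        }
    }

    /** Fills a "driver" map either through a shipped closure or directly. */
    static Map<String, Long> fillMap(boolean shipClosure) {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new AbstractMap.SimpleImmutableEntry<>("AAPL", 10L));
        records.add(new AbstractMap.SimpleImmutableEntry<>("MSFT", 20L));

        Map<String, Long> driverMap = new HashMap<>();
        SerializableConsumer<Map.Entry<String, Long>> closure =
            entry -> driverMap.put(entry.getKey(), entry.getValue());
        if (shipClosure) {
            // Like rdd.foreach(...): the closure and its captured map are copied.
            runOnFakeExecutor(closure, records);
        } else {
            // Like collect() followed by a local loop: the original map is used.
            for (Map.Entry<String, Long> record : records) {
                closure.accept(record);
            }
        }
        return driverMap;
    }

    @SuppressWarnings("unchecked")
    private static <C> C roundTrip(C closure) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(closure);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (C) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("shipped closure, driver map size: " + fillMap(true).size());
        System.out.println("local loop, driver map size: " + fillMap(false).size());
    }
}
```

The print statements inside the original foreach show values because they
run where the copy lives; only the driver-side map never sees the puts.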

The 2nd problem is that both versions collect RDD-level data to generate a
single JSON string and then send it to Kafka, which can produce a very long
JSON string if your TPS is very high.
If possible, you can send smaller JSON strings at the task level, such as:
.foreachRDD(stringLongJavaPairRDD -> {
  stringLongJavaPairRDD.foreachPartition(partition -> {
    Map<String, Long> map = new HashMap<>(); // defined inside the task
    partition.forEachRemaining(stringLongTuple2 ->
        map.put(stringLongTuple2._1(), stringLongTuple2._2()));
    // send a smaller JSON string per task
    producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
  });
});
When you do this, make sure the Kafka producer (look up the "kafka sink"
pattern for it) and gson are set up correctly in the executors.

If there is still an OOM after this, let's discuss further.
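
If a single partition can still be large, the per-task map can be split
further. This is a minimal sketch, assuming each partition is consumed as an
iterator of (key, count) entries and that a cap on entries per message is
acceptable; PartitionBatcher, toJsonBatches and maxEntries are hypothetical
names, and the hand-rolled JSON only stands in for gson.toJson so that the
sketch has no dependencies.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PartitionBatcher {
    /** Turns one partition into JSON objects holding at most maxEntries keys each. */
    static List<String> toJsonBatches(Iterator<Map.Entry<String, Long>> partition,
                                      int maxEntries) {
        if (maxEntries < 1) {
            throw new IllegalArgumentException("maxEntries must be >= 1");
        }
        List<String> messages = new ArrayList<>();
        StringBuilder json = new StringBuilder("{");
        int count = 0;
        while (partition.hasNext()) {
            Map.Entry<String, Long> entry = partition.next();
            if (count > 0) {
                json.append(',');
            }
            json.append('"').append(escape(entry.getKey())).append("\":")
                .append(entry.getValue());
            count++;
            if (count == maxEntries) { // message is full: close it and start a new one
                messages.add(json.append('}').toString());
                json = new StringBuilder("{");
                count = 0;
            }
        }
        if (count > 0) { // flush the last, partially filled message
            messages.add(json.append('}').toString());
        }
        return messages;
    }

    /** Escapes only backslashes and double quotes; enough for this sketch. */
    private static String escape(String key) {
        return key.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        Map<String, Long> partition = new LinkedHashMap<>();
        partition.put("A", 1L);
        partition.put("B", 2L);
        partition.put("C", 3L);
        for (String message : toJsonBatches(partition.entrySet().iterator(), 2)) {
            System.out.println(message); // each line would be one producer.send(...)
        }
    }
}
```

Inside foreachPartition, each returned string would become one
ProducerRecord, so message size is bounded by maxEntries rather than by the
partition size.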

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Thursday, December 7, 2017 at 2:30 AM
To: Gerard Maas <gerard.m...@gmail.com>
Cc: "Qiao, Richard" <richard.q...@capitalone.com>, "user @spark" 
<user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

@Richard I had pasted the two versions of the code below and I still couldn't 
figure out why it wouldn't work without .collect ?  Any help would be great


The code below doesn't work and sometime I also run into OutOfMemory error.

jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long uuantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {
Map<String, Long> map = new HashMap<>();
stringLongJavaPairRDD.foreach(stringLongTuple2 -> {

System.out.println(stringLongTuple2._1()); // Works I can see 
values getting printed out

System.out.println(stringLongTuple2._2()); // Works I can see 
values getting printed out

map.put(stringLongTuple2._1(), stringLongTuple2._2())

});

System.out.println(gson.toJson(map)); // Prints empty json doc string 
"{}" always. But why? especially

// when the map is getting filled values as confirmed by the print 
statements above

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
});

jssc.start();

jssc.awaitTermination();



  VS

The below code works but it is slow because .collectAsMap



jsonMessagesDStream
.window(new Duration(6), new Duration(1000))
.mapToPair(val -> {
  JsonParser parser = new JsonParser();
  JsonObject jsonObj = parser.parse(val).getAsJsonObject();
  if (jsonObj.has("key4")) {
return new Tuple2<>("", 0L);
  }
  String symbol = jsonObj.get("key1").getAsString();
  long uuantity = jsonObj.get("key2").getAsLong();
  return new Tuple2<>(symbol, quantity);
})
.reduceByKey((v1, v2) -> v1 + v2)
.foreachRDD(stringLongJavaPairRDD -> {

LinkedHashMap<String, Long> map = new 
LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());

producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));

});

jssc.start();

jssc.awaitTermination();





On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas 
<gerard.m...@gmail.com<mailto:gerard.m...@gmail.com>> wrote:
Hi Kant,

>  but would your answer on .collect() change depending on running the spark 
> app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali 
<kanth...@gmail.com<mailto:kanth...@gmail.com>> wrote:
@Richard I don't see any error in the executor log but let me run again to make 
sure.

@Gerard Thanks much!  but would your answer on .collect() change depending on 
running the spark app in client vs cluster mode?

Thanks!

On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas 
<gerard.m...@gmail.com<mailto:gerard.m...@gmail.com>> wrote:
The general answer to your initial question is that "it depends". If the 
operation in the rdd.foreach() closure can be parallelized, then you don't need 
to collect

Re: Do I need to do .collect inside forEachRDD

2017-12-06 Thread kant kodali
@Richard I have pasted the two versions of the code below, and I still
can't figure out why it doesn't work without .collect. Any help would be
great.


*The code below doesn't work, and sometimes I also run into an OutOfMemory
error.*

jsonMessagesDStream
    .window(new Duration(6), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long quantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        Map<String, Long> map = new HashMap<>();
        stringLongJavaPairRDD.foreach(stringLongTuple2 -> {
            // Works: I can see values getting printed out
            System.out.println(stringLongTuple2._1());
            // Works: I can see values getting printed out
            System.out.println(stringLongTuple2._2());
            map.put(stringLongTuple2._1(), stringLongTuple2._2());
        });

        // Always prints the empty JSON document "{}". But why? Especially
        // when the map is getting filled with values, as confirmed by the
        // print statements above.
        System.out.println(gson.toJson(map));

        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });

jssc.start();
jssc.awaitTermination();


  VS

*The code below works, but it is slow because of .collectAsMap*


jsonMessagesDStream
    .window(new Duration(6), new Duration(1000))
    .mapToPair(val -> {
      JsonParser parser = new JsonParser();
      JsonObject jsonObj = parser.parse(val).getAsJsonObject();
      if (jsonObj.has("key4")) {
        return new Tuple2<>("", 0L);
      }
      String symbol = jsonObj.get("key1").getAsString();
      long quantity = jsonObj.get("key2").getAsLong();
      return new Tuple2<>(symbol, quantity);
    })
    .reduceByKey((v1, v2) -> v1 + v2)
    .foreachRDD(stringLongJavaPairRDD -> {
        LinkedHashMap<String, Long> map =
            new LinkedHashMap<>(stringLongJavaPairRDD.collectAsMap());
        producer.send(new ProducerRecord<>("topicA", gson.toJson(map)));
    });

jssc.start();
jssc.awaitTermination();




On Wed, Dec 6, 2017 at 1:43 AM, Gerard Maas  wrote:

> Hi Kant,
>
> >  but would your answer on .collect() change depending on running the
> spark app in client vs cluster mode?
>
> No, it should make no difference.
>
> -kr, Gerard.
>
> On Tue, Dec 5, 2017 at 11:34 PM, kant kodali  wrote:
>
>> @Richard I don't see any error in the executor log but let me run again
>> to make sure.
>>
>> @Gerard Thanks much!  but would your answer on .collect() change
>> depending on running the spark app in client vs cluster mode?
>>
>> Thanks!
>>
>> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas 
>> wrote:
>>
>>> The general answer to your initial question is that "it depends". If the
>>> operation in the rdd.foreach() closure can be parallelized, then you don't
>>> need to collect first. If it needs some local context (e.g. a socket
>>> connection), then you need to do rdd.collect first to bring the data
>>> locally, which has a perf penalty and also is restricted to the memory size
>>> to the driver process.
>>>
>>> Given the further clarification:
>>> >Reads from Kafka and outputs to Kafka. so I check the output from
>>> Kafka.
>>>
>>> If it's writing to Kafka, that operation can be done in a distributed
>>> form.
>>>
>>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>>
>>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>>> migrate to structured streaming by already adopting the 'structured' APIs
>>> within Spark Streaming:
>>>
>>> case class KV(key: String, value: String)
>>>
>>> dstream.map().reduce().forEachRdd{rdd ->
>>> import spark.implicits._
>>> val kv = rdd.map{e => KV(extractKey(e), extractValue(e))} // needs
>>> to be in a (key,value) shape
>>> val dataFrame = rdd.toDF()
>>> dataFrame.write
>>>  .format("kafka")
>>>  .option("kafka.bootstrap.servers",
>>> "host1:port1,host2:port2")
>>>  .option("topic", "topic1")
>>>  .save()
>>> }
>>>
>>> -kr, Gerard.
>>>
>>>
>>>
>>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:
>>>
>>>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>>>
>>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>>> richard.q...@capitalone.com> wrote:
>>>>
>>>>> Where do you check the output result for both case?
>>>>>
>>>>> Sent from my iPhone
>>>>>
>>>>>
>>>>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>>>>> >
>>>>> > Hi All,
>>>>> >
>>>>> > I have a simple stateless transformation using Dstreams

Re: Do I need to do .collect inside forEachRDD

2017-12-06 Thread Gerard Maas
Hi Kant,

>  but would your answer on .collect() change depending on running the
spark app in client vs cluster mode?

No, it should make no difference.

-kr, Gerard.

On Tue, Dec 5, 2017 at 11:34 PM, kant kodali  wrote:

> @Richard I don't see any error in the executor log but let me run again to
> make sure.
>
> @Gerard Thanks much!  but would your answer on .collect() change depending
> on running the spark app in client vs cluster mode?
>
> Thanks!
>
> On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas  wrote:
>
>> The general answer to your initial question is that "it depends". If the
>> operation in the rdd.foreach() closure can be parallelized, then you don't
>> need to collect first. If it needs some local context (e.g. a socket
>> connection), then you need to do rdd.collect first to bring the data
>> locally, which has a perf penalty and also is restricted to the memory size
>> to the driver process.
>>
>> Given the further clarification:
>> >Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>
>> If it's writing to Kafka, that operation can be done in a distributed
>> form.
>>
>> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>>
>> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
>> migrate to structured streaming by already adopting the 'structured' APIs
>> within Spark Streaming:
>>
>> case class KV(key: String, value: String)
>>
>> dstream.map().reduce().forEachRdd{rdd ->
>> import spark.implicits._
>> val kv = rdd.map{e => KV(extractKey(e), extractValue(e))} // needs to
>> be in a (key,value) shape
>> val dataFrame = rdd.toDF()
>> dataFrame.write
>>  .format("kafka")
>>  .option("kafka.bootstrap.servers",
>> "host1:port1,host2:port2")
>>  .option("topic", "topic1")
>>  .save()
>> }
>>
>> -kr, Gerard.
>>
>>
>>
>> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:
>>
>>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>>
>>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>>> richard.q...@capitalone.com> wrote:
>>>
>>>> Where do you check the output result for both case?
>>>>
>>>> Sent from my iPhone
>>>>
>>>>
>>>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>>>> >
>>>> > Hi All,
>>>> >
>>>> > I have a simple stateless transformation using Dstreams (stuck with
>>>> the old API for one of the Application). The pseudo code is rough like this
>>>> >
>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute
>>>> fine but a bit slow
>>>> > })
>>>> >
>>>> > I understand collect collects the results back to the driver but is
>>>> that necessary? can I just do something like below? I believe I tried both
>>>> and somehow the below code didn't output any results (It can be issues with
>>>> my env. I am not entirely sure) but I just would like some clarification on
>>>> .collect() since it seems to slow things down for me.
>>>> >
>>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>>> >  rdd.forEach(() -> {} ); //
>>>> > })
>>>> >
>>>> > Thanks!
>>>> >
>>>> >
>>>>
>>>>
>>>> The information contained in this e-mail is confidential and/or
>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>> solely in performance of work or services for Capital One. The information
>>>> transmitted herewith is intended only for use by the individual or entity
>>>> to which it is addressed. If the reader of this message is not the intended
>>>> recipient, you are hereby notified that any review, retransmission,
>>>> dissemination, distribution, copying or other use of, or taking of any
>>>> action in reliance upon this information is strictly prohibited. If you
>>>> have received this communication in error, please contact the sender and
>>>> delete the material from your computer.


>>>
>>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread kant kodali
@Richard I don't see any error in the executor log, but let me run it again
to make sure.

@Gerard Thanks much! But would your answer on .collect() change depending
on whether the Spark app runs in client or cluster mode?

Thanks!

On Tue, Dec 5, 2017 at 1:54 PM, Gerard Maas  wrote:

> The general answer to your initial question is that "it depends". If the
> operation in the rdd.foreach() closure can be parallelized, then you don't
> need to collect first. If it needs some local context (e.g. a socket
> connection), then you need to do rdd.collect first to bring the data
> locally, which has a perf penalty and also is restricted to the memory size
> to the driver process.
>
> Given the further clarification:
> >Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> If it's writing to Kafka, that operation can be done in a distributed
> form.
>
> You could use this lib: https://github.com/BenFradet/spark-kafka-writer
>
> Or, if you can upgrade to Spark 2.2 version, you can pave your way to
> migrate to structured streaming by already adopting the 'structured' APIs
> within Spark Streaming:
>
> case class KV(key: String, value: String)
>
> dstream.map().reduce().forEachRdd{rdd ->
> import spark.implicits._
> val kv = rdd.map{e => KV(extractKey(e), extractValue(e))} // needs to
> be in a (key,value) shape
> val dataFrame = rdd.toDF()
> dataFrame.write
>  .format("kafka")
>  .option("kafka.bootstrap.servers",
> "host1:port1,host2:port2")
>  .option("topic", "topic1")
>  .save()
> }
>
> -kr, Gerard.
>
>
>
> On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:
>
>> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>>
>> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard <
>> richard.q...@capitalone.com> wrote:
>>
>>> Where do you check the output result for both case?
>>>
>>> Sent from my iPhone
>>>
>>>
>>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>>> >
>>> > Hi All,
>>> >
>>> > I have a simple stateless transformation using Dstreams (stuck with
>>> the old API for one of the Application). The pseudo code is rough like this
>>> >
>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute fine
>>> but a bit slow
>>> > })
>>> >
>>> > I understand collect collects the results back to the driver but is
>>> that necessary? can I just do something like below? I believe I tried both
>>> and somehow the below code didn't output any results (It can be issues with
>>> my env. I am not entirely sure) but I just would like some clarification on
>>> .collect() since it seems to slow things down for me.
>>> >
>>> > dstream.map().reduce().forEachRdd(rdd -> {
>>> >  rdd.forEach(() -> {} ); //
>>> > })
>>> >
>>> > Thanks!
>>> >
>>> >
>>> 
>>>
>>>
>>>
>>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Gerard Maas
The general answer to your initial question is "it depends". If the
operation in the rdd.foreach() closure can be parallelized, then you don't
need to collect first. If it needs some local context (e.g. a socket
connection), then you need to do rdd.collect first to bring the data to the
driver, which has a performance penalty and is also limited by the memory
size of the driver process.

Given the further clarification:
>Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

If it's writing to Kafka, that operation can be done in a distributed form.

You could use this lib: https://github.com/BenFradet/spark-kafka-writer

Or, if you can upgrade to Spark 2.2, you can pave your way to migrating to
Structured Streaming by already adopting the 'structured' APIs within
Spark Streaming:

case class KV(key: String, value: String)

dstream.map().reduce().foreachRDD { rdd =>
    import spark.implicits._
    // needs to be in a (key, value) shape
    val kv = rdd.map { e => KV(extractKey(e), extractValue(e)) }
    val dataFrame = kv.toDF()
    dataFrame.write
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "topic1")
        .save()
}

-kr, Gerard.



On Tue, Dec 5, 2017 at 10:38 PM, kant kodali  wrote:

> Reads from Kafka and outputs to Kafka. so I check the output from Kafka.
>
> On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard  > wrote:
>
>> Where do you check the output result for both case?
>>
>> Sent from my iPhone
>>
>>
>> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
>> >
>> > Hi All,
>> >
>> > I have a simple stateless transformation using Dstreams (stuck with the
>> old API for one of the Application). The pseudo code is rough like this
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >  rdd.collect(),forEach(); // Is this necessary ? Does execute fine
>> but a bit slow
>> > })
>> >
>> > I understand collect collects the results back to the driver but is
>> that necessary? can I just do something like below? I believe I tried both
>> and somehow the below code didn't output any results (It can be issues with
>> my env. I am not entirely sure) but I just would like some clarification on
>> .collect() since it seems to slow things down for me.
>> >
>> > dstream.map().reduce().forEachRdd(rdd -> {
>> >  rdd.forEach(() -> {} ); //
>> > })
>> >
>> > Thanks!
>> >
>> >
>> 
>>
>>
>>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Qiao, Richard
In the 2nd case, is any producer error thrown in the executor log?

Best Regards
Richard


From: kant kodali <kanth...@gmail.com>
Date: Tuesday, December 5, 2017 at 4:38 PM
To: "Qiao, Richard" <richard.q...@capitalone.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: Do I need to do .collect inside forEachRDD

Reads from Kafka and outputs to Kafka. so I check the output from Kafka.

On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard 
<richard.q...@capitalone.com<mailto:richard.q...@capitalone.com>> wrote:
Where do you check the output result for both case?

Sent from my iPhone

> On Dec 5, 2017, at 15:36, kant kodali 
> <kanth...@gmail.com<mailto:kanth...@gmail.com>> wrote:
>
> Hi All,
>
> I have a simple stateless transformation using Dstreams (stuck with the old 
> API for one of the Application). The pseudo code is rough like this
>
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.collect(),forEach(); // Is this necessary ? Does execute fine but a 
> bit slow
> })
>
> I understand collect collects the results back to the driver but is that 
> necessary? can I just do something like below? I believe I tried both and 
> somehow the below code didn't output any results (It can be issues with my 
> env. I am not entirely sure) but I just would like some clarification on 
> .collect() since it seems to slow things down for me.
>
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.forEach(() -> {} ); //
> })
>
> Thanks!
>
>




Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread kant kodali
It reads from Kafka and writes to Kafka, so I check the output in Kafka.

On Tue, Dec 5, 2017 at 1:26 PM, Qiao, Richard 
wrote:

> Where do you check the output result for both case?
>
> Sent from my iPhone
>
> > On Dec 5, 2017, at 15:36, kant kodali  wrote:
> >
> > Hi All,
> >
> > I have a simple stateless transformation using Dstreams (stuck with the
> old API for one of the Application). The pseudo code is rough like this
> >
> > dstream.map().reduce().forEachRdd(rdd -> {
> >  rdd.collect(),forEach(); // Is this necessary ? Does execute fine
> but a bit slow
> > })
> >
> > I understand collect collects the results back to the driver but is that
> necessary? can I just do something like below? I believe I tried both and
> somehow the below code didn't output any results (It can be issues with my
> env. I am not entirely sure) but I just would like some clarification on
> .collect() since it seems to slow things down for me.
> >
> > dstream.map().reduce().forEachRdd(rdd -> {
> >  rdd.forEach(() -> {} ); //
> > })
> >
> > Thanks!
> >
> >
> 
>
>
>


Re: Do I need to do .collect inside forEachRDD

2017-12-05 Thread Qiao, Richard
Where do you check the output for both cases?

Sent from my iPhone

> On Dec 5, 2017, at 15:36, kant kodali  wrote:
> 
> Hi All,
> 
> I have a simple stateless transformation using Dstreams (stuck with the old 
> API for one of the Application). The pseudo code is rough like this
> 
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.collect(),forEach(); // Is this necessary ? Does execute fine but a 
> bit slow
> })
> 
> I understand collect collects the results back to the driver but is that 
> necessary? can I just do something like below? I believe I tried both and 
> somehow the below code didn't output any results (It can be issues with my 
> env. I am not entirely sure) but I just would like some clarification on 
> .collect() since it seems to slow things down for me.
> 
> dstream.map().reduce().forEachRdd(rdd -> {
>  rdd.forEach(() -> {} ); // 
> })
> 
> Thanks!
> 
> 







Do I need to do .collect inside forEachRDD

2017-12-05 Thread kant kodali
Hi All,

I have a simple stateless transformation using DStreams (I am stuck with the
old API for one of our applications). The pseudocode is roughly like this:

dstream.map().reduce().foreachRDD(rdd -> {
    // Is this necessary? It executes fine, but it is a bit slow
    rdd.collect().forEach();
})

I understand that collect brings the results back to the driver, but is
that necessary? Can I just do something like the code below? I believe I
tried both, and somehow the code below didn't output any results (it could
be an issue with my environment; I am not entirely sure), but I would like
some clarification on .collect(), since it seems to slow things down for me.

dstream.map().reduce().foreachRDD(rdd -> {
    rdd.foreach(record -> {}); //
})

Thanks!