Re: Should Flume integration be behind a profile?

2017-10-02 Thread Sean Owen
CCing user@
Yeah good point about perhaps moving the examples into the module itself.
Actually removing it would be a long way off, no matter what.

On Mon, Oct 2, 2017 at 8:35 AM Nick Pentreath <nick.pentre...@gmail.com>
wrote:

> I'd agree with #1 or #2. Deprecation now seems fine.
>
> Perhaps this should be raised on the user list also?
>
> And perhaps it makes sense to look at moving the Flume support into Apache
> Bahir if there is interest (I've cc'ed Bahir dev list here)? That way the
> current state of the connector could keep going for those users who may
> need it.
>
> As for examples, for the Kinesis connector the examples now live in the
> subproject (see e.g. KinesisWordCountASL under external/kinesis-asl). So we
> don't have to completely remove the examples, just move them (this may not
> solve the doc issue but at least the examples are still there for anyone
> who needs them).
>
> On Mon, 2 Oct 2017 at 06:36 Mridul Muralidharan <mri...@gmail.com> wrote:
>
>> I agree, proposal 1 sounds better among the options.
>>
>> Regards,
>> Mridul
>>
>>
>> On Sun, Oct 1, 2017 at 3:50 PM, Reynold Xin <r...@databricks.com> wrote:
>> > Probably should do 1, and then it is an easier transition in 3.0.
>> >
>> > On Sun, Oct 1, 2017 at 1:28 AM Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> I tried and failed to do this in
>> >> https://issues.apache.org/jira/browse/SPARK-22142 because it became clear
>> >> that the Flume examples would have to be removed to make this work, too.
>> >> (Well, you can imagine other solutions with extra source dirs or modules
>> >> for flume examples enabled by a profile, but that doesn't help the docs
>> >> and is nontrivial complexity for little gain.)
>> >>
>> >> It kind of suggests Flume support should be deprecated if it's put behind
>> >> a profile. Like with Kafka 0.8. (This is why I'm raising it again to the
>> >> whole list.)
>> >>
>> >> Any preferences among:
>> >> 1. Put Flume behind a profile, remove examples, deprecate
>> >> 2. Put Flume behind a profile, remove examples, but don't deprecate
>> >> 3. Punt until Spark 3.0, when this integration would probably be removed
>> >> entirely (?)
>> >>
>> >> On Tue, Sep 26, 2017 at 10:36 AM Sean Owen <so...@cloudera.com> wrote:
>> >>>
>> >>> Not a big deal, but I'm wondering whether Flume integration should at
>> >>> least be opt-in and behind a profile? It still sees some use (at least
>> >>> on our end) but is not applicable to the majority of users. Most other
>> >>> third-party framework integrations are behind a profile, like YARN,
>> >>> Mesos, Kinesis, Kafka 0.8, Docker. Just soliciting comments, not arguing
>> >>> for it.
>> >>>
>> >>> (Well, actually it annoys me that the Flume integration always fails to
>> >>> compile in IntelliJ unless you generate the sources manually.)
>>
>>
>>


Re: Flume integration

2016-11-21 Thread Ian Brooks
Hi Mich,

Thanks. I would prefer not to add another system into the mix, as we currently 
don't use Kafka at all. We are still in the prototype phase at the moment and 
it seems to be working well, though it doesn't like you restarting the Flume 
sink without also restarting the Spark application. That is something we should 
be able to manage though.



*-Ian *


Hi Ian,


Flume is great for ingesting data into HDFS and HBase. However, that is part of 
the batch layer.


For real-time processing, I would go through Kafka into Spark Streaming. Apart 
from your case, I have not established whether anyone else feeds Flume directly 
into Spark.


If so, how mature is it?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/
 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 21 November 2016 at 10:27, Ian Brooks  wrote:




*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend 
any architectural benefit of a Spark sink.
Is there any feature in Flume that makes it more suitable for ingesting stream 
data than Spark Streaming, such that we should chain them? For example, does it 
help with durability or reliability of the source?
Or is it a more tactical choice, based on connector availability or such?
To me, Flume is an important component for ingesting streams into HDFS or Hive 
directly, i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[1]/
 
  
http://talebzadehmich.wordpress.com[2]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks  wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
This is initially working fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each Flume avro event to a processable format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer
 
Mobile: +44 7900987187[6]
UK Office: +44 131 629 5155[7]
US Office: +1 650 943 2403[8]
Skype: ijbrooks
 
E-mail: _i.brooks@sensewhere.com_ 
Web: www.sensewhere.com[9] 
 
*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA. 
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 
94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan 
District, Shenzhen 51806
 
  




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US 

Re: Flume integration

2016-11-21 Thread Mich Talebzadeh
Hi Ian,

Flume is great for ingesting data into HDFS and HBase. However, that is
part of the batch layer.

For real-time processing, I would go through Kafka into Spark Streaming.
Apart from your case, I have not established whether anyone else feeds Flume
directly into Spark.

If so, how mature is it?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 21 November 2016 at 10:27, Ian Brooks  wrote:

>
> We already use Flume as our way of getting data from our application into
> HDFS and HBase. We have some new work we are looking at that requires
> realtime processing on data that we don't need to store, so it fits into
> our existing platform more easily to just pass the data through Flume like
> everything else and route this data to Spark.
>
> -Ian
>
>
>
>
> On Monday 21 November 2016 07:59:42 ayan guha wrote:
>
> Hi
>
> While I am following this discussion with interest, I am trying to
> comprehend any architectural benefit of a spark sink.
>
> Is there any feature in flume makes it more suitable to ingest stream data
> than sppark streaming, so that we should chain them? For example does it
> help durability or reliability of the source?
>
> Or, it is a more tactical choice based on connector availability or such?
>
> To me, flume is important component to ingest streams to hdfs or hive
> directly ie it plays on the batch side of lambda architecture pattern.
>
> On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:
>
> Hi Ian,
>
>
> Has this been resolved?
>
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> The is initially working fine for getting data from Flume, however the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> Duration batchInterval = new Duration(1);
>
> final String checkpointDir = "/tmp/";
>
>
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> ssc.checkpoint(checkpointDir);
>
> JavaReceiverInputDStream flumeStream = 
> FlumeUtils.createPollingStream(ssc,
> host, port);
>
>
>
> // Transform each flume avro event to a process-able format
>
> JavaDStream transformedEvents = flumeStream.map(new
> Function() {
>
>
>
> @Override
>
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> String flumeEventStr = flumeEvent.event().toString();
>
> avroData avroData = new avroData();
>
> Gson gson = new GsonBuilder().create();
>
> avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> HashMap body = avroData.getBody();
>
> String data = body.get("bytes");
>
> return data;
>
> }
>
> });
>
>
>
> ...
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> ssc.close();
>
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>
>
>
>
>
> --
>
> 

Re: Flume integration

2016-11-21 Thread Ian Brooks

*-Ian*


Hi
While I am following this discussion with interest, I am trying to comprehend 
any architectural benefit of a Spark sink.
Is there any feature in Flume that makes it more suitable for ingesting stream 
data than Spark Streaming, such that we should chain them? For example, does it 
help with durability or reliability of the source?
Or is it a more tactical choice, based on connector availability or such?
To me, Flume is an important component for ingesting streams into HDFS or Hive 
directly, i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:


Hi Ian,


Has this been resolved?


How about data to Flume and then Kafka and Kafka streaming into Spark?


Thanks


Dr Mich Talebzadeh 
  
LinkedIn / 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw[2]/
 
  
http://talebzadehmich.wordpress.com[3]


*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction. 
  


On 13 July 2016 at 11:13, Ian Brooks  wrote:


Hi,
 
I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html[5] 
 
This is initially working fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 
 
After 100 requests Flume stops allowing any new data and logs
 
08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 

 
My code to pull the data from Flume is
 
SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);
final String checkpointDir = "/tmp/";
 
final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);
 
// Transform each Flume avro event to a processable format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {
 
@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception {
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();
Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class); 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
return data;
}
});
 
...
 
ssc.start();
ssc.awaitTermination();
ssc.close();
}
 
Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
 




*Ian Brooks*
Lead Cloud Systems Engineer

Mobile: +44 7900987187
UK Office: +44 131 629 5155
US Office: +1 650 943 2403
Skype: ijbrooks

E-mail: i.bro...@sensewhere.com[6] 
Web: www.sensewhere.com[7] 

*sensewhere Ltd*. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA.
Company Number: SC357036
*sensewhere USA* 800 West El Camino Real, Suite 180, Mountain View, California, 
94040
*sensewhere China* Room748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan 
District, Shenzhen 51806

  


[1] mailto:mich.talebza...@gmail.com
[2] 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
[3] http://talebzadehmich.wordpress.com
[4] mailto:i.bro...@sensewhere.com
[5] https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
[6] mailto:i.bro...@sensewhere.com
[7] http://www.sensewhere.com/


Re: Flume integration

2016-11-20 Thread ayan guha
Hi

While I am following this discussion with interest, I am trying to
comprehend any architectural benefit of a Spark sink.

Is there any feature in Flume that makes it more suitable for ingesting stream
data than Spark Streaming, such that we should chain them? For example, does it
help with durability or reliability of the source?

Or is it a more tactical choice, based on connector availability or such?

To me, Flume is an important component for ingesting streams into HDFS or Hive
directly, i.e. it plays on the batch side of the lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:

> Hi Ian,
>
> Has this been resolved?
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
>> Hi,
>>
>>
>>
>> I'm currently trying to implement a prototype Spark application that gets
>> data from Flume and processes it. I'm using the pull based method mentioned
>> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>>
>>
>>
>> The is initially working fine for getting data from Flume, however the
>> Spark client doesn't appear to be letting Flume know that the data has been
>> received, so Flume doesn't remove it from the batch.
>>
>>
>>
>> After 100 requests Flume stops allowing any new data and logs
>>
>>
>>
>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>> Error while processing transaction.
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.
>> doTake(MemoryChannel.java:96)
>>
>>
>>
>> My code to pull the data from Flume is
>>
>>
>>
>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>
>> Duration batchInterval = new Duration(1);
>>
>> final String checkpointDir = "/tmp/";
>>
>>
>>
>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> batchInterval);
>>
>> ssc.checkpoint(checkpointDir);
>>
>> JavaReceiverInputDStream flumeStream =
>> FlumeUtils.createPollingStream(ssc, host, port);
>>
>>
>>
>> // Transform each flume avro event to a process-able format
>>
>> JavaDStream transformedEvents = flumeStream.map(new
>> Function() {
>>
>>
>>
>> @Override
>>
>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>
>> String flumeEventStr = flumeEvent.event().toString();
>>
>> avroData avroData = new avroData();
>>
>> Gson gson = new GsonBuilder().create();
>>
>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>
>> HashMap body = avroData.getBody();
>>
>> String data = body.get("bytes");
>>
>> return data;
>>
>> }
>>
>> });
>>
>>
>>
>> ...
>>
>>
>>
>> ssc.start();
>>
>> ssc.awaitTermination();
>>
>> ssc.close();
>>
>> }
>>
>>
>>
>> Is there something specific I should be doing to let the Flume server
>> know the batch has been received and processed?
>>
>>
>> --
>>
>> Ian Brooks
>>
>>
>>
>
>


Re: Flume integration

2016-11-20 Thread Mich Talebzadeh
Thanks Ian.

Was your Flume source IBM MQ, by any chance?



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 16:40, Ian Brooks  wrote:

> Hi Mich,
>
>
>
> Yes, I managed to resolve this one. The issue was that the approach
> described in the docs doesn't work properly: for the Flume part to be
> notified, you need to set the storageLevel on the polling stream, like
>
>
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
> FlumeUtils.createPollingStream(ssc,
> addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);
>
>
>
>
>
> After setting this, the data is correctly marked as processed by the Spark
> receiver and the Flume sink is notified.
>
>
>
> -Ian
>
>
>
>
>
> > Hi Ian,
>
> >
>
> > Has this been resolved?
>
> >
>
> > How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> >
>
> > Thanks
>
> >
>
> > Dr Mich Talebzadeh
>
> >
>
> >
>
> >
>
> > LinkedIn *
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
>
> >
>
> >
>
> >
>
> > http://talebzadehmich.wordpress.com
>
> >
>
> >
>
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
>
> > loss, damage or destruction of data or any other property which may arise
>
> > from relying on this email's technical content is explicitly disclaimed.
>
> > The author will in no case be liable for any monetary damages arising
> from
>
> > such loss, damage or destruction.
>
> >
>
> > On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
> > > Hi,
>
> > >
>
> > >
>
> > >
>
> > > I'm currently trying to implement a prototype Spark application that
> gets
>
> > > data from Flume and processes it. I'm using the pull based method
>
> > > mentioned
>
> > > in https://spark.apache.org/docs/1.6.1/streaming-flume-
> integration.html
>
> > >
>
> > >
>
> > >
>
> > > The is initially working fine for getting data from Flume, however the
>
> > > Spark client doesn't appear to be letting Flume know that the data has
>
> > > been
>
> > > received, so Flume doesn't remove it from the batch.
>
> > >
>
> > >
>
> > >
>
> > > After 100 requests Flume stops allowing any new data and logs
>
> > >
>
> > >
>
> > >
>
> > > 08 Jul 2016 14:59:00,265 WARN [Spark Sink Processor Thread - 5]
>
> > > (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) -
>
> > > Error while processing transaction.
>
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction,
>
> > > capacity 100 full, consider committing more frequently, increasing
>
> > > capacity, or increasing thread count
>
> > >
>
> > > at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
>
> > >
>
> > > MemoryChannel.java:96)
>
> > >
>
> > >
>
> > >
>
> > > My code to pull the data from Flume is
>
> > >
>
> > >
>
> > >
>
> > > SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> > >
>
> > > Duration batchInterval = new Duration(1);
>
> > >
>
> > > final String checkpointDir = "/tmp/";
>
> > >
>
> > >
>
> > >
>
> > > final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>
> > > batchInterval);
>
> > >
>
> > > ssc.checkpoint(checkpointDir);
>
> > >
>
> > > JavaReceiverInputDStream flumeStream =
>
> > > FlumeUtils.createPollingStream(ssc, host, port);
>
> > >
>
> > >
>
> > >
>
> > > // Transform each flume avro event to a process-able format
>
> > >
>
> > > JavaDStream transformedEvents = flumeStream.map(new
>
> > > Function() {
>
> > >
>
> > >
>
> > >
>
> > > @Override
>
> > >
>
> > > public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> > >
>
> > > String flumeEventStr = flumeEvent.event().toString();
>
> > >
>
> > > avroData avroData = new avroData();
>
> > >
>
> > > Gson gson = new GsonBuilder().create();
>
> > >
>
> > > avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> > >
>
> > > HashMap body = avroData.getBody();
>
> > >
>
> > > String data = body.get("bytes");
>
> > >
>
> > > return data;
>
> > >
>
> > > }
>
> > >
>
> > > });
>
> > >
>
> > >
>
> > >
>
> > > ...
>
> > >
>
> > >
>
> > >
>
> > > ssc.start();
>
> > >
>
> > > ssc.awaitTermination();
>
> > >
>
> > > ssc.close();
>
> > >
>
> > > }
>
> > >
>
> > >
>
> > >
>
> > > Is there something specific I should be doing to let the Flume server
> know
>
> > > the batch has been 

Re: Flume integration

2016-11-20 Thread Ian Brooks
Hi Mich,

Yes, I managed to resolve this one. The issue was that the approach described
in the docs doesn't work properly: for the Flume part to be notified, you need
to set the storageLevel on the polling stream, like

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, addresses, 
StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);


After setting this, the data is correctly marked as processed by the Spark
receiver and the Flume sink is notified.
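
For reference, here is a minimal sketch of how the addresses argument above
might be built. The host and port are hypothetical and must match the Flume
agent's SparkSink configuration, and this assumes the spark-streaming-flume
Java API of the 1.6.x line used elsewhere in this thread:

import java.net.InetSocketAddress;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

// Inside the streaming setup, once the JavaStreamingContext ssc exists:
InetSocketAddress[] addresses = new InetSocketAddress[] {
    // Hypothetical host/port of the Flume agent running the SparkSink.
    new InetSocketAddress("flume-sink-host", 41414)
};

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
    FlumeUtils.createPollingStream(
        ssc,
        addresses,
        StorageLevel.MEMORY_AND_DISK_SER_2(),  // the storage level from the fix above
        100,   // maxBatchSize: events pulled per Flume transaction
        10);   // parallelism: concurrent connections to the sink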

-Ian


> Hi Ian,
> 
> Has this been resolved?
> 
> How about data to Flume and then Kafka and Kafka streaming into Spark?
> 
> Thanks
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw*
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 

Re: Flume integration

2016-11-20 Thread Mich Talebzadeh
Hi Ian,

Has this been resolved?

How about data to Flume and then Kafka and Kafka streaming into Spark?

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 July 2016 at 11:13, Ian Brooks  wrote:

> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> The is initially working fine for getting data from Flume, however the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> Duration batchInterval = new Duration(1);
>
> final String checkpointDir = "/tmp/";
>
>
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> ssc.checkpoint(checkpointDir);
>
> JavaReceiverInputDStream flumeStream = 
> FlumeUtils.createPollingStream(ssc,
> host, port);
>
>
>
> // Transform each flume avro event to a process-able format
>
> JavaDStream transformedEvents = flumeStream.map(new
> Function() {
>
>
>
> @Override
>
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> String flumeEventStr = flumeEvent.event().toString();
>
> avroData avroData = new avroData();
>
> Gson gson = new GsonBuilder().create();
>
> avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> HashMap body = avroData.getBody();
>
> String data = body.get("bytes");
>
> return data;
>
> }
>
> });
>
>
>
> ...
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> ssc.close();
>
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>


Flume integration

2016-07-13 Thread Ian Brooks
Hi,

I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull based method mentioned in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html 

This is initially working fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 

After 100 requests Flume stops allowing any new data and logs

08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] 
(org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error 
while processing transaction. 


My code to pull the data from Flume is

SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
Duration batchInterval = new Duration(1);

final String checkpointDir = "/tmp/";

final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, 
batchInterval);
ssc.checkpoint(checkpointDir);

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
FlumeUtils.createPollingStream(ssc, host, port);


// Transform each Flume avro event to a processable format
JavaDStream<String> transformedEvents = flumeStream.map(new 
Function<SparkFlumeEvent, String>() {

@Override
public String call(SparkFlumeEvent flumeEvent) throws Exception 
{
String flumeEventStr = flumeEvent.event().toString();
avroData avroData = new avroData();

Gson gson = new GsonBuilder().create();
avroData = gson.fromJson(flumeEventStr, avroData.class);
 
HashMap<String, String> body = avroData.getBody();
String data = body.get("bytes");
 
return data;
}
});


...

ssc.start();
ssc.awaitTermination();
ssc.close();
}


Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*



Re: query on Spark + Flume integration using push model

2015-07-10 Thread Akhil Das
Here's an example https://github.com/przemek1990/spark-streaming

Thanks
Best Regards

On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru diplomaticg...@gmail.com
wrote:

 Hello all,

 I'm trying to configure the flume to push data into a sink so that my
 stream job could pick up the data. My events are in JSON format, but the
 Spark + Flume integration [1] document only refer to Avro sink.

 [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html

 I looked at some of the examples online, and they all refer to avro type:

 agent.sinks.avroSink.type = avro

 If I set the type to avro and send the data in JSON, will it work? I'm
 unable to try this because the Stream job throwing Avro
 'org.apache.flume.source.avro.AvroFlumeEvent' exception.


 Please advice how to handle this situation.


 many thanks



Re: query on Spark + Flume integration using push model

2015-07-10 Thread diplomatic Guru
Hi Akhil, thank you for your reply. Does that mean that Spark Streaming only
supports Avro? If that's the case, then why only Avro? Is there a particular
reason?

The project linked is for Scala but I'm using Java. Is there another
project?


On 10 July 2015 at 08:46, Akhil Das ak...@sigmoidanalytics.com wrote:

 Here's an example https://github.com/przemek1990/spark-streaming

 Thanks
 Best Regards

 On Thu, Jul 9, 2015 at 4:35 PM, diplomatic Guru diplomaticg...@gmail.com
 wrote:

 Hello all,

 I'm trying to configure the flume to push data into a sink so that my
 stream job could pick up the data. My events are in JSON format, but the
 Spark + Flume integration [1] document only refer to Avro sink.

 [1] https://spark.apache.org/docs/latest/streaming-flume-integration.html

 I looked at some of the examples online, and they all refer to avro type:

 agent.sinks.avroSink.type = avro

 If I set the type to avro and send the data in JSON, will it work? I'm
 unable to try this because the Stream job throwing Avro
 'org.apache.flume.source.avro.AvroFlumeEvent' exception.


 Please advice how to handle this situation.


 many thanks





query on Spark + Flume integration using push model

2015-07-09 Thread diplomatic Guru
Hello all,

I'm trying to configure Flume to push data into a sink so that my
streaming job can pick up the data. My events are in JSON format, but the
Spark + Flume integration [1] document only refers to an Avro sink.

[1] https://spark.apache.org/docs/latest/streaming-flume-integration.html

I looked at some of the examples online, and they all refer to the avro type:

agent.sinks.avroSink.type = avro

If I set the type to avro and send the data in JSON, will it work? I'm
unable to try this because the streaming job throws an Avro
'org.apache.flume.source.avro.AvroFlumeEvent' exception.


Please advise how to handle this situation.


many thanks
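
For what it's worth, the avro sink type is only the transport: each Flume event
travels as an AvroFlumeEvent whose body is an opaque byte payload, so JSON
produced upstream arrives unchanged inside the body. Below is a minimal sketch
(not taken from any particular project) of recovering that JSON string on the
Spark side. It assumes flumeStream is a JavaReceiverInputDStream<SparkFlumeEvent>
obtained from FlumeUtils.createStream(...) and that the event body really is
UTF-8 JSON:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

JavaDStream<String> jsonStrings = flumeStream.map(
    new Function<SparkFlumeEvent, String>() {
        @Override
        public String call(SparkFlumeEvent flumeEvent) {
            // Copy the event body (a ByteBuffer) into a byte array and decode it as UTF-8.
            ByteBuffer buf = flumeEvent.event().getBody();
            byte[] bytes = new byte[buf.remaining()];
            buf.get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);  // the original JSON text
        }
    });

The resulting strings can then be parsed with any JSON library (Gson, Jackson,
and so on), as in the Gson example earlier in this archive.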


Spark and Flume integration - do I understand this correctly?

2014-07-29 Thread dapooley
Hi,

I am trying to integrate Spark with a Flume log sink and avro source. The
sink is on one machine (the application), and the source is on another. Log
events are being sent from the application server to the avro source server
(a log directory sink on the avro source prints them to verify).

The aim is to get Spark to also receive the same events that the avro source
is getting. The steps, I believe, are:

1. install/start Spark master (on avro source machine).
2. write spark application, deploy (on avro source machine).
3. add spark application as a worker to the master.
4. have spark application configured to same port as avro source

Test setup is using 2 ubuntu VMs on a Windows host.
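
For the push model described in the integration guide, the Spark application
itself starts the Avro listener, so a Flume avro sink has to point directly at
the host/port that the Spark receiver binds; on the same machine it cannot
simply share the existing avro source's port, so a second sink (or a different
port) would be needed. A minimal, self-contained sketch of the Spark side for
step 2, with hypothetical app name, host and port, assuming the
spark-streaming-flume push-based API:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePushReceiver {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("FlumePushReceiver");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));

        // The receiver binds this host/port; a Flume avro sink should target it.
        JavaReceiverInputDStream<SparkFlumeEvent> events =
            FlumeUtils.createStream(ssc, "0.0.0.0", 41415);

        // Print a per-batch event count, just to confirm data is arriving.
        events.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}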

Flume configuration:

# application ##
## Tail application log file
# /var/lib/apache-flume-1.5.0-bin/bin/flume-ng agent -n cps -c conf -f
conf/flume-conf.properties
# http://flume.apache.org/FlumeUserGuide.html#exec-source
source_agent.sources = tomcat
source_agent.sources.tomcat.type = exec
source_agent.sources.tomcat.command = tail -F
/var/lib/tomcat/logs/application.log
source_agent.sources.tomcat.batchSize = 1
source_agent.sources.tomcat.channels = memoryChannel

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100

## Send to Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-sink
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 10.0.2.2
source_agent.sinks.avro_sink.port = 41414


 avro source ##
## Receive Flume events for Spark streaming

# http://flume.apache.org/FlumeUserGuide.html#memory-channel
agent1.channels = memoryChannel
agent1.channels.memoryChannel.type = memory
agent1.channels.memoryChannel.capacity = 100

## Flume Collector on Analytics Node
# http://flume.apache.org/FlumeUserGuide.html#avro-source
agent1.sources = avroSource
agent1.sources.avroSource.type = avro
agent1.sources.avroSource.channels = memoryChannel
agent1.sources.avroSource.bind = 0.0.0.0
agent1.sources.avroSource.port = 41414

#Sinks
agent1.sinks = localout

#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
agent1.sinks.localout.type = file_roll
agent1.sinks.localout.sink.directory = /home/vagrant/flume/logs
agent1.sinks.localout.sink.rollInterval = 0
agent1.sinks.localout.channel = memoryChannel

thank you in advance for any assistance,


