Thanks Ian.

Was your Flume source IBM MQ, by any chance?



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 16:40, Ian Brooks <i.bro...@sensewhere.com> wrote:

> Hi Mich,
>
> Yes, I managed to resolve this one. The approach described in the docs doesn't
> work properly as written: in order for the Flume side to be notified, you need
> to set the storageLevel on the polling stream, like so:
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>     FlumeUtils.createPollingStream(ssc, addresses,
>         StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);
>
> After setting this, the data is correctly marked as processed by the Spark
> receiver and the Flume sink is notified.
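>
> For reference, a minimal sketch of that setup (Spark 1.6 Java API; ssc is the
> JavaStreamingContext, and the sink host and port below are placeholder
> assumptions):
>
> import java.net.InetSocketAddress;
>
> import org.apache.spark.storage.StorageLevel;
> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
> import org.apache.spark.streaming.flume.FlumeUtils;
> import org.apache.spark.streaming.flume.SparkFlumeEvent;
>
> // Address of the Flume SparkSink to poll (placeholder host/port)
> InetSocketAddress[] addresses = new InetSocketAddress[] {
>     new InetSocketAddress("flume-host", 41414)
> };
>
> // Explicit storage level, plus maxBatchSize (100) and parallelism (10)
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
>     FlumeUtils.createPollingStream(ssc, addresses,
>         StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);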
>
> -Ian
>
> > Hi Ian,
> >
> > Has this been resolved?
> >
> > How about sending data to Flume and then to Kafka, with Kafka streaming into Spark?
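> >
> > For that route, a minimal sketch of the Spark side using the Kafka direct
> > stream (Spark 1.6 Java API; the broker address and topic name are
> > placeholder assumptions):
> >
> > import java.util.Collections;
> > import java.util.HashMap;
> > import java.util.Map;
> > import java.util.Set;
> >
> > import kafka.serializer.StringDecoder;
> > import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> > import org.apache.spark.streaming.kafka.KafkaUtils;
> >
> > // Kafka connection details (placeholders)
> > Map<String, String> kafkaParams = new HashMap<>();
> > kafkaParams.put("metadata.broker.list", "kafka-host:9092");
> > Set<String> topics = Collections.singleton("flume-events");
> >
> > // Direct (receiver-less) stream: Spark tracks the Kafka offsets itself,
> > // so there is no Flume-style transaction handshake to get wrong
> > JavaPairInputDStream<String, String> kafkaStream =
> >     KafkaUtils.createDirectStream(ssc,
> >         String.class, String.class,
> >         StringDecoder.class, StringDecoder.class,
> >         kafkaParams, topics);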
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >
> > http://talebzadehmich.wordpress.com
> >
> > On 13 July 2016 at 11:13, Ian Brooks <i.bro...@sensewhere.com> wrote:
>
> > > Hi,
> > >
> > > I'm currently trying to implement a prototype Spark application that gets data from Flume and processes it. I'm using the pull-based method described in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
> > >
> > > This initially works fine for getting data from Flume; however, the Spark client doesn't appear to be letting Flume know that the data has been received, so Flume doesn't remove it from the batch.
> > >
> > > After 100 requests Flume stops allowing any new data and logs:
> > >
> > > 08 Jul 2016 14:59:00,265 WARN [Spark Sink Processor Thread - 5] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) - Error while processing transaction.
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
> > >     at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
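> > >
> > > (The "capacity 100" in that message is the memory channel's transaction
> > > capacity, which defaults to 100 and is set in the Flume agent
> > > configuration; a sketch with assumed agent and channel names:)
> > >
> > > # Hypothetical agent/channel names; only the capacity keys matter here
> > > agent.channels.sparkChannel.type = memory
> > > agent.channels.sparkChannel.capacity = 10000
> > > agent.channels.sparkChannel.transactionCapacity = 1000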
> > >
> > > My code to pull the data from Flume is:
> > >
> > > SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
> > > Duration batchInterval = new Duration(10000);
> > > final String checkpointDir = "/tmp/";
> > >
> > > final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
> > > ssc.checkpoint(checkpointDir);
> > > JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc, host, port);
> > >
> > > // Transform each Flume Avro event into a processable format
> > > JavaDStream<String> transformedEvents = flumeStream.map(new Function<SparkFlumeEvent, String>() {
> > >     @Override
> > >     public String call(SparkFlumeEvent flumeEvent) throws Exception {
> > >         // Parse the event's JSON form and pull out the payload bytes
> > >         String flumeEventStr = flumeEvent.event().toString();
> > >         Gson gson = new GsonBuilder().create();
> > >         avroData avroData = gson.fromJson(flumeEventStr, avroData.class);
> > >         HashMap<String, String> body = avroData.getBody();
> > >         return body.get("bytes");
> > >     }
> > > });
> > >
> > > ...
> > >
> > > ssc.start();
> > > ssc.awaitTermination();
> > > ssc.close();
> > > }
> > >
> > > Is there something specific I should be doing to let the Flume server know the batch has been received and processed?
> > >
> > > --
> > > Ian Brooks
>
> --
> Ian Brooks
> Lead Cloud Systems Engineer
>
> Mobile: +44 7900987187
> UK Office: +44 131 629 5155
> US Office: +1 650 943 2403
> Skype: ijbrooks
>
> E-mail: i.bro...@sensewhere.com
> Web: www.sensewhere.com
>
> sensewhere Ltd. 4th Floor, 108 Princes Street, Edinburgh EH2 3AA
> Company Number: SC357036
> sensewhere USA 800 West El Camino Real, Suite 180, Mountain View, California, 94040
> sensewhere China Room 748, 7/F, Tower A, SCC, No.88 Haide 1st Avenue, Nanshan District, Shenzhen 51806
