Hi,

I'm currently trying to implement a prototype Spark application that gets data 
from Flume and processes it. I'm using the pull-based method described in 
https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html 

This initially works fine for getting data from Flume; however, the Spark 
client doesn't appear to be letting Flume know that the data has been received, 
so Flume doesn't remove it from the batch. 

After 100 requests, Flume stops accepting new data and logs:

08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction.
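The symptom fits how the linked Spark documentation describes the pull-based approach: the Spark Sink only lets a transaction succeed after Spark Streaming has received the data, so a batch that is never acknowledged keeps its events reserved in the Flume channel. The toy model below sketches that behaviour in plain Java. It is an illustration under assumptions, not Flume's implementation: `ToyTransactionalChannel`, `take`, `ack` and `nack` are hypothetical names, not Flume or Spark APIs, and the capacity of 100 is chosen only to mirror the 100 requests above, not read from any Flume configuration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: these names are hypothetical and are NOT Flume or Spark APIs.
// Events taken by the sink stay reserved until the client acks (commit)
// or nacks (roll back), and reserved events still count against capacity.
class ToyTransactionalChannel {
    private final int capacity;
    private final Deque<String> queued = new ArrayDeque<String>();
    private final Map<Long, List<String>> pending = new HashMap<Long, List<String>>();
    private long nextSeq = 0;

    ToyTransactionalChannel(int capacity) {
        this.capacity = capacity;
    }

    // Queued events plus events held by open (unacknowledged) transactions.
    private int used() {
        int n = queued.size();
        for (List<String> batch : pending.values()) {
            n += batch.size();
        }
        return n;
    }

    // Source side: refuses new events once the channel is full.
    boolean put(String event) {
        if (used() >= capacity) {
            return false;
        }
        queued.addLast(event);
        return true;
    }

    // Sink side: opens a transaction holding up to maxBatch events.
    long take(int maxBatch) {
        List<String> batch = new ArrayList<String>();
        while (batch.size() < maxBatch && !queued.isEmpty()) {
            batch.add(queued.pollFirst());
        }
        long seq = nextSeq++;
        pending.put(seq, batch);
        return seq;
    }

    List<String> batch(long seq) {
        return pending.get(seq);
    }

    // Ack: commit, so the events are gone for good and free their space.
    void ack(long seq) {
        pending.remove(seq);
    }

    // Nack: roll back, so the events return to the front in their original order.
    void nack(long seq) {
        List<String> batch = pending.remove(seq);
        if (batch == null) {
            return;
        }
        for (int i = batch.size() - 1; i >= 0; i--) {
            queued.addFirst(batch.get(i));
        }
    }

    // Puts one event per "request" and takes it in a one-event batch,
    // optionally acking it; returns how many events were accepted.
    static int acceptedBeforeFull(boolean ackEachBatch, int capacity, int attempts) {
        ToyTransactionalChannel channel = new ToyTransactionalChannel(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (!channel.put("event-" + i)) {
                break;
            }
            accepted++;
            long seq = channel.take(1);
            if (ackEachBatch) {
                channel.ack(seq);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted without ack: " + acceptedBeforeFull(false, 100, 150));
        System.out.println("accepted with ack:    " + acceptedBeforeFull(true, 100, 150));
    }
}
```

Running `main` prints 100 for the unacknowledged case and 150 when every batch is acked: without acks, each open transaction keeps holding its events, so the model fills up and refuses new data, much like the behaviour reported above.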


My code to pull the data from Flume is:

        SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
        Duration batchInterval = new Duration(10000); // 10-second batches

        final String checkpointDir = "/tmp/";

        final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
        ssc.checkpoint(checkpointDir);

        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(ssc, host, port);

        // Transform each Flume Avro event into a processable format
        JavaDStream<String> transformedEvents = flumeStream.map(
                new Function<SparkFlumeEvent, String>() {

            @Override
            public String call(SparkFlumeEvent flumeEvent) throws Exception {
                // Render the Avro event as text, then parse it back with Gson
                String flumeEventStr = flumeEvent.event().toString();

                Gson gson = new GsonBuilder().create();
                avroData avroData = gson.fromJson(flumeEventStr, avroData.class);

                HashMap<String, String> body = avroData.getBody();
                String data = body.get("bytes");

                return data;
            }
        });

        ...

        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
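Separately from the acknowledgement question, the `map` above renders each event with `toString()` and then parses that string with Gson just to reach the body. A possibly simpler route, sketched below, is to decode the body buffer directly. This assumes the body is UTF-8 text and that `flumeEvent.event()` is Flume's Avro-generated `AvroFlumeEvent`, whose `getBody()` returns a `java.nio.ByteBuffer`; `FlumeBodies` and `bodyAsUtf8` are hypothetical helper names, not part of Spark or Flume.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes an event body that is assumed to be UTF-8 text.
final class FlumeBodies {
    private FlumeBodies() {
    }

    // Copies only the bytes between position and limit, via a duplicate view,
    // so the caller's buffer position is left untouched. Unlike body.array(),
    // this also works for direct buffers and buffers that wrap part of an array.
    static String bodyAsUtf8(ByteBuffer body) {
        ByteBuffer view = body.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello flume".getBytes(StandardCharsets.UTF_8);
        ByteBuffer whole = ByteBuffer.wrap(raw);
        ByteBuffer tail = ByteBuffer.wrap(raw, 6, 5); // position 6, limit 11
        System.out.println(bodyAsUtf8(whole)); // prints "hello flume"
        System.out.println(bodyAsUtf8(tail));  // prints "flume"
        System.out.println(whole.remaining()); // prints 11: position unchanged
    }
}
```

Inside `call`, this would become something like `return FlumeBodies.bodyAsUtf8(flumeEvent.event().getBody());`, which also removes the need for the Gson round trip and the `avroData` class. Reading through `duplicate()` keeps the original buffer's position intact in case anything else reads the event afterwards.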
    

Is there something specific I should be doing to let the Flume server know the 
batch has been received and processed?


*Ian Brooks*
