This looks like the right design to me.
On Fri, Apr 17, 2015 at 12:22 PM, Tao Li <[email protected]> wrote:
> The reason I designed it this way is based on the following thoughts:
> I want to treat "KafkaChannel1 => Kafka Cluster => KafkaChannel2" as one
> channel, so ScribeSource simply puts events into it and HDFSEventSink takes
> events from it. The Kafka cluster provides stable storage and is transparent
> to event delivery between source and sink. (If I instead used
> "KafkaSource => MemoryChannel => HDFSEventSink" to export data from Kafka
> to HDFS, the memory channel would be unstable and not transparent.)
> So the workflow is simply:
> ScribeClient => ScribeSource => KafkaChannel (distributed) => HDFSEventSink => HDFS
>
> Since an Interceptor follows the source, maybe I should add the filter
> interceptor after ScribeSource, like this:
> ScribeClient => ScribeSource => FilterInterceptor => KafkaChannel (distributed) => HDFSEventSink => HDFS
>
> 2015-04-18 2:51 GMT+08:00 Tao Li <[email protected]>:
>>
>> @Gwen @Hari
>>
>> My use case is as follows:
>> ScribeClient => [Agent1: ScribeSource => KafkaChannel1] => Kafka Cluster
>> => [Agent2: KafkaChannel2 => HDFSEventSink] => HDFS
>>
>> The bad case is as follows:
>> My HDFSEventSink needs a "timestamp" header, but some dirty data (written
>> by mistake) in Kafka doesn't have the "timestamp" header, which causes the
>> following BucketPath.escapeString call to throw a NullPointerException:
>> String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
>>     timeZone, needRounding, roundUnit, roundValue, useLocalTime);
>>
>> I think Gwen's second point is OK; we can add an interceptor to do the
>> filtering job.
>>
>> But my Flume agents are somewhat special:
>> Agent1 has no sink; it sends messages directly to the Kafka cluster via
>> KafkaChannel1.
>> Agent2 has no source; it polls events directly from the Kafka cluster via
>> KafkaChannel2.
>> Agent1 and Agent2 are separate JVMs deployed on different nodes.
>>
>> I don't know if it's reasonable for an agent to have no sink or no source?
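The FilterInterceptor idea above boils down to: drop any event whose headers lack a "timestamp" key, so the sink's bucket-path escaping never sees a null. A real implementation would implement `org.apache.flume.interceptor.Interceptor` from flume-ng-core; the standalone sketch below only models the filtering step, representing each event by its header map, and the class name `TimestampFilter` is hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the proposed FilterInterceptor: keep only events
// that carry a non-null "timestamp" header. In real Flume this logic would
// live in Interceptor.intercept(List<Event>); here an event is modeled as
// just its header map so the sketch runs without flume-ng-core.
public class TimestampFilter {

    // Returns only the events whose headers contain a non-null "timestamp".
    public static List<Map<String, String>> filter(List<Map<String, String>> events) {
        List<Map<String, String>> kept = new ArrayList<>();
        for (Map<String, String> headers : events) {
            if (headers.get("timestamp") != null) {
                kept.add(headers);
            }
        }
        return kept;
    }
}
```

Dropping the event is what prevents the NullPointerException downstream; the dirty record never reaches BucketPath.escapeString.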
But
>> I have already built the whole workflow, and it works well for me in
>> regular cases.
>>
>> For Agent2, because it has no source, I can't use Gwen's interceptor
>> suggestion.
>>
>> 2015-04-18 2:30 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>
>>> What I think he means is that a message in the channel that cannot be
>>> serialized by the serializer because it is malformed causes the
>>> serializer to fail and perhaps throw (think malformed Avro). Such a
>>> message would basically be stuck in an infinite loop. So the workaround
>>> in (2) would work if using a Kafka Source.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Fri, Apr 17, 2015 at 10:08 AM, Tao Li <[email protected]> wrote:
>>>>
>>>> OK, I got it. Thanks.
>>>>
>>>> 2015-04-18 0:59 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>>>
>>>>> Are you using the Kafka channel? The fix I mentioned was for the file
>>>>> channel. Unfortunately, we don't plan to introduce something that drops
>>>>> data in real time. That makes it too easy for a misconfiguration to
>>>>> cause data loss. You'd have to ensure the data in the Kafka channel is
>>>>> valid.
>>>>>
>>>>> Thanks,
>>>>> Hari
>>>>>
>>>>>
>>>>> On Fri, Apr 17, 2015 at 9:41 AM, Tao Li <[email protected]> wrote:
>>>>>>
>>>>>> @Hari, you mean I need to ensure the data in Kafka is OK by myself,
>>>>>> right?
>>>>>>
>>>>>> How about we add a config to let the user decide how to handle BACKOFF?
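For reference, the sink-less Agent1 and source-less Agent2 that Tao describes can be written directly in Flume configuration, since KafkaChannel talks to the Kafka cluster itself. A rough sketch against the Flume 1.6-era KafkaChannel properties; all host names, ports, paths, and the topic name are hypothetical:

```properties
# Agent1: ScribeSource writes straight into a Kafka-backed channel; no sink.
a1.sources = r1
a1.channels = c1

a1.sources.r1.type = org.apache.flume.source.scribe.ScribeSource
a1.sources.r1.port = 1463
a1.sources.r1.channels = c1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.brokerList = kafka1:9092,kafka2:9092
a1.channels.c1.topic = scribe-events
a1.channels.c1.zookeeperConnect = zk1:2181

# Agent2: HDFSEventSink drains the same Kafka-backed channel; no source.
a2.channels = c2
a2.sinks = k1

a2.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a2.channels.c2.brokerList = kafka1:9092,kafka2:9092
a2.channels.c2.topic = scribe-events
a2.channels.c2.zookeeperConnect = zk1:2181

a2.sinks.k1.type = hdfs
a2.sinks.k1.channel = c2
# The %Y%m%d escapes below are what make the "timestamp" header mandatory.
a2.sinks.k1.hdfs.path = hdfs://namenode/flume/events/%Y%m%d
```

Because both channels point at the same topic, the Kafka cluster is the durable hand-off between the two agents, exactly as in the diagram above.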
>>>>>> For example, we could configure a max retry count for process(), and
>>>>>> also configure whether or not to commit when the max retry count is
>>>>>> exceeded. (In my Kafka case, when dirty data is hit, committing the
>>>>>> consumer offset would be much nicer for me than an endless loop.)
>>>>>>
>>>>>> 2015-04-18 0:23 GMT+08:00 Hari Shreedharan <[email protected]>:
>>>>>>>
>>>>>>> We recently added functionality to the file channel integrity tool
>>>>>>> that can be used to remove bad events from the channel - though you
>>>>>>> would need to write some code to validate events. It will be in the
>>>>>>> soon-to-be-released 1.6.0.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Hari
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Apr 17, 2015 at 9:05 AM, Tao Li <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi all:
>>>>>>>>
>>>>>>>> My use case is KafkaChannel + HDFSEventSink.
>>>>>>>>
>>>>>>>> I found that SinkRunner.PollingRunner calls HDFSEventSink.process()
>>>>>>>> in a while loop. For example, a message in Kafka contains dirty
>>>>>>>> data, so HDFSEventSink.process() consumes the message from Kafka and
>>>>>>>> throws an exception because of the dirty data, and the Kafka offset
>>>>>>>> is not committed. The outer loop then calls HDFSEventSink.process()
>>>>>>>> again. Because the Kafka offset hasn't changed, HDFSEventSink
>>>>>>>> consumes the dirty data again. The bad loop never stops.
>>>>>>>>
>>>>>>>> I want to know whether we have a mechanism to cover this case? For
>>>>>>>> example, a max retry count for a given HDFSEventSink.process() call,
>>>>>>>> giving up when the limit is exceeded.
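Flume itself offers no such retry cap, but the loop Tao describes and his proposed escape hatch are easy to model. The standalone sketch below is not Flume API: `Sink`, `DirtyDataException`, `drainWithRetryLimit`, and `maxRetries` are all hypothetical names standing in for SinkRunner's polling loop, a sink hitting a malformed record, and the proposed "commit and skip after N failures" behavior:

```java
// Standalone model of the retry loop: SinkRunner keeps calling process()
// on the same uncommitted event; the proposed config caps the attempts
// instead of looping forever. Everything here is a hypothetical sketch,
// not actual Flume classes.
public class BoundedRetryLoop {

    static class DirtyDataException extends RuntimeException {}

    // Stand-in for HDFSEventSink: process() either succeeds or throws on
    // the current (dirty) event.
    interface Sink {
        void process() throws DirtyDataException;
    }

    // Returns the number of process() attempts made before moving on.
    public static int drainWithRetryLimit(Sink sink, int maxRetries) {
        int attempts = 0;
        while (true) {
            try {
                attempts++;
                sink.process();
                return attempts;   // success: the offset would be committed
            } catch (DirtyDataException e) {
                if (attempts >= maxRetries) {
                    // Proposed behavior: give up on (commit past) the bad
                    // event rather than retrying it forever.
                    return attempts;
                }
                // Otherwise retry - exactly the endless loop Flume has today.
            }
        }
    }
}
```

With `maxRetries` bounded, a permanently failing event costs a fixed number of attempts instead of wedging the sink runner indefinitely.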
