I would like to add that if your predicate does some heavyweight
computation that you want to avoid duplicating across the filters, then
you can insert a map before the filters, where you evaluate the
predicate once and put the result into a field. The filters then only
need to check that field.
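
A minimal sketch of the map-then-filter idea, assuming a hypothetical
heavyPredicate that may throw and an Int tag field (1 = true, 0 = false,
-1 = exception). Neither the names nor the tag encoding come from your
code, so treat them as placeholders:

```scala
import org.apache.flink.api.scala._
import scala.util.{Failure, Success, Try}

object TaggedSplit {
  // Hypothetical stand-in for an expensive predicate that may throw.
  def heavyPredicate(s: String): Boolean = s.toInt % 2 == 0

  def run(): (Seq[String], Seq[String], Seq[String]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val xs: DataSet[String] = env.fromElements("1", "2", "3", "4", "oops")

    // Evaluate the predicate in the map and store the outcome in a field:
    // 1 = true, 0 = false, -1 = the predicate threw a non-fatal exception.
    val tagged: DataSet[(String, Int)] = xs.map { x =>
      Try(heavyPredicate(x)) match {
        case Success(true)  => (x, 1)
        case Success(false) => (x, 0)
        case Failure(_)     => (x, -1)
      }
    }

    // The filters only compare the tag, so they stay lightweight.
    val trueEvents  = tagged.filter(t => t._2 == 1).map(t => t._1)
    val falseEvents = tagged.filter(t => t._2 == 0).map(t => t._1)
    val errEvents   = tagged.filter(t => t._2 == -1).map(t => t._1)

    // collect() is used here only to inspect the result of the sketch.
    (trueEvents.collect(), falseEvents.collect(), errEvents.collect())
  }
}
```

The collect() calls are only there to make the sketch easy to check, and
each one runs its own job. In a real program the three DataSets would
feed sinks or further operators of a single plan, where the map should
be evaluated once per element.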

Best,
Gabor



2016-05-13 11:51 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
> Hi,
>
> it is true that Gabor's approach of using two filters has a certain
> overhead.
> However, the overhead should be reasonable. The data stays on the same node
> and the filter can be very lightweight.
>
> I agree that this is not a very nice solution.
> However, modifying the DataSet API such that an operator can have more than
> one output would be a very large change. It would require rewriting large
> portions of the optimizer and job generation. The assumption of a single
> output is made in many places which are not always easy to spot. To be
> honest, I don't think this is possible with reasonable effort. Even if it
> were possible, the change would be so large that somebody would need to
> spend a lot of time reviewing the changes.
>
> I am sorry, this limitation cannot be easily resolved.
>
> Fabian
>
>
> 2016-05-12 19:39 GMT+02:00 CPC <acha...@gmail.com>:
>
>> Hi,
>>
>> if it just requires implementing a custom operator (I mean, if it does not
>> require changes to the network stack or other engine-level changes), I can
>> try to implement it, since I have been working on the optimizer and plan
>> generation for a month. Also, we are going to implement our ETL framework on
>> Flink, and this kind of scenario is a good fit and a common requirement in
>> ETL-like flows. If you can point me to the parts of the project I should
>> look at, I can try it.
>>
>> Thanks
>> On May 12, 2016 6:54 PM, "Aljoscha Krettek" <aljos...@apache.org> wrote:
>>
>> > Hi,
>> > I agree that this would be very nice. Unfortunately, Flink only allows
>> > one output from an operation right now. Maybe we can extend this somehow
>> > in the future.
>> >
>> > Cheers,
>> > Aljoscha
>> >
>> > On Thu, 12 May 2016 at 17:27 CPC <acha...@gmail.com> wrote:
>> >
>> > > Hi Gabor,
>> > >
>> > > Yes, functionally this helps. But in this case I am processing each
>> > > element twice and sending the whole data to two different operators.
>> > > What I am trying to achieve is functionality like the DataStream split,
>> > > or a little bit more. In a filter-like scenario I want to do the
>> > > pseudo-operation below:
>> > >
>> > > def function(iter: Iterator[URLOutputData],
>> > >              trueEvents: Collector[URLOutputData],
>> > >              falseEvents: Collector[URLOutputData],
>> > >              errEvents: Collector[URLOutputData]): Unit = {
>> > >   iter.foreach { i =>
>> > >     try {
>> > >       if (predicate(i)) trueEvents.collect(i)
>> > >       else falseEvents.collect(i)
>> > >     } catch {
>> > >       // elements whose predicate throws go to the error output
>> > >       case _: Exception => errEvents.collect(i)
>> > >     }
>> > >   }
>> > > }
>> > >
>> > > Another case: suppose I have an input set of web events that come from
>> > > different web apps, and I want to split the dataset based on application
>> > > category.
>> > >
>> > > Thanks,
>> > >
>> > >
>> > > On 12 May 2016 at 17:28, Gábor Gévay <gga...@gmail.com> wrote:
>> > >
>> > > > Hello,
>> > > >
>> > > > You can split a DataSet into two DataSets with two filters:
>> > > >
>> > > > val xs: DataSet[A] = ...
>> > > > val split1: DataSet[A] = xs.filter(f1)
>> > > > val split2: DataSet[A] = xs.filter(f2)
>> > > >
>> > > > where f1 and f2 are true for those elements that should go into the
>> > > > first and second DataSets respectively. So far, the splits will just
>> > > > contain elements from the input DataSet, but you can of course apply
>> > > > some map after one of the filters.
>> > > >
>> > > > Does this help?
>> > > >
>> > > > Best,
>> > > > Gábor
>> > > >
>> > > >
>> > > >
>> > > > 2016-05-12 16:03 GMT+02:00 CPC <acha...@gmail.com>:
>> > > > > Hi folks,
>> > > > >
>> > > > > Is there any way in the DataSet API to split a DataSet[A] into a
>> > > > > DataSet[A] and a DataSet[B]? The use case is a custom filter
>> > > > > component that we want to implement. We want to direct input
>> > > > > elements for which the predicate returns false to a separate
>> > > > > output. Actually, we also want to direct input elements that throw
>> > > > > an exception to another output (a demultiplexer-like component).
>> > > > >
>> > > > > Thank you in advance...
>> > > >
>> > >
>> >
>>
