There is nothing special about the UDF, whether it performs a join or some other type of query, since the UDF is evaluated on a per-record basis.
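For concreteness, a per-record UDF is just an ordinary AQL function over a single record; for example (a hypothetical sketch, with illustrative field names not taken from this thread):

create function add_text_length($t) {
  {
    "id": $t.id,
    "text": $t.text,
    "text-length": string-length($t.text)
  }
};

Whether $t is bound to a record coming from a dataset scan or from a feed adaptor makes no difference to the function itself.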
A closer look at the feed connect statement (with a UDF) is helpful in understanding the mechanism. Consider the following insert pipeline:

*FLWOR I*

insert into dataset targetDataset (
  for $a in dataset sourceDataset
  let $b := f($a)
  return $b
)

In the above FLWOR, you can do anything in f() as long as the following holds: (i) the data type associated with $b is compliant with the data type associated with targetDataset.

The above FLWOR is identical to a feed ingestion pipeline (written below for reference; note that feed_ingest replaces the dataset function and f() is the UDF associated with the feed).

*FLWOR II*

insert into dataset targetDataset (
  for $a in feed_ingest(args)
  let $b := f($a)
  return $b
)

The connect feed statement written by Ildar is translated to the above template prior to compilation. In the two FLWORs, the use of the 'dataset' and 'feed_ingest' functions is identical: each returns a collection of records. Whether the collection is derived from another dataset or from a feed adaptor is immaterial.

If, as Ildar reports, the connect feed statement is failing, it is a bug. If, for a given AQL function, FLWOR I does not work at runtime, then FLWOR II is not expected to work either (indicative of a bug elsewhere). However, if FLWOR II is what is failing but FLWOR I works, then it is an issue with the feed ingestion pipeline.

Regards,
Raman

On Tue, Dec 8, 2015 at 11:50 PM, Mike Carey <[email protected]> wrote:
> +1
>
> Again: The semantics are supposed to be ONE RECORD AT A TIME. It's not a
> join between the incoming stream and the dataset(s) in the query; it's just
> a function being applied to one single individual (insert more "one"
> emphatic words here) record. I.e., it is NOT a continuous join. It's just
> a function call whose semantics should be identical to calling the function
> on a constant constructed record formed from the next record in the input
> stream.
>
> Cheers,
> Mike
>
> On 12/8/15 11:08 PM, Jianfeng Jia wrote:
>
>> I would think this kind of stream-record join with “static” data use case
>> is very common. Take geo-tagging as an example: for each incoming tweet
>> I’d like to know the textual place name for its geo coordinates. In theory
>> I’d expect the following left-outer-join function to work, but currently
>> it is not working:
>>
>> create function addAddress($t) {
>>   {
>>     "tid": $t.tweetid,
>>     "user": $t.user,
>>     "sender-location": $t.sender-location,
>>     "send-time": $t.send-time,
>>     "referred-topics": $t.referred-topics,
>>     "message-text": $t.message-text,
>>     "place": for $x in dataset USCounty
>>              where spatial-intersect($t.sender-location, $x.geometry)
>>              return $x.State-County
>>   }
>> }
>>
>> Basically, what the user expects is a lookup. We don’t have to stop the
>> feed and run the join; the result will be the same as a static join if we
>> do the lookup per record.
>> For performance, however, it has to be done in a micro-batch way. Maybe we
>> can pass some hint to “apply” the function per batch of 50 records:
>>
>> create feed TweetFeed
>> apply function addAddress per batch(50);
>>
>> Or we can allocate a buffer for the feed and apply the function whenever
>> the buffer is full.
>>
>> The streaming world even handles joining two streams under window
>> boundaries. I think we should not restrict applying functions to our feeds.
>>
>> One thought is that we should refine the “create function” API to a finer
>> granularity.
>> Like:
>>
>> ——— currently supported ———
>> create filter function A(record)
>> create transform function B(record)
>> create project function C(record)
>> ——— TODO ———
>> create lookup function D(record, dataset)
>> create reduce function E([records])
>> ...
>>
>> On Dec 8, 2015, at 9:52 PM, abdullah alamoudi <[email protected]> wrote:
>>>
>>> I think that we probably should restrict feed-applied functions somehow
>>> (this needs further thought and discussion), and I know for sure that we
>>> currently don't.
>>> As for the case you present, I would imagine that it could be allowed
>>> theoretically, but I think everyone sees why it should be disallowed.
>>>
>>> One thing to keep in mind is that we introduce a materialize if the
>>> dataset is part of the insert pipeline. Now think about how that would
>>> work with a continuous feed. One choice would be for the feed to
>>> materialize all records to be inserted and, once the feed stops, start
>>> inserting them, but I still think we should not allow it.
>>>
>>> My 2c,
>>> Any opposing argument?
>>>
>>> Amoudi, Abdullah.
>>>
>>> On Tue, Dec 8, 2015 at 6:28 PM, Ildar Absalyamov <[email protected]> wrote:
>>>>
>>>> Hi All,
>>>>
>>>> As part of feed ingestion we do allow preprocessing incoming data with
>>>> AQL UDFs.
>>>> I was wondering whether we somehow restrict the kinds of UDFs that can
>>>> be used. Do we allow joins in these UDFs? Especially joins with the same
>>>> dataset that is used for intake. Ex:
>>>>
>>>> create type TweetType as open {
>>>>   id: string,
>>>>   username: string,
>>>>   location: string,
>>>>   text: string,
>>>>   timestamp: string
>>>> }
>>>>
>>>> create dataset Tweets(TweetType)
>>>> primary key id;
>>>>
>>>> create function feed_processor($x) {
>>>>   for $y in dataset Tweets
>>>>   // self-join with the Tweets dataset on some predicate($x, $y)
>>>>   return $y
>>>> }
>>>>
>>>> create feed TweetFeed
>>>> apply function feed_processor;
>>>>
>>>> The query above fails at runtime, but I was wondering whether it could
>>>> theoretically work at all.
>>>>
>>>> Best regards,
>>>> Ildar
>>>>
>>
>> Best,
>>
>> Jianfeng Jia
>> PhD Candidate of Computer Science
>> University of California, Irvine
>>
>
--
Raman
