Thanks, but how would I do this in the Table API? With map? I tried a map function, and it still couldn't infer the primary key unless I artificially ran an aggregate function over the pk instead.
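
For reference, here is a rough sketch of how the four-scalar-function suggestion quoted below might look in the Scala Table API. `HasFruit` and the `SplatFruits` wrapper object are hypothetical names, not from the thread, and the sketch assumes the Flink 1.11 Scala expression DSL. I have not verified that the planner keeps the unique key on `id` with this shape; it only relies on `addColumns` appending to the existing columns instead of replacing them the way `map` does.

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical helper (not from the thread): one scalar function
// instance per fruit name, returning a single boolean.
class HasFruit(name: String) extends ScalarFunction {
  def eval(fruits: Array[String]): Boolean =
    fruits != null && fruits.contains(name)
}

object SplatFruits {
  // Appends one boolean column per fruit. Existing columns (including id)
  // are passed through, so no join back to the original table is needed.
  def splatFruits(table: Table, columnPrefix: String): Table =
    table
      .addColumns(
        new HasFruit("orange")($"fruits").as(s"${columnPrefix}_has_orange"),
        new HasFruit("banana")($"fruits").as(s"${columnPrefix}_has_banana"),
        new HasFruit("apple")($"fruits").as(s"${columnPrefix}_has_apple"),
        new HasFruit("watermelon")($"fruits").as(s"${columnPrefix}_has_watermelon")
      )
      .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
}
```

Each call produces a single scalar column, so nothing has to be flattened or aliased after the fact, which sidesteps the "Aliasing more fields than we actually have" and "Cannot resolve field [fruits]" errors described further down the thread.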

On Wed, Dec 9, 2020 at 7:55 PM Jark Wu <[email protected]> wrote:

> Could you use 4 scalar functions instead of UDTF and map function? For
> example;
>
> select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits),
> hasWatermelon(fruits)
> from T;
>
> I think this can preserve the primary key.
>
> Best,
> Jark
>
> On Thu, 3 Dec 2020 at 15:28, Rex Fenley <[email protected]> wrote:
>
>> It appears that even when I pass id through the map function and join
>> back with the original table, it does not seem to think that the id passed
>> through map is a unique key. Is there any way to solve this while still
>> preserving the primary key?
>>
>> On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <[email protected]> wrote:
>>
>>> Even odder, if I pull the constructor of the function into its own
>>> variable it "works" (though it appears that map only passes through the
>>> fields mapped over which means I'll need an additional join, though now I
>>> think I'm on the right path).
>>>
>>> I.e.
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>   val func = new SplatFruitsFunc()
>>>   return table
>>>     .map(func($"fruits"))
>>>     .as(
>>>       s"${columnPrefix}_has_orange",
>>>       s"${columnPrefix}_has_banana",
>>>       s"${columnPrefix}_has_apple",
>>>       s"${columnPrefix}_has_watermelon"
>>>     )
>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> ends up giving me the following error instead
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Cannot resolve field [fruits], input field
>>> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
>>> prefix_has_watermelon].
>>>
>>> which implies I'll need to join back to the original table like I was
>>> doing with the leftOuterJoinLateral originally I suppose.
>>>
>>>
>>> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <[email protected]> wrote:
>>>
>>>> Looks like `as` needed to move outside of where it was before to fix
>>>> that error.
>>>> Though now I'm receiving
>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error: Aliasing more fields than we actually have.
>>>>
>>>> Example code now:
>>>>
>>>> // table will always have pk id
>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>>   return table
>>>>     .map(
>>>>       new SplatFruitsFunc()(
>>>>         $"fruits"
>>>>       )
>>>>     )
>>>>     .as(
>>>>       s"${columnPrefix}_has_orange",
>>>>       s"${columnPrefix}_has_banana",
>>>>       s"${columnPrefix}_has_apple",
>>>>       s"${columnPrefix}_has_watermelon"
>>>>     )
>>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>> }
>>>>
>>>> class SplatFruitsFunc extends ScalarFunction {
>>>>   def eval(fruits: Array[String]): Row = {
>>>>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>>>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>>>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>>>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>   }
>>>>
>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>>>> }
>>>>
>>>> which afaict correctly follows the documentation.
>>>>
>>>> Anything here stand out?
>>>>
>>>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <[email protected]> wrote:
>>>>
>>>>> So I just instead tried changing SplatFruitsFunc to a ScalarFunction
>>>>> and leftOuterJoinLateral to a map and I'm receiving:
>>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error: Only a scalar function can be used in the map
>>>>> operator.
>>>>> which seems odd because documentation says
>>>>>
>>>>> > Performs a map operation with a user-defined scalar function or
>>>>> built-in scalar function. The output will be flattened if the output type
>>>>> is a composite type.
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>>>
>>>>> Shouldn't this work as an alternative?
>>>>>
>>>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a TableFunction and wherever it is applied with a
>>>>>> leftOuterJoinLateral, my table loses any inference of there being a
>>>>>> primary key. I see this because all subsequent joins end up with
>>>>>> "NoUniqueKey" when I know a primary key of id should exist.
>>>>>>
>>>>>> I'm wondering if this is expected behavior and if it's possible to
>>>>>> tell a table directly what the primary key should be?
>>>>>>
>>>>>>
>>>>>> To demonstrate my example:
>>>>>> My table function checks if an element of a certain type is in a
>>>>>> string array, and depending on whether or not it is there, it appends a
>>>>>> column with value true or false. For example, if array "fruits" which
>>>>>> could possibly contain orange, banana, apple, and watermelon on a row
>>>>>> contains only `["orange", "apple"]` then it will append `has_orange: true,
>>>>>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>>>>>> the row. This example is essentially the same as my code, outside of
>>>>>> having a much larger set of keys and not dealing with fruits.
>>>>>>
>>>>>> Example code:
>>>>>>
>>>>>> // table will always have pk id
>>>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>>>>   return table
>>>>>>     .leftOuterJoinLateral(
>>>>>>       new SplatFruitsFunc()(
>>>>>>         $"fruits"
>>>>>>       ) as (s"${columnPrefix}_has_orange",
>>>>>>         s"${columnPrefix}_has_banana", s"${columnPrefix}_has_apple",
>>>>>>         s"${columnPrefix}_has_watermelon")
>>>>>>     )
>>>>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>>>> }
>>>>>>
>>>>>> @FunctionHint(
>>>>>>   output = new DataTypeHint(
>>>>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>>>>>>   )
>>>>>> )
>>>>>> class SplatFruitsFunc
>>>>>>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>>>>
>>>>>>   def eval(fruits: Array[String]): Unit = {
>>>>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
