Could you use 4 scalar functions instead of UDTF and map function? For example:

select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits), hasWatermelon(fruits)
from T;

I think this can preserve the primary key.

Best,
Jark

On Thu, 3 Dec 2020 at 15:28, Rex Fenley <r...@remind101.com> wrote:

> It appears that even when I pass id through the map function and join back
> with the original table, it does not seem to think that the id passed
> through map is a unique key. Is there any way to solve this while still
> preserving the primary key?
>
> On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Even odder, if I pull the constructor of the function into its own
>> variable it "works" (though it appears that map only passes through the
>> fields mapped over which means I'll need an additional join, though now I
>> think I'm on the right path).
>>
>> I.e.
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>   val func = new SplatFruitsFunc()
>>   return table
>>     .map(func($"fruits"))
>>     .as(
>>       s"${columnPrefix}_has_orange",
>>       s"${columnPrefix}_has_banana",
>>       s"${columnPrefix}_has_apple",
>>       s"${columnPrefix}_has_watermelon"
>>     )
>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> ends up giving me the following error instead
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: Cannot resolve field [fruits], input field
>> > list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
>> > prefix_has_watermelon].
>>
>> which implies I'll need to join back to the original table like I was
>> doing with the leftOuterJoinLateral originally I suppose.
>>
>>
>> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Looks like `as` needed to move outside of where it was before to fix
>>> that error. Though now I'm receiving
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> > method caused an error: Aliasing more fields than we actually have.
>>>
>>> Example code now:
>>>
>>> // table will always have pk id
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>   return table
>>>     .map(
>>>       new SplatFruitsFunc()(
>>>         $"fruits"
>>>       )
>>>     )
>>>     .as(
>>>       s"${columnPrefix}_has_orange",
>>>       s"${columnPrefix}_has_banana",
>>>       s"${columnPrefix}_has_apple",
>>>       s"${columnPrefix}_has_watermelon"
>>>     )
>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> class SplatFruitsFunc extends ScalarFunction {
>>>   def eval(fruits: Array[String]): Row = {
>>>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>   }
>>>
>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>>> }
>>>
>>> which afaict correctly follows the documentation.
>>>
>>> Anything here stand out?
>>>
>>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> So I just instead tried changing SplatFruitsFunc to a ScalarFunction and
>>>> leftOuterJoinLateral to a map and I'm receiving:
>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>> > method caused an error: Only a scalar function can be used in the map
>>>> > operator.
>>>> which seems odd because documentation says
>>>>
>>>> > Performs a map operation with a user-defined scalar function or
>>>> > built-in scalar function. The output will be flattened if the output type
>>>> > is a composite type.
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>>
>>>> Shouldn't this work as an alternative?
>>>>
>>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a TableFunction and wherever it is applied with a
>>>>> leftOuterJoinLateral, my table loses any inference of there being a
>>>>> primary key. I see this because all subsequent joins end up with
>>>>> "NoUniqueKey" when I know a primary key of id should exist.
>>>>>
>>>>> I'm wondering if this is expected behavior and if it's possible to
>>>>> tell a table directly what the primary key should be?
>>>>>
>>>>>
>>>>> To demonstrate my example:
>>>>> My table function checks if an element of a certain type is in a
>>>>> string array, and depending on whether or not it is there, it appends a
>>>>> column with value true or false. For example, if the array "fruits",
>>>>> which could possibly contain orange, banana, apple, and watermelon,
>>>>> contains only `["orange", "apple"]` on a row, then it will append
>>>>> `has_orange: true, has_banana: false, has_apple: true,
>>>>> has_watermelon: false` as columns to the row. This example is
>>>>> essentially the same as my code, outside of having a much larger set
>>>>> of keys and not dealing with fruits.
>>>>>
>>>>> Example code:
>>>>>
>>>>> // table will always have pk id
>>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>>>   return table
>>>>>     .leftOuterJoinLateral(
>>>>>       new SplatFruitsFunc()(
>>>>>         $"fruits"
>>>>>       ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>>>>>         s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>>>>>     )
>>>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>>> }
>>>>>
>>>>> @FunctionHint(
>>>>>   output = new DataTypeHint(
>>>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>>>>>   )
>>>>> )
>>>>> class SplatFruitsFunc
>>>>>   extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>>>
>>>>>   def eval(fruits: Array[String]): Unit = {
>>>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>>   }
>>>>> }
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley | Software Engineer - Mobile and Backend
>>>>>
>>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/>
>>>>> | FOLLOW US <https://twitter.com/remindhq> | LIKE US
>>>>> <https://www.facebook.com/remindhq>
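
For reference, Jark's suggestion at the top of this thread (four scalar functions instead of a UDTF or map) could look roughly like the sketch below in the Scala Table API. This is an illustration under assumptions, not code from the thread: `HasFruit` and `SplatFruits` are hypothetical names, `addColumns` stands in for the `select *, ...` in the SQL version, the fruit names are lowercase as in the original example, and it has not been checked here that the planner actually keeps `id` as a unique key after this change.

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical helper: one scalar function instance per fruit to look for.
class HasFruit(fruit: String) extends ScalarFunction {
  // True when the array contains the fruit; a null array counts as "not present".
  def eval(fruits: Array[String]): Boolean =
    fruits != null && fruits.contains(fruit)
}

object SplatFruits {
  // Assumes `table` has the pk column `id` and an ARRAY<STRING> column `fruits`.
  def splatFruits(table: Table, columnPrefix: String): Table = {
    val hasOrange = new HasFruit("orange")
    val hasBanana = new HasFruit("banana")
    val hasApple = new HasFruit("apple")
    val hasWatermelon = new HasFruit("watermelon")

    table
      // addColumns keeps every existing column (including id) and appends one
      // boolean column per scalar call, so no join back to the input is needed.
      .addColumns(
        hasOrange($"fruits").as(s"${columnPrefix}_has_orange"),
        hasBanana($"fruits").as(s"${columnPrefix}_has_banana"),
        hasApple($"fruits").as(s"${columnPrefix}_has_apple"),
        hasWatermelon($"fruits").as(s"${columnPrefix}_has_watermelon")
      )
      .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
  }
}
```

Because each function only adds a computed column to the existing row, the result is a plain projection of the input rather than a lateral join, which is presumably why the key information has a chance of surviving.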