Could you use four scalar functions instead of the UDTF and map function? For
example:

select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits),
hasWatermelon(fruits)
from T;

I think this can preserve the primary key.
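
To make the one-row-in, one-row-out shape of this suggestion concrete, here is
a minimal sketch in plain Scala with no Flink dependency. The names FruitRow,
Splatted, and FruitPredicates are hypothetical and only for illustration. In a
real job each predicate would presumably be the eval method of its own
ScalarFunction. The sketch does not verify what the planner infers about
unique keys; it only shows that id is carried through untouched.

```scala
// Sketch only: plain Scala, no Flink dependency.
// FruitRow, Splatted and FruitPredicates are hypothetical names.
final case class FruitRow(id: Long, fruits: Array[String])

final case class Splatted(
    id: Long,
    hasOrange: Boolean,
    hasBanana: Boolean,
    hasApple: Boolean,
    hasWatermelon: Boolean)

object FruitPredicates {
  // Shared null-safe membership check used by the four predicates below.
  private def has(fruits: Array[String], name: String): Boolean =
    fruits != null && fruits.contains(name)

  // Each of these would be the eval body of one scalar function.
  def hasOrange(fruits: Array[String]): Boolean = has(fruits, "orange")
  def hasBanana(fruits: Array[String]): Boolean = has(fruits, "banana")
  def hasApple(fruits: Array[String]): Boolean = has(fruits, "apple")
  def hasWatermelon(fruits: Array[String]): Boolean = has(fruits, "watermelon")

  // Mirrors `select *, hasOrange(fruits), ... from T`: exactly one output
  // row per input row, with id passed through unchanged.
  def splat(rows: Seq[FruitRow]): Seq[Splatted] =
    rows.map { r =>
      Splatted(
        r.id,
        hasOrange(r.fruits),
        hasBanana(r.fruits),
        hasApple(r.fruits),
        hasWatermelon(r.fruits))
    }
}
```

Because every predicate is a pure function of a single row, the projection
never adds or drops rows, which is the property a lateral join against a
table function cannot guarantee in general.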

Best,
Jark

On Thu, 3 Dec 2020 at 15:28, Rex Fenley <r...@remind101.com> wrote:

> It appears that even when I pass id through the map function and join back
> with the original table, the planner does not seem to treat the id passed
> through map as a unique key. Is there any way to solve this while still
> preserving the primary key?
>
> On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Even odder, if I assign the function instance to its own variable, it
>> "works". However, map appears to output only the fields returned by the
>> function, so I'll need an additional join to get the other columns back.
>> Still, I think I'm on the right path now.
>>
>> I.e.
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>   val func = new SplatFruitsFunc()
>>   return table
>>     .map(func($"fruits"))
>>     .as(
>>       s"${columnPrefix}_has_orange",
>>       s"${columnPrefix}_has_banana",
>>       s"${columnPrefix}_has_apple",
>>       s"${columnPrefix}_has_watermelon"
>>     )
>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> ends up giving me the following error instead
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Cannot resolve field [fruits], input field
>> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
>> prefix_has_watermelon].
>>
>> which implies I'll need to join back to the original table like I was
>> doing with the leftOuterJoinLateral originally I suppose.
>>
>>
>> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> Looks like `as` needed to move outside of where it was before to fix
>>> that error. Though now I'm receiving
>>> >org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Aliasing more fields than we actually have.
>>>
>>> Example code now:
>>>
>>> // table will always have pk id
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>   return table
>>>     .map(
>>>       new SplatFruitsFunc()(
>>>         $"fruits"
>>>       )
>>>     )
>>>     .as(
>>>       s"${columnPrefix}_has_orange",
>>>       s"${columnPrefix}_has_banana",
>>>       s"${columnPrefix}_has_apple",
>>>       s"${columnPrefix}_has_watermelon"
>>>     )
>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> class SplatFruitsFunc extends ScalarFunction {
>>>   def eval(fruits: Array[String]): Row = {
>>>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>   }
>>>
>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>>> }
>>>
>>> which afaict correctly follows the documentation.
>>>
>>> Anything here stand out?
>>>
>>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> So I instead tried changing SplatFruitsFunc to a ScalarFunction and
>>>> leftOuterJoinLateral to a map, and I'm receiving:
>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error: Only a scalar function can be used in the map
>>>> operator.
>>>> which seems odd because the documentation says
>>>>
>>>> > Performs a map operation with a user-defined scalar function or
>>>> built-in scalar function. The output will be flattened if the output type
>>>> is a composite type.
>>>>
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>>
>>>> Shouldn't this work as an alternative?
>>>>
>>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <r...@remind101.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a TableFunction and wherever it is applied with a
>>>>> leftOuterJoinLateral, my table loses any inference of there being a
>>>>> primary key. I see this because all subsequent joins end up with
>>>>> "NoUniqueKey" when I know a primary key of id should exist.
>>>>>
>>>>> I'm wondering if this is expected behavior and if it's possible to
>>>>> tell a table directly what the primary key should be?
>>>>>
>>>>>
>>>>> To demonstrate my example:
>>>>> My table function checks whether an element of a certain type is in a
>>>>> string array and, depending on whether or not it is there, appends a
>>>>> column with value true or false. For example, if a row's "fruits" array,
>>>>> which could contain orange, banana, apple, and watermelon, contains
>>>>> only `["orange", "apple"]`, then it will append `has_orange: true,
>>>>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>>>>> the row. This example is essentially the same as my code, apart from
>>>>> having a much larger set of keys and not dealing with fruits.
>>>>>
>>>>> Example code:
>>>>>
>>>>> // table will always have pk id
>>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>>>   return table
>>>>>     .leftOuterJoinLateral(
>>>>>       new SplatFruitsFunc()(
>>>>>         $"fruits"
>>>>>       ) as (
>>>>>         s"${columnPrefix}_has_orange",
>>>>>         s"${columnPrefix}_has_banana",
>>>>>         s"${columnPrefix}_has_apple",
>>>>>         s"${columnPrefix}_has_watermelon"
>>>>>       )
>>>>>     )
>>>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>>> }
>>>>>
>>>>> @FunctionHint(
>>>>>   output = new DataTypeHint(
>>>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>>>>>   )
>>>>> )
>>>>> class SplatFruitsFunc
>>>>>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>>>
>>>>>   def eval(fruits: Array[String]): Unit = {
>>>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>>   }
>>>>> }
>>>>>
>>>>> Thanks!
>>>>>
>>>>> --
>>>>>
>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>
>>>>>
>>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>>> <https://www.facebook.com/remindhq>
>>>>>
