Could you try the Blink planner? I believe this works with the Blink planner.

Besides, it is suggested to use String with null values instead of
Option[String].
Flink SQL/Table doesn't know Option and will treat it as a RAW/generic
type, which is rather slow.
There should be no NPE; otherwise, it might be a bug in Flink SQL.
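For example, switching to the Blink planner and using nullable String fields could look like the following (a sketch against the Flink 1.10 Scala API; it assumes flink-table-planner-blink is on your classpath, so adjust to your version):

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment

// Case classes with plain String fields; null marks a missing value,
// which Flink SQL maps to SQL NULL instead of a RAW Option type.
case class MyEvent(_id: Long = 0L, _cId: Long = 0L, _url: String = null)
case class MyCategory(_id: Long = 0L, _name: String = null)

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Select the Blink planner explicitly instead of the legacy planner.
val settings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()

val tEnv = StreamTableEnvironment.create(env, settings)
```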

Best,
Jark

On Fri, 19 Jun 2020 at 11:08, YI <uuu...@protonmail.com> wrote:

> Hi, all
>
> I am trying to join two datastreams whose element types are, respectively,
> ```
> case class MyEvent(
>   _id: Long = 0L,
>   _cId: Long = 0L,
>   _url: Option[String] = None,
> )
> ```
> and
> ```
> case class MyCategory(
>   _id: Long = 0L,
>   _name: Option[String] = None,
> )
> ```
>
> When I tried to join those two tables with
> ```
> SELECT * FROM rawCategory INNER JOIN rawEvent ON rawEvent._cId =
> rawCategory._id
> ```
>
> the following exception is thrown:
>
> ```
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type 'scala.Option' cannot be used in a join operation because it does not
> implement a proper hashCode() method.
> at
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:176)
> at
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
> at
> org.apache.flink.table.typeutils.TypeCheckUtils$.$anonfun$validateEqualsHashCode$1(TypeCheckUtils.scala:149)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
> at
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
> at
> org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:57)
> at
> org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:50)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:118)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.getJoinOperator(DataStreamJoinToCoProcessTranslator.scala:102)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:119)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251)
> at
> org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:412)
> at
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
> at
> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableLike.map(TraversableLike.scala:273)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
> at
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
> at
> org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
> at io.redacted.sub.package$.testJoin(package.scala:143)
> at io.redacted.sub.package$.process(package.scala:128)
> at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
> at io.redacted.DataAggregator.main(DataAggregator.scala)
>
> Process finished with exit code 1
> ```
>
> I tried using a vanilla String with null, but encountered several NPEs.
> My intention is to use Option to indicate that a value is missing, just like
> NULL in a database, and hopefully without NPEs.
>
> How should I define my data types? And which configurations should I take
> special care of?
>
> Bests,
> Yi
>
>
