Hi, all

I am trying to join two DataStreams whose element types are, respectively,
```
case class MyEvent(
_id: Long = 0L,
_cId: Long = 0L,
_url: Option[String] = None,
)
```
and
```
case class MyCategory(
_id: Long = 0L,
_name: Option[String] = None,
)
```

When I tried to join the two tables (registered as rawEvent and rawCategory) with
```
SELECT * FROM rawCategory INNER JOIN rawEvent ON rawEvent._cId = rawCategory._id
```

the following exception is thrown:

```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method.
at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:176)
at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
at org.apache.flink.table.typeutils.TypeCheckUtils$.$anonfun$validateEqualsHashCode$1(TypeCheckUtils.scala:149)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:57)
at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:50)
at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:118)
at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.getJoinOperator(DataStreamJoinToCoProcessTranslator.scala:102)
at org.apache.flink.table.plan.nodes.datastream.DataStreamJoin.translateToPlan(DataStreamJoin.scala:119)
at org.apache.flink.table.planner.StreamPlanner.translateToCRow(StreamPlanner.scala:251)
at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:412)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
at io.redacted.sub.package$.testJoin(package.scala:143)
at io.redacted.sub.package$.process(package.scala:128)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

Process finished with exit code 1
```

I also tried using plain String fields with null instead of Option, but then I
ran into several NullPointerExceptions.
My intention is to use Option to indicate that a value is missing, just like
NULL in a database, and hopefully without any NPEs.
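
To make the intent concrete, here is a minimal plain-Scala sketch of what I mean by "missing without NPE". It does not involve Flink at all; the object `OptionSketch` and the helpers `urlLength` and `urlLengthNullable` are made up for illustration and are not part of my actual job.

```scala
object OptionSketch {
  // Same shape as the event type above (illustration only).
  case class MyEvent(_id: Long = 0L, _cId: Long = 0L, _url: Option[String] = None)

  // With Option, a missing URL is handled explicitly and cannot throw an NPE.
  def urlLength(e: MyEvent): Int = e._url.map(_.length).getOrElse(0)

  // With a nullable String, every call site needs its own null check.
  def urlLengthNullable(url: String): Int = if (url == null) 0 else url.length

  def main(args: Array[String]): Unit = {
    val missing = MyEvent(_id = 1L, _cId = 10L)
    val present = MyEvent(_id = 2L, _cId = 10L, _url = Some("http://example.com"))

    println(urlLength(missing)) // safe even though _url is None
    println(urlLength(present))

    // Converting between the two representations at a boundary:
    val asNullable: String = missing._url.orNull     // None becomes null
    val asOption: Option[String] = Option(asNullable) // null becomes None
    println(asOption.isEmpty)
  }
}
```

This is the behaviour I would like to keep for the fields of the joined tables, if that is possible.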

How should I define my data types? And is there any configuration I should
take special care with?

Best,
Yi
