Roman Wozniak created FLINK-9935:
------------------------------------

             Summary: Batch Table API: grouping by window and attribute causes java.lang.ClassCastException
                 Key: FLINK-9935
                 URL: https://issues.apache.org/jira/browse/FLINK-9935
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.5.1, 1.5.0
            Reporter: Roman Wozniak


Grouping by a window AND some other attribute(s) seems broken. Test case attached:
{code}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.scalatest.{FlatSpec, Matchers}

class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {

  trait BatchContext {
    implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    val data = Seq(
      (1532424567000L, "id1", "location1"),
      (1532424567000L, "id2", "location1"),
      (1532424567000L, "id3", "location1"),
      (1532424568000L, "id1", "location2"),
      (1532424568000L, "id2", "location3")
    )

    val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)

    val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
  }

  it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
    val results = table
      .window(Tumble over 1.second on 'rowtime as 'w)
      .groupBy('w, 'location)
      .select(
        'w.start.cast(Types.LONG),
        'w.end.cast(Types.LONG),
        'location,
        'id.count
      )
      .toDataSet[(Long, Long, String, Long)]
      .collect()

    results should contain theSameElementsAs Seq(
      (1532424567000L, 1532424568000L, "location1", 3L),
      (1532424568000L, 1532424569000L, "location2", 1L),
      (1532424568000L, 1532424569000L, "location3", 1L)
    )
  }
}
{code}
It seems that at execution time the 'rowtime attribute replaces 'location, which causes the ClassCastException:
{code:java}
[info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
[info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
[info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
[info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
[info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
[info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
[info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
[info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
[info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
[info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
[info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
{code}
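
Narrowing the query down seems to point at the extra grouping attribute rather than the window itself. Here is a minimal sketch (my own variation, not part of the attached test, and not verified against the same setup) that drops 'location from the groupBy, just to isolate whether the additional attribute is the trigger:
{code}
// Sketch only: the same tumbling window, but grouping by the window alone.
// Meant purely to narrow down the problem; the expectation that this variant
// avoids the serializer mismatch below is an untested assumption.
val windowOnly = table
  .window(Tumble over 1.second on 'rowtime as 'w)
  .groupBy('w)
  .select('w.start.cast(Types.LONG), 'w.end.cast(Types.LONG), 'id.count)
  .toDataSet[(Long, Long, Long)]
  .collect()
{code}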

Here is some debug information I was able to collect: the field serializers do not match the types of the Row fields:
{code}
this.instance = {Row@68451} "1532424567000,(3),1532424567000"
 fields = {Object[3]@68461} 
  0 = {Long@68462} 1532424567000
  1 = {CountAccumulator@68463} "(3)"
  2 = {Long@68462} 1532424567000
this.serializer = {RowSerializer@68452} 
 fieldSerializers = {TypeSerializer[3]@68455} 
  0 = {StringSerializer@68458} 
  1 = {TupleSerializer@68459} 
  2 = {LongSerializer@68460} 
 arity = 3
 nullMask = {boolean[3]@68457} 
{code}
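The exception in the stack trace is consistent with that mismatch: handing a non-String value to StringSerializer fails in exactly this way. A standalone sketch (illustration only, not taken from Flink or from the test above):
{code}
// Illustration of the failure mode: erasing the serializer's type parameter and
// feeding it a Long reproduces the same ClassCastException as in the stack trace.
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.typeutils.base.StringSerializer
import org.apache.flink.core.memory.DataOutputSerializer

val out = new DataOutputSerializer(64)
val untyped = StringSerializer.INSTANCE.asInstanceOf[TypeSerializer[Any]]
// Throws: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
untyped.serialize(java.lang.Long.valueOf(1532424567000L), out)
{code}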
 


