[ 
https://issues.apache.org/jira/browse/FLINK-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558519#comment-16558519
 ] 

ASF GitHub Bot commented on FLINK-9935:
---------------------------------------

fhueske commented on issue #6423: [FLINK-9935] [table] Fix incorrect group field access in batch window combiner.
URL: https://github.com/apache/flink/pull/6423#issuecomment-408155116
 
 
   Merged

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Batch Table API: grouping by window and attribute causes 
> java.lang.ClassCastException:
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-9935
>                 URL: https://issues.apache.org/jira/browse/FLINK-9935
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.2, 1.5.1, 1.6.0, 1.7.0
>            Reporter: Roman Wozniak
>            Assignee: Fabian Hueske
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.4.3, 1.5.3, 1.6.0, 1.7.0
>
>
> Grouping by a window AND one or more other attributes fails in the batch Table API. Test case attached:
> {code}
> // Imports assumed for the Flink 1.5/1.6 Scala APIs and ScalaTest; not part of the original report.
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.api.{Table, TableEnvironment, Types}
> import org.scalatest.{FlatSpec, Matchers}
> class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {
>   trait BatchContext {
>     implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
>     implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
>     val data = Seq(
>       (1532424567000L, "id1", "location1"),
>       (1532424567000L, "id2", "location1"),
>       (1532424567000L, "id3", "location1"),
>       (1532424568000L, "id1", "location2"),
>       (1532424568000L, "id2", "location3")
>     )
>     val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
>     val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
>   }
>   it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
>     val results = table
>       .window(Tumble over 1.second on 'rowtime as 'w)
>       .groupBy('w, 'location)
>       .select(
>         'w.start.cast(Types.LONG),
>         'w.end.cast(Types.LONG),
>         'location,
>         'id.count
>       )
>       .toDataSet[(Long, Long, String, Long)]
>       .collect()
>     results should contain theSameElementsAs Seq(
>       (1532424567000L, 1532424568000L, "location1", 3L),
>       (1532424568000L, 1532424569000L, "location2", 1L),
>       (1532424568000L, 1532424569000L, "location3", 1L)
>     )
>   }
> }
> {code}
> At execution time, the 'rowtime attribute appears to take the place of 'location, which causes a ClassCastException.
> {code:java}
> [info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
> [info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
> [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
> [info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> [info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> [info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> [info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> [info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
> {code}
> Here is some debug information that I was able to collect. The field serializers do not match the types of the Row fields: field 0 holds a Long, but its serializer is a StringSerializer.
> {code}
> this.instance = {Row@68451} "1532424567000,(3),1532424567000"
>  fields = {Object[3]@68461} 
>   0 = {Long@68462} 1532424567000
>   1 = {CountAccumulator@68463} "(3)"
>   2 = {Long@68462} 1532424567000
> this.serializer = {RowSerializer@68452} 
>  fieldSerializers = {TypeSerializer[3]@68455} 
>   0 = {StringSerializer@68458} 
>   1 = {TupleSerializer@68459} 
>   2 = {LongSerializer@68460} 
>  arity = 3
>  nullMask = {boolean[3]@68457} 
> {code}
>  
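
The mismatch in the debugger dump above can be reproduced in isolation. The following is only a minimal sketch in plain Java with no Flink dependency: the class name, the `firstMismatch` helper, and the use of a String placeholder for the accumulator are all hypothetical and are not taken from the Flink code base. It assumes that each serializer effectively casts its field to the type it expects, which is what the stack trace suggests for StringSerializer.

```java
public class SerializerMismatchSketch {

    /**
     * Hypothetical helper: returns the index of the first non-null field whose
     * runtime type is not an instance of the expected type, or -1 if all match.
     * Assumes both arrays have the same length (the row arity).
     */
    public static int firstMismatch(Object[] fields, Class<?>[] expectedTypes) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] != null && !expectedTypes[i].isInstance(fields[i])) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Row contents as seen in the debugger: (rowtime, accumulator, rowtime).
        // The accumulator is represented by a placeholder String here.
        Object[] fields = {1532424567000L, "(3)", 1532424567000L};

        // Types the field serializers expect: (String, accumulator, Long).
        // Object.class stands in for the tuple-based accumulator type.
        Class<?>[] expected = {String.class, Object.class, Long.class};

        int idx = firstMismatch(fields, expected);
        System.out.println("first mismatching field: " + idx);

        try {
            // A String-typed serializer would effectively perform this cast.
            String s = (String) fields[idx];
            System.out.println("unexpectedly serialized: " + s);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: Long where a String was expected");
        }
    }
}
```

Under these assumptions the check flags field 0, the position where the grouping attribute 'location was expected but the 'rowtime value was found.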



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
