[ https://issues.apache.org/jira/browse/FLINK-6896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050502#comment-16050502 ]

ASF GitHub Bot commented on FLINK-6896:
---------------------------------------

Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4111#discussion_r122205799
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -871,12 +871,24 @@ class CodeGenerator(
           returnType: TypeInformation[_ <: Any],
           resultFieldNames: Seq[String])
         : GeneratedExpression = {
    -    val input1AccessExprs = for (i <- 0 until input1.getArity if input1Mapping.contains(i))
    -      yield generateInputAccess(input1, input1Term, i, input1Mapping)
    +
    +    val input1AccessExprs = if (input1.isInstanceOf[PojoTypeInfo[_]]) {
    +      for (i <- 0 until input1Mapping.length)
    +        yield generateInputAccess(input1, input1Term, i, input1Mapping)
    +    } else {
    +      for (i <- 0 until input1.getArity if input1Mapping.contains(i))
    +        yield generateInputAccess(input1, input1Term, i, input1Mapping)
    +    }
     
         val input2AccessExprs = input2 match {
    -      case Some(ti) => for (i <- 0 until ti.getArity if input2Mapping.contains(i))
    -        yield generateInputAccess(ti, input2Term, i, input2Mapping)
    +      case Some(ti) => if (input2.isInstanceOf[PojoTypeInfo[_]]) {
    --- End diff --
    
    Thanks @wuchong, I checked it on my side. Nice.
    +1
    
    Best,
    SunJincheng
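
A simplified, standalone restatement of the branch added in the diff above (a sketch, not the
actual Flink code; the object and method names are made up for illustration): for a POJO input
the field mapping itself drives the iteration, while for any other composite input the original
positional filter over the input's arity is kept.

{code:title=IndexSelectionSketch.scala|borderStyle=solid}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo

object IndexSelectionSketch {

  // Mirrors the branch in the diff: which indices are handed to generateInputAccess.
  def selectedIndices(inputType: TypeInformation[_], mapping: Array[Int]): Seq[Int] =
    inputType match {
      // POJO input: iterate over the mapping itself, one access per mapped field.
      case _: PojoTypeInfo[_] => 0 until mapping.length
      // Any other input: keep the original positional filter over the input's arity.
      case _ => (0 until inputType.getArity).filter(i => mapping.contains(i))
    }
}
{code}

Presumably the special case matters because, for a POJO, the mapping entries are positions inside
the POJO itself, so filtering 0 until arity by mapping.contains can later yield an index beyond
the shorter mapping array, which would match the ArrayIndexOutOfBoundsException in the issue below.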


> Creating a table from a POJO and use table sink to output fail
> --------------------------------------------------------------
>
>                 Key: FLINK-6896
>                 URL: https://issues.apache.org/jira/browse/FLINK-6896
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Mark You
>            Assignee: sunjincheng
>         Attachments: debug.png
>
>
> The following example fails at the sink. Stepping through in debug mode (see the attached
> debug.png) suggests the ArrayIndexOutOfBoundsException is caused by the input type being a
> POJO type rather than Row.
> Sample:
> {code:title=TumblingWindow.java|borderStyle=solid}
> public class TumblingWindow {
>     public static void main(String[] args) throws Exception {
>         List<Content> data = new ArrayList<Content>();
>         data.add(new Content(1L, "Hi"));
>         data.add(new Content(2L, "Hallo"));
>         data.add(new Content(3L, "Hello"));
>         data.add(new Content(4L, "Hello"));
>         data.add(new Content(7L, "Hello"));
>         data.add(new Content(8L, "Hello world"));
>         data.add(new Content(16L, "Hello world"));
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         DataStream<Content> stream = env.fromCollection(data);
>         DataStream<Content> stream2 = stream.assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Content>(Time.milliseconds(1)) {
>                     /**
>                      *
>                      */
>                     private static final long serialVersionUID = 410512296011057717L;
>                     @Override
>                     public long extractTimestamp(Content element) {
>                         return element.getRecordTime();
>                     }
>                 });
>         final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>         Table table = tableEnv.fromDataStream(stream2,
>                 "urlKey,uplink,downlink,httpGetMessageCount,httpPostMessageCount,statusCode,rowtime.rowtime");
>         Table windowTable = table.window(Tumble.over("1.hours").on("rowtime").as("w")).groupBy("w, urlKey")
>                 .select("w.start,urlKey,uplink.sum,downlink.sum,httpGetMessageCount.sum,httpPostMessageCount.sum ");
>         //table.printSchema();
>         TableSink<Row> windowSink = new CsvTableSink("/Users/mark/Documents/specific-website-code.csv", ",", 1,
>                 WriteMode.OVERWRITE);
>         windowTable.writeToSink(windowSink);
>         // tableEnv.toDataStream(windowTable, Row.class).print();
>         env.execute();
>     }
>     public static class Content implements Serializable {
>         /**
>          * 
>          */
>         private static final long serialVersionUID = 1429246948772430441L;
>         private String urlKey;
>         private long recordTime;
>         // private String recordTimeStr;
>         private long httpGetMessageCount;
>         private long httpPostMessageCount;
>         private long uplink;
>         private long downlink;
>         private long statusCode;
>         private long statusCodeCount;
>         public Content() {
>             super();
>         }
>         public Content(long recordTime, String urlKey) {
>             super();
>             this.recordTime = recordTime;
>             this.urlKey = urlKey;
>         }
>         public String getUrlKey() {
>             return urlKey;
>         }
>         public void setUrlKey(String urlKey) {
>             this.urlKey = urlKey;
>         }
>         public long getRecordTime() {
>             return recordTime;
>         }
>         public void setRecordTime(long recordTime) {
>             this.recordTime = recordTime;
>         }
>         public long getHttpGetMessageCount() {
>             return httpGetMessageCount;
>         }
>         public void setHttpGetMessageCount(long httpGetMessageCount) {
>             this.httpGetMessageCount = httpGetMessageCount;
>         }
>         public long getHttpPostMessageCount() {
>             return httpPostMessageCount;
>         }
>         public void setHttpPostMessageCount(long httpPostMessageCount) {
>             this.httpPostMessageCount = httpPostMessageCount;
>         }
>         public long getUplink() {
>             return uplink;
>         }
>         public void setUplink(long uplink) {
>             this.uplink = uplink;
>         }
>         public long getDownlink() {
>             return downlink;
>         }
>         public void setDownlink(long downlink) {
>             this.downlink = downlink;
>         }
>         public long getStatusCode() {
>             return statusCode;
>         }
>         public void setStatusCode(long statusCode) {
>             this.statusCode = statusCode;
>         }
>         public long getStatusCodeCount() {
>             return statusCodeCount;
>         }
>         public void setStatusCodeCount(long statusCodeCount) {
>             this.statusCodeCount = statusCodeCount;
>         }
>     }
>     private class TimestampWithEqualWatermark implements AssignerWithPunctuatedWatermarks<Object[]> {
>         /**
>          *
>          */
>         private static final long serialVersionUID = 1L;
>         @Override
>         public long extractTimestamp(Object[] element, long previousElementTimestamp) {
>             // TODO Auto-generated method stub
>             return (long) element[0];
>         }
>         @Override
>         public Watermark checkAndGetNextWatermark(Object[] lastElement, long extractedTimestamp) {
>             return new Watermark(extractedTimestamp);
>         }
>     }
> }
> {code}
> Exception trace
> {code}
> Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 6
>       at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateFieldAccess(CodeGenerator.scala:1661)
>       at org.apache.flink.table.codegen.CodeGenerator.org$apache$flink$table$codegen$CodeGenerator$$generateInputAccess(CodeGenerator.scala:1599)
>       at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:875)
>       at org.apache.flink.table.codegen.CodeGenerator$$anonfun$26.apply(CodeGenerator.scala:874)
>       at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:728)
>       at scala.collection.immutable.Range.foreach(Range.scala:166)
>       at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:727)
>       at org.apache.flink.table.codegen.CodeGenerator.generateConverterResultExpression(CodeGenerator.scala:874)
>       at org.apache.flink.table.plan.nodes.CommonScan$class.generatedConversionFunction(CommonScan.scala:57)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.generatedConversionFunction(DataStreamScan.scala:36)
>       at org.apache.flink.table.plan.nodes.datastream.StreamScan$class.convertToInternalRow(StreamScan.scala:48)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.convertToInternalRow(DataStreamScan.scala:36)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamScan.translateToPlan(DataStreamScan.scala:63)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate.translateToPlan(DataStreamGroupWindowAggregate.scala:119)
>       at org.apache.flink.table.plan.nodes.datastream.DataStreamCalc.translateToPlan(DataStreamCalc.scala:94)
>       at org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:678)
>       at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:637)
>       at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:214)
>       at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
>       at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
>       at com.taiwanmobile.cep.noc.TumblingWindow.main(TumblingWindow.java:66)
> {code}
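
A background note for readers of the trace (not from the issue itself): Flink's PojoTypeInfo
indexes POJO fields by alphabetically sorted field name, not by declaration order or by the order
given to fromDataStream, which is presumably why positional indices derived from the declared
field expression do not line up with the POJO's own field positions. The snippet below is purely
illustrative (Scala; the class names are taken from the sample and stack trace above) and just
prints the field order Flink derives for the Content POJO.

{code:title=PojoFieldOrder.scala|borderStyle=solid}
import com.taiwanmobile.cep.noc.TumblingWindow
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.TypeExtractor

object PojoFieldOrder {
  def main(args: Array[String]): Unit = {
    // Type information Flink derives for the Content POJO from the sample above.
    val contentType = TypeExtractor.createTypeInfo(classOf[TumblingWindow.Content])
      .asInstanceOf[CompositeType[TumblingWindow.Content]]
    // Field names in the order PojoTypeInfo uses for indexing (sorted by name),
    // which differs from both declaration order and the fromDataStream expression.
    println(contentType.getFieldNames.mkString(", "))
  }
}
{code}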


