Jeff Zhang created FLINK-15566:
----------------------------------

             Summary: Flink implicitly orders the fields in PojoTypeInfo
                 Key: FLINK-15566
                 URL: https://issues.apache.org/jira/browse/FLINK-15566
             Project: Flink
          Issue Type: Improvement
          Components: API / Core
    Affects Versions: 1.10.0
            Reporter: Jeff Zhang
         Attachments: image-2020-01-13-16-02-57-949.png

I don't know why Flink does this, but it causes my user-defined function to 
behave incorrectly when I use a POJO in my UDF and override getResultType.

[https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]

 

Here is the UDF I defined:

 
{code:java}

%flink
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils._
import org.apache.flink.api.scala._
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

class Person(val age: Int, val job: String, val marital: String,
  val education: String, val default: String, val balance: String,
  val housing: String, val loan: String, val contact: String,
  val day: String, val month: String, val duration: Int, val campaign: Int,
  val pdays: Int, val previous: Int, val poutcome: String, val y: String)

class ParseFunction extends TableFunction[Person] {
  def eval(line: String) {
    val tokens = line.split(";")
    // parse the line
    if (!line.startsWith("\"age\"")) {
      collect(Row.of(
        new Integer(tokens(0).toInt),
        normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)),
        normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)),
        normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)),
        normalize(tokens(10)),
        new Integer(tokens(11).toInt), new Integer(tokens(12).toInt),
        new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),
        normalize(tokens(15)), normalize(tokens(16))))
    }
  }
  
  override def getResultType() = {
    val cls = classOf[Person]
    new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
       new PojoField(cls.getDeclaredField("age"), Types.INT),
       new PojoField(cls.getDeclaredField("job"), Types.STRING),
       new PojoField(cls.getDeclaredField("marital"), Types.STRING),
       new PojoField(cls.getDeclaredField("education"), Types.STRING),
       new PojoField(cls.getDeclaredField("default"), Types.STRING),
       new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
       new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
       new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
       new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
       new PojoField(cls.getDeclaredField("day"), Types.STRING), 
       new PojoField(cls.getDeclaredField("month"), Types.STRING), 
       new PojoField(cls.getDeclaredField("duration"), Types.INT),
       new PojoField(cls.getDeclaredField("campaign"), Types.INT),
       new PojoField(cls.getDeclaredField("pdays"), Types.INT),
       new PojoField(cls.getDeclaredField("previous"), Types.INT),
       new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
       new PojoField(cls.getDeclaredField("y"), Types.STRING)
     ))
  }  

  // remove the quote
  private def normalize(token: String) = {
      if (token.startsWith("\"")) {
          token.substring(1, token.length - 1)
      } else {
          token
      }
  }
}{code}

I then use this UDF in SQL but get the wrong result, because Flink implicitly 
reorders the fields.

 !image-2020-01-13-16-02-57-949.png! 
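
The effect can be sketched without Flink. This is only an illustration, and it assumes that the PojoTypeInfo constructor sorts the fields by name, which is what the linked source line appears to do. The object name FieldOrderSketch is made up for this example. It compares the order in which Person declares its fields with the order that results from a sort by name:

```scala
object FieldOrderSketch {
  // Field names in the order they are declared in the Person class above.
  val declared: Seq[String] = Seq(
    "age", "job", "marital", "education", "default", "balance", "housing",
    "loan", "contact", "day", "month", "duration", "campaign", "pdays",
    "previous", "poutcome", "y")

  // Assumption: PojoTypeInfo orders its fields by name (String.compareTo).
  val sorted: Seq[String] = declared.sorted

  // Positions at which the declared field and the sorted field still agree.
  val unchanged: Seq[String] =
    declared.zip(sorted).collect { case (d, s) if d == s => d }

  def main(args: Array[String]): Unit = {
    // Print both orders side by side, one position per line.
    declared.zip(sorted).zipWithIndex.foreach { case ((d, s), i) =>
      println(f"$i%2d  declared=$d%-10s  sorted=$s")
    }
    println(s"unchanged positions: ${unchanged.mkString(", ")}")
  }
}
```

Under that assumption only age, pdays and y stay at their declared positions, so any value that is written by position in declaration order would be read back under a different field name.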





