Hi, when I write SQL like this:



        String sqlCreate = "CREATE TABLE fs_table (\n" +
                "  `examplestr` bytes\n" +
                ")  WITH (\n" +
                "  'connector'='filesystem',\n" +
                "  'format'='raw',\n" +
                "  'path'='/tmp/zhangying480'\n" +
                ")";
I get the following error. It suggests that my TFRecord data is malformed,
but the data is actually correct:
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        ... 4 more
Caused by: java.io.IOException: Length header crc32 checking failed: -1837507072 != -252953760, length = 323850
at com.jd.realtime.formal.operator.source.inner.TFRecordReader.read(TFRecordReader.java:44)
at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:128)
at com.jd.realtime.formal.operator.source.inner.TFRecordInputFormat.nextRecord(TFRecordInputFormat.java:17)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


After reading the source code, I found the cause:
public class SerializationSchemaAdapter implements Encoder<RowData> {

    private static final long serialVersionUID = 1L;

    static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

    // ... (other members omitted)

    @Override
    public void encode(RowData element, OutputStream stream) throws IOException {
        checkOpened();
        stream.write(serializationSchema.serialize(element));
        // A newline byte is appended after every serialized record.
        stream.write(LINE_DELIMITER);
    }
}


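Writing LINE_DELIMITER after each record corrupts the TFRecord output: the
extra byte shifts every following length header, so its CRC check fails. I
therefore implemented a TFRecordSerializationSchemaAdapter and overrode
FileSystemTableSink to use it, which fixed the problem.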
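
To illustrate why the extra byte breaks the reader, here is a minimal sketch. It assumes the standard TFRecord framing (8-byte little-endian length, masked CRC32C of the length, payload, masked CRC32C of the payload) and Java 9+ for java.util.zip.CRC32C. The class and method names (TFRecordFraming, frame, headerValidAt) are hypothetical and are not taken from my TFRecordSerializationSchemaAdapter or from Flink.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

// Hypothetical helper class, for illustration only.
class TFRecordFraming {

    // TFRecord stores "masked" CRC32C values: rotate right by 15 bits, then add a constant.
    static int maskedCrc32c(byte[] data) {
        CRC32C crc = new CRC32C();
        crc.update(data, 0, data.length);
        int c = (int) crc.getValue();
        return ((c >>> 15) | (c << 17)) + 0xa282ead8;
    }

    // Frame one payload: length (8 bytes LE), masked CRC of length, payload, masked CRC of payload.
    static byte[] frame(byte[] payload) {
        byte[] len = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
                .putLong(payload.length).array();
        return ByteBuffer.allocate(8 + 4 + payload.length + 4).order(ByteOrder.LITTLE_ENDIAN)
                .put(len).putInt(maskedCrc32c(len))
                .put(payload).putInt(maskedCrc32c(payload))
                .array();
    }

    // Returns true if a length header with a matching CRC starts at the given offset.
    static boolean headerValidAt(byte[] file, int offset) {
        if (offset < 0 || offset + 12 > file.length) {
            return false;
        }
        ByteBuffer buf = ByteBuffer.wrap(file, offset, 12).order(ByteOrder.LITTLE_ENDIAN);
        byte[] len = new byte[8];
        buf.get(len);
        return buf.getInt() == maskedCrc32c(len);
    }

    // Concatenate byte arrays, standing in for successive writes to the output stream.
    static byte[] concat(byte[]... parts) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] p : parts) {
            out.write(p, 0, p.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] first = frame("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = frame("world".getBytes(StandardCharsets.UTF_8));

        // Records written back to back: the second header starts right after the first record.
        byte[] clean = concat(first, second);
        // Records separated by '\n', as a line-delimited encoder would write them.
        byte[] delimited = concat(first, new byte[] {'\n'}, second);

        System.out.println("clean, 2nd header valid:     " + headerValidAt(clean, first.length));
        System.out.println("delimited, 2nd header valid: " + headerValidAt(delimited, first.length));
    }
}
```

A reader that expects the next record immediately after the previous one lands on the newline byte and misreads the length header, which matches the "Length header crc32 checking failed" message above.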


                          
So far I only support the filesystem sink with the TFRecord format. Do you
think it is necessary to support other systems?














