[ https://issues.apache.org/jira/browse/BEAM-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17547988#comment-17547988 ]
Danny McCormick commented on BEAM-7695:
---------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/19644

> Reading TFRecord files from HDFS hits an exception if the file size is large
> -----------------------------------------------------------------------------
>
>                 Key: BEAM-7695
>                 URL: https://issues.apache.org/jira/browse/BEAM-7695
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-tfrecord, runner-flink
>    Affects Versions: 2.2.0, 2.3.0, 2.4.0, 2.5.0, 2.6.0, 2.7.0, 2.8.0, 2.9.0, 2.10.0, 2.11.0, 2.12.0, 2.13.0
>            Reporter: LI HAO
>            Priority: P3
>
> Reading TFRecord files stored in HDFS fails with an error when:
> * The single TFRecord file is larger than 3 GB.
> * The total size is larger than 1 TB.
> * Using Beam 2.13.0 + FlinkRunner 2.13.0 + Java 1.8; I also tested 2.11.0 and 2.12.0 and hit the same problem.
>
> The dependencies (in build.gradle):
> {code:java}
> dependencies {
>     // This dependency is found on the compile classpath of this component and consumers.
>     //implementation 'com.google.guava:guava:27.0.1-jre'
>     compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
>     compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
>     compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
>     compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
>     //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
>     compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
>     compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
>     compile "org.apache.hadoop:hadoop-common:2.7.3"
>     compile "org.apache.hadoop:hadoop-client:2.7.3"
>     compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
>     compile "org.tensorflow:proto:1.13.1"
>     compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"
>     // Use JUnit test framework
>     testImplementation 'junit:junit:4.12'
> }
> {code}
>
> The error message:
> {code:java}
> ------------------------------------------------------------
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
>     at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
>     at avazu.data.transform.App.main(App.java:744)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>     ... 12 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>     ... 21 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
>     ... 26 more
> Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
>     at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
>     at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
>     at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
>     at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
>     at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
>     at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
>     at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
>     at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
>
> How to fix?
> * I have already fixed the bug with the following code.
> * I will refine the code and submit it.
>
> {code:java}
> diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
> index 96a753a..484a7cb 100644
> --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
> +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
> @@ -631,8 +631,16 @@ public class TFRecordIO {
>        int headerBytes = inChannel.read(header);
>        if (headerBytes <= 0) {
>          return null;
> +      } else if (headerBytes != HEADER_LEN) {
> +        // A single read() may return fewer bytes than requested; keep reading
> +        // until the header buffer is full or EOF is reached.
> +        while (header.hasRemaining() && inChannel.read(header) >= 0) {}
> +        if (header.hasRemaining()) {
> +          throw new IOException(String.format(
> +              "EOF while reading record header of length %d. Read only %d bytes. Input might be truncated. Not a valid TFRecord: fewer than 12 bytes.",
> +              HEADER_LEN, header.position()));
> +        }
>        }
> -      checkState(headerBytes == HEADER_LEN, "Not a valid TFRecord. Fewer than 12 bytes.");
>        header.rewind();
>        long length = header.getLong();
> @@ -655,7 +663,12 @@ public class TFRecordIO {
>        }
>        footer.clear();
> -      inChannel.read(footer);
> +      // Same partial-read issue applies to the footer.
> +      while (footer.hasRemaining() && inChannel.read(footer) >= 0) {}
> +      if (footer.hasRemaining()) {
> +        throw new IOException(String.format(
> +            "EOF while reading record footer of length %d. Read only %d bytes. Input might be truncated.",
> +            FOOTER_LEN, footer.position()));
> +      }
>        footer.rewind();
>        int maskedCrc32OfData = footer.getInt();
> {code}
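For context: a single ReadableByteChannel.read() call is not guaranteed to fill the buffer, and HDFS streams in particular can return short reads (for example near block boundaries). A partially filled 12-byte header then fails the masked-CRC check of the length, which is consistent with the "Mismatch of length mask" error above. Below is a minimal standalone sketch of the read-fully pattern the patch applies; the class and method names (ReadFullySketch, readFully, readRecordLength) are illustrative and not the actual Beam code, and the CRC verification is omitted.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ReadableByteChannel;

public class ReadFullySketch {

  /**
   * Reads until {@code buf} is full or EOF; returns the total bytes read.
   * This is the loop the patch adds: it tolerates short reads that a
   * single channel.read() call may produce.
   */
  static int readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      int n = in.read(buf);
      if (n < 0) {
        break; // EOF before the buffer was filled
      }
      total += n;
    }
    return total;
  }

  /**
   * Reads one TFRecord header: an 8-byte little-endian length followed by
   * a 4-byte masked CRC32 of the length (12 bytes total, matching the
   * HEADER_LEN in the patch). Returns null on a clean EOF between records.
   */
  static Long readRecordLength(ReadableByteChannel in) throws IOException {
    final int HEADER_LEN = 12;
    ByteBuffer header = ByteBuffer.allocate(HEADER_LEN).order(ByteOrder.LITTLE_ENDIAN);
    int read = readFully(in, header);
    if (read == 0) {
      return null; // no more records
    }
    if (header.hasRemaining()) {
      throw new IOException(String.format(
          "EOF while reading TFRecord header: expected %d bytes, read %d. Input may be truncated.",
          HEADER_LEN, header.position()));
    }
    header.flip();
    return header.getLong(); // masked-CRC check of the length omitted in this sketch
  }
}
{code}

For InputStream-based code, java.io.DataInputStream.readFully gives the same guarantee out of the box; for channels, a loop like the one above is the usual idiom.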