LI HAO created BEAM-7695:
----------------------------

             Summary: Read TFRecord Files from hdfs will meet exception if file size is large
                 Key: BEAM-7695
                 URL: https://issues.apache.org/jira/browse/BEAM-7695
             Project: Beam
          Issue Type: Bug
          Components: io-java-tfrecord
    Affects Versions: 2.13.0, 2.12.0, 2.11.0, 2.10.0, 2.9.0, 2.8.0, 2.7.0, 2.6.0, 2.5.0, 2.4.0, 2.3.0, 2.2.0
            Reporter: LI HAO

Reading TFRecord files stored in HDFS fails with an exception under the following conditions:

* A single TFRecord file is larger than 3 GB.
* The total size is larger than 1 TB.
* Beam 2.13.0 with the Flink runner 2.13.0. I also tested 2.11.0 and 2.12.0.

The dependencies:

```
// build.gradle
dependencies {
    // This dependency is found on compile classpath of this component and consumers.
    //implementation 'com.google.guava:guava:27.0.1-jre'
    compile 'org.apache.beam:beam-sdks-java-core:2.13.0'
    compile 'org.apache.beam:beam-vendor-guava-20_0:0.1'
    compile 'org.tensorflow:tensorflow-hadoop:1.13.1'
    compile 'org.apache.beam:beam-runners-direct-java:2.13.0'
    //implementation "org.apache.beam:beam-sdks-java-core:2.13.0"
    compile "org.apache.beam:beam-runners-flink_2.11:2.13.0"
    compile "org.apache.beam:beam-sdks-java-io-hadoop-file-system:2.13.0"
    compile "org.apache.hadoop:hadoop-common:2.7.3"
    compile "org.apache.hadoop:hadoop-client:2.7.3"
    compile "org.apache.hadoop:hadoop-mapreduce-client-core:2.7.3"
    compile "org.tensorflow:proto:1.13.1"
    compile "org.apache.beam:beam-sdks-java-extensions-sketching:2.13.0"

    // Use JUnit test framework
    testImplementation 'junit:junit:4.12'
}
```

The error message:

```
------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
	at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.lang.RuntimeException: Pipeline execution failed
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
	at avazu.data.transform.App.testTfrecordQIYU(App.java:572)
	at avazu.data.transform.App.main(App.java:744)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	... 12 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: fe4ce5375efbbb55e56967e4c7a975b2)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:475)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
	... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
	at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
	... 26 more
Caused by: java.io.IOException: Mismatch of length mask when reading a record. Expected 808268081 but received 1769712859.
	at org.apache.beam.sdk.io.TFRecordIO$TFRecordCodec.read(TFRecordIO.java:651)
	at org.apache.beam.sdk.io.TFRecordIO$TFRecordSource$TFRecordReader.readNextRecord(TFRecordIO.java:530)
	at org.apache.beam.sdk.io.CompressedSource$CompressedReader.readNextRecord(CompressedSource.java:431)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:484)
	at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:479)
	at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:249)
	at org.apache.beam.runners.flink.metrics.ReaderInvocationUtil.invokeStart(ReaderInvocationUtil.java:51)
	at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:75)
	at org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.open(SourceInputFormat.java:42)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:170)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
	at java.lang.Thread.run(Thread.java:745)
```

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
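
For context on the innermost exception: in the TFRecord format each record starts with a 12-byte header, an 8-byte little-endian length followed by a 4-byte masked CRC32C of those length bytes. "Mismatch of length mask" means the 12 bytes the reader consumed did not satisfy that check. The sketch below illustrates the check using only the JDK (`java.util.zip.CRC32C`, Java 9+). It is not Beam's implementation; the class name `TFRecordHeaderCheck` and its helper methods are hypothetical names chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.zip.CRC32C;

// Hypothetical helper, not part of Beam: builds and validates a TFRecord header.
final class TFRecordHeaderCheck {
    // Mask constant from the TFRecord format definition.
    private static final int MASK_DELTA = 0xa282ead8;

    // Masked CRC32C: rotate the checksum right by 15 bits, then add the constant.
    static int maskedCrc32c(byte[] bytes) {
        CRC32C crc = new CRC32C();
        crc.update(bytes, 0, bytes.length);
        int c = (int) crc.getValue();
        return ((c >>> 15) | (c << 17)) + MASK_DELTA;
    }

    // Builds the 12-byte header: little-endian length, then masked CRC32C of the length bytes.
    static byte[] header(long length) {
        byte[] len = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(length).array();
        return ByteBuffer.allocate(12)
                .order(ByteOrder.LITTLE_ENDIAN)
                .put(len)
                .putInt(maskedCrc32c(len))
                .array();
    }

    // Returns true if the stored mask matches the one recomputed from the length bytes.
    static boolean headerIsValid(byte[] header) {
        ByteBuffer buf = ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN);
        byte[] len = new byte[8];
        buf.get(len);
        int stored = buf.getInt();
        return stored == maskedCrc32c(len);
    }

    public static void main(String[] args) {
        byte[] good = header(1024L);
        System.out.println(headerIsValid(good)); // prints "true"

        byte[] bad = good.clone();
        bad[0] ^= 1; // corrupt one bit of the length field
        System.out.println(headerIsValid(bad));  // prints "false"
    }
}
```

A header fails this check whenever the 12 bytes are not a genuine record header, for example if the reader is not positioned on a record boundary or the bytes handed to the check are incomplete. Which of these applies here is not established by the report.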