Could anyone please help me with this?

Thanks,
Akshay

On Mon, 10 Dec 2018, 6:05 pm Akshay Mendole <akshaymend...@gmail.com> wrote:
> Hi,
> I have been facing issues while trying to read from an HDFS sequence file.
>
> This is my code snippet:
>
> DataSource<Tuple2<Text, Text>> input = env
>     .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir),
>         TypeInformation.of(new TypeHint<Tuple2<Text, Text>>() {
>         }));
>
> Upon executing this in YARN cluster mode, I get the following error:
>
> The type returned by the input format could not be automatically
> determined. Please specify the TypeInformation of the produced type
> explicitly by using the 'createInput(InputFormat, TypeInformation)' method
> instead.
>
>     org.apache.flink.api.java.ExecutionEnvironment.createInput(ExecutionEnvironment.java:551)
>     flipkart.EnrichementFlink.main(EnrichementFlink.java:31)
>
> When I add the TypeInformation myself as follows, I run into the same issue.
>
> DataSource<Tuple2<Text, Text>> input = env
>     .createInput(HadoopInputs.readSequenceFile(Text.class, Text.class, ravenDataDir));
>
> When I add these libraries to the lib folder,
>
> flink-hadoop-compatibility_2.11-1.7.0.jar
>
> the error changes to this:
>
> java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeutils/TypeSerializerSnapshot
>     at org.apache.flink.api.java.typeutils.WritableTypeInfo.createSerializer(WritableTypeInfo.java:111)
>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:107)
>     at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:52)
>     at org.apache.flink.optimizer.postpass.JavaApiPostPass.createSerializer(JavaApiPostPass.java:283)
>     at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverseChannel(JavaApiPostPass.java:252)
>     at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:97)
>     at org.apache.flink.optimizer.postpass.JavaApiPostPass.postPass(JavaApiPostPass.java:81)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:527)
>     at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:399)
>     at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:379)
>     at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:906)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:473)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>
> Can someone help me resolve this issue?
>
> Thanks,
> Akshay