Dear All,

I have a question about Flink & Hadoop.

I want to read files on HDFS with Flink, but I encountered the error below. Could you please advise on a solution to this problem? It would be much appreciated.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf: 804: Could not resolve substitution to a value: ${akka.stream.materializer}
        at com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
        at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
        at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
        at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
        at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
        at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
        at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
        ..... (the last six frames repeat)




====================my code===========================
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class App {

    public static void main(String[] args) throws Exception {

        final String inputPath = args[0]; // HDFS file path
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap the Hadoop mapred TextInputFormat so Flink can read the HDFS files
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<>(new TextInputFormat(), LongWritable.class,
                        Text.class, new JobConf());
        TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));

        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

        // print() triggers job execution eagerly in the DataSet API,
        // so a separate env.execute() afterwards is not needed (it would
        // fail with "No new data sinks have been defined")
        text.print();
    }

}
==========================maven dependencies=======================================
<properties>
    <flink.version>1.4.0</flink.version>
    <log4j.version>1.2.17</log4j.version>
    <akka.version>2.4.20</akka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>
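In case it is relevant: flink-hdfs.jar is built as a fat jar, and I assume the error may come from how the akka reference.conf files are merged into it (each akka module ships its own reference.conf, and if one copy overwrites the others, substitutions like ${akka.stream.materializer} cannot be resolved). A sketch of a maven-shade-plugin configuration that concatenates them instead (plugin version is an assumption):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- concatenate all reference.conf files from the
                         dependency jars instead of keeping only one -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```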
 
Best wishes,

Thanks
