Hi,
I am new to Flink and I've written two small test projects: 1) to read data
from S3 and 2) to push data to S3. However, I am getting two different
errors for the two projects, relating to (I think) how the core-site.xml
file is being read. I am running the projects locally in IntelliJ. In the
run configurations I have set the environment variable
HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving
core-site.xml in the src/main/resources folder, but I get the same errors.
I would like to know whether my core-site.xml file is configured correctly
for using s3a, and how to get IntelliJ to read the core-site.xml file.
Also, are the core-site.xml configurations different for reading from S3
versus writing to S3?
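(One thing I wasn't sure about: the Flink docs seem to suggest that Flink
locates core-site.xml through the fs.hdfs.hadoopconf entry in
flink-conf.yaml rather than through HADOOP_HOME. The line below is my guess
at what that would look like for 1.1.4, with the path standing in for my
actual config directory; I haven't confirmed this is the right key. Is that
the preferred way to make a local IntelliJ run pick up the Hadoop config?)

fs.hdfs.hadoopconf: path/to/dir-with-core-site.xml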
This is my code for reading data from S3:
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class DesktopWriter {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        // read a test file from S3 and print it
        DataSet<String> data = env.readTextFile("s3://flink-test/flink-test.txt");
        data.print();
    }
}
I get the error: Caused by: java.io.IOException: Cannot determine access
key to Amazon S3. Please make sure to configure it by setting the
configuration key 'fs.s3.accessKey'.
This is my code for writing to S3:
import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Sink {

    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");
        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        // consume from Kafka and write the messages out to S3 via s3a
        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}
I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test,
but the File System could not be initialized with that address: Unable to
load AWS credentials from any provider in the chain
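Would a standalone check along these lines be a sensible way to confirm
that the file is being picked up at all? This is just a minimal sketch: it
loads my core-site.xml into a plain Hadoop Configuration and prints the
s3a-related values, with the path below standing in for wherever the file
actually lives.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class CoreSiteCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // explicitly add core-site.xml (placeholder path)
        conf.addResource(new Path("path/to/dir-with-core-site.xml/core-site.xml"));
        // print the keys I set, to confirm the file is actually being read
        System.out.println("fs.s3.impl = " + conf.get("fs.s3.impl"));
        System.out.println("fs.s3a.awsAccessKeyId set? "
                + (conf.get("fs.s3a.awsAccessKeyId") != null));
    }
}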
This is my core-site.xml:
<configuration>

    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma-separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS access key ID, using the key defined in
         org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*****</value>
    </property>

    <!-- set your AWS secret access key -->
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*****</value>
    </property>

</configuration>
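One thing I am unsure about in the file above: org.apache.hadoop.fs.s3a.Constants
in hadoop-aws 2.7.x appears to define the credential property names as
fs.s3a.access.key and fs.s3a.secret.key, so possibly the last two
properties should instead read as follows (values elided as before)? I
haven't verified which names S3AFileSystem actually picks up in 2.7.2.

    <property>
        <name>fs.s3a.access.key</name>
        <value>*****</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>*****</value>
    </property>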
This is my pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>
</dependencies>
Thanks!
Sam