My Stack Overflow question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Here is my IntelliJ question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Hi Flink Experts,

I am trying to read an S3 file from IntelliJ using Flink, and I am coming across an AWS auth error. Can someone help? Below are all the details.

I have AWS credentials in homefolder/.aws/credentials.

My IntelliJ environment variables:

ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

flink-conf.yaml file content:

fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>

    <!--<property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>-->

    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>

    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>

</configuration>

POM.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,...
                         -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.8.8</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.579</version>
        </dependency>

        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>

        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>

        <!--
        https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>

        <!-- For Parquet -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_2.10</artifactId>
            <version>1.1.4-hadoop1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-jackson_2.11</artifactId>
            <version>3.6.7</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudsearch</artifactId>
            <version>1.11.500</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>2.8.3-1.8.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.8.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>

    </dependencies>

</project>

Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])

    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")

  }

  class Row1 {

    var month: String = _

    var sum: java.lang.Integer = _

    override def toString(): String = month + "," + sum

  }

}

Error:

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

Thanks

--
Thanks & Regards
Sri Tummala
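A note on the error above: the provider chain it names (BasicAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, InstanceProfileCredentialsProvider) checks the fs.s3a.access.key/secret.key config, the AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment variables, and the EC2 metadata endpoint, but none of those read homefolder/.aws/credentials, so on a local IntelliJ run with empty key values in core-site.xml every provider fails. One workaround (a minimal sketch, not from this thread, and the helper name AwsProfileReader is made up for illustration) is to parse the profile file yourself and feed the keys into fs.s3a.access.key and fs.s3a.secret.key, or into the run configuration's environment variables:

```scala
// Illustrative sketch: read (access key, secret key) for a profile out of an
// INI-style ~/.aws/credentials file, so the values can be handed to the s3a
// config keys that BasicAWSCredentialsProvider actually reads.
object AwsProfileReader {

  /** Scan the file's lines, tracking the current [section] header, and pick up
    * aws_access_key_id / aws_secret_access_key inside the requested profile. */
  def readProfile(lines: Seq[String], profile: String = "default"): Option[(String, String)] = {
    var current = ""                         // name of the [section] we are inside
    var access: Option[String] = None
    var secret: Option[String] = None
    for (raw <- lines) {
      val line = raw.trim
      if (line.startsWith("[") && line.endsWith("]"))
        current = line.stripPrefix("[").stripSuffix("]")
      else if (current == profile && line.contains("=")) {
        val Array(k, v) = line.split("=", 2).map(_.trim)
        if (k == "aws_access_key_id") access = Some(v)
        if (k == "aws_secret_access_key") secret = Some(v)
      }
    }
    // Only succeed when both halves of the credential pair were found
    for (a <- access; s <- secret) yield (a, s)
  }
}
```

Alternatively, since EnvironmentVariableCredentialsProvider is already in the chain, exporting AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the IntelliJ run configuration should satisfy it without any code change.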