Den 10. mar. 2021 kl. 17.33 skrev sri hari kali charan Tummala
<kali.tumm...@gmail.com>:
Flink,
I am able to access Kinesis from IntelliJ but not S3. I have updated my
Stack Overflow question with the Kinesis code; Flink is still failing to
read from S3.
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868
Thanks
Sri
On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala
<kali.tumm...@gmail.com> wrote:
My Stack Overflow question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala
<kali.tumm...@gmail.com> wrote:
Here is my IntelliJ question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala
<kali.tumm...@gmail.com> wrote:
Hi Flink Experts,
I am trying to read an S3 file from IntelliJ using Flink, and I am
coming across an AWS auth error. Can someone help? All the details are
below.
I have AWS credentials in my home folder under .aws/credentials.
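For reference, that file follows the standard AWS shared-credentials format, roughly like this (all values below are placeholders, not my real keys):

```ini
[default]
aws_access_key_id = AKIA_PLACEHOLDER
aws_secret_access_key = secretKeyPlaceholder
; aws_session_token is only present for temporary (STS) credentials
```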
My IntelliJ environment variables:-
ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
flink-conf.yaml file content:-
fs.hdfs.hadoopconf:
/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
core-site.xml file content:-
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>
    <!--<property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>-->
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>
</configuration>
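One detail worth flagging in the config above (a sketch of a possible variant, not a confirmed fix): SimpleAWSCredentialsProvider reads fs.s3a.access.key / fs.s3a.secret.key, which are empty here. An alternative I understand some setups use is to point the provider at the AWS SDK default chain so credentials are resolved from env vars, ~/.aws/credentials, or an instance profile; whether the shaded Flink S3 filesystem accepts this class name is an assumption on my part:

```xml
<!-- Hypothetical alternative: delegate credential lookup to the SDK default chain -->
<property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.DefaultAWSCredentialsProviderChain</value>
</property>
```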
POM.xml file:-
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issues like NoDefClassError,... -->
                    <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-kinesis</artifactId><version>1.11.579</version></dependency>
        <dependency><groupId>commons-dbcp</groupId><artifactId>commons-dbcp</artifactId><version>1.2.2</version></dependency>
        <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.1</version></dependency>
        <dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-csv</artifactId><version>1.7</version></dependency>
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.4.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk</artifactId><version>1.11.579</version></dependency>
        <!-- For Parquet -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.10.0</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-twitter_2.10</artifactId><version>1.1.4-hadoop1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-cloudsearch</artifactId><version>1.11.500</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop2</artifactId><version>2.8.3-1.8.3</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-s3-fs-hadoop</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency>
    </dependencies>
</project>
Scala Code:-
package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)
    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter(" category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])
    order20Set.writeAsText("src/main/resources/table1/table1")
    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")
  }

  class Row1 {
    var month: String = _
    var sum: java.lang.Integer = _
    override def toString: String = month + "," + sum
  }
}
Error:-
Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException:
No AWS Credentials provided by BasicAWSCredentialsProvider
EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint
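One thing I notice in the trace: the provider chain it lists (BasicAWSCredentialsProvider, EnvironmentVariableCredentialsProvider, InstanceProfileCredentialsProvider) does not include a profile-file provider, so the keys in .aws/credentials may never be consulted. A workaround sketch I am considering (unverified; the variable values are placeholders) is to set the credentials as environment variables in the IntelliJ run configuration or launching shell so EnvironmentVariableCredentialsProvider can pick them up:

```shell
# Expose credentials via the environment, since EnvironmentVariableCredentialsProvider
# is one of the providers the error message says was tried.
# Placeholder values only -- substitute real keys locally, never commit them.
export AWS_ACCESS_KEY_ID="AKIA_PLACEHOLDER"
export AWS_SECRET_ACCESS_KEY="secretKeyPlaceholder"
# Region may also be needed depending on the bucket's location.
export AWS_REGION="us-east-1"
```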
Thanks
--
Thanks & Regards
Sri Tummala