On 10 Mar 2021, at 17:33, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
Flink,

I am able to access Kinesis from IntelliJ but not S3. I have edited my Stack Overflow question with the Kinesis code; Flink is still having issues reading from S3.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868

Thanks
Sri
On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

My Stack Overflow question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Here is my IntelliJ question:
https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

Hi Flink Experts,

I am trying to read an S3 file from IntelliJ using Flink and I am coming across an AWS auth error; all the details are below.

I have AWS credentials in homefolder/.aws/credentials.

My IntelliJ environment variables:

ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
flink-conf.yaml file content:

fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
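One Flink-independent sanity check worth doing first: `fs.hdfs.hadoopconf` silently does nothing if the directory does not exist or does not contain `core-site.xml`, so it helps to confirm the path up front. A minimal sketch (plain Scala, no Flink dependency; the hard-coded path below is just this setup's, and `CheckHadoopConfDir` is a made-up helper name):

```scala
import java.nio.file.{Files, Paths}

/** Checks that a Hadoop conf directory exists and actually holds
  * core-site.xml, since a wrong fs.hdfs.hadoopconf path fails silently. */
object CheckHadoopConfDir {
  def coreSiteExists(confDir: String): Boolean = {
    val dir = Paths.get(confDir)
    Files.isDirectory(dir) && Files.exists(dir.resolve("core-site.xml"))
  }

  def main(args: Array[String]): Unit = {
    val confDir = sys.env.getOrElse("HADOOP_CONF_DIR",
      "/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config")
    println(s"core-site.xml found under $confDir: ${coreSiteExists(confDir)}")
  }
}
```

If this prints `false`, Flink never saw the S3A settings below at all, which would explain the credential chain falling through.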
core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>
    <!--<property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>-->
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>
</configuration>
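Since the stack trace below says no credentials were found anywhere, a second Flink-independent check is whether `~/.aws/credentials` parses and the `[default]` profile actually carries both keys. A minimal sketch (plain Scala, no AWS SDK; `CredentialsCheck` and `profileKeys` are made-up names, and this is deliberately not a full INI parser):

```scala
import scala.io.Source

/** Minimal INI-style reader for ~/.aws/credentials: collects the key=value
  * pairs under one [profile] header, enough to confirm that
  * aws_access_key_id and aws_secret_access_key are present. */
object CredentialsCheck {
  def profileKeys(lines: Seq[String], profile: String): Map[String, String] = {
    var current = ""
    val acc = scala.collection.mutable.Map.empty[String, String]
    for (line <- lines.map(_.trim) if line.nonEmpty && !line.startsWith("#")) {
      if (line.startsWith("[") && line.endsWith("]"))
        current = line.stripPrefix("[").stripSuffix("]")
      else if (current == profile && line.contains("=")) {
        val Array(k, v) = line.split("=", 2)
        acc(k.trim) = v.trim
      }
    }
    acc.toMap
  }

  def main(args: Array[String]): Unit = {
    val path = sys.props("user.home") + "/.aws/credentials"
    val keys = profileKeys(Source.fromFile(path).getLines().toSeq, "default")
    println(s"access key present: ${keys.contains("aws_access_key_id")}")
    println(s"secret key present: ${keys.contains("aws_secret_access_key")}")
  }
}
```

Note also that `SimpleAWSCredentialsProvider` only reads `fs.s3a.access.key`/`fs.s3a.secret.key` from the Hadoop config, which are empty above; it does not read `~/.aws/credentials`, so one of those two paths has to actually carry the keys.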
POM.xml file (duplicate dependency entries collapsed):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- see http://davidb.github.com/scala-maven-plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have a classpath issue like NoDefClassError,... -->
                    <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.derby</groupId>
            <artifactId>derby</artifactId>
            <version>10.13.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.8.8</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.11.579</version>
        </dependency>
        <dependency>
            <groupId>commons-dbcp</groupId>
            <artifactId>commons-dbcp</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.1</version>
        </dependency>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-csv</artifactId>
            <version>1.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.11.579</version>
        </dependency>
        <!-- For Parquet -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.10.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-twitter_2.10</artifactId>
            <version>1.1.4-hadoop1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.json4s</groupId>
            <artifactId>json4s-jackson_2.11</artifactId>
            <version>3.6.7</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudsearch</artifactId>
            <version>1.11.500</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop2</artifactId>
            <version>2.8.3-1.8.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
    </dependencies>
</project>
Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])
    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")

    env.execute("State")
  }

  class Row1 {
    var month: String = _
    var sum: java.lang.Integer = _
    override def toString: String = month + "," + sum
  }
}
Error:

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException:
No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException:
Unable to load credentials from service endpoint
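Reading that trace: the shaded S3A client tried a chain of providers in order (config keys, environment variables, then the EC2 instance-profile metadata endpoint), each found nothing, and the last one fails with "unable to load credentials from service endpoint" because there is no metadata service on a laptop. A toy model of that fallback behaviour, in plain Scala with made-up names (not the real SDK API):

```scala
/** Toy model of an AWS credential provider chain: each provider either yields
  * (accessKey, secretKey) or None, and the chain takes the first success.
  * If every provider comes up empty, the failure message lists them all,
  * which is exactly the shape of the AmazonClientException above. */
object ProviderChainSketch {
  type Provider = () => Option[(String, String)]

  def resolve(chain: Seq[(String, Provider)]): Either[String, (String, String)] =
    chain.collectFirst { case (_, p) if p().isDefined => p().get }
      .toRight("No AWS Credentials provided by " + chain.map(_._1).mkString(" "))
}
```

The practical upshot is that any one link sufficing would fix the error, e.g. exporting AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY in the IntelliJ run configuration so the environment-variable provider succeeds.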
Thanks
--
Thanks & Regards
Sri Tummala