Well, you could do this before running the job:

// Set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment variable
// (FLINK_PLUGINS_DIR) to a directory containing the plugins, then:

PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(new Configuration());
FileSystem.initialize(new Configuration(), pluginManager);
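
For reference, a minimal sketch in Scala of how that can look when launched from IntelliJ. The object name, plugin path, and directory layout are assumptions, and FileSystem.initialize(Configuration, PluginManager) requires a Flink version that ships the plugin mechanism (1.10+), not the 1.8.1 used further down this thread:

// Minimal sketch, assuming the IntelliJ run configuration sets
//   FLINK_PLUGINS_DIR=/path/to/flink-plugins
// with one subdirectory per plugin (path and version are hypothetical), e.g.
//   /path/to/flink-plugins/s3-fs-hadoop/flink-s3-fs-hadoop-<version>.jar
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.plugin.PluginUtils

object LocalS3Job {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Discover plugin jars under $FLINK_PLUGINS_DIR and register the
    // filesystems they provide (e.g. s3a://) before any S3 path is resolved.
    FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf))

    // From here on, the MiniCluster started by getExecutionEnvironment
    // can resolve s3a:// paths through the plugin.
    val env = ExecutionEnvironment.getExecutionEnvironment
    // ... define sources/sinks and call env.execute(...) as usual ...
  }
}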

On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
Hi.

I had the same problem. Flink uses plugins to access S3, and when you run locally it starts a MiniCluster, which doesn't load plugins, so it isn't possible without modifying Flink. In my case I wanted to investigate savepoints through the Flink State Processor API, and the workaround was to build my own version of the processor API and include the missing part.

Med venlig hilsen / Best regards
Lasse Nedergaard


On 10 Mar 2021, at 17:33, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:


Flink,

I am able to access Kinesis from IntelliJ but not S3. I have edited my Stack Overflow question with the Kinesis code; Flink is still having issues reading from S3.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868


Thanks
Sri

On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

    My Stack Overflow question:

    
    https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

    On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

        Here is my IntelliJ question.

        
        https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

        On Mon, Mar 8, 2021 at 11:22 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:


                Hi Flink Experts,


                I am trying to read an S3 file from IntelliJ using Flink,
                and I am coming across an AWS auth error. Can someone
                help? Below are all the details.

                I have AWS credentials in homefolder/.aws/credentials


                My IntelliJ environment variables:

                ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
                FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

                flink-conf.yaml file content:

                fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

                core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.s3.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3.buffer.dir</name>
    <value>/tmp</value>
  </property>
  <property>
    <name>fs.s3a.server-side-encryption-algorithm</name>
    <value>AES256</value>
  </property>
  <!--
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
  </property>
  -->
  <property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value></value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value></value>
  </property>
  <property>
    <name>fs.s3a.session.token</name>
    <value></value>
  </property>
  <property>
    <name>fs.s3a.proxy.host</name>
    <value></value>
  </property>
  <property>
    <name>fs.s3a.proxy.port</name>
    <value>8099</value>
  </property>
  <property>
    <name>fs.s3a.proxy.username</name>
    <value></value>
  </property>
  <property>
    <name>fs.s3a.proxy.password</name>
    <value></value>
  </property>
</configuration>
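
A note on the provider configured above: SimpleAWSCredentialsProvider only reads the fs.s3a.access.key / fs.s3a.secret.key properties, which are empty here; it does not consult homefolder/.aws/credentials. One possible variant, assuming the unshaded AWS SDK is on the classpath, is to point the provider at the profile file instead:

<property>
  <name>fs.s3a.aws.credentials.provider</name>
  <!-- Hypothetical alternative: loads credentials from ~/.aws/credentials
       rather than from the fs.s3a.access.key / fs.s3a.secret.key properties -->
  <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
</property>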

                pom.xml file content (duplicate flink-core, flink-scala_2.11,
                and dynamodb-streams-kinesis-adapter entries removed):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>FlinkStreamAndSql</groupId>
  <artifactId>FlinkStreamAndSql</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <plugins>
      <!-- see http://davidb.github.com/scala-maven-plugin -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.1.3</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.13</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have a classpath issue like NoDefClassError,... -->
          <!-- <useManifestOnlyJar>false</useManifestOnlyJar> -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
      <!-- "package" command plugin -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis_2.11</artifactId>
      <version>1.8.0</version>
      <scope>system</scope>
      <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
    </dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-kinesis</artifactId><version>1.11.579</version></dependency>
    <dependency><groupId>commons-dbcp</groupId><artifactId>commons-dbcp</artifactId><version>1.2.2</version></dependency>
    <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.1</version></dependency>
    <dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-csv</artifactId><version>1.7</version></dependency>
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.4.1</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk</artifactId><version>1.11.579</version></dependency>
    <!-- For Parquet -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.10.0</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-twitter_2.10</artifactId><version>1.1.4-hadoop1</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>
    <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-cloudsearch</artifactId><version>1.11.500</version></dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop2</artifactId><version>2.8.3-1.8.3</version></dependency>
    <dependency><groupId>org.apache.flink</groupId><artifactId>flink-s3-fs-hadoop</artifactId><version>1.8.1</version></dependency>
    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency>
  </dependencies>
</project>
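
A side note on the Kinesis connector entry: it is declared with <scope>system</scope> and a systemPath pointing at a locally built 1.8-SNAPSHOT jar while its <version> says 1.8.0. A more conventional declaration, assuming the jar is first installed into the local repository (e.g. via mvn install:install-file), would be:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.11</artifactId>
  <!-- hypothetical: matches the locally built jar referenced above -->
  <version>1.8-SNAPSHOT</version>
</dependency>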

                Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter(" category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])

    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")
  }

  class Row1 {
    var month: String = _
    var sum: java.lang.Integer = _

    override def toString: String = month + "," + sum
  }

}

                Error:

                Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

                Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint


                Thanks

                







--
Thanks & Regards
Sri Tummala

