The concept of plugins does not exist in 1.8.1. As a result, it should be sufficient for your use case to add a dependency on flink-s3-fs-hadoop to your project.
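For example, a minimal sketch of that dependency (version matched to the Flink 1.8.1 release discussed in this thread):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-s3-fs-hadoop</artifactId>
        <version>1.8.1</version>
    </dependency>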

On 3/12/2021 4:33 AM, sri hari kali charan Tummala wrote:
Let's close this issue, guys. Please answer my questions. I am using Flink 1.8.1.

Thanks
Sri

On Wed, 10 Mar 2021 at 13:25, sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:

    Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see
    ConfigConstants.ENV_FLINK_LIB_DIR. Will this work?

    Thanks
    Sri

    On Wed, Mar 10, 2021 at 1:23 PM sri hari kali charan Tummala
    <kali.tumm...@gmail.com> wrote:

        I am not getting what you both are talking about; let's be clear.

        Plugin? What is it? Is it a JAR which I have to download
        from the Internet and place in a folder? Is flink-s3-fs-hadoop
        the JAR I have to download?

        Will the solution below work?
        https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being


        Thanks
        Sri



        On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler
        <ches...@apache.org> wrote:

            Well, you could do this before running the job:

            // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment
            // variable, pointing to a directory containing the plugins
            PluginManager pluginManager =
                PluginUtils.createPluginManagerFromRootFolder(new Configuration());
            FileSystem.initialize(new Configuration(), pluginManager);

            On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
            Hi.

            I had the same problem. Flink uses plugins to access S3.
            When you run locally it starts a mini cluster, and the mini
            cluster doesn't load plugins. So it's not possible without
            modifying Flink. In my case I wanted to investigate
            savepoints through the Flink State Processor API, and the
            workaround was to build my own version of the processor
            API and include the missing part.

            Med venlig hilsen / Best regards
            Lasse Nedergaard


            On 10 Mar 2021, at 17:33, sri hari kali charan
            Tummala <kali.tumm...@gmail.com> wrote:

            
            Flink,

            I am able to access Kinesis from IntelliJ but not S3. I
            have edited my Stack Overflow question with the Kinesis
            code; Flink is still having issues reading S3.

            
            https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868


            Thanks
            Sri

            On Tue, Mar 9, 2021 at 11:30 AM sri hari kali charan
            Tummala <kali.tumm...@gmail.com> wrote:

                My Stack Overflow question:

                
                https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

                On Tue, Mar 9, 2021 at 11:28 AM sri hari kali charan
                Tummala <kali.tumm...@gmail.com> wrote:

                    Here is my IntelliJ question:

                    
                    https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868

                    On Mon, Mar 8, 2021 at 11:22 AM sri hari kali
                    charan Tummala <kali.tumm...@gmail.com> wrote:


                            Hi Flink Experts,


                            I am trying to read an S3 file from
                            IntelliJ using Flink, and I am coming
                            across an AWS auth error. Can someone
                            help? Below are all the details.

                            I have AWS credentials in
                            homefolder/.aws/credentials
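                            For reference, a sketch of the standard
                            layout of that credentials file (the
                            values below are placeholders):

                            [default]
                            aws_access_key_id = <access-key>
                            aws_secret_access_key = <secret-key>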


                            My IntelliJ environment variables:

                            ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
                            FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

                            flink-conf.yaml file content:

                            fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

                            core-site.xml file content:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.server-side-encryption-algorithm</name>
        <value>AES256</value>
    </property>
    <!--<property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider</value>
    </property>-->
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.session.token</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.host</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.port</name>
        <value>8099</value>
    </property>
    <property>
        <name>fs.s3a.proxy.username</name>
        <value></value>
    </property>
    <property>
        <name>fs.s3a.proxy.password</name>
        <value></value>
    </property>
</configuration>

                            pom.xml file content:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <!-- see http://davidb.github.com/scala-maven-plugin -->
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <!-- If you have classpath issue like NoDefClassError,... -->
                    <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- "package" command plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-kinesis</artifactId><version>1.11.579</version></dependency>
        <dependency><groupId>commons-dbcp</groupId><artifactId>commons-dbcp</artifactId><version>1.2.2</version></dependency>
        <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.1</version></dependency>
        <dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.4</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-csv</artifactId><version>1.7</version></dependency>
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-compress</artifactId><version>1.4.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>dynamodb-streams-kinesis-adapter</artifactId><version>1.4.0</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk</artifactId><version>1.11.579</version></dependency>
        <!-- For Parquet -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-avro</artifactId><version>1.10.0</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-twitter_2.10</artifactId><version>1.1.4-hadoop1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-filesystem_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.json4s</groupId><artifactId>json4s-jackson_2.11</artifactId><version>3.6.7</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-cloudsearch</artifactId><version>1.11.500</version></dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop2 -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop2</artifactId><version>2.8.3-1.8.3</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-s3-fs-hadoop</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency>
    </dependencies>
</project>

                            Scala code:

package com.aws.examples.s3

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.java.BatchTableEnvironment
import org.apache.flink.table.sources.CsvTableSource

object Batch {

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)

    /* create table from csv */
    val tableSrc = CsvTableSource
      .builder()
      .path("s3a://bucket/csvfolder/avg.txt")
      .fieldDelimiter(",")
      .field("date", Types.STRING)
      .field("month", Types.STRING)
      .field("category", Types.STRING)
      .field("product", Types.STRING)
      .field("profit", Types.INT)
      .build()

    tableEnv.registerTableSource("CatalogTable", tableSrc)

    val catalog: Table = tableEnv.scan("CatalogTable")

    /* querying with Table API */
    val order20: Table = catalog
      .filter("category === 'Category5'")
      .groupBy("month")
      .select("month, profit.sum as sum")
      .orderBy("sum")

    val order20Set: DataSet[Row1] = tableEnv.toDataSet(order20, classOf[Row1])

    order20Set.writeAsText("src/main/resources/table1/table1")

    //tableEnv.toAppendStream(order20, classOf[Row]).writeAsText("/home/jivesh/table")
    env.execute("State")
  }

  class Row1 {
    var month: String = _
    var sum: java.lang.Integer = _

    override def toString: String = month + "," + sum
  }

}

                            Error:

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint

Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : org.apache.flink.fs.s3base.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint


                            Thanks

                            





--
Thanks & Regards
Sri Tummala

