DataBricks Spark/Scala Opportunity
Hi All,

Note: this is a full-time position.

I have copied Geoff on this email chain; he is actively looking for a Spark/Scala developer for a full-time position with Databricks, 200k+ (salary/bonus/stock, H-1B/GC process).

If you know anyone who is good in Spark/Scala (US citizen, GC, or H-1B), ask them to contact Geoff (geoffzun...@databricks.com), 415-328-4879.

Thanks
Sri

-- Thanks & Regards, Sri Tummala
Flink Read S3 Intellij IDEA Error
Hi Flink Experts,

I am trying to read an S3 file from IntelliJ using Flink, and I am running into an AWS auth error. Can someone help? All the details are below.

I have AWS credentials in homefolder/.aws/credentials.

My IntelliJ environment variables:

ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config

flink-conf.yaml file content:

fs.hdfs.hadoopconf: /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

core-site.xml file content (key/secret/token and proxy values left blank):

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property><name>fs.s3.impl</name><value>org.apache.hadoop.fs.s3a.S3AFileSystem</value></property>
    <property><name>fs.s3.buffer.dir</name><value>/tmp</value></property>
    <property><name>fs.s3a.server-side-encryption-algorithm</name><value>AES256</value></property>
    <property><name>fs.s3a.aws.credentials.provider</name><value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value></property>
    <property><name>fs.s3a.access.key</name><value></value></property>
    <property><name>fs.s3a.secret.key</name><value></value></property>
    <property><name>fs.s3a.session.token</name><value></value></property>
    <property><name>fs.s3a.proxy.host</name><value></value></property>
    <property><name>fs.s3a.proxy.port</name><value>8099</value></property>
    <property><name>fs.s3a.proxy.username</name><value></value></property>
    <property><name>fs.s3a.proxy.password</name><value></value></property>
</configuration>

pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>FlinkStreamAndSql</groupId>
    <artifactId>FlinkStreamAndSql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.3</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.13</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.derby</groupId><artifactId>derby</artifactId><version>10.13.1.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kinesis_2.11</artifactId>
            <version>1.8.0</version>
            <scope>system</scope>
            <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
        </dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.11</artifactId><version>1.8.1</version></dependency>
        <dependency><groupId>com.amazonaws</groupId><artifactId>amazon-kinesis-client</artifactId><version>1.8.8</version></dependency>
    </dependencies>
</project>
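For context, the homefolder/.aws/credentials file referenced above is a plain INI file with one section per profile. A minimal sketch of its layout (profile name and key values are placeholders, written to a temporary copy rather than the real file):

```shell
# Write an illustrative credentials file to a temp location (placeholder values).
cat > /tmp/aws-credentials-demo <<'EOF'
[default]
aws_access_key_id = AKIAEXAMPLE
aws_secret_access_key = examplesecret
EOF

# Show the access-key line, as the AWS SDK's profile loader would find it.
grep 'aws_access_key_id' /tmp/aws-credentials-demo
```

Note that SimpleAWSCredentialsProvider, as configured in the core-site.xml above, reads fs.s3a.access.key and fs.s3a.secret.key from the Hadoop configuration rather than from this file.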
Re: Flink Read S3 Intellij IDEA Error
Here is my IntelliJ question: https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
Re: Flink Read S3 Intellij IDEA Error
My Stack Overflow question: https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868
Re: Flink Read S3 Intellij IDEA Error
Flink,

I am able to access Kinesis from IntelliJ, but not S3. I have edited my Stack Overflow question with the Kinesis code; Flink is still having issues reading S3.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868

Thanks
Sri
Re: Flink Read S3 Intellij IDEA Error
I am not getting what you both are talking about; let's be clear.

Plugin? What is it? Is it a jar which I have to download from the internet and place in a folder? Is flink-s3-fs-hadoop the jar I have to download?

Will the solution below work?
https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being

Thanks
Sri

On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler wrote:
> Well, you could do this before running the job:
>
> // Set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment variable,
> // pointing to a directory containing the plugins.
> PluginManager pluginManager =
>     PluginUtils.createPluginManagerFromRootFolder(new Configuration());
> FileSystem.initialize(new Configuration(), pluginManager);
>
> On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
>> Hi.
>>
>> I had the same problem. Flink uses plugins to access S3. When you run
>> locally it starts a mini cluster, and the mini cluster doesn't load
>> plugins, so it's not possible without modifying Flink. In my case I
>> wanted to investigate savepoints through the Flink Processor API, and
>> the workaround was to build my own version of the Processor API and
>> include the missing part.
>>
>> Med venlig hilsen / Best regards
>> Lasse Nedergaard
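For reference, the "directory containing the plugins" that Chesnay's snippet points at follows Flink's standard plugins layout: one subdirectory per plugin, each holding that plugin's jar. A sketch with illustrative paths and an empty placeholder jar (the real jar ships in the opt/ directory of the Flink distribution):

```shell
# Illustrative layout for the directory the plugins env variable points at:
#   <plugins-dir>/<plugin-name>/<plugin-jar>
mkdir -p /tmp/flink-plugins-demo/s3-fs-hadoop
touch /tmp/flink-plugins-demo/s3-fs-hadoop/flink-s3-fs-hadoop-1.9.0.jar

# List the plugin's contents.
ls /tmp/flink-plugins-demo/s3-fs-hadoop   # prints: flink-s3-fs-hadoop-1.9.0.jar
```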
Re: Flink Read S3 Intellij IDEA Error
Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see ConfigConstants.ENV_FLINK_LIB_DIR. Will this work?

Thanks
Sri
Re: Flink Read S3 Intellij IDEA Error
Let's close this issue, guys; please answer my questions. I am using Flink 1.8.1.

Thanks
Sri
Re: Flink Read S3 Intellij IDEA Error
Which I already did in my pom; still it's not working.

Thanks
Sri

On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler wrote:
> The concept of plugins does not exist in 1.8.1. As a result it should be
> sufficient for your use case to add a dependency on flink-s3-fs-hadoop
> to your project.
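Chesnay's suggestion amounts to a pom.xml entry along these lines (a sketch, not taken from the thread; the version is assumed to match the Flink version in use):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>
```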
Re: Flink Read S3 Intellij IDEA Error
If anyone has working Flink 1.8.1 code reading S3 in IntelliJ in a public GitHub repo, please pass it on; that would be a huge help.

Thanks
Sri
Re: Flink Read S3 Intellij IDEA Error
Same error.

On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler wrote:
> From the exception I would conclude that your core-site.xml file is not
> being picked up.
>
> AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so
> try setting HADOOP_CONF_DIR to the directory that the file resides in.
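Chesnay's HADOOP_CONF_DIR suggestion, sketched for the directory named in the asker's flink-conf.yaml (adjust to wherever core-site.xml actually lives; in IntelliJ this would go in the run configuration's environment variables rather than a shell):

```shell
# Hadoop's configuration loader picks up core-site.xml from HADOOP_CONF_DIR.
export HADOOP_CONF_DIR=/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config

# Confirm the variable is set for child processes.
echo "$HADOOP_CONF_DIR"
```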
Re: Running flink on AWS ECS
AWS already has an auto-scaling Flink cluster; it's called Kinesis Data Analytics. Just add your Flink jar to a Kinesis Data Analytics application, that's all; AWS will auto-provision a Flink cluster and do the admin part for you.

On Saturday, September 28, 2019, David Anderson wrote:
> I believe there can be advantages and disadvantages in both
> directions. For example, fewer containers with multiple slots reduces
> the effort the Flink Master has to do whenever global coordination is
> required, i.e., during checkpointing. And the network stack in the
> task managers is optimized to take advantage of locality, whenever
> possible.
>
> On the other hand, if you have a lot of pressure on the heap (e.g.,
> because you are using a heap-based state backend), then having more,
> smaller task managers can reduce latency by reducing the impact of
> garbage collection pauses.
>
> I'm sure I've overlooked some factors, but the bottom line appears to
> be that there's no one-size-fits-all answer.
>
> David
>
> On Wed, Sep 25, 2019 at 5:43 PM Navneeth Krishnan wrote:
> >
> > Thanks Terry. The reason why I asked this is because somewhere I saw that
> > running one slot per container is beneficial, but I couldn't find where I
> > saw that. Also, I think running it with multiple slots will reduce IPC,
> > since some of the data will be processed within the same JVM.
> >
> > Thanks
> >
> > On Wed, Sep 25, 2019 at 1:16 AM Terry Wang wrote:
> >>
> >> Hi, Navneeth,
> >>
> >> I think both are OK.
> >> IMO, running one container with the number of slots equal to the virtual
> >> cores may be better, since the slots can share the Flink framework and
> >> thus reduce memory cost.
> >>
> >> Best,
> >> Terry Wang
> >>
> >> > On 25 Sep 2019, at 3:26 PM, Navneeth Krishnan wrote:
> >> >
> >> > Hi All,
> >> >
> >> > I'm currently running Flink on Amazon ECS and I have assigned task
> >> > slots based on vCPUs per instance. Is it beneficial to run a separate
> >> > container with one slot each, or one container with the number of
> >> > slots equal to the virtual cores?
> >> >
> >> > Thanks

--
Thanks & Regards
Sri Tummala
Re: AWS Client Builder with default credentials
check this. https://github.com/kali786516/FlinkStreamAndSql/blob/b8bcbadaa3cb6bfdae891f10ad1205e256adbc1e/src/main/scala/com/aws/examples/dynamodb/dynStreams/FlinkDynamoDBStreams.scala#L42 https://github.com/kali786516/FlinkStreamAndSql/blob/b8bcbadaa3cb6bfdae891f10ad1205e256adbc1e/src/main/scala/com/aws/examples/kinesis/consumer/transactionScalaWorkingExample/TransactionScalaConsumer.scala#L41 On Mon, Feb 24, 2020 at 9:08 AM David Magalhães wrote: > Hi Robert, thanks for your reply. > > GlobalConfiguration.loadConfiguration was useful to check if a > flink-conf.yml file was on resources, for the integration tests that I'm > doing. On the cluster I will use the default configurations. > > On Fri, Feb 21, 2020 at 10:58 AM Robert Metzger > wrote: > >> There are multiple ways of passing configuration parameters to your user >> defined code in Flink >> >> a) use getRuntimeContext().getUserCodeClassLoader().getResource() to >> load a config file from your user code jar or the classpath. >> b) use >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to >> access a configuration object serialized from the main method. >> you can pass a custom object to the job parameters, or use Flink's >> "Configuration" object in your main method: >> >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> Configuration config = new Configuration(); >> config.setString("foo", "bar"); >> env.getConfig().setGlobalJobParameters(config); >> >> c) Load the flink-conf.yaml: >> >> Configuration conf = GlobalConfiguration.loadConfiguration(); >> >> I'm not 100% sure if this approach works, as it is not intended to be >> used in user code (I believe). >> >> >> Let me know if this helps! 
>>
>> Best,
>> Robert
>>
>> On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler wrote:
>>
>>> First things first, we do not intend for users to use anything in the S3
>>> filesystem modules except the filesystems themselves, meaning that you're
>>> somewhat treading on unsupported ground here.
>>>
>>> Nevertheless, the S3 modules contain a large variety of AWS-provided
>>> CredentialsProvider implementations that can derive credentials from
>>> environment variables, system properties, files on the classpath, and
>>> many more.
>>>
>>> Ultimately though, you're kind of asking us how to use AWS APIs, for
>>> which I would direct you to the AWS documentation.
>>>
>>> On 20/02/2020 13:16, David Magalhães wrote:
>>>
>>> I'm using
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder
>>> to create an S3 client to copy objects and delete objects inside
>>> a TwoPhaseCommitSinkFunction.
>>>
>>> Shouldn't there be another way to set up this configuration without
>>> hardcoding it? Something like core-site.xml or flink-conf.yaml?
>>>
>>> Right now I need to have it hardcoded like this:
>>>
>>> AmazonS3ClientBuilder.standard
>>>   .withPathStyleAccessEnabled(true)
>>>   .withEndpointConfiguration(
>>>     new EndpointConfiguration("http://minio:9000", "us-east-1")
>>>   )
>>>   .withCredentials(
>>>     new AWSStaticCredentialsProvider(
>>>       new BasicAWSCredentials("minio", "minio123"))
>>>   )
>>>   .build
>>>
>>> Thanks

--
Thanks & Regards
Sri Tummala
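Robert's option (b) above can be sketched in Scala as follows. This is a minimal sketch of the pattern from his mail, not tied to S3; the config keys used (`s3.endpoint`, `s3.access-key`) are purely illustrative placeholders, and it assumes a Flink version (around 1.8-1.10) where `Configuration` extends `ExecutionConfig.GlobalJobParameters`:

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object GlobalJobParametersExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Serialize a Configuration from the main method so every operator
    // instance can read it through the runtime context.
    val config = new Configuration()
    config.setString("s3.endpoint", "http://minio:9000") // illustrative key
    config.setString("s3.access-key", "minio")            // illustrative key
    env.getConfig.setGlobalJobParameters(config)

    // Inside a RichFunction (e.g. a RichMapFunction or a sink), the values
    // can be read back roughly like this:
    //   val params = getRuntimeContext.getExecutionConfig
    //     .getGlobalJobParameters.asInstanceOf[Configuration]
    //   val endpoint = params.getString("s3.endpoint", "")
  }
}
```

This keeps credentials and endpoints out of the job code itself; combined with option (c), `GlobalConfiguration.loadConfiguration()`, the values can ultimately come from flink-conf.yaml.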
Batch Flink Job S3 write performance vs Spark
Hi All,

I have a question: did anyone compare the performance of a Flink batch job writing to S3 vs Spark writing to S3?

--
Thanks & Regards
Sri Tummala
Re: Batch Flink Job S3 write performance vs Spark
Thank you (the two systems running on Java and using the same set of libraries), so from my understanding, Flink uses AWS SDK behind the scenes same as spark. On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise wrote: > Fair benchmarks are notoriously difficult to setup. > > Usually, it's easy to find a workload where one system shines and as its > vendor you report that. Then, the competitor benchmarks a different use > case where his system outperforms ours. In the end, customers are more > confused than before. > > You should do your own benchmarks for your own workloads. That is the only > reliable way. > > In the end, both systems use similar setups and improvements in one system > are often also incorporated into the other system with some delay, such > that there should be no ground-breaking differences between the two systems > running on Java and using the same set of libraries. > Of course, if one system has a very specific optimization for your use > case, that could be much faster. > > > On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala < > kali.tumm...@gmail.com> wrote: > >> Hi All, >> >> have a question did anyone compared the performance of Flink batch job >> writing to s3 vs spark writing to s3? >> >> -- >> Thanks & Regards >> Sri Tummala >> >> -- Thanks & Regards Sri Tummala
Re: Batch Flink Job S3 write performance vs Spark
Ok, thanks for the clarification. On Wed, Feb 26, 2020 at 9:22 AM Arvid Heise wrote: > Exactly. We use the hadoop-fs as an indirection on top of that, but Spark > probably does the same. > > On Wed, Feb 26, 2020 at 3:52 PM sri hari kali charan Tummala < > kali.tumm...@gmail.com> wrote: > >> Thank you (the two systems running on Java and using the same set of >> libraries), so from my understanding, Flink uses AWS SDK behind the scenes >> same as spark. >> >> On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise wrote: >> >>> Fair benchmarks are notoriously difficult to setup. >>> >>> Usually, it's easy to find a workload where one system shines and as its >>> vendor you report that. Then, the competitor benchmarks a different use >>> case where his system outperforms ours. In the end, customers are more >>> confused than before. >>> >>> You should do your own benchmarks for your own workloads. That is the >>> only reliable way. >>> >>> In the end, both systems use similar setups and improvements in one >>> system are often also incorporated into the other system with some delay, >>> such that there should be no ground-breaking differences between the two >>> systems running on Java and using the same set of libraries. >>> Of course, if one system has a very specific optimization for your use >>> case, that could be much faster. >>> >>> >>> On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala < >>> kali.tumm...@gmail.com> wrote: >>> >>>> Hi All, >>>> >>>> have a question did anyone compared the performance of Flink batch job >>>> writing to s3 vs spark writing to s3? >>>> >>>> -- >>>> Thanks & Regards >>>> Sri Tummala >>>> >>>> >> >> -- >> Thanks & Regards >> Sri Tummala >> >> -- Thanks & Regards Sri Tummala
Re: Batch Flink Job S3 write performance vs Spark
sorry for being lazy I would have gone through flink source code. On Wed, Feb 26, 2020 at 9:35 AM sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Ok, thanks for the clarification. > > On Wed, Feb 26, 2020 at 9:22 AM Arvid Heise wrote: > >> Exactly. We use the hadoop-fs as an indirection on top of that, but Spark >> probably does the same. >> >> On Wed, Feb 26, 2020 at 3:52 PM sri hari kali charan Tummala < >> kali.tumm...@gmail.com> wrote: >> >>> Thank you (the two systems running on Java and using the same set of >>> libraries), so from my understanding, Flink uses AWS SDK behind the scenes >>> same as spark. >>> >>> On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise wrote: >>> >>>> Fair benchmarks are notoriously difficult to setup. >>>> >>>> Usually, it's easy to find a workload where one system shines and as >>>> its vendor you report that. Then, the competitor benchmarks a different use >>>> case where his system outperforms ours. In the end, customers are more >>>> confused than before. >>>> >>>> You should do your own benchmarks for your own workloads. That is the >>>> only reliable way. >>>> >>>> In the end, both systems use similar setups and improvements in one >>>> system are often also incorporated into the other system with some delay, >>>> such that there should be no ground-breaking differences between the two >>>> systems running on Java and using the same set of libraries. >>>> Of course, if one system has a very specific optimization for your use >>>> case, that could be much faster. >>>> >>>> >>>> On Mon, Feb 24, 2020 at 11:26 PM sri hari kali charan Tummala < >>>> kali.tumm...@gmail.com> wrote: >>>> >>>>> Hi All, >>>>> >>>>> have a question did anyone compared the performance of Flink batch job >>>>> writing to s3 vs spark writing to s3? >>>>> >>>>> -- >>>>> Thanks & Regards >>>>> Sri Tummala >>>>> >>>>> >>> >>> -- >>> Thanks & Regards >>> Sri Tummala >>> >>> > > -- > Thanks & Regards > Sri Tummala > > -- Thanks & Regards Sri Tummala
Fwd: Stream to CSV Sink with SQL Distinct Values
Hi All,

I am trying to read data from a Kinesis stream, apply a SQL transformation (distinct), and then write to a CSV sink, which is failing due to this issue (org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.). The full code is here (
https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
).

Can anyone help me move forward on this issue?

Full Code:-

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2",
  new SimpleStringSchema(), consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
  new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {

    override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
      val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

      val csvData = data.getCc_num + "," +
        data.getFirst + "," +
        data.getLast + "," +
        data.getTrans_num + "," +
        data.getTrans_time + "," +
        data.getCategory + "," +
        data.getMerchant + "," +
        data.getAmt + "," +
        data.getMerch_lat + "," +
        data.getMerch_long

      //println(csvData)

      val p: Array[String] = csvData.split(",")
      var cc_num: String = p(0)
      var first: String = p(1)
      var last: String = p(2)
      var trans_num: String = p(3)
      var trans_time: String = p(4)
      var category: String = p(5)
      var merchant: String = p(6)
      var amt: String = p(7)
      var merch_lat: String = p(8)
      var merch_long: String = p(9)

      val creationDate: Time = new Time(System.currentTimeMillis())
      return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
    }
  }

val data = kinesis.map(mapFunction)
//data.print()

tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"

val table = tEnv.sqlQuery(query)
val table1 = table.distinct()
tEnv.registerTable("fromAnotherTable", table1)
table.printSchema()

val csvSink: TableSink[Row] = new CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput", "~")

val fieldNames: Array[String] = Array("cc_num", "first_column", "last_column", "trans_num", "trans_time", "category_column", "merchant_column", "amt_column", "merch_lat", "merch_long")

val fieldTypes: Array[TypeInformation[_]] = Array(
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING
)

tEnv.registerTableSink("s3csvTargetTransaction", fieldNames, fieldTypes, csvSink)

tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long from fromAnotherTable")

--
Thanks & Regards
Sri Tummala
Fwd: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment
Hi,

I am trying to write a Flink table to a streaming sink. It fails at casting Java to Scala (or Scala to Java); it fails at the step below. Can anyone help me out with this error?

val sink2: SinkFunction[Row] = StreamingFileSink.forRowFormat(
  new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
  new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)

package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}
import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path
import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironment
    //env.enableCheckpointing(10)

    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2",
      new SimpleStringSchema(), consumerConfig))

    val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
      new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {

        override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          //println(csvData)

          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)

          val creationDate: Time = new Time(System.currentTimeMillis())
          return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
        }
      }

    val data = kinesis.map(mapFunction)
    //data.print()

    tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,mer
Re: Stream to CSV Sink with SQL Distinct Values
Hi Weng, another issue now (Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.), here is the full code https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128 and pom https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml. Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams. at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100) at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126) at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala) tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long") val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')" val table = tEnv.sqlQuery(query) table.printSchema() import org.apache.flink.streaming.api.scala._ val test1 = tEnv.sqlQuery(query).distinct().toAppendStream[Row] test1.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3") On Mon, Jul 15, 2019 at 9:52 PM Caizhi Weng wrote: > Hi Kali, > > Currently Flink treats all aggregate functions as retractable. As > `distinct` is an aggregate function, it's considered by the planner that it > might update or retract records (although from my perspective it won't...). > Because csv table sink is an append only sink (it's hard to update what has > been written in the middle of a file), the exception you mentioned occurs. 
> > However, you can use `toAppendStream` method to change the retractable > stream to an append only stream. For example, > `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get > an append only stream. You can then add csv sink to this stream. > > sri hari kali charan Tummala 于2019年7月16日周二 > 上午3:32写道: > >> Hi All, >> >> I am trying to read data from kinesis stream and applying SQL >> transformation (distinct) and then tryting to write to CSV sink which is >> failinf due to this issue (org.apache.flink.table.api.TableException: >> AppendStreamTableSink requires that Table has only insert changes.) , full >> code is here ( >> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112 >> ). >> >> can anyone help me moveforward on this issue? >> >> Full Code:- >> >> // set up the streaming execution environment >> val env = StreamExecutionEnvironment.createLocalEnvironment >> //env.enableCheckpointing(10) >> >> val tEnv = TableEnvironment.getTableEnvironment(env) >> >> // Get AWS credentials >> val credentialsProvider = new DefaultAWSCredentialsProviderChain >> val credentials = credentialsProvider.getCredentials >> >> // Configure Flink Kinesis consumer >> val consumerConfig = new Properties >> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") >> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, >> credentials.getAWSAccessKeyId) >> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, >> credentials.getAWSSecretKey) >> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, >> "TRIM_HORIZON") >> >> // Create Kinesis stream >> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", >> new SimpleStringSchema(), consumerConfig)) >> >> val mapFunction: MapFunction[String, Tuple10[String, String, >> String,String,String,String,String,String,String,String]] = >> new 
MapFunction[String, Tuple10[String, String, >> String,String,String,String,String,String,String,String]]() { >> >> override def map(s: String): Tuple10[String, String, >> String,String,String,String,String,String,String,String] = { >> val data = new Gson().fromJson(s, classOf[TransactionJsonClass]) >> >> val csvData = data.getCc_num+","+ >> data.getFirst+","+ >> data.getLast+","+ >> data.getTrans_num+","+ >>
Re: Stream to CSV Sink with SQL Distinct Values
Hi Lee,

I did try Option 1: it writes to the CSV file only if I kill the running job.

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3",
    FileSystem.WriteMode.OVERWRITE, "~", "|")

Output:-
2> (true,180094108369013,John,Holland,c1ad7a1b73172ef67bd24820438f3f93,2019-07-15 22:48:40,travel,Satterfield-Lowe,81,39.015861,-119.883595)

Option 2: I tried several options; this workaround kind of works, but I need to strip the brackets, the true flag, etc.

import java.io.PrintStream
val fileOut = new PrintStream("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut2/out.txt")
System.setOut(fileOut)
tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print()
System.out.println(tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print())

On Mon, Jul 15, 2019 at 10:03 PM JingsongLee wrote:
> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be an accumulate message.)
>
> Best, JingsongLee
>
> --
> From: Caizhi Weng
> Send Time: Tuesday, 16 Jul 2019, 09:52
> To: sri hari kali charan Tummala
> Cc: user
> Subject: Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
> > sri hari kali charan Tummala 于2019年7月16日周二 > 上午3:32写道: > Hi All, > > I am trying to read data from kinesis stream and applying SQL > transformation (distinct) and then tryting to write to CSV sink which is > failinf due to this issue (org.apache.flink.table.api.TableException: > AppendStreamTableSink requires that Table has only insert changes.) , full > code is here ( > https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112 > ). > > can anyone help me moveforward on this issue? > > Full Code:- > > // set up the streaming execution environment > val env = StreamExecutionEnvironment.createLocalEnvironment > //env.enableCheckpointing(10) > > val tEnv = TableEnvironment.getTableEnvironment(env) > > // Get AWS credentials > val credentialsProvider = new DefaultAWSCredentialsProviderChain > val credentials = credentialsProvider.getCredentials > > // Configure Flink Kinesis consumer > val consumerConfig = new Properties > consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") > consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, > credentials.getAWSAccessKeyId) > consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, > credentials.getAWSSecretKey) > consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, > "TRIM_HORIZON") > > // Create Kinesis stream > val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", > new SimpleStringSchema(), consumerConfig)) > > val mapFunction: MapFunction[String, Tuple10[String, String, > String,String,String,String,String,String,String,String]] = > new MapFunction[String, Tuple10[String, String, > String,String,String,String,String,String,String,String]]() { > > override def map(s: String): Tuple10[String, String, > String,String,String,String,String,String,String,String] = { > val data = new Gson().fromJson(s, classOf[TransactionJsonClass]) > > val 
csvData = data.getCc_num+","+ > data.getFirst+","+ > data.getLast+","+ > data.getTrans_num+","+ > data.getTrans_time+","+ > data.getCategory+","+ > data.getMerchant+","+ > data.getAmt+","+ > data.getMerch_lat+","+ > data.getMerch_long > > //println(csvData) > > val p:Array[String] = csvData.split(",") > var cc_num:String = p(0) >
Re: Stream to CSV Sink with SQL Distinct Values
Hi Lee, it writes only after the job is killed and also I dont see all the records ? is there a workaround? tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]) .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWRITE,"~","|") On Mon, Jul 15, 2019 at 10:03 PM JingsongLee wrote: > Hi caizhi and kali: > > I think this table should use toRetractStream instead of toAppendStream, > and you should handle the retract messages. (If you just use distinct, the > message should always be accumulate message) > > Best, JingsongLee > > -- > From:Caizhi Weng > Send Time:2019年7月16日(星期二) 09:52 > To:sri hari kali charan Tummala > Cc:user > Subject:Re: Stream to CSV Sink with SQL Distinct Values > > Hi Kali, > > Currently Flink treats all aggregate functions as retractable. As > `distinct` is an aggregate function, it's considered by the planner that it > might update or retract records (although from my perspective it won't...). > Because csv table sink is an append only sink (it's hard to update what has > been written in the middle of a file), the exception you mentioned occurs. > > However, you can use `toAppendStream` method to change the retractable > stream to an append only stream. For example, > `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get > an append only stream. You can then add csv sink to this stream. > > sri hari kali charan Tummala 于2019年7月16日周二 > 上午3:32写道: > Hi All, > > I am trying to read data from kinesis stream and applying SQL > transformation (distinct) and then tryting to write to CSV sink which is > failinf due to this issue (org.apache.flink.table.api.TableException: > AppendStreamTableSink requires that Table has only insert changes.) , full > code is here ( > https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112 > ). 
Can anyone help me move forward on this issue?

Full code:

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.createLocalEnvironment
    //env.enableCheckpointing(10)

    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Get AWS credentials
    val credentialsProvider = new DefaultAWSCredentialsProviderChain
    val credentials = credentialsProvider.getCredentials

    // Configure Flink Kinesis consumer
    val consumerConfig = new Properties
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
    consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

    // Create Kinesis stream
    val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))

    val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
      new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {

        override def map(s: String): Tuple10[String, String, String, String, String, String, String, String, String, String] = {
          val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

          val csvData = data.getCc_num + "," +
            data.getFirst + "," +
            data.getLast + "," +
            data.getTrans_num + "," +
            data.getTrans_time + "," +
            data.getCategory + "," +
            data.getMerchant + "," +
            data.getAmt + "," +
            data.getMerch_lat + "," +
            data.getMerch_long

          //println(csvData)

          val p: Array[String] = csvData.split(",")
          var cc_num: String = p(0)
          var first: String = p(1)
          var last: String = p(2)
          var trans_num: String = p(3)
          var trans_time: String = p(4)
          var category: String = p(5)
          var merchant: String = p(6)
          var amt: String = p(7)
          var merch_lat: String = p(8)
          var merch_long: String = p(9)

          val creationDate: Time = new Time(System.currentTimeMillis())
          return new Tuple10(cc_num, first, last, trans_num, trans_time, category, merchant, amt, merch_lat, merch_long)
        }
      }

    val data = kinesis.map(mapFunction)
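For the S3 authentication error in this thread, one thing worth checking: when the bundled flink-s3-fs-hadoop filesystem is used, Flink can take the credentials directly from flink-conf.yaml instead of going through a separate hadoop-config directory. A minimal sketch of that configuration, assuming the Flink S3 filesystem's documented key names (the values below are placeholders, not real credentials):

```yaml
# flink-conf.yaml: credentials picked up by the flink-s3-fs-hadoop filesystem.
# These keys are forwarded to the underlying Hadoop s3a configuration.
s3.access-key: YOUR_ACCESS_KEY
s3.secret-key: YOUR_SECRET_KEY
```

If the credentials in homefolder/.aws/credentials are meant to be used instead, leaving these keys out and relying on the default AWS credential provider chain is the usual alternative.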
Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment
This is the error:

    org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

On Mon, Jul 15, 2019 at 9:54 PM Caizhi Weng wrote:
> Hi Kali,
>
> What's the exception thrown or error message hinted when executing the erroneous step? Please print them here so that we can investigate the problem.
>
> On Tue, Jul 16, 2019 at 4:49 AM, sri hari kali charan Tummala wrote:
>
>> Hi,
>>
>> I am trying to write a Flink table to a streaming sink; it fails at casting Java to Scala or Scala to Java. It fails at the step below, can anyone help me out with this error?
>>
>> val sink2: SinkFunction[Row] = StreamingFileSink.forRowFormat(new Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>>   new SimpleStringEncoder[Row]("UTF-8")).build()
>>
>> table.addSink(sink2)
>>
>> package com.aws.examples.kinesis.consumer.TransactionExample
>>
>> import java.util.Properties
>> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, SimpleStringSchema}
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import com.google.gson.{Gson, JsonObject}
>> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
>> import java.sql.{DriverManager, Time}
>> import com.aws.SchemaJavaClasses.Row1
>> import org.apache.flink.types.Row
>> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.sinks.CsvTableSink
>> import org.apache.flink.api.java.io.jdbc
>> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
>> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
>> import org.apache.flink.table.api.java._
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.api.java.DataSet
>> import org.apache.flink.table.sinks.TableSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.core.fs.Path
>> import scala.collection.JavaConversions._
>> import org.apache.flink.table.sources.CsvTableSource
>> import org.apache.flink.table.api.Table
>> import org.apache.flink.table.api.TableEnvironment
>> import org.apache.flink.table.api.java.StreamTableEnvironment
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>
>> object KinesisConsumer {
>>
>>   def main(args: Array[String]): Unit = {
>>
>>     // set up the streaming execution environment
>>     val env = StreamExecutionEnvironment.createLocalEnvironment
>>     //env.enableCheckpointing(10)
>>
>>     val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>>     // Get AWS credentials
>>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>>     val credentials = credentialsProvider.getCredentials
>>
>>     // Configure Flink Kinesis consumer
>>     val consumerConfig = new Properties
>>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>>
>>     // Create Kinesis stream
>>     val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), consumerConfig))
>>
>>     val mapFunction: MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]] =
>>       new MapFunction[String, Tuple10[String, String, String, String, String, String, String, String, String, String]]() {
>>
>>         override def map(s: String): T
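The ClassCastException in this thread comes from mixing the Java and Scala Table API in one file: both org.apache.flink.table.api.java._ and org.apache.flink.table.api.scala._ are imported above, and the environment actually created is the Java one. The class-level failure can be reproduced with plain Scala stand-ins (the two classes below are hypothetical placeholders for the two Flink environment classes, which are unrelated types):

```scala
// Stand-ins for the unrelated Java and Scala StreamTableEnvironment classes.
class JavaStreamTableEnvironment
class ScalaStreamTableEnvironment

// The environment we actually got back is the "Java" one.
val tEnv: AnyRef = new JavaStreamTableEnvironment

// Casting it to the unrelated "Scala" type fails at runtime,
// just like the ClassCastException in the stack trace above.
val cast = scala.util.Try(tEnv.asInstanceOf[ScalaStreamTableEnvironment])
assert(cast.isFailure)
```

In the real job the fix is to stick to one API flavor, for example keeping only the org.apache.flink.table.api.scala._ import so that the Scala environment and its conversions are used throughout.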
Fwd: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
Hi All,

I am trying to convert SQL query result values to distinct and write them to CSV, which is failing with the below error:

    Exception in thread "main" org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

        at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
        at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
        at com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

Code example:

    val data = kinesis.map(mapFunction)
    tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
    val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
    val table = tEnv.sqlQuery(query)
    import org.apache.flink.streaming.api.scala._
    tEnv.sqlQuery(query).distinct().toRetractStream[Row]
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
        FileSystem.WriteMode.NO_OVERWRITE, "~", "|")

Thanks & Regards
Sri Tummala
Re: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.
, merch_long) } } val data = kinesis.map(mapFunction) tEnv.registerDataStream("transactions", data, "cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long") val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')" val table = tEnv.sqlQuery(query) import org.apache.flink.streaming.api.scala._ tEnv.sqlQuery(query).distinct().toRetractStream[Row] .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4", FileSystem.WriteMode.NO_OVERWRITE, "~", "|") env.execute() } } *POM:-* http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> 4.0.0 FlinkStreamAndSql FlinkStreamAndSql 1.0-SNAPSHOT src/main/scala src/test/scala net.alchim31.maven scala-maven-plugin 3.1.3 compile testCompile org.apache.maven.plugins maven-surefire-plugin 2.13 false true **/*Test.* **/*Suite.* maven-assembly-plugin 2.4.1 jar-with-dependencies make-assembly package single org.apache.flink flink-core 1.8.1 org.apache.flink flink-core 1.8.1 org.apache.flink flink-clients_2.11 1.8.1 org.apache.derby derby 10.13.1.1 org.apache.flink flink-jdbc_2.11 1.8.1 org.apache.flink flink-table-api-scala_2.11 1.8.1 org.apache.flink flink-table-api-java 1.8.1 org.apache.flink flink-table 1.8.1 org.apache.flink flink-table-planner_2.11 1.8.1 org.apache.flink flink-json 1.8.1 org.apache.flink flink-scala_2.11 1.8.1 org.apache.flink flink-scala_2.11 1.8.1 org.apache.flink flink-streaming-scala_2.11 1.8.1 org.apache.flink flink-connector-kinesis_2.11 1.8.0 org.apache.flink flink-connector-kafka-0.11_2.11 1.8.1 com.amazonaws amazon-kinesis-client 1.8.8 com.amazonaws aws-java-sdk-kinesis 1.11.579 commons-dbcp commons-dbcp 1.2.2 com.google.code.gson gson 2.1 
commons-cli commons-cli 1.4 org.apache.commons commons-csv 1.7 org.apache.commons commons-compress 1.4.1 com.amazonaws dynamodb-streams-kinesis-adapter 1.4.0 com.amazonaws dynamodb-streams-kinesis-adapter 1.4.0
Re: table toRetractStream missing last record and adding extra column (True)
Windows for question 1, question 2, or both?

Thanks
Sri

On Tue, Jul 16, 2019 at 12:25 PM taher koitawala wrote:
> Looks like you need a window
>
> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to write toRetractSream to CSV which is kind of working ok but I get extra values like True and then my output data values.
>>
>> Question1 :- I dont want true in my output data how to achieve this?
>>
>> Question 2:- in the output file (CSV) I am missing data in the last line is the toRetractStram closing before writing to file? Screen Shot attached
>>
>> Code:-
>>
>> val data = kinesis.map(mapFunction)
>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>> val query = "SELECT distinct cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num')"
>> val table = tEnv.sqlQuery(query)
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>   .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>     FileSystem.WriteMode.OVERWRITE,"\n","|")
>>
>> --
>> Thanks & Regards
>> Sri Tummala
Trying to Convert Tuple[Boolean,Row] to [Row]
Hi All,

I am trying to convert a Tuple[Boolean, Row] to Row using a map function, and I am getting this error asking me for InferedR. What is InferedR in Flink?

    val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
      new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
        override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
          t.f1
        }
        /*override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
          collector.collect(t.f1)
        }*/
      }

    tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.OVERWRITE, "\n", "|")

And when I try that, I get a different type of error:

    Error:(143, 74) type mismatch;
     found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
     required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
        tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)

--
Thanks & Regards
Sri Tummala
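The type-mismatch error above is a scala.Boolean versus java.lang.Boolean problem: the Java-API toRetractStream produces org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean, Row], so the MapFunction has to be declared against the boxed Java type. A plain-Scala sketch of the distinction, using a stand-in trait instead of Flink's MapFunction:

```scala
// Stand-in for org.apache.flink.api.common.functions.MapFunction.
trait MapFn[I, O] { def map(i: I): O }

// Declared against java.lang.Boolean, matching what the Java-API
// retract stream actually produces; a scala.Boolean signature would
// not line up and gives the "found/required" mismatch above.
val dropFlag = new MapFn[(java.lang.Boolean, String), String] {
  override def map(t: (java.lang.Boolean, String)): String = t._2
}

assert(dropFlag.map((java.lang.Boolean.TRUE, "row-payload")) == "row-payload")
```

The same change on the real code would mean writing the MapFunction over tuple.Tuple2[java.lang.Boolean, Row] (and returning Row rather than AnyVal).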
Re: table toRetractStream missing last record and adding extra column (True)
Question 1:

I did try the map function and ended up having an issue (https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i).

I am trying to convert a Tuple[Boolean, Row] to Row using a map function, and I am getting this error asking me for InferedR. What is InferedR in Flink?

    val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row], AnyVal] =
      new MapFunction[tuple.Tuple2[Boolean, Row], AnyVal]() {
        override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
          t.f1
        }
        /*override def map(t: tuple.Tuple2[Boolean, Row], collector: Collector[Object]): Unit = {
          collector.collect(t.f1)
        }*/
      }

    tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)
      .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
        FileSystem.WriteMode.OVERWRITE, "\n", "|")

And when I try that, I get a different type of error:

    Error:(143, 74) type mismatch;
     found   : org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
     required: org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
        tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).map(mymapFunction)

Question 2:

I don't have any source data issue; regenerating this issue for testing is simple:

create a Kinesis stream
run the producer: https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala
then run the consumer: https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala

Thanks
Sri

On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng wrote:
> Hi Sri,
>
> Question 1:
> You can use a map to filter the "true", i.e., ds.map(_._2).
> Note, it's ok to remove the "true" flag for distinct as it does not > generate updates. For other query contains updates, such as a non-window > group by, we should not filter the flag or the result is not correct. > > Question 2: > I can't reproduce this problem in my local environment. Maybe there is > something wrong with the source data? > > Best, Hequn > > On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala < > kali.tumm...@gmail.com> wrote: > >> windows for question 1 or question 2 or both ? >> >> Thanks >> Sri >> >> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala >> wrote: >> >>> Looks like you need a window >>> >>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala < >>> kali.tumm...@gmail.com> wrote: >>> >>>> Hi All, >>>> >>>> I am trying to write toRetractSream to CSV which is kind of working ok >>>> but I get extra values like True and then my output data values. >>>> >>>> Question1 :- >>>> I dont want true in my output data how to achieve this? >>>> >>>> Scree >>>> >>>> Question 2:- >>>> in the output file (CSV) I am missing data in the last line is the >>>> toRetractStram closing before writing to file? 
>>>> >>>> Screen Shot attached >>>> >>>> Code:- >>>> >>>> val data = kinesis.map(mapFunction) >>>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long") >>>> val query = "SELECT distinct >>>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long >>>> FROM transactions where cc_num not in ('cc_num')" >>>> val table = tEnv.sqlQuery(query) >>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]) >>>> >>>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", >>>> FileSystem.WriteMode.OVERWRITE,"\n","|") >>>> >>>> >>>> >>>> -- >>>> Thanks & Regards >>>> Sri Tummala >>>> >>>> >> >> -- >> Thanks & Regards >> Sri Tummala >> >> -- Thanks & Regards Sri Tummala
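Hequn's suggestion of dropping the retract flag with ds.map(_._2) can be sketched with a plain List standing in for the retract stream; in the real job the pairs come from tEnv.toRetractStream:

```scala
// Each element of a retract stream is a (flag, row) pair: flag = true is
// an insert, flag = false a retraction. As Hequn notes, a plain DISTINCT
// emits no retractions, so keeping only the second field is safe here.
val retractStream = List(
  (true, "101,John,Doe"),
  (true, "102,Jane,Roe")
)

val rows = retractStream.map(_._2)
assert(rows == List("101,John,Doe", "102,Jane,Roe"))
```

For queries that do produce updates (for example a non-windowed GROUP BY), the flag must be kept, or the written file will silently contain both the retracted and the updated rows.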
Re: table toRetractStream missing last record and adding extra column (True)
column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long FROM transactions where cc_num not in ('cc_num') " val table = tEnv.sqlQuery(query) table .toRetractStream(TypeInformation.of(classOf[Row])) .map(_._2) .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",FileSystem.WriteMode.OVERWRITE) table.printSchema() table.toRetractStream(TypeInformation.of(classOf[Row])).print() env.execute() /* table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122", FileSystem.WriteMode.OVERWRITE, "\n","|") val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2) test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123",FileSystem.WriteMode.OVERWRITE) test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122", FileSystem.WriteMode.OVERWRITE, "\n","|") import org.apache.flink.streaming.api.scala._ import org.apache.flink.api.common.typeinfo.TypeInformation implicit val typeInfo = TypeInformation.of(classOf[Row]) val ds = table.toRetractStream(TypeInformation.of(classOf[Row])) ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",FileSystem.WriteMode.OVERWRITE, "\n","|") tEnv.toRetractStream(table, TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15", FileSystem.WriteMode.NO_OVERWRITE, "\n", "|") table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", "\n","|") import org.apache.flink.api.common.time.Time import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.api.common.typeinfo.TypeInformation implicit val typeInfo = TypeInformation.of(classOf[Row]) table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", 
FileSystem.WriteMode.OVERWRITE, "\n", "|") table.toRetractStream(TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", FileSystem.WriteMode.NO_OVERWRITE, "\n", "|") ds. writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", FileSystem.WriteMode.NO_OVERWRITE, "\n", "|") tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6)) tEnv.toRetractStream(table) .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", FileSystem.WriteMode.OVERWRITE, "\n", "|") tEnv.toRetractStream(table,classOf[T]) */ } } On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > Question 1:- > > I did tired map function end up having issue ( > https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i > ) > > I am trying to convert a Tuple[Boolean,Row] to Row using map function, I > am getting this error asking me for InferedR , what is InferedR in FLink? > > val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] = > new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() { > override def map(t: tuple.Tuple2[Boolean, Row]): Row = { > t.f1 > } > /*override def map(t: tuple.Tuple2[Boolean, Row], collector: > Collector[Object]): Unit = { > collector.collect(t.f1) > } > */ > } > > tEnv.toRetractStream(table, > classOf[org.apache.flink.types.Row]).map(mymapFunction) > > .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8", > FileSystem.WriteMode.OVERWRITE,"\n","|") > > and when I try to I get a different type of error. 
> > > > > *Error:(143, 74) type mismatch; found : > org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal] > required: > org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?] > tEnv.toRetractStream(table, > classOf[org.apache.flink.types.Row]).map(mymapFunction)* > > *Question 2:- * > *I dont have any source data issue, to regenerate this issue for testing >
Re: table toRetractStream missing last record and adding extra column (True)
Yes, even the delimiter can be replaced; I still have to test what happens if the data itself has a comma in it.

    table.toRetractStream(TypeInformation.of(classOf[Row]))
      .map(_._2.toString.replaceAll(",", "~"))
      .writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
        FileSystem.WriteMode.OVERWRITE)

On Wed, Jul 17, 2019 at 6:47 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
> Amazing, all issues resolved in one go, thanks Cheng. One issue though: I can't write map(_._2) to CSV, looks like it doesn't support it right now, it has to be a text file.
>
> Below is the full code if someone wants it in Scala.
>
> Git code is here:
> https://github.com/kali786516/FlinkStreamAndSql
>
> package com.aws.examples.kinesis.consumer.transactionExampleScala
>
> import java.util.Properties
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
> import com.google.gson.Gson
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import java.sql.{DriverManager, Time}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> object TransactionScalaTest {
>
>   /*
>   extends RetractStreamTableSink[Row]
>
>   override def configure(strings: Array[String], typeInformations: Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] = super.getOutputType
>
>   override def getRecordType: TypeInformation[Row] = ???
>   */
>
>   def main(args: Array[String]): Unit = {
>
>     // set up the streaming execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     //env.enableCheckpointing(1)
>
>     val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>
>     // Get AWS credentials
>     val credentialsProvider = new DefaultAWSCredentialsProviderChain
>     val credentials = credentialsProvider.getCredentials
>
>     // Configure Flink Kinesis consumer
>     val consumerConfig = new Properties
>     consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>     consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, credentials.getAWSAccessKeyId)
>     consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, credentials.getAWSSecretKey)
>     consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")
>
>     // Create Kinesis stream
>     val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), consumerConfig))
>
>     val mapFunction: MapFunction[String, (String, String, String, String, String, String, String, String, String, String)] =
>       new MapFunction[String, (String, String, String, String, String, String, String, String, String, String)]() {
>
>         override def map(s: String): (String, String, String, String, String, String, String, String, String, String) = {
>
>           val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>           val csvData = data.getCc_num + "," +
>             data.getFirst + "," +
>             data.getLast + "," +
>             data.getTrans_num + "," +
>             data.getTrans_time + "," +
>             data.getCategory + "," +
>             data.getMerchant + "," +
>             data.getAmt + "," +
>             data.getMerch_lat + "," +
>             data.getMerch_long
>
>           //println(csvData)
>
>           val p: Array[String] = csvData.split(",")
>           var cc_num: String = p(0)
>           var first: String = p(1)
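On the comma-in-data concern raised above: replacing every comma with "~" mangles fields that legitimately contain commas, and quoting such fields is the usual CSV answer. A minimal hand-rolled sketch of that quoting; a real job might delegate to a CSV library instead:

```scala
// Quote a field if it contains the delimiter or a quote; embedded quotes
// are doubled, per common CSV conventions (RFC 4180 style).
def toCsvField(f: String): String =
  if (f.contains(",") || f.contains("\""))
    "\"" + f.replace("\"", "\"\"") + "\""
  else f

val row = Seq("Doe, John", "42", "say \"hi\"")
val line = row.map(toCsvField).mkString(",")

// The comma inside "Doe, John" survives, and the field boundary is preserved.
assert(line == "\"Doe, John\",42,\"say \"\"hi\"\"\"")
```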
Re: Is it possible to decide the order of where conditions in Flink SQL
Try CTEs (common table expressions) if they are supported, or a SQL subquery.

On Fri, Jul 26, 2019 at 1:00 PM Fanbin Bu wrote:
> how about move query db filter to the outer select.
>
> On Fri, Jul 26, 2019 at 9:31 AM Tony Wei wrote:
>
>> Hi,
>>
>> If I have multiple where conditions in my SQL, is it possible to specify its order, so that the query can be executed more efficiently?
>>
>> For example, if I have the following SQL, it used a heavy UDF that needs to access database. However, if I can specify the order of conditions is executing `!user.is_robot` first then executing UDF, it will reduce the number of database access. Those records with `true` in `user.is_robot` will be dropped earlier and don't need to access database.
>>
>> select *
>> from users
>> where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)
>>
>> Thanks,
>> Tony Wei

--
Thanks & Regards
Sri Tummala
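The subquery/outer-select rewrite suggested in this thread forces the cheap predicate to be applied before the expensive UDF. The effect is the same as short-circuit evaluation, which a plain-Scala analog shows; the expensive UDF below is a hypothetical stand-in for the database-backed one:

```scala
// Count how often the "database-backed" predicate actually runs.
var dbCalls = 0
def expensiveUdf(user: String): Boolean = { dbCalls += 1; true } // pretend DB lookup

val users = List(("alice", false), ("bot-1", true)) // (name, is_robot)

// Cheap predicate first: the robot is dropped before the expensive check runs.
val kept = users.filter { case (name, isRobot) => !isRobot && expensiveUdf(name) }

assert(kept.map(_._1) == List("alice"))
assert(dbCalls == 1) // the robot row never reached the expensive UDF
```

Whether a SQL planner preserves predicate order inside a single WHERE clause is engine-specific, which is exactly why pushing the cheap filter into a derived table (or CTE) is the more reliable way to express the intent.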
FlinkDynamoDBStreamsConsumer Atleast Once
Hi Flink Experts,

How do I achieve at-least-once semantics with FlinkDynamoDBStreamsConsumer + DynamoDB Streams? Do Flink checkpointing or savepoints do the job?

My scenario: a Flink application uses FlinkDynamoDBStreamsConsumer, which reads the latest changes from DynamoDB Streams. But if my software fails and the application is down for 30 minutes for some reason, can the Flink application read the missed changes from the past 30 minutes from the DynamoDB stream?

Is it better to write the DynamoDB stream changes to a Kafka or Kinesis topic and have the Flink application read from that topic, which would let the Flink application resume from the last successful record using Kafka offsets, for example?

What would be the best architecture to move forward?

--
Thanks & Regards
Sri Tummala
Re: What is the recommended way to run flink with high availability on AWS?
Ok, no problem. On Wed, Aug 21, 2019 at 12:22 AM Pei HE wrote: > Thanks Kali for the information. However, it doesn't work for me, because > I need features in Flink 1.7.x or later and use manged Amazon MSK. > -- > Pei > > > > On Tue, Aug 20, 2019 at 7:17 PM sri hari kali charan Tummala < > kali.tumm...@gmail.com> wrote: > >> check this out AWS managed Flink cluster in aws kinesis, in AWS kinesis >> try to create Kinesis data analytics application using flink instead of sql. >> >> On Tue, Aug 20, 2019 at 9:28 PM Pei HE wrote: >> >>> Hi there, >>> Looking at Flink document [1], EMR is the recommended way to run Flink >>> on AWS. However, Flink is currently not supported in an EMR cluster with >>> multiple master nodes [2]. >>> >>> For example, I can create a EMR HA cluster with Zookeeper, Hive, etc. >>> But, if I add Flink to the application list when I create the cluster, I >>> got the following error from aws cli: >>> "An error occurred (ValidationException) when calling the RunJobFlow >>> operation: HA clusters do not support the specified applications: Flink. >>> Revise the applications and resubmit.". >>> >>> My question is what is the recommended way to run flink with high >>> availability on AWS. >>> >>> Thanks >>> -- >>> Pei >>> >>> [1]: >>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html >>> [2]: >>> https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-ha-applications.html >>> >>> >> >> -- >> Thanks & Regards >> Sri Tummala >> >> -- Thanks & Regards Sri Tummala
Re: Is Scala the best language for Flink?
Yes Scala is the best. On Fri, Jan 28, 2022, 9:57 AM Nicolás Ferrario wrote: > Hi Seb. > > In my team we are migrating things to Kotlin because we find it much > easier to deal with. It's like the best of both worlds, but you do give up > on Flink Scala serializers, since the only way to get Kotlin Data Classes > working is by making them a POJO (or implementing your own TypeInfo). You > do at least gain Evolution support. > > We've found that Kotlin + Gradle are a great combination, and we end up > with quick compile times, a much simpler build config (compared to Maven > and SBT). > Another thing is that we've found that the macro createTypeInformation can > be dangerous, because sometimes it may fallback to Kryo on classes you > don't expect it to. Those implicit behaviors we see in Scala have caused us > some headaches and we prefer to explicitly type things in the Data Stream. > Just personal preference, but pointing it out in case it's useful to > someone. > > Hope this helps! > > On Mon, Jan 24, 2022 at 7:15 AM seb wrote: > >> Hi there, >> >> I am getting started with Apache Flink. I am curious whether there is a >> clear winner between developing in either Scala or Java. >> >> It sounds like Flink is typically slower to support new versions of Scala >> and that Java development might have fewer quirks. >> >> What do you think? I have experience coding in Scala, but I am more than >> happy to learn Java. >> >> Thanks in advance for sharing your thoughts! >> >> Best, >> Sebastian >> >
Flink/Scala contract positions ?
Hi Folks, Is anyone hiring for Flink or Scala Akka contract corp to corp positions ? I am open in market looking for work in Scala Spark or Flink Scala or Scala Akka world. Thanks Sri
Re: Flink/Scala contract positions ?
Hi Jing,

Thanks for reaching back. Are you offering a contract job to fix the Scala API for Flink? Yes, I would be interested; we can talk if you want to schedule some time.

Thanks
Sri

On Fri, Jun 3, 2022 at 9:00 AM Jing Ge wrote:
> Hi,
>
> Currently, the Flink Scala API is not in a good shape. Would you like to start from there?
>
> Best regards,
> Jing
>
> On Fri, Jun 3, 2022 at 4:29 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> Is anyone hiring for Flink or Scala Akka contract corp to corp positions? I am open in market looking for work in Scala Spark or Flink Scala or Scala Akka world.
>>
>> Thanks
>> Sri

--
Thanks & Regards
Sri Tummala
Re: Need help to join Apache Flink community on.Slack
Hi Jing, Please add me kali.tumm...@gmail.com. Thanks Sri On Sat, Jun 4, 2022 at 4:47 PM Jing Ge wrote: > Hi Santhosh, > > just invited you. Please check your email. Looking forward to knowing your > story! Thanks! > > To anyone else who wants to join, please send an email to > user@flink.apache.org, you might have a better chance to get the invite. > Thanks. > > Regards, > Jing > > On Sat, Jun 4, 2022 at 10:37 PM santhosh venkat < > santhoshvenkat1...@gmail.com> wrote: > >> Hi, >> >> Can you please invite me to join the apache flink slack community >> channel. We have adopted apache flink and would like to participate in the >> community forum. >> >> Thank you. >> >> Regards >> > -- Thanks & Regards Sri Tummala
Flink source Code Explanation
Hi Flink Community,

I want to go through the Flink source code in my free time. Is there a document that explains where to start? Other than the Javadoc, is there anything else to start my reverse engineering with?

Thanks & Regards
Sri Tummala
Re: Flink source Code Explanation
I am getting a connection timed out error in Firefox and Google Chrome; can you double-check whether the link is working?

Thanks
Sri

On Sun, Jun 5, 2022 at 7:01 PM Jing Ge wrote:
> Hi Sri,
>
> Flink is very well documented. You can find it under e.g.
> https://nightlies.apache.org/flink/flink-docs-master/
>
> Best regards,
> Jing
>
> On Mon, Jun 6, 2022 at 3:39 AM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> I want to go through flink source code in my free time is there a document that I can go through that explains to me where to start? other than Java doc is there anything else to start my reserve engineering.
>>
>> Thanks & Regards
>> Sri Tummala

--
Thanks & Regards
Sri Tummala
Flink config driven tool ?
Hi Flink Community,

Can someone point me to GitHub repos for a good config-driven Flink data movement tool? Imagine I build my ETL DAG connecting source --> transformations --> target just using a config file.

Below are a few Spark examples:
https://github.com/mvrpl/big-shipper
https://github.com/BitwiseInc/Hydrograph

Thanks & Regards
Sri Tummala
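The source --> transformations --> target idea asked about here can be sketched in a few lines of plain Scala: a config (which would normally come from a file) names the stages, and the tool folds them into one pipeline. The stage names and config shape below are made up for illustration:

```scala
// Hypothetical config, normally parsed from a YAML/JSON file.
val config = List("uppercase", "trim")

// Registry of known transformation stages the tool ships with.
val stages: Map[String, String => String] = Map(
  "uppercase" -> (_.toUpperCase),
  "trim"      -> (_.trim)
)

// Wire the configured stages into a single source -> target function.
val pipeline: String => String = config.map(stages).reduce(_ andThen _)

assert(pipeline("  hello ") == "HELLO")
```

A real Flink version of this would map each configured stage to a DataStream transformation instead of a String function, but the wiring pattern is the same.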
Re: Flink config driven tool ?
Thanks, but that looks like a Spark tool; is there something similar for Flink?

Thanks
Sri

On Tue, Jun 7, 2022 at 12:07 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
> Hey there,
>
> No idea if it's any good, but just saw Apache SeaTunnel[1] today which seems to fit your requirements.
>
> Best,
> Austin
>
> [1]: https://seatunnel.apache.org/
>
> On Tue, Jun 7, 2022 at 2:19 PM sri hari kali charan Tummala <kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> can someone point me to a good config-driven flink data movement tool Github repos? Imagine I build my ETL dag connecting source --> transformations --> target just using a config file.
>>
>> below are a few spark examples:-
>> https://github.com/mvrpl/big-shipper
>> https://github.com/BitwiseInc/Hydrograph
>>
>> Thanks & Regards
>> Sri Tummala

--
Thanks & Regards
Sri Tummala
Re: Flink config-driven tool?
Thanks, I'll check it out. On Tue, Jun 7, 2022, 1:31 PM Austin Cawley-Edwards wrote: > They support Flink as well. Looks like they even support the new Flink k8s > operator.[1] > > Austin > > [1]: > https://seatunnel.apache.org/docs/2.1.1/start/kubernetes#deploying-the-operator
Re: Flink config-driven tool?
I would have helped you if it were written in Scala instead of Java. On Wed, Jun 15, 2022 at 2:22 AM Rakshit Ramesh < rakshit.ram...@datakaveri.org> wrote: > I'm working on such a thing. > It's in early stages and needs a lot more work. > I'm open to collaborating. > https://github.com/datakaveri/iudx-adaptor-framework -- Thanks & Regards Sri Tummala
Re: Flink config-driven tool?
Found one more Flink tool: https://www.splunk.com/en_us/products/stream-processing.html On Wed, Jun 15, 2022 at 1:18 PM Jing Ge wrote: > Hi, > > Just like Shengkai mentioned, I would strongly suggest trying SQL for the ETL > dag. If you find anything for which SQL does not work, please share your > requirements with us. We might check if it makes sense to build new > features in Flink to support them. > > Best regards, > Jing -- Thanks & Regards Sri Tummala
Re: Flink config-driven tool?
This looks interesting: NASA's Akka project https://github.com/NASARace/race On Mon, Jun 20, 2022 at 7:06 PM sri hari kali charan Tummala < kali.tumm...@gmail.com> wrote: > found one more Flink tool > https://www.splunk.com/en_us/products/stream-processing.html -- Thanks & Regards Sri Tummala
Big Data Contract Roles?
Hi Flink Users / Spark Users, Is anyone hiring for contract (corp-to-corp) Big Data Spark Scala or Flink Scala roles? Thanks Sri
Spark Scala Contract Opportunity @USA
Hi All, Is anyone looking for a Spark Scala contract role inside the USA? A company called Maxonic has an open Spark Scala contract position (100% remote) inside the USA. If anyone is interested, please send your CV to kali.tumm...@gmail.com. Thanks & Regards Sri Tummala
Flink India Job Referrals
Hi Community, I was laid off from Apple in Feb 2023, which forced me to move out of the USA due to an immigration problem (H-1B). I was a Big Data, Spark, Scala, Python, and Flink consultant with over 12 years of experience. I still haven't landed a job in India since then, and I need referrals to product firms in India (non-consulting, please) that use Flink, Spark, Scala, Big Data, or ML & AI. Can someone help me with this? Thanks Sri On Thu, Dec 21, 2023, 7:23 AM Xuyang wrote: > Hi, Praveen Chandna. > I don't know much about this plan; you can ask on the dev mailing list. [1] > > [1] https://flink.apache.org/what-is-flink/community/ > > -- > Best! > Xuyang > > > On 2023-12-20 14:53:52, "Praveen Chandna via user" > wrote: > > Hello Xuyang > > > > One more query: is there a plan to release formal Java 17 support in Flink > 1.19 or 1.20? Or do we need to wait for Flink 2.0? > > > > Thanks !! > > > > *From:* Xuyang > *Sent:* 19 December 2023 16:37 > *To:* user@flink.apache.org > *Subject:* Re: RE: Flink - Java 17 support > > > > Hi, Praveen Chandna. > > > > Please correct me if I'm wrong. From the release notes of Flink 1.18 [1], > you can see that although 1.18 already supports Java 17, it is still in > beta mode. > > > > [1] > https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.18/ > > -- > > Best! > > Xuyang > > > > On 2023-12-19 18:32:48, "Praveen Chandna via user" > wrote: > > Hello Team > > > > Please help reply on Java 17 support. Is the Java 17 experimental > support production-ready? > > Is it recommended to use Flink 1.18 with Java 17, or is there any risk if > we move to Java 17 in production? > > > > Thanks !! > > > > *From:* Praveen Chandna > *Sent:* 17 December 2023 21:28 > *To:* Praveen Chandna via user > *Subject:* Flink - Java 17 support > > > > Hello > > > > In Flink 1.18 there is experimental support for Java 17. What is > the plan for a Java 17 supported release of Flink?
> > > > The Jira below for Java 17 support is already closed: > > https://issues.apache.org/jira/browse/FLINK-15736 > > > > Thanks !! > > > > // Regards > > Praveen Chandna > > > >
Flink Scala Positions in India or USA!
Hi Folks, I am currently seeking full-time Flink Scala positions in India or the USA (non-consulting), specifically at the Principal or Staff level. I require an H-1B transfer and assistance with relocation from India; my I-140 is approved. Thanks & Regards Sri Tummala
Job Opportunities in India or UK with Tier 2 Sponsorship
Hi Flink Community, I'm a hands-on Apache Flink software engineer looking for job opportunities in India or the UK (with Tier 2 sponsorship). If anyone knows of openings or can point me in the right direction, please let me know. Thanks, Sri Tummala