DataBricks Spark/Scala Opportunity

2020-12-22 Thread sri hari kali charan Tummala
>
> Hi All,
>
> Note:- (This is a full time position)
>
> I copied Geoff in this email chain; he is actively looking for a Spark/Scala
> developer for a full-time position with Databricks, 200k+
> (salary/bonus/stocks/H1B/GC process).
>
> If you know anyone, or any of your friends, who is good at Spark/Scala
> (US Citizen/GC/H1B), ask them to contact Geoff (
> geoffzun...@databricks.com).
>
> 415-328-4879
>
>
> Thanks
> Sri
>
> --
Thanks & Regards
Sri Tummala


Flink Read S3 Intellij IDEA Error

2021-03-08 Thread sri hari kali charan Tummala
> Hi Flink Experts,
>

I am trying to read an S3 file from IntelliJ using Flink, but I am running
> into an AWS auth error. Can someone help? Below are all the details.
>


> I have Aws credentials in homefolder/.aws/credentials
>
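For reference, the usual layout of that credentials file (this is the standard AWS SDK format, not copied from the original mail; the values are elided):

[default]
aws_access_key_id = <access key>
aws_secret_access_key = <secret key>
aws_session_token = <session token, only when using temporary credentials>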

My Intellij Environment Variables:-
> ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.8.1
>
> FLINK_CONF_DIR=/Users/Documents/FlinkStreamAndSql-master/src/main/resources/flink-config
>
> flink-conf.yaml file content:-
>
> fs.hdfs.hadoopconf: 
> /Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config
>
> core-site.xml file content:-
>
> <?xml version="1.0"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>
> <configuration>
>
>     <property>
>         <name>fs.s3.impl</name>
>         <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
>     </property>
>
>     <property>
>         <name>fs.s3.buffer.dir</name>
>         <value>/tmp</value>
>     </property>
>
>     <property>
>         <name>fs.s3a.server-side-encryption-algorithm</name>
>         <value>AES256</value>
>     </property>
>
>     <property>
>         <name>fs.s3a.aws.credentials.provider</name>
>         <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
>     </property>
>     <property>
>         <name>fs.s3a.access.key</name>
>         <value></value>
>     </property>
>     <property>
>         <name>fs.s3a.secret.key</name>
>         <value></value>
>     </property>
>     <property>
>         <name>fs.s3a.session.token</name>
>         <value></value>
>     </property>
>
>     <property>
>         <name>fs.s3a.proxy.host</name>
>         <value></value>
>     </property>
>     <property>
>         <name>fs.s3a.proxy.port</name>
>         <value>8099</value>
>     </property>
>     <property>
>         <name>fs.s3a.proxy.username</name>
>         <value></value>
>     </property>
>     <property>
>         <name>fs.s3a.proxy.password</name>
>         <value></value>
>     </property>
>
> </configuration>
>
> POM.xml file:-
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>          http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <groupId>FlinkStreamAndSql</groupId>
>     <artifactId>FlinkStreamAndSql</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <build>
>         <sourceDirectory>src/main/scala</sourceDirectory>
>         <plugins>
>             <plugin>
>                 <groupId>net.alchim31.maven</groupId>
>                 <artifactId>scala-maven-plugin</artifactId>
>                 <version>3.1.3</version>
>                 <executions>
>                     <execution>
>                         <goals>
>                             <goal>compile</goal>
>                             <goal>testCompile</goal>
>                         </goals>
>                     </execution>
>                 </executions>
>             </plugin>
>
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-surefire-plugin</artifactId>
>                 <version>2.13</version>
>                 <configuration>
>                     <useFile>false</useFile>
>                     <disableXmlReport>true</disableXmlReport>
>                     <includes>
>                         <include>**/*Test.*</include>
>                         <include>**/*Suite.*</include>
>                     </includes>
>                 </configuration>
>             </plugin>
>
>             <plugin>
>                 <artifactId>maven-assembly-plugin</artifactId>
>                 <version>2.4.1</version>
>                 <configuration>
>                     <descriptorRefs>
>                         <descriptorRef>jar-with-dependencies</descriptorRef>
>                     </descriptorRefs>
>                 </configuration>
>                 <executions>
>                     <execution>
>                         <id>make-assembly</id>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>single</goal>
>                         </goals>
>                     </execution>
>                 </executions>
>             </plugin>
>         </plugins>
>     </build>
>
>     <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-core</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-core</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-clients_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.derby</groupId>
>             <artifactId>derby</artifactId>
>             <version>10.13.1.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-jdbc_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-json</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-scala_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kinesis_2.11</artifactId>
>             <version>1.8.0</version>
>             <scope>system</scope>
>             <systemPath>${project.basedir}/Jars/flink-connector-kinesis_2.11-1.8-SNAPSHOT.jar</systemPath>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
>             <version>1.8.1</version>
>         </dependency>
>
>         <dependency>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>amazon-kinesis-client</artifactId>
>             <version>1.8.8</version>
>         </dependency>
>     </dependencies>
> </project>

Re: Flink Read S3 Intellij IDEA Error

2021-03-09 Thread sri hari kali charan Tummala
Here is my Intellij question.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868


Re: Flink Read S3 Intellij IDEA Error

2021-03-09 Thread sri hari kali charan Tummala
my stack overflow question.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117626682_66536868


Re: Flink Read S3 Intellij IDEA Error

2021-03-10 Thread sri hari kali charan Tummala
Flink,

I am able to access Kinesis from IntelliJ but not S3. I have edited my Stack
Overflow question with the Kinesis code; Flink is still having issues reading
S3.

https://stackoverflow.com/questions/66536868/flink-aws-s3-access-issue-intellij-idea?noredirect=1#comment117656862_66536868


Thanks
Sri


Re: Flink Read S3 Intellij IDEA Error

2021-03-10 Thread sri hari kali charan Tummala
I am not getting what you both are talking about; let's be clear.

A plugin? What is it? Is it a jar which I have to download from the
Internet and place in a folder? Is this the jar which I have to
download (flink-s3-fs-hadoop)?

Will the below solution work?
https://stackoverflow.com/questions/64115627/flink-1-11-2-cant-find-implementation-for-s3-despite-correct-plugins-being

Thanks
Sri



On Wed, Mar 10, 2021 at 11:34 AM Chesnay Schepler 
wrote:

> Well, you could do this before running the job:
>
> // set the ConfigConstants.ENV_FLINK_PLUGINS_DIR environment variable,
> pointing to a directory containing the plugins
>
> PluginManager pluginManager =
> PluginUtils.createPluginManagerFromRootFolder(new Configuration());
> FileSystem.initialize(new Configuration(), pluginManager);
>
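A minimal Scala sketch of that suggestion, assuming a Flink version that actually ships PluginUtils/PluginManager (1.9+) and that the plugins directory is pointed to via the FLINK_PLUGINS_DIR environment variable in the IntelliJ run configuration; the object name and directory layout are illustrative, not from the original mail:

import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.plugin.PluginUtils

object S3PluginBootstrap {
  def main(args: Array[String]): Unit = {
    // FLINK_PLUGINS_DIR should point at a folder containing
    // s3-fs-hadoop/flink-s3-fs-hadoop-<version>.jar (illustrative layout)
    val conf = new Configuration()
    val pluginManager = PluginUtils.createPluginManagerFromRootFolder(conf)
    FileSystem.initialize(conf, pluginManager)
    // ...then build the StreamExecutionEnvironment and the rest of the job
  }
}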
> On 3/10/2021 8:16 PM, Lasse Nedergaard wrote:
>
> Hi.
>
> I had the same problem. Flink uses plugins to access S3. When you run
> locally it starts a mini cluster, and the mini cluster doesn't load plugins. So
> it's not possible without modifying Flink. In my case I wanted to
> investigate savepoints through the Flink processor API, and the workaround was
> to build my own version of the processor API and include the missing part.
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
Re: Flink Read S3 Intellij IDEA Error

2021-03-10 Thread sri hari kali charan Tummala
Also, I don't see ConfigConstants.ENV_FLINK_PLUGINS_DIR; I only see
ConfigConstants.ENV_FLINK_LIB_DIR. Will this work?

Thanks
Sri


Re: Flink Read S3 Intellij IDEA Error

2021-03-11 Thread sri hari kali charan Tummala
Let's close this issue, guys; please answer my questions. I am using Flink
1.8.1.

Thanks
Sri


Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Which I already did in my POM; it's still not working.

Thanks
Sri

On Fri, 12 Mar 2021 at 06:18, Chesnay Schepler  wrote:

> The concept of plugins does not exist in 1.8.1. As a result it should be
> sufficient for your use-case to add a dependency on flink-s3-fs-hadoop to
> your project.
>
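For reference, a sketch of what that dependency could look like in the POM from the original message; the 1.8.1 version is an assumption chosen to match the project, not taken from the mail:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-s3-fs-hadoop</artifactId>
    <version>1.8.1</version>
</dependency>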

Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
If anyone has working Flink 1.8.1 code that reads S3 from IntelliJ in a
public GitHub repo, please pass it on; that would be a huge help.


Thanks
Sri


Re: Flink Read S3 Intellij IDEA Error

2021-03-12 Thread sri hari kali charan Tummala
Same error.



On Fri, 12 Mar 2021 at 09:01, Chesnay Schepler  wrote:

> From the exception I would conclude that your core-site.xml file is not
> being picked up.
>
> AFAIK fs.hdfs.hadoopconf only works for HDFS, not for S3 filesystems, so
> try setting HADOOP_CONF_DIR to the directory that the file resides in.
>
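A sketch of that suggestion as an IntelliJ run-configuration environment variable, reusing the hadoop-config path from the original message (the path itself is the poster's placeholder):

HADOOP_CONF_DIR=/Users/blah/Documents/FlinkStreamAndSql-master/src/main/resources/hadoop-config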

Re: Running flink on AWS ECS

2019-09-28 Thread sri hari kali charan Tummala
AWS already has an auto-scaling Flink offering; it's called Kinesis Data Analytics.
Just add your Flink jar to Kinesis Data Analytics and AWS will auto-provision
a Flink cluster and do the admin part for you.

On Saturday, September 28, 2019, David Anderson  wrote:

> I believe there can be advantages and disadvantages in both
> directions. For example, fewer containers with multiple slots reduces
> the effort the Flink Master has to do whenever global coordination is
> required, i.e., during checkpointing. And the network stack in the
> task managers is optimized to take advantage of locality, whenever
> possible.
>
> On the other hand, if you have a lot of pressure on the heap (e.g.,
> because you are using a heap-based state backend), then having more,
> smaller task managers can reduce latency by reducing the impact of
> garbage collection pauses.
>
> I'm sure I've overlooked some factors, but the bottom line appears to
> be that there's no one-size-fits-all answer.
>
> David
>
> On Wed, Sep 25, 2019 at 5:43 PM Navneeth Krishnan
>  wrote:
> >
> > Thanks Terry, the reason why I asked this is because somewhere I saw
> running one slot per container is beneficial. I couldn’t find the where I
> saw that.
> > Also I think running it with multiple slots will reduce IPC since some
> of the data will be processed within the same JVM.
> >
> > Thanks
> >
> > On Wed, Sep 25, 2019 at 1:16 AM Terry Wang  wrote:
> >>
> >> Hi, Navneeth,
> >>
> >> I think both is ok.
> >> IMO, run one container with number of slots same as virtual cores may
> be better for slots can share the Flink Framework and thus reduce memory
> cost.
> >>
> >> Best,
> >> Terry Wang
> >>
> >>
> >>
> >> > On Sep 25, 2019, at 3:26 PM, Navneeth Krishnan  wrote:
> >> >
> >> > Hi All,
> >> >
> >> > I’m currently running flink on amazon ecs and I have assigned task
> slots based on vcpus per instance. Is it beneficial to run a separate
> container with one slot each or one container with number of slots same as
> virtual cores?
> >> >
> >> > Thanks
> >>
>
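For reference, the knob being discussed is taskmanager.numberOfTaskSlots in flink-conf.yaml. A sketch of the two layouts being compared (the values are illustrative, not taken from the thread):

# one larger TaskManager per instance, slots matching the virtual cores
taskmanager.numberOfTaskSlots: 4

# versus several TaskManagers with one slot each
taskmanager.numberOfTaskSlots: 1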


-- 
Thanks & Regards
Sri Tummala


Re: AWS Client Builder with default credentials

2020-02-24 Thread sri hari kali charan Tummala
check this.

https://github.com/kali786516/FlinkStreamAndSql/blob/b8bcbadaa3cb6bfdae891f10ad1205e256adbc1e/src/main/scala/com/aws/examples/dynamodb/dynStreams/FlinkDynamoDBStreams.scala#L42

https://github.com/kali786516/FlinkStreamAndSql/blob/b8bcbadaa3cb6bfdae891f10ad1205e256adbc1e/src/main/scala/com/aws/examples/kinesis/consumer/transactionScalaWorkingExample/TransactionScalaConsumer.scala#L41



On Mon, Feb 24, 2020 at 9:08 AM David Magalhães 
wrote:

> Hi Robert, thanks for your reply.
>
> GlobalConfiguration.loadConfiguration was useful to check if a
> flink-conf.yml file was on resources, for the integration tests that I'm
> doing. On the cluster I will use the default configurations.
>
> On Fri, Feb 21, 2020 at 10:58 AM Robert Metzger 
> wrote:
>
>> There are multiple ways of passing configuration parameters to your user
>> defined code in Flink
>>
>> a)  use getRuntimeContext().getUserCodeClassLoader().getResource() to
>> load a config file from your user code jar or the classpath.
>> b)  use
>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters() to
>> access a configuration object serialized from the main method.
>> you can pass a custom object to the job parameters, or use Flink's
>> "Configuration" object in your main method:
>>
>> final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> Configuration config = new Configuration();
>> config.setString("foo", "bar");
>> env.getConfig().setGlobalJobParameters(config);
>>
>> c) Load the flink-conf.yaml:
>>
>> Configuration conf = GlobalConfiguration.loadConfiguration();
>>
>> I'm not 100% sure if this approach works, as it is not intended to be
>> used in user code (I believe).
>>
>>
>> Let me know if this helps!
>>
>> Best,
>> Robert
>>
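A hedged Scala sketch of option (a) above: loading a properties file bundled in the user jar from inside a rich function. The resource name aws.properties and the property key are hypothetical, not from the original mail.

import java.util.Properties
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class ConfigAwareMapper extends RichMapFunction[String, String] {
  @transient private var props: Properties = _

  override def open(parameters: Configuration): Unit = {
    props = new Properties()
    // aws.properties is a hypothetical resource packaged with the user code jar
    val in = getRuntimeContext.getUserCodeClassLoader.getResourceAsStream("aws.properties")
    if (in != null) {
      try props.load(in) finally in.close()
    }
  }

  override def map(value: String): String =
    value + "," + props.getProperty("s3.endpoint", "unknown")
}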
>> On Thu, Feb 20, 2020 at 1:50 PM Chesnay Schepler 
>> wrote:
>>
>>> First things first, we do not intend for users to use anything in the S3
>>> filesystem modules except the filesystems itself,
>>> meaning that you're somewhat treading on unsupported ground here.
>>>
>>> Nevertheless, the S3 modules contain a large variety of AWS-provided
>>> CerentialsProvider implementations,
>>> that can derive credentials from environment variables, system
>>> properties, files on the classpath and many more.
>>>
>>> Ultimately though, you're kind of asking us how to use AWS APIs, for
>>> which I would direct you to the AWS documentation.
>>>
>>> On 20/02/2020 13:16, David Magalhães wrote:
>>>
>>> I'm using
>>> org.apache.flink.fs.s3base.shaded.com.amazonaws.client.builder.AwsClientBuilder
>>> to create a S3 client to copy objects and delete object inside
>>> a TwoPhaseCommitSinkFunction.
>>>
>>> Shouldn't be another way to set up configurations without put them
>>> hardcoded ? Something like core-site.xml or flink-conf.yaml ?
>>>
>>> Right now I need to have them hardcoded like this.
>>>
>>> AmazonS3ClientBuilder.standard
>>>   .withPathStyleAccessEnabled(true)
>>>   .withEndpointConfiguration(
>>> new EndpointConfiguration("http://minio:9000";, "us-east-1")
>>>   )
>>>   .withCredentials(
>>> new AWSStaticCredentialsProvider(new
>>> BasicAWSCredentials("minio", "minio123"))
>>>   )
>>>   .build
>>>
>>> Thanks
>>>
>>>
>>>

-- 
Thanks & Regards
Sri Tummala


Batch Flink Job S3 write performance vs Spark

2020-02-24 Thread sri hari kali charan Tummala
Hi All,

I have a question: has anyone compared the performance of a Flink batch job
writing to S3 vs Spark writing to S3?

-- 
Thanks & Regards
Sri Tummala


Batch Flink Job S3 write performance vs Spark

2020-02-25 Thread sri hari kali charan Tummala
Hi All,

I have a question: has anyone compared the performance of a Flink batch job
writing to S3 vs Spark writing to S3?

-- 
Thanks & Regards
Sri Tummala


Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread sri hari kali charan Tummala
Thank you (re: "the two systems running on Java and using the same set of
libraries"). So, from my understanding, Flink uses the AWS SDK behind the
scenes, same as Spark.

On Wed, Feb 26, 2020 at 8:49 AM Arvid Heise  wrote:

> Fair benchmarks are notoriously difficult to setup.
>
> Usually, it's easy to find a workload where one system shines and as its
> vendor you report that. Then, the competitor benchmarks a different use
> case where his system outperforms ours. In the end, customers are more
> confused than before.
>
> You should do your own benchmarks for your own workloads. That is the only
> reliable way.
>
> In the end, both systems use similar setups and improvements in one system
> are often also incorporated into the other system with some delay, such
> that there should be no ground-breaking differences between the two systems
> running on Java and using the same set of libraries.
> Of course, if one system has a very specific optimization for your use
> case, that could be much faster.
>
>

-- 
Thanks & Regards
Sri Tummala


Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread sri hari kali charan Tummala
Ok, thanks for the clarification.

On Wed, Feb 26, 2020 at 9:22 AM Arvid Heise  wrote:

> Exactly. We use the hadoop-fs as an indirection on top of that, but Spark
> probably does the same.
>

-- 
Thanks & Regards
Sri Tummala


Re: Batch Flink Job S3 write performance vs Spark

2020-02-26 Thread sri hari kali charan Tummala
Sorry for being lazy; I should have gone through the Flink source code myself.


-- 
Thanks & Regards
Sri Tummala


Fwd: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi All,

I am trying to read data from a Kinesis stream, apply a SQL
transformation (distinct), and then write to a CSV sink, which is
failing due to this issue (org.apache.flink.table.api.TableException:
AppendStreamTableSink requires that Table has only insert changes.). The full
code is here (
https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
).

Can anyone help me move forward on this issue?

Full Code:-

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(),
consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String,
String,String,String,String,String,String,String,String] = {
  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

  //println(csvData)

  val p:Array[String] = csvData.split(",")
  var cc_num:String = p(0)
  var first:String = p(1)
  var last:String = p(2)
  var trans_num:String = p(3)
  var trans_time:String = p(4)
  var category:String = p(5)
  var merchant:String = p(6)
  var amt:String = p(7)
  var merch_lat:String = p(8)
  var merch_long:String = p(9)

  val creationDate: Time = new Time(System.currentTimeMillis())
  return new Tuple10(cc_num, first,
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
  }

val data = kinesis.map(mapFunction)

//data.print()

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

val table1 = table.distinct()

tEnv.registerTable("fromAnotherTable",table1)

table.printSchema()

val csvSink:TableSink[Row]  = new
CsvTableSink("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOutput","~");
val fieldNames:Array[String]  =
Array("cc_num","first_column","last_column","trans_num","trans_time","category_column","merchant_column","amt_column","merch_lat","merch_long")
val fieldTypes:Array[TypeInformation[_]]  = Array(
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING,
  org.apache.flink.api.common.typeinfo.Types.STRING
)

tEnv.registerTableSink("s3csvTargetTransaction", fieldNames,
fieldTypes, csvSink)

tEnv.sqlUpdate("INSERT INTO s3csvTargetTransaction SELECT
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
from fromAnotherTable")


-- 
Thanks & Regards
Sri Tummala


Fwd: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread sri hari kali charan Tummala
Hi,

I am trying to write a Flink table to a streaming sink, and it fails at
casting Java to Scala (or Scala to Java). It fails at the step below; can
anyone help me out with this error?


val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new
Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
  new SimpleStringEncoder[Row]("UTF-8")).build()

table.addSink(sink2)
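A hedged Scala sketch of one way around this, assuming the Scala StreamTableEnvironment (org.apache.flink.table.api.scala) rather than the Java one: a Table cannot take a SinkFunction directly, so the sketch first converts the table to a DataStream of rows and attaches the StreamingFileSink to that stream.

// hedged sketch; assumes tEnv is the Scala StreamTableEnvironment and "table" is the query result
import org.apache.flink.streaming.api.scala._   // implicit TypeInformation
import org.apache.flink.table.api.scala._       // Scala Table <-> DataStream conversions
import org.apache.flink.types.Row

tEnv.toRetractStream[Row](table)
  .filter(_._1)     // keep only insert (accumulate) messages
  .map(_._2)
  .addSink(sink2)   // the StreamingFileSink[Row] built above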


package com.aws.examples.kinesis.consumer.TransactionExample

import java.util.Properties

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.serialization.{SimpleStringEncoder,
SimpleStringSchema}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants,
ConsumerConfigConstants}
import org.apache.flink.table.api.{Table, TableEnvironment}
import com.google.gson.{Gson, JsonObject}
import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
import java.sql.{DriverManager, Time}

import com.aws.SchemaJavaClasses.Row1
import org.apache.flink.types.Row
import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.sinks.CsvTableSink
import org.apache.flink.api.java.io.jdbc
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
import org.apache.flink.table.api.java._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.table.sinks.TableSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.core.fs.Path

import scala.collection.JavaConversions._
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.java.StreamTableEnvironment
import org.apache.flink.streaming.api.datastream.DataStream
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import com.aws.customSinks.CsvCustomSink
import org.apache.flink.streaming.api.functions.sink.SinkFunction

object KinesisConsumer {

  def main(args: Array[String]): Unit = {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.createLocalEnvironment
//env.enableCheckpointing(10)

val tEnv = TableEnvironment.getTableEnvironment(env)

// Get AWS credentials
val credentialsProvider = new DefaultAWSCredentialsProviderChain
val credentials = credentialsProvider.getCredentials

// Configure Flink Kinesis consumer
val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
credentials.getAWSAccessKeyId)
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
credentials.getAWSSecretKey)
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON")

// Create Kinesis stream
val kinesis = env.addSource(new
FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(),
consumerConfig))

val mapFunction: MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]] =
  new MapFunction[String, Tuple10[String, String,
String,String,String,String,String,String,String,String]]() {

override def map(s: String): Tuple10[String, String,
String,String,String,String,String,String,String,String] = {
  val data = new Gson().fromJson(s, classOf[TransactionJsonClass])

  val csvData = data.getCc_num+","+
data.getFirst+","+
data.getLast+","+
data.getTrans_num+","+
data.getTrans_time+","+
data.getCategory+","+
data.getMerchant+","+
data.getAmt+","+
data.getMerch_lat+","+
data.getMerch_long

  //println(csvData)

  val p:Array[String] = csvData.split(",")
  var cc_num:String = p(0)
  var first:String = p(1)
  var last:String = p(2)
  var trans_num:String = p(3)
  var trans_time:String = p(4)
  var category:String = p(5)
  var merchant:String = p(6)
  var amt:String = p(7)
  var merch_lat:String = p(8)
  var merch_long:String = p(9)

  val creationDate: Time = new Time(System.currentTimeMillis())
  return new Tuple10(cc_num, first,
last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
}
  }

val data = kinesis.map(mapFunction)

//data.print()


tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,mer

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Weng,

Another issue now (Exception in thread "main"
org.apache.flink.table.api.TableException: Only tables that originate from
Scala DataStreams can be converted to Scala DataStreams.). Here is the full
code:
https://github.com/kali786516/FlinkStreamAndSql/blob/15e5e60d6c044bc830f5ef2d79c071389e7460d1/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L128
and the POM: https://github.com/kali786516/FlinkStreamAndSql/blob/master/pom.xml.

Exception in thread "main" org.apache.flink.table.api.TableException: Only
tables that originate from Scala DataStreams can be converted to Scala
DataStreams.
at
org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:100)
at
com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:126)
at
com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)

tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")

val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)

table.printSchema()

import org.apache.flink.streaming.api.scala._

val test1 = tEnv.sqlQuery(query).distinct().toAppendStream[Row]

test1.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3")


On Mon, Jul 15, 2019 at 9:52 PM Caizhi Weng  wrote:

> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala  于2019年7月16日周二
> 上午3:32写道:
>
>> Hi All,
>>
>> I am trying to read data from a Kinesis stream, apply a SQL
>> transformation (distinct), and then write to a CSV sink, which is
>> failing due to this issue (org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.), full
>> code is here (
>> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
>> ).
>>
>> can anyone help me move forward on this issue?
>>
>> Full Code:-
>>
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.createLocalEnvironment
>> //env.enableCheckpointing(10)
>>
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> // Get AWS credentials
>> val credentialsProvider = new DefaultAWSCredentialsProviderChain
>> val credentials = credentialsProvider.getCredentials
>>
>> // Configure Flink Kinesis consumer
>> val consumerConfig = new Properties
>> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
>> credentials.getAWSAccessKeyId)
>> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
>> credentials.getAWSSecretKey)
>> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
>> "TRIM_HORIZON")
>>
>> // Create Kinesis stream
>> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
>> new SimpleStringSchema(), consumerConfig))
>>
>> val mapFunction: MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]] =
>>   new MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]]() {
>>
>> override def map(s: String): Tuple10[String, String, 
>> String,String,String,String,String,String,String,String] = {
>>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>>
>>   val csvData = data.getCc_num+","+
>> data.getFirst+","+
>> data.getLast+","+
>> data.getTrans_num+","+
>>

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Lee,

I did try

Option 1:-
It writes to the CSV file only if I kill the running job.

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut3",
  FileSystem.WriteMode.OVERWRITE,"~","|")

Output:-
2>
(true,180094108369013,John,Holland,c1ad7a1b73172ef67bd24820438f3f93,2019-07-15
22:48:40,travel,Satterfield-Lowe,81,39.015861,-119.883595)

Option 2:-
I tried several options; I think this workaround kind of works, but I
need to replace brackets, true, etc.

import java.io.PrintStream
val fileOut = new
PrintStream("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut2/out.txt")

System.setOut(fileOut)

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row]).print()

System.out.println(tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).print())


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee 
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> --
> From:Caizhi Weng 
> Send Time:2019年7月16日(星期二) 09:52
> To:sri hari kali charan Tummala 
> Cc:user 
> Subject:Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala  于2019年7月16日周二
> 上午3:32写道:
> Hi All,
>
> I am trying to read data from a Kinesis stream, apply a SQL
> transformation (distinct), and then write to a CSV sink, which is
> failing due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.), full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> can anyone help me move forward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
> override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num+","+
> data.getFirst+","+
> data.getLast+","+
> data.getTrans_num+","+
> data.getTrans_time+","+
> data.getCategory+","+
> data.getMerchant+","+
> data.getAmt+","+
> data.getMerch_lat+","+
> data.getMerch_long
>
>   //println(csvData)
>
>   val p:Array[String] = csvData.split(",")
>   var cc_num:String = p(0)
> 

Re: Stream to CSV Sink with SQL Distinct Values

2019-07-15 Thread sri hari kali charan Tummala
Hi Lee,

it writes only after the job is killed, and I don't see all the records.
Is there a workaround?

tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
FileSystem.WriteMode.NO_OVERWRITE,"~","|")


On Mon, Jul 15, 2019 at 10:03 PM JingsongLee 
wrote:

> Hi caizhi and kali:
>
> I think this table should use toRetractStream instead of toAppendStream,
> and you should handle the retract messages. (If you just use distinct, the
> message should always be accumulate message)
>
> Best, JingsongLee
>
> --
> From:Caizhi Weng 
> Send Time:2019年7月16日(星期二) 09:52
> To:sri hari kali charan Tummala 
> Cc:user 
> Subject:Re: Stream to CSV Sink with SQL Distinct Values
>
> Hi Kali,
>
> Currently Flink treats all aggregate functions as retractable. As
> `distinct` is an aggregate function, it's considered by the planner that it
> might update or retract records (although from my perspective it won't...).
> Because csv table sink is an append only sink (it's hard to update what has
> been written in the middle of a file), the exception you mentioned occurs.
>
> However, you can use `toAppendStream` method to change the retractable
> stream to an append only stream. For example,
> `tEnv.sqlQuery(query).distinct().toAppendStream[Row]` and then you can get
> an append only stream. You can then add csv sink to this stream.
>
> sri hari kali charan Tummala  于2019年7月16日周二
> 上午3:32写道:
> Hi All,
>
> I am trying to read data from a Kinesis stream, apply a SQL
> transformation (distinct), and then write to a CSV sink, which is
> failing due to this issue (org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.), full
> code is here (
> https://github.com/kali786516/FlinkStreamAndSql/blob/614abfc100f74bd8bb7fadb926d946f16f6ef845/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala#L112
> ).
>
> can anyone help me move forward on this issue?
>
> Full Code:-
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.createLocalEnvironment
> //env.enableCheckpointing(10)
>
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new FlinkKinesisConsumer("credittransactions2", 
> new SimpleStringSchema(), consumerConfig))
>
> val mapFunction: MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]] =
>   new MapFunction[String, Tuple10[String, String, 
> String,String,String,String,String,String,String,String]]() {
>
> override def map(s: String): Tuple10[String, String, 
> String,String,String,String,String,String,String,String] = {
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num+","+
> data.getFirst+","+
> data.getLast+","+
> data.getTrans_num+","+
> data.getTrans_time+","+
> data.getCategory+","+
> data.getMerchant+","+
> data.getAmt+","+
> data.getMerch_lat+","+
> data.getMerch_long
>
>   //println(csvData)
>
>   val p:Array[String] = csvData.split(",")
>   var cc_num:String = p(0)
>   var first:String = p(1)
>   var last:String = p(2)
>   var trans_num:String = p(3)
>   var trans_time:String = p(4)
>   var category:String = p(5)
>   var merchant:String = p(6)
>   var amt:String = p(7)
>   var merch_lat:String = p(8)
>   var merch_long:String = p(9)
>
>   val creationDate: Time = new Time(System.currentTimeMillis())
>   return new Tuple10(cc_num, first, 
> last,trans_num,trans_time,category,merchant,amt,merch_lat,merch_long)
> }
>   }
>
> val data = kinesis.map(mapFunction)

Re: org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to org.apache.flink.table.api.scala.StreamTableEnvironment

2019-07-15 Thread sri hari kali charan Tummala
this is the error.

org.apache.flink.table.api.java.StreamTableEnvironment cannot be cast to
org.apache.flink.table.api.scala.StreamTableEnvironment


On Mon, Jul 15, 2019 at 9:54 PM Caizhi Weng  wrote:

> Hi Kali,
>
> What's the exception thrown or error message hinted when executing the
> erroneous step? Please print them here so that we can investigate the
> problem.
>
> sri hari kali charan Tummala  于2019年7月16日周二
> 上午4:49写道:
>
>> Hi ,
>>
>> I am trying to write a Flink table to a streaming sink; it fails at
>> casting Java to Scala (or Scala to Java) at the step below. Can anyone
>> help me out with this error?
>>
>>
>> val sink2:SinkFunction[Row] = StreamingFileSink.forRowFormat(new 
>> Path("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/test"),
>>   new SimpleStringEncoder[Row]("UTF-8")).build()
>>
>> table.addSink(sink2)
>>
>>
>> package com.aws.examples.kinesis.consumer.TransactionExample
>>
>> import java.util.Properties
>>
>> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.serialization.{SimpleStringEncoder, 
>> SimpleStringSchema}
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
>> import 
>> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
>> ConsumerConfigConstants}
>> import org.apache.flink.table.api.{Table, TableEnvironment}
>> import com.google.gson.{Gson, JsonObject}
>> import org.apache.flink.api.java.tuple.{Tuple10, Tuple3}
>> import java.sql.{DriverManager, Time}
>>
>> import com.aws.SchemaJavaClasses.Row1
>> import org.apache.flink.types.Row
>> import org.apache.flink.api.java.io.jdbc.{JDBCInputFormat, JDBCOutputFormat}
>> import org.apache.flink.table.api.scala._
>> import org.apache.flink.table.sinks.CsvTableSink
>> import org.apache.flink.api.java.io.jdbc
>> import 
>> org.apache.flink.api.java.io.jdbc.JDBCInputFormat.JDBCInputFormatBuilder
>> import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat
>> import org.apache.flink.table.api.java._
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.api.java.DataSet
>> import org.apache.flink.table.sinks.TableSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.core.fs.Path
>>
>> import scala.collection.JavaConversions._
>> import org.apache.flink.table.sources.CsvTableSource
>> import org.apache.flink.table.api.Table
>> import org.apache.flink.table.api.TableEnvironment
>> import org.apache.flink.table.api.java.StreamTableEnvironment
>> import org.apache.flink.streaming.api.datastream.DataStream
>> import 
>> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>> import com.aws.customSinks.CsvCustomSink
>> import org.apache.flink.streaming.api.functions.sink.SinkFunction
>>
>> object KinesisConsumer {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // set up the streaming execution environment
>> val env = StreamExecutionEnvironment.createLocalEnvironment
>> //env.enableCheckpointing(10)
>>
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> // Get AWS credentials
>> val credentialsProvider = new DefaultAWSCredentialsProviderChain
>> val credentials = credentialsProvider.getCredentials
>>
>> // Configure Flink Kinesis consumer
>> val consumerConfig = new Properties
>> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
>> credentials.getAWSAccessKeyId)
>> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
>> credentials.getAWSSecretKey)
>> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
>> "TRIM_HORIZON")
>>
>> // Create Kinesis stream
>> val kinesis = env.addSource(new 
>> FlinkKinesisConsumer("credittransactions2", new SimpleStringSchema(), 
>> consumerConfig))
>>
>> val mapFunction: MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]] =
>>   new MapFunction[String, Tuple10[String, String, 
>> String,String,String,String,String,String,String,String]]() {
>>
>> override def map(s: String): T

Fwd: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

2019-07-16 Thread sri hari kali charan Tummala
>
>
> Hi All,
>
> I am trying to apply distinct to the SQL query results and write them to
> CSV, which is failing with the error below.
>
> *Exception in thread "main" org.apache.flink.table.api.TableException:
> Only tables that originate from Scala DataStreams can be converted to Scala
> DataStreams.*
>
>
> * at
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:145)
> at
> com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer$.main(KinesisConsumer.scala:153)
> at
> com.aws.examples.kinesis.consumer.TransactionExample.KinesisConsumer.main(KinesisConsumer.scala)*
>
> Code Example:-
>
> val data = kinesis.map(mapFunction)
> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
> val query = "SELECT distinct 
> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>  FROM transactions where cc_num not in ('cc_num')"
> val table = tEnv.sqlQuery(query)
> import org.apache.flink.streaming.api.scala._
> tEnv.sqlQuery(query).distinct().toRetractStream[Row]
>   
> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
> FileSystem.WriteMode.NO_OVERWRITE,"~","|")
>
>
>
Thanks & Regards
Sri Tummala


Re: org.apache.flink.table.api.TableException: Only tables that originate from Scala DataStreams can be converted to Scala DataStreams.

2019-07-16 Thread sri hari kali charan Tummala
, merch_long)
}
  }

val data = kinesis.map(mapFunction)
tEnv.registerDataStream("transactions", data,
"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
val query = "SELECT distinct
cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num')"
val table = tEnv.sqlQuery(query)
import org.apache.flink.streaming.api.scala._
tEnv.sqlQuery(query).distinct().toRetractStream[Row]
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut4",
FileSystem.WriteMode.NO_OVERWRITE, "~", "|")

env.execute()
  }
}

*POM:-*


http://maven.apache.org/POM/4.0.0";
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
4.0.0

FlinkStreamAndSql
FlinkStreamAndSql
1.0-SNAPSHOT

src/main/scala
src/test/scala



net.alchim31.maven
scala-maven-plugin
3.1.3



compile
testCompile







org.apache.maven.plugins
maven-surefire-plugin
2.13

false
true



**/*Test.*
**/*Suite.*






maven-assembly-plugin
2.4.1


jar-with-dependencies




make-assembly
package

single









org.apache.flink
flink-core
1.8.1



org.apache.flink
flink-core
1.8.1



org.apache.flink
flink-clients_2.11
1.8.1



org.apache.derby
derby
10.13.1.1



org.apache.flink
flink-jdbc_2.11
1.8.1



org.apache.flink
flink-table-api-scala_2.11
1.8.1



org.apache.flink
flink-table-api-java
1.8.1




org.apache.flink
flink-table
1.8.1



org.apache.flink
flink-table-planner_2.11
1.8.1




org.apache.flink
flink-json
1.8.1



org.apache.flink
flink-scala_2.11
1.8.1


   
   org.apache.flink
   flink-scala_2.11
   1.8.1
   

   
   org.apache.flink
   flink-streaming-scala_2.11
   1.8.1
   

   
   org.apache.flink
   flink-connector-kinesis_2.11
   1.8.0
   

   
   org.apache.flink
   flink-connector-kafka-0.11_2.11
   1.8.1
   

   
   com.amazonaws
   amazon-kinesis-client
   1.8.8
   

   
   com.amazonaws
   aws-java-sdk-kinesis
   1.11.579
   

   
   commons-dbcp
   commons-dbcp
   1.2.2
   
   
   com.google.code.gson
   gson
   2.1
   

   
   commons-cli
   commons-cli
   1.4
   

   

org.apache.commons
commons-csv
1.7



org.apache.commons
commons-compress
1.4.1



com.amazonaws
dynamodb-streams-kinesis-adapter
1.4.0



    com.amazonaws
    dynamodb-streams-kinesis-adapter
1.4.0


 

Re: table toRetractStream missing last record and adding extra column (True)

2019-07-16 Thread sri hari kali charan Tummala
windows for question 1 or question 2 or both ?

Thanks
Sri

On Tue, Jul 16, 2019 at 12:25 PM taher koitawala  wrote:

> Looks like you need a window
>
> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi All,
>>
>> I am trying to write toRetractStream output to CSV, which is kind of working,
>> but I get extra values like True before my output data values.
>>
>> Question 1:-
>> I don't want true in my output data; how do I achieve this?
>>
>> Scree
>>
>> Question 2:-
>> in the output file (CSV) I am missing data in the last line; is the
>> toRetractStream closing before writing to the file?
>>
>> Screen Shot attached
>>
>> Code:-
>>
>> val data = kinesis.map(mapFunction)
>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>> val query = "SELECT distinct 
>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>  FROM transactions where cc_num not in ('cc_num')"
>> val table = tEnv.sqlQuery(query)
>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>   
>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>> FileSystem.WriteMode.OVERWRITE,"\n","|")
>>
>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Trying to Convert Tuple[Boolean,Row] to [Row]

2019-07-16 Thread sri hari kali charan Tummala
Hi All,

I am trying to convert a Tuple[Boolean,Row] to Row using a map function, and I am
getting this error asking me for InferedR; what is InferedR in Flink?

  val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
  override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
t.f1
  }
  /*override def map(t: tuple.Tuple2[Boolean, Row], collector:
Collector[Object]): Unit = {
collector.collect(t.f1)
  }
*/
}

tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).map(mymapFunction)
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
  FileSystem.WriteMode.OVERWRITE,"\n","|")

and when I try that, I get a different type of error.




*Error:(143, 74) type mismatch; found   :
org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
required:
org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
  tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).map(mymapFunction)*
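
For the record, the shape the bridge seems to expect is a java.lang.Boolean key
(not scala.Boolean) in the Tuple2, with a concrete Row result instead of AnyVal.
A rough, untested sketch reusing the same tEnv and table as above (the output
path is made up):

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.java.tuple
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.types.Row

// keep only the Row payload, dropping the java.lang.Boolean retract flag
val dropRetractFlag: MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row] =
  new MapFunction[tuple.Tuple2[java.lang.Boolean, Row], Row]() {
    override def map(t: tuple.Tuple2[java.lang.Boolean, Row]): Row = t.f1
  }

tEnv.toRetractStream(table, classOf[Row])
  .map(dropRetractFlag)
  .writeAsText("/tmp/retractOut", FileSystem.WriteMode.OVERWRITE)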


-- 
Thanks & Regards
Sri Tummala


Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread sri hari kali charan Tummala
Question 1:-

I did try the map function and ended up having this issue (
https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
)

I am trying to convert a Tuple[Boolean,Row] to Row using a map function, and I am
getting this error asking me for InferedR; what is InferedR in Flink?

  val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
  override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
t.f1
  }
  /*override def map(t: tuple.Tuple2[Boolean, Row], collector:
Collector[Object]): Unit = {
collector.collect(t.f1)
  }
*/
}

tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).map(mymapFunction)
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
  FileSystem.WriteMode.OVERWRITE,"\n","|")

and when I try that, I get a different type of error.




*Error:(143, 74) type mismatch; found   :
org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
required:
org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
  tEnv.toRetractStream(table,
classOf[org.apache.flink.types.Row]).map(mymapFunction)*

*Question 2:- *
*I don't have any source data issue; regenerating this issue for testing
is simple.*

*create a kinesis stream *
*run the producer *
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/producer/TransactionExample/TransactionProducer.scala

then run the consumer:-
https://github.com/kali786516/FlinkStreamAndSql/blob/master/src/main/scala/com/aws/examples/kinesis/consumer/TransactionExample/KinesisConsumer.scala

Thanks
Sri







On Wed, Jul 17, 2019 at 10:03 AM Hequn Cheng  wrote:

> Hi Sri,
>
> Question1:
> You can use a map to filter the "true", i.e, ds.map(_._2).
> Note, it's ok to remove the "true" flag for distinct as it does not
> generate updates. For other query contains updates, such as a non-window
> group by, we should not filter the flag or the result is not correct.
>
> Question 2:
> I can't reproduce this problem in my local environment. Maybe there is
> something wrong with the source data?
>
> Best, Hequn
>
> On Wed, Jul 17, 2019 at 12:53 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> windows for question 1 or question 2 or both ?
>>
>> Thanks
>> Sri
>>
>> On Tue, Jul 16, 2019 at 12:25 PM taher koitawala 
>> wrote:
>>
>>> Looks like you need a window
>>>
>>> On Tue, Jul 16, 2019, 9:24 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am trying to write toRetractStream output to CSV, which is kind of working,
>>>> but I get extra values like True before my output data values.
>>>>
>>>> Question 1:-
>>>> I don't want true in my output data; how do I achieve this?
>>>>
>>>> Scree
>>>>
>>>> Question 2:-
>>>> in the output file (CSV) I am missing data in the last line; is the
>>>> toRetractStream closing before writing to the file?
>>>>
>>>> Screen Shot attached
>>>>
>>>> Code:-
>>>>
>>>> val data = kinesis.map(mapFunction)
>>>> tEnv.registerDataStream("transactions",data,"cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long")
>>>> val query = "SELECT distinct 
>>>> cc_num,first_column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
>>>>  FROM transactions where cc_num not in ('cc_num')"
>>>> val table = tEnv.sqlQuery(query)
>>>> tEnv.toRetractStream(table, classOf[org.apache.flink.types.Row])
>>>>   
>>>> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>>>> FileSystem.WriteMode.OVERWRITE,"\n","|")
>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread sri hari kali charan Tummala
column,last_column,trans_num,trans_time,category_column,merchant_column,amt_column,merch_lat,merch_long
FROM transactions where cc_num not in ('cc_num') "
val table = tEnv.sqlQuery(query)

table
  .toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2)
  
.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",FileSystem.WriteMode.OVERWRITE)

table.printSchema()

table.toRetractStream(TypeInformation.of(classOf[Row])).print()

env.execute()

/*


table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
  FileSystem.WriteMode.OVERWRITE,
  "\n","|")

val test = table.toRetractStream(TypeInformation.of(classOf[Row])).map(_._2)


test.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut123",FileSystem.WriteMode.OVERWRITE)


test.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut122",
  FileSystem.WriteMode.OVERWRITE,
  "\n","|")

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
implicit val typeInfo = TypeInformation.of(classOf[Row])

val ds = table.toRetractStream(TypeInformation.of(classOf[Row]))


ds.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",FileSystem.WriteMode.OVERWRITE,
"\n","|")

tEnv.toRetractStream(table,
TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut15",
  FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")


table.distinct().writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
  "\n","|")

import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.common.typeinfo.TypeInformation

implicit val typeInfo = TypeInformation.of(classOf[Row])


table.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
  FileSystem.WriteMode.OVERWRITE, "\n", "|")


table.toRetractStream(TypeInformation.of(classOf[Row])).writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
  FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

ds.
  
writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
FileSystem.WriteMode.NO_OVERWRITE, "\n", "|")

tEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))

tEnv.toRetractStream(table)
  
.writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
FileSystem.WriteMode.OVERWRITE, "\n", "|")

tEnv.toRetractStream(table,classOf[T])

*/

  }

}







On Wed, Jul 17, 2019 at 10:11 AM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Question 1:-
>
> I did try the map function and ended up having this issue (
> https://stackoverflow.com/questions/57063249/flink-scala-notinferedr-in-scala-type-mismatch-mapfunctiontuple2boolean-row-i
> )
>
> I am trying to convert a Tuple[Boolean,Row] to Row using a map function,
> and I am getting this error asking me for InferedR; what is InferedR in Flink?
>
>   val mymapFunction: MapFunction[tuple.Tuple2[Boolean, Row],AnyVal] =
> new MapFunction[tuple.Tuple2[Boolean, Row],AnyVal]() {
>   override def map(t: tuple.Tuple2[Boolean, Row]): Row = {
> t.f1
>   }
>   /*override def map(t: tuple.Tuple2[Boolean, Row], collector: 
> Collector[Object]): Unit = {
> collector.collect(t.f1)
>   }
> */
> }
>
> tEnv.toRetractStream(table, 
> classOf[org.apache.flink.types.Row]).map(mymapFunction)
>   
> .writeAsCsv("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut8",
>   FileSystem.WriteMode.OVERWRITE,"\n","|")
>
> and when I try that, I get a different type of error.
>
>
>
>
> *Error:(143, 74) type mismatch; found   :
> org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[scala.Boolean,org.apache.flink.types.Row],AnyVal]
>  required:
> org.apache.flink.api.common.functions.MapFunction[org.apache.flink.api.java.tuple.Tuple2[java.lang.Boolean,org.apache.flink.types.Row],?]
>   tEnv.toRetractStream(table,
> classOf[org.apache.flink.types.Row]).map(mymapFunction)*
>
> *Question 2:- *
> *I dont have any source data issue, to regenerate this issue for testing
>

Re: table toRetractStream missing last record and adding extra column (True)

2019-07-17 Thread sri hari kali charan Tummala
Yes, even the delimiter can be replaced; I still have to test what happens
if the data itself has a comma in it.

table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(_._2.toString.replaceAll(",","~"))
  
.writeAsText("/Users/kalit_000/Downloads/FlinkStreamAndSql/src/main/resources/csvOut125",
FileSystem.WriteMode.OVERWRITE)
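
If the data itself can contain the delimiter, something along these lines could
quote the offending fields instead of replacing characters (a rough, untested
sketch; toCsvLine and the output path are made up, and it assumes the same
imports as the full program below):

// RFC 4180 style: wrap a field in double quotes when it contains the
// delimiter or a quote character, and double any embedded quotes
def toCsvLine(row: Row, delimiter: String): String =
  (0 until row.getArity).map { i =>
    val field = String.valueOf(row.getField(i))
    if (field.contains(delimiter) || field.contains("\""))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field
  }.mkString(delimiter)

table.toRetractStream(TypeInformation.of(classOf[Row]))
  .map(t => toCsvLine(t._2, ","))
  .writeAsText("/tmp/csvQuoted", FileSystem.WriteMode.OVERWRITE)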


On Wed, Jul 17, 2019 at 6:47 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Amazing, all issues resolved in one go, thanks Cheng. One issue though: I
> can't write map(_._2) output to CSV; it looks like that isn't supported right
> now, so it has to be a text file.
>
> Below is the full code in Scala if someone wants it.
>
> Git Code is here:-
> https://github.com/kali786516/FlinkStreamAndSql
>
> package com.aws.examples.kinesis.consumer.transactionExampleScala
>
> import java.util.Properties
> import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
> import 
> com.aws.examples.kinesis.consumer.TransactionExample.TransactionJsonClass
> import com.google.gson.Gson
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.{DataStream, 
> StreamExecutionEnvironment}
> import org.apache.flink.table.api.scala.StreamTableEnvironment
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
> import 
> org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, 
> ConsumerConfigConstants}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import java.sql.{DriverManager, Time}
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.core.fs.{FileSystem, Path}
>
> object TransactionScalaTest {
>
>   /*
>   extends RetractStreamTableSink[Row]
>   override def configure(strings: Array[String], typeInformations: 
> Array[TypeInformation[_]]): TableSink[tuple.Tuple2[lang.Boolean, Row]] = ???
>
>   override def getFieldNames: Array[String] = ???
>
>   override def getFieldTypes: Array[TypeInformation[_]] = ???
>
>   override def emitDataStream(dataStream: 
> DataStream[tuple.Tuple2[lang.Boolean, Row]]): Unit = ???
>
>   override def getOutputType: TupleTypeInfo[tuple.Tuple2[lang.Boolean, Row]] 
> = super.getOutputType
>
>   override def getRecordType: TypeInformation[Row] = ???
>
>*/
>
>   def main(args: Array[String]): Unit = {
>
>
>
> // set up the streaming execution environment
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> //env.enableCheckpointing(1)
>
> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>
> // Get AWS credentials
> val credentialsProvider = new DefaultAWSCredentialsProviderChain
> val credentials = credentialsProvider.getCredentials
>
> // Configure Flink Kinesis consumer
> val consumerConfig = new Properties
> consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, 
> credentials.getAWSAccessKeyId)
> consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> credentials.getAWSSecretKey)
> consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, 
> "TRIM_HORIZON")
>
> // Create Kinesis stream
> val kinesis = env.addSource(new 
> FlinkKinesisConsumer("credittransactions3", new SimpleStringSchema(), 
> consumerConfig))
>
> val mapFunction: MapFunction[String, (String, String, String, String, 
> String, String, String, String, String, String)] =
>   new MapFunction[String, (String, String, String, String, String, 
> String, String, String, String, String)]() {
>
> override def map(s: String): (String, String, String, String, String, 
> String, String, String, String, String) = {
>
>   val data = new Gson().fromJson(s, classOf[TransactionJsonClass])
>
>   val csvData = data.getCc_num + "," +
> data.getFirst + "," +
> data.getLast + "," +
> data.getTrans_num + "," +
> data.getTrans_time + "," +
> data.getCategory + "," +
> data.getMerchant + "," +
> data.getAmt + "," +
> data.getMerch_lat + "," +
> data.getMerch_long
>
>   //println(csvData)
>
>   val p: Array[String] = csvData.split(",")
>   var cc_num: String = p(0)
>   var first: String = p(1)
> 

Re: Is it possible to decide the order of where conditions in Flink SQL

2019-07-26 Thread sri hari kali charan Tummala
Try a CTE (common table expression) if it is supported, or a SQL subquery.
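
For example, something along these lines (a sketch only; it reuses the names from
the query above and the same tEnv, and whether the planner actually keeps this
evaluation order is not guaranteed, so it is worth checking the plan):

// cheap predicate in the inner query, expensive UDF only on the rows that survive it
val query =
  """
    |SELECT *
    |FROM (
    |  SELECT * FROM users WHERE NOT user.is_robot
    |) filtered
    |WHERE UDF_NEED_TO_QUERY_DB(filtered.user)
  """.stripMargin
val result = tEnv.sqlQuery(query)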

On Fri, Jul 26, 2019 at 1:00 PM Fanbin Bu  wrote:

> how about move query db filter to the outer select.
>
> On Fri, Jul 26, 2019 at 9:31 AM Tony Wei  wrote:
>
>> Hi,
>>
>> If I have multiple where conditions in my SQL, is it possible to specify
>> its order, so that the query
>> can be executed more efficiently?
>>
>> For example, if I have the following SQL, it used a heavy UDF that needs
>> to access database.
>> However, if I can specify the order of conditions is executing
>> `!user.is_robot` first then executing
>> UDF, it will reduce the number of database access. Those records with
>> `true` in `user.is_robot` will
>> be dropped earlier and don't need to access database.
>>
>> select *
>>
>> from users
>>
>> where !user.is_robot and UDF_NEED_TO_QUERY_DB(user)
>>
>>
>> Thanks,
>> Tony Wei
>>
>

-- 
Thanks & Regards
Sri Tummala


FlinkDynamoDBStreamsConsumer Atleast Once

2019-08-07 Thread sri hari kali charan Tummala
Hi Flink Experts,

How do I achieve at-least-once semantics with FlinkDynamoDBStreamsConsumer +
DynamoDB Streams? Do Flink checkpoints or savepoints do the job?

My scenario:-
The Flink application uses FlinkDynamoDBStreamsConsumer, which reads the latest
changes from DynamoDB Streams. But if my software fails and the application
is down for 30 minutes for some reason, can the Flink application read the
missing changes from the past 30 minutes of the DynamoDB stream?

Is it better to write the DynamoDB stream changes to a Kafka or Kinesis topic
and have the Flink application read from that topic, which would let the
Flink application resume from the last successful record using Kafka offsets,
for example?

What would be the best architecture to move forward?
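
For context, this is roughly how the job is wired today (a simplified, untested
sketch; the stream ARN and intervals are made up, and I am assuming the consumer
takes the same (stream, schema, properties) constructor as FlinkKinesisConsumer).
My understanding is that the checkpointed shard positions are what the consumer
would resume from after a restart, as long as the missed records are still within
the 24-hour DynamoDB Streams retention:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.FlinkDynamoDBStreamsConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // checkpoint the shard positions every minute

val consumerConfig = new Properties
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON")

// a DynamoDB stream is addressed by its stream ARN
val ddbChanges = env.addSource(
  new FlinkDynamoDBStreamsConsumer[String](
    "arn:aws:dynamodb:us-east-1:111111111111:table/transactions/stream/2019-08-07T00:00:00.000",
    new SimpleStringSchema(),
    consumerConfig))

ddbChanges.print()
env.execute("dynamodb-streams consumer sketch")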

-- 
Thanks & Regards
Sri Tummala


Re: What is the recommended way to run flink with high availability on AWS?

2019-08-21 Thread sri hari kali charan Tummala
Ok, no problem.

On Wed, Aug 21, 2019 at 12:22 AM Pei HE  wrote:

> Thanks Kali for the information. However, it doesn't work for me, because
> I need features in Flink 1.7.x or later and use manged Amazon MSK.
> --
> Pei
>
>
>
> On Tue, Aug 20, 2019 at 7:17 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Check out the AWS managed Flink offering in Kinesis: in AWS Kinesis Data
>> Analytics, try to create an application using Flink instead of SQL.
>>
>> On Tue, Aug 20, 2019 at 9:28 PM Pei HE  wrote:
>>
>>> Hi there,
>>> Looking at Flink document [1], EMR is the recommended way to run Flink
>>> on AWS. However, Flink is currently not supported in an EMR cluster with
>>> multiple master nodes [2].
>>>
>>> For example, I can create a EMR HA cluster with Zookeeper, Hive, etc.
>>> But, if I add Flink to the application list when I create the cluster, I
>>> got the following error from aws cli:
>>> "An error occurred (ValidationException) when calling the RunJobFlow
>>> operation: HA clusters do not support the specified applications: Flink.
>>> Revise the applications and resubmit.".
>>>
>>> My question is what is the recommended way to run flink with high
>>> availability on AWS.
>>>
>>> Thanks
>>> --
>>> Pei
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html
>>> [2]:
>>> https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-ha-applications.html
>>>
>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: Is Scala the best language for Flink?

2022-01-28 Thread sri hari kali charan Tummala
Yes Scala is the best.

On Fri, Jan 28, 2022, 9:57 AM Nicolás Ferrario 
wrote:

> Hi Seb.
>
> In my team we are migrating things to Kotlin because we find it much
> easier to deal with. It's like the best of both worlds, but you do give up
> on Flink Scala serializers, since the only way to get Kotlin Data Classes
> working is by making them a POJO (or implementing your own TypeInfo). You
> do at least gain Evolution support.
>
> We've found that Kotlin + Gradle are a great combination, and we end up
> with quick compile times, a much simpler build config (compared to Maven
> and SBT).
> Another thing is that we've found that the macro createTypeInformation can
> be dangerous, because sometimes it may fallback to Kryo on classes you
> don't expect it to. Those implicit behaviors we see in Scala have caused us
> some headaches and we prefer to explicitly type things in the Data Stream.
> Just personal preference, but pointing it out in case it's useful to
> someone.
>
> Hope this helps!
>
> On Mon, Jan 24, 2022 at 7:15 AM seb  wrote:
>
>> Hi there,
>>
>> I am getting started with Apache Flink. I am curious whether there is a
>> clear winner between developing in either Scala or Java.
>>
>> It sounds like Flink is typically slower to support new versions of Scala
>> and that Java development might have fewer quirks.
>>
>> What do you think? I have experience coding in Scala, but I am more than
>> happy to learn Java.
>>
>> Thanks in advance for sharing your thoughts!
>>
>> Best,
>> Sebastian
>>
>


Flink/Scala contract positions ?

2022-06-03 Thread sri hari kali charan Tummala
Hi Folks,

Is anyone hiring for Flink or Scala Akka contract corp-to-corp positions?
I am open in the market, looking for work in the Scala Spark, Flink Scala, or
Scala Akka world.

Thanks
Sri


Re: Flink/Scala contract positions ?

2022-06-03 Thread sri hari kali charan Tummala
Hi Jing,

Thanks for reaching back. Are you offering a contract job to fix the Scala
API for Flink? Yes, I would be interested; we can talk if you want to
schedule some time.

Thanks
Sri

On Fri, Jun 3, 2022 at 9:00 AM Jing Ge  wrote:

> Hi,
>
> Currently, the Flink Scala API is not in a good shape. Would you like to
> start from there?
>
> Best regards,
> Jing
>
> On Fri, Jun 3, 2022 at 4:29 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> Is anyone hiring for Flink or Scala Akka contract corp to corp positions
>> ? I am open in market looking for work in Scala Spark or Flink Scala or
>> Scala Akka world.
>>
>> Thanks
>> Sri
>>
>

-- 
Thanks & Regards
Sri Tummala


Re: Need help to join Apache Flink community on Slack

2022-06-05 Thread sri hari kali charan Tummala
Hi Jing,

Please add me kali.tumm...@gmail.com.

Thanks
Sri

On Sat, Jun 4, 2022 at 4:47 PM Jing Ge  wrote:

> Hi Santhosh,
>
> just invited you. Please check your email. Looking forward to knowing your
> story! Thanks!
>
> To anyone else who wants to join, please send an email to
> user@flink.apache.org, you might have a better chance to get the invite.
> Thanks.
>
> Regards,
> Jing
>
> On Sat, Jun 4, 2022 at 10:37 PM santhosh venkat <
> santhoshvenkat1...@gmail.com> wrote:
>
>> Hi,
>>
>> Can you please invite me to join the apache flink slack community
>> channel. We have adopted apache flink and would like to participate in the
>> community forum.
>>
>> Thank you.
>>
>> Regards
>>
>

-- 
Thanks & Regards
Sri Tummala


Flink source Code Explanation

2022-06-05 Thread sri hari kali charan Tummala
Hi Flink Community,

I want to go through the Flink source code in my free time. Is there a document
I can go through that explains where to start? Other than the Javadoc, is
there anything else to start my reverse engineering?

Thanks & Regards
Sri Tummala


Re: Flink source Code Explanation

2022-06-05 Thread sri hari kali charan Tummala
I am getting a connection timed out error in Firefox and Google Chrome; can
you double-check whether the web link is working or not?

Thanks
Sri




On Sun, Jun 5, 2022 at 7:01 PM Jing Ge  wrote:

> Hi Sri,
>
> Flink is very well documented. You can find it under e.g.
> https://nightlies.apache.org/flink/flink-docs-master/
>
> Best regards,
> Jing
>
> On Mon, Jun 6, 2022 at 3:39 AM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> I want to go through the Flink source code in my free time. Is there a
>> document I can go through that explains where to start? Other than the
>> Javadoc, is there anything else to start my reverse engineering?
>>
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Flink config driven tool ?

2022-06-07 Thread sri hari kali charan Tummala
Hi Flink Community,

Can someone point me to good config-driven Flink data movement tools
(GitHub repos)? Imagine I build my ETL DAG connecting source -->
transformations --> target just using a config file.

below are a few spark examples:-
https://github.com/mvrpl/big-shipper
https://github.com/BitwiseInc/Hydrograph
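
The kind of thing I have in mind, sketched with Flink SQL (a made-up driver; it
assumes a Flink version whose TableEnvironment has executeSql, and a pipeline.sql
config file holding the CREATE TABLE statements for source and sink plus the
INSERT INTO that wires them together):

import scala.io.Source
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object SqlConfigDriver {
  def main(args: Array[String]): Unit = {
    val tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance().inStreamingMode().build())

    // the whole DAG (source --> transformations --> target) lives in the config file
    val configFile = Source.fromFile(args(0))
    try {
      configFile.mkString
        .split(";")
        .map(_.trim)
        .filter(_.nonEmpty)
        .foreach(stmt => tEnv.executeSql(stmt))
    } finally configFile.close()
  }
}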

Thanks & Regards
Sri Tummala


Re: Flink config driven tool ?

2022-06-07 Thread sri hari kali charan Tummala
Thanks, but that looks like a Spark tool; is there something similar in Flink?

Thanks
Sri

On Tue, Jun 7, 2022 at 12:07 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey there,
>
> No idea if it's any good, but just saw Apache SeaTunnel[1] today which
> seems to fit your requirements.
>
> Best,
> Austin
>
> [1]: https://seatunnel.apache.org/
>
> On Tue, Jun 7, 2022 at 2:19 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> can someone point me to a good config-driven flink data movement tool
>> Github repos? Imagine I build my ETL dag connecting source -->
>> transformations --> target just using a config file.
>>
>> below are a few spark examples:-
>> https://github.com/mvrpl/big-shipper
>> https://github.com/BitwiseInc/Hydrograph
>>
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: Flink config driven tool ?

2022-06-07 Thread sri hari kali charan Tummala
Thanks, I'll check it out.

On Tue, Jun 7, 2022, 1:31 PM Austin Cawley-Edwards 
wrote:

> They support Flink as well. Looks like they even support the new Flink k8s
> operator.[1]
>
> Austin
>
> [1]:
> https://seatunnel.apache.org/docs/2.1.1/start/kubernetes#deploying-the-operator
>
> On Tue, Jun 7, 2022 at 3:11 PM sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> thanks but looks like a spark tool is there something similar in flink?
>>
>> Thanks
>> Sri
>>
>> On Tue, Jun 7, 2022 at 12:07 PM Austin Cawley-Edwards <
>> austin.caw...@gmail.com> wrote:
>>
>>> Hey there,
>>>
>>> No idea if it's any good, but just saw Apache SeaTunnel[1] today which
>>> seems to fit your requirements.
>>>
>>> Best,
>>> Austin
>>>
>>> [1]: https://seatunnel.apache.org/
>>>
>>> On Tue, Jun 7, 2022 at 2:19 PM sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> can someone point me to a good config-driven flink data movement tool
>>>> Github repos? Imagine I build my ETL dag connecting source -->
>>>> transformations --> target just using a config file.
>>>>
>>>> below are a few spark examples:-
>>>> https://github.com/mvrpl/big-shipper
>>>> https://github.com/BitwiseInc/Hydrograph
>>>>
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>>
>> --
>> Thanks & Regards
>> Sri Tummala
>>
>>


Re: Flink config driven tool ?

2022-06-15 Thread sri hari kali charan Tummala
I would have helped you if it was written in Scala instead of Java.

On Wed, Jun 15, 2022 at 2:22 AM Rakshit Ramesh <
rakshit.ram...@datakaveri.org> wrote:

> I'm working on such a thing.
> It's in early stages and needs a lot more work.
> I'm open to collaborating.
> https://github.com/datakaveri/iudx-adaptor-framework
>
> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> can someone point me to a good config-driven flink data movement tool
>> Github repos? Imagine I build my ETL dag connecting source -->
>> transformations --> target just using a config file.
>>
>> below are a few spark examples:-
>> https://github.com/mvrpl/big-shipper
>> https://github.com/BitwiseInc/Hydrograph
>>
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: Flink config driven tool ?

2022-06-20 Thread sri hari kali charan Tummala
Found one more Flink tool:
https://www.splunk.com/en_us/products/stream-processing.html

On Wed, Jun 15, 2022 at 1:18 PM Jing Ge  wrote:

> Hi,
>
> Just like Shengkai mentioned. I would strongly suggest trying SQL for ETL
> dag. If you find anything that SQL does not work for you, please share your
> requirements with us. We might check if it makes sense to build new
> features in Flink to support them.
>
> Best regards,
> Jing
>
>
> On Wed, Jun 15, 2022 at 11:22 AM Rakshit Ramesh <
> rakshit.ram...@datakaveri.org> wrote:
>
>> I'm working on such a thing.
>> It's in early stages and needs a lot more work.
>> I'm open to collaborating.
>> https://github.com/datakaveri/iudx-adaptor-framework
>>
>> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi Flink Community,
>>>
>>> can someone point me to a good config-driven flink data movement tool
>>> Github repos? Imagine I build my ETL dag connecting source -->
>>> transformations --> target just using a config file.
>>>
>>> below are a few spark examples:-
>>> https://github.com/mvrpl/big-shipper
>>> https://github.com/BitwiseInc/Hydrograph
>>>
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>

-- 
Thanks & Regards
Sri Tummala


Re: Flink config driven tool ?

2022-06-22 Thread sri hari kali charan Tummala
This looks interesting: the NASA Akka project https://github.com/NASARace/race



On Mon, Jun 20, 2022 at 7:06 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> found one more flink tool
> https://www.splunk.com/en_us/products/stream-processing.html
>
> On Wed, Jun 15, 2022 at 1:18 PM Jing Ge  wrote:
>
>> Hi,
>>
>> Just like Shengkai mentioned. I would strongly suggest trying SQL for ETL
>> dag. If you find anything that SQL does not work for you, please share your
>> requirements with us. We might check if it makes sense to build new
>> features in Flink to support them.
>>
>> Best regards,
>> Jing
>>
>>
>> On Wed, Jun 15, 2022 at 11:22 AM Rakshit Ramesh <
>> rakshit.ram...@datakaveri.org> wrote:
>>
>>> I'm working on such a thing.
>>> It's in early stages and needs a lot more work.
>>> I'm open to collaborating.
>>> https://github.com/datakaveri/iudx-adaptor-framework
>>>
>>> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
>>> kali.tumm...@gmail.com> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> can someone point me to a good config-driven flink data movement tool
>>>> Github repos? Imagine I build my ETL dag connecting source -->
>>>> transformations --> target just using a config file.
>>>>
>>>> below are a few spark examples:-
>>>> https://github.com/mvrpl/big-shipper
>>>> https://github.com/BitwiseInc/Hydrograph
>>>>
>>>> Thanks & Regards
>>>> Sri Tummala
>>>>
>>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>

-- 
Thanks & Regards
Sri Tummala


Big Data Contract Roles ?

2022-09-14 Thread sri hari kali charan Tummala
Hi Flink Users/ Spark Users,

Is anyone hiring for contract corp-to-corp Big Data Spark Scala or Flink Scala
roles?


Thanks
Sri


Spark Scala Contract Opportunity @USA

2022-11-10 Thread sri hari kali charan Tummala
Hi All,

Is anyone looking for a Spark Scala contract role inside the USA? A company
called Maxonic has an open Spark Scala contract position (100% remote)
inside the USA. If anyone is interested, please send your CV to
kali.tumm...@gmail.com.

Thanks & Regards
Sri Tummala


Flink India Jobs Referral

2023-12-20 Thread sri hari kali charan Tummala
Hi Community,

I got laid off at Apple in Feb 2023, which forced me to move out of the USA
due to an immigration problem (H-1B). I was a Big Data, Spark, Scala, Python,
and Flink consultant with over 12+ years of experience.

I still haven't landed a job in India since then. I need referrals in
India at product firms (non-consulting please) that use Flink, Spark,
Scala, Big Data, or ML & AI.

Can someone help me with this?


Thanks
Sri

On Thu, Dec 21, 2023, 7:23 AM Xuyang  wrote:

> Hi, Praveen Chandna.
> I don't know much about this plan, you can ask on the dev mailing list. [1]
>
> [1]https://flink.apache.org/what-is-flink/community/
>
> --
> Best!
> Xuyang
>
>
> 在 2023-12-20 14:53:52,"Praveen Chandna via user" 
> 写道:
>
> Hello Xuyang
>
>
>
> One more query, is there plan to release formal Java 17 support in Flink
> release 1.19 or 1.20 ? Or do we need to wait till Flink 2.0 ?
>
>
>
> Thanks !!
>
>
>
> *From:* Xuyang 
> *Sent:* 19 December 2023 16:37
> *To:* user@flink.apache.org
> *Subject:* Re:RE: Flink - Java 17 support
>
>
>
> Hi, Praveen Chandna.
>
>
>
> Please correct me if I'm wrong. From the release note of Flink 1.18 [1],
> you can see that although 1.18 already supports Java 17, it is still in
> beta mode.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.18/
>
> --
>
> Best!
>
> Xuyang
>
>
>
> 在 2023-12-19 18:32:48,"Praveen Chandna via user" 
> 写道:
>
> Hello Team
>
>
>
> Please help to reply on Java 17 support. Does the Java 17 experimental
> support is production ready ?
>
> Is it recommended to use Flink 1.18 with Java 17 or is there any risk if
> we move to Java 17 in production ?
>
>
>
> Thanks !!
>
>
>
> *From:* Praveen Chandna
> *Sent:* 17 December 2023 21:28
> *To:* Praveen Chandna via user 
> *Subject:* Flink - Java 17 support
>
>
>
> Hello
>
>
>
> In the Flink version 1.18, there is experimental support for Java 17. What
> is the plan for Java 17 supported release of Flink ?
>
>
>
> As the below Jira is already closed for Java 17 support.
>
> https://issues.apache.org/jira/browse/FLINK-15736
>
>
>
> Thanks !!
>
>
>
> // Regards
>
> Praveen Chandna
>
>
>
>


Flink Scala Positions in India or USA !

2024-02-21 Thread sri hari kali charan Tummala
Hi Folks,

I am currently seeking full-time Flink Scala positions in India or the
USA (non-consulting), specifically at the Principal or Staff level.

I require an H-1B transfer and assistance with relocation from India; my
I-140 is approved.

Thanks & Regards
Sri Tummala


Job Opportunities in India or UK with Tier 2 Sponsorship

2024-08-26 Thread sri hari kali charan Tummala
Hi Flink Community,

I'm a hands-on Apache Flink software engineer looking for job opportunities
in India or the UK (with Tier 2 sponsorship). If anyone knows of openings
or can point me in the right direction, please let me know.

Thanks,
Sri Tummala