[ANNOUNCE] Apache Flink Kafka Connectors 3.0.2 released

2023-12-01 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Kafka Connectors 3.0.2. This release is compatible with the Apache
Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353768

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon


Re: dependency error with latest Kafka connector

2023-11-25 Thread Tzu-Li (Gordon) Tai
No problem, glad to hear that it's working now!

With release candidates, we always publish the URL for staged artifacts in
the release candidate vote threads so that you can point your build at
them and compile against them for testing purposes.

It would be great to have your +1 on the vote thread for the 3.0.2 Kafka
connector.

Best,
Gordon

On Sat, Nov 25, 2023, 10:14 bobobabo  wrote:

> Thanks Gordon!
>
> I didn't know the name of the repository
> https://repository.apache.org/content/repositories/orgapacheflink-1675/
> so that's something else I've learned.
>
> Yes, with the new version I can add the dependency
> "org.apache.flink" % "flink-connector-kafka" % "3.0.2-1.18",
> and compile it without any errors.
>
> Günter
>
>


Re: dependency error with latest Kafka connector

2023-11-25 Thread Tzu-Li (Gordon) Tai
Hi Günter,

With Maven you'd list the staged repository holding the RC artifacts as a
repository:

```
<repositories>
  <repository>
    <id>test_kafka_rc</id>
    <name>Apache Flink Kafka Connector v3.0.2</name>
    <url>https://repository.apache.org/content/repositories/orgapacheflink-1675/</url>
  </repository>
</repositories>
```

With SBT, I think the equivalent is using Resolvers [1]:

```
resolvers += "Apache Flink Kafka Connector v3.0.2" at "https://repository.apache.org/content/repositories/orgapacheflink-1675/"
```
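
As a minimal sketch, assuming an sbt project like the one earlier in this thread, the resolver and the connector dependency would sit together in build.sbt roughly as below. The `flinkVersion` value and the two extra Flink dependencies are taken from the thread, dependency scopes are left out for brevity, and the staged repository is only meant for testing the RC. Note that the connector version string has the form `<connector version>-<Flink minor version>`, so the artifact built for Flink 1.18 is `3.0.2-1.18`.

```scala
// build.sbt (sketch, not a definitive setup)
val flinkVersion = "1.18.0" // assumption: the Flink version used in this thread

// Staged RC repository from the vote thread; remove once the release is on Maven Central
resolvers += "Apache Flink Kafka Connector v3.0.2" at "https://repository.apache.org/content/repositories/orgapacheflink-1675/"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-streaming-java" % flinkVersion,
  "org.apache.flink" % "flink-clients" % flinkVersion,
  // Java artifact, so a single % (no Scala version suffix)
  "org.apache.flink" % "flink-connector-kafka" % "3.0.2-1.18"
)
```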

Hope that helps!

Best,
Gordon

[1] https://www.scala-sbt.org/1.x/docs/Resolvers.html

On Sat, Nov 25, 2023 at 12:55 AM guenterh.lists 
wrote:

> Hi Gordon,
>
> thanks for working on it.
>
> How can I reference the repository for the new artifact? Referencing
> 3.0.2-18, I get an unresolved dependency error.
>
> Thanks for any hint.
>
> Günter
>
> sbt:flink_essential_swrapper> compile
> [info] Updating
> [info] Resolved  dependencies
> [warn]
> [warn] Note: Unresolved dependencies path:
> [error] stack trace is suppressed; run last update for the full output
> [error] (update) sbt.librarymanagement.ResolveException: Error downloading org.apache.flink:flink-connector-kafka:3.0.2-18
> [error]   Not found
> [error]   Not found
> [error]   not found: /home/swissbib/.ivy2/local/org.apache.flink/flink-connector-kafka/3.0.2-18/ivys/ivy.xml
> [error]   not found: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/3.0.2-18/flink-connector-kafka-3.0.2-18.pom
>
>
> --
> Günter Hipler
> https://openbiblio.social/@vog61
> https://twitter.com/vog61
>
>


Re: dependency error with latest Kafka connector

2023-11-24 Thread Tzu-Li (Gordon) Tai
Hi all,

I've cherry-picked FLINK-30400 onto the v3.0 branch of flink-connector-kafka.

Treating this thread as justification to start a vote for 3.0.2 RC #1
immediately so we can get out a new release ASAP. Please see the vote
thread here [1].

@guenterh.lists Would you be able to test this
RC and see if the issue is resolved for you? It should work simply by
having a dependency on flink-streaming-java and flink-clients for 1.18.0,
as well as flink-connector-kafka 3.0.2-1.18. The flink-connector-base
dependency you added in the end as a workaround should not be needed.

Thanks,
Gordon

[1] https://lists.apache.org/thread/34zb5pnymfltrz607wqcb99h7675zdpj

On Fri, Nov 24, 2023 at 5:16 AM Leonard Xu  wrote:

>
> - built a fat uber jar from quickstart with Flink 1.18.0 for
>   flink-streaming-java and flink-clients, and flink-connector-kafka version
>   3.0.1-1.18
> - then submitted to local Flink cluster 1.18.0. Things worked as
>   expected and the job ran fine.
>
> Hey @Gordon,
> I guess things may work as expected when you submit your fat jar job to the
> cluster, because flink-connector-base (1.18.0 in this case) is included in
> the flink-dist jar [1], which will appear in your classpath, but you may
> hit the issue when you run in a local IDE environment. Maybe you can run a
> local test to verify this.
>
> In the end, I think we need to backport FLINK-30400 to the Flink Kafka
> connector 3.0 branch and prepare a 3.0.2 release soon.
>
> Best,
> Leonard
> [1]
> https://github.com/apache/flink/blob/977463cce3ea0f88e2f184c30720bf4e8e97fd4a/flink-dist/pom.xml#L156
>


Re: dependency error with latest Kafka connector

2023-11-23 Thread Tzu-Li (Gordon) Tai
] | \-
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.15.2:compile
[INFO] +- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.17.1:runtime
[INFO] +- org.apache.logging.log4j:log4j-api:jar:2.17.1:runtime
[INFO] \- org.apache.logging.log4j:log4j-core:jar:2.17.1:runtime
```



Re: dependency error with latest Kafka connector

2023-11-23 Thread Tzu-Li (Gordon) Tai
Hi all,

There seems to be an issue with the connector release scripts used in the
release process: they don't correctly overwrite the flink.version property
in POMs.

I'll kick off a new release for 3.0.2 shortly to address this. Sorry for
overlooking this during the previous release.

Best,
Gordon

On Thu, Nov 23, 2023 at 7:11 AM guenterh.lists 
wrote:

> Hi Danny
>
> thanks for taking a look into it and for the hint.
>
> Your assumption is correct: it compiles when the base connector is
> excluded.
>
> In sbt:
> "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18" exclude("org.apache.flink", "flink-connector-base"),
>
> Günter
>
>
> On 23.11.23 14:24, Danny Cranmer wrote:
> > Hey all,
> >
> > I believe this is because of FLINK-30400 [1]. Looking at the pom I cannot see
> > any other dependencies that would cause a problem. To work around this, can
> > you try to remove that dependency from your build?
> >
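> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-kafka</artifactId>
> >   <version>3.0.1-1.18</version>
> >   <exclusions>
> >     <exclusion>
> >       <groupId>org.apache.flink</groupId>
> >       <artifactId>flink-connector-base</artifactId>
> >     </exclusion>
> >   </exclusions>
> > </dependency>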
> >
> >
> > Alternatively you can add it in:
> >
> > <dependency>
> >   <groupId>org.apache.flink</groupId>
> >   <artifactId>flink-connector-base</artifactId>
> >   <version>1.18.0</version>
> > </dependency>
> >
> > Sorry I am not sure how to do this in Scala SBT.
> >
> > Agree we should get this fixed and push a 3.0.2 Kafka connector.
> >
> > Thanks,
> > Danny
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-30400
> >
> > On Thu, Nov 23, 2023 at 12:39 PM Leonard Xu  wrote:
> >
> >> Hi Günter,
> >>
> >> It seems to me to be a bug that the 3.0.1-1.18 Flink Kafka connector uses
> >> Flink 1.17 dependencies, which leads to your issue.
> >>
> >> I guess we need to propose a new Kafka connector release to fix this
> >> issue.
> >>
> >> CC: Gordon, Danny, Martijn
> >>
> >> Best,
> >> Leonard
> >>
> >> On Nov 14, 2023, at 6:53 PM, Alexey Novakov via user wrote:
> >>
> >> Hi Günter,
> >>
> >> It looks like a problem with the Kafka connector release.
> >> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/3.0.1-1.18
> >> Compile dependencies are still pointing to Flink 1.17.
> >>
> >> The release manager has already been contacted about this, or will be
> >> contacted soon.
> >>
> >> Best regards,
> >> Alexey
> >>
> >> On Mon, Nov 13, 2023 at 10:42 PM guenterh.lists <guenterh.li...@bluewin.ch>
> >> wrote:
> >>
> >>> Hello
> >>>
> >>> I'm getting a dependency error when using the latest Kafka connector in
> >>> a Scala project.
> >>>
> >>> With the 1.17.1 Kafka connector, compilation is OK.
> >>>
> >>> With
> >>>
> >>> "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18"
> >>>
> >>> I get
> >>> [error] (update) sbt.librarymanagement.ResolveException: Error downloading org.apache.flink:flink-connector-base:
> >>> [error]   Not found
> >>> [error]   Not found
> >>> [error]   not found: /home/swissbib/.ivy2/local/org.apache.flink/flink-connector-base/ivys/ivy.xml
> >>> [error]   not found: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-base//flink-connector-base-.pom
> >>>
> >>> It seems the Maven packaging is not correct.
> >>>
> >>> My sbt build file:
> >>>
> >>> ThisBuild / scalaVersion := "3.3.0"
> >>> val flinkVersion = "1.18.0"
> >>> val postgresVersion = "42.2.2"
> >>>
> >>> lazy val root = (project in file(".")).settings(
> >>>   name := "flink-scala-proj",
> >>>   libraryDependencies ++= Seq(
> >>>     "org.flinkextended" %% "flink-scala-api" % "1.17.1_1.1.0",
> >>>     "org.apache.flink" % "flink-clients" % flinkVersion % Provided,
> >>>     "org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,
> >>>
> >>>     "org.apache.flink" % "flink-connector-kafka" % "1.17.1",
> >>>     //"org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18",
> >>>
> >>>     //"org.apache.flink" % "flink-connector-jdbc" % "3.1.1-1.17",
> >>>     //"org.postgresql" % "postgresql" % postgresVersion,
> >>>     "org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,
> >>>     //"org.apache.flink" % "flink-connector-base" % flinkVersion % Provided
> >>>   )
> >>> )
> >>>
> >>>
> >>>
> >>> Thanks!
> >>>
> >>> --
> >>> Günter Hipler
> >>> https://openbiblio.social/@vog61
> >>> https://twitter.com/vog61
> >>>
> >>>
> --
> Günter Hipler
> https://openbiblio.social/@vog61
> https://twitter.com/vog61
>
>


[ANNOUNCE] Apache Flink Kafka Connectors 3.0.1 released

2023-10-31 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Kafka Connectors 3.0.1. This release is compatible with the Apache
Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352910

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon


Re: Which Flink engine versions do Connectors support?

2023-10-27 Thread Tzu-Li (Gordon) Tai
Hi Xianxun,

You can find the list of supported Flink versions for each connector here:
https://flink.apache.org/downloads/#apache-flink-connectors

Specifically for the Kafka connector, we're in the process of releasing a
new version of the connector that works with Flink 1.18.
The release candidate vote thread is here if you want to test that out:
https://lists.apache.org/thread/35gjflv4j2pp2h9oy5syj2vdfpotg486

Thanks,
Gordon


On Fri, Oct 27, 2023 at 12:57 PM Xianxun Ye  wrote:

> 
> Hello Team,
>
> After the release of Flink 1.18, I found that most connectors had been
> externalized, e.g. the Kafka, ES, HBase, JDBC, and Pulsar connectors. But I
> couldn't find any documentation or code indicating which Flink versions
> these connectors work with.
>
>
> Best regards,
> Xianxun
>
>


Re: Kafka Sink and Kafka transaction timeout

2023-10-02 Thread Tzu-Li (Gordon) Tai
Hi Lorenzo,

The main failure scenario that the recommendation addresses is when the
Flink job fails right after a checkpoint successfully completes, but before
the KafkaSink subtasks receive the checkpoint-completed RPC notification
from the JM to commit the transactions. During this window, Kafka can abort
the transactions due to timeout, leaving you with an inconsistency between
Flink and Kafka (the checkpoint has moved on, but the data written in those
transactions is lost).

This inconsistency can happen if the checkpoint duration is too close to
the configured Kafka transaction timeout, or if the job fails at just the
wrong moment within that window and remains down long enough for the
transaction to time out.
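
To make the arithmetic concrete, here is a minimal sketch of how one might derive the timeout from the docs' rule (transaction timeout > maximum checkpoint duration + maximum restart duration). The helper names, the safety margin, and the example durations are all assumptions for illustration, not part of any Flink or Kafka API; only `transaction.timeout.ms` is the standard Kafka producer setting.

```scala
import java.time.Duration
import java.util.Properties

object TransactionTimeoutSketch {

  // Hypothetical helper: lower bound for the Kafka transaction timeout, i.e.
  // longest expected checkpoint + longest expected restart + an assumed margin.
  def minTransactionTimeout(
      maxCheckpointDuration: Duration,
      maxRestartDuration: Duration,
      safetyMargin: Duration
  ): Duration =
    maxCheckpointDuration.plus(maxRestartDuration).plus(safetyMargin)

  // Hypothetical helper: producer properties carrying the timeout in milliseconds.
  def producerProperties(transactionTimeout: Duration): Properties = {
    val props = new Properties()
    props.setProperty("transaction.timeout.ms", transactionTimeout.toMillis().toString)
    props
  }

  def main(args: Array[String]): Unit = {
    val timeout = minTransactionTimeout(
      Duration.ofMinutes(2), // assumed worst-case checkpoint duration
      Duration.ofMinutes(5), // assumed worst-case downtime until the job is restored
      Duration.ofMinutes(1)  // assumed safety margin
    )
    // 2 + 5 + 1 minutes = 480000 ms
    println(producerProperties(timeout).getProperty("transaction.timeout.ms"))
  }
}
```

Keep in mind that the broker caps this value with `transaction.max.timeout.ms` (15 minutes by default), so a larger producer timeout also needs the broker setting raised. The resulting properties would then be handed to the sink's producer configuration.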

I have a more detailed formulation of this in FLIP-319 [1], at the very end
of the proposal in the Appendix section.

Thanks,
Gordon

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710

On Mon, Oct 2, 2023 at 2:47 PM Lorenzo Nicora 
wrote:

> Hi team
>
> In the Kafka Sink docs [1], with EXACTLY_ONCE it is recommended to set:
> transaction_timeout > maximum_checkpoint_duration +
> maximum_restart_duration.
>
> I understand transaction_timeout > maximum_checkpoint_duration.
> But why add maximum_restart_duration?
>
> If the application recovers from a checkpoint, any uncommitted message
> that was written after the last successful checkpoint will be
> re-written regardless.
> If a transaction times out during the recovery, it doesn't matter.
>
> I would rather say:
> transaction_timeout > maximum_checkpoint_duration + checkpoint_interval
>
> Any thoughts?
>
> Regards
> Lorenzo
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/#fault-tolerance
>


Re: [DISCUSS] Status of Statefun Project

2023-08-18 Thread Tzu-Li (Gordon) Tai
Hi Galen,

The original intent of having a separate repo for the playground was
that StateFun users can just go to it and start running simple examples
without any distractions from the core code. I personally don't have
a strong preference here and can understand how merging them would make the
workflow more streamlined, but just FYI on why they are separate in the
first place.

re: paths for locating StateFun artifacts.
Can this be solved by simply passing in the path to the artifacts, as well
as the image tag for the locally built base StateFun image? They could
probably be environment variables.

Cheers,
Gordon

On Fri, Aug 18, 2023 at 12:13 PM Galen Warren via user <
user@flink.apache.org> wrote:

> Yes, exactly! And in addition to the base Statefun jars and the jar for
> the Java SDK, it does an equivalent copy/register operation for each of the
> other SDK libraries (Go, Python, JavaScript) so that those libraries are
> also available when building the playground examples.
>
> One more question: In order to copy the various build artifacts into the
> Docker containers, those artifacts need to be part of the Docker context.
> With the playground being a separate project, that's slightly tricky to do,
> as there is no guarantee (other than convention) about the relative paths
> of *flink-statefun* and *flink-statefun-playground* in someone's local
> filesystem. The way I've set this up locally is to copy the playground into
> the *flink-statefun* project -- i.e. to *flink-statefun*/playground --
> such that I can set the Docker context to the root folder of
> *flink-statefun* and then have access to any local code and/or build
> artifacts.
>
> I'm wondering if there might be any appetite for making that move
> permanent, i.e. moving the playground to *flink-statefun*/playground and
> deprecating the standalone playground project. In addition to making the
> problem of building with unreleased artifacts a bit simpler to solve, it
> would also simplify the process of releasing a new Statefun version, since
> the entire process could be handled with a single PR and associated
> build/deploy tasks. In other words, a single PR could both update and
> deploy the Statefun package and the playground code and images.
>
> As it stands, at least two PRs would be required for each Statefun version
> update -- one for *flink-statefun* and one for *flink-statefun-playground*.
>
> Anyway, just an idea. Maybe there's an important reason for these projects
> to remain separate. If we do want to keep the playground project where it
> is, I could solve the copying problem by requiring the two projects to be
> siblings in the file system and by pre-copying the local build artifacts
> into a location accessible by the existing Docker contexts. This would
> still leave us with the need to have two PRs and releases instead of one,
> though.
>
> Thanks for your help!
>
>
Re: [DISCUSS] Status of Statefun Project

2023-08-18 Thread Tzu-Li (Gordon) Tai
Hi Galen,

> locally built code is copied into the build containers
> so that it can be accessed during the build.

That's exactly what we had been doing for release testing, yes. Sorry I
missed that detail in my previous response.

And yes, that sounds like a reasonable approach. If I understand you
correctly, the workflow would become this:

   1. Build the StateFun repo locally to install the snapshot artifact
   jars + have a local base StateFun image.
   2. Run the playground in "local" mode, so that it uses the local base
   StateFun image + builds the playground code using copied artifact jars
   (instead of pulling from Maven).

That looks good to me!

Thanks,
Gordon

On Fri, Aug 18, 2023 at 11:33 AM Galen Warren
 wrote:

> Thanks.
>
> If you were to build a local image, as you suggest, how do you access that
> image when building the playground images? All the compilation of
> playground code happens inside containers, so local images on the host
> aren't available in those containers. Unless I'm missing something?
>
> I've slightly reworked things such that the playground images can be run in
> one of two modes -- the default mode, which works like before, and a
> "local" mode where locally built code is copied into the build containers
> so that it can be accessed during the build. It works fine, you just have
> to define a couple of environment variables when running docker-compose to
> specify default vs. local mode and what versions of Flink and Statefun
> should be referenced, and then you can build and run the local examples
> without any additional steps. Does that sound like a reasonable approach?
>
>
> On Fri, Aug 18, 2023 at 2:17 PM Tzu-Li (Gordon) Tai 
> wrote:
>
> > Hi Galen,
> >
> > > Gordon, is there a trick to running the sample code in
> > > flink-statefun-playground against yet-unreleased code that I'm missing?
> >
> > You'd have to locally build an image from the release branch, with a
> > temporary image version tag. Then, in the flink-statefun-playground, change
> > the image versions in the docker-compose files to use that locally built
> > image. IIRC, that's what we have been doing in the past. Admittedly, it's
> > pretty manual - I don't think the CI manages this workflow.
> >
> > Thanks,
> > Gordon
> >
> > On Mon, Aug 14, 2023 at 10:42 AM Galen Warren 
> > wrote:
> >
> > > I created a pull request for this: [FLINK-31619] Upgrade Stateful
> > > Functions to Flink 1.16.1 by galenwarren · Pull Request #331 ·
> > > apache/flink-statefun (github.com)
> > > <https://github.com/apache/flink-statefun/pull/331>.
> > >
> > > JIRA is here: [FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1 -
> > > ASF JIRA (apache.org)
> > > <https://issues.apache.org/jira/browse/FLINK-31619?filter=-1>.
> > >
> > > Statefun references 1.16.2, despite the title -- that version has come
> > > out since the issue was created.
> > >
> > > I figured out how to run all the playground tests locally, but it took a
> > > bit of reworking of the playground setup with respect to Docker;
> > > specifically, the Docker contexts used to build the example functions
> > > needed to be broadened (i.e. moved up the tree) so that, if needed, local
> > > artifacts/code can be accessed from within the containers at build time.
> > > Then I made the Docker compose.yml configurable through environment
> > > variables to allow for them to run in either the original manner -- i.e.
> > > pulling artifacts from public repos -- or in a "local" mode, where
> > > artifacts are pulled from local builds.
> > >
> > > This process is cleaner if the playground is a subfolder of the
> > > flink-statefun project rather than being its own repository
> > > (flink-statefun-playground), because then all the relative paths between
> > > the playground files and the build artifacts are fixed. So, I'd like to
> > > propose to move the playground files, modified as described above, to
> > > flink-statefun/playground and retire flink-statefun-playground. I can
> > > submit separate PRs for those changes if everyone is on board.
> > >
> > > Also, should I plan to do the same upgrade to handle Flink 1.17.x? It
> > > should be easy to do, especially while the 1.16.x upgrade is fresh on my
> > > mind.
> > >
> > > Thanks.
> > >
> > >
> > > On Fri, Aug 11, 2023 at 6:40 PM Galen Warren 
> > > wrote:
>

Re: [DISCUSS] Status of Statefun Project

2023-08-18 Thread Tzu-Li (Gordon) Tai
Hi Galen,

> Gordon, is there a trick to running the sample code in
flink-statefun-playground against yet-unreleased code that I'm missing?

You'd have to locally build an image from the release branch, with a
temporary image version tag. Then, in the flink-statefun-playground, change
the image versions in the docker-compose files to use that locally built
image. IIRC, that's what we have been doing in the past. Admittedly, it's
pretty manual - I don't think the CI manages this workflow.

Thanks,
Gordon

On Mon, Aug 14, 2023 at 10:42 AM Galen Warren 
wrote:

> I created a pull request for this: [FLINK-31619] Upgrade Stateful
> Functions to Flink 1.16.1 by galenwarren · Pull Request #331 ·
> apache/flink-statefun (github.com)
> <https://github.com/apache/flink-statefun/pull/331>.
>
> JIRA is here: [FLINK-31619] Upgrade Stateful Functions to Flink 1.16.1 -
> ASF JIRA (apache.org)
> <https://issues.apache.org/jira/browse/FLINK-31619?filter=-1>.
>
> Statefun references 1.16.2, despite the title -- that version has come out
> since the issue was created.
>
> I figured out how to run all the playground tests locally, but it took a
> bit of reworking of the playground setup with respect to Docker;
> specifically, the Docker contexts used to build the example functions
> needed to be broadened (i.e. moved up the tree) so that, if needed, local
> artifacts/code can be accessed from within the containers at build time.
> Then I made the Docker compose.yml configurable through environment
> variables to allow for them to run in either the original manner -- i.e.
> pulling artifacts from public repos -- or in a "local" mode, where
> artifacts are pulled from local builds.
>
> This process is cleaner if the playground is a subfolder of the
> flink-statefun project rather than its own repository
> (flink-statefun-playground), because then all the relative paths between
> the playground files and the build artifacts are fixed. So, I'd like to
> propose moving the playground files, modified as described above, to
> flink-statefun/playground and retiring flink-statefun-playground. I can
> submit separate PRs for those changes if everyone is on board.
>
> Also, should I plan to do the same upgrade to handle Flink 1.17.x? It
> should be easy to do, especially while the 1.16.x upgrade is fresh in my
> mind.
>
> Thanks.
>
>
> On Fri, Aug 11, 2023 at 6:40 PM Galen Warren 
> wrote:
>
>> I'm done with the code to make Statefun compatible with Flink 1.16, and
>> all the tests (including e2e) succeed. The required changes were pretty
>> minimal.
>>
>> I'm running into a bit of a chicken/egg problem executing the tests in
>> flink-statefun-playground
>> <https://github.com/apache/flink-statefun-playground>, though. That
>> project seems to assume that all the various Statefun artifacts are built
>> and deployed to the various public repositories already. I've looked into
>> trying to redirect references to local artifacts; however, that's also
>> tricky since all the building occurs in Docker containers.
>>
>> Gordon, is there a trick to running the sample code in
>> flink-statefun-playground against yet-unreleased code that I'm missing?
>>
>> Thanks.
>>
>> On Sat, Jun 24, 2023 at 12:40 PM Galen Warren 
>> wrote:
>>
>>> Great -- thanks!
>>>
>>> I'm going to be out of town for about a week but I'll take a look at
>>> this when I'm back.
>>>
>>> On Tue, Jun 20, 2023 at 8:46 AM Martijn Visser 
>>> wrote:
>>>
>>>> Hi Galen,
>>>>
>>>> Yes, I'll be more than happy to help with Statefun releases.
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Tue, Jun 20, 2023 at 2:21 PM Galen Warren 
>>>> wrote:
>>>>
>>>>> Thanks.
>>>>>
>>>>> Martijn, to answer your question, I'd need to do a small amount of
>>>>> work to get a PR ready, but not much. Happy to do it if we're deciding to
>>>>> restart Statefun releases -- are we?
>>>>>
>>>>> -- Galen
>>>>>
>>>>> On Sat, Jun 17, 2023 at 9:47 AM Tzu-Li (Gordon) Tai <
>>>>> tzuli...@apache.org> wrote:
>>>>>
>>>>>> > Perhaps he could weigh in on whether the combination of automated
>>>>>> tests plus those smoke tests should be sufficient for testing with new
>>>>>> Flink versions
>>>>>>
>>>>>&

Re: Flink Kafka source getting marked as Idle

2023-06-17 Thread Tzu-Li (Gordon) Tai
Hi Anirban,

> But this happened only once and now it is not getting reproduced at all.

This does make it sound a lot like
https://issues.apache.org/jira/browse/FLINK-31632.

> 1. What is the default watermarking strategy used in Flink? Can I quickly
check the default parameters being used by calling some function?

Are you using the DataStream API and using the KafkaSource (and not the
older FlinkKafkaConsumer connector)? If that's the case, I believe you
always have to set a watermark strategy when adding the KafkaSource to the
job topology, so there isn't a default.

If you're using SQL, the strategy would be noWatermarks if not specified,
so you shouldn't run into the bug.

> 2. Are there any other conditions in which a source can be marked as Idle
apart from the watermarking issue mentioned below?

That bug is the only known cause of incorrect permanent idleness that I'm
aware of.

> 3. If a Flink source is marked as Idle, is there any way to make it
active without having to re-submit the Flink Job?

With the currently supported watermark strategies, the only way for a
source subtask to become active again is for it to report a new watermark
(e.g. when new data is produced).
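
As a rough illustration of that behaviour, here is a minimal sketch that
models a single source subtask. It is a simplified model and not Flink's
actual implementation: the class IdlenessSketch and its methods onRecord
and isIdle are hypothetical names made up for this example, and the clock
is passed in explicitly so the logic is easy to follow.

```java
/**
 * Simplified model of source idleness. This is NOT Flink's implementation;
 * the class and its methods are hypothetical and exist only for illustration.
 */
final class IdlenessSketch {
    private final long idleTimeoutMillis;
    private long lastActivityMillis;

    IdlenessSketch(long idleTimeoutMillis, long startMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastActivityMillis = startMillis;
    }

    /** New data arrived: the subtask reports progress and is active again. */
    void onRecord(long nowMillis) {
        lastActivityMillis = nowMillis;
    }

    /** Idle once no record has been seen for at least the configured timeout. */
    boolean isIdle(long nowMillis) {
        return nowMillis - lastActivityMillis >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        IdlenessSketch subtask = new IdlenessSketch(3_600_000L, 0L); // 3600 s timeout
        System.out.println(subtask.isIdle(1_000L));      // false: still within the timeout
        System.out.println(subtask.isIdle(3_600_000L));  // true: no data for the whole timeout
        subtask.onRecord(3_700_000L);                    // new data is produced
        System.out.println(subtask.isIdle(3_700_500L));  // false: active again
    }
}
```

In this model nothing other than onRecord resets the idle state, which
mirrors the point above: without new data there is no transition back to
active.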

Best,
Gordon

On Fri, Jun 16, 2023 at 7:10 AM Anirban Dutta Gupta <
anir...@indicussoftware.com> wrote:

> Hello All,
>
> Sorry to be replying to an existing thread for my question. Actually we
> are also facing the issue of the Flink Kafka source stopping consuming
> messages completely.
> It only started consuming messages after we re-submitted the Job. But this
> happened only once and now it is not getting reproduced at all.
> We are not using any specific watermarking strategy.
>
> I have a few questions:
> 1. What is the default watermarking strategy used in Flink? Can I quickly
> check the default parameters being used by calling some function?
> 2. Are there any other conditions in which a source can be marked as Idle
> apart from the watermarking issue mentioned below?
> 3. If a Flink source is marked as Idle, is there any way to make it active
> without having to re-submit the Flink Job?
> Or is it that the source automatically becomes active after a certain
> duration?
>
> Many thanks in advance,
> Anirban
>
> On 16-06-2023 02:27, Ken Krugler wrote:
>
> I think you’re hitting this issue:
>
> https://issues.apache.org/jira/browse/FLINK-31632
>
> Fixed in 1.16.2, 1.17.1.
>
> — Ken
>
>
> On Jun 15, 2023, at 1:39 PM, Piotr Domagalski 
> wrote:
>
> Hi all!
>
> We've been experimenting with watermark alignment in Flink 1.15 and
> observed an odd behaviour that I couldn't find any mention of in the
> documentation.
>
> With the following strategy:
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(60))
>     .withTimestampAssigner((e, t) -> e.timestamp)
>     .withIdleness(Duration.ofSeconds(3600))
>     .withWatermarkAlignment("group-1", Duration.ofSeconds(15));
>
> Kafka sources stop consuming completely after 3600s (even when the data is
> flowing into all the partitions). Is this an expected behaviour? Where
> could I find more information on this?
>
> --
> Piotr Domagalski
>
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>
>


Re: [DISCUSS] Status of Statefun Project

2023-06-17 Thread Tzu-Li (Gordon) Tai
> Perhaps he could weigh in on whether the combination of automated tests
plus those smoke tests should be sufficient for testing with new Flink
versions

What we usually did at the bare minimum for new StateFun releases was the
following:

   1. Run the build tests (including the smoke tests in the e2e module,
   which cover important cases like exactly-once verification)
   2. Update the flink-statefun-playground repo and manually run all
   language examples there.

If upgrading Flink versions was the only change in the release, I'd
probably say that this is sufficient.

Best,
Gordon

On Thu, Jun 15, 2023 at 5:25 AM Martijn Visser 
wrote:

> Let me know if you have a PR for a Flink update :)
>
> On Thu, Jun 8, 2023 at 5:52 PM Galen Warren via user <
> user@flink.apache.org> wrote:
>
>> Thanks Martijn.
>>
>> Personally, I'm already using a local fork of Statefun that is compatible
>> with Flink 1.16.x, so I wouldn't have any need for a released version
>> compatible with 1.15.x. I'd be happy to do the PRs to modify Statefun to
>> work with new versions of Flink as they come along.
>>
>> As for testing, Statefun does have unit tests and Gordon also sent me
>> instructions a while back for how to do some additional smoke tests which
>> are pretty straightforward. Perhaps he could weigh in on whether the
>> combination of automated tests plus those smoke tests should be sufficient
>> for testing with new Flink versions (I believe the answer is yes).
>>
>> -- Galen
>>
>>
>>
>> On Thu, Jun 8, 2023 at 8:01 AM Martijn Visser 
>> wrote:
>>
>>> Hi all,
>>>
>>> Apologies for the late reply.
>>>
>>> I'm willing to help out with merging requests in Statefun to keep them
>>> compatible with new Flink releases and create new releases. I do think
>>> that validation of the functionality of these releases depends a lot on
>>> those who do these compatibility updates, with PMC members helping out
>>> with the formal process.
>>>
>>> > Why can't the Apache Software Foundation allow community members to
>>> > bring it up to date?
>>>
>>> There's nothing preventing anyone from reviewing any of the current PRs
>>> or opening new ones. However, none of them are approved [1], so there's
>>> also nothing to merge.
>>>
>>> > I believe that there are people and companies on this mailing list
>>> > interested in supporting Apache Flink Stateful Functions.
>>>
>>> If so, then now is the time to show.
>>>
>>> Would there be a preference to create a release with Galen's merged
>>> compatibility update to Flink 1.15.2, or do we want to skip that and go
>>> straight to a newer version?
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>>
>>> https://github.com/apache/flink-statefun/pulls?q=is%3Apr+is%3Aopen+review%3Aapproved
>>>
>>> On Tue, Jun 6, 2023 at 3:55 PM Marco Villalobos <
>>> mvillalo...@kineteque.com>
>>> wrote:
>>>
>>> > Why can't the Apache Software Foundation allow community members to
>>> bring
>>> > it up to date?
>>> >
>>> > What's the process for that?
>>> >
>>> > I believe that there are people and companies on this mailing list
>>> > interested in supporting Apache Flink Stateful Functions.
>>> >
>>> > You already had two people on this thread express interest.
>>> >
>>> > At the very least, we could keep the library versions up to date.
>>> >
>>> > There is only a small list of new features that might be worthwhile:
>>> >
>>> > 1. event time processing
>>> > 2. state rest api
>>> >
>>> >
>>> > On Jun 6, 2023, at 3:06 AM, Chesnay Schepler 
>>> wrote:
>>> >
>>> > If you were to fork it *and want to redistribute it* then the short
>>> > version is that
>>> >
>>> >    1. you have to adhere to the Apache licensing requirements
>>> >    2. you have to make it clear that your fork does not belong to the
>>> >       Apache Flink project. (Trademarks and all that)
>>> >
>>> > Neither should be significant hurdles (there should also be plenty of
>>> > online resources regarding 1), and if you do this then you can freely
>>> > share your fork with others.
>>> >
>>> > I've also pinged Martijn to take a look at this thread.
>>> > To my knowledge the project hasn't decided anything yet.
>>> >
>>> > On 27/05/2023 04:05, Galen Warren wrote:
>>> >
>>> > Ok, I get it. No interest.
>>> >
>>> > If this project is being abandoned, I guess I'll work with my own
>>> > fork. Is there anything I should consider here? Can I share it with
>>> > other people who use this project?
>>> >
>>> > On Tue, May 16, 2023 at 10:50 AM Galen Warren 
>>> 
>>> > wrote:
>>> >
>>> >
>>> > Hi Martijn, since you opened this discussion thread, I'm curious what
>>> your
>>> > thoughts are in light of the responses? Thanks.
>>> >
>>> > On Wed, Apr 19, 2023 at 1:21 PM Galen Warren 
>>> 
>>> > wrote:
>>> >
>>> >
>>> > I use Apache Flink for stream processing, and StateFun as a hand-off
>>> >
>>> > point for the rest of the application.
>>> > It serves well as a bridge between a Flink Streaming job and
>>> > micro-services.
>>> >
>>> > This is esse

Re: KafkaSource consumer group

2023-03-30 Thread Tzu-Li (Gordon) Tai
Sorry, I meant to say "Hi Ben" :-)

On Thu, Mar 30, 2023 at 9:52 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Robert,
>
> This is a design choice. Flink's KafkaSource doesn't rely on consumer
> groups for assigning partitions / rebalancing / offset tracking. It
> manually assigns whatever partitions are in the specified topic across its
> consumer instances, and rebalances only when the Flink job / KafkaSource is
> rescaled.
>
> Is there a specific reason that you need two Flink jobs for this? I
> believe the Flink-way of doing this would be to have one job read the
> topic, and then you'd do a stream split if you want to have two different
> branches of processing business logic.
>
> Thanks,
> Gordon
>
> On Thu, Mar 30, 2023 at 9:34 AM Roberts, Ben (Senior Developer) via user <
> user@flink.apache.org> wrote:
>
>> Hi,
>>
>>
>>
>> Is there a way to run multiple flink jobs with the same Kafka group.id
>> and have them join the same consumer group?
>>
>>
>>
>> It seems that setting the group.id using
>> KafkaSource.builder().set_group_id() does not have the effect of creating
>> an actual consumer group in Kafka.
>>
>>
>>
>> Running the same flink job with the same group.id, consuming from the
>> same topic, will result in both flink jobs receiving the same messages from
>> the topic, rather than only one of the jobs receiving the messages (as
>> would be expected for consumers in a consumer group normally with Kafka).
>>
>>
>>
>> Is this a design choice, and is there a way to configure it so messages
>> can be split across two jobs using the same “group.id”?
>>
>>
>>
>> Thanks in advance,
>>
>> Ben
>>
>>
>


Re: KafkaSource consumer group

2023-03-30 Thread Tzu-Li (Gordon) Tai
Hi Robert,

This is a design choice. Flink's KafkaSource doesn't rely on consumer
groups for assigning partitions / rebalancing / offset tracking. It
manually assigns whatever partitions are in the specified topic across its
consumer instances, and rebalances only when the Flink job / KafkaSource is
rescaled.

Is there a specific reason that you need two Flink jobs for this? I believe
the Flink-way of doing this would be to have one job read the topic, and
then you'd do a stream split if you want to have two different branches of
processing business logic.
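
To make the consequence concrete, here is a minimal sketch of static
partition assignment. It is only a model under stated assumptions: the
class StaticAssignmentSketch is hypothetical, and the modulo rule in
ownerOf is a stand-in chosen for illustration, not the connector's real
assignment algorithm. The point it shows is that each job spreads all
partitions of the topic over its own subtasks, independently of any other
job and of the group.id.

```java
/**
 * Simplified model of static partition assignment within one job.
 * Hypothetical illustration only; not the KafkaSource implementation.
 */
final class StaticAssignmentSketch {

    /** Assumed rule for this sketch: partition p is read by subtask p % parallelism. */
    static int ownerOf(int partition, int parallelism) {
        return partition % parallelism;
    }

    /** Renders which partitions each subtask of a single job would read. */
    static String describe(int numPartitions, int parallelism) {
        StringBuilder sb = new StringBuilder();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            if (subtask > 0) {
                sb.append("; ");
            }
            sb.append("subtask ").append(subtask).append(" -> [");
            boolean first = true;
            for (int p = 0; p < numPartitions; p++) {
                if (ownerOf(p, parallelism) == subtask) {
                    if (!first) {
                        sb.append(", ");
                    }
                    sb.append(p);
                    first = false;
                }
            }
            sb.append("]");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Two independent jobs reading the same 6-partition topic.
        // Each one covers partitions 0..5 on its own, so both see every message.
        System.out.println("job A: " + describe(6, 2));
        System.out.println("job B: " + describe(6, 3));
    }
}
```

Because neither job consults the other (or a group coordinator) when
assigning partitions, sharing a group.id does not split the messages
between them in this model.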

Thanks,
Gordon

On Thu, Mar 30, 2023 at 9:34 AM Roberts, Ben (Senior Developer) via user <
user@flink.apache.org> wrote:

> Hi,
>
>
>
> Is there a way to run multiple flink jobs with the same Kafka group.id
> and have them join the same consumer group?
>
>
>
> It seems that setting the group.id using
> KafkaSource.builder().set_group_id() does not have the effect of creating
> an actual consumer group in Kafka.
>
>
>
> Running the same flink job with the same group.id, consuming from the
> same topic, will result in both flink jobs receiving the same messages from
> the topic, rather than only one of the jobs receiving the messages (as
> would be expected for consumers in a consumer group normally with Kafka).
>
>
>
> Is this a design choice, and is there a way to configure it so messages
> can be split across two jobs using the same “group.id”?
>
>
>
> Thanks in advance,
>
> Ben
>
>


Re: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

2022-10-26 Thread Tzu-Li (Gordon) Tai
Hi Filip,

I think what you are seeing is expected. The State Processor API was
intended to allow access only to commonly used user-facing state
structures, while Stateful Functions uses quite a bit of Flink internal
features, including for its state maintenance.
The list state in question in StateFun's FunctionGroupOperator is an
internal kind of state normally used in the context of Flink window states
that are namespaced. Normal user-facing list states are not namespaced.
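
To illustrate why the namespace matters, here is a minimal sketch that
models keyed state as a map addressed by (key, namespace). Everything in
it is hypothetical (the class NamespacedStateSketch, the VOID_NAMESPACE
placeholder, the String values); it does not use Flink's state classes.
In the real case the failure surfaces earlier, as a serializer
compatibility error, but the underlying mismatch is the same: a reader
that assumes no namespace cannot address entries written under a Long
namespace.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of namespaced keyed state.
 * Hypothetical illustration only; not Flink's state backend.
 */
final class NamespacedStateSketch {

    /** Stand-in for the single namespace used by ordinary, non-namespaced state. */
    static final Object VOID_NAMESPACE = new Object();

    // Each entry is addressed by the pair (key, namespace).
    private final Map<List<Object>, String> entries = new HashMap<>();

    void put(Object key, Object namespace, String value) {
        entries.put(Arrays.asList(key, namespace), value);
    }

    /** Returns the stored value, or null if nothing exists under (key, namespace). */
    String get(Object key, Object namespace) {
        return entries.get(Arrays.asList(key, namespace));
    }

    public static void main(String[] args) {
        NamespacedStateSketch state = new NamespacedStateSketch();
        // Written by an operator that uses a Long namespace.
        state.put("user-1", 42L, "delayed message");

        System.out.println(state.get("user-1", 42L));            // found under the Long namespace
        System.out.println(state.get("user-1", VOID_NAMESPACE)); // null: wrong namespace
    }
}
```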

Just curious, which specific state in FunctionGroupOperator are you trying
to transform? I assume you want all other internal state in
FunctionGroupOperator to remain untouched, and only wish to carry it over
so that it is included in the transformed savepoint?

Thanks,
Gordon


On Wed, Oct 26, 2022 at 3:50 AM Filip Karnicki 
wrote:

> Hi Thias
>
> Thank you for your reply. I can re-create a simplified use case at home
> and stick it on github if you think it will help.
>
> What I'm trying to access is pretty internal to Flink Stateful Functions.
> It seems that a custom operator (
> https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
> is accessing a KeyedStateBackend and creating an InternalListState, which
> I'm not sure I'll be able to get my hands on using the State Processor API.
>
> The only reason why I need to get my hands on all the states from this
> Stateful Functions operator is because later I (think I) have to use
> .removeOperator(uid) on a savepoint and replace it .withOperator(uid,
> myTransformation) in order to transform my own, non-stateful-functions
> keyed state which also belongs to this operator.
>
> Kind regards
> Fil
>
> On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hi Filip,
>>
>>
>>
>> It looks like your state primitive is used in the context of Windows:
>>
>> Keyed state works like this:
>>
>>    - It uses a cascade of key types to store and retrieve values:
>>       - The key (set by .keyBy)
>>       - A namespace (usually a VoidNamespace), unless it is used in
>>         the context of a specific window
>>       - An optional key of the state primitive (if it is a MapState)
>>
>>
>>
>> In your case the state primitive is (probably) declared in the context of
>> a window and hence when loading the state by means of StateProcessorAPI you
>> also need to specify the correct Namespace TypeInformation.
>>
>> If I am in doubt about how a state primitive is set up, I let the
>> debugger stop in a process function and walk up the call stack to find
>> the proper components implementing it.
>>
>>
>>
>> If you share a little more of your code it is much easier to provide
>> specific guidance 😊
>>
>> (e.g. ‘savepoint’ is never used again in your code snippet …)
>>
>>
>>
>> Sincere greeting
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* Filip Karnicki 
>> *Sent:* Tuesday, October 25, 2022 10:08 AM
>> *To:* user 
>> *Subject:* State Processor API - VoidNamespaceSerializer must be
>> compatible with the old namespace serializer LongSerializer
>>
>>
>>
>> Hi, I'm trying to load a list state using the State Processor API (Flink
>> 1.14.3)
>>
>>
>>
>> Cluster settings:
>>
>>
>>
>> state.backend: rocksdb
>>
>> state.backend.incremental: true
>>
>> (...)
>>
>>
>>
>> Code:
>>
>>
>>
>> val env = ExecutionEnvironment.getExecutionEnvironment
>>
>> val savepoint = Savepoint.load(env, pathToSavepoint,
>>   new EmbeddedRocksDBStateBackend(true))
>>
>> // using Flink Stateful Functions
>> val tpe = new MessageTypeInformation(
>>   MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null))
>> val envelopeSerializer: TypeSerializer[Message] =
>>   tpe.createSerializer(env.getConfig)
>>
>> val listDescriptor = new ListStateDescriptor[Message](
>>   "delayed-message-buffer", envelopeSerializer.duplicate)
>>
>> (...)
>> override def open(parameters: Configuration): Unit = {
>>   getRuntimeContext.getListState(listDescriptor) // fails with error [1]
>> }
>>
>>
>>
>>
>>
>> Error [1]:
>>
>>
>>
>> Caused by: java.io.IOException: Failed to restore timer state
>>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>     at java.base/java.lang.Thread.run(Thread.java:8

Re: Ignoring state's offset when restoring checkpoints

2022-07-08 Thread Tzu-Li (Gordon) Tai
Hi Robin,

Apart from what Alexander suggested, I think you could also try the
following first:
Let the job use a "new" Kafka source, which you can achieve by simply
assigning a different operator ID than before. If you did not set an ID
previously, it would by default have been a hash computed by Flink.
With a new operator ID, Flink would see this as a new source operator that
does not have previous state (i.e. there would be no partition offsets to
restore from). All other existing operators in the job will still restore
their previous state. With this "new" Kafka source, you can then set the
initial offsets to start consuming from by setting either a startup
timestamp or a specific map of partition offsets.

Also, in order for the job to successfully restore, I think you would need
to set the "--allowNonRestoredState" option when submitting the job.
This essentially tells Flink to ignore the fact that the "old" Kafka source
state is not being restored for the job (since there is no longer a
matching operator to restore those offsets to).
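
The matching rule described above can be sketched as follows. This is a
simplified model, not Flink's restore code: the class RestoreSketch, its
restore method and the operator IDs are all hypothetical. It only shows
how state is matched to operators by ID, and how the
allowNonRestoredState setting decides whether leftover state is an error
or is silently dropped.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Simplified model of matching savepoint state to operators by ID.
 * Hypothetical illustration only; not Flink's restore logic.
 */
final class RestoreSketch {

    /**
     * Returns how many operators got their state back. State whose ID has no
     * matching operator in the job is dropped if allowNonRestoredState is true,
     * and causes a failure otherwise.
     */
    static int restore(String[] savepointIds, String[] jobIds, boolean allowNonRestoredState) {
        Set<String> inJob = new HashSet<>(Arrays.asList(jobIds));
        int restored = 0;
        for (String id : savepointIds) {
            if (inJob.contains(id)) {
                restored++;
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "State for operator '" + id + "' cannot be mapped to the new job");
            }
            // Otherwise the orphaned state (e.g. the old source offsets) is skipped.
        }
        return restored;
    }

    public static void main(String[] args) {
        String[] savepoint = {"kafka-source-v1", "aggregator"};
        String[] job = {"kafka-source-v2", "aggregator"}; // source got a new ID

        // Only "aggregator" matches; the renamed source starts without offsets.
        System.out.println(restore(savepoint, job, true));
    }
}
```

In this model the renamed source has no entry in the savepoint, so it
would start from whatever initial offsets the job configures, while the
other operator keeps its state.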

Cheers,
Gordon

On Fri, Jul 8, 2022 at 7:29 AM Alexander Fedulov 
wrote:

> Hi Robin,
>
> you should be able to use the State Processor API to modify the operator
> state (sources) and override the offsets manually there. I never tried
> that, but I believe conceptually it should work.
>
> Best,
> Alexander Fedulov
>


Re: [ANNOUNCE] Apache Flink Stateful Functions 3.1.0 released

2021-08-31 Thread Tzu-Li (Gordon) Tai
Congrats on the release!

And thank you for driving this release, Igal.

Cheers
Gordon

On Tue, Aug 31, 2021, 23:13 Igal Shilman  wrote:

> The Apache Flink community is very happy to announce the release of Apache
> Flink Stateful Functions (StateFun) 3.1.0.
>
> StateFun is a cross-platform stack for building Stateful Serverless
> applications, making it radically simpler to develop scalable, consistent,
> and elastic distributed applications.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2021/08/31/release-statefun-3.1.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for StateFun can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for StateFun published to the PyPI index can be found at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker images for StateFun are published to Docker Hub:
> https://hub.docker.com/r/apache/flink-statefun
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350038&projectId=12315522
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
>
> Thanks,
> Igal
>


Re: Configure Kafka ingress through property files in Stateful function 3.0.0

2021-05-28 Thread Tzu-Li (Gordon) Tai
Hi Jessy,

I assume "consumer.properties" is a file you have included in your StateFun
application's image?

The ingress.spec.properties field in the module YAML specification file
expects a list of key value pairs, not a properties file. See for example
[1].

I think it could make sense to support specifying property files directly
as well. Could you open a JIRA for this?

Thanks,
Gordon

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io-bundle/src/test/resources/routable-protobuf-kafka-ingress.yaml#L36





Re: Statefun 2.2.2 Checkpoint restore NPE

2021-05-28 Thread Tzu-Li (Gordon) Tai
Hi Timothy,

It would indeed be hard to figure this out without any stack traces.

Have you tried changing to debug level logs? Maybe you can also try using
the StateFun Harness to restore and run your job in the IDE - in that case
you should be able to see which code exactly is throwing this exception.

Cheers,
Gordon

On Fri, May 28, 2021 at 12:39 PM Timothy Bess  wrote:

> Hi,
>
> Just checking to see if anyone has experienced this error. Might just be a
> Flink thing that's irrelevant to statefun, but my job keeps failing over
> and over with this message:
>
> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer [] - Starting FlinkKafkaInternalProducer (10/10) to produce into default topic __stateful_functions_random_topic_lNVlkW9SkYrtZ1oK
> 2021-05-28 03:51:13,001 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer [] - Attempting to resume transaction feedback-union -> functions -> Sink: bluesteel-kafka_egress-egress-dd0a6f77c8b5eccd4b7254cdfd577ff9-45 with producerId 31 and epoch 3088
> 2021-05-28 03:51:13,017 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: lead-leads-ingress -> router (leads) (10/10) (ff51aacdb850c6196c61425b82718862) switched from RUNNING to FAILED.
> java.lang.NullPointerException: null
>
> The null pointer doesn't come with any stack traces or anything. It's
> really mystifying. Seems to just fail while restoring continuously.
>
> Thanks,
>
> Tim
>


Re: Stateful Functions, Kinesis, and ConsumerConfigConstants

2021-04-29 Thread Tzu-Li (Gordon) Tai
Hi Ammon,

Unfortunately you're right. I think the Flink Kinesis Consumer specific
configs, e.g. keys in the ConsumerConfigConstants class, were overlooked in
the initial design.

One way to work around this is to use the `SourceFunctionSpec` [1]. Using
that spec, you can use any Flink SourceFunction (e.g. a
FlinkKinesisConsumer) as the ingress.
Simply instantiate a `SourceFunctionSpec` with the desired ID, and provide
a custom FlinkKinesisConsumer that you create directly (which should allow
you to provide the ConsumerConfigConstants).

As a side note, I've created this JIRA to address the issue you
encountered, as I believe this should be supported in the native StateFun
Kinesis ingress [2].

Cheers,
Gordon

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-io/src/main/java/org/apache/flink/statefun/flink/io/datastream/SourceFunctionSpec.java
[2] https://issues.apache.org/jira/browse/FLINK-22529

On Thu, Apr 29, 2021 at 7:25 AM Ammon Diether  wrote:

>
> When using Flink Stateful Function's KinesisIngressBuilder, I do not see a
> way to set things like ConsumerConfigConstants.SHARD_GETRECORDS_MAX or
> ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
>
> Looking at KinesisSourceProvider, it appears that this is the spot that
> creates the FlinkKinesisConsumer. The function named
> propertiesFromSpec(kinesisIngressSpec) only allows for AWS properties and a
> few startup position properties.
> ConsumerConfigConstants.SHARD_GETRECORDS_MAX cannot be provided.
>
> Is there an obvious workaround?
>
>


[ANNOUNCE] Apache Flink Stateful Functions 3.0.0 released

2021-04-15 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions (StateFun) 3.0.0.

StateFun is a cross-platform stack for building Stateful Serverless
applications, making it radically simpler to develop scalable, consistent,
and elastic distributed applications.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2021/04/15/release-statefun-3.0.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for StateFun can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for StateFun published to the PyPI index can be found at:
https://pypi.org/project/apache-flink-statefun/

Official Docker images for StateFun are published to Docker Hub:
https://hub.docker.com/r/apache/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348822

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: How to know if task-local recovery kicked in for some nodes?

2021-04-06 Thread Tzu-Li (Gordon) Tai
Hi Sonam,

Pulling in Till (cc'ed), I believe he would likely be able to help you here.

Cheers,
Gordon

On Fri, Apr 2, 2021 at 8:18 AM Sonam Mandal  wrote:

> Hello,
>
> We are experimenting with task local recovery and I wanted to know whether
> there is a way to validate that some tasks of the job recovered from the
> local state rather than the remote state.
>
> We've currently set this up to have 2 Task Managers with 2 slots each, and
> we run a job with parallelism 4. To simulate failure, we kill one of the
> Task Manager pods (we run on Kubernetes). I want to see if the local state
> of the other Task Manager was used or not. I do understand that the state
> for the killed Task Manager will need to be fetched from the checkpoint.
>
> Also, do you have any suggestions on how to test such failure scenarios in
> a better way?
>
> Thanks,
> Sonam
>


Re: Why is Hive dependency flink-sql-connector-hive not available on Maven Central?

2021-04-06 Thread Tzu-Li (Gordon) Tai
Hi,

I'm pulling in Rui Li (cc'ed) who might be able to help you here as he
actively maintains the hive connectors.

Cheers,
Gordon


On Fri, Apr 2, 2021 at 11:36 AM Yik San Chan 
wrote:

> The question is cross-posted in StackOverflow
> https://stackoverflow.com/questions/66914119/flink-why-is-hive-dependency-flink-sql-connector-hive-not-available-on-maven-ce
>
> According to [Flink SQL Hive: Using bundled hive jar](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
> ):
>
> > The following tables list all available bundled hive jars. You can pick
> one to the /lib/ directory in Flink distribution.
> > - flink-sql-connector-hive-1.2.2 (download link)
> > - flink-sql-connector-hive-2.2.0 (download link)
> > ...
>
> However, these dependencies are not available from Maven central. As a
> work around, I use [user defined dependencies](
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#user-defined-dependencies),
> but this is not recommended:
>
> > the recommended way to add dependency is to use a bundled jar. Separate
> jars should be used only if bundled jars don’t meet your needs.
>
> I wonder why the bundle jars are not available in Maven central?
>
> Follow-up: Since they are not available from Maven central, I wonder how
> to include them in pom.xml in order to run `mvn package`?
>
> Thanks!
>


Re: How to specific key serializer

2021-03-31 Thread Tzu-Li (Gordon) Tai
Hi CZ,

The issue here is that the Scala DataStream API uses Scala macros to decide
the serializer to be used. Since that recognizes Scala case classes, the
CaseClassSerializer will be used.
However, in the State Processor API, those Scala macros do not come into
play, and therefore it directly goes to Flink's type extraction for Java
classes, which recognizes this as an Avro-generated class.
In general, currently the State Processor API doesn't support savepoints
written by Scala DataStream jobs that well.

You can try using TypeInfo annotations to specify a TypeInformationFactory
for your key class [1].
This allows you to "plug-in" the TypeInformation extracted by Flink for a
given class. In that custom TypeInformation, you should let it return the
correct serializer.

Best,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#defining-type-information-using-a-factory

On Mon, Mar 29, 2021 at 2:42 PM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> Currently we use sbt-avrohugger [0] to generate key class for keyed
> state.  The key class generated by sbt-avrohugger is both case class,
> and AVRO specific record. However, in the following scenarios, Flink
> uses different serializers:
>
>
> * In streaming application, Flink uses CaseClassSerializer for key
>   class.
> * In state processor API application, Flink uses AvroSerializer for key
>   class.
>
>
> Since they use different serializers for key, they are not compatible.
> Is there any way to specify the key serializer so that both applications
> use the same serializer?
>
>
>
> [0] https://github.com/julianpeeters/sbt-avrohugger
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Support for sending generic class

2021-03-30 Thread Tzu-Li (Gordon) Tai
Hi Le,

Thanks for reaching out with this question! It's actually a good segue to
allow me to introduce you to StateFun 3.0.0 :)

StateFun 3.0+ comes with a new type system that would eliminate this
hassle. You can take a sneak peek here [1].
This is part 1 of a series of tutorials on the fundamentals of the upcoming new
Java SDK (you can find tutorials for other languages there as well), and it
guides you through a bit on the new type system.

For your specific case, what you would do is implement a `Type` for your
Tuple3 messages. The `Type` contains information including a typename to
identify the data type, and a serializer for de-/serializing the data.
This `Type` can then be used when creating messages to be sent to other
functions and egresses, or used as the type specification for persisted
state values.
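
To make the idea concrete, here is a minimal, self-contained sketch in plain
Java of what such a type boils down to: a typename plus a serializer and
deserializer pair. Note that the `MessageType` interface, the `Triple` class
and the typename `example/Triple` are hypothetical stand-ins made up for this
illustration; they are not the StateFun SDK's actual `Type` API, so please
check the showcase in [1] for the real interfaces.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TripleTypeSketch {
    /** Hypothetical stand-in for a message type: a name plus (de)serialization logic. */
    interface MessageType<T> {
        String typeName();
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    /** Plain value class used instead of Tuple3<Long, List<Long>, Long>. */
    static final class Triple {
        final long first;
        final List<Long> second;
        final long third;

        Triple(long first, List<Long> second, long third) {
            this.first = first;
            this.second = second;
            this.third = third;
        }
    }

    static final MessageType<Triple> TRIPLE_TYPE = new MessageType<Triple>() {
        @Override
        public String typeName() {
            return "example/Triple"; // made-up typename
        }

        @Override
        public byte[] serialize(Triple value) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeLong(value.first);
                out.writeInt(value.second.size()); // length prefix for the list
                for (long element : value.second) {
                    out.writeLong(element);
                }
                out.writeLong(value.third);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        @Override
        public Triple deserialize(byte[] data) {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                long first = in.readLong();
                int size = in.readInt();
                List<Long> second = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    second.add(in.readLong());
                }
                return new Triple(first, second, in.readLong());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    };

    public static void main(String[] args) {
        Triple original = new Triple(1L, Arrays.asList(2L, 3L), 4L);
        Triple copy = TRIPLE_TYPE.deserialize(TRIPLE_TYPE.serialize(original));
        System.out.println(
                TRIPLE_TYPE.typeName() + ": " + copy.first + " " + copy.second + " " + copy.third);
    }
}
```

The point of the design is that the generic parameters never need to be
inferred at runtime: the type object itself knows how to write and read the
value, which is why the TypeHint problem from the stack trace would not arise.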

If you're not in production usage already, I would highly suggest waiting a
bit for StateFun 3.0.0 as it is just around the corner with an ongoing
release candidate vote [2] and is expected to be available within 1-2 weeks.

Let me know if this helps!

Cheers,
Gordon

[1]
https://github.com/apache/flink-statefun-playground/blob/dev/java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part1/types/TypeSystemShowcaseFn.java
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html

On Tue, Mar 30, 2021 at 8:17 PM Till Rohrmann  wrote:

> Hi Le,
>
> I am pulling in Gordon who might be able to help you with your question.
>
> Looking at the interface Context, it looks like you cannot easily specify
> a TypeHint for the message you want to send. Hence, I guess that you
> explicitly need to register these types.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 8:20 AM Le Xu  wrote:
>
>> Hello!
>>
>> I'm trying to figure out whether Flink Statefun supports sending object
>> with class that has generic parameter types (and potentially nested types).
>> For example, I send a message that looks like this:
>>
>> context.send(SINK_EVENT, idString, new Tuple3<>(someLongObject,
>> listOfLongObject, Long));
>>
>> And obviously I'm getting complaints like this:
>>
>> Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract
>> TypeInformation from Class alone, because generic parameters are missing.
>> Please use TypeInformation.of(TypeHint) instead, or another equivalent
>> method in the API that accepts a TypeHint instead of a Class. For example
>> for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
>> at
>> org.apache.flink.api.common.typeinfo.TypeInformation.of(TypeInformation.java:214)
>> at
>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.typeInformation(DynamicallyRegisteredTypes.java:60)
>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>> at
>> org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes.registerType(DynamicallyRegisteredTypes.java:49)
>> at
>> org.apache.flink.statefun.flink.core.state.FlinkState.createFlinkStateTableAccessor(FlinkState.java:100)
>> at
>> org.apache.flink.statefun.flink.core.state.FlinkStateBinder.bindTable(FlinkStateBinder.java:54)
>> at
>> org.apache.flink.statefun.sdk.state.StateBinder.bind(StateBinder.java:39)
>> at
>> org.apache.flink.statefun.flink.core.state.PersistedStates.findReflectivelyAndBind(PersistedStates.java:42)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.load(StatefulFunctionRepository.java:74)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunctionRepository.get(StatefulFunctionRepository.java:59)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.newActivation(LocalFunctionGroup.java:75)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.enqueue(LocalFunctionGroup.java:52)
>> at
>> org.apache.flink.statefun.flink.core.functions.LocalSink.accept(LocalSink.java:36)
>> at
>> org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:92)
>> at org.apache.flink.statefun.sdk.Context.send(Context.java:88)
>> at
>> benchmark.HotItemsPersisted$ParseEventFunction.invoke(HotItemsPersisted.java:292)
>> at
>> org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
>>
>>
>> Is there any API function that statefun support for parameterized class
>> like this or does the user function need to handle the serialization
>> process -- or is there anyway to quickly modify statefun message interface
>> to support this functionality.
>>
>> Thanks!
>>
>> Le
>>
>>
>>
>>
>>
>>


Re: StateFun examples in scala

2021-03-30 Thread Tzu-Li (Gordon) Tai
Hi Jose!

For Scala, we would suggest waiting until StateFun 3.0.0 is released, which
is actually happening very soon (likely within 1-2 weeks) as there is an
ongoing release candidate vote [1].

The reason for this is that version 3.0 adds a remote SDK for Java, which
you should be able to use with Scala (or any other JVM language) seamlessly.
With StateFun <= 2.x, you only have the option to use embedded functions if
you'd like to use Java / Scala, which is a bit problematic and we can't
guarantee that it'll work for all Scala versions.

You can take a look at a preview of the new Java SDK here [2], this is a
nice tutorial that runs you through all the SDK fundamentals.
Note that this is not completely finalized yet, as the release voting is
still ongoing.

Would be great if you want to try the release candidate out already, or
have some feedback for the new SDK!

Cheers,
Gordon

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-3-0-0-release-candidate-1-td49821.html
[2]
https://github.com/apache/flink-statefun-playground/tree/dev/java/showcase

On Tue, Mar 30, 2021 at 8:21 PM Till Rohrmann  wrote:

> Hi Jose,
>
> I am pulling in Gordon who will be able to help you with your question.
>
> Personally, I am not aware of any limitations which prohibit the usage of
> Scala.
>
> Cheers,
> Till
>
> On Tue, Mar 30, 2021 at 11:55 AM jose farfan  wrote:
>
>> Hi
>>
>> I am trying to find some examples written in scala of StateFun.
>>
>> But I cannot find any.
>> My questions are:
>>
>>1. is there any problem to use statefun with Scala
>>2. is there any place with examples written in scala.
>>
>> BR
>> Jose
>>
>


Re: Relation between Two Phase Commit and Kafka's transaction aware producer

2021-03-15 Thread Tzu-Li (Gordon) Tai
+ user@f.a.o  (adding the conversation back to the user mailing list)

On Fri, Mar 12, 2021 at 6:06 AM Kevin Kwon  wrote:

> Thanks Tzu-Li
>
> Interesting algorithm. Is consumer offset also committed to Kafka at the
> last COMMIT stage after the checkpoint has completed?
>

Flink does commit the offsets back to Kafka when sources perform
checkpoints, but those offsets are not used for fault-tolerance and restore
by Flink. They are purely used as a means for exposing consumption progress.
Flink only respects the offsets being written to its checkpoints. Those
offsets are essentially the state of the FlinkKafkaConsumer sources, and
are written to checkpoints by the sources.
As previously explained, the last COMMIT stage comes after that, i.e. after
all Flink operators complete their state checkpoint.
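
As a rough illustration of the distinction, here is a toy model in plain
Java. The class and method names are made up for this sketch and are not
Flink's actual source classes; the only point it tries to show is that restore
reads the offsets from the checkpoint and never consults what was committed
back to Kafka.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a Kafka source; class and method names are illustrative, not Flink's. */
class OffsetCheckpointSketch {
    final Map<Integer, Long> currentOffsets = new HashMap<>();   // partition -> next offset to read
    final Map<Integer, Long> committedToKafka = new HashMap<>(); // only exposes progress

    void advance(int partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    /** The offsets are part of the source's state and go into the checkpoint. */
    Map<Integer, Long> snapshotState() {
        return new HashMap<>(currentOffsets);
    }

    /** Committing back to Kafka is a side effect, e.g. for monitoring tools. */
    void commitToKafka(Map<Integer, Long> snapshot) {
        committedToKafka.putAll(snapshot);
    }

    /** Restore reads the checkpoint only; committedToKafka is never consulted. */
    void restoreState(Map<Integer, Long> snapshot) {
        currentOffsets.clear();
        currentOffsets.putAll(snapshot);
    }

    public static void main(String[] args) {
        OffsetCheckpointSketch source = new OffsetCheckpointSketch();
        source.advance(0, 100L);
        Map<Integer, Long> checkpoint = source.snapshotState();
        source.commitToKafka(checkpoint);

        source.advance(0, 150L);            // progress after the checkpoint
        source.committedToKafka.put(0, 0L); // e.g. the group offsets get reset externally

        source.restoreState(checkpoint);    // failover
        System.out.println("resume partition 0 at offset " + source.currentOffsets.get(0));
    }
}
```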


>
> Also does the coordinator (JM) write any data in write-ahead-log before
> sending out commit messages to all Flink entities? I'm concerned when JM
> succeeds sending a commit message to some entities but fails to others and
> dies.
>

No. And indeed, while Flink guarantees that checkpoint complete
notifications will be eventually received by all listening operators (e.g.
the Kafka sinks), the job can indeed fail when only some of the sinks
have received the notification (and committed).
The way Flink handles the issue you mentioned, is that all pending-commit
transaction ids will be part of the sink's state.
When a sink checkpoints its state (during the pre-commit phase), it writes
all pending-commit transaction ids. If for any reason the job fails and
failover is triggered, the restored latest complete checkpoint will
contain those pending-commit transaction ids.
Then, on restore, the sinks attempt to commit those pending transactions.
So, in the end, you can see this as the transactions will all eventually be
successfully committed, even in the event of a failure.
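
A small sketch of that recovery path, in plain Java, might look as follows.
It is a simplified model under stated assumptions: `Broker` is a made-up
stand-in for Kafka's view of transactions (not a Kafka or Flink API), the
transaction ids are invented, and commit is modelled as idempotent so that
retrying an already committed transaction is harmless.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class PendingCommitRecoverySketch {
    /** Stand-in for the broker's view of transactions; not a Kafka API. */
    static final class Broker {
        final Set<String> preCommitted = new HashSet<>();
        final Set<String> committed = new TreeSet<>();

        /** Idempotent: committing an already committed transaction changes nothing. */
        void commit(String txnId) {
            if (preCommitted.remove(txnId)) {
                committed.add(txnId);
            }
        }
    }

    /** On restore, every transaction id found in the checkpoint is committed (again). */
    static void restoreAndCommit(List<String> pendingIdsInCheckpoint, Broker broker) {
        for (String txnId : pendingIdsInCheckpoint) {
            broker.commit(txnId);
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        // Pre-commit phase: both sinks flushed and wrote their txn ids into the checkpoint.
        broker.preCommitted.addAll(Arrays.asList("sink-0-txn-7", "sink-1-txn-7"));
        List<String> pendingIdsInCheckpoint = Arrays.asList("sink-0-txn-7", "sink-1-txn-7");

        broker.commit("sink-0-txn-7"); // sink 0 got the notification, then the job failed

        restoreAndCommit(pendingIdsInCheckpoint, broker);
        System.out.println(broker.committed);
    }
}
```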


>
> Finally, seems 2PC is implemented in order to make 3 entities, Kafka
> producer data / Kafka consumer offset / Flink Checkpoint to be in
> consistent state. However, since checkpoint is an ever increasing state
> like ledger that prunes the previous state as it goes, isn't
> write-ahead-log in the sink side enough to handle the exactly-once
> processing guarantee? what I mean is checking the state between WAL and the
> current checkpoint status and conservatively rollback to previous
> checkpoint and replay all data
>
> On Thu, Mar 11, 2021 at 7:44 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Kevin,
>>
>> Perhaps the easiest way to answer your question, is to go through how the
>> exactly-once FlinkKafkaProducer uses a 2PC implementation on top of
>> Flink's checkpointing mechanism.
>>
>> The phases can be broken down as follows (simplified assuming max 1
>> concurrent checkpoint and that checkpoint completion notifications are
>> never late):
>>
>>1. BEGIN_TXN: In between each Flink checkpoint, each
>>FlinkKafkaProducer sink operator creates a new Kafka transaction. You can
>>assume that on startup, a new Kafka transaction is created immediately for
>>records that occur before the first checkpoint.
>>2. PRE_COMMIT: Once a FlinkKafkaProducer sink operator receives
>>Flink's checkpoint barrier, it flushes pending records to the current open
>>transaction, and opens a new one for future records, which belongs to the
>>next checkpoint and thus should be written to the next transaction. Once
>>flushed, the sink operator acknowledges it has completed its checkpoint.
>>3. COMMIT: Once all sinks acknowledge checkpoint completion, the
>>Flink checkpoint is considered complete (containing state of all operators
>>+ consumer offsets). Once that happens, Flink notifies each sink operator
>>of the completion, and only upon receiving this notification, can the sink
>>operator commit the previous transaction.
>>
>> There are some edge cases that are handled, e.g. a checkpoint is
>> considered complete, but before all sinks receive the completion
>> notification and commit their transactions, the job fails (that's why txn
>> ids are written into the checkpoint as well, to make sure all txns
>> belonging to that checkpoint are still eventually committed after restore).
>>
>> The general takeaway is that each parallel sink operator can commit the
>> Kafka transactions only after all participants in the 2PC (i.e. all Flink
>> operators and sinks) acknowledge that they are ready to commit.
>> In Flink terms, the JM is the coordinator, and an operator / sink
>> completing their checkpoint is acknowledging that they are ready for
>> committing.
>>
>> From an end-to-end point of view

Re: EOFException on attempt to scale up job with RocksDB state backend

2021-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

Could you provide info on the Flink version used?

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: uniqueness of name when constructing a StateDescriptor

2021-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

The scope is per individual operator, i.e. a single KeyedProcessFunction
instance cannot have multiple registered states with the same name.

Cheers,
Gordon





Re: [Statefun] Interaction Protocol for Statefun

2021-03-15 Thread Tzu-Li (Gordon) Tai
Hi,

Interesting idea! Just some initial thoughts and questions, maybe others can
chime in as well.

In general I think the idea of supporting more high-level protocols on top
of the existing StateFun messaging primitives is good.

For example, what probably could be categorized under this effort is, we've
already been thinking about a pub/sub / broadcast / fan-out implementation
with StateFun [1].

As for the DSL specification language for protocols, that definitely sounds
like a stretch goal for the near future.

I'm curious, if you were to start with adding support for one interaction
protocol, which one would you start with and would find most useful for
users?

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-16319





Re: Extracting state keys for a very large RocksDB savepoint

2021-03-14 Thread Tzu-Li (Gordon) Tai
Hi Andrey,

Perhaps the functionality you described is worth adding to the State
Processor API.
Your observation on how the library currently works is correct; basically it
tries to restore the state backends as is.

In your current implementation, do you see it worthwhile to try to add this?

Cheers,
Gordon





Re: Relation between Two Phase Commit and Kafka's transaction aware producer

2021-03-10 Thread Tzu-Li (Gordon) Tai
Hi Kevin,

Perhaps the easiest way to answer your question, is to go through how the
exactly-once FlinkKafkaProducer uses a 2PC implementation on top of
Flink's checkpointing mechanism.

The phases can be broken down as follows (simplified assuming max 1
concurrent checkpoint and that checkpoint completion notifications are
never late):

   1. BEGIN_TXN: In between each Flink checkpoint, each FlinkKafkaProducer
   sink operator creates a new Kafka transaction. You can assume that on
   startup, a new Kafka transaction is created immediately for records that
   occur before the first checkpoint.
   2. PRE_COMMIT: Once a FlinkKafkaProducer sink operator receives Flink's
   checkpoint barrier, it flushes pending records to the current open
   transaction, and opens a new one for future records, which belongs to the
   next checkpoint and thus should be written to the next transaction. Once
   flushed, the sink operator acknowledges it has completed its checkpoint.
   3. COMMIT: Once all sinks acknowledge checkpoint completion, the Flink
   checkpoint is considered complete (containing state of all operators +
   consumer offsets). Once that happens, Flink notifies each sink operator of
   the completion, and only upon receiving this notification, can the sink
   operator commit the previous transaction.

There are some edge cases that are handled, e.g. a checkpoint is considered
complete, but before all sinks receive the completion notification and
commit their transactions, the job fails (that's why txn ids are written
into the checkpoint as well, to make sure all txns belonging to that
checkpoint are still eventually committed after restore).

The general takeaway is that each parallel sink operator can commit the
Kafka transactions only after all participants in the 2PC (i.e. all Flink
operators and sinks) acknowledge that they are ready to commit.
In Flink terms, the JM is the coordinator, and an operator / sink
completing their checkpoint is acknowledging that they are ready for
committing.
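
The three phases can be sketched with a toy model in plain Java. This is
only a simplified illustration under the same assumptions as above (one
concurrent checkpoint, no late notifications); the class and method names are
invented for the sketch and are not the actual FlinkKafkaProducer code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of one parallel sink in the 2PC; names are illustrative, not Flink classes. */
class TwoPhaseCommitSketch {
    private List<String> openTxn = new ArrayList<>();          // BEGIN_TXN on startup
    private final List<List<String>> preCommitted = new ArrayList<>();
    final List<String> visibleDownstream = new ArrayList<>();  // what committed-only readers see

    void write(String record) {
        openTxn.add(record);
    }

    /** PRE_COMMIT: flush the open transaction, begin the next one, acknowledge. */
    boolean onCheckpointBarrier() {
        preCommitted.add(openTxn);
        openTxn = new ArrayList<>(); // BEGIN_TXN for records of the next checkpoint
        return true;
    }

    /** COMMIT: only invoked after the coordinator collected all acknowledgements. */
    void onCheckpointComplete() {
        for (List<String> txn : preCommitted) {
            visibleDownstream.addAll(txn);
        }
        preCommitted.clear();
    }

    public static void main(String[] args) {
        List<TwoPhaseCommitSketch> sinks =
                Arrays.asList(new TwoPhaseCommitSketch(), new TwoPhaseCommitSketch());
        sinks.get(0).write("a");
        sinks.get(1).write("b");

        boolean allAcknowledged = true;
        for (TwoPhaseCommitSketch sink : sinks) {
            allAcknowledged &= sink.onCheckpointBarrier();
        }
        sinks.get(0).write("c"); // belongs to the next transaction

        if (allAcknowledged) { // the coordinator (JM) declares the checkpoint complete
            for (TwoPhaseCommitSketch sink : sinks) {
                sink.onCheckpointComplete();
            }
        }
        System.out.println(sinks.get(0).visibleDownstream + " " + sinks.get(1).visibleDownstream);
    }
}
```

Record "c" stays invisible in this model until the next checkpoint completes,
since it was written into the transaction opened after the barrier.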

From an end-to-end point of view, downstream consumers of the output Kafka
topic will not see records (assuming they are consuming with Kafka's
read_committed isolation level) until the upstream Flink application sink
commits the open Kafka transactions.
This boils down to, the read latency for downstream applications is at
least the upstream Flink app's checkpoint interval.

Hope this helps!

Cheers,
Gordon

On Wed, Mar 10, 2021 at 5:20 PM Kevin Kwon  wrote:

> Hi team, I'm a bit confused about how Two Phase Commit and Kafka's
> transaction-aware producer (using transactional.id and enable.auto.commit)
> play together.
>
> what I understand of Flink checkpoint (correct me if I'm wrong) is that it
> saves the transaction ID as well as the consumer's commit offsets, so when
> application fails and restarts, it will reprocess everything from the last
> checkpoint and data will be idempotently processed in the Kafka side.
> (exactly-once processing rather than exactly-once delivery)
>
> the question is where does 2 phase commit play a role here?
>


Re: Best practices for complex state manipulation

2021-03-10 Thread Tzu-Li (Gordon) Tai
Hi Dan,

For a deeper dive into state backends and how they manage state, or
performance critical aspects such as state serialization and choosing
appropriate state structures, I highly recommend starting from this webinar
done by my colleague Seth Wiesman:
https://www.youtube.com/watch?v=9GF8Hwqzwnk.

Cheers,
Gordon

On Wed, Mar 10, 2021 at 1:58 AM Dan Hill  wrote:

> Hi!
>
> I'm working on a join setup that does fuzzy matching in case the client
> does not send enough parameters to join by a foreign key.  There's a few
> ways I can store the state.  I'm curious about best practices around this.
> I'm using rocksdb as the state storage.
>
> I was reading the code for IntervalJoin
> 
> and was a little shocked by the implementation.  It feels designed for very
> short join intervals.
>
> I read this set of pages
> 
> but I'm looking for one level deeper.  E.g. what are performance
> characteristics of different types of state crud operations with rocksdb?
> E.g. I could create extra MapState to act as an index.  When is this worth
> it?
>
>
>


Re: Job downgrade

2021-03-07 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Thanks for confirming.

Can you send me a copy of the exception stack trace? That could help me
pinpoint the exact issue.

Cheers,
Gordon

On Fri, Mar 5, 2021 at 2:02 PM Alexey Trenikhun  wrote:

> Hi Gordon,
> I was using RocksDB backend
> Alexey
>
> ------
> *From:* Tzu-Li (Gordon) Tai 
> *Sent:* Thursday, March 4, 2021 12:58:01 AM
> *To:* Alexey Trenikhun 
> *Cc:* Piotr Nowojski ; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: Job downgrade
>
> Hi Alexey,
>
> Are you using the heap backend? If that's the case, then for whatever
> state was registered at the time of a savepoint, Flink will attempt to
> restore it to the heap backends.
> This essentially means that state "B" will be read as well, that would
> explain why Flink is trying to locate class B in the classpath.
>
> For this scenario, class B needs to be in the classpath if you downgrade
> back to version 1, with a savepoint taken with version 2 of the job.
>
> - Gordon
>
> On Thu, Mar 4, 2021 at 4:04 AM Alexey Trenikhun  wrote:
>
> If I copy class A into version 1+ it works. But it is the problem from CD
> perspective - I want to introduce feature which required new state: 1st I
> need make version 1+ with class B, but no other changes, then version 2 with
> class B and logic changes, upgrade job and if job doesn’t do what expected
> “rollback” to version 1+.
>
> --
> *From:* Piotr Nowojski 
> *Sent:* Wednesday, March 3, 2021 11:47:45 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Job downgrade
>
> Hi,
>
> I'm not sure what's the reason behind this. Probably classes are somehow
> attached to the state and this would explain why you are experiencing this
> issue. I've asked someone else from the community to chip in, but in the
> meantime, can not you just prepare a new "version 1" of the job, with just
> some empty `class B` on the class path? Or if this doesn't work, just copy
> the whole `class B` from version 2?
>
> Best,
> Piotrek
>
> sob., 27 lut 2021 o 19:10 Alexey Trenikhun  napisał(a):
>
> Hello,
> Let's have version 1 of my job uses keyed state with name "a" and type A,
> which some Avro generated class. Then I upgrade to version 2, which in
> addition uses keyed state "b" and type B (another concrete Avro generated
> class), I take savepoint with version 2 and decided to downgrade to version
> 1 and start with taken savepoint, can I do it? On one hand, version 1
> doesn't have state "b", but it seems Flink still tries to call
> restoreSerializer and tries to read runtimeType (`class B`), which is
> not available in version 1.
>
> Thanks,
> Alexey
>
>


Re: Job downgrade

2021-03-04 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Are you using the heap backend? If that's the case, then for whatever state
was registered at the time of a savepoint, Flink will attempt to restore it
to the heap backends.
This essentially means that state "B" will be read as well, that would
explain why Flink is trying to locate class B in the classpath.

For this scenario, class B needs to be in the classpath if you downgrade
back to version 1, with a savepoint taken with version 2 of the job.

- Gordon

On Thu, Mar 4, 2021 at 4:04 AM Alexey Trenikhun  wrote:

> If I copy class A into version 1+ it works. But it is the problem from CD
> perspective - I want to introduce feature which required new state: 1st I
> need make version 1+ with class B, but no other changes, then version 2 with
> class B and logic changes, upgrade job and if job doesn’t do what expected
> “rollback” to version 1+.
>
> --
> *From:* Piotr Nowojski 
> *Sent:* Wednesday, March 3, 2021 11:47:45 AM
> *To:* Alexey Trenikhun 
> *Cc:* Flink User Mail List 
> *Subject:* Re: Job downgrade
>
> Hi,
>
> I'm not sure what's the reason behind this. Probably classes are somehow
> attached to the state and this would explain why you are experiencing this
> issue. I've asked someone else from the community to chip in, but in the
> meantime, can not you just prepare a new "version 1" of the job, with just
> some empty `class B` on the class path? Or if this doesn't work, just copy
> the whole `class B` from version 2?
>
> Best,
> Piotrek
>
> sob., 27 lut 2021 o 19:10 Alexey Trenikhun  napisał(a):
>
> Hello,
> Let's have version 1 of my job uses keyed state with name "a" and type A,
> which some Avro generated class. Then I upgrade to version 2, which in
> addition uses keyed state "b" and type B (another concrete Avro generated
> class), I take savepoint with version 2 and decided to downgrade to version
> 1 and start with taken savepoint, can I do it? On one hand, version 1
> doesn't have state "b", but it seems Flink still tries to call
> restoreSerializer and tries to read runtimeType (`class B`), which is
> not available in version 1.
>
> Thanks,
> Alexey
>
>


Re: Flink Statefun TTL

2021-02-24 Thread Tzu-Li (Gordon) Tai
On Thu, Feb 25, 2021 at 12:06 PM Timothy Bess  wrote:

> Hi Gordon,
>
> Ah so when it said "all registered state" that means all state keys
> defined in the "module.yaml", not all state for all function instances. So
> the expiration has always been _per_ instance then and not across all
> instances of a function.
>

Exactly! Expiration happens individually for each function instance per
declared state.


>
> Thanks for the heads up, that sounds like a good change! I definitely like
> the idea of putting more configuration into the SDK so that there's not two
> sources that have to be kept up to date. Would be neat if eventually the
> SDK just hosts some "/spec" endpoint that serves a list of functions and
> all their configuration options to Statefun on boot.
>

> Btw, I ended up also making a Scala replica of my Haskell library to use
> at work (some of my examples in the microsite are a bit out of date, need
> to revisit that):
> https://github.com/BlueChipFinancial/flink-statefun4s
>
> I know it seems weird to not use an embedded function, but it keeps us
> from having to deal with mismatched Scala versions since Flink is still on
> 2.12, and generally reduces friction using stuff in the Scala Cats
> ecosystem.
>

Really cool to hear about your efforts on a Scala SDK!

I would not say it is weird to implement a Scala SDK for remote functions.
In fact, with the changes upcoming in 3.0, the community is doubling down
on remote as the primary deployment mode for functions, and would like to
have a wider array of supported language SDKs. There's actually a remote
Java SDK that was just merged to master and to be released in 3.0 [1].

Cheers,
Gordon

[1] https://github.com/apache/flink-statefun/tree/master/statefun-sdk-java


> Thanks,
>
> Tim
>
> On Wed, Feb 24, 2021 at 11:49 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Timothy,
>>
>> Starting from StateFun 2.2.x, in the module.yaml file, you can set for
>> each individual state of a function an "expireMode" field, whose value can
>> be either "after-invoke" or "after-write". For example:
>>
>> ```
>> - function:
>>     meta:
>>       ...
>>     spec:
>>       states:
>>         - name: state-1
>>           expireMode: after-write
>>           expireAfter: 1min
>>         - name: state-2
>>           expireMode: after-invoke
>>           expireAfter: 5sec
>> ```
>>
>> In earlier versions, expireMode could not be set individually for each
>> state. This is more flexible with 2.2.x.
>>
>> As a side note which is somewhat related, all state related
>> configurations will be removed from the module.yaml, instead to be defined
>> by the language SDKs starting from StateFun 3.0.
>> This opens up even more flexibility, such as zero-downtime upgrades of
>> remote functions which allows adding / removing state declarations without
>> restarting the StateFun cluster.
>> We're planning to reach out to the language SDK developers we know of
>> (which includes you for the Haskell SDK ;) ) soon on a briefing of this
>> change, as there is a change in the remote invocation protocol and will
>> require existing SDKs to be updated in order to work with StateFun 3.0.
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:
>>
>>> Hey,
>>>
>>> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
>>> with regards to TTL:
>>>
>>> Note: The state expiration mode for remote functions is currently
>>>> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
>>>> longest duration across all registered state, not for each individual state
>>>> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>>>>
>>>
>>> I noticed that the Ticket and PR for this have been closed with a
>>> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
>>> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
>>> is now "per function id" rather than across instances of said function?
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>


Re: Flink Statefun TTL

2021-02-24 Thread Tzu-Li (Gordon) Tai
Hi Timothy,

Starting from StateFun 2.2.x, in the module.yaml file, you can set for each
individual state of a function an "expireMode" field, whose value can be
either "after-invoke" or "after-write". For example:

```
- function:
    meta:
      ...
    spec:
      states:
        - name: state-1
          expireMode: after-write
          expireAfter: 1min
        - name: state-2
          expireMode: after-invoke
          expireAfter: 5sec
```

In earlier versions, expireMode could not be set individually for each state.
This is more flexible with 2.2.x.
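
To illustrate the difference between the two modes, here is a toy model in
plain Java. It is only a sketch of the semantics as I would summarize them,
assuming that after-write restarts the timer on writes only, while
after-invoke also restarts it whenever the function instance is invoked. The
`StateSlot` class and the instance ids are made up for illustration and are
not StateFun code.

```java
import java.util.HashMap;
import java.util.Map;

class StateExpirationSketch {
    enum ExpireMode { AFTER_WRITE, AFTER_INVOKE }

    /** One declared state of one function instance, with its own expiration timer. */
    static final class StateSlot {
        final ExpireMode mode;
        final long expireAfterMillis;
        private String value;
        private long lastRefresh;

        StateSlot(ExpireMode mode, long expireAfterMillis) {
            this.mode = mode;
            this.expireAfterMillis = expireAfterMillis;
        }

        private boolean expired(long now) {
            return value == null || now - lastRefresh >= expireAfterMillis;
        }

        /** Called on every invocation of the owning function instance. */
        void onInvoke(long now) {
            if (expired(now)) {
                value = null;
            } else if (mode == ExpireMode.AFTER_INVOKE) {
                lastRefresh = now;
            }
        }

        void write(String newValue, long now) {
            value = newValue;
            lastRefresh = now;
        }

        String read(long now) {
            return expired(now) ? null : value;
        }
    }

    public static void main(String[] args) {
        // Timers are kept per function instance ("user-1", "user-2") and per declared state.
        Map<String, StateSlot> state1 = new HashMap<>();
        state1.put("user-1", new StateSlot(ExpireMode.AFTER_WRITE, 60_000));
        state1.put("user-2", new StateSlot(ExpireMode.AFTER_WRITE, 60_000));
        state1.get("user-1").write("a", 0);
        state1.get("user-2").write("b", 50_000);
        System.out.println(state1.get("user-1").read(70_000)); // expired for user-1 only
        System.out.println(state1.get("user-2").read(70_000));

        StateSlot state2 = new StateSlot(ExpireMode.AFTER_INVOKE, 5_000);
        state2.write("c", 0);
        state2.onInvoke(4_000); // the invocation restarts the timer
        System.out.println(state2.read(8_000));
    }
}
```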

As a side note which is somewhat related, all state related configurations
will be removed from the module.yaml, instead to be defined by the language
SDKs starting from StateFun 3.0.
This opens up even more flexibility, such as zero-downtime upgrades of
remote functions which allows adding / removing state declarations without
restarting the StateFun cluster.
We're planning to reach out to the language SDK developers we know of
(which includes you for the Haskell SDK ;) ) soon on a briefing of this
change, as there is a change in the remote invocation protocol and will
require existing SDKs to be updated in order to work with StateFun 3.0.

Cheers,
Gordon

On Wed, Feb 24, 2021 at 11:00 PM Timothy Bess  wrote:

> Hey,
>
> I noticed that the Flink Statefun 2.1.0 release notes had this snippet
> with regards to TTL:
>
> Note: The state expiration mode for remote functions is currently
>> restricted to AFTER_READ_AND_WRITE, and the actual TTL being set is the
>> longest duration across all registered state, not for each individual state
>> entry. This is planned to be improved in upcoming releases (FLINK-17954).
>>
>
> I noticed that the Ticket and PR for this have been closed with a
> reference to commit "289c30e8cdb54d2504ee47a57858a1d179f9a540". Does this
> mean that if I upgrade to 2.2.2 and set an expiration in my modules.yaml it
> is now "per function id" rather than across instances of said function?
>
> Thanks,
>
> Tim
>


Re: Run the code in the UI

2021-02-21 Thread Tzu-Li (Gordon) Tai
Hi,

Could you elaborate on what exactly you mean?

If you wish to run a Flink job within the IDE, but also have the web UI
running for it, you can use
`StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(Configuration)`
to create the execution environment.
The default port 8081 will be used unless specified via `rest.port` in the
configuration.

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Statefun] Dynamic behavior

2021-02-21 Thread Tzu-Li (Gordon) Tai
Hi,

FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
primitive in StateFun:
https://issues.apache.org/jira/browse/FLINK-16319

This is probably what you are looking for. And I do agree, in the case that
the control stream (which updates the application logic) is high volume,
redeploying functions may not work well.

I don't think there really is a "recommended" way of doing the "broadcast
control stream, join with main stream" pattern with StateFun at the moment,
at least without FLINK-16319.
On the other hand, it could be possible to use stateful functions to
implement a pub-sub model in user space for the time being. I've actually
left some ideas for implementing that in the comments of FLINK-16319.

Cheers,
Gordon


On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo  wrote:

> Hi everyone,
>
> What is the recommended way of achieving the equivalent of a broadcast in
> Flink when using Stateful Functions?
>
> For instance, assume we are implementing something similar to Flink's
> demo fraud detection but in Stateful Functions - how can one dynamically
> update the application's logic then?
> There was a similar question in this mailing list in the past where it was
> recommended moving the dynamic logic to a remote function so that one
> could achieve that by deploying a new container. I think that's
> not very realistic as updates might happen with a frequency that's not
> compatible with that approach (e.g., sticking to the fraud detection
> example, updating fraud detection rules every hour is not unusual), nor
> should one be deploying a new container when data (not code) changes.
>
> Is there a way of, for example, modifying FunctionProviders on the fly?
>
> Thanks,
> Miguel
>


Re: Flink 1.11.3 not able to restore with savepoint taken on Flink 1.9.3

2021-02-19 Thread Tzu-Li (Gordon) Tai
Hi,

I'm not aware of any breaking changes in the savepoint formats from 1.9.3 to
1.11.3.

Let's first try to rule out any obvious causes of this:
- Were any data types / classes that were used in state changed across the
restores? Remember that key types are also written as part of state
snapshots.
- Did you register any Kryo types in the 1.9.3 execution, and did that
configuration change across the restores?
- Was unaligned checkpointing enabled in the 1.11.3 restore?

As of now it's a bit hard to debug this with just an EOFException, as the
corrupted read could have happened anywhere before that point. If it's
possible to reproduce a minimal job of yours that has the same restore
behaviour, that could also help a lot.

Thanks,
Gordon





Re: Statefun: cancel "sendAfter"

2021-02-05 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Thanks for providing the details of the use case! It does indeed sound like
being able to delete scheduled delayed messages would help here.

And yes, please do proceed with creating an issue. As for details on the
implementation, we can continue to discuss that on the JIRA.

Cheers,
Gordon

On Wed, Feb 3, 2021 at 3:43 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> thank you Gordon for clarification. My use-case is processing business
> events of customers. Those events are triggered by ourself or by the
> customer depending of what’s the current state of the ongoing customer’s
> business use-case. We need to monitor delayed/missing business events which
> belong to previous events. For example: the customer has to confirm
> something we did. Depending on what it is the confirmation has to be within
> hours, days or even months. If there is a delay we need to know. But if the
> customer confirms in time we want to cleanup to keep the state small.
>
>
>
> I dug a little bit into the code. May I create an issue to discuss my
> ideas?
>
>
>
> Cheers,
>
> Stephan
>
>
>
>
>
> *From:* Tzu-Li (Gordon) Tai
> *Sent:* Wednesday, February 3, 2021 07:58
> *To:* Stephan Pelikan
> *Cc:* user@flink.apache.org; Igal Shilman
> *Subject:* Re: Statefun: cancel "sendAfter"
>
>
>
> Hi,
>
> You are right, currently StateFun does not support deleting a scheduled
> delayed message.
>
> StateFun supports delayed messages by building on top of two Flink
> constructs: 1) registering processing time timers, and 2) buffering the
> message payload to be sent in state.
>
> The delayed messages are kept in the Flink state of the sending operator,
> and timers are registered on the sending operator as well. So technically,
> there doesn't seem to be a blocker for deleting a delayed message and its
> associated timer, if it hasn't been sent yet.
>
> Can you maybe open a JIRA ticket for this, so we have something that
> tracks it?
> Also cc'ing Igal, who might have more comments on whether supporting this
> makes sense.
>
> Cheers,
> Gordon
>
>
>
> On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
> wrote:
>
> Hi,
>
>
>
> I think about using „sendAfter“ to implement some kind of timer
> functionality. I’m wondering if there is no possibility to cancel delayed
> sent message!
>
>
>
> In my use case it is possible that intermediate events make the delayed
> message obsolete. In some cases the statefun of that certain ID is cleared
> (clear all state variables) and does not exist anymore. In other cases the
> statefun of that ID still exists (and its state). In the latter case I
> could ignore the delayed message, but what about those statefun which do
> not exist anymore?
>
>
>
> Additionally there can be millions of delayed messages which I do not need
> any more and some delays are also hours, days or even months. I don’t want
> to pollute my state with this because it will inflate the size of my
> checkpoints.
>
>
>
> There are no hints in the docs (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
> how those situations are treated. I found in the Flink’s docs that timers
> of keyed processors can be deleted. As far as I know statefuns are based on
> those processors, so I hope that there is something about it. I hope
> someone can clarify what I can expect and how those situations are handled
> internally.
>
>
>
> Thanks,
>
> Stephan
>
>


Re: Statefun: cancel "sendAfter"

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi,

You are right, currently StateFun does not support deleting a scheduled
delayed message.

StateFun supports delayed messages by building on top of two Flink
constructs: 1) registering processing time timers, and 2) buffering the
message payload to be sent in state.

The delayed messages are kept in the Flink state of the sending operator,
and timers are registered on the sending operator as well. So technically,
there doesn't seem to be a blocker for deleting a delayed message and its
associated timer, if it hasn't been sent yet.
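To illustrate why this looks feasible, here is a minimal plain-Java sketch (no Flink or StateFun dependencies) of the two pieces described above: a timer registry and a buffer of pending payloads. The class `DelayedMessageBuffer`, its methods, and the string cancellation token are hypothetical names made up for this illustration; they are not StateFun's actual internals or API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of delayed messages: a timer registry plus buffered payloads.
class DelayedMessageBuffer {
    // Fire time (ms) -> tokens due at that time (stands in for processing-time timers).
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    // Token -> buffered payload (stands in for the payload kept in Flink state).
    private final Map<String, String> payloads = new HashMap<>();
    // Token -> fire time, so that a cancel can find the matching timer entry.
    private final Map<String, Long> fireTimes = new HashMap<>();

    void sendAfter(String token, long fireTimeMs, String payload) {
        timers.computeIfAbsent(fireTimeMs, t -> new ArrayList<>()).add(token);
        payloads.put(token, payload);
        fireTimes.put(token, fireTimeMs);
    }

    // Removes both the buffered payload and its timer entry.
    // Returns false if the message was already sent or the token is unknown.
    boolean cancel(String token) {
        Long fireTime = fireTimes.remove(token);
        if (fireTime == null) {
            return false;
        }
        payloads.remove(token);
        List<String> due = timers.get(fireTime);
        due.remove(token);
        if (due.isEmpty()) {
            timers.remove(fireTime);
        }
        return true;
    }

    // Delivers every message whose fire time is <= nowMs and drops it from the buffer.
    List<String> fire(long nowMs) {
        List<String> delivered = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= nowMs) {
            for (String token : timers.pollFirstEntry().getValue()) {
                delivered.add(payloads.remove(token));
                fireTimes.remove(token);
            }
        }
        return delivered;
    }

    int pendingCount() {
        return payloads.size();
    }
}
```

The point of the sketch is only that a cancel has to clean up both structures, so that neither the timer nor the payload lingers in state (and in checkpoints) after the message has become obsolete.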

Can you maybe open a JIRA ticket for this, so we have something that tracks
it?
Also cc'ing Igal, who might have more comments on whether supporting this
makes sense.

Cheers,
Gordon


On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> I think about using „sendAfter“ to implement some kind of timer
> functionality. I’m wondering if there is no possibility to cancel delayed
> sent message!
>
>
>
> In my use case it is possible that intermediate events make the delayed
> message obsolete. In some cases the statefun of that certain ID is cleared
> (clear all state variables) and does not exist anymore. In other cases the
> statefun of that ID still exists (and its state). In the latter case I
> could ignore the delayed message, but what about those statefun which do
> not exist anymore?
>
>
>
> Additionally there can be millions of delayed messages which I do not need
> any more and some delays are also hours, days or even months. I don’t want
> to pollute my state with this because it will inflate the size of my
> checkpoints.
>
>
>
> There are no hints in the docs (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
> how those situations are treated. I found in the Flink’s docs that timers
> of keyed processors can be deleted. As far as I know statefuns are based on
> those processors, so I hope that there is something about it. I hope
> someone can clarify what I can expect and how those situations are handled
> internally.
>
>
>
> Thanks,
>
> Stephan
>


Re: Question on Flink and Rest API

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi,

There is no out-of-box Flink source/sink connector for this, but it isn't
unheard of that users have implemented something to support what you
outlined.

One way to possibly achieve this is: in terms of a Flink streaming job
graph, what you would need to do is co-locate the source (which exposes the
endpoint and maintains a pool of open client connections mapped by request
ID) and the sink operators (which receive processed results with the
original request IDs attached, and are in charge of replying to the original
requests). The open client connections need to be process-wide accessible
(e.g. via a static reference), so that when a co-located sink operator
receives a result, it can directly fetch the corresponding client connection
and return a response.
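As a rough illustration of the "static reference" part only, here is a plain-Java sketch with no Flink dependencies. `PendingRequests`, `register`, and `complete` are hypothetical names, and a `CompletableFuture<String>` stands in for an open client connection; a real implementation would hold actual HTTP connection handles and would need timeouts and cleanup.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical process-wide registry of open client connections, keyed by request ID.
class PendingRequests {
    // Static so that a source and a sink running in the same JVM share it.
    private static final ConcurrentHashMap<String, CompletableFuture<String>> OPEN =
            new ConcurrentHashMap<>();

    // Source side: remember the open connection before emitting the request downstream.
    static CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> response = new CompletableFuture<>();
        OPEN.put(requestId, response);
        return response;
    }

    // Sink side: look up the connection by request ID and reply.
    // Returns false if no connection is open for that ID in this process.
    static boolean complete(String requestId, String result) {
        CompletableFuture<String> response = OPEN.remove(requestId);
        return response != null && response.complete(result);
    }
}
```

This only works if the sink instance that receives a result runs in the same process as the source instance that accepted the request, which is why the co-location above matters.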

The specifics are of course a bit more involved; you'd probably need to dig
around previous Flink Forward conference talks to get a better picture.
Hopefully this gives you a starting point to think about.

Cheers,
Gordon





Re: Question a possible use can for Iterative Streams.

2021-02-02 Thread Tzu-Li (Gordon) Tai
Hi Marco,

In the ideal setup, enrichment data existing in external databases is
bootstrapped into the streaming job via Flink's State Processor API, and any
follow-up changes to the enrichment data are streamed into the job as a
second union input on the enrichment operator.
For this solution to scale, lookups to the enrichment data need to be by
the same key as the input data, i.e. the enrichment data is co-partitioned
with the input data stream.

I assume you've already thought about whether or not this would work for
your case, as it's a common setup for streaming enrichment.

Otherwise, I believe your brainstorming is heading in the right direction,
in the case that remote database lookups + local caching in state is a must.
I'm personally not familiar with the iterative streams in Flink, but in
general I think it is currently discouraged to use it.

On the other hand, I think using Stateful Functions' [1] programming
abstraction might work here, as it allows arbitrary messaging between
functions and cyclic dataflows.
There's also an SDK that allows you to embed StateFun functions within a
Flink DataStream job [2].

Very briefly, the way you would model this database cache hit / remote
lookup is by implementing a function, e.g. called DatabaseCache.
The function would expect messages of type Lookup(lookupKey), and reply
with a response of Result(lookupKey, value). The abstraction allows you, for
each incoming message, to access registered state (similar to vanilla Flink),
as well as to register async operations, which you'd use to perform remote
database lookups in case of a cache / state miss. It also provides means for
"timers" in the form of delayed messages being sent to itself, if you need
some mechanism for cache invalidation.
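To make the message flow concrete, here is a plain-Java sketch of that logic. It is deliberately not written against the StateFun SDK: `Lookup`, `Result`, `onLookup`, `onInvalidate`, and the `remoteLookup` callback are hypothetical names, a `HashMap` stands in for function state, and the sketch assumes messages are handled one at a time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical message types, mirroring Lookup(lookupKey) and Result(lookupKey, value).
final class Lookup {
    final String lookupKey;
    Lookup(String lookupKey) { this.lookupKey = lookupKey; }
}

final class Result {
    final String lookupKey;
    final String value;
    Result(String lookupKey, String value) { this.lookupKey = lookupKey; this.value = value; }
}

// Plain-Java stand-in for the DatabaseCache function.
class DatabaseCache {
    private final Map<String, String> cache = new HashMap<>(); // stands in for function state
    private final Function<String, CompletableFuture<String>> remoteLookup;
    int remoteCalls = 0; // counts cache misses, for illustration only

    DatabaseCache(Function<String, CompletableFuture<String>> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Cache hit: reply immediately. Cache miss: async remote lookup, store, then reply.
    CompletableFuture<Result> onLookup(Lookup msg) {
        String cached = cache.get(msg.lookupKey);
        if (cached != null) {
            return CompletableFuture.completedFuture(new Result(msg.lookupKey, cached));
        }
        remoteCalls++;
        return remoteLookup.apply(msg.lookupKey).thenApply(value -> {
            cache.put(msg.lookupKey, value);
            return new Result(msg.lookupKey, value);
        });
    }

    // What a delayed "invalidate" message sent to itself would trigger.
    void onInvalidate(String lookupKey) {
        cache.remove(lookupKey);
    }
}
```

In an actual function, the cache would live in persisted state and the invalidation would arrive as a delayed message rather than a direct method call.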

Hope this provides some direction for you to think about!

Cheers,
Gordon

[1] https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/flink-datastream.html





Re: [Stateful Functions] Problems with Protobuf Versions

2021-02-01 Thread Tzu-Li (Gordon) Tai
Hi,

This hints at an incompatibility between the Protobuf classes generated by
the protoc compiler and the Protobuf runtime dependency used by the code.

Could you try to make sure the `protoc` compiler version matches the
Protobuf version in your code?

Cheers,
Gordon

On Fri, Jan 29, 2021 at 6:07 AM Jan Brusch 
wrote:

> Hi,
>
> I have a bit of a strange problem: I can't get a Statefun Application to
> Compile or Run (Depending on the exact Protobuf version) with a Protobuf
> version newer than 3.3.0. I have had this problem over multiple project
> setups and multiple versions of Flink Statefun with Java8.
>
> Protobuf 3.3.0 works fine and all, but it does seem a bit odd...
>
>
> The most common error behaviour is a successful maven build and the
> following Runtime Error on Startup:
>
> java.lang.NoClassDefFoundError:
> com/google/protobuf/GeneratedMessageV3$UnusedPrivateParameter
>
>
> Does anyone else have this Problem or found a solution for this in the
> past?
>
>
> Best regards
>
> Jan
>
> 
>
>
>
> --
> neuland  – Büro für Informatik GmbH
> Konsul-Smidt-Str. 8g, 28217 Bremen
>
> Telefon (0421) 380107 57
> Fax (0421) 380107 99
> https://www.neuland-bfi.de
>
> https://twitter.com/neuland
> https://facebook.com/neulandbfi
> https://xing.com/company/neulandbfi
>
>
> Geschäftsführer: Thomas Gebauer, Jan Zander
> Registergericht: Amtsgericht Bremen, HRB 23395 HB
> USt-ID. DE 246585501
>
>


Re: Stateful Functions - accessing the state aside of normal processing

2021-01-27 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Great to hear about your experience with StateFun so far!

I think what you are looking for is a way to read StateFun checkpoints,
which are basically an immutable consistent point-in-time snapshot of all
the states across all your functions, and run some computation or simply to
explore the state values.
StateFun checkpoints are essentially adopted from Flink, so you can find
more detail about that here [1].

Currently, StateFun does provide a means for state "bootstrapping": running
a batch offline job to write and compose a StateFun checkpoint [2].
What is still missing is the "reading / analysis" side of things, to do
exactly what you described: running a separate batch offline job for
reading and processing an existing StateFun checkpoint.

Before we dive into details on what that may look like, do you think that is
what you would need?

Although I don't think we would be able to support such a feature yet since
we're currently focused on reworking the SDKs and request-reply protocol,
in any case it would be interesting to discuss if this feature would be
important for multiple users already.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-master/concepts/stateful-stream-processing.html#checkpointing
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/deployment-and-operations/state-bootstrap.html

On Wed, Jan 27, 2021 at 11:41 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> We are trying to use Statefuns for our tool and it seems to be a good fit.
> I already adopted it and it works quite well. However, we have millions of
> different states (all the same FunctionType but different ids) and each
> state consists of several @Persisted values (values and tables). We want to
> build an administration tool for examining the crowd of states (count,
> histogram, etc.) and each state in detail (the persisted-tables and
> -values).
>
>
>
> Additionally we need some kind of dig-down functionality for finding those
> individual states. For example some of those persisted values can be used
> to categorize the crowd of states.
>
>
>
> My question now is how to achieve this. Is there a way to browse and
> examine statefuns in a read-only fashion (their ids, their persisted
> values)? How can one achieve this without duplicating status in e.g. a
> relational database?
>
>
>
> Thanks,
>
> Stephan
>
>
>
> PS: I have another questions but I will send them in separate mails to
> avoid mixing up topics.
>


[ANNOUNCE] Stateful Functions Docker images are now hosted on Dockerhub at apache/flink-statefun

2021-01-18 Thread Tzu-Li (Gordon) Tai
Hi,

We have created an "apache/flink-statefun" Dockerhub repository managed by
the Flink PMC, at:
https://hub.docker.com/r/apache/flink-statefun

The images for the latest stable StateFun release, 2.2.2, have already been
pushed there.
Going forward, it will be part of the release process to make official
images of newer release versions available there as well.

Cheers,
Gordon


Re: Statefun with RabbitMQ consumes message but does not run statefun

2021-01-12 Thread Tzu-Li (Gordon) Tai
Hi,

There is no lock-step of releasing a new StateFun release when a new Flink
release goes out. StateFun and Flink have individual releasing schemes and
schedules.

Usually, for new major StateFun version releases, we will upgrade its Flink
dependency to the latest available version.
We are currently targeting mid February for the next major StateFun
release, by which time the Flink dependency will be upgraded to 1.12.x.
In the meantime, if you'd like to work against Flink 1.12.x with StateFun,
you might have to resort to building the artifacts yourself.

Cheers,
Gordon

On Tue, Jan 12, 2021 at 3:57 PM Stephan Pelikan 
wrote:

> I found the reason: There is a class incompatibility because I changed from
>
> Statefun 2.2.1 + Flink 1.11.1
>
> to
>
> Statefun 2.2.1 + Flink 1.12.0
>
>
>
> But even the newest version of Statefun 2.2.2 refers to Flink 1.11.3.
>
>
>
> Is there a possibility to use the newest version of Flink in combination
> with the newest version of Statefun? I’m wondering why there is no Statefun
> version matching the current stable version of Flink?
>
>
>
> Stephan
>
>
>
>
>
> *From:* Stephan Pelikan
> *Sent:* Monday, January 11, 2021 19:37
> *To:* user@flink.apache.org
> *Subject:* Statefun with RabbitMQ consumes message but does not run
> statefun
>
>
> Hi,
>
>
>
> I try to use RabbitMQ as a Source. My source consumes messages of the
> queue but the statefun is not executed – not even created.
>
>
>
> This is my main function:
>
>
>
>  1 public static void main(String[] args) throws Exception {
>  2
>  3     final var env = StreamExecutionEnvironment.getExecutionEnvironment();
>  4
>  5     env.registerTypeWithKryoSerializer(Any.class, ProtobufSerializer.class);
>  6
>  7     env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE);
>  8     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
>  9     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 10     env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> 11
> 12     final var statefunConfig = StatefulFunctionsConfig.fromEnvironment(env);
> 13     statefunConfig.setFlinkJobName("test");
> 14     statefunConfig.setFactoryType(MessageFactoryType.WITH_KRYO_PAYLOADS);
> 15
> 16     final var connectionConfig = new RMQConnectionConfig.Builder()
> 17         .setHost("localhost")
> 18         .setUserName("guest")
> 19         .setPassword("guest")
> 20         .setPort(5672)
> 21         .setVirtualHost("test")
> 22         .setPrefetchCount(5000)
> 23         .build();
> 24
> 25     final var deserializationSchema = new TypeInformationSerializationSchema<>(
> 26         new ProtobufTypeInformation<>(Any.class), env.getConfig());
> 27     final var rmqSource = new RMQSource<>(connectionConfig, TEST_INGRESS, true, deserializationSchema);
> 28
> 29     final var source = env
> 30         .addSource(rmqSource, TEST_INGRESS)
> 31         .setParallelism(1)
> 32         .map(msg -> {
> 33             return RoutableMessageBuilder
> 34                 .builder()
> 35                 .withTargetAddress(MyStatefun.TYPE, Utils.getUUID())
> 36                 .withMessageBody(msg)
> 37                 .build();
> 38         });
> 39
> 40     StatefulFunctionDataStreamBuilder
> 41         .builder("test")
> 42         .withDataStreamAsIngress(source)
> 43         .withFunctionProvider(MyStatefun.TYPE, unused -> {
> 44             return new MyStatefun();
> 45         })
> 46         .withEgressId(MyStatefun.EGRESS)
> 47         .withConfiguration(statefunConfig)
> 48         .build(env)
> 49         .getDataStreamForEgressId(MyStatefun.EGRESS)
> 50         .addSink(new PrintSinkFunction<>(true));
> 51
> 52     env.execute();
> 53
> 54 }
>
>
>
> A breakpoint in line 33 shows me the messages consumed. A breakpoint in
> line 44 is never hit. The message is reportedly consumed but never
> acknowledged or processed. Before using RabbitMQ I used a custom
> SourceFunction to fake input data and it worked well.
>
>
>
> To set things up I use a local environment, but logging does not show any
> errors. Before my current problem I had another error during message
> deserialization and it wasn’t reported either. Unfortunately I didn’t
> manage to get the exception in the log/stdout. I had to use the debugger to
> find the reason for the former problem. In this situation the debugger
> shows no thrown or caught exceptions. That’s why I’m stuck.
>
>
>
> Of course I would like to know what’s the problem with my code. But I
> guess it is not obvious. Maybe someone can give me a hint how to turn on
> exception logging which might help to get closer to the origin of the
> phenomenon.
>
>
>
> Thanks in advance,
>
> Stephan
>
>
>


[ANNOUNCE] Apache Flink Stateful Functions 2.2.2 released

2021-01-01 Thread Tzu-Li (Gordon) Tai
The Apache Flink community released the second bugfix release of the
Stateful Functions (StateFun) 2.2 series, version 2.2.2.

*We strongly recommend all users to upgrade to this version.*

*Please check out the release announcement:*
https://flink.apache.org/news/2021/01/02/release-statefun-2.2.2.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Dockerfiles for building Stateful Functions Docker images can be
found at:
https://github.com/apache/flink-statefun-docker

Alternatively, Ververica has volunteered to make Stateful Functions images
available for the community via their public Docker Hub registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349366

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Problems with FlinkKafkaProducer - closing after timeoutMillis = 9223372036854775807 ms.

2020-11-19 Thread Tzu-Li (Gordon) Tai
Hi,

One thing to clarify first:
I think the "Closing the Kafka producer with timeoutMillis =
9223372036854775807 ms" log doesn't necessarily mean that a producer was
closed due to timeout (Long.MAX_VALUE).
I guess that is just a Kafka log message that is logged when a Kafka
producer is closed without specifying a timeout (i.e., infinite timeout, or
Long.MAX_VALUE in Kafka's case).

With that in mind, when using exactly-once semantics for the
FlinkKafkaProducer, there is a fixed-size pool of short-lived Kafka
producers that are created for each concurrent checkpoint.
When a checkpoint begins, the FlinkKafkaProducer creates a new producer for
that checkpoint. Once said checkpoint completes, Flink attempts to close and
recycle the producer for that checkpoint.
So, it is normal to see logs of Kafka producers being closed if you're
using an exactly-once transactional FlinkKafkaProducer.
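The lifecycle can be pictured with a small plain-Java model. This is only an illustration of the begin / complete / recycle pattern described above, not the FlinkKafkaProducer implementation; `ProducerPool`, the `txn-N` identifiers, and the log text are all made up for the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a fixed-size pool of short-lived transactional producers.
class ProducerPool {
    private final Deque<String> freeIds = new ArrayDeque<>();   // producer IDs not in use
    private final Map<Long, String> inFlight = new HashMap<>(); // checkpoint ID -> producer ID
    final List<String> log = new ArrayList<>();                 // records the "close" events

    ProducerPool(int size) {
        for (int i = 0; i < size; i++) {
            freeIds.add("txn-" + i);
        }
    }

    // A checkpoint begins: a producer is taken from the pool for it.
    String beginCheckpoint(long checkpointId) {
        String id = freeIds.poll();
        if (id == null) {
            throw new IllegalStateException("too many concurrent checkpoints for pool size");
        }
        inFlight.put(checkpointId, id);
        return id;
    }

    // The checkpoint completes: its producer is closed and its ID is recycled.
    void completeCheckpoint(long checkpointId) {
        String id = inFlight.remove(checkpointId);
        if (id != null) {
            log.add("Closing producer " + id);
            freeIds.add(id);
        }
    }
}
```

Every completed checkpoint adds one "Closing producer" entry in this model, which mirrors why the close message shows up repeatedly in the logs of a healthy exactly-once job.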

Best,
Gordon

On Mon, Nov 16, 2020 at 9:11 PM Tim Josefsson 
wrote:

> To add to this, setting FlinkKafkaProducer.Semantic.AT_LEAST_ONCE instead
> of EXACTLY_ONCE makes the problem go away so I imagine there is something
> wrong with my setup.
> I'm using Kafka 2.2 and I have the following things set on the cluster:
>
> transaction.max.timeout.ms=360
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
>
> On Mon, 16 Nov 2020 at 14:05, Tim Josefsson 
> wrote:
>
>> Hello!
>>
>> I'm having some problems with my KafkaProducer that I've been unable to
>> find a solution to.
>>
>> I've set up a simple Flink Job that reads from one kafka topic, using
>>*kafkaProps.setProperty("isolation.level", "read_committed") *
>> since I want to support exactly once data in my application.
>>
>> After doing some enriching of the data I read from kafka I have the
>> following producer set up
>>
>> FlinkKafkaProducer kafkaSinkProducer = new
>> FlinkKafkaProducer<>(
>> "enrichedPlayerSessionsTest",
>> new
>> KafkaStringSerializationSchema("enrichedPlayerSessionsTest"),
>> producerProps,
>> FlinkKafkaProducer.Semantic.EXACTLY_ONCE
>> );
>>
>> The producer above is then added as a sink at the end of my Flink job.
>>
>> Now when I run this application I get the following message,
>>
>> 13:44:40,758 INFO  
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
>> clientId=producer-6, transactionalId=Source: playerSession and 
>> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
>> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
>> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
>> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] ProducerId set to 21280 with 
>> epoch 4
>> 13:44:40,759 INFO  org.apache.kafka.clients.producer.KafkaProducer   
>> - [Producer clientId=producer-6, transactionalId=Source: 
>> playerSession and playserSessionStarted from Kafka -> Filter out 
>> playerSessionStarted -> Extract PlayerSession -> Set MDC for event -> Map -> 
>> (Wrap playerSessionEnriched into EntityEventBatch, Sink: Post 
>> sessionEnriched to Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-2] Closing the 
>> Kafka producer with timeoutMillis = 9223372036854775807 ms.
>>
>> Sometime I also see the following:
>>
>> 13:44:43,740 INFO  
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
>> clientId=producer-26, transactionalId=Source: playerSession and 
>> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
>> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
>> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
>> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to -1 with epoch -1
>> 13:44:44,136 INFO  
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer 
>> clientId=producer-26, transactionalId=Source: playerSession and 
>> playserSessionStarted from Kafka -> Filter out playerSessionStarted -> 
>> Extract PlayerSession -> Set MDC for event -> Map -> (Wrap 
>> playerSessionEnriched into EntityEventBatch, Sink: Post sessionEnriched to 
>> Kafka)-ac09b6abdcafcb1e86b87abd220f2a9d-0] ProducerId set to 21297 with 
>> epoch 11
>> 13:44:44,147 INFO  
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - 
>> Consumer subtask 0 has no restore state.
>>
>> Now since this isn't an error the job doesn't crash while running and data 
>> does get written to Kafka even with this message. However it does seem wrong 
>> to me and I'm wondering if anyone has any insight into why this is happening?
>>
>> I'm attaching a GIST with the complete log from the application, I ran the 
>> job with *env.setParallelism(1)* but I still get around 26 producers created 
>> which still seems odd to me. Running without any parallelism set creates 
>> about 300-400 producers (based of the clientIds reported)
>>
>> Thankful for any insight into this!

Re: Flink State Processor API - Bootstrap One state

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

Using the State Processor API, modifying the state in an existing savepoint
results in a new savepoint (new directory) with the new modified state.
The original savepoint remains intact.
The API allows you to touch only certain operators, leaving all other state
as is.

Note that when generating a new savepoint from an existing savepoint, the
State Processor API does not perform a deep copy of untouched state.
Instead, the new savepoint will contain references to the old savepoint for
this untouched state.
Essentially, this means that modified savepoints written by the State
Processor API are not self-contained, and you should be careful not to
delete the original savepoints, as that would invalidate the newly generated
one.

Cheers,
Gordon


On Tue, Nov 17, 2020 at 2:19 PM ApoorvK 
wrote:

> Currently my flink application has state size of 160GB(around 50
> operators),
> where few state operator size is much higher, I am planning to use state
> processor API to bootstrap let say one particular state having operator id
> o1 and inside is a ValueState s1 as ID.
>
> Following steps I have planned to do it :
>
> 1. If I take a savepoint of the application I will be having a state folder
> containing operator and meta data file having o1 and s1.
>
> 2. Now II read only that state using state processor API (another flink
> app)and re-write it with the data that I want with same o1 and s1 ids and
> copy paste this folder to the savepoint folder taken in step 1.
>
> 3. Restore the application with the savepoint taken in step 1.
>
> Doing so as I do not want to touch any other state , I have my concerned
> with a particular state operator.
>
> Team, Kindly let me know if this is the right way to do it, or is there any
> better way using which I can achieve this.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: split avro kafka field

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

1. You'd have to configure your Kafka connector source to use a
DeserializationSchema that deserializes the Kafka record bytes to your
generated Avro type. You can use the shipped `AvroDeserializationSchema` for
that.

2. After your Kafka connector source, you can use a flatMap transformation
to do the splitting.

Cheers,
Gordon





Re: Kafka SQL table Re-partition via Flink SQL

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

I'm pulling in some Flink SQL experts (in CC) to help you with this one :)

Cheers,
Gordon

On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra 
wrote:

> Hi,
> I am trying to author a SQL job that does repartitioning a Kafka SQL table
> into another Kafka SQL table.
> as example input/output tables have exactly the same SQL schema (see
> below) and data the only difference is that the new kafka stream need to be
> repartition using a simple project like item_id (input stream is
> partitioned by user_id)
> is there a way to do this via SQL only ? without using
> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner
>
> In other words how can we express the stream key (keyedBy) via the SQL
> layer ?
>
> For instance in Hive they expose a system column called  __key or
> __partition that can be used to do this via SQL layer  (see
> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
> )
>
> CREATE TABLE input_kafkaTable (
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior_partition_by_uid',
>  'properties.bootstrap.servers' = 'localhost:9092',
> )
>
> CREATE TABLE output_kafkaTable (
>  user_id BIGINT,
>  item_id BIGINT,
>  category_id BIGINT,
>  behavior STRING,
>  ts TIMESTAMP(3)
> ) WITH (
>  'connector' = 'kafka',
>  'topic' = 'user_behavior_partition_by_iid',
>  'properties.bootstrap.servers' = 'localhost:9092'
> )
>
>
>
> --
>
> B-Slim
> ___/\/\/\___/\/\/\___/\/\/\___/\/\/\___/\/\/\___
>


Re: Flink 1.10 -> Savepoints referencing to checkpoints or not

2020-11-16 Thread Tzu-Li (Gordon) Tai
Hi,

Both the data and the metadata have been stored in the savepoint directory
since Flink 1.3.
The metadata in the savepoint directory does not reference any checkpoint
data files.

In 1.11, what was changed was that the savepoint metadata uses relative
paths to point to the data files in the savepoint directory, instead of
absolute paths.
This would allow you to treat the savepoint directories as self-contained
and free to relocate with filesystem operations outside of Flink.
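
To illustrate why relative paths make the directory relocatable, here is a toy
model in plain Python. It is not Flink's actual savepoint format: the file
names `metadata.json` and `state-0.bin` are invented for the example. The
metadata either stores the data file's name relative to its own directory, or
the absolute path at write time, and we then check whether the pointer still
resolves after the directory has been moved.

```python
import json
import shutil
import tempfile
from pathlib import Path


def write_snapshot(directory: Path, relative: bool) -> None:
    """Write one data file plus a metadata file that points at it."""
    directory.mkdir(parents=True)
    data_file = directory / "state-0.bin"
    data_file.write_bytes(b"some state")
    pointer = data_file.name if relative else str(data_file.resolve())
    (directory / "metadata.json").write_text(json.dumps({"files": [pointer]}))


def is_restorable(directory: Path) -> bool:
    """True if every file named in the metadata can still be found."""
    meta = json.loads((directory / "metadata.json").read_text())
    # Joining a directory with an absolute pointer yields the absolute path.
    return all((directory / pointer).exists() for pointer in meta["files"])


def survives_move(relative: bool) -> bool:
    """Create a snapshot, move its directory, and check it afterwards."""
    with tempfile.TemporaryDirectory() as root:
        original = Path(root) / "savepoint-old"
        moved = Path(root) / "savepoint-new"
        write_snapshot(original, relative)
        shutil.move(str(original), str(moved))
        return is_restorable(moved)
```

With relative pointers the moved directory is still complete; with absolute
pointers the metadata keeps pointing at the old location.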

Hope this clarifies things for you!

Cheers,
Gordon





[ANNOUNCE] Apache Flink Stateful Functions 2.2.1 released

2020-11-11 Thread Tzu-Li (Gordon) Tai
The Apache Flink community released the first bugfix release of the
Stateful Functions (StateFun) 2.2 series, version 2.2.1.

This release fixes a critical bug that causes restoring a Stateful
Functions cluster from snapshots (checkpoints or savepoints) to fail under
certain conditions.

*We strongly recommend that all users upgrade to this version.*

*Please check out the release announcement for details on upgrading to
2.2.1:* https://flink.apache.org/news/2020/11/11/release-statefun-2.2.1.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Dockerfiles for building Stateful Functions Docker images can be
found at:
https://github.com/apache/flink-statefun-docker

Alternatively, Ververica has volunteered to make Stateful Functions images
available for the community via their public Docker Hub registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349291

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
On Wed, Nov 11, 2020 at 1:44 PM Tzu-Li (Gordon) Tai 
wrote:

> Hi Lian,
>
> Sorry, I didn't realize that the issue you were bumping into was caused by
> the module not being discovered.
> You're right, the harness utility would not help here.
>

Actually, scratch this comment. The Harness utility would in fact help
here by surfacing these module discovery issues / missing META-INF files
in embedded module jars.
When using the Harness, module discovery works exactly the same as in normal
application submissions: modules are loaded via the Java SPI.

So, in general, the harness utility can be used to check:

   - Your application logic, messaging between functions, mock ingress
   inputs, etc.
   - Missing constructs in your application modules (e.g. missing ingress /
   egresses, routers)
   - Incorrect module packaging (e.g. missing module.yaml for remote
   modules, or missing META-INF metadata files for embedded modules)

Best,
Gordon

>


Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
Hi Lian,

Sorry, I didn't realize that the issue you were bumping into was caused by
the module not being discovered.
You're right, the harness utility would not help here.

As for the module discovery problem:

   - Have you looked at the contents of your jar to confirm that a
   META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
   file has indeed been generated by AutoService?
   - Just to rule out the obvious first: besides the
   auto-service-annotations dependency, you also have to add the auto-service
   compiler plugin, as demonstrated here:
   https://github.com/apache/flink-statefun/blob/master/pom.xml#L192

The META-INF metadata for classes annotated with @AutoService is only
generated after the build plugin mentioned above has been added.
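
If you'd rather not unzip the jar by hand, a small script can do the check.
This is just a sketch in Python using the standard `zipfile` module; the
function name `registered_modules` is my own, and the only thing taken from
StateFun is the path of the service file mentioned above.

```python
import zipfile

SERVICE_ENTRY = (
    "META-INF/services/"
    "org.apache.flink.statefun.sdk.spi.StatefulFunctionModule"
)


def registered_modules(jar) -> list:
    """Return the module class names listed in the jar's SPI service file.

    `jar` may be a path or a binary file-like object. An empty list means
    the service file is missing or lists no classes, in which case the
    module cannot be discovered.
    """
    with zipfile.ZipFile(jar) as archive:
        if SERVICE_ENTRY not in archive.namelist():
            return []
        text = archive.read(SERVICE_ENTRY).decode("utf-8")
    names = []
    for line in text.splitlines():
        name = line.split("#", 1)[0].strip()  # drop comments and blanks
        if name:
            names.append(name)
    return names
```

Calling `registered_modules("path/to/your.jar")` should print your module's
fully qualified class name; if it returns an empty list, the annotation
processor did not run during the build.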

Please let us know if this resolves the issue for you.

Cheers,
Gordon

On Wed, Nov 11, 2020 at 3:15 AM Lian Jiang  wrote:

> Igal,
>
> I am using AutoService and I don't need to add auto-service-annotations
> since it is provided by statefun-flink-core. Otherwise, my project cannot
> even build. I did exactly the same as
>
>
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java
>
> I did below test:
> In statefun-greeter-example project, replace greeter jar with my jar in
> Dockerfile, running this project can NOT find my module.
>
> In my project, replace my jar with the greeter jar in Dockerfile, running
> this project can find the greeter module.
>
> So I am really puzzled about what is wrong with my jar.
>
>
>
> Gordon,
>
> harness test plumbing of ingress/egress. But it may not help me debug why
> Flink cannot discover my module. Correct?
>
> Thanks guys.
>
> On Tue, Nov 10, 2020 at 9:11 AM Igal Shilman  wrote:
>
>> Hi Lian,
>>
>> If you are using the statefun-sdk directly (an embedded mode) then, most
>> likely is that you are missing a
>> META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule
>> file that would point to your modules class. We are using Java SPI [1] to
>> load all the stateful functions modules at runtime.
>> Alternatively, you can use the @AutoService annotation [2] (you will need
>> to add a maven dependency for that [3])
>>
>> If you are using the remote functions deployment mode, then please make
>> sure that your module.yaml file is present in your Dockerfile. (for example
>> [4])
>>
>> Good luck,
>> Igal.
>>
>> [1] https://docs.oracle.com/javase/tutorial/ext/basics/spi.html
>> [2]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-greeter-example/src/main/java/org/apache/flink/statefun/examples/greeter/GreetingModule.java#L30
>> [3] https://github.com/apache/flink-statefun/blob/master/pom.xml#L85,L89
>> [4]
>> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-python-greeter-example/Dockerfile#L20
>>
>> On Tue, Nov 10, 2020 at 4:47 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi,
>>>
>>> StateFun provides a Harness utility exactly for that, allowing you to
>>> test a StateFun application in the IDE / setting breakpoints etc.
>>> You can take a look at this example on how to use the harness:
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>> .
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang 
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>
>>>> I created a POC by mimicking statefun-greeter-example. However, it
>>>> failed due to:
>>>>
>>>> Caused by: java.lang.IllegalStateException: There are no ingress
>>>> defined.
>>>> at
>>>> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
>>>> ~[statefun-flink-core.jar:2.2.0]
>>>> at
>>>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
>>>> ~[statefun-flink-core.jar:2.2.0]
>>>> at
>>>> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
>>>> ~[statefun-flink-core.jar:2.2.0]
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> ~[?:1.8.0_265]
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAc

Re: debug statefun

2020-11-10 Thread Tzu-Li (Gordon) Tai
Hi,

StateFun provides a Harness utility exactly for that, allowing you to test
a StateFun application in the IDE, set breakpoints, etc.
You can take a look at this example on how to use the harness:
https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
.

Cheers,
Gordon

On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang  wrote:

>
> Hi,
>
> I created a POC by mimicking statefun-greeter-example. However, it failed
> due to:
>
> Caused by: java.lang.IllegalStateException: There are no ingress defined.
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)
> ~[statefun-flink-core.jar:2.2.0]
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:71)
> ~[statefun-flink-core.jar:2.2.0]
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob.main(StatefulFunctionsJob.java:47)
> ~[statefun-flink-core.jar:2.2.0]
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_265]
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_265]
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_265]
> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:101)
> ~[statefun-flink-distribution.jar:2.2.0]
> at
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:216)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
> ~[flink-dist_2.12-1.11.1.jar:1.11.1]
>
> I have confirmed that something is wrong in my application causing this
> error. However, it is hard to spot the issue visually and a little tricky
> to debug in IDE (e.g. intellij). For example, if I can create an
> application in Intellij and step through statefun library code and my code,
> it will be easier to find the root cause. Any guidance on how to set this
> up? Appreciate any hint. Thanks!
>


Re: cannot pull statefun docker image

2020-11-06 Thread Tzu-Li (Gordon) Tai
Hi,

The Dockerfiles in the examples in the flink-statefun repo currently work
against images built from snapshot development branches.

Ververica has been hosting StateFun base images for released versions:
https://hub.docker.com/r/ververica/flink-statefun
You can change `FROM flink-statefun:*` to `FROM ververica/flink-statefun:*`
and the examples should work.
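
If you have several example Dockerfiles to adjust, the change can be
scripted. A minimal sketch in Python (the function name is made up, and you
would still need to pick a tag that actually exists in the hosted registry,
since snapshot tags are not published there):

```python
import re


def use_hosted_base_image(dockerfile_text: str) -> str:
    """Rewrite `FROM flink-statefun:<tag>` to the Ververica-hosted image."""
    return re.sub(
        r"(?m)^(FROM\s+)flink-statefun:",
        r"\1ververica/flink-statefun:",
        dockerfile_text,
    )


before = "FROM flink-statefun:2.2.0\nRUN mkdir -p /opt/statefun/modules\n"
after = use_hosted_base_image(before)
```

The pattern only matches the unprefixed image name, so running it twice on
the same file leaves the result unchanged.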

Alternatively, you can build the images yourself, using the Dockerfiles for
released StateFun versions at:
https://github.com/apache/flink-statefun-docker

Cheers,
Gordon

On Sat, Nov 7, 2020, 12:17 AM Lian Jiang  wrote:

> Hi,
>
> I tried to build statefun-greeter-example docker image with "docker build
> ." but cannot pull the base statefun docker image due to access denied. Any
> idea? Thanks.
>
> $ docker login
> Authenticating with existing credentials...
> Login Succeeded
> lianj:~/repo/flink-statefun/statefun-examples/statefun-greeter-example
> (flink-statefun) (master)
> $ docker build .
> Sending build context to Docker daemon  294.9kB
> Step 1/3 : FROM flink-statefun:2.3-SNAPSHOT
> pull access denied for flink-statefun, repository does not exist or may
> require 'docker login': denied: requested access to the resource is denied
>
>
>


Re: Remote Stateful Function Scalability

2020-10-17 Thread Tzu-Li (Gordon) Tai
Hi Elias,

On Sun, Oct 18, 2020 at 6:16 AM Elias Levy 
wrote:

> After reading the Stateful Functions documentation, I am left wondering
> how remote stateful functions scale.
>
> The documentation mentions that the use of remote functions allows the
> state and compute tiers to scale independently. But the documentation seems
> to imply that only a single instance of a function type can execute at a
> time per worker ("*When an application starts, each parallel worker of
> the framework will create one physical object per function type. This
> object will be used to execute all logical instances of that type that are
> run by that particular worker.*") That would seem to tie and limit the
> parallelism of the compute layer to that of the storage layer even when
> using remote functions.
>

Your observation is correct only for embedded functions, not for remote
functions.
For remote functions, in the StateFun workers each physical object per
function type acts as an asynchronous invocation dispatcher to the type's
remote function service.

Here is a quick summary of what the dispatcher does:
The dispatcher *only ensures sequential invocation per logical address*
(function type + logical instance ID / key).
Invocations for different logical addresses (different types / different
keys) can happen concurrently.

If an invocation request for a logical address is in-flight, and other
messages targeted for that address arrive, they are buffered in a backlog
(state) until the pending request completes.
Upon completion, the backlog is flushed and all buffered messages are sent
to the remote function as a single batch invocation request.
Backpressure is applied once the backlog size reaches a threshold.
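
As a rough illustration of the behaviour described above, here is a small
single-threaded simulation in Python. It is not the StateFun implementation:
the class and method names are invented, and I'm assuming for simplicity that
the backpressure threshold applies to the total number of buffered messages.

```python
from collections import defaultdict


class Dispatcher:
    """Toy model: at most one in-flight request per logical address."""

    def __init__(self, backlog_threshold: int) -> None:
        self.backlog_threshold = backlog_threshold
        self.in_flight = set()            # addresses with a pending request
        self.backlog = defaultdict(list)  # address -> buffered messages
        self.sent = []                    # (address, batch) requests issued

    @property
    def backpressured(self) -> bool:
        buffered = sum(len(batch) for batch in self.backlog.values())
        return buffered >= self.backlog_threshold

    def on_message(self, address: str, message: str) -> None:
        if address in self.in_flight:
            # A request for this address is pending: buffer the message.
            self.backlog[address].append(message)
        else:
            self.in_flight.add(address)
            self.sent.append((address, [message]))

    def on_complete(self, address: str) -> None:
        batch = self.backlog.pop(address, [])
        if batch:
            # Flush everything buffered so far as one batch request.
            self.sent.append((address, batch))
        else:
            self.in_flight.discard(address)
```

Messages for `greeter/a` and `greeter/b` are sent out concurrently, while a
second and third message for `greeter/a` wait in the backlog and go out
together once the first request completes.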

All in all, in vanilla Flink-land terms, this works similarly to Flink's
AsyncIO without the stream order preserved.

So, to conclude by answering your specific questions:


>
> Can a worker execute multiple concurrent remote stateful functions of
> different types?
>

Yes.


>
> Can a worker execute multiple concurrent remote stateful functions of the
> same type with different keys?
>

Yes.


>
> If a worker can execute multiple concurrent remote stateful functions of
> the same type with different keys, does it ensure their output is ordered
> like its inputs?
>

No, currently StateFun handles outgoing messages (i.e. messages going to
other functions / egresses) only based on the order that the concurrent
invocation requests complete.
However, I believe that it should be possible to support an ordered mode
here at the cost of extra latency (early completions need to be buffered,
checkpoint overhead, etc.).
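
To show what "buffering early completions" would mean, here is a sketch of a
reorder buffer in Python. To be clear, this is a hypothetical illustration of
the idea and not something StateFun provides today; the sequence numbers are
assumed to be assigned in input order when the invocations are dispatched.

```python
class OrderedEmitter:
    """Release results in input order, holding back early completions."""

    def __init__(self) -> None:
        self.next_seq = 0   # sequence number we are allowed to emit next
        self.pending = {}   # seq -> result that completed too early
        self.emitted = []   # results released downstream, in input order

    def complete(self, seq: int, result: str) -> None:
        self.pending[seq] = result
        # Drain every result that is now contiguous with what was emitted.
        while self.next_seq in self.pending:
            self.emitted.append(self.pending.pop(self.next_seq))
            self.next_seq += 1
```

The extra latency is visible here: a result that completes early stays in
`pending` until every earlier invocation has completed, and that buffer would
also have to be part of the checkpointed state.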

Hope this helps clarify some things!

Cheers,
Gordon


Re: Stateful function and large state applications

2020-10-13 Thread Tzu-Li (Gordon) Tai
Hi,

The StateFun runtime is built directly on top of Apache Flink, so RocksDB
as the state backend is supported as well as all the features for large
state such as checkpointing and local task recovery.

Cheers,
Gordon


On Wed, Oct 14, 2020 at 11:49 AM Lian Jiang  wrote:

> Hi,
>
> I am learning Stateful function and saw below:
>
> "In addition to the Apache Flink processes, a full deployment requires
> ZooKeeper  (for master failover
> )
> and bulk storage (S3, HDFS, NAS, GCS, Azure Blob Store, etc.) to store
> Flink’s checkpoints
> .
> In turn, the deployment requires no database, and Flink processes do not
> require persistent volumes."
>
> Does this mean stateful function does not support rocksdb (and incremental
> checkpoint, local task recovery)? Will it be an issue for large state (e.g.
> 200GB) applications? Thanks for clarifying.
>
>
> Thanks
> Lian
>


Re: Native State in Python Stateful Functions?

2020-10-09 Thread Tzu-Li (Gordon) Tai
Hi,

On Fri, Oct 9, 2020, 4:20 PM Clements, Danial C 
wrote:

> Hi,
>
>
>
> This makes sense and I can appreciate the stateless aspect for the remote
> functions.  We have a number of components that need access to quite a bit
> of data, the idea was to key the incoming stream in a way that would
> minimize calls to a reference DB and then store that result set in the
> state so it would be readily available for subsequent messages with the
> same key.  Additionally, I had hoped to use delayed messages as a way of
> invalidating cache after a certain amount of time.  Please tell me if this
> is an antipattern as this project is really my first foray into stream
> processing.
>

This is definitely not an antipattern!
Co-sharding state and message streams so that compute may benefit from
local state access instead of requiring remote queries is one of the key
principles of distributed stateful stream processing.

Your idea of invalidating old state is also very sane. You can actually
just set a state TTL for that [1].
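
Conceptually, a state TTL behaves like the sketch below. This is plain Python
that only illustrates the semantics; it is not the StateFun SDK, and the
class name and explicit `now` argument are my own choices to keep the example
deterministic. With the real feature you only declare the TTL and the runtime
does the expiry for you, so no delayed messages are needed.

```python
from typing import Any, Optional


class ExpiringValue:
    """A value that reads as empty once `ttl_seconds` have passed."""

    def __init__(self, ttl_seconds: float) -> None:
        self._ttl = ttl_seconds
        self._value: Optional[Any] = None
        self._written_at: Optional[float] = None

    def set(self, value: Any, now: float) -> None:
        self._value = value
        self._written_at = now

    def get(self, now: float) -> Optional[Any]:
        expired = (
            self._written_at is None or now - self._written_at >= self._ttl
        )
        if expired:
            # Treat the entry as gone, so the caller re-queries the DB.
            self._value = None
            self._written_at = None
            return None
        return self._value
```

A function would read the cached reference data first and only call the
reference DB when `get` returns nothing.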

>
>
> For us, python is a hard requirement so I was hoping that the state would
> be similar to the other Flink jobs where its local to the processor,
> however given the remote stateful architecture, it completely makes sense
> why it
>
With non-JVM languages, state access must always somehow be transported
out from the Flink JVM processes (where the state is maintained) to the
functions, whether it's over a local or remote network.

This is the same for all Python libraries on top of Flink, such as Flink's
Python Table API, or Apache Beam's Python API.
Both of these require transporting state over a local network.

If you'd like to use StateFun because of the programming constructs and
dynamic messaging flexibility it provides, you actually have many different
function deployment options.

One option is the remote deployment approach I explained in my previous
email, in which functions are deployed as services separate from the Flink
StateFun cluster and can benefit from rapid scalability and zero-downtime
upgrades / live reloads.

Alternatively, if you prefer performance over operational flexibility, you
can consider the sidecar / co-location deployment approach [2].

Cheers,
Gordon


[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/index.html#defining-functions

[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/concepts/distributed_architecture.html#co-located-functions

> On a separate topic, is anyone using StateFun in production?
>
>
>
> Thanks,
>
> Dan
>
>
>
> *From: *"Tzu-Li (Gordon) Tai" 
> *Date: *Friday 9 October 2020 at 06:54
> *To: *"Clements, Danial C" 
> *Cc: *user 
> *Subject: *Re: Native State in Python Stateful Functions?
>
>
>
> Hi,
>
>
>
> Nice to hear that you are trying out StateFun!
>
>
>
> It is by design that function state is attached to each HTTP invocation
> request, as defined by StateFun's remote invocation request-reply protocol.
>
> This decision was made with typical application cloud-native architectures
> in mind - having function deployments be stateless and require no session
> dependencies between the StateFun runtime and the functions services allows
> the functions to scale out very easily.
>
>
>
> There are some discussions on potentially adding a bi-directional protocol
> in the future so that state can be lazily fetched on demand instead of
> every invocation, but that is still in very early stages of discussion.
>
>
>
> Could you briefly describe what the state access pattern in your
> application looks like?
>
> Maybe this can provide some insight for us in figuring out how a more
> advanced / efficient protocol should be designed in future releases.
>
>
>
> On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C 
> wrote:
>
> Hi,
>
>
>
> In doing some testing with Flink stateful functions in Python and I’ve
> gotten a small POC working.  One of our key requirements for our stream
> processors is that they be written in python due to the skillset of our
> team.  Given that the Python DataStreams api seems to be under development
> in Flink 1.12, we’ve implemented our business logic as a stateful function
> using the remote pattern.  In some testing, it seems the state object is
> getting serialized and sent along with each HTTP request
>
> One clarification here:
>
> StateFun does not serialize or deserialize state, everything is maintained
> and provided to functions as byte arrays.
>
> Serialization / deserialization happens in user code (i.e. the functions).
>
>
>
> Cheers,
>
> Gordon
>
> and given that we’re storing quite a bit of data in this state, this seems
> t

Re: Native State in Python Stateful Functions?

2020-10-08 Thread Tzu-Li (Gordon) Tai
Hi,

Nice to hear that you are trying out StateFun!

It is by design that function state is attached to each HTTP invocation
request, as defined by StateFun's remote invocation request-reply protocol.
This decision was made with typical cloud-native application architectures
in mind: having function deployments be stateless, with no session
dependencies between the StateFun runtime and the function services, allows
the functions to scale out very easily.
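
Roughly, the shape of that request-reply interaction looks like the sketch
below. It is a simplification in plain Python: the real protocol uses
Protobuf messages over HTTP, and the dictionary keys and function names here
are invented for illustration. The point is that the handler keeps nothing
between calls, since everything it needs arrives with the request and the
updated state goes back in the response.

```python
from typing import Optional, Tuple


def greet(seen_count: Optional[int], name: str) -> Tuple[int, str]:
    """Business logic: pure function of (previous state, message)."""
    seen = (seen_count or 0) + 1
    return seen, f"Hello {name}, visit number {seen}"


def handle_request(request: dict) -> dict:
    """Hypothetical handler: state in, message in, new state and reply out."""
    new_state, reply = greet(request.get("state"), request["message"])
    return {"state": new_state, "reply": reply}


first = handle_request({"state": None, "message": "Dan"})
second = handle_request({"state": first["state"], "message": "Dan"})
```

Because the two calls share nothing except what travels in the request, they
could just as well be served by two different replicas of the function
service.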

There are some discussions on potentially adding a bi-directional protocol
in the future so that state can be lazily fetched on demand instead of
every invocation, but that is still in very early stages of discussion.

Could you briefly describe what the state access pattern in your
application looks like?
Maybe this can provide some insight for us in figuring out how a more
advanced / efficient protocol should be designed in future releases.

On Thu, Oct 8, 2020, 6:20 PM Clements, Danial C 
wrote:

> Hi,
>
>
>
> In doing some testing with Flink stateful functions in Python and I’ve
> gotten a small POC working.  One of our key requirements for our stream
> processors is that they be written in python due to the skillset of our
> team.  Given that the Python DataStreams api seems to be under development
> in Flink 1.12, we’ve implemented our business logic as a stateful function
> using the remote pattern.  In some testing, it seems the state object is
> getting serialized and sent along with each HTTP request
>
One clarification here:
StateFun does not serialize or deserialize state, everything is maintained
and provided to functions as byte arrays.
Serialization / deserialization happens in user code (i.e. the functions).
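
In other words, the function itself decides how the bytes are encoded. A
minimal sketch, assuming JSON as the encoding (any format works, and a more
compact one would reduce the per-request payload); the helper names are
illustrative and not part of the SDK:

```python
import json
from typing import Optional


def decode_state(raw: Optional[bytes]) -> dict:
    """Turn the opaque bytes handed to the function into a Python dict."""
    if not raw:
        return {}  # no state stored yet for this key
    return json.loads(raw.decode("utf-8"))


def encode_state(state: dict) -> bytes:
    """Turn the dict back into bytes before handing it to the runtime."""
    return json.dumps(state, sort_keys=True).encode("utf-8")


state = decode_state(None)
state["seen_count"] = state.get("seen_count", 0) + 1
raw = encode_state(state)
```

Since these bytes travel with each invocation, keeping the encoded state
small per key is what keeps the request latency down.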

Cheers,
Gordon

> and given that we’re storing quite a bit of data in this state, this seems
> to contribute to the latency of the application in a linear fashion.  Is
> there any way around this?  Is there a way to store the state local to the
> python application?
>
>
>
> Thanks,
>
> Dan
>
>
>


Re: Stateful Functions + ML model prediction

2020-10-07 Thread Tzu-Li (Gordon) Tai
Hi John,

Thanks a lot for opening the JIRA ticket! If you are interested in
contributing that to StateFun, I'm also happy to guide you with the
contribution.

On Mon, Oct 5, 2020 at 10:24 PM John Morrow 
wrote:

> Thanks for the response Gordon, and that FlinkForward presentation - it's
> been very helpful.
>
> I put in a JIRA ticket for it:
> https://issues.apache.org/jira/browse/FLINK-19507
>
> I did find this page:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/io-module/flink-connectors.html
> and there are source/sink connectors for Pulsar (
> https://github.com/streamnative/pulsar-flink) - I'm guessing that's how I
> should approach using Pulsar as an ingress/egress?
>

That is correct! The `SourceFunctionSpec` and `SinkFunctionSpec` are a
means for users to bridge existing Flink sources and sinks to StateFun
ingress / egress.

The downside to that approach is that even if you're purely using remote
functions, you'd still have to provide an embedded module to add ingresses
/ egresses this way.
Eventually it would be best (if we have several users requesting Pulsar) to
have native support, as for Kinesis and Kafka, so that users can define them
textually in `module.yaml` definition files, but the approach you pointed
out definitely works for the time being.

Cheers,
Gordon


>
> Cheers,
> John.
>
> --
> *From:* Tzu-Li (Gordon) Tai 
> *Sent:* Monday 5 October 2020 03:21
> *To:* John Morrow ; user  >
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Hi John,
>
> It is definitely possible to use Apache Pulsar with StateFun. Could you
> open a JIRA ticket for that?
> It would be nice to see how much interest we can gather on adding that as
> a new IO module, and consider adding native support for Pulsar in future
> releases.
>
> If you are already using StateFun and want to start using Pulsar as an
> ingress/egress already for current versions, there's also a way to do that
> right now.
> If that's the case, please let me know and I'll try to provide some
> guidelines on how to achieve that.
>
> Cheers,
> Gordon
>
>
> On Fri, Oct 2, 2020, 1:38 AM John Morrow 
> wrote:
>
> Hi Flink Users,
>
> I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward (
> https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka &
> Kinesis are supported, and looking at
> https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO
> packages for those two: statefun-kafka-io & statefun-kinesis-io
>
>
> Is it possible to use Apache Pulsar as a Statefun ingress & egress?
>
> Thanks,
> John.
>
> --
> *From:* John Morrow 
> *Sent:* Wednesday 23 September 2020 11:37
> *To:* Igal Shilman 
> *Cc:* user 
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Thanks very much Igal - that sounds like a good solution!
>
> I'm new to StateFun so I'll have to dig into it a bit more, but this
> sounds like a good direction.
>
> Thanks again,
> John.
>
> --
> *From:* Igal Shilman 
> *Sent:* Wednesday 23 September 2020 09:06
> *To:* John Morrow 
> *Cc:* user 
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Hi John,
>
> Thank you for sharing your interesting use case!
>
> Let me start from your second question:
>
> Are stateful functions available to all Flink jobs within a cluster?
>
>
> Yes, the remote functions are some logic exposed behind an HTTP endpoint,
> and Flink would forward any message addressed to them via an HTTP request.
> The way StateFun works is, for every invocation, StateFun would attach the
> necessary context (any previous state for a key, and the message) to the
> HTTP request.
> So practically speaking the same remote function can be contacted by
> different Jobs, as the remote functions are effectively stateless.
>
>  Does this sound like a good use case for stateful functions?
>
>
> The way I would approach this is, I would consider moving the
> business rules and the enrichment to the remote function.
> This would:
>
> a) Eliminate the need for a broadcast stream, you can simply deploy a new
> version of the remote function container, as they can be independently
> restarted (without the need to restart the Flink job that contacts them)
> b) You can perform the enrichment immediately without going through
> a RichAsyncFunction, as StateFun, by default, invokes many remote
> functions in parallel (but never for the same key)
> c) You can contact the remote service that hosts the machine learning
> model, or even load the model in the remote fun

Re: Statefun + Confluent Fully-managed Kafka

2020-10-07 Thread Tzu-Li (Gordon) Tai
Hi Hezekiah,

I've confirmed that the Kafka properties set in the module specification
file (module.yaml) are indeed correctly being parsed and used to construct
the internal Kafka clients.
StateFun / Flink does not alter or modify the properties.

So, something is most likely wrong with your property settings, causing
the Kafka client itself to not pick up the `sasl.jaas.config` property
value.
From the resolved producer config in the logs, it looks like your
`sasl.jaas.config` is null, but all other properties are being picked up
correctly.

Please check your properties again, and make sure their keys are correct
and their values conform to the JAAS config format.
For starters, there's a typo in the `sasl.mechanism` key of your egress
config: you've typed an extra 's' (`sasl.mechanisms`).

I've verified that the following properties will work, with SASL JAAS
config being picked up correctly:

```
egresses:
  - egress:
      meta:
        type: statefun.kafka.io/generic-egress
        id: example/greets
      spec:
        address: 
        deliverySemantic:
          type: exactly-once
          transactionTimeoutMillis: 10
        properties:
          - security.protocol: SASL_SSL
          - sasl.mechanism: PLAIN
          - sasl.jaas.config:
              org.apache.kafka.common.security.plain.PlainLoginModule required
              username="USERNAME" password="PASSWORD";
          - ssl.endpoint.identification.algorithm: https
```
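
Since a mistyped key is silently ignored rather than rejected, a quick sanity
check of the property names can save some time. Below is a small sketch in
Python using the standard `difflib` module. The list of known keys is
deliberately limited to the ones that appear in this thread (it is not the
full set of Kafka client configs), and the function name is made up.

```python
import difflib

# Only the keys used in this thread; extend as needed.
KNOWN_KEYS = sorted([
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.endpoint.identification.algorithm",
])


def suspicious_keys(keys) -> dict:
    """Map each unknown key to its closest known key (or None)."""
    suggestions = {}
    for key in keys:
        if key in KNOWN_KEYS:
            continue
        close = difflib.get_close_matches(key, KNOWN_KEYS, n=1)
        suggestions[key] = close[0] if close else None
    return suggestions


egress_keys = [
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanisms",  # the typo from the original module.yaml
    "sasl.jaas.config",
    "ssl.endpoint.identification.algorithm",
]
report = suspicious_keys(egress_keys)
```

Here `report` flags the extra 's' and points at the intended key.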

Cheers,
Gordon

On Wed, Oct 7, 2020 at 11:36 PM Till Rohrmann  wrote:

> Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal
> in who might be able to help you with this problem.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 3:56 PM hezekiah maina 
> wrote:
>
>> Hi,
>>
>> I'm trying to use Stateful Functions with Kafka as my ingress and egress.
>> I'm using the Confluent fully-managed Kafka and I'm having a challenge
>> adding my authentication details in the module.yaml file.
>> Here is my current config details:
>> version: "1.0"
>> module:
>>   meta:
>>     type: remote
>>   spec:
>>     functions:
>>       - function:
>>           meta:
>>             kind: http
>>             type: example/greeter
>>           spec:
>>             endpoint: 
>>             states:
>>               - seen_count
>>             maxNumBatchRequests: 500
>>             timeout: 2min
>>     ingresses:
>>       - ingress:
>>           meta:
>>             type: statefun.kafka.io/routable-protobuf-ingress
>>             id: example/names
>>           spec:
>>             address: 
>>             consumerGroupId: statefun-consumer-group
>>             topics:
>>               - topic: names
>>                 typeUrl: com.googleapis/example.GreetRequest
>>                 targets:
>>                   - example/greeter
>>             properties:
>>               - bootstrap.servers:
>>               - security.protocol: SASL_SSL
>>               - sasl.mechanism: PLAIN
>>               - sasl.jaas.config:
>>                   org.apache.kafka.common.security.plain.PlainLoginModule required
>>                   username="USERNAME" password="PASSWORD";
>>               - ssl.endpoint.identification.algorithm: https
>>     egresses:
>>       - egress:
>>           meta:
>>             type: statefun.kafka.io/generic-egress
>>             id: example/greets
>>           spec:
>>             address: 
>>             deliverySemantic:
>>               type: exactly-once
>>               transactionTimeoutMillis: 10
>>             properties:
>>               - bootstrap.servers: 
>>               - security.protocol: SASL_SSL
>>               - sasl.mechanisms: PLAIN
>>               - sasl.jaas.config:
>>                   org.apache.kafka.common.security.plain.PlainLoginModule required
>>                   username="USERNAME" password="PASSWORD";
>>               - ssl.endpoint.identification.algorithm: https
>>
>> After running docker-compose with a master and worker containers I'm
>> getting this error:
>> Could not find a 'KafkaClient' entry in the JAAS configuration. System
>> property 'java.security.auth.login.config' is
>> /tmp/jaas-2846080966990890307.conf
>>
>> The producer config logged :
>> worker_1  | 2020-10-07 13:38:08,489 INFO
>>  org.apache.kafka.clients.producer.ProducerConfig  -
>> ProducerConfig values:
>> worker_1  | acks = 1
>> worker_1  | batch.size = 16384
>> worker_1  | bootstrap.servers = [https://
>> ---.asia-southeast1.gcp.confluent.cloud:9092]
>> worker_1  | buffer.memory = 33554432
>> worker_1  | client.dns.lookup = default
>> worker_1  | client.id =
>> worker_1  | compression.type = none
>> worker_1  | connections.max.idle.ms = 54
>> worker_1  | delivery.timeout.ms = 12
>> worker_1  | enable.idempotence = false
>> worker_1  | interceptor.classes = []
>> worker_1  | key.serializer = class
>> org.apache.kafka.common.serialization.ByteArraySerializer
>> worker_1  | linger.ms = 0
>> worker_1  | max.block.ms = 6
>> worker_1  | max.

Re: Stateful Functions + ML model prediction

2020-10-05 Thread Tzu-Li (Gordon) Tai
Hi John,

It is definitely possible to use Apache Pulsar with StateFun. Could you
open a JIRA ticket for that?
It would be nice to see how much interest we can gather on adding that as a
new IO module, and consider adding native support for Pulsar in future
releases.

If you are already using StateFun and want to start using Pulsar as an
ingress/egress already for current versions, there's also a way to do that
right now.
If that's the case, please let me know and I'll try to provide some
guidelines on how to achieve that.

Cheers,
Gordon


On Fri, Oct 2, 2020, 1:38 AM John Morrow  wrote:

> Hi Flink Users,
>
> I was watching Tzu-Li Tai's talk on stateful functions from Flink Forward (
> https://www.youtube.com/watch?v=tuSylBadNSo) which mentioned that Kafka &
> Kinesis are supported, and looking at
> https://repo.maven.apache.org/maven2/org/apache/flink/ I can see IO
> packages for those two: statefun-kafka-io & statefun-kinesis-io
>
>
> Is it possible to use Apache Pulsar as a Statefun ingress & egress?
>
> Thanks,
> John.
>
> --
> *From:* John Morrow 
> *Sent:* Wednesday 23 September 2020 11:37
> *To:* Igal Shilman 
> *Cc:* user 
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Thanks very much Igal - that sounds like a good solution!
>
> I'm new to StateFun so I'll have to dig into it a bit more, but this
> sounds like a good direction.
>
> Thanks again,
> John.
>
> --
> *From:* Igal Shilman 
> *Sent:* Wednesday 23 September 2020 09:06
> *To:* John Morrow 
> *Cc:* user 
> *Subject:* Re: Stateful Functions + ML model prediction
>
> Hi John,
>
> Thank you for sharing your interesting use case!
>
> Let me start from your second question:
>
> Are stateful functions available to all Flink jobs within a cluster?
>
>
> Yes, the remote functions are some logic exposed behind an HTTP endpoint,
> and Flink would forward any message addressed to them via an HTTP request.
> The way StateFun works is, for every invocation, StateFun would attach the
> necessary context (any previous state for a key, and the message) to the
> HTTP request.
> So practically speaking the same remote function can be contacted by
> different Jobs, as the remote functions are effectively stateless.
>
>  Does this sound like a good use case for stateful functions?
>
>
> The way I would approach this is, I would consider moving the
> business rules and the enrichment to the remote function.
> This would:
>
> a) Eliminate the need for a broadcast stream, you can simply deploy a new
> version of the remote function container, as they can be independently
> restarted (without the need to restart the Flink job that contacts them)
> b) You can perform the enrichment immediately without going through
> a RichAsyncFunction, as StateFun, by default, invokes many remote
> functions in parallel (but never for the same key)
> c) You can contact the remote service that hosts the machine learning
> model, or even load the model in the remote function's process on startup.
>
> So, in kubernetes terms:
>
> 1. You would need a set of pods (a deployment) that are able to serve HTTP
> traffic and expose a StateFun endpoint.
> 2. You would need a separate deployment for Flink that runs a StateFun job
> 3. The StateFun job would need to know how to contact these pods, so you
> would also need a kubernetes service (or a LoadBalancer) that
> balances the requests from (2) to (1).
>
> If you need to change your business rules, or the enrichment logic you can
> simply roll a new version of (1).
>
>
> Good luck,
> Igal.
>
> On Tue, Sep 22, 2020 at 10:22 PM John Morrow 
> wrote:
>
> Hi Flink Users,
>
> I'm using Flink to process a stream of records containing a text field.
> The records are sourced from a message queue, enriched as they flow through
> the pipeline based on business rules and finally written to a database.
> We're using the Ververica platform so it's running on Kubernetes.
>
> The initial business rules were straightforward, e.g. if field X contains
> a certain word then set field Y to a certain value. For the implementation
> I began by looking at
> https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html for
> inspiration. I ended up implementing a business rule as a Java class with a
> match-predicate & an action. The records enter the pipeline on a data
> stream which is joined with the rules in a broadcast stream and a
> ProcessFunction checks each record to see if it matches any rule
> predicates. If the record doesn't match any business rule predicates it
> continues on in the pipeline. If the record does match one or more business
> rule predicates it is sent to a side output with the list of business rules
> that it matched. The side output data stream goes through a
> RichAsyncFunction which loops through the matched rules and applies each
> one's action to the record. At the end, that enriched side-output record
> stream is unioned back with the non-enriched record

[ANNOUNCE] Apache Flink Stateful Functions 2.2.0 released

2020-09-27 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 2.2.0.

Stateful Functions is an API that simplifies the building of distributed
stateful applications with a runtime built for serverless architectures.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2020/09/28/release-statefun-2.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Docker image for building Stateful Functions applications is
currently being published to Docker Hub. Progress for creating the Docker
Hub repository can be tracked at:
https://github.com/docker-library/official-images/pull/7749

In the meantime, before the official Docker images are available,
Ververica has volunteered to make Stateful Functions images available for
the community via their public registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12348350

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Flink Stateful Functions API

2020-09-14 Thread Tzu-Li (Gordon) Tai
Hi!

Dawid is right, there currently is no developer documentation for the
remote request-reply protocol.
One reason for this is that the protocol isn't considered a fully stable
user-facing interface yet, and thus not yet properly advertised in the
documentation.
However, there are plans to revisit it and announce it as publicly stable
in one of the upcoming releases.

In the meantime, the Python SDK and Aljoscha's Rust SDK are good
reference examples of implementation of the protocol across different
languages.
The request body from Flink and expected response body from functions are
essentially these [1] Protobuf messages, ToFunction (request) and
FromFunction (response).
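
To give a feel for the shape of that exchange, here is a rough, simplified
model in plain Java. It is only a sketch: the class and field names below
(Request, Response, stateMutations, and so on) are made up for illustration
and are not the generated Protobuf classes; the real ToFunction and
FromFunction messages are defined in [1] and carry serialized bytes rather
than Java objects. The "seen_count" state name is borrowed from the greeter
example elsewhere on this list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of one request-reply round trip.
final class RequestReplySketch {

    // Stand-in for the request: the runtime attaches the state for the
    // target and a batch of messages addressed to it.
    static final class Request {
        final String targetId;
        final Map<String, Integer> state;
        final List<String> messages;

        Request(String targetId, Map<String, Integer> state, List<String> messages) {
            this.targetId = targetId;
            this.state = state;
            this.messages = messages;
        }
    }

    // Stand-in for the response: state changes and outgoing messages that
    // the runtime applies after the call returns.
    static final class Response {
        final Map<String, Integer> stateMutations;
        final List<String> outgoing;

        Response(Map<String, Integer> stateMutations, List<String> outgoing) {
            this.stateMutations = stateMutations;
            this.outgoing = outgoing;
        }
    }

    // The function keeps nothing between calls: everything it needs comes
    // in with the request, and everything it changes goes out in the response.
    static Response handle(Request request) {
        Integer stored = request.state.get("seen_count");
        int seen = stored == null ? 0 : stored;
        List<String> outgoing = new ArrayList<>();
        for (String name : request.messages) {
            seen++;
            outgoing.add("Hello " + name + ", seen " + seen + " time(s)");
        }
        Map<String, Integer> mutations = new HashMap<>();
        mutations.put("seen_count", seen);
        return new Response(mutations, outgoing);
    }

    public static void main(String[] args) {
        Map<String, Integer> state = new HashMap<>();
        state.put("seen_count", 2);
        List<String> batch = new ArrayList<>();
        batch.add("alice");
        Response response = handle(new Request("example/greeter/alice", state, batch));
        System.out.println(response.stateMutations + " " + response.outgoing);
    }
}
```

An SDK in another language would mostly be about decoding the request,
dispatching to user code like handle() above, and encoding the response.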

If you bump into any issues when implementing, please feel free to let us
know on the mailing lists as well.
A Haskell SDK is definitely interesting to see implemented :)

Cheers,
Gordon

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/protobuf/http-function.proto

On Mon, Sep 14, 2020 at 3:46 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> Not sure if there is a "developer" documentation for the protocol. I am
> cc'ing Igal and Gordon who know better than I if there is one.
>
> To give you some hints though: if I am correct, the Python API is
> implemented as so-called remote functions [1][2], which communicate
> with Flink via HTTP/gRPC. Besides the bundled Python API you can also
> use a Rust SDK[3] implemented by my colleague Aljoscha as a reference.
>
> BTW, it would be really cool to see an SDK written in Haskell ;)
>
> Best,
>
> Dawid
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/concepts/distributed_architecture.html#remote-functions
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
>
> [3] https://github.com/aljoscha/statefun-rust
>
> On 12/09/2020 07:26, Timothy Bess wrote:
> > The flink stateful function Python API looks cool, but is there a
> > documented spec for how it communicates with Flink? I'd like to
> > implement an SDK in Haskell if I can.
>
>


Re: State Storage Questions

2020-09-07 Thread Tzu-Li (Gordon) Tai
Hi!

Operator state is bound to a single parallel operator instance; there is no
partitioning happening here.
It is typically used in Flink source and sink operators. For example, the
Flink Kafka source operator's parallel instances maintain as operator state
a mapping of partitions to offsets for the partitions that it is assigned
to. State like this isn't partitionable by any key associated with
an input DataStream.

Since there is no partitioning scheme, redistribution of the state on
operator rescale also happens differently compared to keyed state.
Take for example a ListState; in contrast to a keyed ListState, an Operator
ListState is a collection of state items that are independent from each
other and eligible for redistribution across operator instances in the
event of a rescale (by default, Flink uses simple round-robin for the
redistribution).
In other words, the list entries are the finest granularity at which the
operator state can be redistributed, and should not be correlated with each
other since each entry of the list may end up in different parallel
operator instances on rescale.
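
As a rough illustration of what that means on rescale, here is a small
plain-Java sketch (no Flink classes involved). The class and method names
are hypothetical, and the exact assignment Flink produces may differ from
this simple modulo scheme; the point is only that entries are handed out
individually, with no regard to which subtask held them before.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of redistributing operator ListState entries on rescale.
final class OperatorListStateRescaleSketch {

    // Collects the entries of all old subtasks and deals them out one by one
    // to the new subtasks in round-robin order.
    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<T>());
        }
        for (int i = 0; i < all.size(); i++) {
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two old subtasks holding partition-to-offset entries, rescaled to three.
        List<List<String>> old = new ArrayList<>();
        old.add(Arrays.asList("topic-0@42", "topic-1@17"));
        old.add(Arrays.asList("topic-2@8"));
        System.out.println(redistribute(old, 3));
    }
}
```

Because any entry can land on any subtask, each entry has to be meaningful
on its own, like a single partition-to-offset pair in the Kafka source.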

In general, there should rarely be a need to use operator state for typical
user applications. It isn't massively scalable and usually is small in size.

Cheers,
Gordon

On Sat, Sep 5, 2020 at 12:26 AM Rex Fenley  wrote:

> This is so helpful, thank you!
>
> So just to clarify (3), Operator state has a partitioning scheme, but it's
> simply not by key, it's something else that's special under-the-hood? In
> which case, what data is stored in an Operator? I assumed it must be the
> input data for e.g. a join, so that it can react efficiently to any data
> changes in the stream and recombine only what has actually changed. Is this
> correct?
>
> On Fri, Sep 4, 2020 at 1:20 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi,
>>
>> On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley  wrote:
>>
>>> Hello!
>>>
>>> I've been digging into State Storage documentation, but it's left me
>>> scratching my head with a few questions. Any help will be much appreciated.
>>>
>>> Qs:
>>> 1. Is there a way to use RocksDB state backend for Flink on AWS EMR?
>>> Possibly with S3 backed savepoints for recovery (or maybe hdfs for
>>> savepoints?)? Only documentation related to AWS I can find makes it look
>>> like AWS must use the S3 File System state backend and not RocksDB at all.
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>>>
>>
>> I think there's some misunderstanding of the role of RocksDB vs
>> filesystems for fault-tolerance here.
>> RocksDB is a state backend option that manages user state out-of-core,
>> and is managed by the Flink runtime. Users do not need to separately manage
>> RocksDB instances.
>> For persistence of that state as checkpoints / savepoints for
>> fault-tolerance, you may choose the commonly used filesystems like S3 /
>> HDFS.
>>
>> See [1] for how to configure your job to use RocksDBStateBackend as the
>> runtime state backend and configuring a filesystem path for persistence.
>>
>>
>>>
>>> 2. Does the FS state backend not compact? I thought everything in Flink
>>> was stored as key/value. In which case, why would the last n values for a
>>> key need to stick around, or how would they?
>>> > An incremental checkpoint builds upon (typically multiple) previous
>>> checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
>>> way that is self-consolidating over time. As a result, the incremental
>>> checkpoint history in Flink does not grow indefinitely, and old checkpoints
>>> are eventually subsumed and pruned automatically.
>>>
>>>
>> The sentence that you quote simply states how Flink leverages RocksDB's
>> background compaction of SSTables to ensure that incremental checkpoints
>> don't grow indefinitely in size.
>> This has nothing to do with the FsStateBackend, as incremental
>> checkpointing isn't supported there.
>>
>> Just as a clarification as there might be some other misunderstanding
>> here:
>> The difference between FsStateBackend v.s. RocksDBStateBackend is the
>> state backend being used to maintain local state at runtime.
>> RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses
>> in-memory hash maps. For persistence, both are checkpointed to a filesystem
>> for fault-tolerance.
>> The naming may be a bit confusing, so just wanted to clarify that here in
>> case that may have caused any confusion with the questions above.
>>

Re: State Storage Questions

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi,

On Fri, Sep 4, 2020 at 1:37 PM Rex Fenley  wrote:

> Hello!
>
> I've been digging into State Storage documentation, but it's left me
> scratching my head with a few questions. Any help will be much appreciated.
>
> Qs:
> 1. Is there a way to use RocksDB state backend for Flink on AWS EMR?
> Possibly with S3 backed savepoints for recovery (or maybe hdfs for
> savepoints?)? Only documentation related to AWS I can find makes it look
> like AWS must use the S3 File System state backend and not RocksDB at all.
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html
>

I think there's some misunderstanding of the role of RocksDB vs filesystems
for fault-tolerance here.
RocksDB is a state backend option that manages user state out-of-core, and
is managed by the Flink runtime. Users do not need to separately manage
RocksDB instances.
For persistence of that state as checkpoints / savepoints for
fault-tolerance, you may choose the commonly used filesystems like S3 /
HDFS.

See [1] for how to configure your job to use RocksDBStateBackend as the
runtime state backend and configuring a filesystem path for persistence.
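
For reference, a flink-conf.yaml fragment along these lines should select
RocksDB as the runtime state backend and S3 for persistence. Treat it as a
sketch: the bucket name and paths are placeholders, and it assumes one of
the S3 filesystem plugins is available to your Flink installation.

```yaml
# RocksDB holds the working state locally on the TaskManagers.
state.backend: rocksdb
# Optional: only upload changed RocksDB files on each checkpoint.
state.backend.incremental: true
# Checkpoints and savepoints are persisted to the filesystem given here.
state.checkpoints.dir: s3://<your-bucket>/flink/checkpoints
state.savepoints.dir: s3://<your-bucket>/flink/savepoints
```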


>
> 2. Does the FS state backend not compact? I thought everything in Flink
> was stored as key/value. In which case, why would the last n values for a
> key need to stick around, or how would they?
> > An incremental checkpoint builds upon (typically multiple) previous
> checkpoints. Flink leverages RocksDB’s internal compaction mechanism in a
> way that is self-consolidating over time. As a result, the incremental
> checkpoint history in Flink does not grow indefinitely, and old checkpoints
> are eventually subsumed and pruned automatically.
>
>
The sentence that you quote simply states how Flink leverages RocksDB's
background compaction of SSTables to ensure that incremental checkpoints
don't grow indefinitely in size.
This has nothing to do with the FsStateBackend, as incremental
checkpointing isn't supported there.

Just as a clarification as there might be some other misunderstanding here:
The difference between FsStateBackend v.s. RocksDBStateBackend is the state
backend being used to maintain local state at runtime.
RocksDBStateBackend obviously uses RocksDB, while the FsStateBackend uses
in-memory hash maps. For persistence, both are checkpointed to a filesystem
for fault-tolerance.
The naming may be a bit confusing, so just wanted to clarify that here in
case that may have caused any confusion with the questions above.


> 3. In the docs, Operators are referred to as non-keyed state, yet,
> Operators have IDs that they are keyed by, so why are they referred to as
> non-keyed state?
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
>
>
Operator state is referred to as non-keyed state because it is not
co-partitioned with the stream by key, its values are not bound to a
single key (whereas every access to keyed state is bound to a single key),
and it has a different scheme for repartitioning when operators are scaled
up or down.
The operator IDs you referred to are simply a unique ID to identify the
same operators across different executions of the same job. I'm not sure
what you mean by "operators have IDs that are keyed by"; those IDs are not
used in any partitioning operation.



> 4. For the Table API / SQL are primary keys and join keys automatically
> used as the keys for state under the hood?
>

Yes.


>
> Lastly
> 5. Is there a way to estimate roughly how much disk space state storage
> will take per operation?
>
>
> Thanks again!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#configuring-a-state-backend


Re: FLINK YARN SHIP from S3 Directory

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi,

As far as I can tell from a recent change [1], this seems to be possible
now starting from Flink 1.11.x. Have you already tried this with the latest
Flink version?
Also including Klou in this email, who might be able to confirm this.

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-17632

On Fri, Sep 4, 2020 at 2:02 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
> Is there any feature to be able to ship directory to containers from s3
> Directory instead of local.
>
> -yt,--yarnship  Ship files in the specified directory
>   (t for transfer)
>
>


Re: Unit Test for KeyedProcessFunction with out-of-core state

2020-09-04 Thread Tzu-Li (Gordon) Tai
Hi Alexey,

Is there a specific reason why you want to test against RocksDB?

Otherwise, in Flink tests we use a `KeyedOneInputStreamOperatorTestHarness`
[1] that allows you to wrap a user function and eliminate the need to worry
about setting up heavy runtime context / dependencies such as the state
backend.
As a unit test, this should be sufficient for you to implement basic test
scenarios for your function, such as expected output given inputs, state
etc.
Does this provide what you are looking for?

Cheers,
Gordon

[1]
https://github.com/apache/flink/blob/1d5f44710270d1c615537f0d05ab49e699d3a6e5/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java

On Fri, Sep 4, 2020 at 2:28 AM Alexey Trenikhun  wrote:

> Hello,
> I want to unit test a KeyedProcessFunction which uses out-of-core state
> (like RocksDB).
> Does Flink have a mock for RocksDB which can be used in unit tests?
>
> Thanks,
> Alexey
>


Re: Updating kafka connector with state

2020-08-09 Thread Tzu-Li (Gordon) Tai
Hi Nikola,

If I remember correctly, state is not compatible between
flink-connector-kafka-0.11 and the universal flink-connector-kafka.
Piotr (cc'ed) would probably know what's going on here.

Cheers,
Gordon

On Mon, Aug 10, 2020 at 1:07 PM Nikola Hrusov  wrote:

> Hello,
>
> We are trying to update our kafka connector dependency. So far we have
> been using flink-connector-kafka-0.11 and we would like to update the
> dependency to flink-connector-kafka.
> However, when I try to restart the job with a savepoint I get the
> following exception:
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore
> operator state backend for StreamSink_351727121bb1ca0d704092960989d25b_(1/
> 10) from any of the 1 provided restore options.
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:255)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:143)
> ... 5 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Failed when trying to restore operator state backend
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:537)
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:246)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 7 more
> Caused by: java.io.IOException: Could not find class
> 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$NextTransactionalIdHint'
> in classpath.
> at
> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:711)
> at
> org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:681)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.readSnapshotData(PojoSerializerSnapshotData.java:178)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshotData.createFrom(PojoSerializerSnapshotData.java:122)
> at
> org.apache.flink.api.java.typeutils.runtime.PojoSerializerSnapshot.readSnapshot(PojoSerializerSnapshot.java:125)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:170)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> at
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
> ... 11 more
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
> $NextTransactionalIdHint
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:120)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:3

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-23 Thread Tzu-Li (Gordon) Tai
ThreadPoolSize is per KPL instance, so yes that is per subtask.
As I previously mentioned, the maximum concurrent requests going to KDS
would be capped by MaxConnections.
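
To make the per-subtask arithmetic concrete, here is a tiny plain-Java
sketch using the numbers from this thread (80 sink subtasks, and the KPL
defaults of 10 for ThreadPoolSize and 24 for MaxConnections). The class and
method names are made up for illustration. Note that ThreadPoolSize only
matters with the POOLED threading model.

```java
// Hypothetical helper: job-wide totals for a per-subtask KPL setting.
final class KplFanOutSketch {

    // Each parallel sink subtask runs its own KPL instance, so a per-instance
    // setting multiplies by the sink parallelism.
    static int totalAcrossSubtasks(int sinkParallelism, int perInstanceValue) {
        if (sinkParallelism <= 0 || perInstanceValue <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return Math.multiplyExact(sinkParallelism, perInstanceValue);
    }

    public static void main(String[] args) {
        int sinkParallelism = 80;
        System.out.println("threads: " + totalAcrossSubtasks(sinkParallelism, 10));
        System.out.println("connections: " + totalAcrossSubtasks(sinkParallelism, 24));
    }
}
```

With these inputs that is 800 pooled threads and up to 1920 connections
across the job, which is worth comparing against what 1-2 shards can take.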

On Thu, Jul 23, 2020 at 6:25 AM Vijay Balakrishnan 
wrote:

> Hi Gordon,
> Thx for your reply.
> FlinkKinesisProducer default is ThreadPool which is what I am using. So,
> does that mean only 10 threads are making calls to KDS by default ??
> I see from the number of records coming to the KDS that I need only 1-2
> shards. So, the bottleneck is on the KPL side.
> Does this mean I have to set a QueueLimit of 500 as shown in the example
> below ??
> From what you said, Total MaxConnections would then be by default: 24 *
> number of subtasks = 24 * 80 = 1920 connections to KDS.
> KPL ThreadPoolSize would be 10 Threads by default - is this per subtask ?
> So, would it be 10 * number of subtasks = 10 * 80 = 800 Threads ??
>
> I am trying to reconcile the diff above ? Somewhere I am flooding KPL with
> too many requests & it gives the curl 28 error.
>
> So, calculating Queue Limit:
> Based on this, my records size = 1600 bytes. I have 96 shards
> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
> of 100kB per shard should be sufficient. So, Queue size/shard=100KB
> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 250
>
> Acc. to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
>  expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Tue, Jul 21, 2020 at 8:00 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Vijay,
>>
>> I'm not entirely sure of the semantics between ThreadPoolSize and
>> MaxConnections since they are all KPL configurations (this specific
>> question would probably be better directed to AWS),
>> but my guess would be that the number of concurrent requests to the KPL
>> backend is capped by MaxConnections. This is per parallel
>> FlinkKinesisProducer subtask.
>>
>> As for ThreadPoolSize, do note that the default threading model by KPL is
>> PER_REQUEST, for which the KPL native process will launch a thread for each
>> request.
>> Under heavy load, this would of course be an issue. Since you didn't
>> explicitly mention this config, make sure to set this to POOLED to actually
>> make use of a fixed thread pool for requests.
>>
>> Overall, my suggestion is to set a reasonable queue limit for the number
>> of records buffered by KPL's native process (by default it is unbounded).
>> Without that in place, under high load you would easily be resource
>> exhausted, and can cause more unpredictable checkpointing times since the
>> FlinkKinesisProducer would need to flush pending records on checkpoints
>> (which ultimately also applies backpressure upstream).
>>
>> BR,
>> Gordon
>>
>> On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan 
>> wrote:
>>
>>> Hi,
>>> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
>>> stream(KDS).
>>> Getting following errors:
>>> 1.
>>> Throttling
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>
>>>  
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>>>
>>> 2. ERROR
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>>  - [2020-06-18 15:49:24.238655] [0x0ed6][0x7fc2086c8700] [error]
>>> [shard_map.cc:150] Shard map update for stream "_write" failed. Code: 
>>> *LimitExceededException
>>> Message: Rate exceeded for stream *..._write under account
>>> 753274046439.; retrying in 1500 ms
>>>
>>> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>>>
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>>>
>>>
>>> https://github.com/

Re: MaxConnections understanding on FlinkKinesisProducer via KPL

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

I'm not entirely sure of the semantics between ThreadPoolSize and
MaxConnections since they are all KPL configurations (this specific
question would probably be better directed to AWS),
but my guess would be that the number of concurrent requests to the KPL
backend is capped by MaxConnections. This is per parallel
FlinkKinesisProducer subtask.

As for ThreadPoolSize, do note that the default threading model by KPL is
PER_REQUEST, for which the KPL native process will launch a thread for each
request.
Under heavy load, this would of course be an issue. Since you didn't
explicitly mention this config, make sure to set this to POOLED to actually
make use of a fixed thread pool for requests.

Overall, my suggestion is to set a reasonable queue limit for the number of
records buffered by KPL's native process (by default it is unbounded).
Without that in place, under high load you would easily be resource
exhausted, and can cause more unpredictable checkpointing times since the
FlinkKinesisProducer would need to flush pending records on checkpoints
(which ultimately also applies backpressure upstream).
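
As a starting point for picking that limit, the rule of thumb from the
Flink Kinesis connector docs can be written down as a small plain-Java
sketch: queue limit = (number of shards * queue size per shard) / record
size, with roughly 100kB per shard for the default RecordMaxBufferedTime.
The class and method names here are made up for illustration, and the
result is only a first guess to tune from.

```java
// Hypothetical helper for estimating a FlinkKinesisProducer queue limit.
final class KinesisQueueLimitSketch {

    // Number of records to allow in the buffer, given how many bytes of
    // buffer we want per shard and the typical record size.
    static long queueLimit(int shards, long queueBytesPerShard, long recordSizeBytes) {
        if (shards <= 0 || queueBytesPerShard <= 0 || recordSizeBytes <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        return (shards * queueBytesPerShard) / recordSizeBytes;
    }

    public static void main(String[] args) {
        // 200-byte records on 1 shard, then 1600-byte records on 4 shards.
        System.out.println(queueLimit(1, 100_000L, 200L));
        System.out.println(queueLimit(4, 100_000L, 1_600L));
    }
}
```

The value would then be passed to setQueueLimit on the
FlinkKinesisProducer.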

BR,
Gordon

On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan 
wrote:

> Hi,
> Trying to tune the KPL and FlinkKinesisProducer for Kinesis Data
> stream(KDS).
> Getting following errors:
> 1.
> Throttling
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
>  
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>
> 2. ERROR
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-06-18 15:49:24.238655] [0x0ed6][0x7fc2086c8700] [error]
> [shard_map.cc:150] Shard map update for stream "_write" failed. Code: 
> *LimitExceededException
> Message: Rate exceeded for stream *..._write under account 753274046439.;
> retrying in 1500 ms
>
> 3. [AWS Log: ERROR](CurlHttpClient)*Curl returned error code 28*
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>
>
> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>
>
> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>
> These are the KPL property changes I am planning to make.
>
> *RequestTimeout*: 1 //default 6000 ms
>
> *AggregationEnabled*: true //default is true
>
> *ThreadPoolSize*: *15* //default 10
>
> *MaxConnections*: *48* //default 24 - this might have been a bottleneck
> when we flooded KPL with requests. Requests are sent in parallel over
> multiple connections to the backend.
>
> *RecordTtl*: *1* //default 3 ms  - drop record after 10s.
>
> *FailIfThrottled*: *true* //default false - so if throttled, don't retry.
>
>
> We were using parallelism for sinks at 80. So each corresponds to 1
> FlinkKinesisProducer. So, 80 * 10(ThreadPoolSize) = 800 threads.
> MaxConnections is 24 from KPL.
>
> I am not sure about the MaxConnections setting - what does 48 mean here
> -is it 40(sink parallelism) * 15(ThreadPoolSize) * 48 calls to the KDS
> backend via KPL ?
>
> Any thoughts on how not to overwhelm KPL while handling real time
> streaming load to the Kinesis via the FlinkKinesisProducer ?
>
> TIA,
>


Re: FlinkKinesisProducer blocking ?

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

ThreadPoolSize applies per Kinesis producer, and there is one producer for
each parallel subtask.
If you are constantly hitting the 1MB per second per shard quota, then the
records will be buffered by the FlinkKinesisProducer.
During this process, backpressure is not applied if you have not configured
an upper bound for the buffer queue.

One other thing to note, which might explain the backpressure at regular
intervals that you are experiencing,
is that the FlinkKinesisProducer needs to flush all pending records in the
buffer before the checkpoint can complete for the sink.
That would also apply backpressure upstream.

Gordon

On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan 
wrote:

> Hi Gordon,
> ThreadPoolSize default is 10. I have parallelism of 80 spread out across
> 32 nodes.
> Could it be that the 80 threads get bottlenecked on a common ThreadPool of
> 10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers run
> in separate slots/vCPUs and can be spread across 32 nodes in my case but
> occupying 80 slots/vCPUs. Is my understanding correct and will this be the
> reason that the KPL gets flooded with too many pending requests at regular
> intervals ??
>
> TIA,
>
> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan 
> wrote:
>
>> Thanks,Gordon for your reply.
>>
>> I do not set a queueLimit and so the default unbounded queueSize is 
>> 2147483647.
>> So, it should just be dropping records being produced from the
>> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
>> do not want backpressure as you said it effectively blocks all upstream
>> operators.
>>
>> But from what you are saying, it will apply backpressure when the number
>> of outstanding records accumulated exceeds the default queue limit of 
>> 2147483647
>> or does it also do so if it is *rate-limited to 1MB per second per
>> shard by Kinesis*? The 2nd case of Rate Limiting by Kinesis seems more
>> probable.
>>
>> So, calculating Queue Limit:
>> Based on this, my records size = 1600 bytes. I have 96 shards
>> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
>> of 100kB per shard should be sufficient. So, Queue size/shard=100KB
>> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
>> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 250
>>
>> Acc. to the docs:
>>
>> By default, FlinkKinesisProducer does not backpressure. Instead, records
>> that cannot be sent because of the rate restriction of 1 MB per second per
>> shard are buffered in an unbounded queue and dropped when their RecordTtl
>> expires.
>>
>> To avoid data loss, you can enable backpressuring by restricting the size
>> of the internal queue:
>>
>> // 200 Bytes per record, 1 shard
>> kinesis.setQueueLimit(500);
>>
>>
>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>> It does however apply backpressure (therefore effectively blocking all
>>> upstream operators) when the number of outstanding records accumulated
>>> exceeds a set limit, configured using the
>>> FlinkKinesisProducer#setQueueLimit
>>> method.
>>>
>>> For starters, you can maybe check if that was set appropriately.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

2020-07-21 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

Your assumption is correct that the discovery interval does not affect the
interval of fetching records.

As a side note, you can actually disable shard discovery, by setting the
value to -1.
The FlinkKinesisConsumer would then only call ListShards once at job
startup.
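
For illustration, a minimal sketch of consumer properties with discovery
turned off. It uses plain java.util.Properties so it runs without the
connector on the classpath; the two key strings are my assumption of what the
ConsumerConfigConstants fields resolve to, so prefer the constants themselves
in real code.

```java
import java.util.Properties;

// Sketch only: builds consumer properties with shard discovery disabled.
class ShardDiscoveryConfigSketch {
    // Assumed values of ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS
    // and ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS.
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS =
            "flink.shard.discovery.intervalmillis";
    static final String SHARD_GETRECORDS_INTERVAL_MILLIS =
            "flink.shard.getrecords.intervalmillis";

    static Properties consumerConfig(long getRecordsIntervalMillis) {
        Properties config = new Properties();
        // How often records are fetched; independent of shard discovery.
        config.setProperty(SHARD_GETRECORDS_INTERVAL_MILLIS,
                Long.toString(getRecordsIntervalMillis));
        // -1 disables periodic discovery; shards are listed once at startup.
        config.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
        return config;
    }

    public static void main(String[] args) {
        consumerConfig(2000L).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```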

Cheers,
Gordon

On Fri, Jul 10, 2020 at 2:35 AM Vijay Balakrishnan 
wrote:

> Hi,
> I see these 2 constants- SHARD_GETRECORDS_INTERVAL_MILLIS &
> SHARD_DISCOVERY_INTERVAL_MILLIS.
>
> My understanding was SHARD_GETRECORDS_INTERVAL_MILLIS defines how often
> records are fetched from Kinesis Data Stream(KDS). Code seems to be doing
> this in ShardConsumer.run()-->getRecords()
>
> SHARD_DISCOVERY_INTERVAL_MILLIS defines how often the KinesisConsumer
> checks if there are any changes to shards. We don't change shards during
> our Application run. I have changed it to a very high value to avoid this
> check as I was running into ListShards issues with LimitExceededException
> when using 282 shards.
> Would this be a correct understanding of these 2 constants -especially the
> SHARD_DISCOVERY_INTERVAL_MILLIS
>
> My assumption that needs to be validated:
> The SHARD_DISCOVERY_INTERVAL_MILLIS should not affect the fetching of
> records as defined by SHARD_GETRECORDS_INTERVAL_MILLIS.
>
> Code below:
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
> getRecsIntervalMs);//2000
>
> /*
> We do not change shards while the app is running.
> So, we can increase SHARD_DISCOVERY_INTERVAL_MILLIS to a very high value
> to avoid any rateLimiting issues from the AWS API with the ListShards call.
> Default is 10s. We can increase this to avoid this LimitExceededException
> as we don't change shards in the middle.
>  */
>
> kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
> shardDiscoveryInterval);//1800 ms
>
>
> TIA,
>


Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Ah, didn't realize Chesnay has it answered already, sorry for the concurrent
reply :)



Re: Chaining the creation of a WatermarkStrategy doesn't work?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi,

This would be more of a Java question.
In short, type inference of generic types does not work for chained
invocations, and therefore type information has to be explicitly included.

If you'd like to chain the calls, this would work:

WatermarkStrategy watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner) (element,
recordTimestamp) -> 42L);
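
To make the inference point concrete, here is a self-contained sketch. The
Strategy class is a hypothetical stand-in for WatermarkStrategy (a generic
static factory followed by a chained instance method), not Flink API. It shows
one way to supply the type information: an explicit type witness on the first
call in the chain.

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-in for WatermarkStrategy, for illustration only.
class Strategy<T> {
    private final ToLongFunction<T> assigner;

    private Strategy(ToLongFunction<T> assigner) {
        this.assigner = assigner;
    }

    // Generic static factory: T has to be inferred or given explicitly.
    static <T> Strategy<T> bounded() {
        return new Strategy<>(element -> Long.MIN_VALUE);
    }

    Strategy<T> withTimestampAssigner(ToLongFunction<T> assigner) {
        return new Strategy<>(assigner);
    }

    long timestampOf(T element) {
        return assigner.applyAsLong(element);
    }
}

class ChainedInferenceSketch {
    static Strategy<String> build() {
        // Does not compile: in a chain, the return type of build() is not
        // used to infer T for bounded(), so T becomes Object and
        // s.length() is not available:
        //   return Strategy.bounded().withTimestampAssigner(s -> s.length());

        // Compiles: the explicit type witness fixes T for the whole chain.
        return Strategy.<String>bounded().withTimestampAssigner(s -> s.length());
    }

    public static void main(String[] args) {
        System.out.println(build().timestampOf("flink"));
    }
}
```

Splitting the chain into two statements with a typed local variable works as
well, since the assignment then provides the target type.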

Cheers,
Gordon



Re: TaskManager docker image for Beam WordCount failing with ClassNotFound Exception

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi,

Assuming that the job jar bundles all the required dependencies (including
the Beam dependencies), making them available under `/opt/flink/usrlib/` in
the container either by mounting or directly adding the job artifacts should
work. AFAIK, it is also the recommended way, as opposed to adding them under
`/lib`.

From the exception, what seems to be missing is the Beam dependencies.
Just to get rid of the obvious first: are you sure that the job jar has
those bundled?

Cheers,
Gordon



Re: FlinkKinesisProducer blocking ?

2020-07-08 Thread Tzu-Li (Gordon) Tai
Hi Vijay,

The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
It does however apply backpressure (therefore effectively blocking all
upstream operators) when the number of outstanding records accumulated
exceeds a set limit, configured using the FlinkKinesisProducer#setQueueLimit
method.

For starters, you can maybe check if that was set appropriately.

Cheers,
Gordon



Re: Any python example with json data from Kafka using flink-statefun

2020-06-16 Thread Tzu-Li (Gordon) Tai
(forwarding this to user@ as it is more suited to be located there)

Hi Sunil,

With remote functions (using the Python SDK), messages sent to / from them
must be Protobuf messages.
This is a requirement since remote functions can be written in any
language, and we use Protobuf as a means for cross-language messaging.
If you are defining Kafka ingresses in a remote module (via textual YAML
module configs), then records in the Kafka ingress will be directly routed
to the remote functions, and therefore they are required to be Protobuf
messages as well.

With embedded functions (using the current Java SDK), what you are
trying to do is possible.
When using the Java SDK, the Kafka ingress allows providing a
`KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
into any type you intend for messaging within the StateFun application. So
there, you can convert your JSON records.

If you want to still write your main application logic in Python, but the
input and output messages in Kafka are required to be JSON,
what you can currently do is have a mix of a remote module [2] containing the
application logic as Python functions,
and a separate embedded module [3] containing the Java Kafka ingress and
egresses.
So, concretely, your 2 modules will contain:

Remote module:
- Your Python functions implementing the main business logic.

Embedded module:
- Java Kafka ingress with deserializer that converts JSON to Protobuf
messages. Here you have the freedom to extract only the fields that you
need.
- A Java router [4] that routes those converted messages to the remote
functions, by their logical address
- A Java Kafka egress with serializer that converts Protobuf messages from
remote functions into JSON Kafka records.
- A Java function that simply forwards input messages to the Kafka
egress. If the remote functions need to write JSON messages to Kafka, they
send a Protobuf message to this function.


Hope this helps.
Note that the egress side of things can definitely be easier (without the
extra forwarding through a Java function) if the Python SDK's
`kafka_egress_record` method allows supplying arbitrary bytes.
Then you would already be able to write JSON messages to Kafka from the
Python functions.
This however isn't supported yet, but technically it is quite easy to
achieve. I've just filed an issue for this [5], in case you'd like to follow
that.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module

[3]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
[4]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
[5] https://issues.apache.org/jira/browse/FLINK-18340

On Wed, Jun 17, 2020 at 9:25 AM Sunil  wrote:

> checking to see if this is possible currently.
> Read json data from kafka topic => process using statefun => write out to
> kafka in json format.
>
> I could have a separate process to read the source json data convert to
> protobuf into another kafka topic but it sounds in-efficient.
> e.g.
> Read json data from kafka topic =>convert json to protobuf =>  process
> using statefun => write out to kafka in protobuf format.=> convert protobuf
> to json message
>
> Appreciate any advice on how to process json messages using statefun ,
> also if this is not possible in the current python sdk, can i do that using
> the java/scala sdk?
>
> Thanks.
>
> On 2020/06/15 15:34:39, Sunil Sattiraju  wrote:
> > Thanks Igal,
> > I dont have control over the data source inside kafka ( current kafka
> topic contains either json or avro formats only, i am trying to reproduce
> this scenario using my test data generator ).
> >
> > is it possible to convert the json to proto at the receiving end of
> statefun application?
> >
> > On 2020/06/15 14:51:01, Igal Shilman  wrote:
> > > Hi,
> > >
> > > The values must be valid encoded Protobuf messages [1], while in your
> > > attached code snippet you are sending utf-8 encoded JSON strings.
> > > You can take a look at this example with a generator that produces
> Protobuf
> > > messages [2][3]
> > >
> > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > [2]
> > >
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > [3]
> > >
> https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> > >
> > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <
> sunilsattir...@gmail.com>
> > > wrote:
> > >
> > > > Hi, Based on the example from
> > > >
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greete

Re: How To subscribe a Kinesis Stream using enhance fanout?

2020-06-09 Thread Tzu-Li (Gordon) Tai
Hi all,

Just to give a quick update: there will be contributors from the AWS
Kinesis team working on contributing enhanced fan out support to the
connector.
You can follow the progress here:
https://issues.apache.org/jira/browse/FLINK-17688

Cheers,
Gordon

On Fri, May 15, 2020 at 5:55 PM orionemail 
wrote:

> Hi,
>
> We also recently needed this functionality, unfortunately we were unable
> to implement it ourselves so changed our plan accordingly.
>
> However we very much see the benefit for this feature and would be
> interested in following the JIRA ticket.
>
> Thanks
>
>
>
> ‐‐‐ Original Message ‐‐‐
> On Thursday, 14 May 2020 11:34, Xiaolong Wang 
> wrote:
>
> Thanks, I'll check it out.
>
> On Thu, May 14, 2020 at 6:26 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Xiaolong,
>>
>> You are right, the way the Kinesis connector is implemented / the way the
>> AWS APIs are used, does not allow it to consume Kinesis streams with
>> enhanced fan-out enabled consumers [1].
>> Could you open a JIRA ticket for this?
>> As far as I can tell, this could be a valuable contribution to the
>> connector for Kinesis users who require dedicated throughput isolated from
>> other running consumers.
>>
>> Cheers,
>> Gordon
>>
>> [1]
>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>>
>> On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang <
>> xiaolong.w...@smartnews.com> wrote:
>>
>>> Hello Flink Community!
>>>
>>>   I'm currently coding on a project relying on AWS Kinesis. With the
>>> provided connector (flink-connector-kinesis_2.11;1.10.0), I can consume the
>>> message.
>>>
>>>  But as the main stream is used among several other teams, I was
>>> required to use the enhance fanout of Kinesis. I checked the connector code
>>> and found no implementations.
>>>
>>>  Has this issue occurred to anyone before?
>>>
>>> Thanks for your help.
>>>
>>
>


Re: KeyedStream and keyedProcessFunction

2020-06-09 Thread Tzu-Li (Gordon) Tai
Hi,

Records with the same key will be processed by the same partition.
Note there isn't an instance of a keyed process function for each key.
There is a single instance per partition, and all keys that are distributed
to the same partition will get processed by the same keyed process function
instance.
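
A toy sketch of that model, in case it helps. The routing here is a plain
hash modulo the parallelism, which is a simplification I'm assuming for
illustration (Flink's actual assignment goes through key groups), and
CountingInstance is a made-up stand-in for one parallel function instance
that keeps separate state per key.

```java
import java.util.HashMap;
import java.util.Map;

class KeyedInstanceSketch {
    // Hypothetical stand-in for one parallel instance of a keyed function.
    // It serves many keys and keeps a separate counter for each of them.
    static class CountingInstance {
        final Map<String, Integer> countPerKey = new HashMap<>();

        void process(String key) {
            countPerKey.merge(key, 1, Integer::sum);
        }
    }

    // Simplified routing, not Flink's real key-group based assignment.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Sends every record to the single instance that owns its key.
    static CountingInstance[] route(String[] keys, int parallelism) {
        CountingInstance[] instances = new CountingInstance[parallelism];
        for (int i = 0; i < parallelism; i++) {
            instances[i] = new CountingInstance();
        }
        for (String key : keys) {
            instances[partitionFor(key, parallelism)].process(key);
        }
        return instances;
    }

    public static void main(String[] args) {
        CountingInstance[] instances =
                route(new String[] {"a", "b", "c", "a", "d", "a"}, 2);
        for (int i = 0; i < instances.length; i++) {
            System.out.println("instance " + i + " -> " + instances[i].countPerKey);
        }
    }
}
```

Four distinct keys end up on two instances, and each instance still sees its
keys' state separately.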

Gordon



[ANNOUNCE] Apache Flink Stateful Functions 2.1.0 released

2020-06-09 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Stateful Functions 2.1.0.

Stateful Functions is an API that simplifies building distributed stateful
applications.
It's based on functions with persistent state that can interact dynamically
with strong consistency guarantees.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2020/06/09/release-statefun-2.1.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Docker image for building Stateful Functions applications is
currently being published to Docker Hub. Progress for creating the Docker
Hub repository can be tracked at:
https://github.com/docker-library/official-images/pull/7749

In the meantime, before the official Docker images are available,
Ververica has volunteered to make Stateful Functions images available for
the community via their public registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12347861

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Suggestions for using both broadcast sync and conditional async-io

2020-06-03 Thread Tzu-Li (Gordon) Tai
Hi,

For the initial DB fetch and state bootstrapping:
That's exactly what the State Processor API is for, have you looked at that
already?
It currently does support bootstrapping broadcast state [1], so that should
be good news for you.

As a side note, I may be missing something, but is broadcast state really
necessary in your use case?
If in your current application, for each record you lookup DynamoDB with
the current key of the record,
then in the new architecture where you move the DynamoDB database into the
application as Flink state, you should co-partition the entries with the
input record stream.
If for each record you need to do cross-key lookups, then of course
broadcast state is required.

As for the AsyncIO process -
From my understanding, in the new architecture, you should no longer need
the AsyncIO process / lookup to DynamoDB to generate the new mapping, as
all information is locally available in Flink state after the bootstrap.
So, when a record is processed, you check Flink state for existing mapping
and proceed, or generate a new mapping and write it to Flink state.
Essentially, in this scenario Flink state replaces DynamoDB and all lookups
are local.
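
A minimal sketch of that lookup-or-generate step. A HashMap stands in for the
bootstrapped Flink state, and the class name and generator function are
hypothetical; in a real job this would live in keyed state inside your
function.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: a local map plays the role of Flink state that was
// bootstrapped from the existing DynamoDB table.
class LocalMappingSketch {
    private final Map<String, String> mappingState;
    private final Function<String, String> generator;

    LocalMappingSketch(Map<String, String> bootstrapped,
                       Function<String, String> generator) {
        this.mappingState = new HashMap<>(bootstrapped);
        this.generator = generator;
    }

    /** Returns the existing mapping, or generates and stores a new one. */
    String resolve(String key) {
        return mappingState.computeIfAbsent(key, generator);
    }

    int size() {
        return mappingState.size();
    }

    public static void main(String[] args) {
        Map<String, String> bootstrapped = new HashMap<>();
        bootstrapped.put("k1", "v1");
        LocalMappingSketch mappings =
                new LocalMappingSketch(bootstrapped, key -> "generated-" + key);
        System.out.println(mappings.resolve("k1")); // existing mapping
        System.out.println(mappings.resolve("k2")); // newly generated, then stored
    }
}
```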

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1

On Wed, Jun 3, 2020 at 10:15 PM orionemail 
wrote:

> Hi,
>
> My current application makes use of a DynamoDB database to map a key to a
> value. As each record enters the system the async-io calls this db and
> requests a value for the key but if that value doesn't exist a new value is
> generated and inserted.  I have managed to do all this in one update
> operation to the dynamodb so performance isn't too bad.  This is usable for
> our current load, but our load will increase considerably in the near
> future and as writes are expensive (each update even if it actually returns
> the existing value is classed as a write) this could be a cost factor going
> forward.
>
> Looking at broadcast state seems like it might be the answer.  DynamoDB
> allows 'streams' of table modification events to be output to what is
> essentially a kinesis stream, so it might be possible to avoid the majority
> of write calls by storing local copies of the mapping.  I should also point
> out that these mappings are essentially capped.  The majority of events
> that come through will have an existing mapping.
>
> My idea is to try the following:
>
> 1. Application startup request the entire dataset from the DB (this is ~5m
> key:value pairs)
> 2. Inject this data into flink state somehow, possibly via broadcast state?
> 3. Subscribe to the DynamoDB stream via broadcast state to capture updates
> to this table and update the flink state
> 4. When a record is processed, check flink state for existing mapping and
> proceed if found.  If not, then AsyncIO process as before to generate a new
> mapping
> 5. DynamoDB writes the new value to the stream so all operators get the
> new value via broadcast state
>
> Is this idea workable?  I am unsure about the initial DB fetch and the
> AsyncIO process should a new value need to be inserted.
>
>
> Any thoughts appreciated.
>
> Thanks
>
> O
>


Re: StateFun remote/embedded polyglot example

2020-05-31 Thread Tzu-Li (Gordon) Tai
Hi,

On Mon, Jun 1, 2020 at 5:47 AM Omid Bakhshandeh 
wrote:

> Hi,
>
> I'm very confused about StateFun 2.0 new architecture.
>
> Is it possible to have both remote and embedded functions in the same
> deployment?
>

Yes, that is possible. Embedded functions simply run within the Flink
StateFun workers (which essentially are Flink TMs).
Remote functions run as standalone services independent of the Flink
StateFun cluster.


> Is there a tutorial that shows the deployment of the two types in the same
> Kubernetes cluster alongside with Flink(possible in Python and Java)?
>

You simply have to include all modules [1] (embedded and remote modules
included) when packaging your StateFun application image [2].
You can see the Dockerfile in [2] that demonstrates exactly that.
It copies an embedded module (a JAR containing the service files and Java
functions / ingresses / egresses) and a remote module (a YAML file defining
the endpoints of remote functions, ingresses / egresses) into a pre-defined
modules directory in the image.


>
> Also, is there a path towards KNative support and scale to zero option?
>

I've cc'ed Igal, who has been looking at KNative support.
As for scale-to-zero, that would already work for your remote function
deployments if you are deploying them with, for example, FaaS solutions
like KNative / AWS Lambda / GCP Cloud Functions, etc.
That is already supported with the new StateFun 2.0 architecture.


>
> Best,
> --
> ---
> Omid
>

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/sdk/modules.html
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/deployment-and-operations/packaging.html


Re: Stateful functions Harness

2020-05-27 Thread Tzu-Li (Gordon) Tai
On Thu, May 28, 2020, 12:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> I think I figured this out.
> The project seems to be missing
>
> resources
> <https://github.com/ververica/flink-statefun-workshop/tree/master/statefun-workshop-functions/src/main/resources>
> /META-INF
> <https://github.com/ververica/flink-statefun-workshop/tree/master/statefun-workshop-functions/src/main/resources/META-INF>
> /services directory, which should contain services
>

Yes, the functions / ingresses / egresses etc. are not discoverable if the
service file isn't present in the classpath.

The examples, if you are running them straight from the repo, should all
have that service file defined and are therefore readily runnable.

If you are creating your own application project, you'll have to add that
yourself.


> Another question:
> I see org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
>
> Class, which I think allows to use existing data streams as ingress/egress.
>
> Are there any examples of its usage
>

On the Harness class, there is a withFlinkSourceFunction method in which
you can directly add a Flink source function as the ingress.

If you want to use that directly in a normal application (not just
execution in IDE with the Harness), you can define your ingresses/egresses
by binding SourceFunctionSpec / SinkFunctionSpec.
Please see how they are being used in the Harness class for examples.

Gordon


>
> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi,
>
> The example is working fine on my side (also using IntelliJ).
> This is most likely a problem with your project setup in the IDE,
> where the classpath isn't set up correctly.
>
> What do you see when you right click on the statefun-flink-harness-example
> directory (in the IDE) --> Open Module Settings, and then under the
> "Sources" / "Dependencies" tab?
> Usually this should all be automatically set up correctly when importing
> the project.
>
> Gordon
>
> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> The project
>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>> Does not work in Intellij.
>>
>> The problem is that when running in Intellij, method public static Modules
>> loadFromClassPath() {
>> Does not pick up classes, which are local in Intellij
>>
>> Any work arounds?
>>
>>
>>
>>
>> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai 
>> wrote:
>>
>> Hi,
>>
>> Sorry, I need to correct my comment on using the Kafka ingress / egress
>> with the Harness.
>>
>> That is actually doable, by adding an extra dependency to
>> `statefun-flink-distribution` in your Harness program.
>> That pulls in all the other dependencies required by the Kafka
>> ingress / egress, such as the source / sink providers and Flink Kafka
>> connectors.
>>
>> Cheers,
>> Gordon
>>
>> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Are you getting an exception from running the Harness?
>>> The Harness should already have the required configurations, such as the
>>> parent first classloading config.
>>>
>>> Otherwise, if you would like to add your own configuration, use the
>>> `withConfiguration` method on the `Harness` class.
>>>
>>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>>> Also, where do I put flink-conf.yaml in Idea to add additional required
>>>> config parameter:
>>>>
>>>> classloader.parent-first-patterns.additional: 
>>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>>
>>>>
>>>>
>>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>>> boris.lublin...@lightbend.com> wrote:
>>>>
>>>> Hi,
>>>> I am trying to run
>>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>>  locally
>>>> using
>>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>>
>>>> And have several questions.
>>>> 1. It seems fairly straightforward to use it with in memory message
>>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>>>> I can use it with Kafka
>>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>>> Harness. Is there a way to short circuit it and have Harness get
>>>> StatefulFunctionUniverse directly
>>>> 3. Is there an example on how to write Flink main for stateful function?
>>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>>> with Kafka?
>>>> 5 There is a great stateful functions example
>>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>>> does not really describe implementation and neither does this article,
>>>> referencing it
>>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>>> Is there anything that describes this implementation?
>>>>
>>>>
>>>>
>>
>


Re: Stateful functions Harness

2020-05-27 Thread Tzu-Li (Gordon) Tai
Hi,

The example is working fine on my side (also using IntelliJ).
This is most likely a problem with your project setup in the IDE,
where the classpath isn't set up correctly.

What do you see when you right click on the statefun-flink-harness-example
directory (in the IDE) --> Open Module Settings, and then under the
"Sources" / "Dependencies" tab?
Usually this should all be automatically set up correctly when importing the
project.

Gordon

On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> The project
> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
> Does not work in Intellij.
>
> The problem is that when running in Intellij, method public static Modules
> loadFromClassPath() {
> Does not pick up classes, which are local in Intellij
>
> Any work arounds?
>
>
>
>
> On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
> Hi,
>
> Sorry, I need to correct my comment on using the Kafka ingress / egress
> with the Harness.
>
> That is actually doable, by adding an extra dependency to
> `statefun-flink-distribution` in your Harness program.
> That pulls in all the other dependencies required by the Kafka
> ingress / egress, such as the source / sink providers and Flink Kafka
> connectors.
>
> Cheers,
> Gordon
>
> On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Are you getting an exception from running the Harness?
>> The Harness should already have the required configurations, such as the
>> parent first classloading config.
>>
>> Otherwise, if you would like to add your own configuration, use the
>> `withConfiguration` method on the `Harness` class.
>>
>> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>>> Also, where do I put flink-conf.yaml in Idea to add additional required
>>> config parameter:
>>>
>>> classloader.parent-first-patterns.additional: 
>>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>>
>>>
>>>
>>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>>> boris.lublin...@lightbend.com> wrote:
>>>
>>> Hi,
>>> I am trying to run
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>>  locally
>>> using
>>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>>
>>> And have several questions.
>>> 1. It seems fairly straightforward to use it with in memory message
>>> generators, but I can’t figure out how to add Kafka ingress/Egress so that
>>> I can use it with Kafka
>>> 2. GreetingModule already creates StatefulFunctionUniverse  and so does
>>> Harness. Is there a way to short circuit it and have Harness get
>>> StatefulFunctionUniverse directly
>>> 3. Is there an example on how to write Flink main for stateful function?
>>> 4. Is there an example anywhere on how to run such examples in the IDE
>>> with Kafka?
>>> 5 There is a great stateful functions example
>>> https://github.com/ververica/flink-statefun-workshop, but its readme
>>> does not really describe implementation and neither does this article,
>>> referencing it
>>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>>> Is there anything that describes this implementation?
>>>
>>>
>>>
>


Re: stateful-fun2.0 checkpointing

2020-05-25 Thread Tzu-Li (Gordon) Tai
Hi,

I replied to your question on this in your other email thread.

Let us know if you have other questions!

Cheers,
Gordon

On Sun, May 24, 2020, 1:01 AM C DINESH  wrote:

> Hi Team,
>
> 1. How can we enable checkpointing in stateful-fun2.0
> 2. How to set parallelism
>
> Thanks,
> Dinesh.
>
>


Re: Stateful-fun-Basic-Hello

2020-05-25 Thread Tzu-Li (Gordon) Tai
Hi,

You're right, maybe the documentation needs a bit more direction there,
especially for people who are newer to Flink.

1. How to increase parallelism

There are two ways to do this. Either set `parallelism.default` in
flink-conf.yaml, or use the `-p` command line option when starting the
application via the packaged Docker images.

2. How to enable checkpointing

You would have to set the `execution.checkpointing.mode` and
`execution.checkpointing.interval` configs, also in flink-conf.yaml.

For example, the mode can be set to `EXACTLY_ONCE` and interval to `5sec`
to have exactly-once mode checkpoints at 5 second intervals.
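
A minimal sketch of what those entries could look like in flink-conf.yaml.
The parallelism of 4 is only an illustrative value; the checkpointing values
are the ones from the example above:

```yaml
# flink-conf.yaml
parallelism.default: 4                      # assumed value, pick what fits your setup
execution.checkpointing.mode: EXACTLY_ONCE  # exactly-once checkpoints
execution.checkpointing.interval: 5sec      # checkpoint every 5 seconds
```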


In general, the StateFun-specific configurations are listed here [1].
All other configurations available in Flink are available in Stateful
Functions as well.

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/deployment-and-operations/configurations.html

On Tue, May 26, 2020, 11:42 AM C DINESH  wrote:

> Hi Team,
>
> I mean to say that now I understand, but flink-conf.yaml is not
> mentioned on the documentation page.
>
> On Mon, May 25, 2020 at 7:18 PM C DINESH  wrote:
>
>> Thanks Gordon,
>>
>> I read the documentation several times, but I didn't understand it at the
>> time; flink-conf.yaml is not mentioned there.
>>
>> Can you please suggest
>> 1. how to increase parallelism
>> 2. how to enable checkpoints for the job
>>
>> As far as I know there is no documentation regarding this. Or are these
>> features not there yet?
>>
>> Cheers,
>> Dinesh.
>>
>


Re: Stateful-fun-Basic-Hello

2020-05-25 Thread Tzu-Li (Gordon) Tai
Hi,

It seems like you are trying to package your Stateful Functions app as a
Flink job, and submit that to an existing cluster.

If that is indeed the case,
Stateful Functions apps have some required configurations that need to be
set via the flink-conf.yaml file of your existing cluster. Please see the
bottom half of [1] for more details.
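
For context, the exception quoted below comes from a startup check on
`classloader.parent-first-patterns.additional`. The following is only a
plain-Java model of such a check, not the actual StateFun validator: the class
and method names are made up for illustration, and it assumes the value is a
semicolon-separated list such as
`org.apache.flink.statefun;org.apache.kafka;com.google.protobuf`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, NOT part of StateFun: models a "must contain all of" check.
final class ParentFirstPatternsCheck {

    // The three patterns named in the exception message.
    static final List<String> REQUIRED = Arrays.asList(
            "org.apache.flink.statefun", "org.apache.kafka", "com.google.protobuf");

    // Returns the required patterns that are missing from the configured value.
    static List<String> missingPatterns(String configuredValue) {
        // Split the configured value on ';' and ignore empty entries.
        List<String> configured = new ArrayList<>();
        if (configuredValue != null) {
            for (String part : configuredValue.split(";")) {
                String trimmed = part.trim();
                if (!trimmed.isEmpty()) {
                    configured.add(trimmed);
                }
            }
        }
        // Collect every required pattern that was not configured.
        List<String> missing = new ArrayList<>();
        for (String required : REQUIRED) {
            if (!configured.contains(required)) {
                missing.add(required);
            }
        }
        return missing;
    }
}
```

With an unset value, `missingPatterns` reports all three patterns as missing,
which corresponds to the situation the error message describes.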

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/deployment-and-operations/packaging.html#flink-jar

On Sat, May 23, 2020, 9:55 PM C DINESH  wrote:

> Hi Team,
>
> I am writing my first stateful fun basic hello application. I am getting
> the following Exception.
>
> $ ./bin/flink run -c
> org.apache.flink.statefun.flink.core.StatefulFunctionsJob
> ./stateful-sun-hello-java-1.0-SNAPSHOT-jar-with-dependencies.jar
>
>
>
> 
>
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Invalid configuration:
> classloader.parent-first-patterns.additional; Must contain all of
> org.apache.flink.statefun, org.apache.kafka, com.google.protobuf
>
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>
> at
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>
> Caused by:
> org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException:
> Invalid configuration: classloader.parent-first-patterns.additional; Must
> contain all of org.apache.flink.statefun, org.apache.kafka,
> com.google.protobuf
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator.validateParentFirstClassloaderPatterns(StatefulFunctionsConfigValidator.java:55)
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator.validate(StatefulFunctionsConfigValidator.java:44)
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.<init>(StatefulFunctionsConfig.java:143)
>
> at
> org.apache.flink.statefun.flink.core.StatefulFunctionsConfig.fromEnvironment(StatefulFunctionsConfig.java:105)
>
> This is my POM file; I hope I have added all the dependencies. Please
> suggest what to do.
>
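> <project xmlns="http://maven.apache.org/POM/4.0.0"
>          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>   <modelVersion>4.0.0</modelVersion>
>
>   <groupId>org.example</groupId>
>   <artifactId>stateful-sun-hello-java</artifactId>
>   <version>1.0-SNAPSHOT</version>
>
>   <dependencies>
>     <dependency>
>       <groupId>com.google.protobuf</groupId>
>       <artifactId>protobuf-java</artifactId>
>       <version>3.6.1</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>statefun-sdk</artifactId>
>       <version>2.0.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>statefun-flink-distribution</artifactId>
>       <version>2.0.0</version>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.flink</groupId>
>       <artifactId>statefun-kafka-io</artifactId>
>       <version>2.0.0</version>
>     </dependency>
>   </dependencies>
>
>   <build>
>     <defaultGoal>clean generate-sources compile install</defaultGoal>
>
>     <plugins>
>       <plugin>
>         <groupId>com.github.os72</groupId>
>         <artifactId>protoc-jar-maven-plugin</artifactId>
>         <version>3.6.0.1</version>
>         <executions>
>           <execution>
>             <phase>generate-sources</phase>
>             <goals>
>               <goal>run</goal>
>             </goals>
>             <configuration>
>               <includeMavenTypes>direct</includeMavenTypes>
>
>               <inputDirectories>
>                 <include>src/main/protobuf</include>
>               </inputDirectories>
>
>               <outputTargets>
>                 <outputTarget>
>                   <type>java</type>
>                   <outputDirectory>src/main/java</outputDirectory>
>                 </outputTarget>
>                 <outputTarget>
>                   <type>grpc-java</type>
>                   <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.15.0</pluginArtifact>
>                   <outputDirectory>src/main/java</outputDirectory>
>                 </outputTarget>
>               </outputTargets>
>             </configuration>
>           </execution>
>         </executions>
>       </plugin>
>
>       <plugin>
>         <groupId>org.apache.maven.plugins</groupId>
>         <artifactId>maven-assembly-plugin</artifactId>
>         <version>2.4.1</version>
>         <configuration>
>           <descriptorRefs>
>             <descriptorRef>jar-with-dependencies</descriptorRef>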

Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

Sorry, I need to correct my comment on using the Kafka ingress / egress
with the Harness.

That is actually doable, by adding an extra dependency on
`statefun-flink-distribution` in your Harness program.
That pulls in all the other dependencies required by the Kafka
ingress / egress, such as the source / sink providers and Flink Kafka
connectors.
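
As a sketch, the dependency could be declared like this in the pom.xml of the
Harness program. The version shown (2.0.0) is an assumption; use whichever
version matches your other StateFun dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-flink-distribution</artifactId>
    <!-- assumed version; align with your other statefun-* dependencies -->
    <version>2.0.0</version>
</dependency>
```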

Cheers,
Gordon

On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai 
wrote:

> Are you getting an exception from running the Harness?
> The Harness should already have the required configurations, such as the
> parent first classloading config.
>
> Otherwise, if you would like to add your own configuration, use the
> `withConfiguration` method on the `Harness` class.
>
> On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
>> Also, where do I put flink-conf.yaml in IDEA to add the additional required
>> config parameter:
>>
>> classloader.parent-first-patterns.additional: 
>> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>>
>>
>>
>> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
>> boris.lublin...@lightbend.com> wrote:
>>
>> Hi,
>> I am trying to run
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>>  locally
>> using
>> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>>
>> And have several questions.
>> 1. It seems fairly straightforward to use it with in-memory message
>> generators, but I can’t figure out how to add Kafka ingress/egress so that
>> I can use it with Kafka.
>> 2. GreetingModule already creates StatefulFunctionUniverse and so does
>> Harness. Is there a way to short circuit it and have Harness get
>> StatefulFunctionUniverse directly?
>> 3. Is there an example on how to write a Flink main for a stateful function?
>> 4. Is there an example anywhere on how to run such examples in the IDE
>> with Kafka?
>> 5. There is a great stateful functions example
>> https://github.com/ververica/flink-statefun-workshop, but its readme
>> does not really describe the implementation and neither does this article,
>> referencing it
>> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39.
>> Is there anything that describes this implementation?
>>
>>
>>


Re: Flink Window with multiple trigger condition

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

To achieve what you have in mind, I think you have to use a
processing time window of 30 mins, with a custom trigger that matches
the "start" event in the `onElement` method and returns
TriggerResult.FIRE_AND_PURGE.

That way, the window fires either when the processing time has passed or
when the start event is received.
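
To make the two firing paths concrete, here is a plain-Java model of the
decision logic only. It does not use Flink's classes: the `TriggerResult` enum
below is a local stand-in that mirrors the names of Flink's, and
`StartEventTriggerModel` as well as the string payload `"start"` are
assumptions made for illustration. In a real job the same logic would live in
a subclass of Flink's `Trigger`.

```java
// Local stand-in for Flink's TriggerResult (illustration only, not the Flink class).
enum TriggerResult { CONTINUE, FIRE_AND_PURGE }

// Hypothetical model of the trigger decisions for one 30-minute processing-time window.
final class StartEventTriggerModel {

    static final long WINDOW_SIZE_MS = 30L * 60L * 1000L;

    // Exclusive end of the window, in processing-time milliseconds.
    private final long windowEndMs;

    StartEventTriggerModel(long windowStartMs) {
        this.windowEndMs = windowStartMs + WINDOW_SIZE_MS;
    }

    // Per-element path: the "start" event fires and purges the window early.
    TriggerResult onElement(String event) {
        return "start".equals(event) ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    // Timer path: fire and purge once the end of the window has been reached.
    TriggerResult onProcessingTime(long nowMs) {
        return nowMs >= windowEndMs ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }
}
```

In an actual `Trigger` implementation, `onElement` would also need to register
a processing-time timer for the end of the window, so that the timeout path
gets invoked at all.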

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Tzu-Li (Gordon) Tai
This in general is not a good idea, as the state you query using queryable
state within a job does not provide any consistency guarantees at all.

Would it be possible to have some trigger that emits state of the windows,
and join the states downstream?
In general, that is a better approach for what you seem to be trying to
achieve.

Otherwise, when it comes to "querying state across operators", that's a hint
that the Stateful Functions [1] model could be a better fit for your
use case. Specifically, using Stateful Functions, you would model
"querying state" as a request to the target function, with the target
function sending its state back as a response.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html





Re: Using Queryable State within 1 job + docs suggestion

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi,

That in general is not a good idea, both because of the problem you mentioned
and because the state you query within the same job using queryable state
does not come with any consistency guarantees.

When it comes to "querying state from another operator", it is a hint that
your use case can potentially be better modeled using the Stateful Functions
framework [1]. With Stateful Functions, you would model this as a request
message to the target function, with the target function replying with a
response carrying its state.
There are still other shortcomings though; for example, StateFun currently
doesn't support windowed state.

Cheers,
Gordon

[1] https://flink.apache.org/stateful-functions.html

On Thu, May 21, 2020 at 10:25 PM Annemarie Burger <
annemarie.bur...@campus.tu-berlin.de> wrote:

> Hi,
>
> So what I meant was that I have a keyed stream, and from each
> thread/keygroup/PU I want to query the state of the other
> threads/keygroups/PUs.
>
> Does anybody have any experience with this?
>
> I'm currently working on it, and the main problem seems to be that the
> Queryable State Client requires the JobID from which to query the state,
> which in my case would be the same as its own jobID. Any ideas on how to
> work around this?
> Using env.getStreamGraph.getJobGraph.getJobID doesn't seem to work.
>
> Best,
> Annemarie
>
>
>
>


Re: Performance impact of many open windows at the same time

2020-05-21 Thread Tzu-Li (Gordon) Tai
Hi Joe,

The main effect this should have is more state to be kept until the windows
can be fired (and their state purged).
This would of course increase the time it takes to checkpoint the operator.

I'm not sure whether there is a significant per-record runtime impact caused
by how windows are tracked in the WindowOperator's data structures;
maybe Aljoscha (cc'ed) can chime in here.
If it is certain that these windows will never fire (until far into the
future) because the event timestamps were corrupted in the first place, then
it might make sense to have a way to drop windows based on some criteria.
I'm not sure if that is supported in any way without triggers (since you
mentioned that those windows might not receive any data); again, Aljoscha
might be able to provide more info here.

Cheers,
Gordon

On Thu, May 21, 2020 at 7:02 PM Joe Malt  wrote:

> Hi all,
>
> I'm looking into what happens when messages are ingested with timestamps
> far into the future (e.g. due to corruption or a wrong clock at the sender).
>
> I'm aware of the effect on watermarking, but another thing I'm concerned
> about is the performance impact of the extra windows this will create.
>
> If a Flink operator has many (perhaps hundreds or thousands) of windows
> open but not receiving any data (and never firing), will this degrade
> performance?
>
> Thanks,
> Joe
>


Re: Stateful functions Harness

2020-05-21 Thread Tzu-Li (Gordon) Tai
Are you getting an exception from running the Harness?
The Harness should already have the required configurations, such as the
parent first classloading config.

Otherwise, if you would like to add your own configuration, use the
`withConfiguration` method on the `Harness` class.

On Fri, May 22, 2020 at 7:19 AM Boris Lublinsky <
boris.lublin...@lightbend.com> wrote:

> Also, where do I put flink-conf.yaml in IDEA to add the additional required
> config parameter:
>
> classloader.parent-first-patterns.additional: 
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
>
>
>
> On May 21, 2020, at 12:22 PM, Boris Lublinsky <
> boris.lublin...@lightbend.com> wrote:
>
> Hi,
> I am trying to run
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-greeter-example
>  locally
> using
> https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example
>
> And have several questions.
> 1. It seems fairly straightforward to use it with in-memory message
> generators, but I can’t figure out how to add Kafka ingress/egress so that
> I can use it with Kafka.
> 2. GreetingModule already creates StatefulFunctionUniverse and so does
> Harness. Is there a way to short circuit it and have Harness get
> StatefulFunctionUniverse directly?
> 3. Is there an example on how to write a Flink main for a stateful function?
> 4. Is there an example anywhere on how to run such examples in the IDE
> with Kafka?
> 5. There is a great stateful functions example
> https://github.com/ververica/flink-statefun-workshop, but its readme does
> not really describe the implementation and neither does this article,
> referencing it
> https://dev.to/morsapaes/flink-stateful-functions-where-to-start-2j39. Is
> there anything that describes this implementation?
>
>
>

