Running Flink on kerberized HDP 3.1 (minimal getting started)

2020-06-12 Thread Georg Heiler
Hi,

I am trying to run Flink on a kerberized HDP 3.1 instance and need some help
getting started.
https://stackoverflow.com/questions/62330689/execute-flink-1-10-on-a-hdp-3-1-cluster-to-access-hive-tables
describes how far I have gotten.

In the end, I want to be able to start task managers on YARN and interact
with HDFS, Hive, and Kafka.

Best,
Georg


Re: DROOLS rule engine with flink

2020-06-23 Thread Georg Heiler
Why not use Flink CEP?

https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html

has a nice interactive example

Best,
Georg
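
For illustration, a minimal Flink CEP sketch in Scala of a rule in the spirit of
rule 1 from the question quoted below; the StatusEvent shape, the grouping field
and the one-minute window are assumptions, and the NOT condition of rule 2 would
need extra care (e.g. notFollowedBy or a timer-based ProcessFunction):

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event shape for the "entity down" rules described below.
final case class StatusEvent(entity: String, group: String, down: Boolean)

// Rough sketch of rule 1: a down event followed by at least three further
// down events of the same group within one minute.
val downPattern = Pattern
  .begin[StatusEvent]("first").where(_.down)
  .followedBy("more").where(_.down).timesOrMore(3)
  .within(Time.minutes(1))

// given events: DataStream[StatusEvent]
// val alerts = CEP.pattern(events.keyBy(_.group), downPattern)
//   .select(m => s"${m("first").head.entity} is actually down")
```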

Jaswin Shah  schrieb am Di. 23. Juni 2020 um 21:03:

> Hi, I am thinking of using a rule engine like Drools with Flink to solve
> the problem described below:
>
> I have a stream of events coming from a Kafka topic, and I want to analyze
> those events based on some rules and emit results to a result stream
> whenever a rule is satisfied.
>
> I am able to solve this problem with Flink alone, but then the rules are
> hard-coded conditions in the job. In the future I want to keep my Flink job
> generic, so that a rule change does not require redeploying the job.
>
> Use case:
> Consider events arriving like A, B, C, D, ..., each denoting that entity A
> is down, B is down, C is down, etc.
>
> There are many rules, for example:
> 1. A is actually down if A is down and 3 Bs belonging to A are down (a B
> event carries a reference to its A entity in the event JSON).
>
> 2. B is actually down if B is down and the 2 As belonging to B are NOT down.
>
> Both rules only consider events from the same minute.
>
> Note the NOT condition in rule 2: when I receive a "B down" event, I wait
> for a buffer time of, say, 1 minute, and if I do not receive the 2 "A down"
> events within that minute, I declare B as down in the result stream.
>
> So the evaluation is essentially keyed by minute (this is very important).
>
> I need help on how to use the Drools engine to move these rules out of the
> business logic while keeping the same degree of partitioning I achieve with
> static rules.
>
> Any help will be really appreciated.
>
> Thanks,
> Jaswin
>


passing additional jvm parameters to the configuration

2020-06-24 Thread Georg Heiler
Hi,

how can I pass additional configuration parameters like Spark's
extraJavaOptions to a Flink job?

https://stackoverflow.com/questions/62562153/apache-flink-and-pureconfig-passing-java-properties-on-job-startup

contains the details. But the gist is:
flink run --class
com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
"usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" \
-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"

does not pass -Dconfig.file to the Flink job.

Best,
Georg
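
If passing JVM options turns out to be brittle, an alternative sketch is to hand
the config path over as a plain program argument after the jar and load it with
PureConfig inside the main class; the JobConfig case class and the default path
are made up for illustration:

```scala
import java.nio.file.Paths
import pureconfig.ConfigSource
import pureconfig.generic.auto._

// Hypothetical application config; field names are illustrative only.
final case class JobConfig(brokers: String, topic: String)

object TweetsAnalysis {
  def main(args: Array[String]): Unit = {
    // e.g. flink run --class ...TweetsAnalysis the-job.jar config/jobs/twitter-analysis.conf
    val path = args.headOption.getOrElse("config/jobs/twitter-analysis.conf")
    val config = ConfigSource.file(Paths.get(path)).loadOrThrow[JobConfig]
    println(config) // wire into the actual job setup from here
  }
}
```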


Re: passing additional jvm parameters to the configuration

2020-06-24 Thread Georg Heiler
Hi Arvid,

thanks for the quick reply. I have a strong Apache Spark background. There,
when executing on YARN or locally, the cluster is usually created on demand
for the duration of the batch/streaming job, and there are only the concepts
of A) the master (application master), B) the slaves/executors, and C) the
driver: the node where the main class is invoked. In Spark's notion, I want
the -D parameter to be available on the (C) driver node. Translated to Flink,
I want it to be available to the main class that is invoked when the job is
submitted/started by the job manager (which should be the equivalent of the
driver).

But maybe my understanding of Flink is not 100% correct yet.

Unfortunately, using -D directly does not work.

Best,
Georg

Am Mi., 24. Juni 2020 um 22:13 Uhr schrieb Arvid Heise :

> Hi Georg,
>
> could you check if simply using -D is working as described here [1].
>
> If not, could you please be more precise: do you want the parameter to be
> passed to the driver, the job manager, or the task managers?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#deployment-targets
>
> On Wed, Jun 24, 2020 at 8:55 PM Georg Heiler 
> wrote:
>
>> Hi,
>>
>> how can I pass additional configuration parameters like spark`s
>> extraJavaOptions to a flink job?
>>
>>
>> https://stackoverflow.com/questions/62562153/apache-flink-and-pureconfig-passing-java-properties-on-job-startup
>>
>> contains the details. But the gist is:
>> flink run --class
>> com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
>> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" \
>> -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
>>
>> is not passing the -Dconfig.file to the flink job!
>>
>> Best,
>> Georg
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Georg Heiler
Hi,

but how can I change/configure it per submitted job and not for the whole
cluster?

Best,
Georg

Am Do., 25. Juni 2020 um 10:07 Uhr schrieb Arvid Heise :

> Hi Georg,
>
> thank you for your detailed explanation. You want to use env.java.opts[1].
> There are flavors if you only want to make it available on job manager or
> task manager but I guess the basic form is good enough for you.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jvm-and-logging-options
>
> On Wed, Jun 24, 2020 at 10:52 PM Georg Heiler 
> wrote:
>
>> Hi Arvid,
>>
>> thanks for the quick reply. I have a strong Apache spark background.
>> There, when executing on YARN or locally usually, the cluster is created
>> on-demand for the duration of the batch /streaming job.
>> There, there is only the concept of A) master/driver (application master)
>> B) slave/executor C) Driver: the node where the main class is invoked. In
>> Sparks`notion, I want the -D parameter to be available on the (C) Driver
>> node. When translating this to Flink, I want this to be available to the
>> Main class which is invoked when the job is submitted/started by the job
>> manager (which should be equivalent to the driver).
>>
>> But maybe my understanding of Flink is not 100% correct yet.
>>
>> Unfortunately, using -D directly is not working.
>>
>> Best,
>> Georg
>>
>> Am Mi., 24. Juni 2020 um 22:13 Uhr schrieb Arvid Heise <
>> ar...@ververica.com>:
>>
>>> Hi Georg,
>>>
>>> could you check if simply using -D is working as described here [1].
>>>
>>> If not, could you please be more precise: do you want the parameter to
>>> be passed to the driver, the job manager, or the task managers?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#deployment-targets
>>>
>>> On Wed, Jun 24, 2020 at 8:55 PM Georg Heiler 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> how can I pass additional configuration parameters like spark`s
>>>> extraJavaOptions to a flink job?
>>>>
>>>>
>>>> https://stackoverflow.com/questions/62562153/apache-flink-and-pureconfig-passing-java-properties-on-job-startup
>>>>
>>>> contains the details. But the gist is:
>>>> flink run --class
>>>> com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
>>>> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
>>>> \
>>>> -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
>>>>
>>>> is not passing the -Dconfig.file to the flink job!
>>>>
>>>> Best,
>>>> Georg
>>>>
>>>
>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> <https://www.ververica.com/>
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Georg Heiler
Thanks a lot!
You are right: one cluster per job should be assumed in the mental model to
make the comparison fair.

In particular for YARN:

-yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"

You mentioned that the path must be accessible. Spark has a --files
parameter with which a local file is automatically copied to the root
of the YARN container. Is something similar available in Flink?


Best,
Georg

Am Do., 25. Juni 2020 um 14:58 Uhr schrieb Arvid Heise :

> Hi Georg,
>
> I think there is a conceptual misunderstanding. If you reuse the cluster
> for several jobs, they need to share the JVM_ARGS since it's the same
> process. [1] On Spark, new processes are spawned for each stage afaik.
>
> However, the current recommendation is to use only one ad-hoc cluster per
> job/application (which is closer to how Spark works). So if you use YARN,
> every job/application spawns a new cluster that just has the right size for
> it. Then you can supply new parameters for new YARN submission with
>
> flink run -m yarn-cluster -yD 
> env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" \
>
> -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
>
> However, make sure that the path is accessible from within your YARN
> cluster, since the driver is probably executed on the cluster (not 100%
> sure).
>
>
> If you want per job level configurations on a shared cluster, I'd
> recommend to use normal parameters and initialize PureConfig manually
> (haven't used it, so not sure how). Then, you'd probably invoke your
> program as follows.
>
> flink run \
>
> -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
> config.file='config/jobs/twitter-analysis.conf'
>
>
> For local execution, I had some trouble configuring it as well (tried it
> with your code). The issue is that all parameters that we previously tried
> are only passed to newly spawned processes while your code is directly
> executed in the CLI.
>
> FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf"
> flink run
> -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
>
> FLINK_ENV_JAVA_OPTS is usually parsed from flink-conf.yaml using the
> env.java.opts but doesn't respect -Denv.java.opts. I'm not sure if this
> is intentional.
>
>
> If you could put the env.java.opts in the flink-conf.yaml, it would most
> likely work for both YARN and local. With FLINK_CONF_DIR you can set a
> different conf dir per job. Alternatively, you could also specify both
> FLINK_ENV_JAVA_OPTS and -yD to inject the property.
>
>
> [1] https://stackoverflow.com/a/33855802/10299342
>
> On Thu, Jun 25, 2020 at 12:49 PM Georg Heiler 
> wrote:
>
>> Hi,
>>
>> but how can I change/configure it per submitted job and not for the whole
>> cluster?
>>
>> Best,
>> Georg
>>
>> Am Do., 25. Juni 2020 um 10:07 Uhr schrieb Arvid Heise <
>> ar...@ververica.com>:
>>
>>> Hi Georg,
>>>
>>> thank you for your detailed explanation. You want to use
>>> env.java.opts[1]. There are flavors if you only want to make it available
>>> on job manager or task manager but I guess the basic form is good enough
>>> for you.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jvm-and-logging-options
>>>
>>> On Wed, Jun 24, 2020 at 10:52 PM Georg Heiler 
>>> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> thanks for the quick reply. I have a strong Apache spark background.
>>>> There, when executing on YARN or locally usually, the cluster is created
>>>> on-demand for the duration of the batch /streaming job.
>>>> There, there is only the concept of A) master/driver (application
>>>> master) B) slave/executor C) Driver: the node where the main class is
>>>> invoked. In Sparks`notion, I want the -D parameter to be available on the
>>>> (C) Driver node. When translating this to Flink, I want this to be
>>>> available to the Main class which is invoked when the job is
>>>> submitted/started by the job manager (which should be equivalent to the
>>>> driver).
>>>>
>>>> But maybe my understanding of Flink is not 100% correct yet.
&

Re: passing additional jvm parameters to the configuration

2020-06-25 Thread Georg Heiler
Do you also want to answer
https://stackoverflow.com/questions/62562153/apache-flink-and-pureconfig-passing-java-properties-on-job-startup
?

Your suggestion seems to work well.

Best,
Georg

Am Do., 25. Juni 2020 um 15:32 Uhr schrieb Arvid Heise :

> You are welcome.
>
> I'm not an expert on the yarn executor but I hope that
>
>  -yt,--yarnship  Ship files in the specified 
> directory
>   (t for transfer)
>
> can help [1]. Oddly this option is not given on the YARN page. But it should 
> be available as it's also used in the SSL setup [2].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/cli.html
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/security-ssl.html#tips-for-yarn--mesos-deployment
>
>
> On Thu, Jun 25, 2020 at 3:23 PM Georg Heiler 
> wrote:
>
>> Thanks a lot!
>> Your point is right.
>>
>> One Cluster per job should be used in the thought model to be comparable.
>>
>> In particular for YARN:
>>
>> -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
>>
>> You mentioned, that the path must be accessible. Spark has a --files 
>> parameter and then the local file is automatically copied to the root of the 
>> YARN container. Is something similar available in Flink?
>>
>>
>> Best,
>> Georg
>>
>> Am Do., 25. Juni 2020 um 14:58 Uhr schrieb Arvid Heise <
>> ar...@ververica.com>:
>>
>>> Hi Georg,
>>>
>>> I think there is a conceptual misunderstanding. If you reuse the cluster
>>> for several jobs, they need to share the JVM_ARGS since it's the same
>>> process. [1] On Spark, new processes are spawned for each stage afaik.
>>>
>>> However, the current recommendation is to use only one ad-hoc cluster
>>> per job/application (which is closer to how Spark works). So if you use
>>> YARN, every job/application spawns a new cluster that just has the right
>>> size for it. Then you can supply new parameters for new YARN submission
>>> with
>>>
>>> flink run -m yarn-cluster -yD 
>>> env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" \
>>>
>>> -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
>>> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
>>>
>>> However, make sure that the path is accessible from within your YARN
>>> cluster, since the driver is probably executed on the cluster (not 100%
>>> sure).
>>>
>>>
>>> If you want per job level configurations on a shared cluster, I'd
>>> recommend to use normal parameters and initialize PureConfig manually
>>> (haven't used it, so not sure how). Then, you'd probably invoke your
>>> program as follows.
>>>
>>> flink run \
>>>
>>> -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
>>> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
>>> config.file='config/jobs/twitter-analysis.conf'
>>>
>>>
>>> For local execution, I had some trouble configuring it as well (tried it
>>> with your code). The issue is that all parameters that we previously tried
>>> are only passed to newly spawned processes while your code is directly
>>> executed in the CLI.
>>>
>>>
>>> FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf"
>>> flink run
>>> -class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
>>> "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
>>>
>>> FLINK_ENV_JAVA_OPTS is usually parsed from flink-conf.yaml using the
>>> env.java.opts but doesn't respect -Denv.java.opts. I'm not sure if this
>>> is intentional.
>>>
>>>
>>> If you could put the env.java.opts in the flink-conf.yaml, it would most
>>> likely work for both YARN and local. With FLINK_CONF_DIR you can set a
>>> different conf dir per job. Alternatively, you could also specify both
>>> FLINK_ENV_JAVA_OPTS and -yD to inject the property.
>>>
>>>
>>> [1] https://stackoverflow.com/a/33855802/10299342
>>>
>>> On Thu, Jun 25, 2020 at 12:49 PM Georg Heiler 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> but how can I change/configure it per submitted job and not for

[no subject]

2020-06-29 Thread Georg Heiler
Hi,

I am trying to use the Confluent schema registry in an interactive Flink Scala
shell.

My problem is that initializing the deserialization schema via
ConfluentRegistryAvroDeserializationSchema fails:

```scala
val serializer =
ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet],
schemaRegistryUrl)
error: type arguments [Tweet] conform to the bounds of none of the
overloaded alternatives of
value forSpecific: [T <: org.apache.avro.specific.SpecificRecord](x$1:
Class[T], x$2: String, x$3:
Int)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]
 [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T],
x$2: 
String)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]
```

Please see
https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
for details on how the shell was set up and which additional JARs were loaded.

Best,
Georg
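
For reference, a sketch of how the two-argument overload is meant to be called in
a compiled job, assuming Tweet was generated so that it extends
org.apache.avro.specific.SpecificRecordBase; the error message above suggests that
bound is not satisfied in the shell, and the registry URL here is just a placeholder:

```scala
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema

// Assumption: a locally running registry; adjust to your environment.
val schemaRegistryUrl = "http://localhost:8081"

// Compiles only if Tweet extends SpecificRecordBase (e.g. avrohugger in
// SpecificRecord mode), matching the [T <: SpecificRecord] bound of forSpecific.
val deserializer =
  ConfluentRegistryAvroDeserializationSchema.forSpecific(classOf[Tweet], schemaRegistryUrl)
```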


Avro from avrohugger still invalid

2020-06-29 Thread Georg Heiler
Older versions of Flink were incompatible with the Scala-specific record
classes generated by avrohugger.

https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10 apparently
fixes this. I am currently using 1.10.1 but still experience this problem
https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
of:

AvroRuntimeException: Not a Specific class

What is still wrong here?

Best,

Georg


Re: Avro from avrohugger still invalid

2020-07-02 Thread Georg Heiler
What is the suggested workaround for now?


Thanks!

Aljoscha Krettek  schrieb am Do. 2. Juli 2020 um 20:55:

> Hi Georg,
>
> unfortunately, it seems I only fixed the issue for AvroSerializer and
> not for AvroDeserializationSchema. I created a new issue (which is a
> clone of the old one) to track this [1]. The fix should be very simple
> since it's the same issue.
>
> Best,
> Aljoscha
>
> [1] https://issues.apache.org/jira/browse/FLINK-18478
>
> On 01.07.20 09:11, Till Rohrmann wrote:
> > Hi Georg,
> >
> > I'm pulling in Aljoscha who might know more about the problem you are
> > describing.
> >
> > Cheers,
> > Till
> >
> > On Mon, Jun 29, 2020 at 10:21 PM Georg Heiler  >
> > wrote:
> >
> >> Older versions of flink were incompatible with the Scala specific record
> >> classes generated from AvroHugger.
> >>
> >> https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10 apparently
> >> is fixing this. I am currently using 1.10.1. However, still experience
> thus
> >> problem
> >>
> https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
> >> of:
> >>
> >> AvroRuntimeException: Not a Specific class
> >>
> >> What is still wrong here?
> >>
> >> Best,
> >>
> >> Georg
> >>
> >>
> >
>
>


Re: Avro from avrohugger still invalid

2020-07-02 Thread Georg Heiler
But would it be possible to somehow use AvroSerializer for now?

Best,
Georg

Am Do., 2. Juli 2020 um 23:44 Uhr schrieb Georg Heiler <
georg.kf.hei...@gmail.com>:

> What is the suggested workaround for now?
>
>
> Thanks!
>
> Aljoscha Krettek  schrieb am Do. 2. Juli 2020 um
> 20:55:
>
>> Hi Georg,
>>
>> unfortunately, it seems I only fixed the issue for AvroSerializer and
>> not for AvroDeserializationSchema. I created a new issue (which is a
>> clone of the old one) to track this [1]. The fix should be very simple
>> since it's the same issue.
>>
>> Best,
>> Aljoscha
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18478
>>
>> On 01.07.20 09:11, Till Rohrmann wrote:
>> > Hi Georg,
>> >
>> > I'm pulling in Aljoscha who might know more about the problem you are
>> > describing.
>> >
>> > Cheers,
>> > Till
>> >
>> > On Mon, Jun 29, 2020 at 10:21 PM Georg Heiler <
>> georg.kf.hei...@gmail.com>
>> > wrote:
>> >
>> >> Older versions of flink were incompatible with the Scala specific
>> record
>> >> classes generated from AvroHugger.
>> >>
>> >> https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10
>> apparently
>> >> is fixing this. I am currently using 1.10.1. However, still experience
>> thus
>> >> problem
>> >>
>> https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
>> >> of:
>> >>
>> >> AvroRuntimeException: Not a Specific class
>> >>
>> >> What is still wrong here?
>> >>
>> >> Best,
>> >>
>> >> Georg
>> >>
>> >>
>> >
>>
>>


flink take single element from stream

2020-07-09 Thread Georg Heiler
How can I explore a stream in Flink interactively?

Spark has the concept of take/head to extract the first n elements of a
DataFrame / table.

Is something similar available in Flink for a stream like:

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
  "tweets-raw-json",
  serializer,
  properties
).setStartFromEarliest() // TODO experiment with different start values
  )

stream.head/take

does not seem to be implemented.
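
One possible workaround (a sketch, not an official head/take API): pull the
stream back to the client with DataStreamUtils.collect, which executes the job
and returns an iterator, and take the first n elements from it. This assumes
sampling a small prefix on the client is acceptable:

```scala
import scala.collection.JavaConverters._
import org.apache.flink.streaming.api.datastream.DataStreamUtils

// Executes the job and streams results back to the client; take(10) stops
// consuming after ten elements (the job itself keeps running for an
// unbounded source such as Kafka).
val firstTen = DataStreamUtils.collect(stream.javaStream).asScala.take(10).toList
firstTen.foreach(println)
```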


map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
Hi,

I want to map a stream of JSON documents from Kafka to a Scala case class.
How can this be accomplished using the JSONKeyValueDeserializationSchema? Is
a manual mapping of object nodes required?

I have a Spark background. There, such manual mappings are usually
discouraged. Instead, Spark offers a nice API (the Dataset API) to perform
this type of assignment:
1) it is concise
2) it operates on Spark's off-heap memory representation (Tungsten), which is
faster

In Flink, by contrast, such off-heap optimizations do not seem to be talked
about much (sorry if I am missing something, I am a Flink newbie). Is there a
reason why these optimizations are not necessary in Flink?


How could I get the following example:
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
  "tweets-raw-json",
  serializer,
  properties
).setStartFromEarliest() // TODO experiment with different start values
  )

to map to this Tweet class concisely, i.e. without manually iterating
through all the attribute fields and parsing the keys from the object node
tree.

final case class Tweet(tweet_id: Option[String], text: Option[String],
source: Option[String], geo: Option[String], place: Option[String], lang:
Option[String], created_at: Option[String], timestamp_ms: Option[String],
coordinates: Option[String], user_id: Option[Long], user_name:
Option[String], screen_name: Option[String], user_created_at:
Option[String], followers_count: Option[Long], friends_count: Option[Long],
user_lang: Option[String], user_location: Option[String], hashtags:
Option[Seq[String]])

Best,
Georg


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
How can I use it with a Scala case class?
If I understand correctly, for better performance the ObjectMapper is
already initialized in each Kafka consumer and returns ObjectNodes. So
I should probably rephrase: how can I then map these to case classes
without hand-coding it? https://github.com/json4s/json4s and
https://github.com/FasterXML/jackson-module-scala both only seem to consume
strings.

Best,
Georg
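
One option that avoids a String round trip (a sketch, assuming the case-class
fields line up with the JSON): jackson-module-scala can map an ObjectNode
directly via treeToValue. The mapper should be created once per task, e.g. in a
rich function's open(), rather than per record:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Create once (per parallel task), then reuse for every record.
val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)

// Maps the "value" part emitted by JSONKeyValueDeserializationSchema to the
// Tweet case class from the original message (no intermediate String).
def toTweet(node: ObjectNode): Tweet =
  mapper.treeToValue(node.get("value"), classOf[Tweet])
```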

Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:

> You can try the Jackson ObjectMapper library and that will get you from
> json to object.
>
> Regards,
> Taher Koitawala
>
> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
> wrote:
>
>> Hi,
>>
>> I want to map a stream of JSON documents from Kafka to a scala
>> case-class. How can this be accomplished using the
>> JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
>> required?
>>
>> I have a Spark background. There, such manual mappings usually are
>> discouraged. Instead, they offer a nice API (dataset API) to perform such a
>> type of assignment.
>> 1) this is concise
>> 2) it operates on sparks off-heap memory representations (tungsten) to be
>> faster
>>
>> In Flink, instead, such off-heap optimizations seem not to be talked much
>> about (sorry if I miss something, I am a Flink newbie). Is there a reason
>> why these optimizations are not necessary in Flink?
>>
>>
>> How could I get the following example:
>> val serializer = new JSONKeyValueDeserializationSchema(false)
>> val stream = senv.addSource(
>> new FlinkKafkaConsumer(
>>   "tweets-raw-json",
>>   serializer,
>>   properties
>> ).setStartFromEarliest() // TODO experiment with different start
>> values
>>   )
>>
>> to map to this Tweet class concisely, i.e. without manually iterating
>> through all the attribute fields and parsing the keys from the object node
>> tree.
>>
>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>> source: Option[String], geo: Option[String], place: Option[String], lang:
>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>> coordinates: Option[String], user_id: Option[Long], user_name:
>> Option[String], screen_name: Option[String], user_created_at:
>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>> user_lang: Option[String], user_location: Option[String], hashtags:
>> Option[Seq[String]])
>>
>> Best,
>> Georg
>>
>


MalformedClassName for scala case class

2020-07-09 Thread Georg Heiler
Hi,

why can't I register the stream as a table? I get a MalformedClassName
exception:

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
  "tweets-raw-json",
  serializer,
  properties
).setStartFromEarliest() // TODO experiment with different start values
  )

case class Foo(lang: String, count: Int)
val r = stream
.map(e => {
  Foo(e.get("value").get("lang").asText(), 1)
})
.keyBy(_.lang)
.timeWindow(Time.seconds(10))
.sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg
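
A guess at the cause (hedged, based on similar reports): Foo is declared inside
the shell line / enclosing method, so the Table API cannot derive a well-formed
class name for the row type. A sketch of the workaround is to move the case
class to a top-level definition and keep the rest unchanged:

```scala
// Declared at top level (package or companion-object scope), not inside the
// method or shell line that builds the pipeline.
final case class Foo(lang: String, count: Int)

val r = stream
  .map(e => Foo(e.get("value").get("lang").asText(), 1))
  .keyBy(_.lang)
  .timeWindow(Time.seconds(10))
  .sum("count")

stenv.registerDataStream("tweets_json", r)
```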


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
Great, thanks.
But would it be possible to automate this, i.e. to have it work
automatically for the case class / product?

Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:

> The performant way would be to apply a map function over the stream and
> then use the Jackson ObjectMapper to convert to scala objects. In flink
> there is no API like Spark to automatically get all fields.
>
> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
> wrote:
>
>> How can I use it with a scala case class?
>> If I understand it correctly for better performance the Object Mapper is
>> already initialized in each KafkaConsumer and returning ObjectNodes. So
>> probably I should rephrase to: how can I then map these to case classes
>> without handcoding it?  https://github.com/json4s/json4s or
>> https://github.com/FasterXML/jackson-module-scala both only seem to
>> consume strings.
>>
>> Best,
>> Georg
>>
>> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
>> taher...@gmail.com>:
>>
>>> You can try the Jackson ObjectMapper library and that will get you from
>>> json to object.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I want to map a stream of JSON documents from Kafka to a scala
>>>> case-class. How can this be accomplished using the
>>>> JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
>>>> required?
>>>>
>>>> I have a Spark background. There, such manual mappings usually are
>>>> discouraged. Instead, they offer a nice API (dataset API) to perform such a
>>>> type of assignment.
>>>> 1) this is concise
>>>> 2) it operates on sparks off-heap memory representations (tungsten) to
>>>> be faster
>>>>
>>>> In Flink, instead, such off-heap optimizations seem not to be talked
>>>> much about (sorry if I miss something, I am a Flink newbie). Is there a
>>>> reason why these optimizations are not necessary in Flink?
>>>>
>>>>
>>>> How could I get the following example:
>>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>>> val stream = senv.addSource(
>>>> new FlinkKafkaConsumer(
>>>>   "tweets-raw-json",
>>>>   serializer,
>>>>   properties
>>>> ).setStartFromEarliest() // TODO experiment with different start
>>>> values
>>>>   )
>>>>
>>>> to map to this Tweet class concisely, i.e. without manually iterating
>>>> through all the attribute fields and parsing the keys from the object node
>>>> tree.
>>>>
>>>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>>>> source: Option[String], geo: Option[String], place: Option[String], lang:
>>>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>>>> coordinates: Option[String], user_id: Option[Long], user_name:
>>>> Option[String], screen_name: Option[String], user_created_at:
>>>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>>>> user_lang: Option[String], user_location: Option[String], hashtags:
>>>> Option[Seq[String]])
>>>>
>>>> Best,
>>>> Georg
>>>>
>>>


Re: Avro from avrohugger still invalid

2020-07-11 Thread Georg Heiler
Hi,

Many thanks for the PR!
However, I have tried to build an updated version of Flink and still run
into issues:
https://issues.apache.org/jira/browse/FLINK-18478

I have documented in a gist (see the last comment) how to replicate it.

Best,
Georg

Am Fr., 3. Juli 2020 um 12:00 Uhr schrieb Aljoscha Krettek <
aljos...@apache.org>:

> Hi,
>
> I don't think there's a workaround, except copying the code and manually
> fixing it. Did you check out my comment on the Jira issue and the new
> one I created?
>
> Best,
> Aljoscha
>
> On 03.07.20 07:19, Georg Heiler wrote:
> > But would it be possible to somehow use AvroSerializer for now?
> >
> > Best,
> > Georg
> >
> > Am Do., 2. Juli 2020 um 23:44 Uhr schrieb Georg Heiler <
> > georg.kf.hei...@gmail.com>:
> >
> >> What is the suggested workaround for now?
> >>
> >>
> >> Thanks!
> >>
> >> Aljoscha Krettek  schrieb am Do. 2. Juli 2020 um
> >> 20:55:
> >>
> >>> Hi Georg,
> >>>
> >>> unfortunately, it seems I only fixed the issue for AvroSerializer and
> >>> not for AvroDeserializationSchema. I created a new issue (which is a
> >>> clone of the old one) to track this [1]. The fix should be very simple
> >>> since it's the same issue.
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> [1] https://issues.apache.org/jira/browse/FLINK-18478
> >>>
> >>> On 01.07.20 09:11, Till Rohrmann wrote:
> >>>> Hi Georg,
> >>>>
> >>>> I'm pulling in Aljoscha who might know more about the problem you are
> >>>> describing.
> >>>>
> >>>> Cheers,
> >>>> Till
> >>>>
> >>>> On Mon, Jun 29, 2020 at 10:21 PM Georg Heiler <
> >>> georg.kf.hei...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Older versions of flink were incompatible with the Scala specific
> >>> record
> >>>>> classes generated from AvroHugger.
> >>>>>
> >>>>> https://issues.apache.org/jira/browse/FLINK-12501 Flink 1.10
> >>> apparently
> >>>>> is fixing this. I am currently using 1.10.1. However, still
> experience
> >>> thus
> >>>>> problem
> >>>>>
> >>>
> https://stackoverflow.com/questions/62637009/flink-use-confluent-schema-registry-for-avro-serde
> >>>>> of:
> >>>>>
> >>>>> AvroRuntimeException: Not a Specific class
> >>>>>
> >>>>> What is still wrong here?
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Georg
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >
>
>


Re: map JSON to scala case class & off-heap optimization

2020-07-11 Thread Georg Heiler
Hi,


Many thanks.
So do I understand correctly that:

1) similarly to Spark, the Table API works on an optimized binary
representation
2) this is only available via the SQL way of interaction - there is no
programmatic API

This leads me to some questions:

q1) I have read somewhere (I think in some Flink Forward presentations)
that the SQL API is not necessarily stable with regards to state - even
small changes to the DAG (due to optimization) can break it. Does this
also/still apply to the Table API? (I assume yes.)
q2) When I use the DataSet/DataStream (classical Scala/Java) API, it looks
like I must create a custom serializer if I want to handle one/all of:

  - side-outputting failing records instead of simply crashing the job
  - as asked before, automatic serialization to a Scala (case) class

q3)
As asked before:
>>> But I also read that creating the ObjectMapper (i.e. in Jackson terms)
inside the map function is not recommended. From Spark I know that there is
a mapPartitions function, i.e. something where a database connection can
be created once and then reused for the individual elements. Is a similar
construct available in Flink as well?
>>> Also, I have read a lot of articles, and it looks like a lot of people
use the String serializer and then manually parse the JSON, which also
seems inefficient.
Where would I find an example of a serializer with side outputs for
failed records as well as efficient initialization using a construct
similar to mapPartitions?

Best,
Georg

Am Fr., 10. Juli 2020 um 16:22 Uhr schrieb Aljoscha Krettek <
aljos...@apache.org>:

> Hi Georg,
>
> I'm afraid the other suggestions are missing the point a bit. From your
> other emails it seems you want to use Kafka with JSON records together
> with the Table API/SQL. For that, take a look at [1] which describes how
> to define data sources for the Table API. Especially the Kafka and JSON
> sections should be relevant.
>
> That first link I mentioned is for the legacy connector API. There is a
> newer API with slightly different properties which will allow us to do
> the kinds of optimization like working on binary data throughout the
> stack: [2]. Unfortunately, there is no programmatic API yet, you would
> have to use `TableEnvironment.executeSql()` to execute SQL DDL that
> defines your sources. There is a FLIP for adding the programmatic API: [3]
>
> Best,
> Aljoscha
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connect.html
>
> [2]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
>
> [3]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
>
> On 10.07.20 05:01, Aaron Levin wrote:
> > Hi Georg, you can try using the circe library for this which has a way to
> > automatically generate JSON decoders for scala case classes.
> >
> > As it was mentioned earlier, Flink does not come packaged with
> > JSON-decoding generators for Scala like spark does.
> >
> > On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler 
> > wrote:
> >
> >> Great. Thanks.
> >> But would it be possible to automate this i.e. to have this work
> >> automatically for the case class / product?
> >>
> >> Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala <
> >> taher...@gmail.com>:
> >>
> >>> The performant way would be to apply a map function over the stream and
> >>> then use the Jackson ObjectMapper to convert to scala objects. In flink
> >>> there is no API like Spark to automatically get all fields.
> >>>
> >>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
> >>> wrote:
> >>>
> >>>> How can I use it with a scala case class?
> >>>> If I understand it correctly for better performance the Object Mapper
> is
> >>>> already initialized in each KafkaConsumer and returning ObjectNodes.
> So
> >>>> probably I should rephrase to: how can I then map these to case
> classes
> >>>> without handcoding it?  https://github.com/json4s/json4s or
> >>>> https://github.com/FasterXML/jackson-module-scala both only seem to
> >>>> consume strings.
> >>>>
> >>>> Best,
> >>>> Georg
> >>>>
> >>>> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
> >>>> taher...@gmail.com>:
> >>>>
> >>>>> You can try the Jackson ObjectMapper library and that will get you
> from
> >>>>> json to object.
> >>>>>
> >>>>> Regards,

Re: map JSON to scala case class & off-heap optimization

2020-07-16 Thread Georg Heiler
Many thanks!

Am Mi., 15. Juli 2020 um 15:58 Uhr schrieb Aljoscha Krettek <
aljos...@apache.org>:

> On 11.07.20 10:31, Georg Heiler wrote:
> > 1) similarly to spark the Table API works on some optimized binary
> > representation
> > 2) this is only available in the SQL way of interaction - there is no
> > programmatic API
>
> yes it's available from SQL, but also the Table API, which is a
> programmatic declarative API, similar to Spark's Structured Streaming.
>
>
> > q1) I have read somewhere (I think in some Flink Forward presentations)
> > that the SQL API is not necessarily stable with regards to state - even
> > with small changes to the DAG (due to optimization). So does this also
> > /still apply to the table API? (I assume yes)
>
> Yes, unfortunately this is correct. Because the Table API/SQL is
> declarative users don't have control over the DAG and the state that the
> operators have. Some work will happen on at least making sure that the
> optimizer stays stable between Flink versions or that we can let users
> pin a certain physical graph of a query so that it can be re-used across
> versions.
>
> > q2) When I use the DataSet/Stream (classical scala/java) API it looks
> like
> > I must create a custom serializer if I want to handle one/all of:
> >
> >- side-output failing records and not simply crash the job
> >- as asked before automatic serialization to a scala (case) class
>
> This is true, yes.
>
> > But I also read that creating the ObjectMapper (i.e. in Jackson terms)
> > inside the map function is not recommended. From Spark I know that there
> is
> > a map-partitions function, i.e. something where a database connection can
> > be created and then reused for the individua elements. Is a similar
> > construct available in Flink as well?
>
> Yes, for this you can use "rich functions", which have an open()/close()
> method that allows initializing and re-using resources across
> invocations:
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#rich-functions
>
> > Also, I have read a lot of articles and it looks like a lot of people
> > are using the String serializer and then manually parse the JSON which
> also
> > seems inefficient.
> > Where would I find an example for some Serializer with side outputs for
> > failed records as well as efficient initialization using some similar
> > construct to map-partitions?
>
> I'm not aware of such examples, unfortunately.
>
> I hope that at least some answers will be helpful!
>
> Best,
> Aljoscha
>
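
Putting the answers in this thread together, a sketch of what such a mapper
could look like: a ProcessFunction (which is a rich function) that initializes
the Jackson mapper once in open() and routes parse failures to a side output
instead of failing the job. The ParseError type, the tag name, and the reuse of
the Tweet case class from earlier in the thread are assumptions:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Illustrative names; Tweet is the case class from the original message.
final case class ParseError(raw: String, message: String)
val failedRecords = OutputTag[ParseError]("failed-records")

class JsonToTweet extends ProcessFunction[ObjectNode, Tweet] {
  @transient private var mapper: ObjectMapper = _

  // Called once per parallel instance, similar to setup work in Spark's mapPartitions.
  override def open(parameters: Configuration): Unit =
    mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  override def processElement(
      node: ObjectNode,
      ctx: ProcessFunction[ObjectNode, Tweet]#Context,
      out: Collector[Tweet]): Unit =
    try out.collect(mapper.treeToValue(node.get("value"), classOf[Tweet]))
    catch {
      case e: Exception =>
        ctx.output(failedRecords, ParseError(node.toString, e.getMessage))
    }
}

// usage:
//   val parsed   = stream.process(new JsonToTweet)
//   val failures = parsed.getSideOutput(failedRecords)
```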


GenericData cannot be cast to type scala.Product

2020-07-23 Thread Georg Heiler
Hi,

as a follow-up to https://issues.apache.org/jira/browse/FLINK-18478 I now
face a class cast exception.
A reproducible example is available at
https://gist.github.com/geoHeil/5a5a4ae0ca2a8049617afa91acf40f89

I do not (yet) understand why such a simple example of reading Avro from a
schema registry and Kafka (in the Scala API) still causes problems:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to scala.Product
  at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-connector-kafka_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-connector-kafka-base_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]

Best,
Georg


arbitrary state handling in python api

2020-09-08 Thread Georg Heiler
Hi,

does the python API expose some kind of mapGroupsWithState operator which
can be applied on a window to handle arbitrary state?

I would want to perform time-series anomaly detection using a streaming
implementation of the matrix profile algorithm using
https://github.com/TDAmeritrade/stumpy There would be multiple instances of
matrix profile, one for each group.

Best,
Georg


Re: arbitrary state handling in python api

2020-09-11 Thread Georg Heiler
Many thanks - this is great to hear.

Yes, KeyedProcessFunction looks great.

Best,
Georg

Am Do., 10. Sept. 2020 um 23:53 Uhr schrieb Dian Fu :

> Hi Georg,
>
> It still doesn't support state access in Python API in the latest version
> 1.11.
>
> Could you take a look at if KeyedProcessFunction could meet your
> requirements? We are planning to support it in Python DataStream API in
> 1.12.
>
> Regards,
> Dian
>
> 在 2020年9月9日,下午2:28,Georg Heiler  写道:
>
> Hi,
>
> does the python API expose some kind of mapGroupsWithState operator which
> can be applied on a window to handle arbitrary state?
>
> I would want to perform time-series anomaly detection using a streaming
> implementation of the matrix profile algorithm using
> https://github.com/TDAmeritrade/stumpy There would be multiple instances
> of matrix profile, one for each group.
>
> Best,
> Georg
>
>
>


getting started with flink / scala

2017-11-29 Thread Georg Heiler
Getting started with Flink / Scala, I wonder whether the Scala base library
should be excluded as a best practice:
https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32
// exclude Scala library from assembly
assemblyOption in assembly := (assemblyOption in
assembly).value.copy(includeScala = false)

Also, I would like to know whether https://github.com/tillrohrmann/flink-project
is the most up-to-date getting-started sample project for Flink/Scala that you
would recommend.

Best,
Georg


Re: getting started with flink / scala

2017-11-29 Thread Georg Heiler
Thanks, this sounds like a good idea - can you recommend such a project?

Jörn Franke  schrieb am Mi., 29. Nov. 2017 um
22:30 Uhr:

> If you want to really learn then I recommend you to start with a flink
> project that contains unit tests and integration tests (maybe augmented
> with https://wiki.apache.org/hadoop/HowToDevelopUnitTests to simulate a
> HDFS cluster during unit tests). It should also include coverage reporting.
> These aspects are equally crucial to know for developers to develop high
> quality big data applications and virtually all companies will require that
> you know these things.
>
> I am not sure if a hello world project in Flink exists containing all
> these but it would be a good learning task to create such a thing.
>
> On 29. Nov 2017, at 22:03, Georg Heiler  wrote:
>
> Getting started with Flink / scala, I wonder whether the scala base
> library should be excluded as a best practice:
> https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in
> assembly).value.copy(includeScala = false)
>
> Also I would like to know if https://github.com/tillrohrmann/flink-project is
> the most up to date getting started with flink-scala sample project you
> would recommend.
>
> Best,
> Georg
>
>


Re: getting started with flink / scala

2017-11-29 Thread Georg Heiler
You would suggest: https://github.com/ottogroup/flink-spector for unit
tests?

Georg Heiler  schrieb am Mi., 29. Nov. 2017 um
22:33 Uhr:

> Thanks, this sounds like a good idea - can you recommend such a project?
>
> Jörn Franke  schrieb am Mi., 29. Nov. 2017 um
> 22:30 Uhr:
>
>> If you want to really learn then I recommend you to start with a flink
>> project that contains unit tests and integration tests (maybe augmented
>> with https://wiki.apache.org/hadoop/HowToDevelopUnitTests to simulate a
>> HDFS cluster during unit tests). It should also include coverage reporting.
>> These aspects are equally crucial to know for developers to develop high
>> quality big data applications and virtually all companies will require that
>> you know these things.
>>
>> I am not sure if a hello world project in Flink exists containing all
>> these but it would be a good learning task to create such a thing.
>>
>> On 29. Nov 2017, at 22:03, Georg Heiler 
>> wrote:
>>
>> Getting started with Flink / scala, I wonder whether the scala base
>> library should be excluded as a best practice:
>> https://github.com/tillrohrmann/flink-project/blob/master/build.sbt#L32
>> // exclude Scala library from assembly
>> assemblyOption in assembly := (assemblyOption in
>> assembly).value.copy(includeScala = false)
>>
>> Also I would like to know if
>> https://github.com/tillrohrmann/flink-project is the most up to date
>> getting started with flink-scala sample project you would recommend.
>>
>> Best,
>> Georg
>>
>>


Flink Typesafe configuration

2017-11-29 Thread Georg Heiler
Starting out with Flink from a Scala background, I would like to use Typesafe
config via https://github.com/pureconfig/pureconfig. However,
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html
recommends setting up:

env.getConfig().setGlobalJobParameters(parameters);

which is not fully compatible with a case class - what is the recommended
approach here?

Best,
Georg
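
A possible middle ground (a sketch, not an established pattern): keep the case
class as the primary config and expose it through a thin GlobalJobParameters
adapter; toMap is mainly used for displaying the parameters (e.g. in the web
UI). The JobConfig fields are illustrative:

```scala
import org.apache.flink.api.common.ExecutionConfig
import scala.collection.JavaConverters._

// Hypothetical application config, e.g. loaded with pureconfig.
final case class JobConfig(brokers: String, topic: String)

// Thin adapter so the case class can still be registered where Flink expects
// ExecutionConfig.GlobalJobParameters.
class CaseClassParameters(val config: JobConfig)
    extends ExecutionConfig.GlobalJobParameters {
  override def toMap: java.util.Map[String, String] =
    Map("brokers" -> config.brokers, "topic" -> config.topic).asJava
}

// usage: env.getConfig.setGlobalJobParameters(new CaseClassParameters(jobConfig))
// rich functions can read it back via
//   getRuntimeContext.getExecutionConfig.getGlobalJobParameters
//     .asInstanceOf[CaseClassParameters].config
```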


flink local & interactive development

2017-11-30 Thread Georg Heiler
Is interactive development possible with Flink, like with Spark's REPL?

When trying to use the console mode of SBT I get the following error:

java.lang.Exception: Deserializing the OutputFormat
(org.apache.flink.api.java.Utils$CollectHelper@210d5aa7) failed: Could not
read the user code wrapper

for the word count example - even though a sbt run works just fine.

Here is the code: https://github.com/geoHeil/flinkGraphs - maybe it is a
classpath problem with sbt's provided scopes?

Best, Georg


Re: flink local & interactive development

2017-12-01 Thread Georg Heiler
That is basically the same thing, only that sbt starts a plain Scala shell,
puts my own code directly on the classpath, and requires starting Flink
manually.

However, this does not fully seem to work.
Fabian Hueske  schrieb am Fr. 1. Dez. 2017 um 10:11:

> Hi Georg,
>
> I have no experience with SBT's console mode, so I cannot comment on that,
> but Flink provides a Scala REPL that might be useful [1].
>
> Best, Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/scala_shell.html
>
> 2017-11-30 23:09 GMT+01:00 Georg Heiler :
>
>> Is interactive development possible with fink like with spark in a REPL?
>>
>> When trying to use the console mode of SBT I get the following error:
>>
>> java.lang.Exception: Deserializing the OutputFormat
>> (org.apache.flink.api.java.Utils$CollectHelper@210d5aa7) failed: Could
>> not read the user code wrapper
>>
>> for the word count example - even though a sbt run works just fine.
>>
>> Here the code: https://github.com/geoHeil/flinkGraphs maybe it is a
>> class path problem with sbt provided scopes?
>>
>> Best, Georg
>>
>
>


Re: using a Yarn cluster for both Spark and Flink

2018-01-17 Thread Georg Heiler
Why not? Isn't a resource manager meant for exactly this?
You should, however, clearly define service-level agreements, as a Flink
streaming job might require a certain maximum latency, as opposed to a Spark
batch job.
Soheil Pourbafrani  schrieb am Do. 18. Jan. 2018 um
08:30:

> Is it standard approach to set up a Yarn cluster for running both Spark
> and Flink applications?
>


flink testing

2017-04-22 Thread Georg Heiler
Hi,

is there something like spark-testing-base for flink as well?

Cheers,
Georg


Re: flink testing

2017-04-23 Thread Georg Heiler
Spark testing base (https://github.com/holdenk/spark-testing-base) offers
base classes to use when writing tests with Spark, which make it simpler to
write unit tests for functions in Spark, i.e. you do not need to manually
instantiate a Spark context / Flink execution environment for each test
case.

You've written an awesome program in Spark and now its time to write some
tests. Only you find yourself writing the code to setup and tear down local
mode Spark in between each suite and you say to your self: This is not my
beautiful code.

<https://github.com/holdenk/spark-testing-base#how>

Ted Yu  schrieb am So., 23. Apr. 2017 um 10:46 Uhr:

> Please give more context by describing what spark-test-base does :-)
>
> > On Apr 22, 2017, at 10:57 PM, Georg Heiler 
> wrote:
> >
> > Hi,
> >
> > is there something like spark-testing-base for flink as well?
> >
> > Cheers,
> > Georg
>


Flink first project

2017-04-23 Thread Georg Heiler
New to Flink, I would like to do a small project to get a better feeling for
it. I am thinking of getting some stats from several REST APIs (i.e. Bitcoin
prices from different exchanges) and comparing prices across exchanges in
real time.

Are there already some REST API sources for Flink that I could use as a
sample to get started implementing a custom REST source?

I was thinking about using https://github.com/timmolter/XChange to connect
to several exchanges. E.g. to make a single api call by hand would look
similar to

val currencyPair = new CurrencyPair(Currency.XMR, Currency.BTC)
  CertHelper.trustAllCerts()
  val poloniex =
ExchangeFactory.INSTANCE.createExchange(classOf[PoloniexExchange].getName)
  val dataService = poloniex.getMarketDataService

  generic(dataService)
  raw(dataService.asInstanceOf[PoloniexMarketDataServiceRaw])
System.out.println(dataService.getTicker(currencyPair))

What would be a proper way to make this available as a Flink source? I have
seen
https://github.com/apache/flink/blob/master/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
but, being new to Flink, I am a bit unsure how to proceed.

Regards,
Georg
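
A sketch of what a minimal polling source could look like, modeled loosely on
the WikipediaEditsSource linked above; fetchTicker stands in for the XChange
call from the snippet, and the poll interval is an assumption:

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// Polls a REST endpoint and emits one record per poll.
class TickerSource(pollIntervalMs: Long) extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceContext[String]): Unit = {
    while (running) {
      val ticker = fetchTicker() // e.g. dataService.getTicker(currencyPair).toString
      ctx.getCheckpointLock.synchronized {
        ctx.collect(ticker)
      }
      Thread.sleep(pollIntervalMs)
    }
  }

  override def cancel(): Unit = running = false

  // Placeholder for the REST / XChange call shown above.
  private def fetchTicker(): String = ???
}

// usage: env.addSource(new TickerSource(pollIntervalMs = 5000))
```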


Re: Flink first project

2017-04-24 Thread Georg Heiler
So you would suggest Flume over a custom Akka source from Bahir?

Jörn Franke  schrieb am So., 23. Apr. 2017 um
18:59 Uhr:

> I would use flume to import these sources to HDFS and then use flink or
> Hadoop or whatever to process them. While it is possible to do it in flink,
> you do not want that your processing fails because the web service is not
> available etc.
> Via flume which is suitable for this kind of tasks it is more controlled
> and reliable.
>
> On 23. Apr 2017, at 18:02, Georg Heiler  wrote:
>
> New to flink I would like to do a small project to get a better feeling
> for flink. I am thinking of getting some stats from several REST api (i.e.
> Bitcoin course values from different exchanges) and comparing prices over
> different exchanges in real time.
>
> Are there already some REST api sources for flink as a sample to get
> started implementing a custom REST source?
>
> I was thinking about using https://github.com/timmolter/XChange to
> connect to several exchanges. E.g. to make a single api call by hand would
> look similar to
>
> val currencyPair = new CurrencyPair(Currency.XMR, Currency.BTC)
>   CertHelper.trustAllCerts()
>   val poloniex =
> ExchangeFactory.INSTANCE.createExchange(classOf[PoloniexExchange].getName)
>   val dataService = poloniex.getMarketDataService
>
>   generic(dataService)
>   raw(dataService.asInstanceOf[PoloniexMarketDataServiceRaw])
> System.out.println(dataService.getTicker(currencyPair))
>
> How would be a proper way to make this available as a flink source? I have
> seen
> https://github.com/apache/flink/blob/master/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
>  but
> new to flink am a bit unsure how to proceed.
>
> Regards,
> Georg
>
>


Re: Flink first project

2017-04-24 Thread Georg Heiler
Wouldn't adding Flume -> Kafka -> Flink also introduce additional latency?

Georg Heiler  schrieb am So., 23. Apr. 2017 um
20:23 Uhr:

> So you would suggest flume over a custom akka-source from bahir?
>
> Jörn Franke  schrieb am So., 23. Apr. 2017 um
> 18:59 Uhr:
>
>> I would use flume to import these sources to HDFS and then use flink or
>> Hadoop or whatever to process them. While it is possible to do it in flink,
>> you do not want that your processing fails because the web service is not
>> available etc.
>> Via flume which is suitable for this kind of tasks it is more controlled
>> and reliable.
>>
>> On 23. Apr 2017, at 18:02, Georg Heiler 
>> wrote:
>>
>> New to flink I would like to do a small project to get a better feeling
>> for flink. I am thinking of getting some stats from several REST api (i.e.
>> Bitcoin course values from different exchanges) and comparing prices over
>> different exchanges in real time.
>>
>> Are there already some REST api sources for flink as a sample to get
>> started implementing a custom REST source?
>>
>> I was thinking about using https://github.com/timmolter/XChange to
>> connect to several exchanges. E.g. to make a single api call by hand would
>> look similar to
>>
>> val currencyPair = new CurrencyPair(Currency.XMR, Currency.BTC)
>>   CertHelper.trustAllCerts()
>>   val poloniex =
>> ExchangeFactory.INSTANCE.createExchange(classOf[PoloniexExchange].getName)
>>   val dataService = poloniex.getMarketDataService
>>
>>   generic(dataService)
>>   raw(dataService.asInstanceOf[PoloniexMarketDataServiceRaw])
>> System.out.println(dataService.getTicker(currencyPair))
>>
>> How would be a proper way to make this available as a flink source? I
>> have seen
>> https://github.com/apache/flink/blob/master/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
>>  but
>> new to flink am a bit unsure how to proceed.
>>
>> Regards,
>> Georg
>>
>>


Re: flink testing

2017-04-24 Thread Georg Heiler
Thanks
Konstantin Knauf  schrieb am So. 23. Apr.
2017 um 19:39:

> + user@flink (somehow I didn't reply to the list)
>
> Hi Georg,
>
> there are two classes "MultipleProgramsTestBase" and
> "StreamingMultipeProgramsTestBase", which your integration tests can
> extend from.
>
> This will spin up a local Flink Mini-Cluster before the test execution
> and tear it down afterwards.
>
> I have also recently build a small Flink-JUnit rule available on Github
> to make this a little bit easier and  more flexible [1]. The snapshot
> version should be available in the sonatype open source repositories in
> the next days. Let me know, if it seems useful to you.
>
> This is for integration tests of our jobs or parts of the jobs though.
> For unit tests we usually mock the RuntimeContext.
>
> Cheers,
>
> Konstantin
>
> [1] https://github.com/knaufk/flink-junit
>
> On 23.04.2017 17:19, Georg Heiler wrote:
> > Spark testing base https://github.com/holdenk/spark-testing-base offers
> > some Base classes to use when writing tests with Spark which make it
> > simpler to write unit tests for functions in spark i.e. you do not
> > manually need to instantiate a spark context / flink execution
> > environment for each test case.
> >
> > You've written an awesome program in Spark and now its time to write
> > some tests. Only you find yourself writing the code to setup and tear
> > down local mode Spark in between each suite and you say to your self:
> > This is not my beautiful code.
> >
> >
> > <https://github.com/holdenk/spark-testing-base#how>
> >
> >
> > Ted Yu <yuzhih...@gmail.com> schrieb am
> > So., 23. Apr. 2017 um 10:46 Uhr:
> >
> > Please give more context by describing what spark-test-base does :-)
> >
> > > On Apr 22, 2017, at 10:57 PM, Georg Heiler
> > > <georg.kf.hei...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > is there something like spark-testing-base for flink as well?
> > >
> > > Cheers,
> > > Georg
> >
>
> --
> Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: Flink first project

2017-04-27 Thread Georg Heiler
Thanks for the overview. I think I will use akka streams and pipe the
result to kafka, then move on with flink.
Tzu-Li (Gordon) Tai  schrieb am Do. 27. Apr. 2017 um
18:37:

> Hi Georg,
>
> Simply from the aspect of a Flink source that listens to a REST endpoint
> for input data, there should be quite a variety of options to do that. The
> Akka streaming source from Bahir should also serve this purpose well. It
> would also be quite straightforward to implement one yourself.
>
> On the other hand, what Jörn was suggesting was that you would want to
> first persist the incoming data from the REST endpoint to a replayable
> storage / queue, and your Flink job reads from that replayable storage /
> queue.
> The reason for this is that Flink’s checkpointing mechanism for
> exactly-once guarantee relies on a replayable source (see [1]), and since a
> REST endpoint is not replayable, you’ll not be able to benefit from the
> fault-tolerance guarantees provided by Flink. The most popular source used
> with Flink for exactly-once, currently, is Kafka [2]. The only extra
> latency compared to just fetching REST endpoint, in this setup, is writing
> to the intermediate Kafka topic.
>
> Of course, if you’re just testing around and just getting to know Flink,
> this setup isn’t necessary.
> You can just start off with a source such as the Flink Akka connector in
> Bahir, and start writing your first Flink job right away :)
>
> Cheers,
> Gordon
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html
>
> On 24 April 2017 at 4:02:14 PM, Georg Heiler (georg.kf.hei...@gmail.com)
> wrote:
>
> Wouldn't adding flume -> Kafka -> flink also introduce additional latency?
>
> Georg Heiler  schrieb am So., 23. Apr. 2017 um
> 20:23 Uhr:
>
>> So you would suggest flume over a custom akka-source from bahir?
>>
>> Jörn Franke  schrieb am So., 23. Apr. 2017 um
>> 18:59 Uhr:
>>
>>> I would use flume to import these sources to HDFS and then use flink or
>>> Hadoop or whatever to process them. While it is possible to do it in flink,
>>> you do not want that your processing fails because the web service is not
>>> available etc.
>>> Via flume which is suitable for this kind of tasks it is more controlled
>>> and reliable.
>>>
>>> On 23. Apr 2017, at 18:02, Georg Heiler 
>>> wrote:
>>>
>>> New to flink I would like to do a small project to get a better feeling
>>> for flink. I am thinking of getting some stats from several REST api (i.e.
>>> Bitcoin course values from different exchanges) and comparing prices over
>>> different exchanges in real time.
>>>
>>> Are there already some REST api sources for flink as a sample to get
>>> started implementing a custom REST source?
>>>
>>> I was thinking about using https://github.com/timmolter/XChange to
>>> connect to several exchanges. E.g. to make a single api call by hand would
>>> look similar to
>>>
>>> val currencyPair = new CurrencyPair(Currency.XMR, Currency.BTC)
>>>   CertHelper.trustAllCerts()
>>>   val poloniex =
>>> ExchangeFactory.INSTANCE.createExchange(classOf[PoloniexExchange].getName)
>>>   val dataService = poloniex.getMarketDataService
>>>
>>>   generic(dataService)
>>>   raw(dataService.asInstanceOf[PoloniexMarketDataServiceRaw])
>>> System.out.println(dataService.getTicker(currencyPair))
>>>
>>> How would be a proper way to make this available as a flink source? I
>>> have seen
>>> https://github.com/apache/flink/blob/master/flink-contrib/flink-connector-wikiedits/src/main/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSource.java
>>>  but
>>> new to flink am a bit unsure how to proceed.
>>>
>>> Regards,
>>> Georg
>>>
>>>


multiple users per flink deployment

2017-08-01 Thread Georg Heiler
Hi,

flink currently only seems to support a single kerberos ticket for
deployment. Are there plans to support different users per each job?

regards,
Georg


Re: multiple users per flink deployment

2017-08-02 Thread Georg Heiler
Thanks for the overview.
Currently a single flink cluster seems to run all tasks with the same user.
I would want to be able to run each flink job as a separate user instead.

The update for separate read/write users is nice though.
Tzu-Li (Gordon) Tai  schrieb am Mi. 2. Aug. 2017 um
10:59:

> Hi,
>
> There’s been quite a few requests on this recently on the mailing lists
> and also mentioned by some users offline, so I think we may need to start
> with plans to probably support this.
> I’m CC’ing Eron to this thread to see if he has any thoughts on this, as
> he was among the first authors driving the Kerberos support in Flink.
> I’m not really sure if such a feature support makes sense, given that all
> jobs of a single Flink deployment have full privileges and therefore no
> isolation in between.
>
> Related question: what external service are you trying to authenticate to
> with different users?
> If it is Kafka and perhaps you have different users for the consumer /
> producer, that will be very soon available in 1.3.2, which includes a
> version bump to Kafka 0.10 that allows multiple independent users within
> the same JVM through dynamic JAAS configuration.
> See this mail thread [1] for more detail on that.
>
> Cheers,
> Gordon
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-0-10-jaas-multiple-clients-td12831.html#a13317
>
> On 1 August 2017 at 6:16:08 PM, Georg Heiler (georg.kf.hei...@gmail.com)
> wrote:
>
> Hi,
>
> flink currently only seems to support a single kerberos ticket for
> deployment. Are there plans to support different users per each job?
>
> regards,
> Georg
>
>


queryable state vs. writing result back to Kafka

2017-08-05 Thread Georg Heiler
What is the advantage of queryable state compared to writing the result
back to Kafka?

regards,
Georg


Re: Pyflink/Flink Java parquet streaming file sink for a dynamic schema stream

2021-12-02 Thread Georg Heiler
Do the JSONs have the same schema overall? Or is each potentially
structured differently?

Best,
Georg

Am Fr., 3. Dez. 2021 um 00:12 Uhr schrieb Kamil ty :

> Hello,
>
> I'm wondering if there is a possibility to create a parquet streaming file
> sink in Pyflink (in Table API) or in Java Flink (in Datastream api).
>
> To give an example of the expected behaviour. Each element of the stream
> is going to contain a json string. I want to save this stream to parquet
> files without having to explicitly define the schema/types of the messages
> (also using a single sink).
>
> If this is possible, (might be in Java Flink using a custom
> ParquetBulkWriterFactory etc.) any direction for the implementation would
> be appreciated.
>
> Best regards
> Kamil
>


Re: Pyflink/Flink Java parquet streaming file sink for a dynamic schema stream

2021-12-03 Thread Georg Heiler
Hi,

the schema of the "after" part depends on the table, i.e. it holds different
columns for each table.
So do you receive debezium changelog statements for more than one table, i.e.
is the schema in the "after" part different across messages?

Best,
Georg

Am Fr., 3. Dez. 2021 um 08:35 Uhr schrieb Kamil ty :

> Yes the general JSON schema should follow a debezium JSON schema. The
> fields that need to be saved to the parquet file are in the "after" key.
>
> On Fri, 3 Dec 2021, 07:10 Georg Heiler,  wrote:
>
>> Do the JSONs have the same schema overall? Or is each potentially
>> structured differently?
>>
>> Best,
>> Georg
>>
>> Am Fr., 3. Dez. 2021 um 00:12 Uhr schrieb Kamil ty :
>>
>>> Hello,
>>>
>>> I'm wondering if there is a possibility to create a parquet streaming
>>> file sink in Pyflink (in Table API) or in Java Flink (in Datastream api).
>>>
>>> To give an example of the expected behaviour. Each element of the stream
>>> is going to contain a json string. I want to save this stream to parquet
>>> files without having to explicitly define the schema/types of the messages
>>> (also using a single sink).
>>>
>>> If this is possible, (might be in Java Flink using a custom
>>> ParquetBulkWriterFactory etc.) any direction for the implementation would
>>> be appreciated.
>>>
>>> Best regards
>>> Kamil
>>>
>>


scala shell not part of 1.14.4 download

2022-03-18 Thread Georg Heiler
Hi,

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/
mentions:

bin/start-scala-shell.sh local

a script to start a scala REPL shell.

But the download for Flink
https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz

Does not seem to include this script anymore.

Am I missing something?

How can I still start a scala repl?

Best,

Georg


Re: scala shell not part of 1.14.4 download

2022-03-20 Thread Georg Heiler
Many thanks.
I will try your suggestions in the coming days.
Why is support for the scala-shell dropped in 2.12? Other projects, e.g.
Spark, also managed to keep a spark-shell (a REPL like Flink's current
scala-shell) working for the 2.12 release.

Best,
Georg

Am Fr., 18. März 2022 um 13:33 Uhr schrieb Chesnay Schepler <
ches...@apache.org>:

> The Scala Shell only works with Scala 2.11. You will need to use the Scala
> 2.11 Flink distribution.
>
> On 18/03/2022 12:42, Georg Heiler wrote:
>
> Hi,
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/
> mentions:
>
> bin/start-scala-shell.sh local
>
>
> a script to start a scala REPL shell.
>
>
> But the download for Flink 
> https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
>
> Does not seem to include this script anymore.
>
>
> Am I missing something?
>
> How can I still start a scala repl?
>
> Best,
>
> Georg
>
>
>


Re: scala shell not part of 1.14.4 download

2022-03-22 Thread Georg Heiler
Many thanks.

In the linked discussion it sounds like a move - not a delete. However, no
destination is named. Is there currently any moved version of the scala
shell available elsewhere?

Best,
Georg

Am Mo., 21. März 2022 um 09:26 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi Georg,
>
> You can check out the discussion thread for the motivation [1].
>
> Best regards,
>
> Martijn Visser
> https://twitter.com/MartijnVisser82
>
> [1] https://lists.apache.org/thread/pojsrrdckjwow5186nd7hn9y5j9t29ov
>
> On Sun, 20 Mar 2022 at 08:13, Georg Heiler 
> wrote:
>
>> Many thanks.
>> I will try your suggestions in the coming days.
>> Why is support for the scala-shell dropped in 2.12? Other projects i.e.
>> spark also managed to keep a spark-shell (REPL like flink's current
>> scala-shell) working for the 2.12 release.
>>
>> Best,
>> Georg
>>
>> Am Fr., 18. März 2022 um 13:33 Uhr schrieb Chesnay Schepler <
>> ches...@apache.org>:
>>
>>> The Scala Shell only works with Scala 2.11. You will need to use the
>>> Scala 2.11 Flink distribution.
>>>
>>> On 18/03/2022 12:42, Georg Heiler wrote:
>>>
>>> Hi,
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/repls/scala_shell/
>>> mentions:
>>>
>>> bin/start-scala-shell.sh local
>>>
>>>
>>> a script to start a scala REPL shell.
>>>
>>>
>>> But the download for Flink 
>>> https://www.apache.org/dyn/closer.lua/flink/flink-1.14.4/flink-1.14.4-bin-scala_2.12.tgz
>>>
>>> Does not seem to include this script anymore.
>>>
>>>
>>> Am I missing something?
>>>
>>> How can I still start a scala repl?
>>>
>>> Best,
>>>
>>> Georg
>>>
>>>
>>>


flink SQL client with kafka confluent avro binaries setup

2022-03-23 Thread Georg Heiler
Hi,

When trying to set up a demo for the kafka-sql-client reading an Avro topic
from Kafka, I run into problems with regard to the additional dependencies.
In the spark-shell there is a --packages option which automatically
resolves any additional required jars (transitively) using the provided
maven coordinates. So far, I could not find such an option for Flink. Am
I missing something?

Now, trying to set this up manually instead, I first get the
additional jars (for flink 1.14.1, scala 2.12) which are mentioned here:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/
and
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/avro-confluent/

wget
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.14.4/flink-connector-kafka_2.12-1.14.4.jar
-P lib/
wget
https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar
-P lib/
wget
https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.14.4/flink-avro-confluent-registry-1.14.4.jar
-P lib/
wget
https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.14.4/flink-avro-1.14.4.jar
-P lib/

I still fail to get them loaded (even though they are put into the default
lib path).
When starting a local cluster:

./bin/start-cluster.sh local

and the SQL client:

./bin/sql-client.sh

Any option:
./bin/sql-client.sh -j or ./bin/sql-client.sh -l (with the path to the lib
folder or the additional jars which were added before) all fails for the
same reason:

Caused by: java.lang.ClassNotFoundException:
org.apache.avro.SchemaParseException

when trying to execute:
CREATE TABLE foo (foo string) WITH (
'connector' = 'kafka',
'topic' = 'foo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
'properties.group.id' = 'flink-test-001',
'properties.bootstrap.servers' = 'localhost:9092'
);
SELECT * FROM foo;

(irrespective of any dummy data loaded). Though I do have some dummy data in
the following structure available, generated using the Kafka Connect dummy data
generator from the following Avro schema (and serialized using Avro into
the Kafka topic):

{
  "type": "record",
  "name": "commercialrating",
  "fields": [
{
  "name": "brand",
  "type": {
"type": "string",
"arg.properties": {
  "options": ["Acme", "Globex"]
}
  }
},
{
  "name": "duration",
  "type": {
"type": "int",
"arg.properties": {
  "options": [30, 45, 60]
}
  }
},
{
  "name": "rating",
  "type": {
"type": "int",
"arg.properties": {
  "range": { "min": 1, "max": 5 }
}
  }
}
  ]
}


*Questions:*

*1) Can I somehow specify maven coordinates directly (for the naive method
of using the SQL client like the spark-shell) to simplify the setup of
the required jars?*


*2) Given that I manually downloaded the jars into the lib
folder of the flink installation - why are they not loaded by default? What
needs to change so the additional (required) jars for Avro +
confluent-schema-registry + Kafka are loaded by the flink SQL client?*

Best,
Georg


Re: flink SQL client with kafka confluent avro binaries setup

2022-03-24 Thread Georg Heiler
Hi,

the solution is the following:

wget
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.4/flink-sql-connector-kafka_2.12-1.14.4.jar
-P lib/
wget
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.14.4/flink-sql-avro-confluent-registry-1.14.4.jar
-P lib/

instead of manually specifying individual jars whose transitive dependencies
need to be added separately. Flink seems to publish flink-sql-** jars, which
are fat jars including any required transitive dependencies.

Perhaps this part of the documentation should/could be improved on the
website so this is clearer for new Flink users.

Best,
Georg

Am Do., 24. März 2022 um 07:27 Uhr schrieb Biao Geng :

> Hi Georg,
>
> I recently made a demo using flink sql client + schema registry + kafka as
> well to test my own schema registry catalog. To help us locate the root
> cause, I think you  can add  "SET 'sql-client.verbose' = 'true';" in your
> launched sql client to enable the sql client output more information or go
> to check the sql client logs under "/log" dir. "Caused by:
> java.lang.ClassNotFoundException: org.apache.avro.SchemaParseException" may
> not be the root cause as you have already tried to use -j option to specify
> flink-avro jar.
> For me, the root cause is that I miss the jar dependencies of jackson
> jars. I attach my command for starting sql client here and hope it can give
> you some hints:
>
> sql-client.sh -j  jackson-core-*.jar -j  jackson-annotations-*.jar -j
> jackson-databind-*.jar -j ververica-connector-kafka-*.jar -j
> flink-avro-*-jar-with-dependencies.jar
>
> Note, you may need to substitute ververica-connector-kafka with open
> sourced flink-sql-connector-kafka jar.
>
> For your questions, to my best knowledge, '-j' and '-l' options are the
> only options for now. Maybe others in the community can provide more
> information.
>
> Best,
> Biao Geng
>
>
> Georg Heiler  于2022年3月23日周三 23:59写道:
>
>> Hi,
>>
>> When trying to set up a demo for the kafka-sql-client reading an Avro
>> topic from Kafka I run into problems with regards to the additional
>> dependencies.
>> In the spark-shell there is a --packages option which automatically
>> resolves any additional required jars (transitively) using the provided
>> maven coordinates. So far, I could not find this function for flink. Am
>> I missing something?
>>
>> When now instead of trying to set this up manually I first get the
>> additional jars (for flink 1.14.1 scala 2.12) which are mentioned here:
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/
>> and
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/avro-confluent/
>>
>> wget
>> https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.14.4/flink-connector-kafka_2.12-1.14.4.jar
>> -P lib/
>> wget
>> https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.0.0/kafka-clients-3.0.0.jar
>> -P lib/
>> wget
>> https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.14.4/flink-avro-confluent-registry-1.14.4.jar
>> -P lib/
>> wget
>> https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.14.4/flink-avro-1.14.4.jar
>> -P lib/
>>
>> I still fail to get them loaded (even though they are put into the
>> default lib path.
>> When starting a local cluster:
>>
>> ./bin/start-cluster.sh local
>>
>> and the SQL client:
>>
>> ./bin/sql-client.sh
>>
>> Any option:
>> ./bin/sql-client.sh -j or ./bin/sql-client.sh -l (with the path to the
>> lib folder or the additional jars wich were added before) all fails with
>> the same reason:
>>
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.avro.SchemaParseException
>>
>> when trying to execute:
>> CREATE TABLE foo (foo string) WITH (
>> 'connector' = 'kafka',
>> 'topic' = 'foo',
>> 'scan.startup.mode' = 'earliest-offset',
>> 'format' = 'avro-confluent',
>> 'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
>> 'properties.group.id' = 'flink-test-001',
>> 'properties.bootstrap.servers' = 'localhost:9092'
>> );
>> SELECT * FROM foo;
>>
>> (irrespective of any dummy data loaded) Though I have some dummy data in
>> the following structure available using the K

SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-24 Thread Georg Heiler
Hi,

how can I get Flink's SQL client to sink some data to either the
regular kafka or the upsert-kafka connector?

I have a table/ topic with dummy data:
CREATE TABLE metrics_brand_stream (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
brand string,
duration int,
rating int

) WITH (
'connector' = 'kafka',
'topic' = 'commercials_avro',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
'properties.group.id' = 'flink-test-001',
'properties.bootstrap.servers' = 'localhost:9092'
);

And the following aggregation:

SELECT brand,
 COUNT(*) AS cnt,
 AVG(duration) AS  duration_mean,
 AVG(rating) AS rating_mean
  FROM metrics_brand_stream
  GROUP BY brand;

When trying to define an output table:

CREATE TABLE metrics_per_brand (
brand string,
cnt BIGINT,
duration_mean DOUBLE,
rating_mean DOUBLE

) WITH (
'connector' = 'upsert-kafka',
'topic' = 'metrics_per_brand',
'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
'properties.group.id' = 'flink-test-001',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'avro-confluent',
'value.format' = 'avro-confluent'
);

And trying to INSERT some result data:

INSERT INTO metrics_per_brand
  SELECT brand,
 COUNT(*) AS cnt,
 AVG(duration) AS  duration_mean,
 AVG(rating) AS rating_mean
  FROM metrics_brand_stream
  GROUP BY brand;

The query fails with:

org.apache.flink.table.api.ValidationException: One or more required
options are missing.

Missing required options are:

url

But neither:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
nor
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
seems to list the right configuration (or I am misreading the
documentation).


How can I sink data to Kafka after some arbitrary computation in the
Flink SQL client, using either the kafka or upsert-kafka connector, where the
input is Avro with a schema from the Confluent schema registry and the
output should register its schema there as well (and serialize using Avro)?


Best,
Georg


Flink SQL AVG with mandatory type casting

2022-03-24 Thread Georg Heiler
Hi,

I observe strange behavior in Flink SQL:
For an input stream:

CREATE TABLE input_stream (
duration int,
rating int

) WITH (
'connector' = 'kafka',
'topic' = 't',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
'properties.group.id' = 'flink-test-001',
'properties.bootstrap.servers' = 'localhost:9092'
);

The following SQL:
SELECT AVG(duration) AS  duration_mean, AVG(CAST(rating AS DOUBLE)) AS
rating_mean FROM input_stream;

returns:

duration_mean   rating_mean
45              2.503373819163293

I.e. duration_mean is truncated to an INT!

Any other database system I know by default outputs a DOUBLE type for any
input (including INT) and does not truncate it.

Why does Flink decide to truncate here? Why is a manual type cast necessary?
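
For completeness, the explicit cast that avoids the truncation (applied here
to both integer columns of the same input_stream table) would look roughly
like this:

-- casting the INT columns to DOUBLE makes AVG return a DOUBLE instead of a truncated INT
SELECT AVG(CAST(duration AS DOUBLE)) AS duration_mean,
       AVG(CAST(rating AS DOUBLE)) AS rating_mean
FROM input_stream;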

Best,
Georg


DBT-flink profile?

2022-03-24 Thread Georg Heiler
Hi,

is anyone working on a DBT Flink plugin/profile?

https://docs.getdbt.com/reference/profiles.yml lists profiles for many other databases -
and I think this kind of support would be really beneficial for the SQL
part of Flink.

Best,
Georg


Re: DBT-flink profile?

2022-03-25 Thread Georg Heiler
Hi,

"use" is perhaps not the right word (yet); "experiment" fits better. But both
would be relevant, and in particular also the streaming option.
I also just found: https://materialize.com/docs/guides/dbt/ outlining how
dbt and streaming could potentially be married. Perhaps their integration
could serve as an example?

Best,
Georg

Am Fr., 25. März 2022 um 05:01 Uhr schrieb Yun Gao :

> Hi Georg,
>
> May I have a double confirmation for integrating with dbt,
> are you currenty want to use it for batch jobs or streaming jobs?
>
> Best,
> Yun Gao
>
>
>
> --
> Sender: Georg Heiler
> Date: 2022/03/25 01:27:26
> Recipient: user
> Theme:DBT-flink profile?
>
> Hi,
>
> is anyone working on a DBT Flink plugin/profile?
>
> https://docs.getdbt.com/reference/profiles.yml hosts many other databases
> - and I think this kind of support would be really beneficial for the SQL
> part of Flink.
>
> Best,
> Georg
>
>


Re: SQL Client Kafka (UPSERT?) Sink for confluent-avro

2022-03-29 Thread Georg Heiler
I got it working now: the schema registry URL needs to be specified for both
the key and the value format.
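
For reference, a minimal sketch of the sink definition that worked (assuming
Flink 1.14, where the avro-confluent format exposes the registry URL as
'avro-confluent.url' and it is prefixed with 'key.' / 'value.' for
upsert-kafka, and assuming brand as the upsert key, since upsert-kafka
requires a primary key):

-- upsert-kafka needs a PRIMARY KEY and separate key/value format options
CREATE TABLE metrics_per_brand (
    brand STRING,
    cnt BIGINT,
    duration_mean DOUBLE,
    rating_mean DOUBLE,
    PRIMARY KEY (brand) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'metrics_per_brand',
    'properties.bootstrap.servers' = 'localhost:9092',
    -- schema registry URL set once for the key format and once for the value format
    'key.format' = 'avro-confluent',
    'key.avro-confluent.url' = 'http://localhost:8081/',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://localhost:8081/'
);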



thanks

Am Mo., 28. März 2022 um 13:33 Uhr schrieb Ingo Bürk :

> Hi Georg,
>
> which Flink version are you using? The missing property is for the
> avro-confluent format, and if I recall correctly, how these are passed
> has changed in recent versions, so it'd be good to double check you are
> using the documentation for the version you are running on.
>
>
> Best
> Ingo
>
> On 24.03.22 11:57, Georg Heiler wrote:
> > Hi,
> >
> > how can I get Flinks SQL client to nicely sink some data to either the
> > regular kafka or the kafka-upsert connector?
> >
> > I have a table/ topic with dummy data:
> > CREATE TABLE metrics_brand_stream (
> >  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
> >  WATERMARK FOR event_time AS event_time - INTERVAL '10' MINUTE,
> >`partition` BIGINT METADATA VIRTUAL,
> >`offset` BIGINT METADATA VIRTUAL,
> >  brand string,
> >  duration int,
> >  rating int
> >
> > ) WITH (
> >  'connector' = 'kafka',
> >  'topic' = 'commercials_avro',
> >  'scan.startup.mode' = 'earliest-offset',
> >  'format' = 'avro-confluent',
> >  'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
> >  'properties.group.id' = 'flink-test-001',
> >  'properties.bootstrap.servers' = 'localhost:9092'
> > );
> >
> > And the following aggregation:
> >
> > SELECT brand,
> >   COUNT(*) AS cnt,
> >   AVG(duration) AS  duration_mean,
> >   AVG(rating) AS rating_mean
> >FROM metrics_brand_stream
> >GROUP BY brand;
> >
> > When trying to define an output table:
> >
> > CREATE TABLE metrics_per_brand (
> >  brand string,
> >  cnt BIGINT,
> >  duration_mean DOUBLE,
> >  rating_mean DOUBLE
> >
> > ) WITH (
> >  'connector' = 'upsert-kafka',
> >  'topic' = 'metrics_per_brand',
> >  'avro-confluent.schema-registry.url' = 'http://localhost:8081/',
> >  'properties.group.id' = 'flink-test-001',
> >  'properties.bootstrap.servers' = 'localhost:9092',
> >  'key.format' = 'avro-confluent',
> >  'value.format' = 'avro-confluent'
> > );
> >
> > And trying to INSERT some result data:
> >
> > INSERT INTO metrics_per_brand
> >SELECT brand,
> >   COUNT(*) AS cnt,
> >   AVG(duration) AS  duration_mean,
> >   AVG(rating) AS rating_mean
> >FROM metrics_brand_stream
> >GROUP BY brand;
> >
> > The query fails with:
> >
> > org.apache.flink.table.api.ValidationException: One or more required
> > options are missing.
> >
> > Missing required options are:
> >
> > url
> >
> > But neither:
> > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
> > nor
> > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
> > nor
> > https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/
> > seems to list the right configuration (or I am misreading the
> > documentation).
> >
> >
> > How can I sink data to kafka after some arbitrary computation using the
> > flink-sql client using either the kafka or upsert-kafka connector where
> > the input is AVRO with a schema from the confluent schema registry and
> > the output should store its schema there as well (and serialize using
> AVRO).
> >
> >
> > Best,
> > Georg
>


trigger once (batch job with streaming semantics)

2022-05-02 Thread Georg Heiler
Hi,

spark
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
offers a variety of triggers.

In particular, it also has the "once" mode:

*One-time micro-batch* The query will execute *only one* micro-batch to
process all the available data and then stop on its own. This is useful in
scenarios you want to periodically spin up a cluster, process everything
that is available since the last period, and then shutdown the cluster. In
some case, this may lead to significant cost savings.

Does flink have a similar possibility?

Best,
Georg


Re: trigger once (batch job with streaming semantics)

2022-05-06 Thread Georg Heiler
Hi,

I would disagree:
In the case of Spark, it is a streaming application that offers full
streaming semantics (but at lower cost and with higher latency) as it triggers
less often. In particular, windowing and stateful semantics as well as
late-arriving data are handled automatically using the regular streaming
features.

Would these features be available in a Flink Batch job as well?

Best,
Georg

Am Fr., 6. Mai 2022 um 13:26 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi Georg,
>
> Flink batch applications run until all their input is processed. When
> that's the case, the application finishes. You can read more about this in
> the documentation for DataStream [1] or Table API [2]. I think this matches
> the same as Spark is explaining in the documentation.
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/
>
> On Mon, 2 May 2022 at 16:46, Georg Heiler 
> wrote:
>
>> Hi,
>>
>> spark
>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>> offers a variety of triggers.
>>
>> In particular, it also has the "once" mode:
>>
>> *One-time micro-batch* The query will execute *only one* micro-batch to
>> process all the available data and then stop on its own. This is useful in
>> scenarios you want to periodically spin up a cluster, process everything
>> that is available since the last period, and then shutdown the cluster. In
>> some case, this may lead to significant cost savings.
>>
>> Does flink have a similar possibility?
>>
>> Best,
>> Georg
>>
>


Re: trigger once (batch job with streaming semantics)

2022-05-09 Thread Georg Heiler
Hi Martijn,

many thanks for this clarification. Do you know of any example somewhere
which would showcase such an approach?

Best,
Georg

Am Mo., 9. Mai 2022 um 14:45 Uhr schrieb Martijn Visser <
martijnvis...@apache.org>:

> Hi Georg,
>
> No they wouldn't. There is no capability out of the box that lets you
> start Flink in streaming mode, run everything that's available at that
> moment and then stops when there's no data anymore. You would need to
> trigger the stop yourself.
>
> Best regards,
>
> Martijn
>
> On Fri, 6 May 2022 at 13:37, Georg Heiler 
> wrote:
>
>> Hi,
>>
>> I would disagree:
>> In the case of spark, it is a streaming application that is offering full
>> streaming semantics (but with less cost and bigger latency) as it triggers
>> less often. In particular, windowing and stateful semantics as well as
>> late-arriving data are handled automatically using the regular streaming
>> features.
>>
>> Would these features be available in a Flink Batch job as well?
>>
>> Best,
>> Georg
>>
>> Am Fr., 6. Mai 2022 um 13:26 Uhr schrieb Martijn Visser <
>> martijnvis...@apache.org>:
>>
>>> Hi Georg,
>>>
>>> Flink batch applications run until all their input is processed. When
>>> that's the case, the application finishes. You can read more about this in
>>> the documentation for DataStream [1] or Table API [2]. I think this matches
>>> the same as Spark is explaining in the documentation.
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/
>>>
>>> On Mon, 2 May 2022 at 16:46, Georg Heiler 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> spark
>>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
>>>> offers a variety of triggers.
>>>>
>>>> In particular, it also has the "once" mode:
>>>>
>>>> *One-time micro-batch* The query will execute *only one* micro-batch
>>>> to process all the available data and then stop on its own. This is useful
>>>> in scenarios you want to periodically spin up a cluster, process everything
>>>> that is available since the last period, and then shutdown the cluster. In
>>>> some case, this may lead to significant cost savings.
>>>>
>>>> Does flink have a similar possibility?
>>>>
>>>> Best,
>>>> Georg
>>>>
>>>