metrics in processElement

2021-11-22 Thread Lian Jiang
Hi, I want to get metrics inside a processElement() function of a ProcessWindowFunction class. Below is my understanding: *counter*: a counter variable defined inside processElement() will be reset every time processElement() is called *meter*: a meter variable defined inside
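The question in this thread boils down to variable scope: anything created locally inside processElement() starts fresh on every call, which is why Flink metrics are normally registered once (e.g. in a rich function's open()) and held in a field. A plain-Java sketch (no Flink dependency; the class and method names are illustrative, only mimicking the processElement() shape) makes the difference visible:

```java
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java sketch: why a counter must be held in a field initialized once
// (as a Flink metric registered in open() would be), not created per call.
public class CounterScopeDemo {
    // Field plays the role of a metric registered once in open().
    private final AtomicLong elementsSeen = new AtomicLong();

    // Mimics processElement(): a local counter restarts at 0 on every call.
    public long process(int batchSize) {
        long localCounter = 0;                 // reset on every invocation
        for (int i = 0; i < batchSize; i++) {
            localCounter++;                    // lost when the method returns
            elementsSeen.incrementAndGet();    // survives across invocations
        }
        return localCounter;
    }

    public long total() {
        return elementsSeen.get();
    }
}
```

Here the field accumulates across calls while the local variable starts over each time, matching the "reset every time" behavior the snippet describes.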

Re: upgrade kafka-schema-registry-client to 6.1.2 for Flink-avro-confluent-registry

2021-06-25 Thread Lian Jiang
Thanks Fabian. [FLINK-23160] upgrade kafka-schema-registry-client to 6.1.2 for Flink-avro-confluent-registry - ASF JIRA (apache.org) is created. I will create a private Flink build to try out the fix. If it goes well, I can contribute back.

upgrade kafka-schema-registry-client to 6.1.2 for Flink-avro-confluent-registry

2021-06-24 Thread Lian Jiang
Hi, I am using ConfluentRegistryAvroSerializationSchema and blocked by a bug mentioned by https://github.com/confluentinc/schema-registry/issues/1352 and the final fix is done by https://github.com/confluentinc/schema-registry/pull/1839. I did not find any easy workaround.

multiple streams joining

2021-05-26 Thread Lian Jiang
Hi, Imagine I have one class having 4 fields: ID, A, B, C. There are three data sources providing data in the form of (ID, A), (ID, B), (ID, C) respectively. I want to join these three data sources to get final (ID, A, B, C) without any window. For example, (ID, A) could come one month after
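A common windowless approach to the join described above is keyed state: key all three inputs by ID, buffer whichever fields have arrived, and emit once all three are present. A plain-Java sketch of that merge logic (the Flink wiring via union/KeyedCoProcessFunction is omitted; all names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch of the state-merging logic behind a windowless three-way join:
// buffer partial (ID, field) updates per key, emit once A, B and C are all set.
public class ThreeWayJoiner {
    public static final class Record {
        public final String id;
        public String a, b, c;               // partial fields, filled over time
        Record(String id) { this.id = id; }
        boolean complete() { return a != null && b != null && c != null; }
    }

    // Plays the role of Flink keyed state (one entry per join key).
    private final Map<String, Record> state = new HashMap<>();

    // Mimics processing one element from one of the inputs; 'field' selects A/B/C.
    public Optional<Record> accept(String id, char field, String value) {
        Record r = state.computeIfAbsent(id, Record::new);
        switch (field) {
            case 'A': r.a = value; break;
            case 'B': r.b = value; break;
            case 'C': r.c = value; break;
            default: throw new IllegalArgumentException("unknown field " + field);
        }
        if (r.complete()) {
            state.remove(id);                // clear state once joined
            return Optional.of(r);
        }
        return Optional.empty();
    }
}
```

Since partial records may sit in state indefinitely (e.g. a month, as in the question), a real job would pair this with state TTL or timers to bound state growth.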

Use State query to dump state into datalake

2021-05-02 Thread Lian Jiang
Hi, I am interested in dumping Flink state from RocksDB to a datalake using state query https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/. My map state could have 200 million key-value pairs and the total size could be 150 GB. My batch

Re: statefun creates unexpected new physical function

2020-11-24 Thread Lian Jiang
Problem solved. It is because another function sends messages to myFunc using non-hard-coded ids. Thanks. On Tue, Nov 24, 2020 at 11:24 AM Lian Jiang wrote: > Hi, > > I am using statefun 2.2.0 and have below routing: > > downstream.forward(myFunc.TYPE, myFunc.TYPE.name(),

statefun creates unexpected new physical function

2020-11-24 Thread Lian Jiang
Hi, I am using statefun 2.2.0 and have below routing: downstream.forward(myFunc.TYPE, myFunc.TYPE.name(), message); I expect this statement will create only one physical myFunc because the id is hard coded with myFunc.TYPE.name(). This design can use the PersistedValue field in myFunc for all

Re: debug statefun

2020-11-11 Thread Lian Jiang
Just realized making autoservice class discoverable also solved "There are no routers defined" mentioned by Puneet. Yes, harness does test statefun module discovery. Thanks. On Tue, Nov 10, 2020 at 9:57 PM Tzu-Li (Gordon) Tai wrote: > On Wed, Nov 11, 2020 at 1:44 PM Tzu-Li (Gordon) Tai >

Re: debug statefun

2020-11-10 Thread Lian Jiang
e META-INF metadata > will be generated for classes annotated with @AutoService. > > Please let us know if this resolves the issue for you. > > Cheers, > Gordon > > On Wed, Nov 11, 2020 at 3:15 AM Lian Jiang wrote: > >> Igal, >> >> I am using AutoService

Re: debug statefun

2020-11-10 Thread Lian Jiang
b.com/apache/flink-statefun/tree/master/statefun-examples/statefun-flink-harness-example >> . >> >> Cheers, >> Gordon >> >> On Tue, Nov 10, 2020 at 5:04 AM Lian Jiang wrote: >> >>> >>>

debug statefun

2020-11-09 Thread Lian Jiang
Hi, I created a POC by mimicking statefun-greeter-example. However, it failed due to: Caused by: java.lang.IllegalStateException: There are no ingress defined. at org.apache.flink.statefun.flink.core.StatefulFunctionsUniverseValidator.validate(StatefulFunctionsUniverseValidator.java:25)

cannot pull statefun docker image

2020-11-06 Thread Lian Jiang
Hi, I tried to build statefun-greeter-example docker image with "docker build ." but cannot pull the base statefun docker image due to access denied. Any idea? Thanks. $ docker login Authenticating with existing credentials... Login Succeeded

Re: Rich Function Thread Safety

2020-10-25 Thread Lian Jiang
Hi, I am learning https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/getting-started/java_walkthrough.html and wondering if the invoke function is thread safe for: final int seen = count.getOrDefault(0);count.set(seen + 1); From

Re: pull access denied for flink-statefun

2020-10-24 Thread Lian Jiang
It worked by replacing FROM flink-statefun:2.3-SNAPSHOT with FROM ververica/flink-statefun:2.2.0 Thanks! On Sat, Oct 24, 2020 at 8:13 PM Lian Jiang wrote: > Hi, > > I am following https://github.com/apache/flink-statefun#build to build > the greeter example but got below e

pull access denied for flink-statefun

2020-10-24 Thread Lian Jiang
Hi, I am following https://github.com/apache/flink-statefun#build to build the greeter example but got below error. I have logged in docker hub. Any idea? Thanks. $ docker-compose build zookeeper uses an image, skipping kafka-broker uses an image, skipping Building master Step 1/3 : FROM

Stateful function and large state applications

2020-10-13 Thread Lian Jiang
Hi, I am learning Stateful function and saw below: "In addition to the Apache Flink processes, a full deployment requires ZooKeeper (for master failover ) and bulk

Re: Flink 1.12 cannot handle large schema

2020-10-02 Thread Lian Jiang
t; any patch. > > [1] https://issues.apache.org/jira/browse/FLINK-19491 > > On Fri, Oct 2, 2020 at 2:23 AM Lian Jiang wrote: > >> Hi, >> >> I am using Flink 1.12 snapshot built on my machine. My job throws an >> exception when writeUTF a schema from th

Re: Flink 1.12 snapshot throws ClassNotFoundException

2020-10-01 Thread Lian Jiang
u are mixing Flink jars which > use Scala 2.11 with Akka dependencies which are built against Scala 2.12. > > I am not an Gradle expert but can't Gradle simply pull in the transitive > dependencies of flink-runtime? > > Cheers, > Till > > On Wed, Sep 30, 2020 at 2:22 AM

Flink 1.12 snapshot throws ClassNotFoundException

2020-09-29 Thread Lian Jiang
Hi, I use Flink source master to build a snapshot and use the jars in my project. The goal is to avoid hacky deserialization code caused by avro 1.8 in old Flink versions since Flink 1.12 uses avro 1.10. Unfortunately, the code throws below ClassNotFoundException. I have verified that the

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-23 Thread Lian Jiang
sions of Flink is to change > the AvroSerializer manually. You would need to do a similar thing as I do > in the linked PR. > > Best, > > Dawid > > [1] https://issues.apache.org/jira/browse/FLINK-19339 > > [2] https://github.com/apache/flink/pull/13450 > On 21/09/2020 19:2

hourly counter

2020-09-21 Thread Lian Jiang
Hi, I have a window function with a window width of 1 min. I want to have an hourly counter which is reset every hour so it never overflows. There are multiple ways but none of them is straightforward: StatsDClient instance = new NonBlockingStatsDClientBuilder() int count = 0; void incr() {
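One simple way to get a never-overflowing hourly counter like the one asked for above is to tag the count with the hour bucket it belongs to and start over whenever the wall clock crosses an hour boundary. A small sketch (class name illustrative; the clock is injected so the reset behavior is testable without waiting an hour):

```java
import java.util.function.LongSupplier;

// Sketch of an hourly-resetting counter: the count belongs to the current
// hour bucket and restarts whenever the clock crosses an hour boundary,
// so it can never grow without bound.
public class HourlyCounter {
    private static final long HOUR_MS = 60 * 60 * 1000L;

    private final LongSupplier clockMs;  // e.g. System::currentTimeMillis
    private long currentHourBucket = Long.MIN_VALUE;
    private long count;

    public HourlyCounter(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    public void incr() {
        long bucket = clockMs.getAsLong() / HOUR_MS;
        if (bucket != currentHourBucket) {   // crossed an hour boundary
            currentHourBucket = bucket;
            count = 0;                       // reset for the new hour
        }
        count++;
    }

    public long get() {
        return count;
    }
}
```

The reported value (e.g. pushed to StatsD from the window function) is then always the count for the current hour only. Note this is single-threaded by design, which matches how a single operator instance invokes a window function.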

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-21 Thread Lian Jiang
VRO-files-td35850.html > > [2] https://issues.apache.org/jira/browse/FLINK-12532 > > [3] https://issues.apache.org/jira/browse/HIVE-21737 > > [4] https://issues.apache.org/jira/browse/AVRO-1891 > > > > On Sun, Sep 20, 2020 at 9:56 PM Lian Jiang > wrote: > > > &g

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-20 Thread Lian Jiang
dRecord) in the > SpecificRecordBase#getConversion(). > > Best, > > Dawid > > > On 17/09/2020 16:34, Lian Jiang wrote: > > Piotr/Dawid, > > Thanks for the reply. FLINK-18223 seems not to related to this issue and I > double checked that I am using Flink 1.11.0 instea

Re: Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-17 Thread Lian Jiang
with Flink or did you > built custom DeserializationSchema? Could you maybe share the code for > instantiating the source with us? It could help us track down the > problematic spot. > > Best, > > Dawid > > On 16/09/2020 08:09, Lian Jiang wrote: > > Hi, > > > >

Unknown datum type java.time.Instant: 2020-09-15T07:00:00Z

2020-09-16 Thread Lian Jiang
Hi, I am using avro 1.9.1 + Flink 1.10.1 + Confluent Kafka 5.5. In IntelliJ, I can see the FlinkKafkaConsumer already deserialized the upstream kafka message. However, I got the below error when this message is serialized during pushToOperator. Per the stack trace, the reason is that AvroSerializer

Flink 1.11 Sql client environment yaml

2020-07-17 Thread Lian Jiang
Hi, I am experimenting with Flink SQL by following https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html. I want to set up an environment yaml to query Kafka data (json in avro format). Where can I find the information below? 1. use GenericInMemoryCatalog (e.g. type,

Re: Flink 1.11 throws Unrecognized field "error_code"

2020-07-17 Thread Lian Jiang
ent and server are using the same Flink > version. > > On 17/07/2020 02:42, Lian Jiang wrote: > > Hi, > > I am using java 1.8 and Flink 1.11 by following > https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html > on my MAC Mojave 10.1

Flink 1.11 throws Unrecognized field "error_code"

2020-07-16 Thread Lian Jiang
Hi, I am using java 1.8 and Flink 1.11 by following https://ci.apache.org/projects/flink/flink-docs-release-1.11/try-flink/local_installation.html on my Mac (Mojave 10.14.6). But " ./bin/flink run examples/streaming/WordCount.jar " throws the below error. These are the java versions that I tried:

Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread Lian Jiang
Thanks Leonard and Jark. Here is my repo for your repro: https://bitbucket.org/jiangok/flink-playgrounds/src/0d242a51f02083711218d3810267117e6ce4260c/table-walkthrough/pom.xml#lines-131. As you can see, my pom.xml has already added flink-avro and avro dependencies. You can run this repro by:

Re: Flink 1.11 Table API cannot process Avro

2020-07-12 Thread Lian Jiang
Thanks guys. I missed the runtime dependencies. After adding below into https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/Dockerfile. The original issue of "Could not find any factory for identifier" is gone. wget -P /opt/flink/lib/

Re: Flink 1.11 Table API cannot process Avro

2020-07-11 Thread Lian Jiang
I am using the Flink playground as the base: https://github.com/apache/flink-playgrounds/blob/master/table-walkthrough/pom.xml I observed "PhysicalLegacyTableSourceScan". Not sure whether this is related. Thanks. Regards! On Sat, Jul 11, 2020 at 3:43 PM Lian Jiang wrote: > Thanks Jörn

Re: Flink 1.11 Table API cannot process Avro

2020-07-11 Thread Lian Jiang
: > You are missing additional dependencies > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html > > Am 11.07.2020 um 04:16 schrieb Lian Jiang : > >  > Hi, > > According to > https://ci.apache.org/projects/fli

Flink 1.11 Table API cannot process Avro

2020-07-10 Thread Lian Jiang
Hi, According to https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html, avro is supported for table API but below code failed: tEnv.executeSql("CREATE TABLE people (\n" + "id INT,\n" + "name STRING\n" + ") WITH (\n" +
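The DDL in the snippet above is cut off before the connector options. For context, a minimal sketch of what a complete Flink 1.11 filesystem source in Avro format typically looks like (path and columns are illustrative; as the replies in this thread point out, the flink-avro dependency must also be on the classpath at runtime):

```sql
-- Sketch of a Flink 1.11 filesystem table in Avro format.
-- 'path' and columns are illustrative; requires flink-avro (or the
-- flink-sql-avro uber jar) on the cluster classpath.
CREATE TABLE people (
  id   INT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///path/to/people',   -- illustrative path
  'format'    = 'avro'
);
```

The "Could not find any factory for identifier" error discussed later in the thread is the symptom of that format jar missing, not of a problem in the DDL itself.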

Table API throws "No FileSystem for scheme: file" when loading local parquet

2020-07-10 Thread Lian Jiang
Hi, I am trying Table API in Flink 1.11: tEnv.executeSql("CREATE TABLE people (\n" + "id INT,\n" + "name STRING\n" + ") WITH (\n" + "'connector' = 'filesystem',\n" + "'path' = 'file:///data/test.parquet',\n" + "'format'

Re: FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

2020-02-20 Thread Lian Jiang
Thanks. I see from mvnrepository that even Flink-Avro 1.10 (the latest) uses avro 1.8.2. Does this mean I have to use GenericData instead of avro POJOs if I use FlinkKinesisConsumer? Sent from my iPhone > On Feb 20, 2020, at 4:34 AM, Chesnay Schepler wrote: >

FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

2020-02-19 Thread Lian Jiang
Hi, I use a FlinkKinesisConsumer in a Flink job to de-serialize kinesis events. Flink: 1.9.2 Avro: 1.9.2 The serDe class is like: public class ManagedSchemaKinesisPayloadSerDe implements KinesisSerializationSchema, KinesisDeserializationSchema { private static final String

Re: processing avro data source using DataSet API and output to parquet

2019-08-16 Thread Lian Jiang
t/src/main/java/org/apache/flink/formats/parquet/ParquetBulkWriter.java > > > Best Regards, > Zhenghua Gao > > >> On Fri, Aug 16, 2019 at 1:04 PM Lian Jiang wrote: >> Hi, >> >> I am using Flink 1.8.1 DataSet for a batch processing. The data source is >> avro files a

processing avro data source using DataSet API and output to parquet

2019-08-15 Thread Lian Jiang
Hi, I am using Flink 1.8.1 DataSet for batch processing. The data source is avro files and I want to output the result into parquet. https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/batch/ has no related information. What's the recommended way of doing this? Do I need to

Scylla connector

2019-08-12 Thread Lian Jiang
Hi, I am new to Flink. Is there a Scylla connector equivalent to the Cassandra connector: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/cassandra.html? Or can Flink use Scylla as a sink via the Cassandra connector? Thanks.